Learn Python asyncio and socket by checking ports

Create a program to check ports and learn some asyncio and sockets

Introduction

There are several network scanners and tools that let you find open ports. This is a demonstration of using socket, ipaddress and asyncio to check ports concurrently.

Basic Check

A basic check can be written using the socket module:

with socket.socket(family, sock_type) as s:
    result = s.connect_ex(address)
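To see the return values in practice, here is a minimal sketch that can be run without touching any remote host. It assumes a throwaway listener on the loopback interface; the helper name probe and the 1 second timeout are illustrative choices, not part of the original program.

```python
import socket


def probe(host: str, port: int, timeout: float = 1.0) -> int:
    """Return 0 if a TCP connection succeeds, otherwise an errno value."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        return s.connect_ex((host, port))


# Open a throwaway listener on loopback; port 0 lets the OS pick a free port.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
port = listener.getsockname()[1]

open_result = probe("127.0.0.1", port)    # something is listening
listener.close()
closed_result = probe("127.0.0.1", port)  # nothing is listening any more

print(open_result, closed_result)
```

The first value should be 0 and the second a non-zero error number (typically the one for a refused connection).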

For our example, we will only support the following:

  • family: AF_INET for IPv4, AF_INET6 for IPv6

  • sock_type: SOCK_STREAM for TCP, SOCK_DGRAM for UDP
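One caveat about the SOCK_DGRAM option: UDP is connectionless, so connecting a UDP socket only records the destination address and sends no packet. The sketch below illustrates this on the loopback interface, assuming typical operating system behaviour; the variable names are illustrative.

```python
import socket

# Reserve a UDP port, then release it so that nothing is listening there.
placeholder = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
placeholder.bind(("127.0.0.1", 0))
unused_port = placeholder.getsockname()[1]
placeholder.close()

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
    s.settimeout(1.0)
    # For UDP, connect only sets the default destination; no packet is sent.
    udp_result = s.connect_ex(("127.0.0.1", unused_port))

print(udp_result)
```

A result of 0 for UDP therefore does not show that a service is listening. Confirming a UDP service generally requires sending a protocol-specific request and waiting for a reply.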

Notice that we use connect_ex because it returns an error indicator instead of raising an exception: 0 on success, otherwise the value of the errno variable. Now, let us create a function check_port that accepts a server IP address, a port and a port type, and returns 0 on success or the error code otherwise:

def check_port(server: str, port: int, port_type: str) -> int:
    """Check whether a port is open or not
    :param server: IP address (IPv4 or IPv6)
    :param port: Port number
    :param port_type: Port type, either TCP or UDP
    :return: 0 on success, otherwise an errno value
    """
    if port_type == "TCP":
        sock_type = socket.SOCK_STREAM
    elif port_type == "UDP":
        sock_type = socket.SOCK_DGRAM
    else:
        raise ValueError("Port type should be TCP or UDP")
    # Validate the address; ip_address raises ValueError if it is not a valid IP
    ip_address = ipaddress.ip_address(server)
    if ip_address.version == 4:
        family = socket.AF_INET
    else:
        family = socket.AF_INET6
    location = (server, port)
    # Track how long the check takes
    start = time.perf_counter_ns()
    with socket.socket(family, sock_type) as a_socket:
        # A timeout keeps the call from blocking indefinitely
        a_socket.settimeout(TIMEOUT)
        result = a_socket.connect_ex(location)
        # No explicit close() is needed; the with block closes the socket
    elapsed_ns = time.perf_counter_ns() - start  # not used yet; reported in the later table version
    return result
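Since check_port returns a raw error number, it can help to translate it into something readable. The following is a small sketch using the standard errno and os modules; describe_result is a hypothetical helper, not part of the original program.

```python
import errno
import os


def describe_result(code: int) -> str:
    """Turn a connect_ex return value into a short readable string."""
    if code == 0:
        return "OPEN"
    # errno.errorcode maps numbers to symbolic names such as ECONNREFUSED
    name = errno.errorcode.get(code, "UNKNOWN")
    return f"{name}: {os.strerror(code)}"


print(describe_result(0))
print(describe_result(errno.ECONNREFUSED))
```

The exact message text comes from the operating system, so it may differ between platforms.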

Run concurrently with asyncio

We should be able to run this against multiple addresses and ports. The obvious way to achieve this is to pass a list of ports and addresses and execute the logic in a loop. This would be a serial approach: the total time is the sum of the time taken by every check, which is far from optimal.
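To make the cost of the serial approach concrete, here is a rough sketch in which time.sleep stands in for a blocking port check. The names fake_check and run_serially, and the 0.1 second delays, are assumptions made for illustration.

```python
import time


def fake_check(delay: float) -> float:
    """Stand-in for a blocking port check that takes `delay` seconds."""
    time.sleep(delay)
    return delay


def run_serially(delays: list[float]) -> float:
    """Run the checks one after another and return the elapsed seconds."""
    start = time.perf_counter()
    for delay in delays:
        fake_check(delay)
    return time.perf_counter() - start


serial_elapsed = run_serially([0.1, 0.1, 0.1])
print(f"Serial run took {serial_elapsed:0.2f} seconds.")
```

With three checks of 0.1 seconds each, the run takes at least about 0.3 seconds, and the total grows with every check added.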

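The next option is to run these checks concurrently. Each check is already bounded by TIMEOUT, so as long as there are enough worker threads, a concurrent run should take roughly as long as its slowest check rather than the sum of all of them.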

So, let us use asyncio to run this function concurrently.

import asyncio
import ipaddress
import socket
import time

from rich.progress import track

SERVERS = []  # IP addresses to check; fill this in before running
# List of ports to check
TCP_PORTS = [53, 389, 445]
UDP_PORTS = [3268, 636, 185]
TIMEOUT = 2

async def main():
    for server in track(SERVERS, description="Checking..."):
        rows = await asyncio.gather(
            *[asyncio.to_thread(check_port, server, port, "UDP") for port in UDP_PORTS],
            *[asyncio.to_thread(check_port, server, port, "TCP") for port in TCP_PORTS],
        )

if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"Executed in {elapsed:0.2f} seconds.")

Now, let us break down the code to understand it better:

asyncio.to_thread (available from Python 3.9) runs a blocking function in a worker thread and returns a coroutine that can be awaited. So, we submit one check_port call per port. asyncio.gather runs these awaitables concurrently and returns their results as a list, in the order they were passed. To illustrate that several groups of awaitables can be passed to gather, we have separated UDP and TCP.

We wrap all this logic in async def main(), which is itself run by asyncio.run(main()).
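The same pattern can be tried in isolation. This is a minimal sketch, assuming Python 3.9 or later, with time.sleep standing in for the blocking socket call; blocking_check and run_concurrently are illustrative names.

```python
import asyncio
import time


def blocking_check(label: str, delay: float) -> str:
    """Stand-in for a blocking port check."""
    time.sleep(delay)
    return label


async def run_concurrently() -> list[str]:
    # gather returns results in the order the awaitables were passed,
    # not in the order they finish.
    return await asyncio.gather(
        asyncio.to_thread(blocking_check, "slow", 0.5),
        asyncio.to_thread(blocking_check, "fast", 0.3),
    )


start = time.perf_counter()
results = asyncio.run(run_concurrently())
concurrent_elapsed = time.perf_counter() - start
print(results, f"{concurrent_elapsed:0.2f} seconds")
```

Both calls overlap, so the run should take about as long as the slower one (around 0.5 seconds) rather than the 0.8 seconds a serial loop would need.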

Rich Table Output

Next, we print the output in a formatted table using the amazing rich package:

import asyncio
import ipaddress
import socket
import time

from rich.console import Console
from rich.progress import track
from rich.table import Table

SERVERS = []  # IP addresses to check; fill this in before running

TCP_PORTS = [53, 389, 445, 3268, 636]
UDP_PORTS = [3268, 636, 185]
TIMEOUT = 2

# Rich Table
console = Console()
table = Table(title="Results")
table.add_column("Server", justify="left")
table.add_column("Type", justify="left")
table.add_column("Port", justify="left")
table.add_column("Status", justify="left")
table.add_column("Time (ns)", justify="right")


def check_port(server: str, port: int, port_type: str) -> dict:
    """Check whether a port is open or not
    :param server: IP address
    :param port: Port number
    :param port_type: Port Type
    :return: rich table row as dict
    """
    if port_type == "TCP":
        sock_type = socket.SOCK_STREAM
    elif port_type == "UDP":
        sock_type = socket.SOCK_DGRAM
    else:
        raise ValueError("Port type should be TCP or UDP")
    ip_address = ipaddress.ip_address(server)
    if ip_address.version == 4:
        family = socket.AF_INET
    else:
        family = socket.AF_INET6

    location = (server, port)
    start = time.perf_counter_ns()
    with socket.socket(family, sock_type) as a_socket:
        a_socket.settimeout(TIMEOUT)
        # The with block closes the socket, so no explicit close() is needed
        result_of_check = a_socket.connect_ex(location)
    elapsed_ns = time.perf_counter_ns() - start
    # Convert the result to a rich table row: 0 means the connection succeeded
    is_open = result_of_check == 0
    return {
        # elapsed_ns is an integer, so format it with thousands separators
        "row": [server, port_type, str(port), "OPEN" if is_open else "CLOSED", f"{elapsed_ns:,}"],
        "style": "green" if is_open else "red",
    }

async def main():
    for server in track(SERVERS, description="Checking..."):
        rows = await asyncio.gather(
            *[asyncio.to_thread(check_port, server, port, "UDP") for port in UDP_PORTS],
            *[asyncio.to_thread(check_port, server, port, "TCP") for port in TCP_PORTS],
        )
        # Add one table row per check result
        for r in rows:
            table.add_row(*r["row"], style=r["style"])


if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    console.print(table)
    elapsed = time.perf_counter() - s
    print(f"Executed in {elapsed:0.2f} seconds.")

Now, let us understand what's happening here.

The rows are appended to the rich table for each server. Once asyncio.run(main()) returns, we display the table using console.print(table).
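The row-building step can also be separated from the rendering step, which makes it easier to try on its own. The sketch below is one possible arrangement, not the program's actual structure: to_row and render are hypothetical helpers, 192.0.2.1 is a documentation-only placeholder address, and rich is imported inside render so that to_row can be used without it.

```python
def to_row(server: str, port_type: str, port: int, result: int, elapsed_ns: int) -> dict:
    """Build the row/style dict from one check result (no rich needed)."""
    is_open = result == 0
    return {
        "row": [server, port_type, str(port), "OPEN" if is_open else "CLOSED", f"{elapsed_ns:,}"],
        "style": "green" if is_open else "red",
    }


def render(rows: list[dict]) -> None:
    """Print the rows as a rich table; requires the rich package."""
    from rich.console import Console
    from rich.table import Table

    table = Table(title="Results")
    for name in ("Server", "Type", "Port", "Status", "Time (ns)"):
        table.add_column(name)
    for r in rows:
        table.add_row(*r["row"], style=r["style"])
    Console().print(table)


sample = to_row("192.0.2.1", "TCP", 53, 0, 1_234_567)
print(sample)
```

Calling render([sample]) would then print a one-row table with the row styled in green.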

Here's the output of the program run against Google's public DNS IPs.

The progress bar is displayed by the track function in rich.progress. It advances once per server.

Fork it on Replit

The entire program can be run on Replit. Feel free to fork it.

WARNING: Please run this program responsibly; it is meant to be a demo only. Port scanning can be considered malicious activity.