# Learn Python asyncio and socket by checking ports

# Introduction

There are several network scanners and tools that let you find open ports. This post demonstrates how to use `socket`, `ipaddress` and `asyncio` to check ports concurrently.

# Basic Check

A basic check can be written using the `socket` module:

```python
with socket.socket(family, sock_type) as s:
    result = s.connect_ex(address)
```

For our example, we will only support the following:

*   `family`: `AF_INET` for IPv4, `AF_INET6` for IPv6
    
*   `sock_type` : `SOCK_STREAM` for TCP, `SOCK_DGRAM` for UDP
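
As a minimal sketch of how the address family could be chosen from an IP address string, the snippet below uses `ipaddress.ip_address`. The helper name `pick_family` is hypothetical and only used for illustration; the sample addresses come from the reserved documentation ranges.

```python
import ipaddress
import socket


def pick_family(server: str) -> socket.AddressFamily:
    """Return AF_INET or AF_INET6 for an IP address string.

    pick_family is a hypothetical helper, not part of the original program.
    """
    # ip_address() raises ValueError for anything that is not a literal IP
    version = ipaddress.ip_address(server).version
    return socket.AF_INET if version == 4 else socket.AF_INET6


print(pick_family("192.0.2.1"))    # IPv4 documentation address
print(pick_family("2001:db8::1"))  # IPv6 documentation address
```

Because `ipaddress.ip_address` only accepts literal addresses, passing a hostname such as `example.com` raises `ValueError` rather than triggering a DNS lookup.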
    

Note that we use [connect\_ex](https://docs.python.org/3/library/socket.html#socket.socket.connect_ex) because it returns an error indicator instead of raising an exception. Now, let us create a function `check_port` that accepts a server IP address, a port and a port type, and returns `0` on success or an error code otherwise:

```python
import ipaddress
import socket
import time

TIMEOUT = 2  # seconds to wait for each check


def check_port(server: str, port: int, port_type: str) -> int:
    """Check whether a port is open or not
    :param server: IP Address 
    :param port: Port number
    :param port_type: Port Type
    :return: 0 if successful, otherwise an errno error code
    """
    if port_type == "TCP":
        sock_type = socket.SOCK_STREAM
    elif port_type == "UDP":
        sock_type = socket.SOCK_DGRAM
    else:
        raise ValueError("Port type should be TCP or UDP")
    # validate the address
    ip_address = ipaddress.ip_address(server)
    if ip_address.version == 4:
        family = socket.AF_INET
    else:
        family = socket.AF_INET6
    location = (server, port)
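    # Track how long the check takes
    start = time.perf_counter_ns()
    with socket.socket(family, sock_type) as a_socket:
        # A timeout keeps the call from blocking indefinitely
        a_socket.settimeout(TIMEOUT)
        result = a_socket.connect_ex(location)
        # No explicit close() is needed: the with block closes the socket
    # Elapsed time in nanoseconds; it is reported in the table version of this function
    elapsed_ns = time.perf_counter_ns() - start
    return result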
```
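
To see what `connect_ex` returns in practice without touching a remote host, here is a small sketch that starts a throwaway listener on the loopback interface. The variable names are illustrative only. It also shows a caveat worth keeping in mind: UDP is connectionless, so `connect_ex` on a `SOCK_DGRAM` socket only records the destination and normally returns `0` even when nothing is listening.

```python
import errno
import socket

# Start a throwaway TCP listener on the loopback interface.
# Port 0 asks the OS for any free port.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
port = listener.getsockname()[1]

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.settimeout(2)
    tcp_open = s.connect_ex(("127.0.0.1", port))

# Close the listener, then try the same port again.
listener.close()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.settimeout(2)
    tcp_closed = s.connect_ex(("127.0.0.1", port))

# UDP is connectionless: connect_ex only records the destination address.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
    s.settimeout(2)
    udp_result = s.connect_ex(("127.0.0.1", port))

print("TCP, listener up:  ", tcp_open)
print("TCP, listener down:", tcp_closed, errno.errorcode.get(tcp_closed, "?"))
print("UDP, no listener:  ", udp_result)
```

The exact non-zero code for the closed TCP port depends on the platform, which is why the sketch looks it up in `errno.errorcode` instead of hard-coding a value. A `0` from a UDP check should therefore be read as "the address was accepted", not as proof that a service is listening.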

# Run concurrently with asyncio

We should be able to run this against multiple addresses and ports. The obvious way to achieve this is to pass a `list` of ports and addresses and execute the logic in a loop. This would be a serial approach: the total time taken is the sum of the time taken by every check, which is far from optimal.
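
A rough sketch of the serial approach is shown here. To keep it runnable without a network, `fake_check` is a hypothetical stand-in for `check_port` that simply sleeps for a fixed delay.

```python
import time


def fake_check(port: int, delay: float = 0.05) -> int:
    """Hypothetical stand-in for check_port: sleep to mimic waiting on the network."""
    time.sleep(delay)
    return 0


ports = [53, 389, 445]
start = time.perf_counter()
results = [fake_check(port) for port in ports]  # one check after another
serial_elapsed = time.perf_counter() - start

print(f"{len(ports)} checks took {serial_elapsed:0.2f} seconds")
```

With three checks of 0.05 seconds each, the loop cannot finish in less than about 0.15 seconds, and the cost grows linearly with the number of ports and servers.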

The next option is to run these checks concurrently. We have already added a timeout, so each individual check takes no longer than `TIMEOUT` seconds.

So, let us use `asyncio` to run this function concurrently.

```python
import asyncio
import ipaddress
import socket
import time

from rich.progress import track

# IP addresses to check; fill in before running
SERVERS = []
# List of ports to check
TCP_PORTS = [53, 389, 445]
UDP_PORTS = [3268, 636, 185]
TIMEOUT = 2

async def main():
    for server in track(SERVERS, description="Checking..."):
        rows = await asyncio.gather(
            *[asyncio.to_thread(check_port, server, port, "UDP") for port in UDP_PORTS],
            *[asyncio.to_thread(check_port, server, port, "TCP") for port in TCP_PORTS],
        )

if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"Executed in {elapsed:0.2f} seconds.")
```

Now, let us break down the code to understand it better:

`asyncio.to_thread` runs a blocking function in a separate thread and returns a coroutine that can be awaited. So, each port check runs in its own thread. `asyncio.gather` runs these awaitables concurrently and returns their results in the order in which they were passed. To illustrate that several groups of awaitables can be unpacked into a single `gather` call, we have kept UDP and TCP separate.

We wrap all this logic in `async def main()`, which itself is run by `asyncio.run(main())`.
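
The same pattern can be tried in isolation. In this sketch, `slow_square` is a hypothetical blocking function standing in for `check_port`, and two unpacked lists are passed to one `gather` call, just like the UDP and TCP lists.

```python
import asyncio
import time


def slow_square(n: int) -> int:
    """Hypothetical blocking stand-in for check_port."""
    time.sleep(0.2)
    return n * n


async def run_all() -> list:
    evens = [2, 4]
    odds = [1, 3]
    # Two unpacked lists of awaitables passed to a single gather call
    return await asyncio.gather(
        *[asyncio.to_thread(slow_square, n) for n in evens],
        *[asyncio.to_thread(slow_square, n) for n in odds],
    )


start = time.perf_counter()
results = asyncio.run(run_all())
elapsed = time.perf_counter() - start
print(results)  # [4, 16, 1, 9]
print(f"{elapsed:0.2f} seconds for four 0.2-second calls")
```

The results come back in the order the awaitables were passed, not the order in which the threads finished. Since the four calls overlap, the total time should be close to a single 0.2-second call rather than the 0.8 seconds a serial loop would need.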

# Rich Table Output

Next, we print the output in a formatted table using the amazing `rich` package:

```python
import asyncio
import ipaddress
import socket
import time

from rich.console import Console
from rich.progress import track
from rich.table import Table

SERVERS = []

TCP_PORTS = [53, 389, 445, 3268, 636]
UDP_PORTS = [3268, 636, 185]
TIMEOUT = 2

# Rich Table
console = Console()
table = Table(title="Results")
table.add_column("Server", justify="left")
table.add_column("Type", justify="left")
table.add_column("Port", justify="left")
table.add_column("Status", justify="left")
table.add_column("Time (ns)", justify="right")


def check_port(server: str, port: int, port_type: str) -> dict:
    """Check whether a port is open or not
    :param server: IP address
    :param port: Port number
    :param port_type: Port Type
    :return: rich table row as dict
    """
    if port_type == "TCP":
        sock_type = socket.SOCK_STREAM
    elif port_type == "UDP":
        sock_type = socket.SOCK_DGRAM
    else:
        raise ValueError("Port type should be TCP or UDP")
    ip_address = ipaddress.ip_address(server)
    if ip_address.version == 4:
        family = socket.AF_INET
    else:
        family = socket.AF_INET6

    location = (server, port)
    start = time.perf_counter_ns()
    with socket.socket(family, sock_type) as a_socket:
        a_socket.settimeout(TIMEOUT)
        result_of_check = a_socket.connect_ex(location)
        # No explicit close() is needed: the with block closes the socket
    elapsed_ns = time.perf_counter_ns() - start
    # Result to rich format
    if result_of_check == 0:
        return {
            "row": [server, port_type, str(port), "OPEN", str(elapsed_ns)],
            "style": "green",
        }
    else:
        return {
            "row": [server, port_type, str(port), "CLOSED", str(elapsed_ns)],
            "style": "red",
        }

async def main():
    for server in track(SERVERS, description="Checking..."):
        rows = await asyncio.gather(
            *[asyncio.to_thread(check_port, server, port, "UDP") for port in UDP_PORTS],
            *[asyncio.to_thread(check_port, server, port, "TCP") for port in TCP_PORTS],
        )
        for r in rows:
            table.add_row(*r["row"], style=r["style"])


if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    console.print(table)
    elapsed = time.perf_counter() - s
    print(f"Executed in {elapsed:0.2f} seconds.")
```

Now, let us understand what's happening here.

The rows are appended to the `rich` table for each server. Once the `asyncio` event loop has finished, we display the table using `console.print(table)`.
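
One possible way to keep the row-building step testable without a network is to separate it from the socket call. The helper `to_row` below is hypothetical, and formatting the elapsed time as a plain integer string is an assumption made for this sketch.

```python
def to_row(server: str, port_type: str, port: int, result: int, elapsed_ns: int) -> dict:
    """Build the dict that main() unpacks into table.add_row (hypothetical helper)."""
    is_open = result == 0
    return {
        "row": [server, port_type, str(port), "OPEN" if is_open else "CLOSED", str(elapsed_ns)],
        "style": "green" if is_open else "red",
    }


# Sample values only: a documentation-range address and made-up timings
open_row = to_row("192.0.2.1", "TCP", 53, 0, 1_500_000)
closed_row = to_row("192.0.2.1", "TCP", 445, 111, 2_000_000)
print(open_row)
print(closed_row)
```

Every cell is converted to a string because `table.add_row` expects renderable values such as strings, not raw integers.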

Here's the output of the program run against Google's public DNS IPs.

The progress bar is displayed by the `track` function in `rich.progress`.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1670929896080/CU3aoFp03.png align="center")

# Fork it on Replit

The entire program can be run on Replit. Feel free to fork it.

**WARNING: Please run this program responsibly; it is meant to be a demo only. Port scanning can be considered malicious activity.**

%[https://replit.com/@smanas/portcheck?v=1]
