Learn Python asyncio and socket by checking ports
Create a program to check ports and learn some asyncio and sockets
Photo by Leone Venter on Unsplash
Introduction
There are several network scanners and tools that let you find out open ports. This is a demonstration of using socket
, ipaddress
and asyncio
to check ports concurrently.
Basic Check
A basic check can be written using socket
module:
with socket.socket(family, sock_type) as s:
result = s.connect_ex(address)
For our example, we will only support the following:
family
:AF_INET
for IPv4,AF_INET6
for IPv6sock_type
:SOCK_STREAM
for TCP,SOCK_DGRAM
for UDP
Notice, we use connect_ex as it returns an error indication instead of raising exceptions. Now, let us create a function check_port
that accepts a server IP address, port and port type and returns the result 0
if success
def check_port(server: str, port: int, port_type: str) -> int:
"""Check whether a port is open or not
:param server: IP Address
:param port: Port number
:param port_type: Port Type
:return: 0 if successful, err code no.
"""
if port_type == "TCP":
sock_type = socket.SOCK_STREAM
elif port_type == "UDP":
sock_type = socket.SOCK_DGRAM
else:
raise ValueError("Port type should be TCP or UDP")
# validate the address
ip_address = ipaddress.ip_address(server)
if ip_address.version == 4:
family = socket.AF_INET
else:
family = socket.AF_INET6
location = (server, port)
# It helps to track the time
start = time.perf_counter_ns()
with socket.socket(family, sock_type) as a_socket:
# timeouts saves us from a forever blocking loop
a_socket.settimeout(TIMEOUT)
result = a_socket.connect_ex(location)
a_socket.close()
end = time.perf_counter_ns() - start
return result
Run concurrently with asyncio
We should be able to run this against multiple addresses and ports. The obvious way to achieve this is to pass a list
of ports and addresses and execute the logic in a loop. This would a serial approach. The time taken by this execution will be the sum of all the checks. This does not sound optimal.
The next option is to run these checks concurrently. We have already added a timeout to ensure that the time does not exceed TIMEOUT
So, let us use asyncio
to run this function concurrently.
import asyncio
import ipaddress
import socket
import time
from rich.progress import track
SERVERS = []
# List of ports to check
TCP_PORTS = [53, 389, 445]
UDP_PORTS = [3268, 636, 185]
TIMEOUT = 2
async def main():
for server in track(SERVERS, description="Checking..."):
rows = await asyncio.gather(
*[asyncio.to_thread(check_port, server, port, "UDP") for port in UDP_PORTS],
*[asyncio.to_thread(check_port, server, port, "TCP") for port in TCP_PORTS],
)
if __name__ == "__main__":
s = time.perf_counter()
asyncio.run(main())
elapsed = time.perf_counter() - s
print(f"Executed in {elapsed:0.2f} seconds.")
Now, let us breakdown the code to understand it better:
asyncio.to_thread
runs the function in a separate thread. So, we run a separate thread for each port. asyncio.gather
runs these threads, which are awaitable objects, concurrently. To illustrate that multiple threads can be passed to gather
, we have separated UDP and TCP.
We wrap all this logic in async def main()
which itself is run by asyncio.run(main())
Rich Table Output
Next, we print the out in a formatted table using the amazing rich
package
import asyncio
import ipaddress
import socket
import time
from rich.console import Console
from rich.progress import track
from rich.table import Table
SERVERS = []
TCP_PORTS = [53, 389, 445, 3268, 636]
UDP_PORTS = [3268, 636, 185]
TIMEOUT = 2
# Rich Table
console = Console()
table = Table(title="Results")
table.add_column("Server", justify="left")
table.add_column("Type", justify="left")
table.add_column("Port", justify="left")
table.add_column("Status", justify="left")
table.add_column("Time (ns)", justify="right")
def check_port(server: str, port: int, port_type: str) -> dict:
"""Check whether a port is open or not
:param server: IP address
:param port: Port number
:param port_type: Port Type
:return: rich table row as dict
"""
if port_type == "TCP":
sock_type = socket.SOCK_STREAM
elif port_type == "UDP":
sock_type = socket.SOCK_DGRAM
else:
raise ValueError("Port type should be TCP or UDP")
ip_address = ipaddress.ip_address(server)
if ip_address.version == 4:
family = socket.AF_INET
else:
family = socket.AF_INET6
location = (server, port)
start = time.perf_counter_ns()
with socket.socket(family, sock_type) as a_socket:
a_socket.settimeout(TIMEOUT)
result_of_check = a_socket.connect_ex(location)
a_socket.close()
end = time.perf_counter_ns() - start
# Result to rich format
if result_of_check == 0:
return {
"row": [server, port_type, str(port), "OPEN", f"{end:0.2f}"],
"style": "green",
}
else:
return {
"row": [server, port_type, str(port), "CLOSED", f"{end:0.2f}"],
"style": "red",
}
async def main():
for server in track(SERVERS, description="Checking..."):
rows = await asyncio.gather(
*[asyncio.to_thread(check_port, server, port, "UDP") for port in UDP_PORTS],
*[asyncio.to_thread(check_port, server, port, "TCP") for port in TCP_PORTS],
)
[table.add_row(*r["row"], style=r["style"]) for r in rows]
if __name__ == "__main__":
s = time.perf_counter()
asyncio.run(main())
console.print(table)
elapsed = time.perf_counter() - s
print(f"Executed in {elapsed:0.2f} seconds.")
Now, let us understand what's happening here.
The rows are appended to the rich
table for each server. Once the asyncio
loop is finished, we display the table using console.print(table)
Here's the output of the program run against Google's public DNS IPs.
The progress bar is displayed by the track
method in rich.progress
Fork it on Replit
The entire program can be run from the Replit. Feel free to fork it.
WARNING: Please run this program responsibly, it is meant to be a demo only. Port scanning can be considered malicious activity.