Python3 Multi Threaded Ping Sweep Scanner
We know that nmap and other tools are great for scanning a network. What if we want to do it in python?
The following code is a demonstration of multi threading, using python 3 to sweep a network using pings on windows. The single threaded version took almost 2 minutes to check a network, but this version takes about 2 seconds.
Why not NMAP for a Ping Sweep?
Arguably, nmap is better. But you may not have nmap installed, or you might want to learn about multithreading! Regardless, there are many things learned from writing scripts like this.
Python Style
There are things done in this script that might be written more elegantly, but for learning the basics of multi-threading, doing a ping sweep is a great example.
Topics Learned in This Script
Programming is all about the big picture. If you can imagine a concept, the rest is just pieces of a puzzle. It’s useful to break down tasks into smaller and smaller tasks and solve them 1 by 1. However, unless you have a big picture goal in mind, your code gets more like spaghetti as you add features and rewrite your dream into reality.
In this script, if you understand each line, you will also understand all of the following components:
importing modules in python
hiding windows using subprocess
calling a subprocess from OS using python
coloring the CLI of python output
reading CIDR network addresses using python
measuring python program run time
python functions
passing data into python functions
basic python variables
creating a list in python
accessing members of a list
if else logic and skipping branches in python
while and for loops
creating a thread queue in python
python worker pool creation
locking print threads while workers finish
converting to string
Converting a thread to daemon so it dies gracefully
printing without a newline
Finding length of a list
list iteration/looping over a list
formatting a float to precision desired
Mulithreaded Ping Sweeper in Python 3 Code
import time # let's time our script
import ipaddress # https://docs.python.org/3/library/ipaddress.html
# convert ip/mask to list of hosts
import subprocess # https://docs.python.org/3/library/subprocess.html
# to make a popup window quiet
from colorama import init # colors https://en.wikipedia.org/wiki/ANSI_escape_code
init() # https://pypi.org/project/colorama/
import threading # for threading functions, lock, queue
from queue import Queue # https://docs.python.org/3/library/queue.html
# define a lock that we can use later to keep
# prints from writing over itself
print_lock = threading.Lock()
# Prompt the user to input a network address
net_addr = input("Enter Network (192.168.1.0/24): ")
# actual code start time
startTime = time.time()
# Create the network
ip_net = ipaddress.ip_network(net_addr)
# Get all hosts on that network
all_hosts = list(ip_net.hosts())
# Configure subprocess to hide the console window
info = subprocess.STARTUPINFO()
info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
info.wShowWindow = subprocess.SW_HIDE
# quick message/update
print ('Sweeping Network with ICMP: ', net_addr)
# the actual ping definition and logic.
# it's called from a pool, repeatedly threaded, not serial
def pingsweep(ip):
# for windows: -n is ping count, -w is wait (ms)
# for linux: -c is ping count, -w is wait (ms)
# I didn't test subprocess in linux, but know the ping count must change if OS changes
output = subprocess.Popen(['ping', '-n', '1', '-w', '150', str(all_hosts[ip])], stdout=subprocess.PIPE, startupinfo=info).communicate()[0]
# lock this section, until we get a complete chunk
# then free it (so it doesn't write all over itself)
with print_lock:
# normalize colors to grey
print('\033[93m', end='')
# code logic if we have/don't have good response
if "Reply" in output.decode('utf-8'):
print(str(all_hosts[ip]), '\033[32m'+"is Online")
elif "Destination host unreachable" in output.decode('utf-8'):
#print(str(all_hosts[ip]), '\033[90m'+"is Offline (Unreachable)")
pass
elif "Request timed out" in output.decode('utf-8'):
#print(str(all_hosts[ip]), '\033[90m'+"is Offline (Timeout)")
pass
else:
# print colors in green if online
print("UNKNOWN", end='')
# defines a new ping using def pingsweep for each thread
# holds task until thread completes
def threader():
while True:
worker = q.get()
pingsweep(worker)
q.task_done()
q = Queue()
# up to 100 threads, daemon for cleaner shutdown
# just spawns the threads and makes them daemon mode
for x in range(100):
t = threading.Thread(target = threader)
t.daemon = True
t.start()
# loops over the last octet in our network object
# passing it to q.put (entering it into queue)
for worker in range(len(all_hosts)):
q.put(worker)
# queue management
q.join()
# ok, give us a final time report
runtime = float("%0.2f" % (time.time() - startTime))
print("Run Time: ", runtime, "seconds")