Information Technology Grimoire

Version .0.0.1

IT Notes from various projects because I forget, and hopefully they help you too.

Python3 Multi Threaded Ping Sweep Scanner

We know that nmap and other tools are great for scanning a network. What if we want to do it in python?

The following code demonstrates multithreading in Python 3 by ping-sweeping a network on Windows. The single-threaded version took almost 2 minutes to check a network, but this version takes about 2 seconds.
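To see why threads help so much here, consider that a ping spends nearly all of its time waiting for a reply. The sketch below is not the scanner itself: `fake_ping` is a hypothetical stand-in that just sleeps for 50 ms, so it runs anywhere without touching the network. It only illustrates that the waits overlap when each host gets its own thread.

```python
import threading
import time

def fake_ping(host, delay=0.05):
    """Hypothetical stand-in for a real ping: it only waits, like a ping waiting on a reply."""
    time.sleep(delay)
    return host

def sweep_serial(hosts):
    """Check hosts one after another and return the elapsed seconds."""
    start = time.perf_counter()
    for host in hosts:
        fake_ping(host)
    return time.perf_counter() - start

def sweep_threaded(hosts):
    """Check every host on its own thread and return the elapsed seconds."""
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_ping, args=(host,)) for host in hosts]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every thread to finish
    return time.perf_counter() - start

hosts = ["10.0.0.%d" % i for i in range(1, 11)]
serial_time = sweep_serial(hosts)
threaded_time = sweep_threaded(hosts)
print("serial:   %0.2f seconds" % serial_time)
print("threaded: %0.2f seconds" % threaded_time)
```

The serial run takes roughly the sum of all the waits, while the threaded run takes roughly as long as the single slowest wait. The exact numbers depend on your machine.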

Why not NMAP for a Ping Sweep?

Arguably, nmap is better. But you may not have nmap installed, or you might want to learn about multithreading! Regardless, there are many things learned from writing scripts like this.

Python Style

There are things done in this script that might be written more elegantly, but for learning the basics of multi-threading, doing a ping sweep is a great example.
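As one example of a more compact style, the standard library's `concurrent.futures.ThreadPoolExecutor` can replace the hand-built queue and worker loop. This is a sketch of that alternative, not the approach used in the script below. The names `sweep` and `fake_check` are made up for illustration, and `fake_check` is a stand-in so the example runs without pinging anything.

```python
from concurrent.futures import ThreadPoolExecutor

def sweep(hosts, check, max_workers=100):
    """Run check(host) for every host on a thread pool.

    Returns the hosts for which check() returned a truthy value.
    pool.map() yields results in the same order as the input hosts.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(check, hosts))
    return [host for host, ok in zip(hosts, results) if ok]

def fake_check(host):
    # Hypothetical stand-in for a real ping: pretend that only
    # addresses with an even last octet answer.
    return int(host.rsplit(".", 1)[1]) % 2 == 0

online = sweep(["192.168.1.%d" % i for i in range(1, 7)], fake_check)
print(online)  # ['192.168.1.2', '192.168.1.4', '192.168.1.6']
```

To use it for real, you would pass in a function that runs ping and returns True or False. The pool handles thread creation and shutdown, so the daemon flag and the explicit queue are no longer needed.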

Topics Learned in This Script

Programming is all about the big picture. If you can imagine a concept, the rest is just pieces of a puzzle. It’s useful to break down tasks into smaller and smaller tasks and solve them one by one. However, unless you have a big picture goal in mind, your code gets more like spaghetti as you add features and rewrite your dream into reality.

In this script, if you understand each line, you will also understand all of the following components:

  • importing modules in python

  • hiding windows using subprocess

  • calling a subprocess from OS using python

  • coloring the CLI of python output

  • reading CIDR network addresses using python

  • measuring python program run time

  • python functions

  • passing data into python functions

  • basic python variables

  • creating a list in python

  • accessing members of a list

  • if else logic and skipping branches in python

  • while and for loops

  • creating a thread queue in python

  • python worker pool creation

  • locking print threads while workers finish

  • converting to string

  • converting a thread to daemon so it dies gracefully

  • printing without a newline

  • finding length of a list

  • list iteration/looping over a list

  • formatting a float to precision desired
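A few of these topics can be tried on their own before reading the full script. The short sketch below covers reading a CIDR network, finding the length of a list, accessing list members, printing without a newline, and formatting a float. The address and the `elapsed` value are made-up sample data.

```python
import ipaddress

# reading a CIDR network address and expanding it to usable hosts
net = ipaddress.ip_network("192.168.1.0/24")
hosts = list(net.hosts())

print(len(hosts))   # 254 (network and broadcast addresses are excluded)
print(hosts[0])     # 192.168.1.1
print(hosts[-1])    # 192.168.1.254

# ip_network() rejects an address with host bits set unless strict=False
try:
    ipaddress.ip_network("192.168.1.5/24")
except ValueError as err:
    print("rejected:", err)

loose = ipaddress.ip_network("192.168.1.5/24", strict=False)
print(loose)        # 192.168.1.0/24

# printing without a newline, then finishing the line
print("Run Time: ", end='')

# formatting a float to two decimal places
elapsed = 2.34567
print("%0.2f" % elapsed, "seconds")
```

The `strict=False` detail matters for the script below: if you type a host address such as 192.168.1.5/24 at the prompt instead of the network address, `ipaddress.ip_network` raises a ValueError.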

Multithreaded Ping Sweeper in Python 3 Code

import time       # let's time our script

import ipaddress  # https://docs.python.org/3/library/ipaddress.html
                  # convert ip/mask to list of hosts

import subprocess # https://docs.python.org/3/library/subprocess.html
                  # to run ping and hide its console window

from colorama import init  # colors https://en.wikipedia.org/wiki/ANSI_escape_code
init()                     # https://pypi.org/project/colorama/

import threading           # for threading functions, lock, queue
from queue import Queue    # https://docs.python.org/3/library/queue.html

# define a lock that we can use later to keep
# threads from printing over each other
print_lock = threading.Lock()

# Prompt the user to input a network address
net_addr = input("Enter Network (192.168.1.0/24): ")

# actual code start time
startTime = time.time()

# Create the network
ip_net = ipaddress.ip_network(net_addr)

# Get all hosts on that network
all_hosts = list(ip_net.hosts())

# Configure subprocess to hide the console window
info = subprocess.STARTUPINFO()
info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
info.wShowWindow = subprocess.SW_HIDE

# quick message/update
print ('Sweeping Network with ICMP: ', net_addr)

# the actual ping definition and logic.
# it's called from a pool, repeatedly threaded, not serial
def pingsweep(ip):

    # for windows: -n is ping count, -w is timeout in milliseconds
    # for linux:   -c is ping count, -W is timeout in seconds (-w is an overall deadline in seconds)
    # I didn't test subprocess in linux, but know the ping flags must change if OS changes

    output = subprocess.Popen(['ping', '-n', '1', '-w', '150', str(all_hosts[ip])], stdout=subprocess.PIPE, startupinfo=info).communicate()[0]

    # lock this section, until we get a complete chunk
    # then free it (so it doesn't write all over itself)
    with print_lock:

      # set the default text color to bright yellow
      print('\033[93m', end='')

      # decode the ping output once, then decide what to print
      text = output.decode('utf-8', errors='replace')

      # check the failure messages first: windows reports an unreachable
      # host as "Reply from x.x.x.x: Destination host unreachable",
      # which also contains the word "Reply"
      if "Destination host unreachable" in text:
         #print(str(all_hosts[ip]), '\033[90m'+"is Offline (Unreachable)")
         pass
      elif "Request timed out" in text:
         #print(str(all_hosts[ip]), '\033[90m'+"is Offline (Timeout)")
         pass
      elif "Reply" in text:
         # print in green if online
         print(str(all_hosts[ip]), '\033[32m'+"is Online")
      else:
         # the output matched none of the expected messages
         print(str(all_hosts[ip]), "is UNKNOWN")

# worker loop run by each thread: take the next host index from the queue,
# ping it, then mark the task done so q.join() can return
def threader():
   while True:
      worker = q.get()
      pingsweep(worker)
      q.task_done()

q = Queue()

# up to 100 threads, daemon for cleaner shutdown
# just spawns the threads and makes them daemon mode
for x in range(100):
   t = threading.Thread(target = threader)
   t.daemon = True
   t.start()

# loops over every index in the all_hosts list
# passing it to q.put (entering it into queue)
for worker in range(len(all_hosts)):
   q.put(worker)

# block until every queued task has been marked done
q.join()

# ok, give us a final time report
runtime = float("%0.2f" % (time.time() - startTime))
print("Run Time: ", runtime, "seconds")
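The comments in `pingsweep` note that the ping flags change with the operating system. One way to handle that is to build the command list based on `platform.system()`. This is an untested-on-Linux sketch in the same spirit as the script: `build_ping_command` is a made-up helper name, and the non-Windows branch assumes the Linux iputils ping, where `-W` is a per-reply timeout in seconds. Other systems such as macOS use different flag meanings and are not handled here.

```python
import platform

def build_ping_command(host, system=None):
    """Return a ping command list for a single echo request.

    system defaults to platform.system(), which returns names such as
    'Windows' or 'Linux'. Passing it explicitly makes the function easy
    to check without switching machines.
    """
    if system is None:
        system = platform.system()

    if system == "Windows":
        # -n is ping count, -w is timeout in milliseconds
        return ["ping", "-n", "1", "-w", "150", str(host)]

    # Assumption: Linux iputils ping.
    # -c is ping count, -W is timeout in seconds
    return ["ping", "-c", "1", "-W", "1", str(host)]

print(build_ping_command("192.168.1.1", "Windows"))
print(build_ping_command("192.168.1.1", "Linux"))
```

The returned list can be passed to `subprocess.Popen` in place of the hard-coded one. Keep in mind that `subprocess.STARTUPINFO` only exists on Windows, so the `startupinfo=info` argument and the lines that build `info` would also need to be skipped on other systems. The strings matched in the output ("Reply", "Request timed out") are Windows wording as well.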
Last updated on 11 Apr 2019
Published on 11 Apr 2019