Python3 Debian Network Monitor
Need to log interface statistics on debian to include errors, down time, etc?
I was experiencing disconnects and only on a particular laptop. I thought maybe the drivers were bad or needed reconfiguration as it was a fresh install. After a bit of troubleshooting it was determined to be the switch.
That sent me down a rabbit hole of monitoring things, so I wrote a script to do it. It works on Debian but should work on any Linux system. It reads the output of the `ip` command and only requires matplotlib (`python3 -m pip install matplotlib`). As always, I recommend a venv!
Features
- Pings a local and a remote IP in a loop (the script below waits 5 seconds between pings), like mtr
- Logs to the window when an outage has occurred (based on ping down)
- Saves historical json data files
- Plots the JSON data on a graph
- updates the tkinter window with threads
- Green background to view at a distance (good pings)
- Red background to view at a distance (bad pings)
The Network Monitoring Script
# Standard library imports
import datetime
import glob
import json
import os
import re
import subprocess
import threading
import time
from datetime import datetime, timedelta
# Tkinter related imports (also part of the standard library)
import tkinter as tk
from tkinter import ttk
import tkinter.messagebox # unused at moment
# Third-party imports
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.dates as mdates
from matplotlib.dates import DateFormatter, MinuteLocator
# Application version, interpolated into the main window title.
APPVERSION = 0.06
class NetworkMonitorApp:
    """Tkinter UI that pings two hosts and shows/logs ``ip -s link`` counters.

    The window contains two entry fields (a local and a public target), a
    log text area whose background reflects the latest ping results, and a
    tree view with per-interface counters.  While monitoring is running, a
    JSON snapshot of the counters is written to ``snapshots/`` once per
    minute; the "View Graphs" popup plots per-second rates derived from
    those snapshots.
    """

    SNAPSHOT_DIR = "snapshots"
    # Seconds a ping worker waits between two pings of the same target.
    PING_INTERVAL_SECONDS = 5
    # Tree view column name -> pixel width (insertion order is display order).
    COLUMN_WIDTHS = {
        "Interface": 150,
        "MAC": 150,
        "IP": 110,
        "RX Packets": 90,
        "TX Packets": 90,
        "RX Errors": 80,
        "TX Errors": 80,
        "Drops": 70,
        "Missed": 70,
    }
    # Keys of the rate series returned by load_data_points(), in the same
    # order as the counters produced by _load_counters().
    _RATE_KEYS = (
        "RX_packets",
        "TX_packets",
        "RX_errors",
        "TX_errors",
        "Drops",
        "Missed",
    )
    _IPV4_PATTERN = re.compile(r"inet (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})")
    _UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9.:-]")

    def __init__(self, master):
        """Build the UI inside *master* (a ``tk.Tk`` root window)."""
        self.master = master
        self.master.title(f"IP Monitor and Interface Statistics v{APPVERSION}")
        # State is created before the widgets so callbacks can rely on it.
        self.after_jobs = []  # ids of pending 'after' callbacks
        self._stop_event = threading.Event()  # stops the current ping workers
        self._ping_status = {}  # target -> result of its latest ping
        self.setup_ui()
        self.master.protocol("WM_DELETE_WINDOW", self.on_close)

    # ------------------------------------------------------------------ UI

    def setup_ui(self):
        """Create the input row, buttons, text area and tree view."""
        # Let the single grid column absorb horizontal resizing.
        self.master.columnconfigure(0, weight=1)

        self.input_frame = ttk.Frame(self.master, padding="10")
        self.input_frame.grid(row=0, column=0, sticky="ew")
        self.input_frame.columnconfigure([0, 1, 2, 3], weight=1)

        ttk.Label(self.input_frame, text="Test Local IP:").grid(
            row=0, column=0, sticky="w", padx=5
        )
        self.local_ip_entry = ttk.Entry(self.input_frame)
        self.local_ip_entry.grid(row=0, column=1, sticky="ew", padx=5)
        self.local_ip_entry.insert(0, "192.168.7.1")  # default value

        ttk.Label(self.input_frame, text="Test Public IP:").grid(
            row=1, column=0, sticky="w", padx=5
        )
        self.public_ip_entry = ttk.Entry(self.input_frame)
        self.public_ip_entry.grid(row=1, column=1, sticky="ew", padx=5)
        self.public_ip_entry.insert(0, "1.1.1.1")  # default value

        self.view_graphs_button = ttk.Button(
            self.input_frame, text="View Graphs", command=self.open_graphs_popup
        )
        self.view_graphs_button.grid(row=0, column=2, padx=5, pady=5, sticky="ew")

        self.snapshot_button = ttk.Button(
            self.input_frame, text="Save Report", command=self.save_snapshot
        )
        self.snapshot_button.grid(row=1, column=2, padx=5, pady=5, sticky="ew")

        self.start_button = ttk.Button(
            self.input_frame, text="Start Monitoring", command=self.start_monitoring
        )
        self.start_button.grid(
            row=2, column=0, columnspan=4, padx=5, pady=5, sticky="ew"
        )

        self.setup_treeview()
        self.setup_text_area()

    def setup_treeview(self):
        """Create the per-interface statistics table."""
        columns = tuple(self.COLUMN_WIDTHS)
        self.tree_view = ttk.Treeview(self.master, columns=columns, show="headings")
        self.tree_view.grid(row=3, column=0, sticky="nsew", padx=5, pady=5)
        self.master.rowconfigure(3, weight=1)
        for heading, width in self.COLUMN_WIDTHS.items():
            self.tree_view.heading(heading, text=heading)
            self.tree_view.column(heading, width=width)

    def setup_text_area(self):
        """Create the log area whose background signals ping health."""
        self.text_area = tk.Text(self.master, height=10)
        self.text_area.grid(row=2, column=0, sticky="nsew", pady=5)

    # ---------------------------------------------------------- monitoring

    def start_monitoring(self):
        """Start (or restart) the ping workers and the periodic table update.

        Pressing the button again restarts monitoring with the targets that
        are currently in the entry fields: the previous workers are told to
        stop and the previous timer is cancelled, so threads and timers do
        not pile up.
        """
        self._stop_event.set()
        self._cancel_after_jobs()
        self._stop_event = threading.Event()
        self._ping_status = {}

        self.text_area.insert(tk.END, "Monitoring started...\n")
        self.tree_view.delete(*self.tree_view.get_children())

        local_ip = self.local_ip_entry.get().strip()
        public_ip = self.public_ip_entry.get().strip()

        # Placeholder rows until the first real interface update arrives.
        self.tree_view.insert("", "end", values=(local_ip, "Ready"))
        self.tree_view.insert("", "end", values=(public_ip, "Ready"))

        # One worker per distinct, non-empty target.
        for target in dict.fromkeys((local_ip, public_ip)):
            if not target:
                continue
            threading.Thread(
                target=self.monitor_ip,
                args=(target, self._stop_event),
                daemon=True,
            ).start()

        job = self.master.after(1000, self.update_treeview_periodically)
        self.after_jobs.append(job)

    def monitor_ip(self, ip_address, stop_event=None):
        """Ping *ip_address* in a loop until *stop_event* is set.

        Runs in a worker thread.  Widgets are never touched here; each
        result is handed to the Tk thread through ``after``.

        Args:
            ip_address: Host to ping.
            stop_event: ``threading.Event`` that ends the loop.  Defaults to
                the event of the current monitoring run.
        """
        if stop_event is None:
            stop_event = self._stop_event
        while not stop_event.is_set():
            reachable = self.ping_ip(ip_address)
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
            try:
                # Values are passed as arguments, so each callback sees the
                # result of its own ping (no late-binding closure).
                self.master.after(
                    0,
                    self._handle_ping_result,
                    ip_address,
                    reachable,
                    timestamp,
                    stop_event,
                )
            except (RuntimeError, tk.TclError):
                return  # the window is gone; nothing left to report to
            # Event.wait doubles as an interruptible sleep.
            stop_event.wait(self.PING_INTERVAL_SECONDS)

    def _handle_ping_result(self, ip_address, reachable, timestamp, stop_event):
        """Log a ping result and recolour the text area (Tk thread only)."""
        if stop_event.is_set():
            return  # result belongs to a run that has been stopped
        self._ping_status[ip_address] = reachable
        if not reachable:
            self.text_area.insert(
                tk.END, f"Connectivity lost to {ip_address} at {timestamp}\n"
            )
            self.text_area.see(tk.END)
        # Green only while every monitored target answered its latest ping,
        # so one healthy target cannot hide another one being down.
        healthy = all(self._ping_status.values())
        self.text_area.configure(bg="light green" if healthy else "pink")

    def ping_ip(self, ip_address):
        """Return True when a single ICMP echo to *ip_address* succeeds.

        Returns False when the ping fails, times out, the ``ping`` binary
        cannot be executed, or the target is empty or looks like a command
        line option.
        """
        if not ip_address or ip_address.startswith("-"):
            return False
        try:
            subprocess.run(
                ["ping", "-c", "1", "-W", "5", ip_address],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
                timeout=10,  # hard limit in case name resolution stalls
            )
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError):
            return False
        return True

    # ------------------------------------------------- interface statistics

    @staticmethod
    def _parse_link_stats(output):
        """Parse the text printed by ``ip -s link``.

        Returns a dict mapping interface name to a dict with ``name``,
        optionally ``mac``, and optionally ``RX`` / ``TX`` dicts holding the
        ``packets``, ``errors``, ``dropped`` and ``missed`` columns as
        strings.  The fifth column is stored under ``missed`` for both
        directions.
        """
        interface_stats = {}
        current = None  # stats dict of the interface being parsed
        direction = None  # "RX" / "TX" while a counter row is expected
        for raw_line in output.splitlines():
            line = raw_line.strip()
            if not line:
                continue
            index, colon, rest = line.partition(":")
            if colon and index.isdigit():
                # Header line, e.g. "2: eth0: <BROADCAST,...> mtu 1500 ..."
                name_fields = rest.split(":")[0].split()
                if not name_fields:
                    current = None
                    continue
                # "veth1@if2" -> "veth1": the @peer suffix is not part of
                # the device name accepted by `ip addr show`.
                name = name_fields[0].split("@")[0]
                current = {"name": name}
                interface_stats[name] = current
                direction = None
            elif current is None:
                continue
            elif line.startswith("link/"):
                fields = line.split()
                if len(fields) > 1:  # "link/none" carries no address
                    current["mac"] = fields[1]
            elif line.startswith(("RX:", "TX:")):
                direction = line[:2]
            elif line[0].isdigit() and direction is not None:
                fields = line.split()
                if len(fields) >= 4:
                    current[direction] = {
                        "packets": fields[1],
                        "errors": fields[2],
                        "dropped": fields[3],
                        "missed": fields[4] if len(fields) > 4 else "N/A",
                    }
                direction = None  # only the first row after the header counts
        return interface_stats

    def get_interface_stats(self):
        """Return the statistics of every interface, including its IPv4 address.

        Raises:
            subprocess.CalledProcessError, OSError: if ``ip -s link`` cannot
                be run.
        """
        output = subprocess.check_output(["ip", "-s", "link"], text=True)
        interface_stats = self._parse_link_stats(output)
        for name, stats in interface_stats.items():
            stats["ip"] = self.get_ip_address(name)
        return interface_stats

    def get_ip_address(self, interface_name):
        """Return the first IPv4 address of *interface_name*, or ``"N/A"``."""
        try:
            output = subprocess.check_output(
                ["ip", "addr", "show", interface_name], text=True
            )
        except (subprocess.CalledProcessError, OSError):
            return "N/A"
        match = self._IPV4_PATTERN.search(output)
        return match.group(1) if match else "N/A"

    @staticmethod
    def _format_row(stats):
        """Return the nine tree view cell values for one interface."""
        rx_stats = stats.get("RX", {})
        tx_stats = stats.get("TX", {})
        try:
            # One "Drops" column: RX + TX, as in load_data_points().
            drops = str(int(rx_stats["dropped"]) + int(tx_stats["dropped"]))
        except (KeyError, TypeError, ValueError):
            drops = "N/A"
        return (
            stats.get("name", "N/A"),
            stats.get("mac", "N/A"),
            stats.get("ip", "N/A"),
            rx_stats.get("packets", "N/A"),
            tx_stats.get("packets", "N/A"),
            rx_stats.get("errors", "N/A"),
            tx_stats.get("errors", "N/A"),
            drops,
            rx_stats.get("missed", "N/A"),
        )

    def update_treeview(self, interface_stats=None):
        """Refresh the table from *interface_stats* (fetched when omitted)."""
        if interface_stats is None:
            interface_stats = self.get_interface_stats()
        # Clear only after the new data is available, so a failed fetch
        # leaves the previous rows on screen.
        self.tree_view.delete(*self.tree_view.get_children())
        for stats in interface_stats.values():
            self.tree_view.insert("", "end", values=self._format_row(stats))

    def update_treeview_periodically(self):
        """Refresh the table every second and snapshot once per minute."""
        try:
            interface_stats = self.get_interface_stats()
            self.update_treeview(interface_stats)
            # A snapshot already exists for this minute when a file carries
            # the current YYYYMMDD_HHMM prefix in its timestamp.
            minute_stamp = datetime.now().strftime("%Y%m%d_%H%M")
            pattern = os.path.join(
                self.SNAPSHOT_DIR, f"snapshot_*_{minute_stamp}*.json"
            )
            if not glob.glob(pattern):
                self.save_snapshot(interface_stats)
        except (OSError, subprocess.CalledProcessError) as exc:
            self.text_area.insert(tk.END, f"Interface update failed: {exc}\n")
        finally:
            # Always reschedule, and remember only the pending job so the
            # list does not grow by one entry per second.
            job = self.master.after(1000, self.update_treeview_periodically)
            self.after_jobs = [job]

    # ------------------------------------------------------------ snapshots

    @classmethod
    def _safe_filename_part(cls, text):
        """Return *text* with characters unsuitable for a file name replaced."""
        return cls._UNSAFE_FILENAME_CHARS.sub("-", text.strip())

    def save_snapshot(self, interface_stats=None):
        """Write the interface statistics to a timestamped JSON file.

        Args:
            interface_stats: Statistics to store; fetched with
                ``get_interface_stats()`` when omitted.

        Failures are reported in the text area instead of being raised, so
        the method is safe to use directly as a button command.
        """
        try:
            if interface_stats is None:
                interface_stats = self.get_interface_stats()
            local_ip = self._safe_filename_part(self.local_ip_entry.get())
            public_ip = self._safe_filename_part(self.public_ip_entry.get())
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"snapshot_{local_ip}_{public_ip}_{timestamp}.json"
            os.makedirs(self.SNAPSHOT_DIR, exist_ok=True)
            filepath = os.path.join(self.SNAPSHOT_DIR, filename)
            with open(filepath, "w", encoding="utf-8") as f:
                json.dump(interface_stats, f, indent=4)
        except (OSError, subprocess.CalledProcessError) as exc:
            self.text_area.insert(tk.END, f"Snapshot failed: {exc}\n")
            return
        self.text_area.insert(tk.END, f"Snapshot saved to {filepath}\n")

    @staticmethod
    def _read_snapshot(filepath):
        """Return the parsed JSON content of *filepath*, or None on failure."""
        try:
            with open(filepath, "r", encoding="utf-8") as file:
                return json.load(file)
        except ValueError:  # JSONDecodeError and undecodable bytes
            print(f"Skipping malformed JSON file: {filepath}")
        except OSError as exc:
            print(f"Error processing file {filepath}: {exc}")
        return None

    @staticmethod
    def _snapshot_timestamp(filepath):
        """Return the datetime encoded at the end of a snapshot file name.

        Expects ``snapshot_{local}_{public}_{YYYYMMDD}_{HHMMSS}.json`` and
        returns None for names that do not end in such a timestamp.
        """
        stem = os.path.basename(filepath)
        if stem.endswith(".json"):
            stem = stem[: -len(".json")]
        parts = stem.split("_")
        if len(parts) < 2:
            return None
        try:
            return datetime.strptime("_".join(parts[-2:]), "%Y%m%d_%H%M%S")
        except ValueError:
            return None

    @classmethod
    def _load_counters(cls, filepath, interface_name):
        """Return the six integer counters of *interface_name*, or None.

        The tuple is ``(rx_packets, tx_packets, rx_errors, tx_errors,
        rx_dropped + tx_dropped, rx_missed)``.
        """
        snapshot = cls._read_snapshot(filepath)
        if not isinstance(snapshot, dict):
            return None
        interface_data = snapshot.get(interface_name)
        if not interface_data:
            return None
        try:
            rx_stats = interface_data["RX"]
            tx_stats = interface_data["TX"]
            return (
                int(rx_stats["packets"]),
                int(tx_stats["packets"]),
                int(rx_stats["errors"]),
                int(tx_stats["errors"]),
                int(rx_stats["dropped"]) + int(tx_stats["dropped"]),
                int(rx_stats["missed"]),
            )
        except (KeyError, TypeError, ValueError) as exc:
            print(f"Error processing file {filepath}: {exc}")
            return None

    def list_interfaces(self, directory="snapshots"):
        """Return the sorted interface names found in the snapshot files."""
        interfaces = set()
        for filepath in glob.glob(os.path.join(directory, "snapshot_*.json")):
            data = self._read_snapshot(filepath)
            if isinstance(data, dict):
                interfaces.update(data)
        return sorted(interfaces)

    def load_data_points_cumalative(self, interface_name, directory="snapshots"):
        """Return the raw RX/TX packet counters of every snapshot, by time.

        Returns a dict with the parallel lists ``RX_packets``,
        ``TX_packets`` and ``timestamps``.  Files with an unexpected name,
        malformed JSON or missing counters are skipped.
        """
        samples = []
        for filepath in glob.glob(os.path.join(directory, "snapshot_*.json")):
            timestamp = self._snapshot_timestamp(filepath)
            if timestamp is None:
                continue
            counters = self._load_counters(filepath, interface_name)
            if counters is not None:
                samples.append((timestamp, counters[0], counters[1]))
        samples.sort(key=lambda sample: sample[0])
        return {
            "RX_packets": [sample[1] for sample in samples],
            "TX_packets": [sample[2] for sample in samples],
            "timestamps": [sample[0] for sample in samples],
        }

    def load_data_points(self, interface_name, directory="snapshots", window_minutes=20):
        """Return per-second rates for *interface_name* over a recent window.

        Each data point is the change of a counter between two consecutive
        snapshots divided by the seconds between them.  Only points whose
        timestamp lies within the last *window_minutes* minutes are returned.

        Returns a dict with the parallel lists ``RX_packets``,
        ``TX_packets``, ``RX_errors``, ``TX_errors``, ``Drops``, ``Missed``
        and ``timestamps``.
        """
        cutoff = datetime.now() - timedelta(minutes=window_minutes)

        # Timestamps come from the file names, so files can be ordered and
        # filtered without opening them.
        stamped = []
        for filepath in glob.glob(os.path.join(directory, "snapshot_*.json")):
            timestamp = self._snapshot_timestamp(filepath)
            if timestamp is not None:
                stamped.append((timestamp, filepath))
        stamped.sort()
        first_recent = next(
            (i for i, (timestamp, _) in enumerate(stamped) if timestamp >= cutoff),
            len(stamped),
        )

        samples = []
        for timestamp, filepath in stamped[first_recent:]:
            counters = self._load_counters(filepath, interface_name)
            if counters is not None:
                samples.append((timestamp,) + counters)
        # The first point in the window needs a baseline: the newest older
        # snapshot that contains this interface.
        for timestamp, filepath in reversed(stamped[:first_recent]):
            counters = self._load_counters(filepath, interface_name)
            if counters is not None:
                samples.insert(0, (timestamp,) + counters)
                break

        data_points = {key: [] for key in self._RATE_KEYS}
        data_points["timestamps"] = []
        for previous, current in zip(samples, samples[1:]):
            elapsed = (current[0] - previous[0]).total_seconds()
            if elapsed <= 0 or current[0] < cutoff:
                continue
            for key, before, after in zip(self._RATE_KEYS, previous[1:], current[1:]):
                # Counters restart from zero after a reboot or device reset;
                # report 0 instead of a large negative rate.
                data_points[key].append(max(0.0, (after - before) / elapsed))
            data_points["timestamps"].append(current[0])
        return data_points

    # --------------------------------------------------------------- graphs

    def open_graphs_popup(self):
        """Open a window that plots the per-second rates of one interface."""
        interfaces = self.list_interfaces()
        if not interfaces:
            # Nothing to plot before the first snapshot has been written.
            tkinter.messagebox.showinfo(
                "Graphs",
                "No snapshots found yet. Start monitoring or save a report first.",
                parent=self.master,
            )
            return

        popup = tk.Toplevel(self.master)
        popup.title("Graphs")
        popup.geometry("900x800")

        selected_interface = tk.StringVar(value=interfaces[0])
        interface_selector = ttk.Combobox(
            popup, textvariable=selected_interface, values=interfaces
        )
        interface_selector.pack()

        fig = plt.Figure(figsize=(5, 4), dpi=100)
        canvas = FigureCanvasTkAgg(fig, master=popup)
        canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

        auto_refresh_var = tk.IntVar(value=0)
        pending = {"job": None}  # id of the scheduled auto-refresh, if any
        series = (
            ("RX_packets", "RX Packets/s"),
            ("TX_packets", "TX Packets/s"),
            ("RX_errors", "RX Errors/s"),
            ("TX_errors", "TX Errors/s"),
            ("Drops", "Drops/s"),
            ("Missed", "Missed/s"),
        )

        def cancel_pending():
            if pending["job"] is not None:
                popup.after_cancel(pending["job"])
                pending["job"] = None

        def update_graph(*_args):
            data_points = self.load_data_points(selected_interface.get())
            fig.clear()
            ax = fig.add_subplot(111)
            timestamps = data_points["timestamps"]
            if timestamps:
                # Matplotlib converts datetime x values itself.
                for key, label in series:
                    ax.plot(timestamps, data_points[key], "-", label=label)
                ax.xaxis.set_major_locator(MinuteLocator(interval=1))
                ax.xaxis.set_major_formatter(DateFormatter("%H:%M"))
                fig.autofmt_xdate()  # rotate tick labels for readability
                ax.legend()
            else:
                ax.text(
                    0.5,
                    0.5,
                    "No data for this interface in the last 20 minutes",
                    ha="center",
                    va="center",
                    transform=ax.transAxes,
                )
            canvas.draw()

        def auto_update_graph():
            # Cancel first so toggling the box quickly cannot leave two
            # refresh loops running side by side.
            cancel_pending()
            if auto_refresh_var.get():
                update_graph()
                pending["job"] = popup.after(1000, auto_update_graph)

        def close_popup():
            cancel_pending()
            popup.destroy()

        popup.protocol("WM_DELETE_WINDOW", close_popup)
        interface_selector.bind("<<ComboboxSelected>>", update_graph)
        update_graph()  # initial draw

        buttons_frame = ttk.Frame(popup)
        buttons_frame.pack(side=tk.TOP, fill=tk.X, pady=10)

        refresh_button = ttk.Button(
            buttons_frame, text="Refresh Graph", command=update_graph
        )
        refresh_button.pack(side=tk.LEFT, padx=5)

        snapshot_button = ttk.Button(
            buttons_frame, text="Snapshot", command=self.save_snapshot
        )
        snapshot_button.pack(side=tk.RIGHT, padx=5)

        auto_refresh_check = ttk.Checkbutton(
            buttons_frame,
            text="Auto 1s",
            variable=auto_refresh_var,
            command=auto_update_graph,
        )
        auto_refresh_check.pack(side=tk.LEFT, padx=5)

    # ------------------------------------------------------------- shutdown

    def _cancel_after_jobs(self):
        """Cancel every pending 'after' callback recorded in after_jobs."""
        for job in self.after_jobs:
            self.master.after_cancel(job)
        self.after_jobs = []

    def on_close(self):
        """Stop the ping workers, cancel timers and close the window."""
        self._stop_event.set()
        self._cancel_after_jobs()
        self.master.destroy()
def main():
    """Create the Tk root window, attach the monitor UI and run the event loop."""
    window = tk.Tk()
    # Keep a reference to the application object while the loop is running.
    monitor_app = NetworkMonitorApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()