Information Technology Grimoire

Version .0.0.1

IT Notes from various projects because I forget, and hopefully they help you too.

Python3 Debian Network Monitor

Need to log interface statistics on Debian, including errors, downtime, etc.?

I was experiencing disconnects and only on a particular laptop. I thought maybe the drivers were bad or needed reconfiguration as it was a fresh install. After a bit of troubleshooting it was determined to be the switch.

That sent me down a rabbit hole of monitoring things, so I wrote a script to do it. It works on Debian but should work on any Linux system. It reads the output of the ip command and only requires matplotlib (python3 -m pip install matplotlib). As always, I recommend a venv!

Python Network Monitor

Features

  • Pings the local and remote IPs every 5 seconds, like mtr
  • Logs to the window when an outage has occurred (based on ping down)
  • Saves historical json data files
  • Parses the json data on a graph
  • updates the tkinter window with threads
  • Green background to view at a distance (good pings)
  • Red background to view at a distance (bad pings)

The Network Monitoring Script

# Standard library imports
import datetime
import glob
import json
import os
import re
import subprocess
import threading
import time
from datetime import datetime, timedelta

# Tkinter related imports (also part of the standard library)
import tkinter as tk
from tkinter import ttk
import tkinter.messagebox   # unused at moment

# Third-party imports
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.dates as mdates
from matplotlib.dates import DateFormatter, MinuteLocator


# Application version; interpolated into the main window title.
APPVERSION = 0.06

class NetworkMonitorApp:
    def __init__(self, master):
        """Attach the application to its Tk window and build the widgets.

        master: the Tk root (or toplevel) window that hosts the monitor UI.
        """
        # IDs returned by master.after(); on_close() cancels them on exit.
        self.after_jobs = []

        self.master = master
        master.title(f"IP Monitor and Interface Statistics v{APPVERSION}")
        # Route the window-manager close button through on_close().
        master.protocol("WM_DELETE_WINDOW", self.on_close)

        self.setup_ui()

    def setup_ui(self):
        """Lay out the main window.

        Row 0 of the window holds the input frame: two IP entry fields with
        their labels, plus the "View Graphs", "Save Report" and
        "Start Monitoring" buttons. The interface table and the log area are
        then created by setup_treeview() and setup_text_area().
        """
        frame = ttk.Frame(self.master, padding="10")
        frame.grid(row=0, column=0, sticky="ew")
        frame.columnconfigure([0, 1, 2, 3], weight=1)
        self.input_frame = frame

        # (grid row, label text, attribute receiving the Entry, default value)
        entry_rows = (
            (0, "Test Local IP:", "local_ip_entry", "192.168.7.1"),
            (1, "Test Public IP:", "public_ip_entry", "1.1.1.1"),
        )
        for row, caption, attribute, default in entry_rows:
            ttk.Label(frame, text=caption).grid(
                row=row, column=0, sticky="w", padx=5
            )
            entry = ttk.Entry(frame)
            entry.grid(row=row, column=1, sticky="ew", padx=5)
            entry.insert(0, default)
            setattr(self, attribute, entry)

        graphs_button = ttk.Button(
            frame, text="View Graphs", command=self.open_graphs_popup
        )
        graphs_button.grid(row=0, column=2, padx=5, pady=5, sticky="ew")
        self.view_graphs_button = graphs_button

        report_button = ttk.Button(
            frame, text="Save Report", command=self.save_snapshot
        )
        report_button.grid(row=1, column=2, padx=5, pady=5, sticky="ew")
        self.snapshot_button = report_button

        # The start button spans the full width of the input frame.
        start_button = ttk.Button(
            frame, text="Start Monitoring", command=self.start_monitoring
        )
        start_button.grid(row=2, column=0, columnspan=4, padx=5, pady=5, sticky="ew")
        self.start_button = start_button

        self.setup_treeview()
        self.setup_text_area()

    def setup_treeview(self):
        """Create the interface-statistics table on row 3 of the main window.

        The table has one heading-only column per entry of the width table
        below; the column id doubles as the heading text.
        """
        # Column id -> pixel width, in display order.
        column_widths = {
            "Interface": 150,
            "MAC": 150,
            "IP": 110,
            "RX Packets": 90,
            "TX Packets": 90,
            "RX Errors": 80,
            "TX Errors": 80,
            "Drops": 70,
            "Missed": 70,
        }

        table = ttk.Treeview(
            self.master, columns=tuple(column_widths), show="headings"
        )
        table.grid(row=3, column=0, sticky="nsew", padx=5, pady=5)
        # Let the table row absorb extra vertical space.
        self.master.rowconfigure(3, weight=1)
        self.tree_view = table

        for column_id, width in column_widths.items():
            table.heading(column_id, text=column_id)
            table.column(column_id, width=width)

    def setup_text_area(self):
        """Create the 10-line log/status text widget on row 2 of the main window."""
        log_widget = tk.Text(self.master, height=10)
        log_widget.grid(row=2, column=0, sticky="nsew", pady=5)
        self.text_area = log_widget

    def start_monitoring(self):
        self.text_area.insert(tk.END, "Monitoring started...\n")

        # Clear Treeview
        self.tree_view.delete(*self.tree_view.get_children())

        # Get IPs from entry fields
        local_ip = self.local_ip_entry.get()
        public_ip = self.public_ip_entry.get()

        # Example Treeview update using the IPs from entry fields
        self.tree_view.insert("", "end", values=(local_ip, "Ready"))
        self.tree_view.insert("", "end", values=(public_ip, "Ready"))

        # Start threads for monitoring IPs using the IPs from entry fields
        threading.Thread(target=self.monitor_ip, args=(local_ip,), daemon=True).start()
        threading.Thread(target=self.monitor_ip, args=(public_ip,), daemon=True).start()

        # Update Treeview with interface stats periodically
        job = self.master.after(1000, self.update_treeview_periodically)
        self.after_jobs.append(job)

    def monitor_ip(self, ip_address):
        while True:
            if self.ping_ip(ip_address):
                # If connectivity is good, change text area background to green
                self.master.after(0, lambda: self.text_area.configure(bg="light green"))
            else:
                current_time = time.strftime("%Y-%m-%d %H:%M:%S")
                log_message = f"Connectivity lost to {ip_address} at {current_time}\n"
                self.master.after(0, lambda: self.text_area.insert(tk.END, log_message))
                self.master.after(0, lambda: self.text_area.see(tk.END))
                self.master.after(0, lambda: self.text_area.configure(bg="pink"))
            time.sleep(5)

    def get_interface_stats(self):
        output = subprocess.check_output(["ip", "-s", "link"], text=True)
        interface_stats = {}

        for line in output.splitlines():
            line = line.strip()
            if not line:
                continue
            if ":" in line and line.split(":")[0].isdigit():
                parts = line.split(":")
                interface_name = parts[1].split()[0].strip()
                interface_stats[interface_name] = {"name": interface_name}
            elif line.startswith("link/"):
                mac = line.split()[1]
                interface_stats[interface_name]["mac"] = mac
            elif any(prefix in line for prefix in ("RX:", "TX:")):
                current_stat_type = line.split()[0]
            elif line[0].isdigit():
                parts = line.split()
                stats = {
                    "packets": parts[1],
                    "errors": parts[2],
                    "dropped": parts[3],
                    "missed": parts[4] if len(parts) > 4 else "N/A",
                }
                if current_stat_type == "RX:":
                    interface_stats[interface_name]["RX"] = stats
                elif current_stat_type == "TX:":
                    interface_stats[interface_name]["TX"] = stats

        # Now we need to call get_ip_address for each interface to populate the IP addresses
        for iface_name in interface_stats.keys():
            interface_stats[iface_name]["ip"] = self.get_ip_address(iface_name)

        return interface_stats

    def update_treeview(self):
        self.tree_view.delete(*self.tree_view.get_children())  # Clear the Treeview
        interface_stats = self.get_interface_stats()  # Get updated stats

        for interface_id, stats in interface_stats.items():
            # Accessing the nested dictionaries for RX and TX
            rx_stats = stats.get("RX", {})
            tx_stats = stats.get("TX", {})
            self.tree_view.insert(
                "",
                "end",
                values=(
                    stats["name"],
                    stats.get("mac", "N/A"),
                    stats.get("ip", "N/A"),
                    rx_stats.get("packets", "N/A"),
                    tx_stats.get("packets", "N/A"),
                    rx_stats.get("errors", "N/A"),
                    tx_stats.get("errors", "N/A"),
                    rx_stats.get("dropped", "N/A"),
                    tx_stats.get("dropped", "N/A"),
                    rx_stats.get("missed", "N/A"),
                ),
            )

    def get_ip_address(self, interface_name):
        try:
            # Execute the command and decode the output to get the interface details
            output = subprocess.check_output(
                ["ip", "addr", "show", interface_name], text=True
            )

            # Regex pattern to match IPv4 addresses
            ip_pattern = re.compile(r"inet (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})")

            # Search for the pattern in the command output
            match = ip_pattern.search(output)
            if match:
                return match.group(1)  # Return the first matched IP address
            else:
                return "N/A"  # Return "N/A" if no IP address is found

        except subprocess.CalledProcessError as e:
            return "N/A"  # Return "N/A" in case of any error

    def update_treeview_periodically(self):
        self.update_treeview()

        # Current time formatted to include up to minutes (YYYYMMDD_HHMM)
        current_minute_timestamp = datetime.now().strftime("%Y%m%d_%H%M")

        # Directory where snapshots are saved
        directory = "snapshots"

        # Construct the snapshot file pattern for the current minute
        snapshot_pattern = f"snapshot_*_{current_minute_timestamp}*.json"

        # Check if any file matches the pattern for the current minute
        matching_snapshots = glob.glob(os.path.join(directory, snapshot_pattern))

        # If no snapshot exists for the current minute, save a new snapshot
        if not matching_snapshots:
            self.save_snapshot()

        # Schedule this method to be called again after a short delay
        # This retains the original frequency of updates to the tree view
        job = self.master.after(
            1000, self.update_treeview_periodically
        )  # 1000ms = 1 second
        self.after_jobs.append(job)

    def save_snapshot(self):
        """Write the current interface statistics to a JSON file and log it.

        The file is "snapshots/snapshot_<local>_<public>_<YYYYMMDD_HHMMSS>.json",
        where <local> and <public> come from the two entry fields. Any
        character that is not a letter, digit, "_", ".", ":" or "-" is
        replaced by "-" so free-form input (for example text containing "/")
        cannot point the file outside the snapshots directory. Plain IP
        addresses are unchanged.

        Raises OSError if the directory or file cannot be written, and
        whatever get_interface_stats() raises.
        """
        # Get the current interface stats
        interface_stats = self.get_interface_stats()

        # Get the local and public IP from the entry fields, made filename-safe.
        local_ip, public_ip = (
            re.sub(r"[^\w.:-]", "-", entry.get().strip())
            for entry in (self.local_ip_entry, self.public_ip_entry)
        )

        # The trailing date and time parts are what the loaders parse back.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"snapshot_{local_ip}_{public_ip}_{timestamp}.json"

        directory = "snapshots"
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(directory, exist_ok=True)
        filepath = os.path.join(directory, filename)

        # Write the data to a JSON file
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(interface_stats, f, indent=4)

        # Log the action to the text area and keep the newest line visible.
        self.text_area.insert(tk.END, f"Snapshot saved to {filepath}\n")
        self.text_area.see(tk.END)

    def ping_ip(self, ip_address):
        try:
            subprocess.run(
                ["ping", "-c", "1", "-W", "5", ip_address],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
            )
            return True  # Ping succeeded
        except subprocess.CalledProcessError:
            return False  # Ping failed

    def on_close(self):
        # Cancel all scheduled 'after' callbacks
        for job in self.after_jobs:
            self.master.after_cancel(job)
        self.master.destroy()  # Close the application window

    def list_interfaces(self, directory="snapshots"):
        interfaces = set()
        # Correctly use os.path.join with 'directory' and pattern
        for filepath in glob.glob(os.path.join(directory, "snapshot_*.json")):
            try:
                with open(filepath, "r", encoding="utf-8") as file:
                    data = json.load(file)
                    for interface_key in data.keys():
                        interfaces.add(interface_key)
            except json.JSONDecodeError:
                print(f"Skipping malformed JSON file: {filepath}")
        return list(interfaces)

    def load_data_points_cumalative(self, interface_name, directory="snapshots"):
        data_points = {"RX_packets": [], "TX_packets": [], "timestamps": []}
        for filepath in glob.glob(os.path.join(directory, "snapshot_*.json")):
            try:
                filename = os.path.basename(filepath)
                parts = filename.split("_")
                # Remove the '.json' part from the last segment
                timestamp_str = "_".join(parts[-2:]).replace(".json", "")
                print("Extracted timestamp string:", timestamp_str)  # For debugging

                # Parse the timestamp
                timestamp = datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S")

                with open(filepath, "r", encoding="utf-8") as file:
                    snapshot = json.load(file)
                    interface_data = snapshot.get(interface_name)
                    if interface_data:
                        rx_packets = int(interface_data["RX"]["packets"])
                        tx_packets = int(interface_data["TX"]["packets"])
                        data_points["RX_packets"].append(rx_packets)
                        data_points["TX_packets"].append(tx_packets)
                        data_points["timestamps"].append(timestamp)
            except json.JSONDecodeError:
                print(f"Skipping malformed JSON file: {filepath}")
        return data_points

    # Calculates over last 20 min, at first, but then adds more
    def load_data_points(self, interface_name, directory="snapshots"):
        data_points = {
            "RX_packets": [],
            "TX_packets": [],
            "RX_errors": [],
            "TX_errors": [],
            "Drops": [],
            "Missed": [],
            "timestamps": [],
        }
        snapshots = []

        snapshot_path = os.path.join(directory, "snapshot_*.json")
        for filepath in glob.glob(snapshot_path):
            try:
                filename = os.path.basename(filepath)
                # Assumes "snapshot_{localIP}_{publicIP}_{YYYYMMDD}_{HHMMSS}.json"
                parts = filename.split("_")
                timestamp_str = (
                    parts[-2] + "_" + parts[-1].replace(".json", "")
                ) 

                timestamp = datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S")
                with open(filepath, "r", encoding="utf-8") as file:
                    snapshot = json.load(file)

                    interface_data = snapshot.get(interface_name)
                    if interface_data:
                        # Extracting all required metrics
                        rx_packets = int(interface_data["RX"]["packets"])
                        tx_packets = int(interface_data["TX"]["packets"])
                        rx_errors = int(interface_data["RX"]["errors"])
                        tx_errors = int(interface_data["TX"]["errors"])
                        drops = int(interface_data["RX"]["dropped"]) + int(
                            interface_data["TX"]["dropped"]
                        )  # Assuming combined drops
                        missed = int(
                            interface_data["RX"]["missed"]
                        )  # Assuming only RX missed is relevant or available
                        snapshots.append(
                            (
                                timestamp,
                                rx_packets,
                                tx_packets,
                                rx_errors,
                                tx_errors,
                                drops,
                                missed,
                            )
                        )

            except json.JSONDecodeError:
                print(f"Skipping malformed JSON file: {filepath}")
            except Exception as e:
                print(f"Error processing file {filepath}: {e}")  # General error catch

        # Sort snapshots by timestamp
        snapshots.sort(key=lambda x: x[0])

        previous_snapshot = None
        for snapshot in snapshots:
            if previous_snapshot is not None:
                delta_seconds = (snapshot[0] - previous_snapshot[0]).total_seconds()
                if delta_seconds > 0:
                    rx_delta = (snapshot[1] - previous_snapshot[1]) / delta_seconds
                    tx_delta = (snapshot[2] - previous_snapshot[2]) / delta_seconds
                    rx_error_delta = (snapshot[3] - previous_snapshot[3]) / delta_seconds
                    tx_error_delta = (snapshot[4] - previous_snapshot[4]) / delta_seconds
                    drop_delta = (snapshot[5] - previous_snapshot[5]) / delta_seconds
                    missed_delta = (snapshot[6] - previous_snapshot[6]) / delta_seconds

                    data_points["RX_packets"].append(rx_delta)
                    data_points["TX_packets"].append(tx_delta)
                    data_points["RX_errors"].append(rx_error_delta)
                    data_points["TX_errors"].append(tx_error_delta)
                    data_points["Drops"].append(drop_delta)
                    data_points["Missed"].append(missed_delta)
                    data_points["timestamps"].append(snapshot[0])
            previous_snapshot = snapshot

        # Limit data points to the last 20 minutes
        now = datetime.now()
        twenty_minutes_ago = now - timedelta(minutes=20)
        data_points = {
            key: [
                val
                for i, val in enumerate(data_points[key])
                if data_points["timestamps"][i] >= twenty_minutes_ago
            ]
            for key in data_points
        }

        return data_points

    def open_graphs_popup(self):
        """Open a window that plots recent per-second rates for one interface.

        The interface is chosen from the names found by list_interfaces();
        the plotted series come from load_data_points(). The window offers a
        manual "Refresh Graph" button, a "Snapshot" button that calls
        save_snapshot(), and an "Auto 1s" checkbox that redraws every second.

        When no snapshot exists yet there is nothing to choose from, so a
        note is written to the log area and no window is opened.
        """
        interfaces = self.list_interfaces()
        if not interfaces:
            self.text_area.insert(
                tk.END, "No snapshots found yet; nothing to graph.\n"
            )
            self.text_area.see(tk.END)
            return

        popup = tk.Toplevel(self.master)
        popup.title("Graphs")
        popup.geometry("900x800")

        selected_interface = tk.StringVar(value=interfaces[0])

        interface_selector = ttk.Combobox(
            popup, textvariable=selected_interface, values=interfaces
        )
        interface_selector.pack()

        # Set up the matplotlib figure and canvas
        fig = plt.Figure(figsize=(5, 4), dpi=100)
        canvas = FigureCanvasTkAgg(fig, master=popup)
        canvas_widget = canvas.get_tk_widget()
        canvas_widget.pack(fill=tk.BOTH, expand=True)

        # Automatic refresh control variable
        auto_refresh_var = tk.IntVar(value=0)
        # Id of the pending auto-refresh timer, if any.
        pending_refresh = {"job": None}

        # (key in load_data_points() result, legend label)
        series = (
            ("RX_packets", "RX Packets/s"),
            ("TX_packets", "TX Packets/s"),
            ("RX_errors", "RX Errors/s"),
            ("TX_errors", "TX Errors/s"),
            ("Drops", "Drops/s"),
            ("Missed", "Missed/s"),
        )

        def update_graph(*args):
            """Redraw the figure for the currently selected interface."""
            data_points = self.load_data_points(selected_interface.get())

            fig.clear()
            ax = fig.add_subplot(111)

            timestamps = data_points["timestamps"]
            if timestamps:
                # plot() handles datetime x-values itself; plot_date() is
                # deprecated in current Matplotlib releases.
                for key, label in series:
                    ax.plot(timestamps, data_points[key], "-", label=label)

                # Set locator for x-axis to MinuteLocator for 1-minute intervals
                ax.xaxis.set_major_locator(MinuteLocator(interval=1))

                # Set formatter for x-axis to show the hour and minute
                ax.xaxis.set_major_formatter(DateFormatter("%H:%M"))

                fig.autofmt_xdate()  # Rotate dates for better spacing
                ax.legend()

            canvas.draw()

        def cancel_pending_refresh():
            """Drop the scheduled auto-refresh, if there is one."""
            job = pending_refresh["job"]
            if job is not None:
                popup.after_cancel(job)
                pending_refresh["job"] = None

        def auto_update_graph():
            """Redraw now and keep redrawing every second while the box is ticked."""
            # Cancel first so quick off/on toggling never stacks several timers.
            cancel_pending_refresh()
            if auto_refresh_var.get():
                update_graph()
                pending_refresh["job"] = popup.after(1000, auto_update_graph)

        def close_popup():
            """Stop the auto-refresh timer before the window goes away."""
            cancel_pending_refresh()
            popup.destroy()

        popup.protocol("WM_DELETE_WINDOW", close_popup)

        interface_selector.bind("<<ComboboxSelected>>", update_graph)
        update_graph()  # Initial call to display the graph

        # Buttons frame
        buttons_frame = ttk.Frame(popup)
        buttons_frame.pack(side=tk.TOP, fill=tk.X, pady=10)

        # "Refresh Graph" button inside the frame
        refresh_button = ttk.Button(
            buttons_frame, text="Refresh Graph", command=update_graph
        )
        refresh_button.pack(side=tk.LEFT, padx=5)

        # "Snapshot" button inside the frame
        snapshot_button = ttk.Button(
            buttons_frame, text="Snapshot", command=self.save_snapshot
        )
        snapshot_button.pack(side=tk.RIGHT, padx=5)

        # "Auto 1s" tick box inside the frame
        auto_refresh_check = ttk.Checkbutton(
            buttons_frame,
            text="Auto 1s",
            variable=auto_refresh_var,
            command=auto_update_graph,
        )
        auto_refresh_check.pack(side=tk.LEFT, padx=5)


def main():
    """Create the Tk root window, attach the monitor app, and run the event loop."""
    window = tk.Tk()
    monitor = NetworkMonitorApp(window)  # bound to a name for the life of the loop
    window.mainloop()


# Start the GUI only when the file is executed as a script.
if __name__ == "__main__":
    main()
Last updated on 9 Feb 2024
Published on 9 Feb 2024