import re
# One IPv4 octet (0-255) without a redundant leading zero.
_IPV4_OCTET = r'(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)'
# Four dot-separated octets that are not embedded in a longer run of digits.
_IPV4_RE = re.compile(
    r'(?<!\d)(' + _IPV4_OCTET + r'(?:\.' + _IPV4_OCTET + r'){3})(?!\d)'
)


def extract_ip(line):
    """Return the first IPv4 address found in *line*.

    Args:
        line: Text to search.

    Returns:
        str: The first dotted-quad address whose octets are all in 0-255, or
        the placeholder ``'7.7.7.7'`` when the line contains none, so callers
        always get a string back.
    """
    ip_match = _IPV4_RE.search(line)
    if ip_match:
        return ip_match.group(1)
    return '7.7.7.7'
inf = 'normalizethis.txt'
outf = 'normalized.txt'

# Load the whole input first; the file is closed before any output is written.
with open(inf, 'r') as file:
    lines = file.readlines()

# One output line per input line: the extracted address plus a newline.
normalized_ips = [extract_ip(raw_line) + '\n' for raw_line in lines]

with open(outf, 'w') as file:
    file.writelines(normalized_ips)
import csv, argparse, os, re, time
from collections import Counter
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--infile", help="Input CSV file")
parser.add_argument("-o", "--outfile", help="Output CSV file")
# Parse the command line; later code reads args.infile and args.outfile.
args = parser.parse_args()

# Both paths are required and the input must already exist. Stop with the
# usage text and exit status 1 before any work is done, rather than failing
# on a None path later.
if not args.infile or not args.outfile or not os.path.isfile(args.infile):
    parser.print_usage()
    raise SystemExit(1)
# Filter constants. RULE_ID is compared with each row's 'rule' column; the
# other four are not referenced by any active code visible here.
RULE_ID = "10"
POLICY_NAME = "DCE-Temp"
ACCESS_RULE = "Temporary Rule"
ACTION = "Accept"
BLADE = "Firewall"

# Optional allow-list files, resolved relative to the working directory.
IPS = 'accepted-ip.csv'  # columns: hostname, ip, x
PORTS = 'accepted-ports.csv'  # columns: protocol, port (e.g. tcp,9389)

# Aggregation state filled while reading the input.
total_dest_hits = Counter()  # "destination:port" -> number of counted rows
connections = {}  # (origin, protocol, "destination:port") -> Counter of sources

# Row tallies printed at the end of the run.
total_rows = skipped_id = skipped_e = skipped_f = not_skipped = 0
def check_networks(ip_add, net_add, prefix_octets=2):
    """Return True when *ip_add* and *net_add* share their leading octets.

    The comparison is textual on dotted-quad strings, so only octet-aligned
    prefixes are supported.

    Args:
        ip_add: Address to test, e.g. ``"10.1.2.3"``.
        net_add: Network address (or any address inside the network), e.g.
            ``"10.1.0.0"``.
        prefix_octets: Number of leading octets that must be equal. The
            default of 2 is a /16 test; 1 is /8 and 3 is /24.

    Returns:
        bool: True if the first *prefix_octets* octets are equal. An address
        with fewer octets than that never matches.
    """
    ip_parts = ip_add.split('.')
    net_parts = net_add.split('.')
    # Without this guard two truncated strings such as "10" and "10" would
    # compare equal, because slicing silently returns fewer elements.
    if len(ip_parts) < prefix_octets or len(net_parts) < prefix_octets:
        return False
    return ip_parts[:prefix_octets] == net_parts[:prefix_octets]
# Allow-list of addresses; stays empty when the IPS file does not exist.
accepted_ip = set()
if os.path.isfile(IPS):
    # newline='' lets the csv module handle line endings itself.
    with open(IPS, 'r', newline='') as these_ips:
        reader = csv.reader(these_ips)
        # The address is the second column. Rows too short to have one (blank
        # or malformed lines) are skipped instead of raising IndexError.
        accepted_ip = {row[1].strip() for row in reader if len(row) > 1}
# Allow-list of (PROTOCOL, port) pairs; stays empty when the PORTS file does
# not exist.
accepted_ports = set()
if os.path.isfile(PORTS):
    # newline='' lets the csv module handle line endings itself.
    with open(PORTS, 'r', newline='') as these_ports:
        reader = csv.reader(these_ports)
        # The protocol is upper-cased and the port kept as text. Rows with
        # fewer than two columns are skipped instead of raising IndexError.
        accepted_ports = {
            (row[0].strip().upper(), row[1].strip())
            for row in reader
            if len(row) > 1
        }
# Dotted-quad address; the dots are escaped so they match a literal '.' only.
# Compiled once, outside the row loop.
ip_pattern = re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b')

# Read the input log and count, per (origin, protocol, destination:port), how
# often each source appears. Updates the module-level tallies as it goes.
with open(args.infile, 'r', newline='') as f:
    reader = csv.DictReader(f)
    # Column names are matched in lower case; an empty file has no header row.
    reader.fieldnames = [name.lower() for name in (reader.fieldnames or [])]
    for row in reader:
        total_rows += 1
        try:
            # Short rows yield None for missing cells, hence the `or ""`.
            raw_protocol = row.get('protocol') or ""
            protocol = raw_protocol.split(' ')[0].upper()

            raw_port = row.get('destination port')
            if raw_protocol.startswith('ICMP'):
                # ICMP rows fall back to port '0' when no port is given.
                port = str(raw_port or '0')
            else:
                port = str(raw_port)

            # Bare address from the source column (or 'destination'), used by
            # the optional address filters listed below.
            ip = (row['source'] or '').strip()
            match = ip_pattern.search(ip)
            if match:
                ip = match.group(0)

            if str(row['rule']) != RULE_ID:
                skipped_id += 1
                continue

            # Keep only rows whose (protocol, port) pair is on the allow-list;
            # with an empty allow-list no row passes. Other conditions that
            # can be combined into this filter:
            #   row['policy name'] != POLICY_NAME
            #   any(check_networks(ip, network) for network in accepted_ip)
            #   accepted_ip and ip in accepted_ip
            if not (accepted_ports and (protocol, port) in accepted_ports):
                skipped_f += 1
                continue

            dest_port = row['destination'] + ':' + port
            key = (row['origin'], protocol, dest_port)

            source = row.get('source')
            if not source or not isinstance(source, str):
                # Reported but not added to any of the skip tallies.
                print(f"inavlid source: {row}")
                continue

            connections.setdefault(key, Counter())[source] += 1
            total_dest_hits[dest_port] += 1
            not_skipped += 1
        except KeyError as e:
            # A required column is absent from this row: report it and move on.
            print(f"Missing field {e} in {row}")
            skipped_e += 1
# Write one output row per (connection key, source) pair.
with open(args.outfile, 'w', newline='') as f:
    fieldnames = ['Origin', 'Protocol', 'Source', 'Destination', 'Port', 'Count', 'Total Dest Hits']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    for key, sources in connections.items():
        origin, protocol, dest_port = key
        # Split on the last colon only, so a destination that itself contains
        # colons is kept whole.
        destination, port = dest_port.rsplit(':', 1)
        for source, count in sources.items():
            writer.writerow({
                'Origin': origin,
                'Protocol': protocol,
                'Source': source,
                'Destination': destination,
                'Port': port,
                'Count': count,
                # Total across every source for this destination:port.
                'Total Dest Hits': total_dest_hits[dest_port],
            })
# Run summary, one number per line, in this order: rows read, rows skipped for
# a rule-id mismatch, rows skipped for a missing field, rows skipped by the
# filter, rows counted.
print(total_rows, skipped_id, skipped_e, skipped_f, not_skipped, sep='\n')