ipflow/main.py

187 lines
5.2 KiB
Python

import pprint
from typing import List, Text, Optional
from dataclasses import dataclass
from flask import Flask, render_template, request, jsonify
import uuid
@dataclass
class ChainRule:
packets: str
byte_s: str
target: str
protocol: str
options: str
inp: str
out: str
source: str
destination: str
extra: str
raw: str
id: Optional[str] = None
def name(self):
if self.id is None:
self.id = str(uuid.uuid4())
return f"{self.protocol}\n{self.inp}->{self.out}>{self.target}{'(' + self.extra + ')' if self.extra else ''}"
@dataclass
class Chain:
name: str
policy: Optional[str]
rules: List[ChainRule]
referenced: bool = False
def add_rule(self, rule: ChainRule):
self.rules.append(rule)
app = Flask(__name__)
def find_id_position(list_of_dicts, target_id):
for i, d in enumerate(list_of_dicts):
if d.get("id") == target_id:
return i
return None
def build_chain_tree(chain: Chain):
# Initialize the tree shape
ret = [[{"id": chain.name}], [], []]
# Add each rule
for rule in chain.rules:
# Every rule is a new one, so just append
ret[1].append({"id": rule.name(), "parents": [chain.name]})
# The destination is trickier
# For each rule, check to see if the output exists in the third slot of the return
# If the output does exist, then append the rules name to the list,
# If the output does not exist, add a new output with the rules name
pos = find_id_position(ret[2], rule.target)
if pos is None:
ret[2].append({"id": rule.target, "parents": [rule.name()]})
else:
ret[2][pos]["parents"].append(rule.name())
return ret
def build_full_chain_tree(chains: List[Chain]):
ret = [[], [], []]
pprint.pp(len(chains))
for chain in chains.copy():
if not chain.referenced:
tree = build_chain_tree(chain)
ret[0] = ret[0] + tree[0]
ret[1] = ret[1] + tree[1]
ret[2] = ret[2] + tree[2]
chains.remove(chain)
pprint.pp(len(chains))
# Merge all ends
merged = []
for val in ret[-1]:
t = find_id_position(merged, val["id"])
if t is None:
merged.append(val)
else:
merged[t]["parents"] = merged[t]["parents"] + val["parents"]
ret[-1] = merged
while len(chains) > 0:
next = chains.pop(0)
next_tree = build_chain_tree(next)
# weird rebuild
next_id = find_id_position(ret[-1], next.name)
next_pop_id = ret[-1].pop(next_id)
append_ret = ret[-1]
next_tree[-1] = append_ret + next_tree[-1]
ret[-1] = [next_pop_id]
next_tree.pop(0)
ret = ret + next_tree
merged = []
for val in ret[-1]:
t = find_id_position(merged, val["id"])
if t is None:
merged.append(val)
else:
merged[t]["parents"] = merged[t]["parents"] + val["parents"]
ret[-1] = merged
pprint.pp(ret)
return ret
@app.route('/parse', methods=['POST'])
def manual_parse_chain():
raw_data = request.form.get('data')
chains = raw_data.split("Chain")
chains: List[Text] = list(filter(len, chains))
trees = []
parsed_chains = []
for chain in chains:
rules_raw = chain.splitlines()
if len(rules_raw) < 2:
raise ValueError("bad chain")
# First line of a chain is the chain metadata, such as name
chain_meta_raw = rules_raw[0].strip().split(" ")
policy = None
if not ('references' in rules_raw[0]):
policy = chain_meta_raw[2]
chain = Chain(chain_meta_raw[0], policy, [])
if not ('policy' in rules_raw[0]):
chain.referenced = True
# Second line is headers for the table, so drop
# Lines past this point may not exist, so care in parsing
# Third line and onwards is the rule itself
for rule in rules_raw[2:]:
r =[part for part in rule.split(" ") if part.strip()]
if len(r) == 0:
continue
if len(r) > 8:
r = r[:9] + [' '.join(r[9:])]
cr = ChainRule(r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8], "", rule)
if len(r) > 9:
cr = ChainRule(r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8], r[9], rule)
chain.add_rule(cr)
# If the chain has no rules, read the policy and add default rule of any
if len(chain.rules) == 0:
chain.add_rule(ChainRule(
0, 0, chain.policy, "all", "--", "any", "any", "anywhere", "anywhere", "", ""
))
# Build the tree for each chain
tree = build_chain_tree(chain)
trees.append(tree)
parsed_chains.append(chain)
full_tree = build_full_chain_tree(parsed_chains)
return jsonify(full_tree)
@app.route('/', methods=["POST"])
def capture():
# Why is counter strike pinging on port 5000???
pprint.pp(request.json)
pprint.pp(request.data)
pprint.pp(request.values)
return jsonify({})
@app.route('/')
def hello_world():
return render_template('index.html')
if __name__ == "__main__":
app.run()