import pprint from typing import List, Text, Optional from dataclasses import dataclass from flask import Flask, render_template, request, jsonify import uuid @dataclass class ChainRule: packets: str byte_s: str target: str protocol: str options: str inp: str out: str source: str destination: str extra: str raw: str id: Optional[str] = None def name(self): if self.id is None: self.id = str(uuid.uuid4()) return f"{self.protocol}\n{self.inp}->{self.out}>{self.target}{'(' + self.extra + ')' if self.extra else ''}" @dataclass class Chain: name: str policy: Optional[str] rules: List[ChainRule] referenced: bool = False def add_rule(self, rule: ChainRule): self.rules.append(rule) app = Flask(__name__) def find_id_position(list_of_dicts, target_id): for i, d in enumerate(list_of_dicts): if d.get("id") == target_id: return i return None def build_chain_tree(chain: Chain): # Initialize the tree shape ret = [[{"id": chain.name}], [], []] # Add each rule for rule in chain.rules: # Every rule is a new one, so just append ret[1].append({"id": rule.name(), "parents": [chain.name]}) # The destination is trickier # For each rule, check to see if the output exists in the third slot of the return # If the output does exist, then append the rules name to the list, # If the output does not exist, add a new output with the rules name pos = find_id_position(ret[2], rule.target) if pos is None: ret[2].append({"id": rule.target, "parents": [rule.name()]}) else: ret[2][pos]["parents"].append(rule.name()) return ret def build_full_chain_tree(chains: List[Chain]): ret = [[], [], []] pprint.pp(len(chains)) for chain in chains.copy(): if not chain.referenced: tree = build_chain_tree(chain) ret[0] = ret[0] + tree[0] ret[1] = ret[1] + tree[1] ret[2] = ret[2] + tree[2] chains.remove(chain) pprint.pp(len(chains)) # Merge all ends merged = [] for val in ret[-1]: t = find_id_position(merged, val["id"]) if t is None: merged.append(val) else: merged[t]["parents"] = merged[t]["parents"] + val["parents"] ret[-1] = merged while len(chains) > 0: next = chains.pop(0) next_tree = build_chain_tree(next) # weird rebuild next_id = find_id_position(ret[-1], next.name) next_pop_id = ret[-1].pop(next_id) append_ret = ret[-1] next_tree[-1] = append_ret + next_tree[-1] ret[-1] = [next_pop_id] next_tree.pop(0) ret = ret + next_tree merged = [] for val in ret[-1]: t = find_id_position(merged, val["id"]) if t is None: merged.append(val) else: merged[t]["parents"] = merged[t]["parents"] + val["parents"] ret[-1] = merged pprint.pp(ret) return ret @app.route('/parse', methods=['POST']) def manual_parse_chain(): raw_data = request.form.get('data') chains = raw_data.split("Chain") chains: List[Text] = list(filter(len, chains)) trees = [] parsed_chains = [] for chain in chains: rules_raw = chain.splitlines() if len(rules_raw) < 2: raise ValueError("bad chain") # First line of a chain is the chain metadata, such as name chain_meta_raw = rules_raw[0].strip().split(" ") policy = None if not ('references' in rules_raw[0]): policy = chain_meta_raw[2] chain = Chain(chain_meta_raw[0], policy, []) if not ('policy' in rules_raw[0]): chain.referenced = True # Second line is headers for the table, so drop # Lines past this point may not exist, so care in parsing # Third line and onwards is the rule itself for rule in rules_raw[2:]: r =[part for part in rule.split(" ") if part.strip()] if len(r) == 0: continue if len(r) > 8: r = r[:9] + [' '.join(r[9:])] cr = ChainRule(r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8], "", rule) if len(r) > 9: cr = ChainRule(r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8], r[9], rule) chain.add_rule(cr) # If the chain has no rules, read the policy and add default rule of any if len(chain.rules) == 0: chain.add_rule(ChainRule( 0, 0, chain.policy, "all", "--", "any", "any", "anywhere", "anywhere", "", "" )) # Build the tree for each chain tree = build_chain_tree(chain) trees.append(tree) parsed_chains.append(chain) full_tree = build_full_chain_tree(parsed_chains) return jsonify(full_tree) @app.route('/', methods=["POST"]) def capture(): # Why is counter strike pinging on port 5000??? pprint.pp(request.json) pprint.pp(request.data) pprint.pp(request.values) return jsonify({}) @app.route('/') def hello_world(): return render_template('index.html') if __name__ == "__main__": app.run()