187 lines
5.2 KiB
Python
187 lines
5.2 KiB
Python
import pprint
|
|
|
|
from typing import List, Text, Optional
|
|
|
|
from dataclasses import dataclass
|
|
|
|
from flask import Flask, render_template, request, jsonify
|
|
|
|
import uuid
|
|
|
|
@dataclass
|
|
class ChainRule:
|
|
packets: str
|
|
byte_s: str
|
|
target: str
|
|
protocol: str
|
|
options: str
|
|
inp: str
|
|
out: str
|
|
source: str
|
|
destination: str
|
|
extra: str
|
|
raw: str
|
|
id: Optional[str] = None
|
|
|
|
def name(self):
|
|
if self.id is None:
|
|
self.id = str(uuid.uuid4())
|
|
return f"{self.protocol}\n{self.inp}->{self.out}>{self.target}{'(' + self.extra + ')' if self.extra else ''}"
|
|
|
|
@dataclass
|
|
class Chain:
|
|
name: str
|
|
policy: Optional[str]
|
|
rules: List[ChainRule]
|
|
referenced: bool = False
|
|
|
|
def add_rule(self, rule: ChainRule):
|
|
self.rules.append(rule)
|
|
|
|
app = Flask(__name__)
|
|
|
|
def find_id_position(list_of_dicts, target_id):
|
|
for i, d in enumerate(list_of_dicts):
|
|
if d.get("id") == target_id:
|
|
return i
|
|
return None
|
|
|
|
|
|
def build_chain_tree(chain: Chain):
|
|
# Initialize the tree shape
|
|
ret = [[{"id": chain.name}], [], []]
|
|
# Add each rule
|
|
for rule in chain.rules:
|
|
# Every rule is a new one, so just append
|
|
ret[1].append({"id": rule.name(), "parents": [chain.name]})
|
|
# The destination is trickier
|
|
# For each rule, check to see if the output exists in the third slot of the return
|
|
# If the output does exist, then append the rules name to the list,
|
|
# If the output does not exist, add a new output with the rules name
|
|
pos = find_id_position(ret[2], rule.target)
|
|
if pos is None:
|
|
ret[2].append({"id": rule.target, "parents": [rule.name()]})
|
|
else:
|
|
ret[2][pos]["parents"].append(rule.name())
|
|
|
|
return ret
|
|
|
|
|
|
def build_full_chain_tree(chains: List[Chain]):
|
|
ret = [[], [], []]
|
|
pprint.pp(len(chains))
|
|
for chain in chains.copy():
|
|
if not chain.referenced:
|
|
tree = build_chain_tree(chain)
|
|
ret[0] = ret[0] + tree[0]
|
|
ret[1] = ret[1] + tree[1]
|
|
ret[2] = ret[2] + tree[2]
|
|
chains.remove(chain)
|
|
pprint.pp(len(chains))
|
|
|
|
# Merge all ends
|
|
merged = []
|
|
for val in ret[-1]:
|
|
t = find_id_position(merged, val["id"])
|
|
if t is None:
|
|
merged.append(val)
|
|
else:
|
|
merged[t]["parents"] = merged[t]["parents"] + val["parents"]
|
|
ret[-1] = merged
|
|
|
|
while len(chains) > 0:
|
|
next = chains.pop(0)
|
|
next_tree = build_chain_tree(next)
|
|
# weird rebuild
|
|
next_id = find_id_position(ret[-1], next.name)
|
|
next_pop_id = ret[-1].pop(next_id)
|
|
append_ret = ret[-1]
|
|
next_tree[-1] = append_ret + next_tree[-1]
|
|
ret[-1] = [next_pop_id]
|
|
next_tree.pop(0)
|
|
ret = ret + next_tree
|
|
merged = []
|
|
for val in ret[-1]:
|
|
t = find_id_position(merged, val["id"])
|
|
if t is None:
|
|
merged.append(val)
|
|
else:
|
|
merged[t]["parents"] = merged[t]["parents"] + val["parents"]
|
|
ret[-1] = merged
|
|
|
|
|
|
|
|
|
|
pprint.pp(ret)
|
|
return ret
|
|
|
|
|
|
@app.route('/parse', methods=['POST'])
|
|
def manual_parse_chain():
|
|
raw_data = request.form.get('data')
|
|
chains = raw_data.split("Chain")
|
|
chains: List[Text] = list(filter(len, chains))
|
|
trees = []
|
|
parsed_chains = []
|
|
for chain in chains:
|
|
rules_raw = chain.splitlines()
|
|
if len(rules_raw) < 2:
|
|
raise ValueError("bad chain")
|
|
|
|
# First line of a chain is the chain metadata, such as name
|
|
chain_meta_raw = rules_raw[0].strip().split(" ")
|
|
policy = None
|
|
if not ('references' in rules_raw[0]):
|
|
policy = chain_meta_raw[2]
|
|
|
|
chain = Chain(chain_meta_raw[0], policy, [])
|
|
if not ('policy' in rules_raw[0]):
|
|
chain.referenced = True
|
|
|
|
|
|
# Second line is headers for the table, so drop
|
|
|
|
# Lines past this point may not exist, so care in parsing
|
|
# Third line and onwards is the rule itself
|
|
for rule in rules_raw[2:]:
|
|
r =[part for part in rule.split(" ") if part.strip()]
|
|
if len(r) == 0:
|
|
continue
|
|
if len(r) > 8:
|
|
r = r[:9] + [' '.join(r[9:])]
|
|
cr = ChainRule(r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8], "", rule)
|
|
if len(r) > 9:
|
|
cr = ChainRule(r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8], r[9], rule)
|
|
chain.add_rule(cr)
|
|
|
|
# If the chain has no rules, read the policy and add default rule of any
|
|
if len(chain.rules) == 0:
|
|
chain.add_rule(ChainRule(
|
|
0, 0, chain.policy, "all", "--", "any", "any", "anywhere", "anywhere", "", ""
|
|
))
|
|
|
|
# Build the tree for each chain
|
|
tree = build_chain_tree(chain)
|
|
trees.append(tree)
|
|
parsed_chains.append(chain)
|
|
|
|
full_tree = build_full_chain_tree(parsed_chains)
|
|
return jsonify(full_tree)
|
|
|
|
|
|
@app.route('/', methods=["POST"])
|
|
def capture():
|
|
# Why is counter strike pinging on port 5000???
|
|
pprint.pp(request.json)
|
|
pprint.pp(request.data)
|
|
pprint.pp(request.values)
|
|
return jsonify({})
|
|
|
|
@app.route('/')
|
|
def hello_world():
|
|
return render_template('index.html')
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app.run()
|