Update Eigener Parser
parent
d5f59c6020
commit
9608cf7d16
@ -100,10 +100,190 @@ Jedes Result bildet hierbei einen einzelnen Durchlauf eines Tools ab, z.B. "otse
|
||||
Weiter haben beide Klassen den `@dataclass` und `@dataclass_json` Decorator. Dieser sorgt dafür, dass das Objekt später leicht serialisiert werden kann und erspart Boiler-Plate Code.
|
||||
|
||||
## Stage 3: Parsen der Daten
|
||||
```python
|
||||
for run in list_of_all:
|
||||
for otseca_path in glob.glob(os.path.join(run.path, "otseca*", "report*")):
|
||||
nr = re.findall(r"otseca-(\d+)", otseca_path)[0]
|
||||
run.otseca_results.append(Result(otseca_path, nr, otseca_parse(otseca_path)))
|
||||
for log_file in os.listdir(run.path):
|
||||
path = os.path.join(run.path, log_file)
|
||||
nr = re.findall(r"(\d+)", log_file)[0]
|
||||
if "lynis-console-" in log_file:
|
||||
run.lynis_results.append(Result(path, nr, lynis_parse(path)))
|
||||
if "testssl-" in log_file:
|
||||
run.testssl_results.append(Result(path, nr, testssl_parse(path)))
|
||||
|
||||
```
|
||||
|
||||
### Stage 3.1: Lynis
|
||||
|
||||
* Was ist wichtig?
|
||||
* Nur das Console-Log benutzt
|
||||
* Dat und Lynis-Log waren unbrauchbar
|
||||
|
||||
```python
|
||||
LYNIS_REGEX = {
|
||||
"green": r"\[1;32m", # more advanced "green": r"\[ .*\[1;32m([\w\d ]*).*",
|
||||
"yellow": r"\[1;33m",
|
||||
"red": r"\[1;31m",
|
||||
"white": r"\[1;37m",
|
||||
"heading": r".*\[1;33m([\w\d ,:-]*).*"
|
||||
}
|
||||
```
|
||||
|
||||
```python
|
||||
LYNIS_BLOCKS = {
|
||||
"Boot and services",
|
||||
...
|
||||
"Kernel Hardening",
|
||||
"Hardening"
|
||||
}
|
||||
```
|
||||
|
||||
```python
|
||||
with open(path_to_log) as handle:
|
||||
text = handle.read()
|
||||
blocks = text.split("[+]")
|
||||
interesting_blocks = {}
|
||||
for block in blocks:
|
||||
heading = re.findall(LYNIS_REGEX["heading"], block.splitlines()[0])
|
||||
if heading and heading[0] in LYNIS_BLOCKS:
|
||||
block_text = "".join(block.splitlines()[2:])
|
||||
interesting_blocks[heading[0]] = {
|
||||
"text": block_text,
|
||||
"counts": {
|
||||
"green": len(re.findall(LYNIS_REGEX['green'], block_text)),
|
||||
"yellow": len(re.findall(LYNIS_REGEX['yellow'], block_text)),
|
||||
"red": len(re.findall(LYNIS_REGEX['red'], block_text)),
|
||||
"white": len(re.findall(LYNIS_REGEX['white'], block_text)),
|
||||
}
|
||||
}
|
||||
```
|
||||
Weiter wurden noch waringCount, bla bla
|
||||
|
||||
Nach dem generellen step wurden dann noch spezielle props aus den obigen blocks extrahiert, dies ginge aber zu weit, genaueres in der datei [lynis.py](https://gitlab.com/marcel.schwarz/it-security-2/-/blob/master/scan_output_parser/lynis.py)
|
||||
|
||||
### Stage 3.2: Testssl
|
||||
* Einer der leichtesten Parser
|
||||
* Leider zu spät bemerkt, dass es einen JSON export gegeben hätte.
|
||||
* TXT datei hat auch funktioniert
|
||||
|
||||
```python
|
||||
def testssl_parse(path):
|
||||
with open(path) as handle:
|
||||
text = handle.read()
|
||||
|
||||
testssl_overall = {
|
||||
"443": {"open": False, "ssl": False},
|
||||
"21": {"open": False, "ssl": False},
|
||||
"465": {"open": False, "ssl": False},
|
||||
"587": {"open": False, "ssl": False},
|
||||
"110": {"open": False, "ssl": False},
|
||||
"995": {"open": False, "ssl": False},
|
||||
"993": {"open": False, "ssl": False},
|
||||
"5432": {"open": False, "ssl": False},
|
||||
"3306": {"open": False, "ssl": False}
|
||||
}
|
||||
|
||||
tests = text.split("## Scan started as: ")
|
||||
for test in tests:
|
||||
port = re.findall("Start .*? -->> 127\.0\.0\.1:(\d*) \(localhost\) <<--", test)
|
||||
if not port:
|
||||
continue
|
||||
port = port[0]
|
||||
if "Overall Grade" in test:
|
||||
testssl_overall[port]["ssl"] = True
|
||||
testssl_overall[port]["open"] = True
|
||||
if "firewall" not in test:
|
||||
testssl_overall[port]["open"] = True
|
||||
continue
|
||||
return testssl_overall
|
||||
```
|
||||
|
||||
* Gelöst durch initialisierung mit default Werten und dann das setzen auf bestimmte regex
|
||||
* Vorher trennung der einzelnen logs der ports durch einen split bei "## Scan startet as: "
|
||||
|
||||
|
||||
### Stage 3.3: Otseca
|
||||
* HTML result einfacher parse
|
||||
* Nur nach rot, gelb und grün und gesamt
|
||||
|
||||
|
||||
```python
|
||||
def otseca_box_counts(path_to_report):
|
||||
counts = {}
|
||||
for (name, curr_file) in OTSECA_FILES.items():
|
||||
curr_path = os.path.join(path_to_report, curr_file)
|
||||
with open(curr_path, encoding="UTF-8") as handle:
|
||||
text = handle.read()
|
||||
counts[name] = {
|
||||
"green": (len(re.findall("background-color: #1F9D55", text))),
|
||||
"yellow": (len(re.findall("background-color: #F2D024", text))),
|
||||
"red": (len(re.findall("background-color: #CC1F1A", text))),
|
||||
"total": (len(re.findall("background-color:", text)))
|
||||
}
|
||||
return counts
|
||||
```
|
||||
|
||||
* Weitere Distro info, was in lynis gefehlt hat.
|
||||
|
||||
```python
|
||||
def otseca_distro_info(path_to_report):
|
||||
with open(os.path.join(path_to_report, OTSECA_FILES["distro"])) as handle:
|
||||
text = handle.read()
|
||||
|
||||
pkg_count = len(re.findall("ii {2}", text))
|
||||
upgrades_count = re.findall(r"(\d+) .*? (\d+) .*? (\d+) .*? (\d+) .*", text).pop()
|
||||
|
||||
return {
|
||||
"pkgCount": pkg_count,
|
||||
"upgraded": int(upgrades_count[0]),
|
||||
"newlyInstalled": int(upgrades_count[1]),
|
||||
"remove": int(upgrades_count[2]),
|
||||
"notUpgraded": int(upgrades_count[3])
|
||||
}
|
||||
```
|
||||
|
||||
## Stage 4: Ausgabe
|
||||
### Stage 4.1: JSON-Ausgabe
|
||||
```python
|
||||
with open("export.json", "w") as handle:
|
||||
handle.write(Run.schema().dumps(list_of_all, many=True))
|
||||
```
|
||||
|
||||
* Problem mit JSON
|
||||
|
||||
### Stage 4.2: SQLite-Ausgabe
|
||||
* Warum SQLite?
|
||||
|
||||
```python
|
||||
for run in list_of_all:
|
||||
write_run_to_db(run)
|
||||
```
|
||||
|
||||
```python
|
||||
def write_run_to_db(run):
|
||||
conn = sqlite3.connect(DB_NAME)
|
||||
run_data = (run.id, run.platform, run.system, run.version, run.path)
|
||||
conn.execute("INSERT OR IGNORE INTO runs VALUES (?, ?, ?, ?, ?)", run_data)
|
||||
|
||||
... otseca ...
|
||||
... testssl ...
|
||||
|
||||
for lynis_res in run.lynis_results:
|
||||
categories = list(lynis_res.result.values())[:31]
|
||||
general = list(lynis_res.result.values())[31]
|
||||
data = (
|
||||
run.id, lynis_res.run_nr, str(lynis_res.path),
|
||||
*list(map(json.dumps, categories)),
|
||||
*general.values()
|
||||
)
|
||||
print(data)
|
||||
conn.execute("INSERT OR IGNORE INTO lynis_results VALUES (" + "".join("?," * 37) + "?)", data)
|
||||
conn.commit()
|
||||
```
|
||||
# Ergebnisse
|
||||
* Insgesamt etwa 30 RegEx um zu parsen
|
||||
* laufzeit unter 10 Sekunden
|
||||
* Extremer Vorteil durch das abfragen mit SQL Syntax
|
||||
* Keine Flüchtigkeits- oder Übertragungsfehler durch menschliches verschulden
|
Loading…
Reference in New Issue
Block a user