6 Eigener Parser
Tobias Wieck edited this page 2021-02-07 14:41:32 +00:00

Warum ein Parser?

Zunächst stellt sich die Frage, warum wird ein programmatischer Parser benötigt? Zu Beginn war dies zwar abzusehen, aber nicht zwingend notwendig. Nachdem allerdings die ersten Testläufe durchgeführt wurden, wurde schnell klar, dass es viel Arbeit wird, alles auszuwerten. Wir haben uns gemeinsam die Logdateien der einzelnen Tools angesehen, und versucht, Metriken und Kategorien zu definieren. Alles in allem haben wir aus den drei Tools etwa 75, mehr oder weniger interessante, Kategorien definiert. Multipliziert man die Anzahl der Kategorien nun mit der Anzahl der Durchläufe, wird die Datenmenge besser ersichtlich. Aus 56 Durchläufen, aus denen je 75 Kategorien extrahiert werden, (teilweise hatten Kategorien auch einen zusammengesetzten Wert) erhält man ca. 4200 Tabellen Zellen, die von Hand befüllt werden müssten. Dies ist ein enormer Zeitaufwand und sehr fehlerträchtig. Daher blieb nur die Lösung durch einen programmatischen Ansatz.

Umsetzung

Technologieauswahl

Eine Art Parser schien die beste Möglichkeit zu sein, da alle Ausgabedateien von anderer Software generiert wurden. Aufgrund dieser Prämisse könnten sie bestimmt auch wieder einfach eingelesen und verarbeitet werden.

Als Sprache war hier Python die erste Wahl. Python bietet sich enorm gut dafür an, da es nicht nötig ist, feste Datenstrukturen zu erstellen. Alle Typen sind dynamisch. Weiter war es nicht wichtig, ob die Laufzeit optimal ist. Ob der Parser nun vier Sekunden oder zehn benötigt, war nicht relevant.

Stage 1: Präparieren der Eingabedateien

Durch die systematische Vorarbeit, die unser Script in Bezug auf die Verzeichnisstruktur erledigt hat, lagen alle Ergebnisse in der gleichen Struktur vor. Die einzige Schwierigkeit war es nun noch, jedem Test einen eindeutigen Namen zu geben, der programmatisch verarbeitet werden kann. Folgendes Schema erschien uns hier sinnvoll.

{GLOBAL_ID}_{VENDOR}_{SYSTEM}_{VERSION}

Also beispielsweise:

10_gcp_ubuntu_16.04
2_hetzner_ubuntu_16.04

Diese Benennung wurde von Hand durchgeführt. Hieraus ergibt sich dann die folgende Dateistruktur (Ausschnittsweise):

9_gcp_ubuntu_20.04
    ├── lynis-console-1.log
    ├── lynis-console-2.log
    ├── lynis-console-3.log
    ├── lynis-log-1.log
    ├── lynis-log-2.log
    ├── lynis-log-3.log
    ├── lynis-report-1.dat
    ├── lynis-report-2.dat
    ├── lynis-report-3.dat
    ├── otseca-1
    │   └── report.1610030682
    │       ├── kernel.all.log.html
    │       ├── network.all.log.html
    │       ├── permissions.all.log.html
    │       ├── services.all.log.html
    │       ├── system.all.log.html
    │       └── vendor
    ├── otseca-2
    │   └── report.1610031105
    │       ├── kernel.all.log.html
    │       ├── network.all.log.html
    │       ├── permissions.all.log.html
    │       ├── services.all.log.html
    │       ├── system.all.log.html
    │       └── vendor
    ├── otseca-3
    │   └── report.1610032314
    │       ├── kernel.all.log.html
    │       ├── network.all.log.html
    │       ├── permissions.all.log.html
    │       ├── services.all.log.html
    │       ├── system.all.log.html
    │       └── vendor
    ├── testssl-1.log
    ├── testssl-2.log
    └── testssl-3.log

Insgesamt ergeben sich dann über alle Scans und Tests ca. 30 Dateien pro Test auf 18 Tests etwa 540 Logfiles. Zu finden sind alle Dateien im Repository unter "raw_scans".

Stage 2: Einlesen der Logs

Die zweite Stufe ist dann die erste programmatische Arbeit. Der Parser liest über einen glob-Befehl das Verzeichnis aus, in dem die Scans liegen. Danach wird das oben erklärte Benennungsschema aus dem Namen extrahiert. Ist der Parsevorgang erfolgreich (was er immer ist), wird ein neues Element in die Liste aller Tests gelegt.

list_of_all = []
all_scans = glob.glob(os.path.join(BASE_SCAN_PATH, "*", ""))
for scan in all_scans:
    findings = re.findall(r"(\d+)_(.*)_(.*)_(.*)", os.path.dirname(scan))
    findings = findings[0]
    list_of_all.append(
        Run(findings[0], findings[1], findings[2], findings[3], scan, [], [], [])
    )

Die hier verwendete "Run" Datenstruktur ist eine von zwei Datenstrukturen, die wir angelegt haben, um den späteren Export zu erleichtern. Zum einen wäre hier der "Run" Datentyp:

@dataclass_json
@dataclass
class Run:
    id: int
    platform: str
    system: str
    version: str
    path: str
    otseca_results: List[Result]
    lynis_results: List[Result]
    testssl_results: List[Result]

Er speichert alle Informationen zu einem Test und zu jedem Tool eine Liste an "Results". "Results" ist hierbei der zweite eigene Datentyp:

@dataclass_json
@dataclass
class Result:
    path: str
    run_nr: int
    result: dict

Jedes Result bildet hierbei einen einzelnen Durchlauf eines Tools ab, z.B. "otseca-1" wäre ein "Result" Objekt oder aber auch "lynis-3".

Weiter haben beide Klassen den @dataclass und @dataclass_json Decorator. Dieser sorgt dafür, dass das Objekt später leicht serialisiert werden kann und erspart Boiler-Plate Code.

Stage 3: Parsen der Daten

for run in list_of_all:
    for otseca_path in glob.glob(os.path.join(run.path, "otseca*", "report*")):
        nr = re.findall(r"otseca-(\d+)", otseca_path)[0]
        run.otseca_results.append(Result(otseca_path, nr, otseca_parse(otseca_path)))
    for log_file in os.listdir(run.path):
        path = os.path.join(run.path, log_file)
        nr = re.findall(r"(\d+)", log_file)[0]
        if "lynis-console-" in log_file:
            run.lynis_results.append(Result(path, nr, lynis_parse(path)))
        if "testssl-" in log_file:
            run.testssl_results.append(Result(path, nr, testssl_parse(path)))

Das Herzstück des Parsers ist die obige Schleife, sie durchläuft alle Runs und ruft für die gefundenen Logs (Otseca, Lynis oder TestSSL) den für dieses Tool zugehörigen Parser auf. Die Ergebnisse werden dann an das Ursprungsobjekt gebunden.

Stage 3.1: Lynis

Da Lynis den umfangreichsten Report liefert, haben wir hier auch fast alle Informationen gespeichert, die verfügbar waren. Alle Informationen wurden aus dem Konsolenoutput geparsed, da die Logdatei sehr viel Unwichtiges entielt und die DAT-Datei zu wenig Informationen gespeichert hat.

LYNIS_REGEX = {
    "green": r"\[1;32m",
    "yellow": r"\[1;33m",
    "red": r"\[1;31m",
    "white": r"\[1;37m",
    "heading": r".*\[1;33m([\w\d ,:-]*).*"
}

Da der Konsolenoutput mit Farben arbeit, war es für uns sehr einfach, nach Farben zu filtern. In obigem Auschnitt sieht man die Definitionen der Regular Expressions der benutzen Matcher.

LYNIS_BLOCKS = {
    "Boot and services",
    ...
    "Kernel Hardening",
    "Hardening"
}

Weiter gibt es noch ein Array, welches alle Lynis Blocks definiert, welche wir speichern wollen.

with open(path_to_log) as handle:
    text = handle.read()
blocks = text.split("[+]")
interesting_blocks = {}
for block in blocks:
    heading = re.findall(LYNIS_REGEX["heading"], block.splitlines()[0])
    if heading and heading[0] in LYNIS_BLOCKS:
        block_text = "".join(block.splitlines()[2:])
        interesting_blocks[heading[0]] = {
            "text": block_text,
            "counts": {
                "green": len(re.findall(LYNIS_REGEX['green'], block_text)),
                "yellow": len(re.findall(LYNIS_REGEX['yellow'], block_text)),
                "red": len(re.findall(LYNIS_REGEX['red'], block_text)),
                "white": len(re.findall(LYNIS_REGEX['white'], block_text)),
            }
        }

Um nun zu parsen, wurde das Logfile an jeder Kapitelüberschrift geteilt und die darin enthaltenen Anzahlen an grünen, gelben, roten und weißen Angaben gespeichert.

Weiter wurden noch der Warning Count, Suggestion count, Hardening Index und die Anzahl an ausgeführen Tests gespeichert.

Nach dem generellen step wurden dann noch spezielle props aus den obigen Blocks extrahiert, dies ginge aber zu weit, genaueres kann direkt aus dem Quellcode in der Datei lynis.py herausgelesen werden.

Stage 3.2: Testssl

Der TestSSL Parser ist einer der am einfachsten zu implementierenden Parser gewesen. Die Logfile musste lediglich bei jedem neuen Scan geteilt werden und dann nach bestimmten Keywords Ausschau gehalten werden.

Leider haben wir erst im Nachhinein bemerkt, dass es die Möglichkeit gegeben hätte, als Output-Format JSON anzugeben, was die programmatische Verarbeitung noch einfacher gemacht hätte. So hatten wir nur eine Textdatei.

def testssl_parse(path):
    with open(path) as handle:
        text = handle.read()

    testssl_overall = {
        "443": {"open": False, "ssl": False},
        "21": {"open": False, "ssl": False},
        "465": {"open": False, "ssl": False},
        "587": {"open": False, "ssl": False},
        "110": {"open": False, "ssl": False},
        "995": {"open": False, "ssl": False},
        "993": {"open": False, "ssl": False},
        "5432": {"open": False, "ssl": False},
        "3306": {"open": False, "ssl": False}
    }

    tests = text.split("## Scan started as: ")
    for test in tests:
        port = re.findall("Start .*? -->> 127\.0\.0\.1:(\d*) \(localhost\) <<--", test)
        if not port:
            continue
        port = port[0]
        if "Overall Grade" in test:
            testssl_overall[port]["ssl"] = True
            testssl_overall[port]["open"] = True
        if "firewall" not in test:
            testssl_overall[port]["open"] = True
            continue
    return testssl_overall

Der Parser arbeitet mit einem festen Array an Werten, die durch unser Run-Skript vorgegeben waren. Während des Parsens wird dann nur noch der entsprechende Boolean auf True gesetzt, sobald das Keyword gefunden wurde. Der oben genannte Trenner für die Tests wurde als "## Scan startet as: " gewählt.

Stage 3.3: Otseca

Der Otseca parser war ebenfalls einfach umzusetzen, da alle Logfiles als HTML vorlagen. Da Otseca eher wenig Hinweise gibt, haben wir uns lediglich auf die Anzahl der grünen, gelben und roten Kästen im Output beschränkt. Der Parser zählt hierfür einfach das Vorkommen der CSS-Direktive "background-color" mit dem ensprechenden Wert.

def otseca_box_counts(path_to_report):
    counts = {}
    for (name, curr_file) in OTSECA_FILES.items():
        curr_path = os.path.join(path_to_report, curr_file)
        with open(curr_path, encoding="UTF-8") as handle:
            text = handle.read()
        counts[name] = {
            "green": (len(re.findall("background-color: #1F9D55", text))),
            "yellow": (len(re.findall("background-color: #F2D024", text))),
            "red": (len(re.findall("background-color: #CC1F1A", text))),
            "total": (len(re.findall("background-color:", text)))
        }
    return counts

Außerdem wurden weitere Betriebssystem-Infos geholt, welche in Lynis nicht verfügbar waren. Dazu gehören hauptsächlich die Gesamtanzahl Pakete, sowie die entsprechenden Counts für Upgraded, Newly Installed, Remove und Not Upgraded.

def otseca_distro_info(path_to_report):
    with open(os.path.join(path_to_report, OTSECA_FILES["distro"])) as handle:
        text = handle.read()

    pkg_count = len(re.findall("ii {2}", text))
    upgrades_count = re.findall(r"(\d+) .*? (\d+) .*? (\d+) .*? (\d+) .*", text).pop()

    return {
        "pkgCount": pkg_count,
        "upgraded": int(upgrades_count[0]),
        "newlyInstalled": int(upgrades_count[1]),
        "remove": int(upgrades_count[2]),
        "notUpgraded": int(upgrades_count[3])
    }

Stage 4: Ausgabe

Stage 4.1: JSON-Ausgabe

with open("export.json", "w") as handle:
    handle.write(Run.schema().dumps(list_of_all, many=True))

Zuerst wollten wir alles als JSON Objekt ausgeben, dies stellte sich aber sehr schell als unübersichtlich heraus. Die resultierende JSON-Datei hatte für alle 56 Durchläufe etwa 19.000 Zeilen.

Also entschieden wir uns noch für eine andere Art der Ausgabe, eine strukturiertere, SQLite.

Stage 4.2: SQLite-Ausgabe

SQlite bietet sich hierfür sehr gut an, da es sich um eine serverlose strukturierte Datenbank handelt. Weiter gibt es uns auch zur Auswertung ganz neue Möglichkeiten durch die Datenbankfunktionen. Außerdem bietet Python eine native Implementation von SQlite, ohne dass Zusatzpakete benötigt werden.

for run in list_of_all:
    write_run_to_db(run)

Der letzte Schritt des Parsers ist es, jeden Run an die Datenbankschnittstellt zu übergeben.

def write_run_to_db(run):
    conn = sqlite3.connect(DB_NAME)
    run_data = (run.id, run.platform, run.system, run.version, run.path)
    conn.execute("INSERT OR IGNORE INTO runs VALUES (?, ?, ?, ?, ?)", run_data)

    ... otseca ...
    ... testssl ...

    for lynis_res in run.lynis_results:
        categories = list(lynis_res.result.values())[:31]
        general = list(lynis_res.result.values())[31]
        data = (
            run.id, lynis_res.run_nr, str(lynis_res.path),
            *list(map(json.dumps, categories)),
            *general.values()
        )
        print(data)
        conn.execute("INSERT OR IGNORE INTO lynis_results VALUES (" + "".join("?," * 37) + "?)", data)
    conn.commit()

In dieser Funktion werden dann in vier Tabellen alle gesamelten Informationen gespeichert, die Tabellen umfassen zwischen fünf und 37 Spalten. Jeder Run wird hierbei in der Run Tabelle gespeichert. Außerdem bekommt jedes Tool ebenfalls seine eigene Tabelle (Lynis, TestSSL und Otseca). Die Tabellen von den Tools werden dann nach und nach gefüllt, und über Fremdschlüssel mit der Primärtabelle verbunden. Zuletzt wird die Transaktion dann geschrieben.

Ergebnisse

Insgesamt lässt sich sagen, dass etwa 30 Regular Expressions nötig waren, um alle gewollten Logfiles über alle Tools zu parsen. Die Laufzeit beträgt hierbei etwa zehn Sekunden, wobei das Einlesen der doch relativ langen Textdateien am längsten benötigt.

Der Export als SQlite hat sich im Nachhinein als sehr vorteilhaft entpuppt, da SQlite nativ mit JSON Daten umgehen kann, was sehr leichte Abfragemechanismen erlaubt.

Weiter wurden sogut wie alle menschlichen Fehler, die durch Unaufmerksamkeit auftreten würden, ausgeschlossen. Auch Ablese- oder Tippfehler sind kein Problem.