diff --git a/projects/project-3/openapi/.gitignore b/projects/project-3/openapi/.gitignore
index a702315..9adde20 100644
--- a/projects/project-3/openapi/.gitignore
+++ b/projects/project-3/openapi/.gitignore
@@ -1,2 +1,2 @@
-tmp/
-bike-data.db
\ No newline at end of file
+bike-data.db
+*.log
\ No newline at end of file
diff --git a/projects/project-3/openapi/db_init.py b/projects/project-3/openapi/db_init.py
index 9cfc86a..f42df31 100644
--- a/projects/project-3/openapi/db_init.py
+++ b/projects/project-3/openapi/db_init.py
@@ -1,23 +1,23 @@
 import csv
 import json
 import logging
-import os
-import os.path
-import shutil
 import sqlite3
-import time
-import urllib.request
-from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
 from datetime import datetime
 
 import requests
 
-WORKING_DIR = os.getcwd()
-TMP_DIR = os.path.join(WORKING_DIR, "tmp")
+logFormatter = logging.Formatter("%(asctime)-15s [%(levelname)8s] [%(threadName)s] - %(message)s")
+LOG = logging.getLogger()
+LOG.setLevel(logging.DEBUG)
 
-logging.basicConfig(format="%(asctime)-15s [%(levelname)8s] [%(threadName)s] - %(message)s", level=logging.INFO)
-LOG = logging.getLogger("Importer")
+fileHandler = logging.FileHandler("importer.log")
+fileHandler.setFormatter(logFormatter)
+LOG.addHandler(fileHandler)
+
+consoleHandler = logging.StreamHandler()
+consoleHandler.setFormatter(logFormatter)
+LOG.addHandler(consoleHandler)
 
 
 @dataclass
@@ -30,7 +30,6 @@ class ApiExportFile:
 def get_online_files_list(subdir_filter=None, file_extension_filter=None):
     import urllib.parse
     import xml.etree.ElementTree
-    import requests
 
     base_uri = "https://s3-eu-west-1.amazonaws.com/cycling.data.tfl.gov.uk/"
     xml_data = xml.etree.ElementTree.fromstringlist(requests.get(base_uri).text)
@@ -56,138 +55,117 @@ def get_online_files_list(subdir_filter=None, file_extension_filter=None):
     return entries
 
 
-class BikeDatabase:
-    def __init__(self):
-        LOG.info("Created new database connection")
-        self.conn = sqlite3.connect(os.path.join(WORKING_DIR, "bike-data.db"), check_same_thread=False, timeout=300)
+def init_database():
+    LOG.info("Try to create tables")
+    conn = sqlite3.connect("bike-data.db", timeout=300)
+    conn.execute("""CREATE TABLE IF NOT EXISTS usage_stats(
+        rental_id INTEGER PRIMARY KEY,
+        duration INTEGER,
+        bike_id INTEGER,
+        end_date INTEGER,
+        end_station_id INTEGER,
+        end_station_name TEXT,
+        start_date INTEGER,
+        start_station_id INTEGER,
+        start_station_name TEXT
+    )""")
+    conn.execute("CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)")
+    conn.execute("""CREATE TABLE IF NOT EXISTS bike_points(
+        id TEXT PRIMARY KEY,
+        common_name TEXT,
+        lat REAL,
+        lon REAL,
+        id_num INTEGER AS (CAST(SUBSTR(id, 12) as INTEGER)) STORED
+    )""")
+    conn.commit()
+    conn.close()
+    LOG.info("Tables created")
 
-    def init_database(self):
-        LOG.info("Try to create tables")
-        self.conn.execute("""CREATE TABLE IF NOT EXISTS usage_stats(
-            rental_id INTEGER PRIMARY KEY,
-            duration INTEGER,
-            bike_id INTEGER,
-            end_date INTEGER,
-            end_station_id INTEGER,
-            end_station_name TEXT,
-            start_date INTEGER,
-            start_station_id INTEGER,
-            start_station_name TEXT
-        )""")
-        self.conn.execute("CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)")
-        self.conn.execute("""CREATE TABLE IF NOT EXISTS bike_points(
-            id TEXT PRIMARY KEY,
-            common_name TEXT,
-            lat REAL,
-            lon REAL,
-            id_num INTEGER AS (CAST(SUBSTR(id, 12) as INTEGER)) STORED
-        )""")
-        self.conn.commit()
-        LOG.info("Tables created")
 
-    def create_indexes(self):
-        LOG.info("Try to create indexes")
-        self.conn.execute("""CREATE INDEX IF NOT EXISTS idx_date_of_start_date
-            ON usage_stats (date(start_date, "unixepoch"))""")
-        self.conn.commit()
-        LOG.info("Indexes created")
+def create_indexes():
+    LOG.info("Try to create indexes")
+    conn = sqlite3.connect("bike-data.db", timeout=300)
+    conn.execute("""CREATE INDEX IF NOT EXISTS idx_date_of_start_date
+        ON usage_stats (date(start_date, "unixepoch"))""")
+    conn.commit()
+    conn.close()
+    LOG.info("Indexes created")
 
-    def import_bikepoints(self):
-        LOG.info("Importing bikepoints")
-        points = json.loads(requests.get("https://api.tfl.gov.uk/BikePoint").text)
-        points = list(map(lambda p: (p['id'], p['commonName'], p['lat'], p['lon']), points))
-        self.conn.executemany("INSERT OR IGNORE INTO bike_points VALUES (?, ?, ?, ?)", points)
-        self.conn.commit()
-        LOG.info("Bikepoints imported")
 
-    def is_file_already_imported(self, etag):
-        rows = self.conn.execute("SELECT * FROM read_files WHERE etag LIKE ?", (etag,)).fetchall()
-        return len(rows) != 0
+def import_bikepoints():
+    LOG.info("Importing bikepoints")
+    conn = sqlite3.connect("bike-data.db", timeout=300)
+    points = json.loads(requests.get("https://api.tfl.gov.uk/BikePoint").text)
+    points = list(map(lambda p: (p['id'], p['commonName'], p['lat'], p['lon']), points))
+    conn.executemany("INSERT OR IGNORE INTO bike_points VALUES (?, ?, ?, ?)", points)
+    conn.commit()
+    conn.close()
+    LOG.info("Bikepoints imported")
 
-    def import_usage_stats_file(self, export_file: ApiExportFile):
-        if self.is_file_already_imported(export_file.etag):
-            LOG.warning(f"Skipping import of {export_file.path}")
+
+def import_usage_stats_file(export_file: ApiExportFile):
+    conn = sqlite3.connect("bike-data.db", timeout=300)
+
+    rows = conn.execute("SELECT * FROM read_files WHERE etag LIKE ?", (export_file.etag,)).fetchall()
+    if len(rows) != 0:
+        LOG.warning(f"Skipping import of {export_file.path}")
+        return
+
+    LOG.info(f"DOWNLOADING... {export_file.download_url}")
+    content = requests.get(export_file.download_url).content.decode("UTF-8")
+
+    LOG.info(f"Parsing {export_file.path}")
+    entries = list(csv.reader(content.splitlines()))[1:]
+    mapped = []
+    for entry in entries:
+        try:
+            mapped.append((
+                # Rental Id
+                int(entry[0]),
+                # Duration or Duration_Seconds
+                int(entry[1] or "-1"),
+                # Bike Id
+                int(entry[2] or "-1"),
+                # End Date
+                int(datetime.strptime(entry[3][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[3] else -1,
+                # EndStation Id
+                int(entry[4] or "-1"),
+                # EndStation Name
+                entry[5].strip(),
+                # Start Date
+                int(datetime.strptime(entry[6][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[6] else -1,
+                # StartStation Id
+                int(entry[7]),
+                # StartStation Name
+                entry[8].strip()
+            ))
+        except ValueError as e:
+            LOG.error(f"Value Error {e} on line {entry}")
             return
-
-        cursor = self.conn.cursor()
-
-        os.makedirs(os.path.dirname(export_file.path), exist_ok=True)
-        LOG.info(f"DOWNLOADING... {export_file.download_url} to {export_file.path}")
-        urllib.request.urlretrieve(export_file.download_url, export_file.path)
-
-        LOG.info(f"Importing {export_file.path}")
-        with open(export_file.path, "r", newline='') as file:
-            LOG.info(f"Reading file {export_file.path}")
-            entries = list(csv.reader(file))[1:]
-            mapped = []
-            for entry in entries:
-                try:
-                    mapped.append((
-                        # Rental Id
-                        int(entry[0]),
-                        # Duration or Duration_Seconds
-                        int(entry[1] or "-1"),
-                        # Bike Id
-                        int(entry[2] or "-1"),
-                        # End Date
-                        int(datetime.strptime(entry[3][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[3] else -1,
-                        # EndStation Id
-                        int(entry[4] or "-1"),
-                        # EndStation Name
-                        entry[5].strip(),
-                        # Start Date
-                        int(datetime.strptime(entry[6][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[6] else -1,
-                        # StartStation Id
-                        int(entry[7]),
-                        # StartStation Name
-                        entry[8].strip()
-                    ))
-                except ValueError as e:
-                    LOG.error(f"Value Error {e} on line {entry}")
-                    return
-                except KeyError as e:
-                    LOG.error(f"Key Error {e} on line {entry}")
-                    return
-            LOG.info(f"Writing {len(mapped)} entries to DB")
-            cursor.executemany("INSERT INTO usage_stats VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", mapped)
-            cursor.execute("INSERT INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag))
-            self.conn.commit()
-            LOG.info(f"Finished import of {export_file.path}")
-        os.remove(export_file.path)
-        LOG.info(f"Delete file {export_file.path}")
+        except KeyError as e:
+            LOG.error(f"Key Error {e} on line {entry}")
+            return
+    LOG.info(f"Writing {len(mapped)} entries to DB")
+    conn.executemany("INSERT OR IGNORE INTO usage_stats VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", mapped)
+    conn.execute("INSERT OR IGNORE INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag))
+    conn.commit()
+    conn.close()
+    LOG.info(f"Finished import of {export_file.path}")
 
 
 def main():
-    all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv")
-
     # General DB init
-    BikeDatabase().init_database()
+    init_database()
 
     # Download and import opendata from S3 bucket
-    os.makedirs(TMP_DIR, exist_ok=True)
-    os.chdir(TMP_DIR)
-    LOG.info("Switching into tmp dir")
-
-    import_tasks = []
-
-    with ThreadPoolExecutor(1) as executor:
-        for file in all_files:
-            db = BikeDatabase()
-            import_tasks.append(
-                executor.submit(db.import_usage_stats_file, file)
-            )
-        executor.shutdown(wait=True)
-
-    os.chdir(WORKING_DIR)
-    LOG.info("Switching back to workdir")
-    shutil.rmtree(TMP_DIR)
-    LOG.info("Deleted temp dir")
+    all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv")
+    for file in all_files:
+        import_usage_stats_file(file)
 
     # Import Bikepoints
-    BikeDatabase().import_bikepoints()
+    import_bikepoints()
 
     # Create search-index for faster querying
-    BikeDatabase().create_indexes()
+    create_indexes()
 
 
 if __name__ == "__main__":
     main()
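
A quick smoke test for the refactored importer (not part of the patch itself):
the sketch below assumes db_init.py has already run and created bike-data.db
in the working directory; the example date and bike-point number are arbitrary.

    import sqlite3

    conn = sqlite3.connect("bike-data.db")

    # The WHERE expression is spelled exactly like the indexed expression in
    # idx_date_of_start_date, so SQLite can answer this from the index instead
    # of scanning usage_stats.
    rentals = conn.execute(
        """SELECT COUNT(*) FROM usage_stats
           WHERE date(start_date, "unixepoch") = ?""",
        ("2017-01-01",),
    ).fetchone()[0]
    print(f"rentals on 2017-01-01: {rentals}")

    # id_num is the stored generated column CAST(SUBSTR(id, 12) AS INTEGER),
    # i.e. the numeric tail of ids like "BikePoints_42" (needs SQLite 3.31+).
    point = conn.execute(
        "SELECT id, common_name FROM bike_points WHERE id_num = ?", (42,)
    ).fetchone()
    print(f"bike point 42: {point}")

    conn.close()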