From 9f8076286ecfe048ca1cf5e70e506973634375ec Mon Sep 17 00:00:00 2001
From: Marcel Schwarz
Date: Tue, 15 Dec 2020 00:17:58 +0100
Subject: [PATCH] Make import multi threaded, improve logging

---
 projects/project-3/openapi/db_init.py | 101 ++++++++++++++------
 1 file changed, 56 insertions(+), 45 deletions(-)

diff --git a/projects/project-3/openapi/db_init.py b/projects/project-3/openapi/db_init.py
index 16ac06d..11b2530 100644
--- a/projects/project-3/openapi/db_init.py
+++ b/projects/project-3/openapi/db_init.py
@@ -1,13 +1,18 @@
+import csv
+import logging
+import os
 import os.path
 import shutil
+import sqlite3
+import urllib.request
+from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
-import logging
-
+from datetime import datetime
 
 WORKING_DIR = os.getcwd()
 TMP_DIR = os.path.join(WORKING_DIR, "tmp")
 
-logging.basicConfig(format="%(asctime)-15s [%(levelname)8s] - %(message)s", level=logging.ERROR)
+logging.basicConfig(format="%(asctime)-15s [%(levelname)8s] [%(threadName)s] - %(message)s", level=logging.INFO)
 LOG = logging.getLogger("Importer")
@@ -47,27 +52,9 @@ def get_online_files_list(subdir_filter=None, file_extension_filter=None):
     return entries
 
 
-def download_file(url, save_path):
-    import os.path
-    import urllib.request
-
-    save_path = os.path.join(TMP_DIR, save_path)
-
-    if os.path.exists(save_path):
-        LOG.warning(f"Skipping exists: {save_path}")
-        return
-
-    os.makedirs(os.path.dirname(save_path), exist_ok=True)
-
-    LOG.info(f"DOWNLOADING... {url} to {save_path}")
-    urllib.request.urlretrieve(url, save_path)
-
-
 class BikeDatabase:
-
     def __init__(self):
-        import sqlite3
-        self.conn = sqlite3.connect("bike-data.db")
+        self.conn = sqlite3.connect(os.path.join(WORKING_DIR, "bike-data.db"), check_same_thread=False, timeout=60)
         self.cursor = self.conn.cursor()
 
     def init_database(self):
@@ -89,29 +76,41 @@ class BikeDatabase:
         return len(rows) != 0
 
     def import_usage_stats_file(self, export_file: ApiExportFile):
-        from datetime import datetime
-        import csv
-        import os
-        os.chdir(TMP_DIR)
+        if self.is_file_already_imported(export_file.etag):
+            LOG.warning(f"Skipping import of {export_file.path}")
+            return
+
+        os.makedirs(os.path.dirname(export_file.path), exist_ok=True)
+        LOG.info(f"DOWNLOADING... {export_file.download_url} to {export_file.path}")
+        urllib.request.urlretrieve(export_file.download_url, export_file.path)
 
         LOG.info(f"Importing {export_file.path}")
         with open(export_file.path, "r", newline='') as file:
             LOG.info(f"Reading file {export_file.path}")
-            entries = list(csv.DictReader(file))
+            entries = list(csv.reader(file))[1:]
             mapped = []
             for entry in entries:
                 try:
                     mapped.append((
-                        int(entry['Rental Id']),
-                        int(entry['Duration'] or "-1"),
-                        int(entry['Bike Id'] or "-1"),
-                        int(datetime.strptime(entry['End Date'][:16], "%d/%m/%Y %H:%M").timestamp()) if entry['End Date'] else -1,
-                        int(entry['EndStation Id'] or "-1"),
-                        entry['EndStation Name'],
-                        int(datetime.strptime(entry['Start Date'][:16], "%d/%m/%Y %H:%M").timestamp()) if entry['Start Date'] else -1,
-                        int(entry['StartStation Id']),
-                        entry['StartStation Name']
+                        # Rental Id
+                        int(entry[0]),
+                        # Duration or Duration_Seconds
+                        int(entry[1] or "-1"),
+                        # Bike Id
+                        int(entry[2] or "-1"),
+                        # End Date
+                        int(datetime.strptime(entry[3][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[3] else -1,
+                        # EndStation Id
+                        int(entry[4] or "-1"),
+                        # EndStation Name
+                        entry[5].strip(),
+                        # Start Date
+                        int(datetime.strptime(entry[6][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[6] else -1,
+                        # StartStation Id
+                        int(entry[7]),
+                        # StartStation Name
+                        entry[8].strip()
                     ))
                 except ValueError as e:
                     LOG.error(f"Value Error {e} on line {entry}")
@@ -124,22 +123,34 @@ class BikeDatabase:
             self.cursor.execute("INSERT INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag))
             self.conn.commit()
         LOG.info(f"Finished import of {export_file.path}")
-        os.chdir(WORKING_DIR)
-        shutil.rmtree(TMP_DIR)
-        LOG.info("Deleted temp dir")
+        os.remove(export_file.path)
+        LOG.info(f"Delete file {export_file.path}")
 
 
 def main():
     all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv")
+
     db = BikeDatabase()
     db.init_database()
 
-    for file in all_files:
-        if not db.is_file_already_imported(file.etag):
-            download_file(file.download_url, file.path)
-            db.import_usage_stats_file(file)
-        else:
-            LOG.warning(f"Skipping import of {file.path}")
+    os.makedirs(TMP_DIR, exist_ok=True)
+    os.chdir(TMP_DIR)
+    LOG.info("Switching into tmp dir")
+
+    import_tasks = []
+
+    with ThreadPoolExecutor(3) as executor:
+        for file in all_files:
+            db = BikeDatabase()
+            import_tasks.append(
+                executor.submit(db.import_usage_stats_file, file)
+            )
+        executor.shutdown(wait=True)
+
+    os.chdir(WORKING_DIR)
+    LOG.info("Switching back to workdir")
+    shutil.rmtree(TMP_DIR)
+    LOG.info("Deleted temp dir")
 
 
 if __name__ == "__main__":
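
Note (not part of the patch): a minimal standalone sketch of the concurrency pattern this commit adopts, assuming the same approach of a ThreadPoolExecutor with three workers where each submitted task opens its own SQLite connection using check_same_thread=False and a 60-second timeout. The names DB_PATH, import_one, and the demo table are hypothetical and exist only for illustration.

    # Sketch: one connection per task; SQLite serializes concurrent writers,
    # and the timeout makes a blocked writer wait instead of failing at once.
    import sqlite3
    from concurrent.futures import ThreadPoolExecutor

    DB_PATH = "bike-data.db"  # hypothetical path for the example

    def import_one(item: int) -> None:
        # Each worker thread gets its own connection object.
        conn = sqlite3.connect(DB_PATH, check_same_thread=False, timeout=60)
        try:
            conn.execute("CREATE TABLE IF NOT EXISTS demo (id INTEGER)")
            conn.execute("INSERT INTO demo VALUES (?)", (item,))
            conn.commit()
        finally:
            conn.close()

    def main() -> None:
        with ThreadPoolExecutor(3) as executor:
            for i in range(10):
                executor.submit(import_one, i)
            # Leaving the with-block waits for all submitted tasks to finish.

    if __name__ == "__main__":
        main()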