From 8b0a107c89b21a9ddae78a64a41f9e212f5b9d51 Mon Sep 17 00:00:00 2001 From: Marcel Schwarz Date: Sat, 19 Dec 2020 00:29:32 +0100 Subject: [PATCH] Refine init DB script --- projects/project-3/openapi/db_init.py | 51 +++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/projects/project-3/openapi/db_init.py b/projects/project-3/openapi/db_init.py index 11b2530..9cfc86a 100644 --- a/projects/project-3/openapi/db_init.py +++ b/projects/project-3/openapi/db_init.py @@ -1,14 +1,18 @@ import csv +import json import logging import os import os.path import shutil import sqlite3 +import time import urllib.request from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import datetime +import requests + WORKING_DIR = os.getcwd() TMP_DIR = os.path.join(WORKING_DIR, "tmp") @@ -54,10 +58,11 @@ def get_online_files_list(subdir_filter=None, file_extension_filter=None): class BikeDatabase: def __init__(self): - self.conn = sqlite3.connect(os.path.join(WORKING_DIR, "bike-data.db"), check_same_thread=False, timeout=60) - self.cursor = self.conn.cursor() + LOG.info("Created new database connection") + self.conn = sqlite3.connect(os.path.join(WORKING_DIR, "bike-data.db"), check_same_thread=False, timeout=300) def init_database(self): + LOG.info("Try to create tables") self.conn.execute("""CREATE TABLE IF NOT EXISTS usage_stats( rental_id INTEGER PRIMARY KEY, duration INTEGER, @@ -70,6 +75,30 @@ class BikeDatabase: start_station_name TEXT )""") self.conn.execute("CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)") + self.conn.execute("""CREATE TABLE IF NOT EXISTS bike_points( + id TEXT PRIMARY KEY, + common_name TEXT, + lat REAL, + lon REAL, + id_num INTEGER AS (CAST(SUBSTR(id, 12) as INTEGER)) STORED + )""") + self.conn.commit() + LOG.info("Tables created") + + def create_indexes(self): + LOG.info("Try to create indexes") + self.conn.execute("""CREATE INDEX IF NOT EXISTS idx_date_of_start_date + ON usage_stats (date(start_date, "unixepoch"))""") + self.conn.commit() + LOG.info("Indexes created") + + def import_bikepoints(self): + LOG.info("Importing bikepoints") + points = json.loads(requests.get("https://api.tfl.gov.uk/BikePoint").text) + points = list(map(lambda p: (p['id'], p['commonName'], p['lat'], p['lon']), points)) + self.conn.executemany("INSERT OR IGNORE INTO bike_points VALUES (?, ?, ?, ?)", points) + self.conn.commit() + LOG.info("Bikepoints imported") def is_file_already_imported(self, etag): rows = self.conn.execute("SELECT * FROM read_files WHERE etag LIKE ?", (etag,)).fetchall() @@ -81,6 +110,8 @@ class BikeDatabase: LOG.warning(f"Skipping import of {export_file.path}") return + cursor = self.conn.cursor() + os.makedirs(os.path.dirname(export_file.path), exist_ok=True) LOG.info(f"DOWNLOADING... {export_file.download_url} to {export_file.path}") urllib.request.urlretrieve(export_file.download_url, export_file.path) @@ -119,8 +150,8 @@ class BikeDatabase: LOG.error(f"Key Error {e} on line {entry}") return LOG.info(f"Writing {len(mapped)} entries to DB") - self.cursor.executemany("INSERT INTO usage_stats VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", mapped) - self.cursor.execute("INSERT INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag)) + cursor.executemany("INSERT INTO usage_stats VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", mapped) + cursor.execute("INSERT INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag)) self.conn.commit() LOG.info(f"Finished import of {export_file.path}") os.remove(export_file.path) @@ -130,16 +161,17 @@ class BikeDatabase: def main(): all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv") - db = BikeDatabase() - db.init_database() + # General DB init + BikeDatabase().init_database() + # Download and import opendata from S3 bucket os.makedirs(TMP_DIR, exist_ok=True) os.chdir(TMP_DIR) LOG.info("Switching into tmp dir") import_tasks = [] - with ThreadPoolExecutor(3) as executor: + with ThreadPoolExecutor(1) as executor: for file in all_files: db = BikeDatabase() import_tasks.append( @@ -152,6 +184,11 @@ def main(): shutil.rmtree(TMP_DIR) LOG.info("Deleted temp dir") + # Import Bikepoints + BikeDatabase().import_bikepoints() + # Create search-index for faster querying + BikeDatabase().create_indexes() + if __name__ == "__main__": main()