From 36d5affbeec164160a0d4327cacdffb58b797c54 Mon Sep 17 00:00:00 2001
From: Marcel Schwarz
Date: Mon, 21 Dec 2020 03:23:08 +0100
Subject: [PATCH] Test

---
 projects/project-3/backend/db_init.py | 96 ++++++++++++++++-----------
 projects/project-3/db/postgres.sql    | 75 +++++++++++++++++++++
 2 files changed, 132 insertions(+), 39 deletions(-)
 create mode 100644 projects/project-3/db/postgres.sql

diff --git a/projects/project-3/backend/db_init.py b/projects/project-3/backend/db_init.py
index 51eae02..35f2875 100644
--- a/projects/project-3/backend/db_init.py
+++ b/projects/project-3/backend/db_init.py
@@ -2,6 +2,8 @@ import csv
 import json
 import logging
 import sqlite3
+import psycopg2
+import psycopg2.extras
 from dataclasses import dataclass
 from datetime import datetime
@@ -59,32 +61,33 @@ def get_online_files_list(subdir_filter=None, file_extension_filter=None):
 
 def init_database():
     LOG.info("Try to create tables")
-    conn = sqlite3.connect(DB_NAME, timeout=300)
-    conn.execute("""CREATE TABLE IF NOT EXISTS usage_stats(
-        rental_id INTEGER PRIMARY KEY,
-        duration INTEGER,
-        bike_id INTEGER,
-        end_date INTEGER,
-        end_station_id INTEGER,
+    conn = get_conn()
+    cursor = conn.cursor()
+    cursor.execute("""CREATE TABLE IF NOT EXISTS usage_stats(
+        rental_id BIGINT PRIMARY KEY,
+        duration BIGINT,
+        bike_id BIGINT,
+        end_date TIMESTAMP,
+        end_station_id BIGINT,
         end_station_name TEXT,
-        start_date INTEGER,
-        start_station_id INTEGER,
+        start_date TIMESTAMP,
+        start_station_id BIGINT,
         start_station_name TEXT
     )""")
-    conn.execute("CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)")
-    conn.execute("""CREATE TABLE IF NOT EXISTS bike_points(
+    cursor.execute("CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)")
+    cursor.execute("""CREATE TABLE IF NOT EXISTS bike_points(
         id TEXT PRIMARY KEY,
         common_name TEXT,
         lat REAL,
         lon REAL,
         id_num INTEGER
     )""")
-    conn.execute("""CREATE TABLE IF NOT EXISTS accidents(
+    cursor.execute("""CREATE TABLE IF NOT EXISTS accidents(
         id INTEGER PRIMARY KEY,
         lat REAL,
         lon REAL,
         location TEXT,
-        date TEXT,
+        date TIMESTAMP,
         severity TEXT
     )""")
     conn.commit()
@@ -131,12 +134,14 @@ def create_dashboard_table():
 
 def import_bikepoints():
     LOG.info("Importing bikepoints")
-    conn = sqlite3.connect(DB_NAME, timeout=300)
+    conn = get_conn()
+    cursor = conn.cursor()
+
     points = json.loads(requests.get("https://api.tfl.gov.uk/BikePoint").text)
     points = list(map(lambda p: (p['id'], p['commonName'], p['lat'], p['lon'], int(p['id'][11:])), points))
 
     LOG.info(f"Writing {len(points)} bikepoints to DB")
-    conn.executemany("INSERT OR IGNORE INTO bike_points VALUES (?, ?, ?, ?, ?)", points)
+    cursor.executemany("INSERT INTO bike_points VALUES (%s, %s, %s, %s, %s) ON CONFLICT DO NOTHING", points)
     conn.commit()
     conn.close()
     LOG.info("Bikepoints imported")
@@ -144,7 +149,8 @@ def import_bikepoints():
 
 def import_accidents(year):
     LOG.info("Importing accidents")
-    conn = sqlite3.connect(DB_NAME, timeout=300)
+    conn = get_conn()
+    cursor = conn.cursor()
 
     def filter_pedal_cycles(accident):
         for vehicle in accident['vehicles']:
@@ -158,17 +164,19 @@ def import_accidents(year):
     accidents = list(map(lambda a: (a['id'], a['lat'], a['lon'], a['location'], a['date'], a['severity']), accidents))
 
     LOG.info(f"Writing {len(accidents)} bike accidents to DB")
-    conn.executemany("INSERT OR IGNORE INTO accidents VALUES (?, ?, ?, ?, ?, ?)", accidents)
+    cursor.executemany("INSERT INTO accidents VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING", accidents)
+    conn.commit()
     conn.close()
     LOG.info("Accidents imported")
 
 
 def import_usage_stats_file(export_file: ApiExportFile):
-    conn = sqlite3.connect(DB_NAME, timeout=300)
+    conn = get_conn()
+    cursor = conn.cursor()
 
-    rows = conn.execute("SELECT * FROM read_files WHERE etag LIKE ?", (export_file.etag,)).fetchall()
-    if len(rows) != 0:
+    cursor.execute("SELECT * FROM read_files WHERE etag = %s", (export_file.etag,))
+    if len(cursor.fetchall()) != 0:
         LOG.warning(f"Skipping import of {export_file.path}")
         return
@@ -188,13 +196,13 @@ def import_usage_stats_file(export_file: ApiExportFile):
                 # Bike Id
                 int(entry[2] or "-1"),
                 # End Date
-                int(datetime.strptime(entry[3][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[3] else -1,
+                datetime.strptime(entry[3][:16], "%d/%m/%Y %H:%M") if entry[3] else None,
                 # EndStation Id
                 int(entry[4] or "-1"),
                 # EndStation Name
                 entry[5].strip(),
                 # Start Date
-                int(datetime.strptime(entry[6][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[6] else -1,
+                datetime.strptime(entry[6][:16], "%d/%m/%Y %H:%M") if entry[6] else None,
                 # StartStation Id
                 int(entry[7]),
                 # StartStation Name
@@ -207,35 +215,45 @@ def import_usage_stats_file(export_file: ApiExportFile):
             LOG.error(f"Key Error {e} on line {entry}")
             return
     LOG.info(f"Writing {len(mapped)} entries to DB")
-    conn.executemany("INSERT OR IGNORE INTO usage_stats VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", mapped)
-    conn.execute("INSERT OR IGNORE INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag))
+    psycopg2.extras.execute_values(cursor, "INSERT INTO usage_stats VALUES %s ON CONFLICT DO NOTHING", mapped, page_size=1_000_000)
+    cursor.execute("INSERT INTO read_files VALUES (%s, %s) ON CONFLICT DO NOTHING", (export_file.path, export_file.etag))
     conn.commit()
-    conn.close()
     LOG.info(f"Finished import of {export_file.path}")
 
 
+def get_conn():
+    return psycopg2.connect(
+        host="localhost",
+        database="postgres",
+        user="postgres",
+        password="supersecure"
+    )
+
+
 def main():
     # General DB init
     init_database()
+    import_accidents(2019)
+    import_bikepoints()
 
-    count_pre = sqlite3.connect(DB_NAME, timeout=300).execute("SELECT count(*) FROM usage_stats").fetchone()[0]
-
+    # count_pre = sqlite3.connect(DB_NAME, timeout=300).execute("SELECT count(*) FROM usage_stats").fetchone()[0]
+    #
     # Download and import opendata from S3 bucket
     all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv")
     for file in all_files:
         import_usage_stats_file(file)
-
-    count_after = sqlite3.connect(DB_NAME, timeout=300).execute("SELECT count(*) FROM usage_stats").fetchone()[0]
-
-    # Create search-index for faster querying
-    create_indexes()
-    # Import Bikepoints
-    import_bikepoints()
-    # Import bike accidents
-    import_accidents(2019)
-
-    if count_after - count_pre > 0:
-        create_dashboard_table()
+    #
+    # count_after = sqlite3.connect(DB_NAME, timeout=300).execute("SELECT count(*) FROM usage_stats").fetchone()[0]
+    #
+    # # Create search-index for faster querying
+    # create_indexes()
+    # # Import Bikepoints
+    # import_bikepoints()
+    # # Import bike accidents
+    # import_accidents(2019)
+    #
+    # if count_after - count_pre > 0:
+    #     create_dashboard_table()
 
 
 if __name__ == "__main__":
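A note on the new get_conn() helper above: the connection parameters, including the password, are hardcoded in the backend. Below is a minimal environment-driven variant, sketched under the assumption that only the standard psycopg2.connect() keyword arguments are needed; the PG_* variable names are hypothetical, and the defaults mirror the values hardcoded in this patch.

    import os

    import psycopg2


    def get_conn():
        # Hypothetical PG_* environment variables; defaults mirror the values
        # hardcoded in this patch. The password deliberately has no fallback,
        # so the secret never lands in version control.
        return psycopg2.connect(
            host=os.environ.get("PG_HOST", "localhost"),
            database=os.environ.get("PG_DATABASE", "postgres"),
            user=os.environ.get("PG_USER", "postgres"),
            password=os.environ["PG_PASSWORD"],
        )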
diff --git a/projects/project-3/db/postgres.sql b/projects/project-3/db/postgres.sql
new file mode 100644
index 0000000..54ee95a
--- /dev/null
+++ b/projects/project-3/db/postgres.sql
@@ -0,0 +1,75 @@
+-- Tables
+
+CREATE TABLE IF NOT EXISTS usage_stats(
+    rental_id BIGINT PRIMARY KEY,
+    duration BIGINT,
+    bike_id BIGINT,
+    end_date TIMESTAMP,
+    end_station_id BIGINT,
+    end_station_name TEXT,
+    start_date TIMESTAMP,
+    start_station_id BIGINT,
+    start_station_name TEXT
+);
+
+INSERT INTO usage_stats
+VALUES (40346508, 360, 12019, TO_TIMESTAMP(1420326360), 424, 'Ebury Bridge, Pimlico', TO_TIMESTAMP(1420326000), 368, 'Harriet Street, Knightsbridge')
+ON CONFLICT DO NOTHING;
+
+SELECT TO_TIMESTAMP(1420326360);
+
+
+CREATE TABLE IF NOT EXISTS read_files(
+    file_path TEXT,
+    etag TEXT PRIMARY KEY
+);
+
+
+CREATE TABLE IF NOT EXISTS bike_points(
+    id TEXT PRIMARY KEY,
+    common_name TEXT,
+    lat REAL,
+    lon REAL,
+    id_num BIGINT
+);
+
+
+CREATE TABLE IF NOT EXISTS accidents(
+    id BIGINT PRIMARY KEY,
+    lat REAL,
+    lon REAL,
+    location TEXT,
+    date TIMESTAMP,
+    severity TEXT
+);
+
+-- Indices
+
+CREATE INDEX IF NOT EXISTS idx_station_start_and_end_date ON usage_stats (start_station_id, start_date, end_date);
+
+
+SELECT COUNT(*) FROM usage_stats;
+
+SELECT
+    MIN(u.start_station_name) AS startStationName,
+    u.end_station_name        AS endStationName,
+    COUNT(*)                  AS number,
+    ROUND(AVG(u.duration))    AS avgDuration
+FROM usage_stats u
+WHERE u.start_station_id = 512 AND u.start_date::DATE >= '2010-01-01'::DATE AND u.start_date::DATE <= '2022-01-15'::DATE
+GROUP BY u.end_station_name
+ORDER BY number DESC
+LIMIT 3;
+
+
+
+SELECT
+    MIN(b.id_num)      AS id,
+    MIN(b.common_name) AS commonName,
+    MIN(b.lat),
+    MIN(b.lon),
+    MAX(u.start_date)  AS maxStartDate,
+    MIN(u.start_date)  AS minStartDate
+FROM usage_stats u
+JOIN bike_points b ON u.start_station_id = b.id_num
+WHERE u.start_station_id = 306;
\ No newline at end of file
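For reference, a sketch of how the top-destinations query from postgres.sql could be issued from the backend with proper parameter binding instead of inlined literals. It assumes the get_conn() helper introduced in this patch; the top_destinations function name, the station id, and the date bounds are illustrative, not part of the patch.

    from db_init import get_conn  # helper introduced in this patch


    def top_destinations(station_id, start, end, limit=3):
        # Parameterized version of the ad-hoc query in postgres.sql:
        # for one start station, the most common destinations in a date range.
        query = """
            SELECT
                MIN(u.start_station_name) AS startStationName,
                u.end_station_name        AS endStationName,
                COUNT(*)                  AS number,
                ROUND(AVG(u.duration))    AS avgDuration
            FROM usage_stats u
            WHERE u.start_station_id = %s
              AND u.start_date::DATE BETWEEN %s AND %s
            GROUP BY u.end_station_name
            ORDER BY number DESC
            LIMIT %s
        """
        conn = get_conn()
        try:
            with conn.cursor() as cursor:
                cursor.execute(query, (station_id, start, end, limit))
                return cursor.fetchall()
        finally:
            conn.close()


    print(top_destinations(512, "2010-01-01", "2022-01-15"))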