This commit is contained in:
Marcel Schwarz 2020-12-21 03:23:08 +01:00
parent 11fa967d22
commit 36d5affbee
2 changed files with 132 additions and 39 deletions

View File

@ -2,6 +2,8 @@ import csv
import json import json
import logging import logging
import sqlite3 import sqlite3
import psycopg2
import psycopg2.extras
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
@ -59,32 +61,33 @@ def get_online_files_list(subdir_filter=None, file_extension_filter=None):
def init_database(): def init_database():
LOG.info("Try to create tables") LOG.info("Try to create tables")
conn = sqlite3.connect(DB_NAME, timeout=300) conn = get_conn()
conn.execute("""CREATE TABLE IF NOT EXISTS usage_stats( cursor = conn.cursor()
rental_id INTEGER PRIMARY KEY, cursor.execute("""CREATE TABLE IF NOT EXISTS usage_stats(
duration INTEGER, rental_id BIGINT PRIMARY KEY,
bike_id INTEGER, duration BIGINT,
end_date INTEGER, bike_id BIGINT,
end_station_id INTEGER, end_date TIMESTAMP,
end_station_id BIGINT,
end_station_name TEXT, end_station_name TEXT,
start_date INTEGER, start_date TIMESTAMP,
start_station_id INTEGER, start_station_id BIGINT,
start_station_name TEXT start_station_name TEXT
)""") )""")
conn.execute("CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)") cursor.execute("CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)")
conn.execute("""CREATE TABLE IF NOT EXISTS bike_points( cursor.execute("""CREATE TABLE IF NOT EXISTS bike_points(
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
common_name TEXT, common_name TEXT,
lat REAL, lat REAL,
lon REAL, lon REAL,
id_num INTEGER id_num INTEGER
)""") )""")
conn.execute("""CREATE TABLE IF NOT EXISTS accidents( cursor.execute("""CREATE TABLE IF NOT EXISTS accidents(
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
lat REAL, lat REAL,
lon REAL, lon REAL,
location TEXT, location TEXT,
date TEXT, date TIMESTAMP,
severity TEXT severity TEXT
)""") )""")
conn.commit() conn.commit()
@ -131,12 +134,14 @@ def create_dashboard_table():
def import_bikepoints(): def import_bikepoints():
LOG.info("Importing bikepoints") LOG.info("Importing bikepoints")
conn = sqlite3.connect(DB_NAME, timeout=300) conn = get_conn()
cursor = conn.cursor()
points = json.loads(requests.get("https://api.tfl.gov.uk/BikePoint").text) points = json.loads(requests.get("https://api.tfl.gov.uk/BikePoint").text)
points = list(map(lambda p: (p['id'], p['commonName'], p['lat'], p['lon'], int(p['id'][11:])), points)) points = list(map(lambda p: (p['id'], p['commonName'], p['lat'], p['lon'], int(p['id'][11:])), points))
LOG.info(f"Writing {len(points)} bikepoints to DB") LOG.info(f"Writing {len(points)} bikepoints to DB")
conn.executemany("INSERT OR IGNORE INTO bike_points VALUES (?, ?, ?, ?, ?)", points) cursor.executemany("INSERT INTO bike_points VALUES (%s, %s, %s, %s, %s) ON CONFLICT DO NOTHING", points)
conn.commit() conn.commit()
conn.close() conn.close()
LOG.info("Bikepoints imported") LOG.info("Bikepoints imported")
@ -144,7 +149,8 @@ def import_bikepoints():
def import_accidents(year): def import_accidents(year):
LOG.info("Importing accidents") LOG.info("Importing accidents")
conn = sqlite3.connect(DB_NAME, timeout=300) conn = get_conn()
cursor = conn.cursor()
def filter_pedal_cycles(accident): def filter_pedal_cycles(accident):
for vehicle in accident['vehicles']: for vehicle in accident['vehicles']:
@ -158,17 +164,19 @@ def import_accidents(year):
accidents = list(map(lambda a: (a['id'], a['lat'], a['lon'], a['location'], a['date'], a['severity']), accidents)) accidents = list(map(lambda a: (a['id'], a['lat'], a['lon'], a['location'], a['date'], a['severity']), accidents))
LOG.info(f"Writing {len(accidents)} bike accidents to DB") LOG.info(f"Writing {len(accidents)} bike accidents to DB")
conn.executemany("INSERT OR IGNORE INTO accidents VALUES (?, ?, ?, ?, ?, ?)", accidents) cursor.executemany("INSERT INTO accidents VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING", accidents)
conn.commit() conn.commit()
conn.close() conn.close()
LOG.info("Accidents importet") LOG.info("Accidents importet")
def import_usage_stats_file(export_file: ApiExportFile): def import_usage_stats_file(export_file: ApiExportFile):
conn = sqlite3.connect(DB_NAME, timeout=300) conn = get_conn()
cursor = conn.cursor()
rows = conn.execute("SELECT * FROM read_files WHERE etag LIKE ?", (export_file.etag,)).fetchall() cursor.execute("SELECT * FROM read_files WHERE etag = %s", (export_file.etag,))
if len(rows) != 0: if len(cursor.fetchall()) != 0:
LOG.warning(f"Skipping import of {export_file.path}") LOG.warning(f"Skipping import of {export_file.path}")
return return
@ -188,13 +196,13 @@ def import_usage_stats_file(export_file: ApiExportFile):
# Bike Id # Bike Id
int(entry[2] or "-1"), int(entry[2] or "-1"),
# End Date # End Date
int(datetime.strptime(entry[3][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[3] else -1, datetime.strptime(entry[3][:16], "%d/%m/%Y %H:%M") if entry[3] else None,
# EndStation Id # EndStation Id
int(entry[4] or "-1"), int(entry[4] or "-1"),
# EndStation Name # EndStation Name
entry[5].strip(), entry[5].strip(),
# Start Date # Start Date
int(datetime.strptime(entry[6][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[6] else -1, datetime.strptime(entry[6][:16], "%d/%m/%Y %H:%M") if entry[6] else None,
# StartStation Id # StartStation Id
int(entry[7]), int(entry[7]),
# StartStation Name # StartStation Name
@ -207,35 +215,45 @@ def import_usage_stats_file(export_file: ApiExportFile):
LOG.error(f"Key Error {e} on line {entry}") LOG.error(f"Key Error {e} on line {entry}")
return return
LOG.info(f"Writing {len(mapped)} entries to DB") LOG.info(f"Writing {len(mapped)} entries to DB")
conn.executemany("INSERT OR IGNORE INTO usage_stats VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", mapped) psycopg2.extras.execute_values(cursor, "INSERT INTO usage_stats VALUES %s ON CONFLICT DO NOTHING ", mapped, page_size=1_000_000)
conn.execute("INSERT OR IGNORE INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag)) cursor.execute("INSERT INTO read_files VALUES (%s, %s) ON CONFLICT DO NOTHING", (export_file.path, export_file.etag))
conn.commit() conn.commit()
conn.close()
LOG.info(f"Finished import of {export_file.path}") LOG.info(f"Finished import of {export_file.path}")
def get_conn():
return psycopg2.connect(
host="localhost",
database="postgres",
user="postgres",
password="supersecure"
)
def main(): def main():
# General DB init # General DB init
init_database() init_database()
import_accidents(2019)
import_bikepoints()
count_pre = sqlite3.connect(DB_NAME, timeout=300).execute("SELECT count(*) FROM usage_stats").fetchone()[0] # count_pre = sqlite3.connect(DB_NAME, timeout=300).execute("SELECT count(*) FROM usage_stats").fetchone()[0]
#
# Download and import opendata from S3 bucket # Download and import opendata from S3 bucket
all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv") all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv")
for file in all_files: for file in all_files:
import_usage_stats_file(file) import_usage_stats_file(file)
#
count_after = sqlite3.connect(DB_NAME, timeout=300).execute("SELECT count(*) FROM usage_stats").fetchone()[0] # count_after = sqlite3.connect(DB_NAME, timeout=300).execute("SELECT count(*) FROM usage_stats").fetchone()[0]
#
# Create search-index for faster querying # # Create search-index for faster querying
create_indexes() # create_indexes()
# Import Bikepoints # # Import Bikepoints
import_bikepoints() # import_bikepoints()
# Import bike accidents # # Import bike accidents
import_accidents(2019) # import_accidents(2019)
#
if count_after - count_pre > 0: # if count_after - count_pre > 0:
create_dashboard_table() # create_dashboard_table()
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -0,0 +1,75 @@
-- Tables
-- One row per bike rental, loaded from the TfL cycling open-data CSV exports.
-- NOTE: column order is load-bearing — the importer and the sample INSERT below
-- use positional VALUES lists. Do not reorder columns.
CREATE TABLE IF NOT EXISTS usage_stats(
rental_id BIGINT PRIMARY KEY,
duration BIGINT, -- rental duration (units not stated here; presumably seconds — confirm against the source CSVs)
bike_id BIGINT,
end_date TIMESTAMP,
end_station_id BIGINT,
end_station_name TEXT,
start_date TIMESTAMP,
start_station_id BIGINT,
start_station_name TEXT
);
-- Smoke test: insert one known rental (idempotent via ON CONFLICT) and
-- verify epoch-seconds -> TIMESTAMP conversion with TO_TIMESTAMP.
INSERT INTO usage_stats
    (rental_id, duration, bike_id, end_date, end_station_id, end_station_name,
     start_date, start_station_id, start_station_name)
VALUES (
    40346508,
    360,
    12019,
    TO_TIMESTAMP(1420326360),
    424,
    'Ebury Bridge, Pimlico',
    TO_TIMESTAMP(1420326000),
    368,
    'Harriet Street, Knightsbridge'
)
ON CONFLICT DO NOTHING;

SELECT TO_TIMESTAMP(1420326360);
-- Bookkeeping of already-imported export files, keyed by their ETag so a
-- file is skipped on re-runs (see the etag check in import_usage_stats_file).
CREATE TABLE IF NOT EXISTS read_files(
file_path TEXT,
etag TEXT PRIMARY KEY
);
-- Docking stations fetched from the TfL BikePoint API (import_bikepoints).
CREATE TABLE IF NOT EXISTS bike_points(
id TEXT PRIMARY KEY,
common_name TEXT,
lat REAL,
lon REAL,
id_num BIGINT -- numeric suffix parsed out of the TfL id string (id[11:] in the importer)
);
-- Road accidents (pedal-cycle involved, filtered by the importer).
-- FIX: `date` was declared TEXT here, but init_database() in the application
-- creates this same table with `date TIMESTAMP` — the two schemas disagreed.
-- Aligned to TIMESTAMP so queries can use date arithmetic/casts consistently.
CREATE TABLE IF NOT EXISTS accidents(
id BIGINT PRIMARY KEY,
lat REAL,
lon REAL,
location TEXT,
date TIMESTAMP,
severity TEXT
);
-- Indices
-- Composite index backing the per-station, date-range dashboard queries below
-- (filter on start_station_id + start_date, with end_date covered).
CREATE INDEX IF NOT EXISTS idx_station_start_and_end_date ON usage_stats (start_station_id, start_date, end_date);

-- Quick sanity check: total number of imported rentals.
SELECT COUNT(*) FROM usage_stats;
-- Top 3 destination stations for rentals that start at station 512 inside the
-- given date window, with trip count and rounded average duration per destination.
SELECT
    min(u.start_station_name)  AS startStationName,
    u.end_station_name         AS endStationName,
    count(*)                   AS number,
    round(avg(u.duration))     AS avgDuration
FROM usage_stats u
WHERE u.start_station_id = 512
  AND u.start_date::DATE BETWEEN '2010-01-01'::DATE AND '2022-01-15'::DATE
GROUP BY u.end_station_name
ORDER BY number DESC
LIMIT 3;
SELECT
MIN(b.id_num) as id,
MIN(b.common_name) AS commonName,
MIN(b.lat),
MIN(b.lon),
max(u.start_date) AS maxEndDate,
min(u.start_date) AS maxStartDate
FROM usage_stats u
JOIN bike_points b ON u.start_station_id = b.id_num
WHERE u.start_station_id = 306