Compare commits
1 commit: master...postgres-d

Commit 36d5affbee
@@ -2,6 +2,8 @@ import csv
 import json
 import logging
 import sqlite3
+import psycopg2
+import psycopg2.extras
 from dataclasses import dataclass
 from datetime import datetime
 
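[Review note] The two new imports are both needed: `psycopg2.extras` is a submodule, so importing `psycopg2` alone does not make `psycopg2.extras.execute_values` available. A minimal sketch of the driver roundtrip, assuming the same local dev credentials that `get_conn()` hard-codes further down:

```python
import psycopg2
import psycopg2.extras  # submodule: must be imported explicitly

# Assumption: local Postgres with the credentials from get_conn() below.
conn = psycopg2.connect(host="localhost", database="postgres",
                        user="postgres", password="supersecure")
cursor = conn.cursor()
cursor.execute("SELECT version()")
print(cursor.fetchone()[0])  # e.g. "PostgreSQL 14.x ..."
conn.close()
```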
@@ -59,32 +61,33 @@ def get_online_files_list(subdir_filter=None, file_extension_filter=None):
 
 def init_database():
     LOG.info("Try to create tables")
-    conn = sqlite3.connect(DB_NAME, timeout=300)
-    conn.execute("""CREATE TABLE IF NOT EXISTS usage_stats(
-        rental_id INTEGER PRIMARY KEY,
-        duration INTEGER,
-        bike_id INTEGER,
-        end_date INTEGER,
-        end_station_id INTEGER,
+    conn = get_conn()
+    cursor = conn.cursor()
+    cursor.execute("""CREATE TABLE IF NOT EXISTS usage_stats(
+        rental_id BIGINT PRIMARY KEY,
+        duration BIGINT,
+        bike_id BIGINT,
+        end_date TIMESTAMP,
+        end_station_id BIGINT,
         end_station_name TEXT,
-        start_date INTEGER,
-        start_station_id INTEGER,
+        start_date TIMESTAMP,
+        start_station_id BIGINT,
         start_station_name TEXT
     )""")
-    conn.execute("CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)")
-    conn.execute("""CREATE TABLE IF NOT EXISTS bike_points(
+    cursor.execute("CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)")
+    cursor.execute("""CREATE TABLE IF NOT EXISTS bike_points(
         id TEXT PRIMARY KEY,
         common_name TEXT,
         lat REAL,
         lon REAL,
         id_num INTEGER
     )""")
-    conn.execute("""CREATE TABLE IF NOT EXISTS accidents(
+    cursor.execute("""CREATE TABLE IF NOT EXISTS accidents(
         id INTEGER PRIMARY KEY,
         lat REAL,
         lon REAL,
         location TEXT,
-        date TEXT,
+        date TIMESTAMP,
         severity TEXT
     )""")
     conn.commit()
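[Review note] The DDL swap tracks the engine change: SQLite's loosely typed `INTEGER` columns become `BIGINT`, and the epoch-seconds date columns become real `TIMESTAMP` columns, matching the new `db/postgres.sql`. Unlike `sqlite3`, psycopg2 runs everything inside an implicit transaction, so the trailing `conn.commit()` is what actually makes the tables durable. A sketch of an equivalent pattern that commits automatically, assuming `get_conn()` from later in this diff (psycopg2's connection context manager commits but does not close):

```python
conn = get_conn()
with conn:                      # commits on success, rolls back on error
    with conn.cursor() as cursor:
        cursor.execute("""CREATE TABLE IF NOT EXISTS usage_stats(
            rental_id BIGINT PRIMARY KEY,
            duration BIGINT
            -- remaining columns as in the hunk above
        )""")
conn.close()                    # the with-block does NOT close the connection
```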
@@ -131,12 +134,14 @@ def create_dashboard_table():
 
 def import_bikepoints():
     LOG.info("Importing bikepoints")
-    conn = sqlite3.connect(DB_NAME, timeout=300)
+    conn = get_conn()
+    cursor = conn.cursor()
 
     points = json.loads(requests.get("https://api.tfl.gov.uk/BikePoint").text)
     points = list(map(lambda p: (p['id'], p['commonName'], p['lat'], p['lon'], int(p['id'][11:])), points))
 
     LOG.info(f"Writing {len(points)} bikepoints to DB")
-    conn.executemany("INSERT OR IGNORE INTO bike_points VALUES (?, ?, ?, ?, ?)", points)
+    cursor.executemany("INSERT INTO bike_points VALUES (%s, %s, %s, %s, %s) ON CONFLICT DO NOTHING", points)
     conn.commit()
     conn.close()
     LOG.info("Bikepoints imported")
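[Review note] The placeholder change is mandatory, not stylistic: `sqlite3` uses `?` while psycopg2 uses `%s` for every parameter type, and SQLite's `INSERT OR IGNORE` maps to Postgres's `ON CONFLICT DO NOTHING`. A sketch of the pattern with two illustrative rows shaped like the `bike_points` tuples built above:

```python
rows = [
    # (id, common_name, lat, lon, id_num) -- illustrative values
    ("BikePoints_1", "River Street, Clerkenwell", 51.5292, -0.1100, 1),
    ("BikePoints_2", "Phillimore Gardens, Kensington", 51.4996, -0.1976, 2),
]
cursor.executemany(
    "INSERT INTO bike_points VALUES (%s, %s, %s, %s, %s) ON CONFLICT DO NOTHING",
    rows,  # psycopg2 adapts each Python value; %s is used for all types
)
conn.commit()
```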
@@ -144,7 +149,8 @@ def import_bikepoints():
 
 def import_accidents(year):
     LOG.info("Importing accidents")
-    conn = sqlite3.connect(DB_NAME, timeout=300)
+    conn = get_conn()
+    cursor = conn.cursor()
 
     def filter_pedal_cycles(accident):
         for vehicle in accident['vehicles']:
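[Review note] The hunk cuts off inside `filter_pedal_cycles`. For orientation, a hedged reconstruction of what such a predicate typically looks like — the `vehicle['type']` key and the `'PedalCycle'` value are assumptions about the TfL AccidentStats payload, not taken from this diff:

```python
def filter_pedal_cycles(accident):
    # Assumption: each accident lists involved vehicles with a 'type' field.
    for vehicle in accident['vehicles']:
        if vehicle['type'] == 'PedalCycle':
            return True
    return False
```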
@@ -158,17 +164,19 @@ def import_accidents(year):
     accidents = list(map(lambda a: (a['id'], a['lat'], a['lon'], a['location'], a['date'], a['severity']), accidents))
 
     LOG.info(f"Writing {len(accidents)} bike accidents to DB")
-    conn.executemany("INSERT OR IGNORE INTO accidents VALUES (?, ?, ?, ?, ?, ?)", accidents)
+    cursor.executemany("INSERT INTO accidents VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING", accidents)
 
     conn.commit()
     conn.close()
     LOG.info("Accidents imported")
 
 
 def import_usage_stats_file(export_file: ApiExportFile):
-    conn = sqlite3.connect(DB_NAME, timeout=300)
+    conn = get_conn()
+    cursor = conn.cursor()
 
-    rows = conn.execute("SELECT * FROM read_files WHERE etag LIKE ?", (export_file.etag,)).fetchall()
-    if len(rows) != 0:
+    cursor.execute("SELECT * FROM read_files WHERE etag = %s", (export_file.etag,))
+    if len(cursor.fetchall()) != 0:
         LOG.warning(f"Skipping import of {export_file.path}")
         return
 
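[Review note] Fetching every matching row just to test existence works, but `etag` is the primary key, so at most one row can match; a `SELECT 1 ... LIMIT 1` with `fetchone()` expresses the same check more cheaply. A sketch of that variant:

```python
cursor.execute("SELECT 1 FROM read_files WHERE etag = %s LIMIT 1",
               (export_file.etag,))
if cursor.fetchone() is not None:  # file already imported
    LOG.warning(f"Skipping import of {export_file.path}")
    return
```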
@@ -188,13 +196,13 @@ def import_usage_stats_file(export_file: ApiExportFile):
             # Bike Id
             int(entry[2] or "-1"),
             # End Date
-            int(datetime.strptime(entry[3][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[3] else -1,
+            datetime.strptime(entry[3][:16], "%d/%m/%Y %H:%M") if entry[3] else None,
             # EndStation Id
             int(entry[4] or "-1"),
             # EndStation Name
             entry[5].strip(),
             # Start Date
-            int(datetime.strptime(entry[6][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[6] else -1,
+            datetime.strptime(entry[6][:16], "%d/%m/%Y %H:%M") if entry[6] else None,
             # StartStation Id
             int(entry[7]),
             # StartStation Name
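[Review note] The mapping no longer flattens dates to epoch integers: psycopg2 adapts Python `datetime` objects to `TIMESTAMP` and `None` to SQL `NULL`, which is why the `-1` sentinels disappear. A worked example of the parse (illustrative input string):

```python
from datetime import datetime

raw = "04/01/2015 00:26:00"            # CSV date field, illustrative
# [:16] drops the seconds: "04/01/2015 00:26"
end_date = datetime.strptime(raw[:16], "%d/%m/%Y %H:%M") if raw else None
print(end_date)                        # 2015-01-04 00:26:00 -> TIMESTAMP
```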
@@ -207,35 +215,45 @@ def import_usage_stats_file(export_file: ApiExportFile):
         LOG.error(f"Key Error {e} on line {entry}")
         return
     LOG.info(f"Writing {len(mapped)} entries to DB")
-    conn.executemany("INSERT OR IGNORE INTO usage_stats VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", mapped)
-    conn.execute("INSERT OR IGNORE INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag))
+    psycopg2.extras.execute_values(cursor, "INSERT INTO usage_stats VALUES %s ON CONFLICT DO NOTHING ", mapped, page_size=1_000_000)
+    cursor.execute("INSERT INTO read_files VALUES (%s, %s) ON CONFLICT DO NOTHING", (export_file.path, export_file.etag))
     conn.commit()
-    conn.close()
     LOG.info(f"Finished import of {export_file.path}")
 
 
+def get_conn():
+    return psycopg2.connect(
+        host="localhost",
+        database="postgres",
+        user="postgres",
+        password="supersecure"
+    )
+
+
 def main():
     # General DB init
     init_database()
+    import_accidents(2019)
+    import_bikepoints()
 
-    count_pre = sqlite3.connect(DB_NAME, timeout=300).execute("SELECT count(*) FROM usage_stats").fetchone()[0]
-
+    # count_pre = sqlite3.connect(DB_NAME, timeout=300).execute("SELECT count(*) FROM usage_stats").fetchone()[0]
+    #
     # Download and import opendata from S3 bucket
     all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv")
     for file in all_files:
         import_usage_stats_file(file)
-
-    count_after = sqlite3.connect(DB_NAME, timeout=300).execute("SELECT count(*) FROM usage_stats").fetchone()[0]
-
-    # Create search-index for faster querying
-    create_indexes()
-    # Import Bikepoints
-    import_bikepoints()
-    # Import bike accidents
-    import_accidents(2019)
-
-    if count_after - count_pre > 0:
-        create_dashboard_table()
+    #
+    # count_after = sqlite3.connect(DB_NAME, timeout=300).execute("SELECT count(*) FROM usage_stats").fetchone()[0]
+    #
+    # # Create search-index for faster querying
+    # create_indexes()
+    # # Import Bikepoints
+    # import_bikepoints()
+    # # Import bike accidents
+    # import_accidents(2019)
+    #
+    # if count_after - count_pre > 0:
+    #     create_dashboard_table()
 
 
 if __name__ == "__main__":
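[Review note] Three things stand out in this hunk. `psycopg2.extras.execute_values` batches the rows into multi-row `VALUES` lists, and `page_size=1_000_000` effectively sends each CSV file as a single statement. `conn.close()` is dropped from `import_usage_stats_file`, so every imported file now leaks a connection. And `get_conn()` hard-codes credentials. A hypothetical hardening sketch (the `PG*` names are standard libpq environment variables, not part of this diff):

```python
import os
from contextlib import closing

import psycopg2

def get_conn():
    # Hypothetical variant: read credentials from the environment
    # instead of hard-coding "supersecure".
    return psycopg2.connect(
        host=os.environ.get("PGHOST", "localhost"),
        database=os.environ.get("PGDATABASE", "postgres"),
        user=os.environ.get("PGUSER", "postgres"),
        password=os.environ["PGPASSWORD"],
    )

# closing() releases the connection even if the work inside raises.
with closing(get_conn()) as conn, conn.cursor() as cursor:
    cursor.execute("SELECT count(*) FROM usage_stats")
    print(cursor.fetchone()[0])
```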
projects/project-3/db/postgres.sql (new file, 75 lines)
@@ -0,0 +1,75 @@
+-- Tables
+
+CREATE TABLE IF NOT EXISTS usage_stats(
+    rental_id BIGINT PRIMARY KEY,
+    duration BIGINT,
+    bike_id BIGINT,
+    end_date TIMESTAMP,
+    end_station_id BIGINT,
+    end_station_name TEXT,
+    start_date TIMESTAMP,
+    start_station_id BIGINT,
+    start_station_name TEXT
+);
+
+INSERT INTO usage_stats
+VALUES (40346508, 360, 12019, TO_TIMESTAMP(1420326360), 424, 'Ebury Bridge, Pimlico', TO_TIMESTAMP(1420326000), 368, 'Harriet Street, Knightsbridge')
+ON CONFLICT DO NOTHING;
+
+SELECT TO_TIMESTAMP(1420326360);
+
+
+CREATE TABLE IF NOT EXISTS read_files(
+    file_path TEXT,
+    etag TEXT PRIMARY KEY
+);
+
+
+CREATE TABLE IF NOT EXISTS bike_points(
+    id TEXT PRIMARY KEY,
+    common_name TEXT,
+    lat REAL,
+    lon REAL,
+    id_num BIGINT
+);
+
+
+CREATE TABLE IF NOT EXISTS accidents(
+    id BIGINT PRIMARY KEY,
+    lat REAL,
+    lon REAL,
+    location TEXT,
+    date TEXT,
+    severity TEXT
+);
+
+-- Indices
+
+CREATE INDEX IF NOT EXISTS idx_station_start_and_end_date ON usage_stats (start_station_id, start_date, end_date);
+
+
+SELECT COUNT(*) FROM usage_stats;
+
+SELECT
+    min(u.start_station_name) AS startStationName,
+    u.end_station_name AS endStationName,
+    count(*) AS number,
+    round(avg(u.duration)) AS avgDuration
+FROM usage_stats u
+WHERE u.start_station_id = 512 AND u.start_date::DATE >= '2010-01-01'::DATE AND u.start_date::DATE <= '2022-01-15'::DATE
+GROUP BY u.end_station_name
+ORDER BY number DESC
+LIMIT 3;
+
+
+
+SELECT
+    MIN(b.id_num) as id,
+    MIN(b.common_name) AS commonName,
+    MIN(b.lat),
+    MIN(b.lon),
+    max(u.start_date) AS maxEndDate,
+    min(u.start_date) AS maxStartDate
+FROM usage_stats u
+JOIN bike_points b ON u.start_station_id = b.id_num
+WHERE u.start_station_id = 306
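[Review note] `postgres.sql` mirrors the schema from `init_database()` with two discrepancies: `accidents.date` stays `TEXT` here although the Python DDL changed it to `TIMESTAMP`, and `accidents.id`/`bike_points.id_num` are `BIGINT` here but `INTEGER` in the Python DDL. The trailing statements are scratch queries, and the last one is cut off by the diff viewer. A sketch of running the top-3-destinations query from Python, using the station id and date window from the file:

```python
query = """
    SELECT min(u.start_station_name) AS start_name,
           u.end_station_name        AS end_name,
           count(*)                  AS number,
           round(avg(u.duration))    AS avg_duration
    FROM usage_stats u
    WHERE u.start_station_id = %s
      AND u.start_date::DATE BETWEEN %s::DATE AND %s::DATE
    GROUP BY u.end_station_name
    ORDER BY number DESC
    LIMIT 3
"""
conn = get_conn()
cursor = conn.cursor()
cursor.execute(query, (512, "2010-01-01", "2022-01-15"))
for start, end, n, avg_duration in cursor.fetchall():
    print(f"{start} -> {end}: {n} rides, avg {avg_duration}s")
conn.close()
```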