# Source: geovisualisierung/projects/project-3/backend/db_init.py
# (repository listing metadata: 243 lines, 8.4 KiB, Python)

import csv
import json
import logging
import sqlite3
from dataclasses import dataclass
from datetime import datetime
import requests
# Path of the SQLite database file that every step in this module opens.
DB_NAME = "bike-data.db"

# Root-logger setup: records from DEBUG upwards go both to "db_init.log" and
# to the console, using one shared record layout.
logFormatter = logging.Formatter("%(asctime)-15s [%(levelname)8s] [%(threadName)s] - %(message)s")
LOG = logging.getLogger()
LOG.setLevel(logging.DEBUG)
fileHandler = logging.FileHandler("db_init.log")
consoleHandler = logging.StreamHandler()
for _handler in (fileHandler, consoleHandler):
    _handler.setFormatter(logFormatter)
    LOG.addHandler(_handler)
del _handler
@dataclass
class ApiExportFile:
    """One downloadable object taken from the bucket listing.

    Instances are built by ``get_online_files_list`` from the ``Key`` and
    ``ETag`` elements of a listing entry.
    """

    # Object key inside the bucket (text of the listing's ``Key`` element).
    path: str
    # Absolute URL: the bucket base URI followed by the URL-quoted key.
    download_url: str
    # Text of the listing's ``ETag`` element; stored in ``read_files`` so an
    # already imported file can be recognised on the next run.
    etag: str
def get_online_files_list(subdir_filter=None, file_extension_filter=None):
    """Return the files listed in the TfL cycling-data S3 bucket.

    Args:
        subdir_filter: If truthy, keep only entries whose key starts with
            this prefix.
        file_extension_filter: If truthy, keep only entries whose key ends
            with this suffix.

    Returns:
        A list of ``ApiExportFile`` objects in listing order. Keys ending in
        ``/`` (folder placeholders) are never included.
    """
    import urllib.parse
    import xml.etree.ElementTree

    base_uri = "https://s3-eu-west-1.amazonaws.com/cycling.data.tfl.gov.uk/"
    s3_ns = "{http://s3.amazonaws.com/doc/2006-03-01/}"

    # fromstring() parses one complete document. The previous
    # fromstringlist() call expects a sequence of fragments, so handing it a
    # str fed the parser one character at a time. Passing the raw bytes lets
    # the XML declaration decide the encoding instead of a guessed one.
    # NOTE(review): only a single listing response is read. S3 listings can be
    # truncated (IsTruncated / marker) -- verify whether the bucket fits in
    # one page.
    listing = xml.etree.ElementTree.fromstring(requests.get(base_uri).content)

    entries = []
    for node in listing.findall(f"{s3_ns}Contents"):
        key = node.find(f"{s3_ns}Key").text
        etag = node.find(f"{s3_ns}ETag").text
        if key.endswith('/'):
            continue
        if subdir_filter and not key.startswith(subdir_filter):
            continue
        if file_extension_filter and not key.endswith(file_extension_filter):
            continue
        # "/" stays unescaped so the key's folder structure survives quoting.
        download_url = base_uri + urllib.parse.quote_plus(key, safe="/")
        entries.append(ApiExportFile(key, download_url, etag))
    return entries
def init_database():
    """Create every table used by the importer, if it does not exist yet.

    Tables: ``usage_stats`` (rentals), ``read_files`` (already imported
    export files, keyed by ETag), ``bike_points`` and ``accidents``.
    """
    ddl_statements = (
        # Rental records imported from the usage-stats CSV exports.
        "CREATE TABLE IF NOT EXISTS usage_stats(\n"
        "rental_id INTEGER PRIMARY KEY,\n"
        "duration INTEGER,\n"
        "bike_id INTEGER,\n"
        "end_date INTEGER,\n"
        "end_station_id INTEGER,\n"
        "end_station_name TEXT,\n"
        "start_date INTEGER,\n"
        "start_station_id INTEGER,\n"
        "start_station_name TEXT\n"
        ")",
        # Bookkeeping of export files that were imported already.
        "CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)",
        # Bike docking stations; id_num is the numeric form of the text id.
        "CREATE TABLE IF NOT EXISTS bike_points(\n"
        "id TEXT PRIMARY KEY,\n"
        "common_name TEXT,\n"
        "lat REAL,\n"
        "lon REAL,\n"
        "id_num INTEGER\n"
        ")",
        # Accidents involving pedal cycles.
        "CREATE TABLE IF NOT EXISTS accidents(\n"
        "id INTEGER PRIMARY KEY,\n"
        "lat REAL,\n"
        "lon REAL,\n"
        "location TEXT,\n"
        "date TEXT,\n"
        "severity TEXT\n"
        ")",
    )

    LOG.info("Try to create tables")
    conn = sqlite3.connect(DB_NAME, timeout=300)
    for statement in ddl_statements:
        conn.execute(statement)
    conn.commit()
    conn.close()
    LOG.info("Tables created")
def create_indexes():
    """Create the three expression indexes on ``usage_stats``.

    Each index covers the calendar date derived from ``start_date``, once on
    its own and once each combined with the end and the start station id.
    Every index is committed separately and its progress is logged.
    """
    index_definitions = (
        (
            "idx_date_of_start_date",
            "CREATE INDEX IF NOT EXISTS idx_date_of_start_date\n"
            "ON usage_stats (date(start_date, 'unixepoch'))",
        ),
        (
            "idx_end_station_id_date_of_start_date",
            'CREATE INDEX IF NOT EXISTS "idx_end_station_id_date_of_start_date"\n'
            'ON "usage_stats" ("end_station_id" ASC, date(start_date, \'unixepoch\'))',
        ),
        (
            "idx_start_station_id_date_of_start_date",
            'CREATE INDEX IF NOT EXISTS "idx_start_station_id_date_of_start_date"\n'
            'ON "usage_stats" ("start_station_id" ASC, date("start_date", \'unixepoch\'))',
        ),
    )

    LOG.info("Try to create indexes")
    conn = sqlite3.connect(DB_NAME, timeout=300)
    for index_name, statement in index_definitions:
        LOG.info(f"Starting to build index: {index_name}")
        conn.execute(statement)
        conn.commit()
        LOG.info(f"Created index: {index_name}")
    conn.close()
    LOG.info("Indexes created")
def create_dashboard_table():
    """Rebuild the ``dashboard`` table from ``usage_stats`` and ``bike_points``.

    The table is dropped and recreated with one row per bike point that has
    at least one rental starting there, holding the latest and the earliest
    calendar date of those rentals.
    """
    LOG.info("Creating dashboard table")
    conn = sqlite3.connect(DB_NAME, timeout=300)
    try:
        conn.execute("DROP TABLE IF EXISTS dashboard")
        # NOTE(review): both aggregates read start_date, and the column named
        # max_start_date holds the *minimum*. The names are left untouched
        # because consumers of this table may rely on them -- confirm.
        conn.execute(
            "CREATE TABLE dashboard AS SELECT\n"
            "b.id_num as id,\n"
            "max(date(u.start_date, 'unixepoch')) AS max_end_date,\n"
            "min(date(u.start_date, 'unixepoch')) AS max_start_date\n"
            "FROM usage_stats u\n"
            "JOIN bike_points b ON u.start_station_id = b.id_num\n"
            "GROUP BY b.id_num"
        )
        conn.commit()
    finally:
        # Previously the connection was never closed, unlike every other
        # function in this module.
        conn.close()
    LOG.info("Created dashboard table")
def import_bikepoints():
    """Download the bike points from the TfL API and store them.

    Every point becomes one ``bike_points`` row; rows whose id already exists
    are left untouched (``INSERT OR IGNORE``).
    """
    LOG.info("Importing bikepoints")
    conn = sqlite3.connect(DB_NAME, timeout=300)

    response_text = requests.get("https://api.tfl.gov.uk/BikePoint").text
    rows = [
        (
            point['id'],
            point['commonName'],
            point['lat'],
            point['lon'],
            # Numeric id: everything after the first 11 characters of the id
            # (presumably a "BikePoints_" prefix -- confirm against the API).
            int(point['id'][11:]),
        )
        for point in json.loads(response_text)
    ]

    LOG.info(f"Writing {len(rows)} bikepoints to DB")
    conn.executemany("INSERT OR IGNORE INTO bike_points VALUES (?, ?, ?, ?, ?)", rows)
    conn.commit()
    conn.close()
    LOG.info("Bikepoints imported")
def import_accidents(year):
    """Import the pedal-cycle accidents of one year into ``accidents``.

    Args:
        year: Year placed into the TfL ``AccidentStats`` API URL.

    Only accidents with at least one vehicle of type ``PedalCycle`` are
    stored; rows whose id already exists are left untouched.
    """
    LOG.info("Importing accidents")

    def filter_pedal_cycles(accident):
        """Return True if any involved vehicle is a pedal cycle."""
        return any(vehicle['type'] == "PedalCycle" for vehicle in accident['vehicles'])

    response_text = requests.get(f"https://api.tfl.gov.uk/AccidentStats/{year}").text
    accidents = [
        (a['id'], a['lat'], a['lon'], a['location'], a['date'], a['severity'])
        for a in json.loads(response_text)
        if filter_pedal_cycles(a)
    ]

    LOG.info(f"Writing {len(accidents)} bike accidents to DB")
    # The connection is opened only once the data is ready and is closed in
    # all cases, so a failed download or write no longer leaks it.
    conn = sqlite3.connect(DB_NAME, timeout=300)
    try:
        conn.executemany("INSERT OR IGNORE INTO accidents VALUES (?, ?, ?, ?, ?, ?)", accidents)
        conn.commit()
    finally:
        conn.close()
    LOG.info("Accidents imported")
def _parse_usage_timestamp(value):
    """Convert a ``dd/mm/YYYY HH:MM`` string to epoch seconds (-1 if empty)."""
    if not value:
        return -1
    # Only the first 16 characters are parsed, so anything after the minutes
    # is ignored.
    # NOTE(review): the parsed datetime is naive, so timestamp() interprets it
    # in the local time zone of the machine running the import -- confirm
    # that this is intended.
    return int(datetime.strptime(value[:16], "%d/%m/%Y %H:%M").timestamp())


def _parse_usage_row(entry):
    """Map one CSV row to a tuple in ``usage_stats`` column order.

    Raises:
        ValueError: A numeric or date field cannot be parsed.
        IndexError: The row has fewer than nine columns.
    """
    return (
        int(entry[0]),                     # Rental Id
        int(entry[1] or "-1"),             # Duration or Duration_Seconds
        int(entry[2] or "-1"),             # Bike Id
        _parse_usage_timestamp(entry[3]),  # End Date
        int(entry[4] or "-1"),             # EndStation Id
        entry[5].strip(),                  # EndStation Name
        _parse_usage_timestamp(entry[6]),  # Start Date
        int(entry[7]),                     # StartStation Id (must be set)
        entry[8].strip(),                  # StartStation Name
    )


def import_usage_stats_file(export_file: ApiExportFile):
    """Download one usage-stats CSV export and insert its rows.

    A file whose ETag is already recorded in ``read_files`` is skipped. If a
    row cannot be parsed, the error is logged and the whole file is abandoned
    without writing anything, so it is retried on the next run. Blank lines
    are ignored.

    Args:
        export_file: The bucket entry to download and import.
    """
    conn = sqlite3.connect(DB_NAME, timeout=300)
    try:
        # Exact comparison: an ETag is an identifier, not a LIKE pattern, and
        # "=" can use the primary-key index.
        already_read = conn.execute(
            "SELECT 1 FROM read_files WHERE etag = ?", (export_file.etag,)
        ).fetchone()
        if already_read is not None:
            LOG.warning(f"Skipping import of {export_file.path}")
            return

        LOG.info(f"DOWNLOADING... {export_file.download_url}")
        content = requests.get(export_file.download_url).content.decode("UTF-8")

        LOG.info(f"Parsing {export_file.path}")
        reader = csv.reader(content.splitlines())
        next(reader, None)  # drop the header row
        mapped = []
        for entry in reader:
            if not entry:
                # csv.reader yields [] for a blank line; nothing to import.
                continue
            try:
                mapped.append(_parse_usage_row(entry))
            except ValueError as e:
                LOG.error(f"Value Error {e} on line {entry}")
                return
            except (IndexError, KeyError) as e:
                # Rows are lists, so a short row raises IndexError; the
                # previous KeyError handler could never catch it.
                LOG.error(f"Missing column ({e}) on line {entry}")
                return

        LOG.info(f"Writing {len(mapped)} entries to DB")
        conn.executemany("INSERT OR IGNORE INTO usage_stats VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", mapped)
        conn.execute("INSERT OR IGNORE INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag))
        conn.commit()
    finally:
        # Runs on every exit path, including the early returns above, which
        # previously left the connection open.
        conn.close()
    LOG.info(f"Finished import of {export_file.path}")
def _count_usage_stats():
    """Return the current number of rows in ``usage_stats``."""
    conn = sqlite3.connect(DB_NAME, timeout=300)
    try:
        return conn.execute("SELECT count(*) FROM usage_stats").fetchone()[0]
    finally:
        # The counting connections used to be created inline and never closed.
        conn.close()


def main():
    """Run the complete import pipeline.

    Steps: create the tables, import every usage-stats CSV from the bucket,
    build the indexes, import bike points and the 2019 accidents, and rebuild
    the dashboard table if the usage-stats import added rows.
    """
    # General DB init
    init_database()
    count_pre = _count_usage_stats()

    # Download and import opendata from S3 bucket
    all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv")
    for export_file in all_files:
        import_usage_stats_file(export_file)
    count_after = _count_usage_stats()

    # Create search-index for faster querying
    create_indexes()

    # Import Bikepoints
    import_bikepoints()

    # Import bike accidents
    import_accidents(2019)

    # Rebuilding the dashboard is only needed when new rentals arrived.
    if count_after > count_pre:
        create_dashboard_table()


if __name__ == "__main__":
    main()