"""Initialise a local SQLite database with TfL cycling open data.

Running this module creates the tables, imports every usage-stats CSV export
listed in the public S3 bucket, builds the query indexes and finally imports
the bike points and the bike accidents of one year from the TfL API.
"""
import csv
import json
import logging
import sqlite3
import urllib.parse
import xml.etree.ElementTree
from contextlib import closing
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Sequence, Tuple

import requests

DB_NAME = "bike-data.db"

# Seconds a connection waits on a locked database before sqlite3 raises.
_DB_TIMEOUT = 300

# XML namespace used by every element of an S3 bucket listing.
_S3_NS = "{http://s3.amazonaws.com/doc/2006-03-01/}"

logFormatter = logging.Formatter("%(asctime)-15s [%(levelname)8s] [%(threadName)s] - %(message)s")
LOG = logging.getLogger()
LOG.setLevel(logging.DEBUG)

fileHandler = logging.FileHandler("db_init.log")
fileHandler.setFormatter(logFormatter)
LOG.addHandler(fileHandler)

consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(logFormatter)
LOG.addHandler(consoleHandler)


@dataclass
class ApiExportFile:
    """One downloadable object of the S3 bucket listing."""

    # Object key inside the bucket, e.g. "<subdir>/<file name>".
    path: str
    # Absolute URL the object can be fetched from.
    download_url: str
    # ETag reported by the listing; used to recognise already imported files.
    etag: str


def _connect():
    """Return a context manager yielding a connection that is always closed."""
    return closing(sqlite3.connect(DB_NAME, timeout=_DB_TIMEOUT))


def get_online_files_list(
    subdir_filter: Optional[str] = None,
    file_extension_filter: Optional[str] = None,
) -> List[ApiExportFile]:
    """List the files available in the TfL cycling data bucket.

    The bucket listing is paginated by the server; every page is fetched by
    passing the last seen key as ``marker`` while the response reports
    ``IsTruncated``.

    Args:
        subdir_filter: If truthy, keep only keys starting with this prefix.
        file_extension_filter: If truthy, keep only keys ending with this
            suffix.

    Returns:
        The matching files in listing order. Keys ending in ``/`` (directory
        placeholders) are never included.
    """
    base_uri = "https://s3-eu-west-1.amazonaws.com/cycling.data.tfl.gov.uk/"

    entries = []
    marker = None
    while True:
        params = {"marker": marker} if marker else None
        listing = xml.etree.ElementTree.fromstring(
            requests.get(base_uri, params=params).text
        )

        last_key = None
        for child in listing.findall(f"{_S3_NS}Contents"):
            key = child.find(f"{_S3_NS}Key").text
            etag = child.find(f"{_S3_NS}ETag").text
            last_key = key
            if key.endswith("/"):
                continue
            download_url = base_uri + urllib.parse.quote_plus(key, safe="/")
            entries.append(ApiExportFile(key, download_url, etag))

        truncated = (listing.findtext(f"{_S3_NS}IsTruncated") or "").strip().lower() == "true"
        # Stop on the last page, and also on an empty page so a server that
        # ignores the marker can never cause an endless loop.
        if not truncated or last_key is None:
            break
        marker = listing.findtext(f"{_S3_NS}NextMarker") or last_key

    if subdir_filter:
        entries = [el for el in entries if el.path.startswith(subdir_filter)]
    if file_extension_filter:
        entries = [el for el in entries if el.path.endswith(file_extension_filter)]
    return entries


def init_database() -> None:
    """Create all tables used by the importer if they do not exist yet."""
    LOG.info("Try to create tables")
    with _connect() as conn:
        conn.execute("""CREATE TABLE IF NOT EXISTS usage_stats(
            rental_id INTEGER PRIMARY KEY,
            duration INTEGER,
            bike_id INTEGER,
            end_date INTEGER,
            end_station_id INTEGER,
            end_station_name TEXT,
            start_date INTEGER,
            start_station_id INTEGER,
            start_station_name TEXT
        )""")
        conn.execute("CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)")
        # id_num is a stored generated column holding everything from the
        # 12th character of id cast to an integer.
        # NOTE(review): presumably ids look like "BikePoints_<n>" - confirm.
        conn.execute("""CREATE TABLE IF NOT EXISTS bike_points(
            id TEXT PRIMARY KEY,
            common_name TEXT,
            lat REAL,
            lon REAL,
            id_num INTEGER AS (CAST(SUBSTR(id, 12) as INTEGER)) STORED
        )""")
        conn.execute("""CREATE TABLE IF NOT EXISTS accidents(
            id INTEGER PRIMARY KEY,
            lat REAL,
            lon REAL,
            location TEXT,
            date TEXT,
            severity TEXT
        )""")
        conn.commit()
    LOG.info("Tables created")


def create_indexes() -> None:
    """Create the expression indexes on ``usage_stats`` if they are missing.

    Each index is committed separately, so indexes that were already built
    survive a failure while building a later one.
    """
    # 'unixepoch' is written as a single-quoted SQL string literal: the
    # double-quoted form only works through a SQLite compatibility quirk that
    # can be disabled at build or run time.
    indexes = (
        ("idx_date_of_start_date",
         "date(start_date, 'unixepoch')"),
        ("idx_end_station_id_date_of_start_date",
         "end_station_id ASC, date(start_date, 'unixepoch')"),
        ("idx_start_station_id_date_of_start_date",
         "start_station_id ASC, date(start_date, 'unixepoch')"),
    )

    LOG.info("Try to create indexes")
    with _connect() as conn:
        for index_name, indexed_columns in indexes:
            LOG.info("Starting to build index: %s", index_name)
            conn.execute(
                f"CREATE INDEX IF NOT EXISTS {index_name} ON usage_stats ({indexed_columns})"
            )
            conn.commit()
            LOG.info("Created index: %s", index_name)
    LOG.info("Indexes created")


def import_bikepoints() -> None:
    """Download all bike points from the TfL API and store them.

    Rows whose ``id`` already exists are left untouched (``INSERT OR IGNORE``).
    """
    LOG.info("Importing bikepoints")
    points = json.loads(requests.get("https://api.tfl.gov.uk/BikePoint").text)
    rows = [(p['id'], p['commonName'], p['lat'], p['lon']) for p in points]

    LOG.info("Writing %s bikepoints to DB", len(rows))
    with _connect() as conn:
        # Columns are named explicitly because the table also has the
        # generated column id_num, which must not receive a value.
        conn.executemany(
            "INSERT OR IGNORE INTO bike_points (id, common_name, lat, lon) VALUES (?, ?, ?, ?)",
            rows,
        )
        conn.commit()
    LOG.info("Bikepoints imported")


def import_accidents(year) -> None:
    """Download the accident statistics of ``year`` and store bike accidents.

    Only accidents with at least one vehicle of type ``"PedalCycle"`` are
    kept. Rows whose ``id`` already exists are left untouched.

    Args:
        year: Year inserted into the AccidentStats API URL.
    """
    LOG.info("Importing accidents")
    accidents = json.loads(requests.get(f"https://api.tfl.gov.uk/AccidentStats/{year}").text)
    rows = [
        (a['id'], a['lat'], a['lon'], a['location'], a['date'], a['severity'])
        for a in accidents
        if any(vehicle['type'] == "PedalCycle" for vehicle in a['vehicles'])
    ]

    LOG.info("Writing %s bike accidents to DB", len(rows))
    with _connect() as conn:
        conn.executemany("INSERT OR IGNORE INTO accidents VALUES (?, ?, ?, ?, ?, ?)", rows)
        conn.commit()
    LOG.info("Accidents importet")


def _is_file_imported(etag: str) -> bool:
    """Return True if a file with exactly this ETag is recorded in read_files."""
    with _connect() as conn:
        row = conn.execute("SELECT 1 FROM read_files WHERE etag = ?", (etag,)).fetchone()
    return row is not None


def _parse_timestamp(value: str) -> int:
    """Convert a ``dd/mm/YYYY HH:MM`` string to epoch seconds, ``-1`` if empty.

    Only the first 16 characters are parsed, so a trailing seconds part is
    ignored.
    """
    if not value:
        return -1
    # NOTE(review): strptime yields a naive datetime, so timestamp() interprets
    # it in the local timezone of the machine running the import - confirm
    # this is intended.
    return int(datetime.strptime(value[:16], "%d/%m/%Y %H:%M").timestamp())


def _parse_usage_row(entry: Sequence[str]) -> Tuple:
    """Map one CSV row to the column order of the ``usage_stats`` table.

    Raises:
        ValueError: If a numeric or date field cannot be parsed.
        IndexError: If the row has fewer than nine fields.
    """
    return (
        # Rental Id
        int(entry[0]),
        # Duration or Duration_Seconds
        int(entry[1] or "-1"),
        # Bike Id
        int(entry[2] or "-1"),
        # End Date
        _parse_timestamp(entry[3]),
        # EndStation Id
        int(entry[4] or "-1"),
        # EndStation Name
        entry[5].strip(),
        # Start Date
        _parse_timestamp(entry[6]),
        # StartStation Id
        # NOTE(review): unlike the other ids an empty value is not mapped to
        # -1 here, so it rejects the whole file - confirm this is intended.
        int(entry[7]),
        # StartStation Name
        entry[8].strip(),
    )


def import_usage_stats_file(export_file: ApiExportFile) -> None:
    """Download one usage-stats CSV export and store its rows.

    Files whose ETag is already recorded in ``read_files`` are skipped. If
    any row cannot be parsed the error is logged and nothing from the file is
    written, so the file is tried again on the next run. Rows and the
    ``read_files`` marker are written in a single transaction.

    Args:
        export_file: The bucket object to import.
    """
    if _is_file_imported(export_file.etag):
        LOG.warning("Skipping import of %s", export_file.path)
        return

    LOG.info("DOWNLOADING... %s", export_file.download_url)
    content = requests.get(export_file.download_url).content.decode("UTF-8")

    LOG.info("Parsing %s", export_file.path)
    reader = csv.reader(content.splitlines())
    next(reader, None)  # the first line is the column header

    mapped = []
    for entry in reader:
        if not entry:
            # csv.reader yields an empty list for a blank line.
            continue
        try:
            mapped.append(_parse_usage_row(entry))
        except ValueError as e:
            LOG.error("Value Error %s on line %s", e, entry)
            return
        except IndexError as e:
            LOG.error("Index Error %s on line %s", e, entry)
            return

    LOG.info("Writing %s entries to DB", len(mapped))
    # The inner "conn" context commits on success and rolls back on error;
    # closing() then releases the connection in either case.
    with _connect() as conn, conn:
        conn.executemany("INSERT OR IGNORE INTO usage_stats VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", mapped)
        conn.execute("INSERT OR IGNORE INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag))
    LOG.info("Finished import of %s", export_file.path)


def main() -> None:
    """Run the complete database initialisation."""
    # General DB init
    init_database()

    # Download and import opendata from S3 bucket
    all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv")
    for export_file in all_files:
        import_usage_stats_file(export_file)

    # Create search-index for faster querying
    create_indexes()

    # Import Bikepoints
    import_bikepoints()

    # Import bike accidents
    import_accidents(2019)


if __name__ == "__main__":
    main()