import os.path
import shutil
from dataclasses import dataclass
import logging

WORKING_DIR = os.getcwd()
TMP_DIR = os.path.join(WORKING_DIR, "tmp")

# INFO level makes the progress messages below visible; at ERROR they would
# all be suppressed.
logging.basicConfig(format="%(asctime)-15s [%(levelname)8s] - %(message)s", level=logging.INFO)
LOG = logging.getLogger("Importer")


@dataclass
class ApiExportFile:
    """One downloadable file in the TfL cycling-data S3 bucket."""
    path: str
    download_url: str
    etag: str


def get_online_files_list(subdir_filter=None, file_extension_filter=None):
    """List the bucket contents, optionally filtered by key prefix and extension."""
    import urllib.parse
    import xml.etree.ElementTree
    import requests

    base_uri = "https://s3-eu-west-1.amazonaws.com/cycling.data.tfl.gov.uk/"
    # The bucket listing is a single XML document, so fromstring (not
    # fromstringlist) is the right parser entry point.
    xml_data = xml.etree.ElementTree.fromstring(requests.get(base_uri).text)

    entries = []
    for child in xml_data.findall('{http://s3.amazonaws.com/doc/2006-03-01/}Contents'):
        key = child.find('{http://s3.amazonaws.com/doc/2006-03-01/}Key').text
        etag = child.find('{http://s3.amazonaws.com/doc/2006-03-01/}ETag').text
        # Skip "directory" placeholder keys.
        if key.endswith('/'):
            continue
        # quote, not quote_plus: spaces in keys must become %20 in a URL path,
        # not '+'.
        download_url = base_uri + urllib.parse.quote(key, safe="/")
        entries.append(ApiExportFile(key, download_url, etag))

    if subdir_filter:
        entries = [el for el in entries if el.path.startswith(subdir_filter)]
    if file_extension_filter:
        entries = [el for el in entries if el.path.endswith(file_extension_filter)]
    return entries


def download_file(url, save_path):
    import urllib.request

    save_path = os.path.join(TMP_DIR, save_path)
    if os.path.exists(save_path):
        LOG.warning(f"Skipping download, file already exists: {save_path}")
        return
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    LOG.info(f"Downloading {url} to {save_path}")
    urllib.request.urlretrieve(url, save_path)


class BikeDatabase:
    def __init__(self):
        import sqlite3
        self.conn = sqlite3.connect("bike-data.db")
        self.cursor = self.conn.cursor()

    def init_database(self):
        self.conn.execute("""CREATE TABLE IF NOT EXISTS usage_stats(
                                 rental_id INTEGER PRIMARY KEY,
                                 duration INTEGER,
                                 bike_id INTEGER,
                                 end_date INTEGER,
                                 end_station_id INTEGER,
                                 end_station_name TEXT,
                                 start_date INTEGER,
                                 start_station_id INTEGER,
                                 start_station_name TEXT
                             )""")
        self.conn.execute("CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)")
        # Commit so the schema persists even if no file is imported afterwards.
        self.conn.commit()

    def is_file_already_imported(self, etag):
        # Exact match; LIKE would treat '%' and '_' in the etag as wildcards.
        rows = self.conn.execute("SELECT * FROM read_files WHERE etag = ?", (etag,)).fetchall()
        return len(rows) != 0

    def import_usage_stats_file(self, export_file: ApiExportFile):
        from datetime import datetime
        import csv

        # Open the file via its full path rather than os.chdir(TMP_DIR); the
        # original chdir was never undone on the early error returns below.
        file_path = os.path.join(TMP_DIR, export_file.path)
        LOG.info(f"Importing {export_file.path}")
        with open(file_path, "r", newline='') as file:
            entries = list(csv.DictReader(file))
            mapped = []
            for entry in entries:
                try:
                    mapped.append((
                        int(entry['Rental Id']),
                        # Empty numeric fields are stored as -1.
                        int(entry['Duration'] or "-1"),
                        int(entry['Bike Id'] or "-1"),
                        # "DD/MM/YYYY HH:MM" timestamps become Unix epoch seconds.
                        int(datetime.strptime(entry['End Date'][:16], "%d/%m/%Y %H:%M").timestamp()) if entry['End Date'] else -1,
                        int(entry['EndStation Id'] or "-1"),
                        entry['EndStation Name'],
                        int(datetime.strptime(entry['Start Date'][:16], "%d/%m/%Y %H:%M").timestamp()) if entry['Start Date'] else -1,
                        int(entry['StartStation Id'] or "-1"),
                        entry['StartStation Name']
                    ))
                except ValueError as e:
                    # A malformed row aborts the whole file; nothing is committed.
                    LOG.error(f"ValueError {e} on row {entry}")
                    return
                except KeyError as e:
                    LOG.error(f"KeyError {e} on row {entry}")
                    return
            LOG.info(f"Writing {len(mapped)} entries to DB")
            self.cursor.executemany("INSERT INTO usage_stats VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", mapped)
            # Record the etag so this file is skipped on subsequent runs.
            self.cursor.execute("INSERT INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag))
            self.conn.commit()
            LOG.info(f"Finished import of {export_file.path}")
        # Delete the temp dir after each import to keep disk usage bounded;
        # download_file recreates it for the next file.
        shutil.rmtree(TMP_DIR)
        LOG.info("Deleted temp dir")


def main():
    all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv")
    db = BikeDatabase()
    db.init_database()
    for file in all_files:
        if not db.is_file_already_imported(file.etag):
            download_file(file.download_url, file.path)
            db.import_usage_stats_file(file)
        else:
            LOG.warning(f"Skipping import of {file.path}")


if __name__ == "__main__":
    main()
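
# Usage sketch (the log lines below are illustrative, not captured output):
#
#   $ python importer.py
#   ... [    INFO] - Importing usage-stats/....csv
#   ... [    INFO] - Writing ... entries to DB
#
# Because each imported file's ETag is recorded in read_files, a second run
# skips already-imported files, making the import resumable per file.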