From 7ad0df77cde592e357e0a3e9e3f9bec68416efa3 Mon Sep 17 00:00:00 2001
From: Marcel Schwarz
Date: Mon, 14 Dec 2020 01:44:06 +0100
Subject: [PATCH] Implement first working database init script

---
 projects/project-3/openapi/.gitignore    |   2 +
 projects/project-3/openapi/db_init.py    | 146 +++++++++++++++++++++
 projects/project-3/openapi/downloader.py |  34 ------
 projects/project-3/openapi/read_csv.py   |  57 ---------
 4 files changed, 148 insertions(+), 91 deletions(-)
 create mode 100644 projects/project-3/openapi/.gitignore
 create mode 100644 projects/project-3/openapi/db_init.py
 delete mode 100644 projects/project-3/openapi/downloader.py
 delete mode 100644 projects/project-3/openapi/read_csv.py

diff --git a/projects/project-3/openapi/.gitignore b/projects/project-3/openapi/.gitignore
new file mode 100644
index 0000000..a702315
--- /dev/null
+++ b/projects/project-3/openapi/.gitignore
@@ -0,0 +1,2 @@
+tmp/
+bike-data.db
\ No newline at end of file
diff --git a/projects/project-3/openapi/db_init.py b/projects/project-3/openapi/db_init.py
new file mode 100644
index 0000000..16ac06d
--- /dev/null
+++ b/projects/project-3/openapi/db_init.py
@@ -0,0 +1,146 @@
+import os.path
+import shutil
+from dataclasses import dataclass
+import logging
+
+
+WORKING_DIR = os.getcwd()
+TMP_DIR = os.path.join(WORKING_DIR, "tmp")
+
+logging.basicConfig(format="%(asctime)-15s [%(levelname)8s] - %(message)s", level=logging.ERROR)
+LOG = logging.getLogger("Importer")
+
+
+@dataclass
+class ApiExportFile:
+    path: str
+    download_url: str
+    etag: str
+
+
+def get_online_files_list(subdir_filter=None, file_extension_filter=None):
+    import urllib.parse
+    import xml.etree.ElementTree
+    import requests
+
+    base_uri = "https://s3-eu-west-1.amazonaws.com/cycling.data.tfl.gov.uk/"
+    xml_data = xml.etree.ElementTree.fromstringlist(requests.get(base_uri).text)
+    entries = []
+
+    for child in xml_data.findall('{http://s3.amazonaws.com/doc/2006-03-01/}Contents'):
+        key = child.find('{http://s3.amazonaws.com/doc/2006-03-01/}Key').text
+        etag = child.find('{http://s3.amazonaws.com/doc/2006-03-01/}ETag').text
+        if key.endswith('/'):
+            continue
+
+        download_url = base_uri + urllib.parse.quote_plus(key, safe="/")
+        entries.append(
+            ApiExportFile(key, download_url, etag)
+        )
+
+    if subdir_filter:
+        entries = list(filter(lambda el: el.path.startswith(subdir_filter), entries))
+
+    if file_extension_filter:
+        entries = list(filter(lambda el: el.path.endswith(file_extension_filter), entries))
+
+    return entries
+
+
+def download_file(url, save_path):
+    import os.path
+    import urllib.request
+
+    save_path = os.path.join(TMP_DIR, save_path)
+
+    if os.path.exists(save_path):
+        LOG.warning(f"Skipping download, file already exists: {save_path}")
+        return
+
+    os.makedirs(os.path.dirname(save_path), exist_ok=True)
+
+    LOG.info(f"DOWNLOADING... {url} to {save_path}")
+    urllib.request.urlretrieve(url, save_path)
+
+
+class BikeDatabase:
+
+    def __init__(self):
+        import sqlite3
+        self.conn = sqlite3.connect("bike-data.db")
+        self.cursor = self.conn.cursor()
+
+    def init_database(self):
+        self.conn.execute("""CREATE TABLE IF NOT EXISTS usage_stats(
+                rental_id INTEGER PRIMARY KEY,
+                duration INTEGER,
+                bike_id INTEGER,
+                end_date INTEGER,
+                end_station_id INTEGER,
+                end_station_name TEXT,
+                start_date INTEGER,
+                start_station_id INTEGER,
+                start_station_name TEXT
+            )""")
+        self.conn.execute("CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)")
+
+    def is_file_already_imported(self, etag):
+        rows = self.conn.execute("SELECT * FROM read_files WHERE etag LIKE ?", (etag,)).fetchall()
+        return len(rows) != 0
+
+    def import_usage_stats_file(self, export_file: ApiExportFile):
+        from datetime import datetime
+        import csv
+        import os
+
+        os.chdir(TMP_DIR)
+
+        LOG.info(f"Importing {export_file.path}")
+        with open(export_file.path, "r", newline='') as file:
+            LOG.info(f"Reading file {export_file.path}")
+            entries = list(csv.DictReader(file))
+            mapped = []
+            for entry in entries:
+                try:
+                    mapped.append((
+                        int(entry['Rental Id']),
+                        int(entry['Duration'] or "-1"),
+                        int(entry['Bike Id'] or "-1"),
+                        int(datetime.strptime(entry['End Date'][:16], "%d/%m/%Y %H:%M").timestamp()) if entry['End Date'] else -1,
+                        int(entry['EndStation Id'] or "-1"),
+                        entry['EndStation Name'],
+                        int(datetime.strptime(entry['Start Date'][:16], "%d/%m/%Y %H:%M").timestamp()) if entry['Start Date'] else -1,
+                        int(entry['StartStation Id']),
+                        entry['StartStation Name']
+                    ))
+                except ValueError as e:
+                    LOG.error(f"ValueError {e} in row {entry}")
+                    return
+                except KeyError as e:
+                    LOG.error(f"KeyError {e} in row {entry}")
+                    return
+        LOG.info(f"Writing {len(mapped)} entries to DB")
+        self.cursor.executemany("INSERT INTO usage_stats VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", mapped)
+        self.cursor.execute("INSERT INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag))
+        self.conn.commit()
+        LOG.info(f"Finished import of {export_file.path}")
+        os.chdir(WORKING_DIR)
+        shutil.rmtree(TMP_DIR)
+        LOG.info("Deleted temp dir")
+
+
+def main():
+    all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv")
+    db = BikeDatabase()
+    db.init_database()
+
+    for file in all_files:
+        if not db.is_file_already_imported(file.etag):
+            download_file(file.download_url, file.path)
+            db.import_usage_stats_file(file)
+        else:
+            LOG.warning(f"Skipping import of {file.path}")
+
+
+if __name__ == "__main__":
+    main()
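
db_init.py assumes the bucket index is served as a standard S3 ListBucketResult document. Below is a minimal, self-contained sketch of the lookup that get_online_files_list performs, run against a hand-written listing; only the element names and namespace come from the code above, the sample key and ETag values are invented for illustration.

    # Sketch of the XML parsing done by get_online_files_list, using a
    # hardcoded listing instead of a live request. Key/ETag values are made up.
    import xml.etree.ElementTree

    NS = '{http://s3.amazonaws.com/doc/2006-03-01/}'

    sample = """<?xml version="1.0" encoding="UTF-8"?>
    <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
      <Contents>
        <Key>usage-stats/SampleJourneyDataExtract.csv</Key>
        <ETag>&quot;abc123&quot;</ETag>
      </Contents>
      <Contents>
        <Key>usage-stats/</Key>
        <ETag>&quot;def456&quot;</ETag>
      </Contents>
    </ListBucketResult>"""

    root = xml.etree.ElementTree.fromstring(sample)
    for contents in root.findall(NS + 'Contents'):
        key = contents.find(NS + 'Key').text
        if key.endswith('/'):
            continue  # directory placeholders are skipped, as in db_init.py
        print(key, contents.find(NS + 'ETag').text)

Keys that end in "/" are directory placeholders, which is why the importer drops them before building download URLs.
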
diff --git a/projects/project-3/openapi/downloader.py b/projects/project-3/openapi/downloader.py
deleted file mode 100644
index a54bf81..0000000
--- a/projects/project-3/openapi/downloader.py
+++ /dev/null
@@ -1,34 +0,0 @@
-import urllib.parse
-import urllib.request
-import xml.etree.ElementTree
-import os.path
-
-import requests
-
-BASE_URI = "https://s3-eu-west-1.amazonaws.com/cycling.data.tfl.gov.uk/"
-BASE_DIR = "data/"
-
-xml_data = xml.etree.ElementTree.fromstringlist(requests.get(BASE_URI).text)
-
-for child in xml_data.findall('{http://s3.amazonaws.com/doc/2006-03-01/}Contents'):
-    key = child.find('{http://s3.amazonaws.com/doc/2006-03-01/}Key').text
-    if key.endswith('/'):
-        continue
-
-    parts = key.rsplit('/')
-    # create download url
-    parts.append(urllib.parse.quote_plus(parts.pop()))
-    download_url = BASE_URI + "/".join(parts)
-
-    # create folders and files
-    parts.append(urllib.parse.unquote_plus(parts.pop()))
-    save_path = BASE_DIR + "/".join(parts)
-    os.makedirs(os.path.dirname(save_path), exist_ok=True)
-
-    # skip already downloaded files
-    if os.path.exists(save_path):
-        continue
-
-    # do the download
-    urllib.request.urlretrieve(download_url, save_path)
-    print(f"DOWNLOADING... {download_url} to {save_path}")
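
Both the deleted downloader and the new db_init.py build download URLs with urllib.parse.quote_plus. A small round-trip sketch, using a made-up key: spaces in bucket keys become "+", while the path separators survive thanks to safe="/", and unquote_plus recovers the original key for use as a local path.

    # Round trip of the quoting scheme used above; the key is a made-up example.
    import urllib.parse

    key = "usage-stats/99 Journey Data Extract.csv"
    quoted = urllib.parse.quote_plus(key, safe="/")
    print(quoted)                             # usage-stats/99+Journey+Data+Extract.csv
    print(urllib.parse.unquote_plus(quoted))  # back to the original key
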
diff --git a/projects/project-3/openapi/read_csv.py b/projects/project-3/openapi/read_csv.py
deleted file mode 100644
index 31d9ac3..0000000
--- a/projects/project-3/openapi/read_csv.py
+++ /dev/null
@@ -1,57 +0,0 @@
-import csv
-import os
-
-
-def read_files(base_dir, max_num=2):
-    data = []
-    for file in os.listdir(base_dir):
-        if max_num == 0:
-            break
-        print(f'reading file \'{file}\'')
-        with open(os.path.join(base_dir, file), 'r', newline='') as input_file:
-            data.extend(list(csv.DictReader(input_file)))
-        max_num -= 1
-    return data
-
-
-def get_key_counts(data):
-    all_key_sets = [e.keys() for e in data]
-    key_counts = {}
-    for key_set in all_key_sets:
-        for key in key_set:
-            try:
-                key_counts[key] += 1
-            except KeyError:
-                key_counts[key] = 1
-    return key_counts
-
-
-def main():
-    data = read_files('data/usage-stats', max_num=10)
-    print("Sorting")
-    data = sorted(data, key=lambda entry: int(entry['Rental Id']))
-    print(f'final length of data {len(data)}')
-
-    # counts = get_key_counts(data)
-    # for count, val in counts.items():
-    #     if val != len(data):
-    #         print(count, val)
-    # print(counts)
-    with open('test.csv', 'w', newline='') as out_file:
-        writer = csv.DictWriter(out_file, data[0].keys(), extrasaction='ignore')
-        writer.writeheader()
-        writer.writerows(data)
-
-
-if __name__ == '__main__':
-    main()
-
-
-#
-# with open('test.csv') as again_in:
-#     reader2 = csv.DictReader(again_in)
-#     data2 = list(reader2)
-#
-#     print(data2[0])
-#
-#     print(f"Is same? {data[0] == data2[0]}")
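
Assuming an import has run through, the resulting database can be smoke-tested with nothing but the sqlite3 standard library. The table and column names below come from init_database in db_init.py; the queries themselves are hypothetical examples, not part of the patch.

    # Hypothetical smoke test for bike-data.db after running db_init.py.
    import sqlite3

    conn = sqlite3.connect("bike-data.db")
    files, = conn.execute("SELECT COUNT(*) FROM read_files").fetchone()
    rides, = conn.execute("SELECT COUNT(*) FROM usage_stats").fetchone()
    print(f"{files} files imported, {rides} rentals")

    # Example aggregate: the five most used start stations.
    for name, count in conn.execute(
            """SELECT start_station_name, COUNT(*) AS c
               FROM usage_stats GROUP BY start_station_name
               ORDER BY c DESC LIMIT 5"""):
        print(name, count)
    conn.close()
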