geovisualisierung/projects/project-3/openapi/db_init.py
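"""Download TfL cycling open data (rental usage CSVs and BikePoint locations)
and import it into a local SQLite database (bike-data.db)."""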

import csv
import json
import logging
import os
import os.path
import shutil
import sqlite3
import time
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import datetime
import requests

WORKING_DIR = os.getcwd()
TMP_DIR = os.path.join(WORKING_DIR, "tmp")

logging.basicConfig(format="%(asctime)-15s [%(levelname)8s] [%(threadName)s] - %(message)s", level=logging.INFO)
LOG = logging.getLogger("Importer")


@dataclass
class ApiExportFile:
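    """A single file entry from the TfL open-data S3 bucket listing."""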
    path: str
    download_url: str
    etag: str


def get_online_files_list(subdir_filter=None, file_extension_filter=None):
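    """List the files in the public S3 bucket, optionally filtered by key prefix and file extension."""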
    import urllib.parse
    import xml.etree.ElementTree

    base_uri = "https://s3-eu-west-1.amazonaws.com/cycling.data.tfl.gov.uk/"
    # The bucket listing is an XML document in the S3 ListBucketResult schema
    xml_data = xml.etree.ElementTree.fromstring(requests.get(base_uri).text)

    entries = []
    for child in xml_data.findall('{http://s3.amazonaws.com/doc/2006-03-01/}Contents'):
        key = child.find('{http://s3.amazonaws.com/doc/2006-03-01/}Key').text
        etag = child.find('{http://s3.amazonaws.com/doc/2006-03-01/}ETag').text
        # Skip directory placeholder keys
        if key.endswith('/'):
            continue
        download_url = base_uri + urllib.parse.quote_plus(key, safe="/")
        entries.append(
            ApiExportFile(key, download_url, etag)
        )

    if subdir_filter:
        entries = list(filter(lambda el: el.path.startswith(subdir_filter), entries))
    if file_extension_filter:
        entries = list(filter(lambda el: el.path.endswith(file_extension_filter), entries))
    return entries


class BikeDatabase:
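    """Thin wrapper around the SQLite database used by the importer."""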

    def __init__(self):
LOG.info("Created new database connection")
self.conn = sqlite3.connect(os.path.join(WORKING_DIR, "bike-data.db"), check_same_thread=False, timeout=300)

    def init_database(self):
LOG.info("Try to create tables")
self.conn.execute("""CREATE TABLE IF NOT EXISTS usage_stats(
rental_id INTEGER PRIMARY KEY,
duration INTEGER,
bike_id INTEGER,
end_date INTEGER,
end_station_id INTEGER,
end_station_name TEXT,
start_date INTEGER,
start_station_id INTEGER,
start_station_name TEXT
)""")
self.conn.execute("CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)")
self.conn.execute("""CREATE TABLE IF NOT EXISTS bike_points(
id TEXT PRIMARY KEY,
common_name TEXT,
lat REAL,
lon REAL,
id_num INTEGER AS (CAST(SUBSTR(id, 12) as INTEGER)) STORED
)""")
self.conn.commit()
LOG.info("Tables created")

    def create_indexes(self):
        LOG.info("Try to create indexes")
        self.conn.execute("""CREATE INDEX IF NOT EXISTS idx_date_of_start_date
                                ON usage_stats (date(start_date, "unixepoch"))""")
        self.conn.commit()
        LOG.info("Indexes created")

    def import_bikepoints(self):
        LOG.info("Importing bikepoints")
        points = json.loads(requests.get("https://api.tfl.gov.uk/BikePoint").text)
        points = list(map(lambda p: (p['id'], p['commonName'], p['lat'], p['lon']), points))
        self.conn.executemany("INSERT OR IGNORE INTO bike_points VALUES (?, ?, ?, ?)", points)
        self.conn.commit()
        LOG.info("Bikepoints imported")

    def is_file_already_imported(self, etag):
        rows = self.conn.execute("SELECT * FROM read_files WHERE etag LIKE ?", (etag,)).fetchall()
        return len(rows) != 0

    def import_usage_stats_file(self, export_file: ApiExportFile):
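        """Download a single usage-stats CSV, parse it, and write its rows into usage_stats."""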
        if self.is_file_already_imported(export_file.etag):
            LOG.warning(f"Skipping import of {export_file.path}")
            return
        cursor = self.conn.cursor()
        os.makedirs(os.path.dirname(export_file.path), exist_ok=True)
        LOG.info(f"DOWNLOADING... {export_file.download_url} to {export_file.path}")
        urllib.request.urlretrieve(export_file.download_url, export_file.path)

        LOG.info(f"Importing {export_file.path}")
        with open(export_file.path, "r", newline='') as file:
            LOG.info(f"Reading file {export_file.path}")
            # Drop the CSV header row
            entries = list(csv.reader(file))[1:]
            mapped = []
            for entry in entries:
                try:
                    mapped.append((
                        # Rental Id
                        int(entry[0]),
                        # Duration or Duration_Seconds
                        int(entry[1] or "-1"),
                        # Bike Id
                        int(entry[2] or "-1"),
                        # End Date
                        int(datetime.strptime(entry[3][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[3] else -1,
                        # EndStation Id
                        int(entry[4] or "-1"),
                        # EndStation Name
                        entry[5].strip(),
                        # Start Date
                        int(datetime.strptime(entry[6][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[6] else -1,
                        # StartStation Id
                        int(entry[7]),
                        # StartStation Name
                        entry[8].strip()
                    ))
                except ValueError as e:
                    LOG.error(f"Value Error {e} on line {entry}")
                    return
                except IndexError as e:
                    LOG.error(f"Index Error {e} on line {entry}")
                    return
LOG.info(f"Writing {len(mapped)} entries to DB")
        cursor.executemany("INSERT INTO usage_stats VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", mapped)
        cursor.execute("INSERT INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag))
        self.conn.commit()
        LOG.info(f"Finished import of {export_file.path}")

        os.remove(export_file.path)
        LOG.info(f"Deleted file {export_file.path}")


def main():
    all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv")

    # General DB init
    BikeDatabase().init_database()

    # Download and import opendata from S3 bucket
    os.makedirs(TMP_DIR, exist_ok=True)
    os.chdir(TMP_DIR)
    LOG.info("Switching into tmp dir")

    import_tasks = []
    # A single worker processes the files sequentially (SQLite allows only one writer at a time)
    with ThreadPoolExecutor(1) as executor:
        for file in all_files:
            db = BikeDatabase()
            import_tasks.append(
                executor.submit(db.import_usage_stats_file, file)
            )
        executor.shutdown(wait=True)

    os.chdir(WORKING_DIR)
    LOG.info("Switching back to workdir")
    shutil.rmtree(TMP_DIR)
    LOG.info("Deleted temp dir")

    # Import Bikepoints
    BikeDatabase().import_bikepoints()

    # Create search-index for faster querying
    BikeDatabase().create_indexes()


if __name__ == "__main__":
    main()