Remove parallelization cause sequential is faster

Remove class structure
Add proper logging
This commit is contained in:
Marcel Schwarz 2020-12-19 04:12:03 +01:00
parent 8b0a107c89
commit 2b205d3927
2 changed files with 107 additions and 129 deletions

View File

@ -1,2 +1,2 @@
tmp/
bike-data.db bike-data.db
*.log

View File

@ -1,23 +1,23 @@
import csv import csv
import json import json
import logging import logging
import os
import os.path
import shutil
import sqlite3 import sqlite3
import time
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
import requests import requests
WORKING_DIR = os.getcwd() logFormatter = logging.Formatter("%(asctime)-15s [%(levelname)8s] [%(threadName)s] - %(message)s")
TMP_DIR = os.path.join(WORKING_DIR, "tmp") LOG = logging.getLogger()
LOG.setLevel(logging.DEBUG)
logging.basicConfig(format="%(asctime)-15s [%(levelname)8s] [%(threadName)s] - %(message)s", level=logging.INFO) fileHandler = logging.FileHandler("importer.log")
LOG = logging.getLogger("Importer") fileHandler.setFormatter(logFormatter)
LOG.addHandler(fileHandler)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(logFormatter)
LOG.addHandler(consoleHandler)
@dataclass @dataclass
@ -30,7 +30,6 @@ class ApiExportFile:
def get_online_files_list(subdir_filter=None, file_extension_filter=None): def get_online_files_list(subdir_filter=None, file_extension_filter=None):
import urllib.parse import urllib.parse
import xml.etree.ElementTree import xml.etree.ElementTree
import requests
base_uri = "https://s3-eu-west-1.amazonaws.com/cycling.data.tfl.gov.uk/" base_uri = "https://s3-eu-west-1.amazonaws.com/cycling.data.tfl.gov.uk/"
xml_data = xml.etree.ElementTree.fromstringlist(requests.get(base_uri).text) xml_data = xml.etree.ElementTree.fromstringlist(requests.get(base_uri).text)
@ -56,138 +55,117 @@ def get_online_files_list(subdir_filter=None, file_extension_filter=None):
return entries return entries
class BikeDatabase: def init_database():
def __init__(self): LOG.info("Try to create tables")
LOG.info("Created new database connection") conn = sqlite3.connect("bike-data.db", timeout=300)
self.conn = sqlite3.connect(os.path.join(WORKING_DIR, "bike-data.db"), check_same_thread=False, timeout=300) conn.execute("""CREATE TABLE IF NOT EXISTS usage_stats(
rental_id INTEGER PRIMARY KEY,
duration INTEGER,
bike_id INTEGER,
end_date INTEGER,
end_station_id INTEGER,
end_station_name TEXT,
start_date INTEGER,
start_station_id INTEGER,
start_station_name TEXT
)""")
conn.execute("CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)")
conn.execute("""CREATE TABLE IF NOT EXISTS bike_points(
id TEXT PRIMARY KEY,
common_name TEXT,
lat REAL,
lon REAL,
id_num INTEGER AS (CAST(SUBSTR(id, 12) as INTEGER)) STORED
)""")
conn.commit()
conn.close()
LOG.info("Tables created")
def init_database(self):
LOG.info("Try to create tables")
self.conn.execute("""CREATE TABLE IF NOT EXISTS usage_stats(
rental_id INTEGER PRIMARY KEY,
duration INTEGER,
bike_id INTEGER,
end_date INTEGER,
end_station_id INTEGER,
end_station_name TEXT,
start_date INTEGER,
start_station_id INTEGER,
start_station_name TEXT
)""")
self.conn.execute("CREATE TABLE IF NOT EXISTS read_files(file_path TEXT, etag TEXT PRIMARY KEY)")
self.conn.execute("""CREATE TABLE IF NOT EXISTS bike_points(
id TEXT PRIMARY KEY,
common_name TEXT,
lat REAL,
lon REAL,
id_num INTEGER AS (CAST(SUBSTR(id, 12) as INTEGER)) STORED
)""")
self.conn.commit()
LOG.info("Tables created")
def create_indexes(self): def create_indexes():
LOG.info("Try to create indexes") LOG.info("Try to create indexes")
self.conn.execute("""CREATE INDEX IF NOT EXISTS idx_date_of_start_date conn = sqlite3.connect("bike-data.db", timeout=300)
ON usage_stats (date(start_date, "unixepoch"))""") conn.execute("""CREATE INDEX IF NOT EXISTS idx_date_of_start_date
self.conn.commit() ON usage_stats (date(start_date, "unixepoch"))""")
LOG.info("Indexes created") conn.commit()
conn.close()
LOG.info("Indexes created")
def import_bikepoints(self):
LOG.info("Importing bikepoints")
points = json.loads(requests.get("https://api.tfl.gov.uk/BikePoint").text)
points = list(map(lambda p: (p['id'], p['commonName'], p['lat'], p['lon']), points))
self.conn.executemany("INSERT OR IGNORE INTO bike_points VALUES (?, ?, ?, ?)", points)
self.conn.commit()
LOG.info("Bikepoints imported")
def is_file_already_imported(self, etag): def import_bikepoints():
rows = self.conn.execute("SELECT * FROM read_files WHERE etag LIKE ?", (etag,)).fetchall() LOG.info("Importing bikepoints")
return len(rows) != 0 conn = sqlite3.connect("bike-data.db", timeout=300)
points = json.loads(requests.get("https://api.tfl.gov.uk/BikePoint").text)
points = list(map(lambda p: (p['id'], p['commonName'], p['lat'], p['lon']), points))
conn.executemany("INSERT OR IGNORE INTO bike_points VALUES (?, ?, ?, ?)", points)
conn.commit()
conn.close()
LOG.info("Bikepoints imported")
def import_usage_stats_file(self, export_file: ApiExportFile):
if self.is_file_already_imported(export_file.etag): def import_usage_stats_file(export_file: ApiExportFile):
LOG.warning(f"Skipping import of {export_file.path}") conn = sqlite3.connect("bike-data.db", timeout=300)
rows = conn.execute("SELECT * FROM read_files WHERE etag LIKE ?", (export_file.etag,)).fetchall()
if len(rows) != 0:
LOG.warning(f"Skipping import of {export_file.path}")
return
LOG.info(f"DOWNLOADING... {export_file.download_url}")
content = requests.get(export_file.download_url).content.decode("UTF-8")
LOG.info(f"Parsing {export_file.path}")
entries = list(csv.reader(content.splitlines()))[1:]
mapped = []
for entry in entries:
try:
mapped.append((
# Rental Id
int(entry[0]),
# Duration oder Duration_Seconds
int(entry[1] or "-1"),
# Bike Id
int(entry[2] or "-1"),
# End Date
int(datetime.strptime(entry[3][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[3] else -1,
# EndStation Id
int(entry[4] or "-1"),
# EndStation Name
entry[5].strip(),
# Start Date
int(datetime.strptime(entry[6][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[6] else -1,
# StartStation Id
int(entry[7]),
# StartStation Name
entry[8].strip()
))
except ValueError as e:
LOG.error(f"Value Error {e} on line {entry}")
return return
except KeyError as e:
cursor = self.conn.cursor() LOG.error(f"Key Error {e} on line {entry}")
return
os.makedirs(os.path.dirname(export_file.path), exist_ok=True) LOG.info(f"Writing {len(mapped)} entries to DB")
LOG.info(f"DOWNLOADING... {export_file.download_url} to {export_file.path}") conn.executemany("INSERT OR IGNORE INTO usage_stats VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", mapped)
urllib.request.urlretrieve(export_file.download_url, export_file.path) conn.execute("INSERT OR IGNORE INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag))
conn.commit()
LOG.info(f"Importing {export_file.path}") conn.close()
with open(export_file.path, "r", newline='') as file: LOG.info(f"Finished import of {export_file.path}")
LOG.info(f"Reading file {export_file.path}")
entries = list(csv.reader(file))[1:]
mapped = []
for entry in entries:
try:
mapped.append((
# Rental Id
int(entry[0]),
# Duration oder Duration_Seconds
int(entry[1] or "-1"),
# Bike Id
int(entry[2] or "-1"),
# End Date
int(datetime.strptime(entry[3][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[3] else -1,
# EndStation Id
int(entry[4] or "-1"),
# EndStation Name
entry[5].strip(),
# Start Date
int(datetime.strptime(entry[6][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[6] else -1,
# StartStation Id
int(entry[7]),
# StartStation Name
entry[8].strip()
))
except ValueError as e:
LOG.error(f"Value Error {e} on line {entry}")
return
except KeyError as e:
LOG.error(f"Key Error {e} on line {entry}")
return
LOG.info(f"Writing {len(mapped)} entries to DB")
cursor.executemany("INSERT INTO usage_stats VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", mapped)
cursor.execute("INSERT INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag))
self.conn.commit()
LOG.info(f"Finished import of {export_file.path}")
os.remove(export_file.path)
LOG.info(f"Delete file {export_file.path}")
def main(): def main():
all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv")
# General DB init # General DB init
BikeDatabase().init_database() init_database()
# Download and import opendata from S3 bucket # Download and import opendata from S3 bucket
os.makedirs(TMP_DIR, exist_ok=True) all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv")
os.chdir(TMP_DIR) for file in all_files:
LOG.info("Switching into tmp dir") import_usage_stats_file(file)
import_tasks = []
with ThreadPoolExecutor(1) as executor:
for file in all_files:
db = BikeDatabase()
import_tasks.append(
executor.submit(db.import_usage_stats_file, file)
)
executor.shutdown(wait=True)
os.chdir(WORKING_DIR)
LOG.info("Switching back to workdir")
shutil.rmtree(TMP_DIR)
LOG.info("Deleted temp dir")
# Import Bikepoints # Import Bikepoints
BikeDatabase().import_bikepoints() import_bikepoints()
# Create search-index for faster querying # Create search-index for faster querying
BikeDatabase().create_indexes() create_indexes()
if __name__ == "__main__": if __name__ == "__main__":