Make import multi-threaded, improve logging
commit 9f8076286e
parent 7ad0df77cd

@@ -1,13 +1,18 @@
+import csv
+import logging
+import os
 import os.path
 import shutil
+import sqlite3
+import urllib.request
+from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
-import logging
+from datetime import datetime

 WORKING_DIR = os.getcwd()
 TMP_DIR = os.path.join(WORKING_DIR, "tmp")

-logging.basicConfig(format="%(asctime)-15s [%(levelname)8s] - %(message)s", level=logging.ERROR)
+logging.basicConfig(format="%(asctime)-15s [%(levelname)8s] [%(threadName)s] - %(message)s", level=logging.INFO)
 LOG = logging.getLogger("Importer")


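Note on the logging change: the new format adds %(threadName)s, so interleaved records from the worker pool can be attributed to a thread, and the level drops from ERROR to INFO so the progress messages actually appear. A minimal sketch of the effect (the thread name below is illustrative; ThreadPoolExecutor workers get names like "ThreadPoolExecutor-0_0"):

    import logging
    import threading

    logging.basicConfig(
        format="%(asctime)-15s [%(levelname)8s] [%(threadName)s] - %(message)s",
        level=logging.INFO)
    LOG = logging.getLogger("Importer")

    # The explicit name here is just for the demo.
    t = threading.Thread(target=lambda: LOG.info("downloading..."), name="Worker-1")
    t.start()
    t.join()
    # -> 2013-10-05 12:00:00,000 [    INFO] [Worker-1] - downloading...
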
@@ -47,27 +52,9 @@ def get_online_files_list(subdir_filter=None, file_extension_filter=None):
     return entries


-def download_file(url, save_path):
-    import os.path
-    import urllib.request
-
-    save_path = os.path.join(TMP_DIR, save_path)
-
-    if os.path.exists(save_path):
-        LOG.warning(f"Skipping exists: {save_path}")
-        return
-
-    os.makedirs(os.path.dirname(save_path), exist_ok=True)
-
-    LOG.info(f"DOWNLOADING... {url} to {save_path}")
-    urllib.request.urlretrieve(url, save_path)
-
-
 class BikeDatabase:

     def __init__(self):
-        import sqlite3
-        self.conn = sqlite3.connect("bike-data.db")
+        self.conn = sqlite3.connect(os.path.join(WORKING_DIR, "bike-data.db"), check_same_thread=False, timeout=60)
         self.cursor = self.conn.cursor()

     def init_database(self):
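Note on the new connection arguments (background, not part of the diff): by default sqlite3 raises if a connection is used from a thread other than the one that created it, and concurrent writers fail fast with "database is locked". A sketch of what the two keyword arguments do:

    import os
    import sqlite3

    # check_same_thread=False: the connection may be handed to a worker thread
    # instead of being usable only on the thread that created it.
    # timeout=60: wait up to 60 s for a database lock instead of raising
    # sqlite3.OperationalError ("database is locked") as soon as another
    # thread is mid-commit.
    conn = sqlite3.connect(os.path.join(os.getcwd(), "bike-data.db"),
                           check_same_thread=False, timeout=60)
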
@@ -89,29 +76,41 @@ class BikeDatabase:
         return len(rows) != 0

     def import_usage_stats_file(self, export_file: ApiExportFile):
-        from datetime import datetime
-        import csv
-        import os
-
-        os.chdir(TMP_DIR)
+        if self.is_file_already_imported(export_file.etag):
+            LOG.warning(f"Skipping import of {export_file.path}")
+            return
+
+        os.makedirs(os.path.dirname(export_file.path), exist_ok=True)
+        LOG.info(f"DOWNLOADING... {export_file.download_url} to {export_file.path}")
+        urllib.request.urlretrieve(export_file.download_url, export_file.path)

         LOG.info(f"Importing {export_file.path}")
         with open(export_file.path, "r", newline='') as file:
             LOG.info(f"Reading file {export_file.path}")
-            entries = list(csv.DictReader(file))
+            entries = list(csv.reader(file))[1:]
             mapped = []
             for entry in entries:
                 try:
                     mapped.append((
-                        int(entry['Rental Id']),
-                        int(entry['Duration'] or "-1"),
-                        int(entry['Bike Id'] or "-1"),
-                        int(datetime.strptime(entry['End Date'][:16], "%d/%m/%Y %H:%M").timestamp()) if entry['End Date'] else -1,
-                        int(entry['EndStation Id'] or "-1"),
-                        entry['EndStation Name'],
-                        int(datetime.strptime(entry['Start Date'][:16], "%d/%m/%Y %H:%M").timestamp()) if entry['Start Date'] else -1,
-                        int(entry['StartStation Id']),
-                        entry['StartStation Name']
+                        # Rental Id
+                        int(entry[0]),
+                        # Duration or Duration_Seconds
+                        int(entry[1] or "-1"),
+                        # Bike Id
+                        int(entry[2] or "-1"),
+                        # End Date
+                        int(datetime.strptime(entry[3][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[3] else -1,
+                        # EndStation Id
+                        int(entry[4] or "-1"),
+                        # EndStation Name
+                        entry[5].strip(),
+                        # Start Date
+                        int(datetime.strptime(entry[6][:16], "%d/%m/%Y %H:%M").timestamp()) if entry[6] else -1,
+                        # StartStation Id
+                        int(entry[7]),
+                        # StartStation Name
+                        entry[8].strip()
                     ))
                 except ValueError as e:
                     LOG.error(f"Value Error {e} on line {entry}")
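Note on the reader switch: judging by the inline comment "Duration or Duration_Seconds", the export files do not share identical header rows, so the code now indexes columns positionally and skips the header via [1:]. A hedged sketch of the pattern (the file name is hypothetical):

    import csv
    from datetime import datetime

    # "rides.csv" is a hypothetical stand-in for one downloaded export file.
    with open("rides.csv", newline="") as f:
        rows = list(csv.reader(f))[1:]   # positional rows, header dropped

    for row in rows:
        rental_id = int(row[0])
        duration = int(row[1] or "-1")   # empty field becomes the -1 sentinel
        end_ts = (int(datetime.strptime(row[3][:16], "%d/%m/%Y %H:%M").timestamp())
                  if row[3] else -1)
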
@@ -124,22 +123,34 @@ class BikeDatabase:
         self.cursor.execute("INSERT INTO read_files VALUES (?, ?)", (export_file.path, export_file.etag))
         self.conn.commit()
         LOG.info(f"Finished import of {export_file.path}")
-        os.chdir(WORKING_DIR)
-        shutil.rmtree(TMP_DIR)
-        LOG.info("Deleted temp dir")
+        os.remove(export_file.path)
+        LOG.info(f"Delete file {export_file.path}")


 def main():
     all_files = get_online_files_list(subdir_filter="usage-stats", file_extension_filter=".csv")

     db = BikeDatabase()
     db.init_database()

-    for file in all_files:
-        if not db.is_file_already_imported(file.etag):
-            download_file(file.download_url, file.path)
-            db.import_usage_stats_file(file)
-        else:
-            LOG.warning(f"Skipping import of {file.path}")
+    os.makedirs(TMP_DIR, exist_ok=True)
+    os.chdir(TMP_DIR)
+    LOG.info("Switching into tmp dir")
+
+    import_tasks = []
+
+    with ThreadPoolExecutor(3) as executor:
+        for file in all_files:
+            db = BikeDatabase()
+            import_tasks.append(
+                executor.submit(db.import_usage_stats_file, file)
+            )
+        executor.shutdown(wait=True)
+
+    os.chdir(WORKING_DIR)
+    LOG.info("Switching back to workdir")
+    shutil.rmtree(TMP_DIR)
+    LOG.info("Deleted temp dir")


 if __name__ == "__main__":
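Note on the pool pattern (observations on the code above, not further changes): exiting the with-block already implies executor.shutdown(wait=True), so the explicit call is redundant but harmless; each task constructs its own BikeDatabase, so no SQLite connection is shared between threads; and since the collected futures are never inspected, worker exceptions go unnoticed, whereas calling result() would surface them. A sketch under those assumptions:

    from concurrent.futures import ThreadPoolExecutor

    def import_all(all_files):
        # BikeDatabase is the class defined in this file; one instance (and
        # therefore one SQLite connection) per submitted task.
        with ThreadPoolExecutor(3) as executor:
            tasks = [executor.submit(BikeDatabase().import_usage_stats_file, f)
                     for f in all_files]
            # leaving the with-block is equivalent to executor.shutdown(wait=True)
        for task in tasks:
            task.result()  # re-raises any exception a worker thread hit
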