Refactor: improved code quality and cleaned up project structure

- daemon.py: removed gc.collect(), robust scheduling (last_run_date instead of a per-minute check),
  introduced an exchange registry pattern (STREAMING_EXCHANGES/STANDARD_EXCHANGES; usage sketch at the end of this diff)
- deutsche_boerse.py: thread-safe User-Agent rotation on rate limits (sketch after the commit metadata below),
  logging instead of print(), holiday check, parse methods split up
- eix.py: logging instead of print(), specific exception types instead of a bare except
- read.py deleted and replaced by scripts/inspect_gzip.py (streaming-based; sketch at the end of this diff)
- utility scripts moved to scripts/ (cleanup_duplicates, restore_and_fix, verify_fix)
Melchior Reimers
2026-02-01 08:18:55 +01:00
parent cf55a0bd06
commit 1dc79b8b64
9 changed files with 545 additions and 308 deletions
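The deutsche_boerse.py changes listed above are not part of the excerpt below (only daemon.py is shown). As a rough illustration of the described approach, thread-safe User-Agent rotation on rate limits could look like the following minimal sketch; the UserAgentRotator class, the fetch() helper and the User-Agent strings are illustrative and not taken from the repository:

import threading

import requests

# Illustrative list; the actual strings and structure in deutsche_boerse.py may differ.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "Mozilla/5.0 (X11; Linux x86_64)",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
]


class UserAgentRotator:
    """Thread-safe round-robin rotation over a fixed list of User-Agent strings."""

    def __init__(self, agents):
        self._agents = list(agents)
        self._index = 0
        self._lock = threading.Lock()

    def current(self) -> str:
        with self._lock:
            return self._agents[self._index]

    def rotate(self) -> str:
        """Advance to the next User-Agent and return it (intended for HTTP 429 responses)."""
        with self._lock:
            self._index = (self._index + 1) % len(self._agents)
            return self._agents[self._index]


def fetch(session: requests.Session, url: str, rotator: UserAgentRotator) -> requests.Response:
    """Retry once with a rotated User-Agent when the server answers with 429 (rate limit)."""
    response = session.get(url, headers={"User-Agent": rotator.current()})
    if response.status_code == 429:
        response = session.get(url, headers={"User-Agent": rotator.rotate()})
    return response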

daemon.py

@@ -3,8 +3,10 @@ import logging
import datetime
import hashlib
import os
import gc
import requests
from typing import List, Type
from src.exchanges.base import BaseExchange
from src.exchanges.eix import EIXExchange
from src.exchanges.ls import LSExchange
from src.exchanges.deutsche_boerse import XetraExchange, FrankfurtExchange, QuotrixExchange
@@ -26,6 +28,43 @@ DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
# =============================================================================
# Exchange Registry - Neue Börsen hier hinzufügen
# =============================================================================
# Exchanges die Streaming-Verarbeitung benötigen (große Datenmengen)
STREAMING_EXCHANGES: List[Type[BaseExchange]] = [
EIXExchange,
]
# Standard-Exchanges (normale Batch-Verarbeitung)
STANDARD_EXCHANGES: List[Type[BaseExchange]] = [
# Lang & Schwarz
LSExchange,
# Deutsche Börse
XetraExchange,
FrankfurtExchange,
QuotrixExchange,
# Weitere Börsen
GettexExchange,
StuttgartExchange,
# Börsenag (Düsseldorf, Hamburg, Hannover)
DUSAExchange,
DUSBExchange,
DUSCExchange,
DUSDExchange,
HAMAExchange,
HAMBExchange,
HANAExchange,
HANBExchange,
]
# =============================================================================
# Trades Cache
# =============================================================================
# Cache für existierende Trades pro Tag (wird nach jedem Exchange geleert)
_existing_trades_cache = {}
@@ -77,7 +116,6 @@ def clear_trades_cache():
"""Leert den Cache für existierende Trades."""
global _existing_trades_cache
_existing_trades_cache = {}
gc.collect()
def filter_new_trades_for_day(db_url, exchange_name, trades, day):
"""Filtert neue Trades für einen einzelnen Tag."""
@@ -123,207 +161,212 @@ def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=5000):
return new_trades
def get_last_trade_timestamp(db_url, exchange_name):
# QuestDB query: get the latest timestamp for a specific exchange
def get_last_trade_timestamp(db_url: str, exchange_name: str) -> datetime.datetime:
"""Holt den Timestamp des letzten Trades für eine Exchange aus QuestDB."""
query = f"trades where exchange = '{exchange_name}' latest by timestamp"
try:
# Using the /exec endpoint to get data
response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH)
if response.status_code == 200:
data = response.json()
if data['dataset']:
# QuestDB returns timestamp in micros since epoch by default in some views, or ISO
# Let's assume the timestamp is in the dataset
# ILP timestamps are stored as designated timestamps.
ts_value = data['dataset'][0][0] # Adjust index based on column order
if data.get('dataset'):
# QuestDB gibt Timestamps in Mikrosekunden oder ISO-Format zurück
ts_value = data['dataset'][0][0]
if isinstance(ts_value, str):
return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
else:
return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
except Exception as e:
logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")
logger.debug(f"Keine existierenden Daten für {exchange_name} oder DB nicht erreichbar: {e}")
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
def process_eix_streaming(db, db_url, eix, historical=False):
"""Verarbeitet EIX in Streaming-Modus um RAM zu sparen."""
last_ts = get_last_trade_timestamp(db_url, eix.name)
logger.info(f"Fetching data from EIX (Last trade: {last_ts}) - STREAMING MODE...")
def process_eix_streaming(db, db_url: str, exchange: BaseExchange, historical: bool = False):
"""Verarbeitet eine Exchange im Streaming-Modus um RAM zu sparen."""
last_ts = get_last_trade_timestamp(db_url, exchange.name)
logger.info(f"Hole Daten von {exchange.name} (Letzter Trade: {last_ts}) - STREAMING...")
# Hole Liste der zu verarbeitenden Dateien
if historical:
files = eix.get_files_to_process(limit=None, since_date=None)
files = exchange.get_files_to_process(limit=None, since_date=None)
else:
files = eix.get_files_to_process(limit=None, since_date=last_ts)
files = exchange.get_files_to_process(limit=None, since_date=last_ts)
if not files:
logger.info("No EIX files to process.")
logger.info(f"Keine {exchange.name} Dateien zu verarbeiten.")
return
logger.info(f"Found {len(files)} EIX files to process...")
logger.info(f"{len(files)} {exchange.name} Dateien gefunden...")
total_new = 0
total_processed = 0
for i, file_item in enumerate(files, 1):
file_name = file_item.get('fileName', 'unknown').split('/')[-1]
logger.info(f"Processing EIX file {i}/{len(files)}: {file_name}")
logger.info(f"Verarbeite {exchange.name} Datei {i}/{len(files)}: {file_name}")
# Lade eine Datei
trades = eix.fetch_trades_from_file(file_item)
trades = exchange.fetch_trades_from_file(file_item)
if not trades:
logger.info(f" No trades in file {file_name}")
logger.info(f" Keine Trades in {file_name}")
continue
total_processed += len(trades)
logger.info(f" Loaded {len(trades)} trades, filtering duplicates...")
logger.info(f" {len(trades)} Trades geladen, filtere Duplikate...")
# Filtere Duplikate
new_trades = filter_new_trades_batch(db_url, eix.name, trades, batch_size=5000)
new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
if new_trades:
new_trades.sort(key=lambda x: x.timestamp)
db.save_trades(new_trades)
total_new += len(new_trades)
logger.info(f" Saved {len(new_trades)} new trades (total new: {total_new})")
logger.info(f" {len(new_trades)} neue Trades gespeichert (gesamt neu: {total_new})")
else:
logger.info(f" No new trades in this file")
logger.info(f" Keine neuen Trades in dieser Datei")
# Speicher freigeben
# Referenzen freigeben
del trades
del new_trades
gc.collect()
# Kurze Pause zwischen Dateien
time.sleep(0.1)
logger.info(f"EIX complete: {total_new} new trades from {total_processed} total processed.")
logger.info(f"{exchange.name} fertig: {total_new} neue Trades von {total_processed} verarbeitet.")
clear_trades_cache()
def process_standard_exchange(db, db_url: str, exchange: BaseExchange, historical: bool):
"""Verarbeitet einen Standard-Exchange mit Batch-Verarbeitung."""
try:
last_ts = get_last_trade_timestamp(db_url, exchange.name)
logger.info(f"Hole Daten von {exchange.name} (Letzter Trade: {last_ts})...")
trades = exchange.fetch_latest_trades(include_yesterday=historical)
if not trades:
logger.info(f"Keine Trades von {exchange.name} erhalten.")
return
# Deduplizierung
logger.info(f"Filtere {len(trades)} Trades auf Duplikate...")
new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
logger.info(f"Gefunden: {len(trades)} Trades gesamt, {len(new_trades)} sind neu.")
if new_trades:
new_trades.sort(key=lambda x: x.timestamp)
db.save_trades(new_trades)
logger.info(f"{len(new_trades)} neue Trades in QuestDB gespeichert.")
# Referenzen freigeben
del trades
if new_trades:
del new_trades
clear_trades_cache()
except Exception as e:
logger.error(f"Fehler bei Exchange {exchange.name}: {e}")
def run_task(historical=False):
logger.info(f"Starting Trading Data Fetcher task (Historical: {historical})...")
"""Haupttask: Holt Trades von allen registrierten Exchanges."""
logger.info(f"Starte Trading Data Fetcher (Historical: {historical})...")
db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
db_url = "http://questdb:9000"
# === EIX - Streaming Verarbeitung ===
try:
eix = EIXExchange()
process_eix_streaming(db, db_url, eix, historical=historical)
except Exception as e:
logger.error(f"Error processing EIX: {e}")
# === Andere Exchanges - normale Verarbeitung ===
ls = LSExchange()
# Neue Deutsche Börse Exchanges
xetra = XetraExchange()
frankfurt = FrankfurtExchange()
quotrix = QuotrixExchange()
gettex = GettexExchange()
stuttgart = StuttgartExchange()
# Börsenag Exchanges (Düsseldorf, Hamburg, Hannover)
dusa = DUSAExchange()
dusb = DUSBExchange()
dusc = DUSCExchange()
dusd = DUSDExchange()
hama = HAMAExchange()
hamb = HAMBExchange()
hana = HANAExchange()
hanb = HANBExchange()
# Alle anderen Exchanges (kleinere Datenmengen)
exchanges_to_process = [
(ls, {'include_yesterday': historical}),
# Deutsche Börse Exchanges
(xetra, {'include_yesterday': historical}),
(frankfurt, {'include_yesterday': historical}),
(quotrix, {'include_yesterday': historical}),
(gettex, {'include_yesterday': historical}),
(stuttgart, {'include_yesterday': historical}),
# Börsenag Exchanges (Düsseldorf, Hamburg, Hannover)
(dusa, {'include_yesterday': historical}),
(dusb, {'include_yesterday': historical}),
(dusc, {'include_yesterday': historical}),
(dusd, {'include_yesterday': historical}),
(hama, {'include_yesterday': historical}),
(hamb, {'include_yesterday': historical}),
(hana, {'include_yesterday': historical}),
(hanb, {'include_yesterday': historical}),
]
for exchange, args in exchanges_to_process:
# Streaming-Exchanges verarbeiten (große Datenmengen)
for exchange_class in STREAMING_EXCHANGES:
try:
last_ts = get_last_trade_timestamp(db_url, exchange.name)
logger.info(f"Fetching data from {exchange.name} (Last trade: {last_ts})...")
trades = exchange.fetch_latest_trades(**args)
if not trades:
logger.info(f"No trades fetched from {exchange.name}.")
continue
# Deduplizierung
logger.info(f"Filtering {len(trades)} trades for duplicates...")
new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")
if new_trades:
new_trades.sort(key=lambda x: x.timestamp)
db.save_trades(new_trades)
logger.info(f"Stored {len(new_trades)} new trades in QuestDB.")
# Speicher freigeben nach jedem Exchange
del trades
if new_trades:
del new_trades
clear_trades_cache()
gc.collect()
exchange = exchange_class()
logger.info(f"Verarbeite {exchange.name} im Streaming-Modus...")
process_eix_streaming(db, db_url, exchange, historical=historical)
except Exception as e:
logger.error(f"Error processing exchange {exchange.name}: {e}")
logger.error(f"Fehler bei Streaming-Exchange {exchange_class.__name__}: {e}")
logger.info("All exchanges processed.")
# Standard-Exchanges verarbeiten
for exchange_class in STANDARD_EXCHANGES:
try:
exchange = exchange_class()
process_standard_exchange(db, db_url, exchange, historical)
except Exception as e:
logger.error(f"Fehler bei Exchange {exchange_class.__name__}: {e}")
logger.info("Alle Exchanges verarbeitet.")
def main():
logger.info("Trading Daemon started.")
# 1. Startup Check: Ist die DB leer?
db_url = "http://questdb:9000"
is_empty = True
def is_database_empty(db_url: str) -> bool:
"""Prüft ob die Datenbank leer ist oder die Tabelle nicht existiert."""
try:
# Prüfe ob bereits Trades in der Tabelle sind
response = requests.get(f"{db_url}/exec", params={'query': 'select count(*) from trades'}, auth=DB_AUTH)
if response.status_code == 200:
data = response.json()
if data['dataset'] and data['dataset'][0][0] > 0:
is_empty = False
if data.get('dataset') and data['dataset'][0][0] > 0:
return False
except Exception:
# Falls Tabelle noch nicht existiert oder DB nicht erreichbar ist
is_empty = True
pass
return True
if is_empty:
logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...")
def calculate_seconds_until_target(target_hour: int, target_minute: int = 0) -> int:
"""Berechnet Sekunden bis zur nächsten Zielzeit."""
now = datetime.datetime.now()
target = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
# Wenn Zielzeit heute schon vorbei ist, nimm morgen
if target <= now:
target += datetime.timedelta(days=1)
return int((target - now).total_seconds())
def main():
logger.info("Trading Daemon gestartet.")
db_url = "http://questdb:9000"
# Startup: Initialer Sync
if is_database_empty(db_url):
logger.info("Datenbank ist leer. Starte initialen historischen Fetch...")
run_task(historical=True)
else:
logger.info("Found existing data in database. Triggering catch-up sync...")
# Run a normal task to fetch any missing data since the last run
logger.info("Existierende Daten gefunden. Starte Catch-up Sync...")
run_task(historical=False)
logger.info("Catch-up sync completed. Waiting for scheduled run at 23:00.")
logger.info("Catch-up Sync abgeschlossen.")
# Scheduling Konfiguration
SCHEDULE_HOUR = 23
SCHEDULE_MINUTE = 0
last_run_date = None
logger.info(f"Warte auf täglichen Run um {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d}...")
while True:
now = datetime.datetime.now()
# Täglich um 23:00 Uhr
if now.hour == 23 and now.minute == 0:
run_task(historical=False)
# Warte 61s, um Mehrfachausführung in derselben Minute zu verhindern
time.sleep(61)
today = now.date()
# Check alle 30 Sekunden
time.sleep(30)
# Prüfe ob wir heute schon gelaufen sind
already_ran_today = (last_run_date == today)
# Prüfe ob wir im Zeitfenster sind (23:00 - 23:59)
in_schedule_window = (now.hour == SCHEDULE_HOUR and now.minute >= SCHEDULE_MINUTE)
if in_schedule_window and not already_ran_today:
logger.info(f"Geplanter Task startet ({now.strftime('%Y-%m-%d %H:%M:%S')})...")
run_task(historical=False)
last_run_date = today
logger.info("Geplanter Task abgeschlossen. Warte auf nächsten Tag...")
# Dynamische Sleep-Zeit: Kurz vor Zielzeit öfter prüfen
seconds_until_target = calculate_seconds_until_target(SCHEDULE_HOUR, SCHEDULE_MINUTE)
if seconds_until_target > 3600:
# Mehr als 1 Stunde: Schlafe 30 Minuten
sleep_time = 1800
elif seconds_until_target > 300:
# 5 Minuten bis 1 Stunde: Schlafe 5 Minuten
sleep_time = 300
else:
# Unter 5 Minuten: Schlafe 30 Sekunden
sleep_time = 30
time.sleep(sleep_time)
if __name__ == "__main__":
main()
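
With the registry above in place, adding a further exchange only needs a BaseExchange subclass and one new list entry. A hypothetical example follows; the exact BaseExchange interface (for instance whether name is a class attribute) is assumed here from how daemon.py uses exchange.name and fetch_latest_trades(include_yesterday=...):

from src.exchanges.base import BaseExchange


class ExampleExchange(BaseExchange):
    # daemon.py reads exchange.name and calls fetch_latest_trades(include_yesterday=...)
    name = "EXAMPLE"

    def fetch_latest_trades(self, include_yesterday: bool = False):
        # fetch and return a list of trade objects carrying a timestamp attribute
        return []


# In daemon.py the new class is then registered, e.g.:
# STANDARD_EXCHANGES.append(ExampleExchange)   # or add it to the list literal directly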
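
The commit also replaces read.py with a streaming-based scripts/inspect_gzip.py, which is not shown in this diff. A minimal sketch of a streaming gzip inspector, assuming the script simply reads a gzip-compressed text file line by line instead of decompressing it into memory (function name and CLI handling are illustrative):

import gzip
import sys


def inspect_gzip(path: str, max_lines: int = 10) -> None:
    """Stream a gzip-compressed text file and print the first lines without loading it fully."""
    with gzip.open(path, mode="rt", encoding="utf-8", errors="replace") as handle:
        for line_number, line in enumerate(handle, start=1):
            print(f"{line_number:>6}: {line.rstrip()}")
            if line_number >= max_lines:
                break


if __name__ == "__main__":
    # Usage: python scripts/inspect_gzip.py <file.gz> [max_lines]
    inspect_gzip(sys.argv[1], int(sys.argv[2]) if len(sys.argv) > 2 else 10)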