trading-daemon/daemon.py
Melchior Reimers 1dc79b8b64
Refactor: improve code quality and clean up project structure
- daemon.py: removed gc.collect(), robust scheduling (last_run_date instead of a minute check),
  introduced an exchange registry pattern (STREAMING_EXCHANGES/STANDARD_EXCHANGES)
- deutsche_boerse.py: thread-safe user-agent rotation on rate limits,
  logging instead of print(), holiday check, parse logic split into smaller methods
- eix.py: logging instead of print(), specific exception types instead of a bare except
- read.py deleted and replaced by scripts/inspect_gzip.py (streaming-based)
- utility scripts moved to scripts/ (cleanup_duplicates, restore_and_fix, verify_fix)
2026-02-01 08:18:55 +01:00


import time
import logging
import datetime
import hashlib
import os
import requests
from typing import List, Type
from src.exchanges.base import BaseExchange
from src.exchanges.eix import EIXExchange
from src.exchanges.ls import LSExchange
from src.exchanges.deutsche_boerse import XetraExchange, FrankfurtExchange, QuotrixExchange
from src.exchanges.gettex import GettexExchange
from src.exchanges.stuttgart import StuttgartExchange
from src.exchanges.boersenag import (
    DUSAExchange, DUSBExchange, DUSCExchange, DUSDExchange,
    HAMAExchange, HAMBExchange, HANAExchange, HANBExchange,
)
from src.database.questdb_client import DatabaseClient

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("TradingDaemon")
DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
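
# Example (illustrative): the credentials can be overridden per environment
# when launching the daemon; the values below are placeholders:
#   DB_USER=readonly DB_PASSWORD=s3cret python daemon.py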

# =============================================================================
# Exchange registry - add new exchanges here
# =============================================================================

# Exchanges that need streaming processing (large data volumes)
STREAMING_EXCHANGES: List[Type[BaseExchange]] = [
    EIXExchange,
]

# Standard exchanges (regular batch processing)
STANDARD_EXCHANGES: List[Type[BaseExchange]] = [
    # Lang & Schwarz
    LSExchange,
    # Deutsche Börse
    XetraExchange,
    FrankfurtExchange,
    QuotrixExchange,
    # Other exchanges
    GettexExchange,
    StuttgartExchange,
    # Börsenag (Düsseldorf, Hamburg, Hannover)
    DUSAExchange,
    DUSBExchange,
    DUSCExchange,
    DUSDExchange,
    HAMAExchange,
    HAMBExchange,
    HANAExchange,
    HANBExchange,
]
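
# Example (illustrative): registering a hypothetical new exchange is one
# import plus one list entry; none of the daemon logic below changes:
#
#   from src.exchanges.my_new import MyNewExchange  # hypothetical module
#   STANDARD_EXCHANGES.append(MyNewExchange)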

# =============================================================================
# Trades cache
# =============================================================================

# Cache of existing trades per day (cleared after each exchange)
_existing_trades_cache = {}


def get_trade_hash(trade):
    """Builds a unique hash for a trade."""
    key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}"
    return hashlib.md5(key.encode()).hexdigest()
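
# Example (illustrative): two trades with identical exchange, ISIN, timestamp,
# price, and quantity yield the same digest, so the hash can serve as a
# stable deduplication key:
#
#   get_trade_hash(trade_a) == get_trade_hash(trade_b)  # True when all five fields match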

def get_existing_trades_for_day(db_url, exchange_name, day):
    """Fetches the existing trades for one day from the DB (with caching)."""
    cache_key = f"{exchange_name}_{day.strftime('%Y-%m-%d')}"
    if cache_key in _existing_trades_cache:
        return _existing_trades_cache[cache_key]
    day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
    day_end = day + datetime.timedelta(days=1)
    day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
    query = f"""
    SELECT isin, timestamp, price, quantity
    FROM trades
    WHERE exchange = '{exchange_name}'
      AND timestamp >= '{day_start_str}'
      AND timestamp < '{day_end_str}'
    """
    existing_trades = set()
    try:
        response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60)
        if response.status_code == 200:
            data = response.json()
            if data.get('dataset'):
                for row in data['dataset']:
                    isin, ts, price, qty = row
                    if isinstance(ts, str):
                        ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
                    else:
                        # Numeric timestamps are microseconds since the epoch
                        ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
                    key = (isin, ts_dt.isoformat(), float(price), float(qty))
                    existing_trades.add(key)
    except Exception as e:
        logger.warning(f"Error fetching existing trades for {day}: {e}")
    _existing_trades_cache[cache_key] = existing_trades
    return existing_trades

def clear_trades_cache():
    """Clears the cache of existing trades."""
    global _existing_trades_cache
    _existing_trades_cache = {}


def filter_new_trades_for_day(db_url, exchange_name, trades, day):
    """Filters the new trades for a single day."""
    if not trades:
        return []
    existing = get_existing_trades_for_day(db_url, exchange_name, day)
    new_trades = []
    for trade in trades:
        trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
        if trade_key not in existing:
            new_trades.append(trade)
    return new_trades

def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=5000):
    """Filters new trades, grouped by day.

    Note: batch_size is currently unused; deduplication is done per day.
    """
    if not trades:
        return []
    # Group all trades by day
    trades_by_day = {}
    for trade in trades:
        day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
        if day not in trades_by_day:
            trades_by_day[day] = []
        trades_by_day[day].append(trade)
    new_trades = []
    total_days = len(trades_by_day)
    for i, (day, day_trades) in enumerate(sorted(trades_by_day.items()), 1):
        if i % 10 == 0 or i == 1:
            logger.info(f"Checking day {i}/{total_days}: {day.strftime('%Y-%m-%d')} ({len(day_trades)} trades)...")
        new_for_day = filter_new_trades_for_day(db_url, exchange_name, day_trades, day)
        new_trades.extend(new_for_day)
        # Short pause to avoid overloading the DB
        if i < total_days:
            time.sleep(0.02)
    return new_trades
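
# Example (illustrative): trades stamped 2026-01-30 09:15 and 2026-01-30 17:42
# land in the same day bucket (2026-01-30 00:00) and share one DB lookup,
# while a trade at 2026-01-31 08:00 opens a new bucket and triggers a
# separate call to filter_new_trades_for_day().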

def get_last_trade_timestamp(db_url: str, exchange_name: str) -> datetime.datetime:
    """Fetches the timestamp of the most recent trade for an exchange from QuestDB."""
    query = f"trades where exchange = '{exchange_name}' latest by timestamp"
    try:
        response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH)
        if response.status_code == 200:
            data = response.json()
            if data.get('dataset'):
                # QuestDB returns timestamps either as microseconds or in ISO format
                ts_value = data['dataset'][0][0]
                if isinstance(ts_value, str):
                    return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
                else:
                    return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
    except Exception as e:
        logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")
    return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
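
# Note (assumption about the QuestDB REST API as deployed here): /exec
# answers with JSON shaped roughly like
#   {"query": "...", "columns": [{"name": "...", "type": "..."}, ...],
#    "dataset": [[...], ...], "count": N}
# which is why this module indexes data['dataset'] directly.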

def process_eix_streaming(db, db_url: str, exchange: BaseExchange, historical: bool = False):
    """Processes an exchange in streaming mode to save RAM."""
    last_ts = get_last_trade_timestamp(db_url, exchange.name)
    logger.info(f"Fetching data from {exchange.name} (last trade: {last_ts}) - STREAMING...")
    # Get the list of files to process
    if historical:
        files = exchange.get_files_to_process(limit=None, since_date=None)
    else:
        files = exchange.get_files_to_process(limit=None, since_date=last_ts)
    if not files:
        logger.info(f"No {exchange.name} files to process.")
        return
    logger.info(f"Found {len(files)} {exchange.name} files...")
    total_new = 0
    total_processed = 0
    for i, file_item in enumerate(files, 1):
        file_name = file_item.get('fileName', 'unknown').split('/')[-1]
        logger.info(f"Processing {exchange.name} file {i}/{len(files)}: {file_name}")
        trades = exchange.fetch_trades_from_file(file_item)
        if not trades:
            logger.info(f"  No trades in {file_name}")
            continue
        total_processed += len(trades)
        logger.info(f"  Loaded {len(trades)} trades, filtering duplicates...")
        new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
        if new_trades:
            new_trades.sort(key=lambda x: x.timestamp)
            db.save_trades(new_trades)
            total_new += len(new_trades)
            logger.info(f"  Saved {len(new_trades)} new trades (total new: {total_new})")
        else:
            logger.info("  No new trades in this file")
        # Release references
        del trades
        del new_trades
        time.sleep(0.1)
    logger.info(f"{exchange.name} done: {total_new} new trades out of {total_processed} processed.")
    clear_trades_cache()

def process_standard_exchange(db, db_url: str, exchange: BaseExchange, historical: bool):
    """Processes a standard exchange with batch processing."""
    try:
        last_ts = get_last_trade_timestamp(db_url, exchange.name)
        logger.info(f"Fetching data from {exchange.name} (last trade: {last_ts})...")
        trades = exchange.fetch_latest_trades(include_yesterday=historical)
        if not trades:
            logger.info(f"No trades received from {exchange.name}.")
            return
        # Deduplication
        logger.info(f"Filtering {len(trades)} trades for duplicates...")
        new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
        logger.info(f"Found {len(trades)} trades in total, {len(new_trades)} are new.")
        if new_trades:
            new_trades.sort(key=lambda x: x.timestamp)
            db.save_trades(new_trades)
            logger.info(f"Saved {len(new_trades)} new trades to QuestDB.")
        # Release references
        del trades
        del new_trades
        clear_trades_cache()
    except Exception as e:
        logger.error(f"Error in exchange {exchange.name}: {e}")

def run_task(historical=False):
    """Main task: fetches trades from all registered exchanges."""
    logger.info(f"Starting trading data fetcher (historical: {historical})...")
    db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
    db_url = "http://questdb:9000"
    # Process streaming exchanges (large data volumes)
    for exchange_class in STREAMING_EXCHANGES:
        try:
            exchange = exchange_class()
            logger.info(f"Processing {exchange.name} in streaming mode...")
            process_eix_streaming(db, db_url, exchange, historical=historical)
        except Exception as e:
            logger.error(f"Error in streaming exchange {exchange_class.__name__}: {e}")
    # Process standard exchanges
    for exchange_class in STANDARD_EXCHANGES:
        try:
            exchange = exchange_class()
            process_standard_exchange(db, db_url, exchange, historical)
        except Exception as e:
            logger.error(f"Error in exchange {exchange_class.__name__}: {e}")
    logger.info("All exchanges processed.")

def is_database_empty(db_url: str) -> bool:
    """Checks whether the database is empty or the table does not exist."""
    try:
        response = requests.get(f"{db_url}/exec", params={'query': 'select count(*) from trades'}, auth=DB_AUTH)
        if response.status_code == 200:
            data = response.json()
            if data.get('dataset') and data['dataset'][0][0] > 0:
                return False
    except Exception:
        pass
    return True

def calculate_seconds_until_target(target_hour: int, target_minute: int = 0) -> int:
    """Calculates the seconds until the next occurrence of the target time."""
    now = datetime.datetime.now()
    target = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
    # If the target time has already passed today, take tomorrow
    if target <= now:
        target += datetime.timedelta(days=1)
    return int((target - now).total_seconds())
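
# Worked example: at 22:30:00, calculate_seconds_until_target(23, 0) returns
# 1800 (30 minutes). At 23:05:00 the target has already passed, so it returns
# the seconds until 23:00 tomorrow (86100).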

def main():
    logger.info("Trading daemon started.")
    db_url = "http://questdb:9000"
    # Startup: initial sync
    if is_database_empty(db_url):
        logger.info("Database is empty. Starting initial historical fetch...")
        run_task(historical=True)
    else:
        logger.info("Existing data found. Starting catch-up sync...")
        run_task(historical=False)
        logger.info("Catch-up sync finished.")
    # Scheduling configuration
    SCHEDULE_HOUR = 23
    SCHEDULE_MINUTE = 0
    last_run_date = None
    logger.info(f"Waiting for the daily run at {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d}...")
    while True:
        now = datetime.datetime.now()
        today = now.date()
        # Check whether we already ran today
        already_ran_today = (last_run_date == today)
        # Check whether we are inside the schedule window (23:00 - 23:59)
        in_schedule_window = (now.hour == SCHEDULE_HOUR and now.minute >= SCHEDULE_MINUTE)
        if in_schedule_window and not already_ran_today:
            logger.info(f"Scheduled task starting ({now.strftime('%Y-%m-%d %H:%M:%S')})...")
            run_task(historical=False)
            last_run_date = today
            logger.info("Scheduled task finished. Waiting for the next day...")
        # Dynamic sleep time: poll more often shortly before the target time
        seconds_until_target = calculate_seconds_until_target(SCHEDULE_HOUR, SCHEDULE_MINUTE)
        if seconds_until_target > 3600:
            # More than 1 hour away: sleep 30 minutes
            sleep_time = 1800
        elif seconds_until_target > 300:
            # Between 5 minutes and 1 hour: sleep 5 minutes
            sleep_time = 300
        else:
            # Under 5 minutes: sleep 30 seconds
            sleep_time = 30
        time.sleep(sleep_time)

if __name__ == "__main__":
    main()