import time
import logging
import datetime
import hashlib
import os

import requests

from typing import List, Type

from src.exchanges.base import BaseExchange
from src.exchanges.eix import EIXExchange
from src.exchanges.ls import LSExchange
from src.exchanges.deutsche_boerse import XetraExchange, FrankfurtExchange, QuotrixExchange
from src.exchanges.gettex import GettexExchange
from src.exchanges.stuttgart import StuttgartExchange
from src.exchanges.boersenag import (
    DUSAExchange, DUSBExchange, DUSCExchange, DUSDExchange,
    HAMAExchange, HAMBExchange, HANAExchange, HANBExchange
)
from src.database.questdb_client import DatabaseClient

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("TradingDaemon")

DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None

# =============================================================================
# Exchange Registry - add new exchanges here
# =============================================================================

# Exchanges that require streaming processing (large data volumes)
STREAMING_EXCHANGES: List[Type[BaseExchange]] = [
    EIXExchange,
]

# Standard exchanges (regular batch processing)
STANDARD_EXCHANGES: List[Type[BaseExchange]] = [
    # Lang & Schwarz
    LSExchange,
    # Deutsche Börse
    XetraExchange,
    FrankfurtExchange,
    QuotrixExchange,
    # Other exchanges
    GettexExchange,
    StuttgartExchange,
    # Börsenag (Düsseldorf, Hamburg, Hannover)
    DUSAExchange,
    DUSBExchange,
    DUSCExchange,
    DUSDExchange,
    HAMAExchange,
    HAMBExchange,
    HANAExchange,
    HANBExchange,
]

# =============================================================================
# Trades Cache
# =============================================================================

# Cache of existing trades per day (cleared after each exchange)
_existing_trades_cache = {}


def get_trade_hash(trade):
    """Builds a unique hash for a trade."""
    key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}"
    return hashlib.md5(key.encode()).hexdigest()


def get_existing_trades_for_day(db_url, exchange_name, day):
    """Fetches existing trades for a given day from the DB (with caching)."""
    cache_key = f"{exchange_name}_{day.strftime('%Y-%m-%d')}"
    if cache_key in _existing_trades_cache:
        return _existing_trades_cache[cache_key]

    day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
    day_end = day + datetime.timedelta(days=1)
    day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')

    query = f"""
    SELECT isin, timestamp, price, quantity FROM trades
    WHERE exchange = '{exchange_name}'
    AND timestamp >= '{day_start_str}'
    AND timestamp < '{day_end_str}'
    """

    existing_trades = set()
    try:
        response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60)
        if response.status_code == 200:
            data = response.json()
            if data.get('dataset'):
                for row in data['dataset']:
                    isin, ts, price, qty = row
                    if isinstance(ts, str):
                        ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
                    else:
                        # QuestDB may return timestamps as microseconds since epoch
                        ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
                    key = (isin, ts_dt.isoformat(), float(price), float(qty))
                    existing_trades.add(key)
    except Exception as e:
        logger.warning(f"Error fetching existing trades for {day}: {e}")

    _existing_trades_cache[cache_key] = existing_trades
    return existing_trades


def clear_trades_cache():
    """Clears the cache of existing trades."""
    global _existing_trades_cache
    _existing_trades_cache = {}


def filter_new_trades_for_day(db_url, exchange_name, trades, day):
    """Filters the trades of a single day down to those not yet in the DB."""
    if not trades:
        return []

    existing = get_existing_trades_for_day(db_url, exchange_name, day)

    new_trades = []
    for trade in trades:
        trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
        if trade_key not in existing:
            new_trades.append(trade)

    return new_trades


def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=5000):
    """Filters new trades grouped by day.

    Note: batch_size is currently unused; deduplication runs one day at a time.
    """
    if not trades:
        return []

    # Group all trades by day
    trades_by_day = {}
    for trade in trades:
        day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
        if day not in trades_by_day:
            trades_by_day[day] = []
        trades_by_day[day].append(trade)

    new_trades = []
    total_days = len(trades_by_day)

    for i, (day, day_trades) in enumerate(sorted(trades_by_day.items()), 1):
        if i % 10 == 0 or i == 1:
            logger.info(f"Checking day {i}/{total_days}: {day.strftime('%Y-%m-%d')} ({len(day_trades)} trades)...")

        new_for_day = filter_new_trades_for_day(db_url, exchange_name, day_trades, day)
        new_trades.extend(new_for_day)

        # Short pause to avoid overloading the DB
        if i < total_days:
            time.sleep(0.02)

    return new_trades


def get_last_trade_timestamp(db_url: str, exchange_name: str) -> datetime.datetime:
    """Fetches the timestamp of the most recent trade for an exchange from QuestDB."""
    query = f"trades where exchange = '{exchange_name}' latest by timestamp"
    try:
        response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH)
        if response.status_code == 200:
            data = response.json()
            if data.get('dataset'):
                # QuestDB returns timestamps either as microseconds or in ISO format
                ts_value = data['dataset'][0][0]
                if isinstance(ts_value, str):
                    return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
                else:
                    return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
    except Exception as e:
        logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")

    return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)


def process_eix_streaming(db, db_url: str, exchange: BaseExchange, historical: bool = False):
    """Processes an exchange in streaming mode to conserve RAM."""
    last_ts = get_last_trade_timestamp(db_url, exchange.name)
    logger.info(f"Fetching data from {exchange.name} (last trade: {last_ts}) - STREAMING...")

    # Get the list of files to process
    if historical:
        files = exchange.get_files_to_process(limit=None, since_date=None)
    else:
        files = exchange.get_files_to_process(limit=None, since_date=last_ts)

    if not files:
        logger.info(f"No {exchange.name} files to process.")
        return

    logger.info(f"Found {len(files)} {exchange.name} files...")

    total_new = 0
    total_processed = 0

    for i, file_item in enumerate(files, 1):
        file_name = file_item.get('fileName', 'unknown').split('/')[-1]
        logger.info(f"Processing {exchange.name} file {i}/{len(files)}: {file_name}")

        trades = exchange.fetch_trades_from_file(file_item)
        if not trades:
            logger.info(f"  No trades in {file_name}")
            continue

        total_processed += len(trades)
        logger.info(f"  Loaded {len(trades)} trades, filtering duplicates...")

        new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)

        if new_trades:
            new_trades.sort(key=lambda x: x.timestamp)
            db.save_trades(new_trades)
            total_new += len(new_trades)
            logger.info(f"  Saved {len(new_trades)} new trades (total new: {total_new})")
        else:
            logger.info("  No new trades in this file")

        # Release references
        del trades
        del new_trades
        time.sleep(0.1)

    logger.info(f"{exchange.name} done: {total_new} new trades out of {total_processed} processed.")
    clear_trades_cache()


def process_standard_exchange(db, db_url: str, exchange: BaseExchange, historical: bool):
    """Processes a standard exchange using batch processing."""
    try:
        last_ts = get_last_trade_timestamp(db_url, exchange.name)
        logger.info(f"Fetching data from {exchange.name} (last trade: {last_ts})...")

        trades = exchange.fetch_latest_trades(include_yesterday=historical)
        if not trades:
            logger.info(f"No trades received from {exchange.name}.")
            return

        # Deduplication
        logger.info(f"Filtering {len(trades)} trades for duplicates...")
        new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
        logger.info(f"Found {len(trades)} trades in total, {len(new_trades)} are new.")

        if new_trades:
            new_trades.sort(key=lambda x: x.timestamp)
            db.save_trades(new_trades)
            logger.info(f"Saved {len(new_trades)} new trades to QuestDB.")

        # Release references
        del trades
        if new_trades:
            del new_trades

        clear_trades_cache()
    except Exception as e:
        logger.error(f"Error in exchange {exchange.name}: {e}")


def run_task(historical=False):
    """Main task: fetches trades from all registered exchanges."""
    logger.info(f"Starting trading data fetcher (historical: {historical})...")
    db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
    db_url = "http://questdb:9000"

    # Process streaming exchanges (large data volumes)
    for exchange_class in STREAMING_EXCHANGES:
        try:
            exchange = exchange_class()
            logger.info(f"Processing {exchange.name} in streaming mode...")
            process_eix_streaming(db, db_url, exchange, historical=historical)
        except Exception as e:
            logger.error(f"Error in streaming exchange {exchange_class.__name__}: {e}")

    # Process standard exchanges
    for exchange_class in STANDARD_EXCHANGES:
        try:
            exchange = exchange_class()
            process_standard_exchange(db, db_url, exchange, historical)
        except Exception as e:
            logger.error(f"Error in exchange {exchange_class.__name__}: {e}")

    logger.info("All exchanges processed.")


def is_database_empty(db_url: str) -> bool:
    """Checks whether the database is empty or the trades table does not exist."""
    try:
        response = requests.get(f"{db_url}/exec", params={'query': 'select count(*) from trades'}, auth=DB_AUTH)
        if response.status_code == 200:
            data = response.json()
            if data.get('dataset') and data['dataset'][0][0] > 0:
                return False
    except Exception:
        pass
    return True


def calculate_seconds_until_target(target_hour: int, target_minute: int = 0) -> int:
    """Calculates the number of seconds until the next target time."""
    now = datetime.datetime.now()
    target = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)

    # If the target time has already passed today, use tomorrow
    if target <= now:
        target += datetime.timedelta(days=1)

    return int((target - now).total_seconds())


def main():
    logger.info("Trading daemon started.")
    db_url = "http://questdb:9000"

    # Startup: initial sync
    if is_database_empty(db_url):
        logger.info("Database is empty. Starting initial historical fetch...")
        run_task(historical=True)
    else:
        logger.info("Existing data found. Starting catch-up sync...")
        run_task(historical=False)
        logger.info("Catch-up sync finished.")

    # Scheduling configuration
    SCHEDULE_HOUR = 23
    SCHEDULE_MINUTE = 0
    last_run_date = None

    logger.info(f"Waiting for the daily run at {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d}...")

    while True:
        now = datetime.datetime.now()
        today = now.date()

        # Check whether we already ran today
        already_ran_today = (last_run_date == today)

        # Check whether we are inside the schedule window (23:00 - 23:59)
        in_schedule_window = (now.hour == SCHEDULE_HOUR and now.minute >= SCHEDULE_MINUTE)

        if in_schedule_window and not already_ran_today:
            logger.info(f"Scheduled task starting ({now.strftime('%Y-%m-%d %H:%M:%S')})...")
            run_task(historical=False)
            last_run_date = today
            logger.info("Scheduled task finished. Waiting for the next day...")

        # Dynamic sleep interval: check more frequently shortly before the target time
        seconds_until_target = calculate_seconds_until_target(SCHEDULE_HOUR, SCHEDULE_MINUTE)
        if seconds_until_target > 3600:
            # More than 1 hour away: sleep 30 minutes
            sleep_time = 1800
        elif seconds_until_target > 300:
            # Between 5 minutes and 1 hour: sleep 5 minutes
            sleep_time = 300
        else:
            # Less than 5 minutes: sleep 30 seconds
            sleep_time = 30

        time.sleep(sleep_time)


if __name__ == "__main__":
    main()