import time import logging import datetime import hashlib import os import requests from src.exchanges.eix import EIXExchange from src.exchanges.ls import LSExchange from src.exchanges.deutsche_boerse import XetraExchange, FrankfurtExchange, QuotrixExchange from src.exchanges.gettex import GettexExchange from src.exchanges.stuttgart import StuttgartExchange from src.database.questdb_client import DatabaseClient logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("TradingDaemon") DB_USER = os.getenv("DB_USER", "admin") DB_PASSWORD = os.getenv("DB_PASSWORD", "quest") DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None def get_trade_hash(trade): """Erstellt einen eindeutigen Hash für einen Trade.""" key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}" return hashlib.md5(key.encode()).hexdigest() def get_existing_trade_hashes(db_url, exchange_name, since_date): """Holt alle Trade-Hashes für eine Exchange seit einem bestimmten Datum.""" hashes = set() # Hole alle Trades seit dem Datum date_str = since_date.strftime('%Y-%m-%dT%H:%M:%S.000000Z') query = f"SELECT exchange, isin, timestamp, price, quantity FROM trades WHERE exchange = '{exchange_name}' AND timestamp >= '{date_str}'" try: response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60) if response.status_code == 200: data = response.json() if data.get('dataset'): for row in data['dataset']: exchange, isin, ts, price, qty = row # Konvertiere Timestamp if isinstance(ts, str): ts_iso = ts.replace('Z', '+00:00') else: ts_iso = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc).isoformat() key = f"{exchange}|{isin}|{ts_iso}|{price}|{qty}" hashes.add(hashlib.md5(key.encode()).hexdigest()) except Exception as e: logger.warning(f"Could not fetch existing trade hashes: {e}") return hashes def get_last_trade_timestamp(db_url, exchange_name): # QuestDB query: get the latest timestamp for a specific exchange query = f"trades where exchange = '{exchange_name}' latest by timestamp" try: # Using the /exec endpoint to get data response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH) if response.status_code == 200: data = response.json() if data['dataset']: # QuestDB returns timestamp in micros since epoch by default in some views, or ISO # Let's assume the timestamp is in the dataset # ILP timestamps are stored as designated timestamps. ts_value = data['dataset'][0][0] # Adjust index based on column order if isinstance(ts_value, str): return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00')) else: return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc) except Exception as e: logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}") return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) def run_task(historical=False): logger.info(f"Starting Trading Data Fetcher task (Historical: {historical})...") # Initialize exchanges eix = EIXExchange() ls = LSExchange() # Neue Deutsche Börse Exchanges xetra = XetraExchange() frankfurt = FrankfurtExchange() quotrix = QuotrixExchange() gettex = GettexExchange() stuttgart = StuttgartExchange() # Pass last_ts to fetcher to allow smart filtering # daemon.py runs daily, so we want to fetch everything since DB state # BUT we need to be careful: eix.py's fetch_latest_trades needs 'since_date' argument # We can't pass it here directly in the tuple easily because last_ts is calculated inside the loop. # We will modify the loop below to handle args dynamically exchanges_to_process = [ (eix, {'limit': None if historical else 5}), # Default limit 5 for safety if no historical (ls, {'include_yesterday': historical}), # Neue Exchanges (xetra, {'include_yesterday': historical}), (frankfurt, {'include_yesterday': historical}), (quotrix, {'include_yesterday': historical}), (gettex, {'include_yesterday': historical}), (stuttgart, {'include_yesterday': historical}), ] db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD) for exchange, args in exchanges_to_process: try: db_url = "http://questdb:9000" last_ts = get_last_trade_timestamp(db_url, exchange.name) logger.info(f"Fetching data from {exchange.name} (Last trade: {last_ts})...") # Special handling for EIX to support smart filtering call_args = args.copy() if exchange.name == "EIX" and not historical: call_args['since_date'] = last_ts.replace(tzinfo=datetime.timezone.utc) # Remove limit if we are filtering by date to ensure we get everything if 'limit' in call_args: call_args.pop('limit') trades = exchange.fetch_latest_trades(**call_args) if not trades: logger.info(f"No trades fetched from {exchange.name}.") continue # Hash-basierte Deduplizierung - IMMER prüfen! oldest_trade_ts = min(t.timestamp for t in trades) newest_trade_ts = max(t.timestamp for t in trades) # Hole Hashes für den Zeitraum der neuen Trades (plus 1 Tag Puffer) check_since = oldest_trade_ts - datetime.timedelta(days=1) existing_hashes = get_existing_trade_hashes(db_url, exchange.name, check_since) if existing_hashes: logger.info(f"Found {len(existing_hashes)} existing trade hashes in DB for period") # Filtere nur wirklich neue Trades new_trades = [] for t in trades: trade_hash = get_trade_hash(t) if trade_hash not in existing_hashes: new_trades.append(t) else: # Keine existierenden Hashes gefunden - alle Trades sind neu logger.info(f"No existing hashes found - all trades are new") new_trades = trades logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.") if new_trades: # Sort trades by timestamp before saving (QuestDB likes this) new_trades.sort(key=lambda x: x.timestamp) db.save_trades(new_trades) logger.info(f"Stored {len(new_trades)} new trades in QuestDB.") except Exception as e: logger.error(f"Error processing exchange {exchange.name}: {e}") def main(): logger.info("Trading Daemon started.") # 1. Startup Check: Ist die DB leer? db_url = "http://questdb:9000" is_empty = True try: # Prüfe ob bereits Trades in der Tabelle sind response = requests.get(f"{db_url}/exec", params={'query': 'select count(*) from trades'}, auth=DB_AUTH) if response.status_code == 200: data = response.json() if data['dataset'] and data['dataset'][0][0] > 0: is_empty = False except Exception: # Falls Tabelle noch nicht existiert oder DB nicht erreichbar ist is_empty = True if is_empty: logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...") run_task(historical=True) else: logger.info("Found existing data in database. Triggering catch-up sync...") # Run a normal task to fetch any missing data since the last run run_task(historical=False) logger.info("Catch-up sync completed. Waiting for scheduled run at 23:00.") while True: now = datetime.datetime.now() # Täglich um 23:00 Uhr if now.hour == 23 and now.minute == 0: run_task(historical=False) # Warte 61s, um Mehrfachausführung in derselben Minute zu verhindern time.sleep(61) # Check alle 30 Sekunden time.sleep(30) if __name__ == "__main__": main()