import time
import logging
import datetime
import hashlib
import os

import requests

from src.exchanges.eix import EIXExchange
from src.exchanges.ls import LSExchange
from src.exchanges.deutsche_boerse import XetraExchange, FrankfurtExchange, QuotrixExchange
from src.exchanges.gettex import GettexExchange
from src.exchanges.stuttgart import StuttgartExchange
from src.exchanges.boersenag import (
    DUSAExchange, DUSBExchange, DUSCExchange, DUSDExchange,
    HAMAExchange, HAMBExchange, HANAExchange, HANBExchange
)
from src.database.questdb_client import DatabaseClient

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("TradingDaemon")

DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None


def get_trade_hash(trade):
    """Builds a unique hash for a trade.

    Helper only; filter_new_trades_batch compares tuple keys directly instead.
    """
    key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}"
    return hashlib.md5(key.encode()).hexdigest()


def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=1000):
    """Filters out already-stored trades in batches to save RAM.

    Uses one query per (batch, day) instead of a check per trade.
    """
    if not trades:
        return []

    new_trades = []
    total_batches = (len(trades) + batch_size - 1) // batch_size

    for batch_idx in range(0, len(trades), batch_size):
        batch = trades[batch_idx:batch_idx + batch_size]
        batch_num = (batch_idx // batch_size) + 1
        if batch_num % 10 == 0 or batch_num == 1:
            logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} trades)...")

        # Group trades by day so each existence check covers a whole day at once
        trades_by_day = {}
        for trade in batch:
            day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
            if day not in trades_by_day:
                trades_by_day[day] = []
            trades_by_day[day].append(trade)

        # Check each day separately
        for day, day_trades in trades_by_day.items():
            day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
            day_end = day + datetime.timedelta(days=1)
            day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')

            # Fetch all existing trades for this day
            query = f"""
            SELECT isin, timestamp, price, quantity
            FROM trades
            WHERE exchange = '{exchange_name}'
            AND timestamp >= '{day_start_str}'
            AND timestamp < '{day_end_str}'
            """

            try:
                response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=30)
                if response.status_code == 200:
                    data = response.json()
                    existing_trades = set()

                    if data.get('dataset'):
                        for row in data['dataset']:
                            isin, ts, price, qty = row
                            # Normalize the timestamp for comparison
                            if isinstance(ts, str):
                                ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
                            else:
                                ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)

                            # Build the comparison key (direct tuple comparison, no hash)
                            key = (isin, ts_dt.isoformat(), float(price), float(qty))
                            existing_trades.add(key)

                    # Keep only the trades that are not in the DB yet
                    for trade in day_trades:
                        trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
                        if trade_key not in existing_trades:
                            new_trades.append(trade)
                else:
                    # On error: treat all trades as new (the safer default)
                    logger.warning(f"Query failed for day {day}, treating all trades as new")
                    new_trades.extend(day_trades)
            except Exception as e:
                # On error: treat all trades as new (the safer default)
                logger.warning(f"Error checking trades for day {day}: {e}, treating all trades as new")
                new_trades.extend(day_trades)

        # Small pause between batches to avoid overloading the DB
        if batch_idx + batch_size < len(trades):
            time.sleep(0.05)

    return new_trades
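

# Illustrative sketch, not called by the daemon: the tuple key that
# filter_new_trades_batch compares on. The Trade-like object and its ISIN are
# hypothetical. Both sides of the comparison must normalize the same way; a
# naive vs. timezone-aware timestamp, or a Decimal price vs. a float, would
# produce different keys and defeat the duplicate check.
def _dedup_key_sketch():
    from types import SimpleNamespace
    trade = SimpleNamespace(
        isin="DE0000000000",  # hypothetical ISIN
        timestamp=datetime.datetime(2024, 1, 2, 9, 30, tzinfo=datetime.timezone.utc),
        price=140.25,
        quantity=10,
    )
    # Same normalization as in filter_new_trades_batch above
    return (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))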


def get_last_trade_timestamp(db_url, exchange_name):
    """Returns the timestamp of the newest stored trade for an exchange,
    or datetime.min (UTC) if there is no data or the DB is unreachable."""
    # QuestDB query: get the latest timestamp for a specific exchange.
    # max(timestamp) yields a single, unambiguous result column.
    query = f"select max(timestamp) from trades where exchange = '{exchange_name}'"
    try:
        # The /exec endpoint returns query results as JSON
        response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=30)
        if response.status_code == 200:
            data = response.json()
            if data['dataset'] and data['dataset'][0][0] is not None:
                # Depending on the view, QuestDB serializes timestamps either
                # as ISO-8601 strings or as microseconds since epoch.
                ts_value = data['dataset'][0][0]
                if isinstance(ts_value, str):
                    return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
                else:
                    return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
    except Exception as e:
        logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")
    return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
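

# For reference, a sketch (hypothetical payload, not captured output) of the
# QuestDB /exec JSON shape that get_last_trade_timestamp parses: 'columns'
# describes the result schema and 'dataset' holds the rows.
def _parse_exec_response_sketch():
    sample = {
        "query": "select max(timestamp) from trades where exchange = 'EIX'",
        "columns": [{"name": "max", "type": "TIMESTAMP"}],
        "dataset": [["2024-01-02T09:30:00.000000Z"]],
        "count": 1,
    }
    ts_value = sample["dataset"][0][0]
    return datetime.datetime.fromisoformat(ts_value.replace("Z", "+00:00"))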


def run_task(historical=False):
    logger.info(f"Starting Trading Data Fetcher task (Historical: {historical})...")

    # Initialize exchanges
    eix = EIXExchange()
    ls = LSExchange()
    # Deutsche Börse exchanges
    xetra = XetraExchange()
    frankfurt = FrankfurtExchange()
    quotrix = QuotrixExchange()
    gettex = GettexExchange()
    stuttgart = StuttgartExchange()
    # Börsenag exchanges (Düsseldorf, Hamburg, Hannover)
    dusa = DUSAExchange()
    dusb = DUSBExchange()
    dusc = DUSCExchange()
    dusd = DUSDExchange()
    hama = HAMAExchange()
    hamb = HAMBExchange()
    hana = HANAExchange()
    hanb = HANBExchange()

    # The daemon runs daily and should fetch everything since the last DB state.
    # EIX supports smart filtering via a 'since_date' argument, but last_ts is
    # only known inside the loop below, so the loop adds that argument
    # dynamically instead of baking it into the tuples here.
    exchanges_to_process = [
        (eix, {'limit': None if historical else 5}),  # default limit 5 for safety on non-historical runs
        (ls, {'include_yesterday': historical}),
        # Deutsche Börse exchanges
        (xetra, {'include_yesterday': historical}),
        (frankfurt, {'include_yesterday': historical}),
        (quotrix, {'include_yesterday': historical}),
        (gettex, {'include_yesterday': historical}),
        (stuttgart, {'include_yesterday': historical}),
        # Börsenag exchanges (Düsseldorf, Hamburg, Hannover)
        (dusa, {'include_yesterday': historical}),
        (dusb, {'include_yesterday': historical}),
        (dusc, {'include_yesterday': historical}),
        (dusd, {'include_yesterday': historical}),
        (hama, {'include_yesterday': historical}),
        (hamb, {'include_yesterday': historical}),
        (hana, {'include_yesterday': historical}),
        (hanb, {'include_yesterday': historical}),
    ]

    db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
    db_url = "http://questdb:9000"

    for exchange, args in exchanges_to_process:
        try:
            last_ts = get_last_trade_timestamp(db_url, exchange.name)
            logger.info(f"Fetching data from {exchange.name} (Last trade: {last_ts})...")

            # Special handling for EIX to support smart filtering
            call_args = args.copy()
            if exchange.name == "EIX" and not historical:
                call_args['since_date'] = last_ts
                # Drop the limit when filtering by date so nothing is missed
                call_args.pop('limit', None)

            trades = exchange.fetch_latest_trades(**call_args)

            if not trades:
                logger.info(f"No trades fetched from {exchange.name}.")
                continue

            # Tuple-key deduplication, processed in batches to save RAM
            logger.info(f"Filtering {len(trades)} trades for duplicates (batch processing)...")
            new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=500)
            logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")

            if new_trades:
                # Sort trades by timestamp before saving (QuestDB ingests ordered data more efficiently)
                new_trades.sort(key=lambda x: x.timestamp)
                db.save_trades(new_trades)
                logger.info(f"Stored {len(new_trades)} new trades in QuestDB.")
        except Exception as e:
            logger.error(f"Error processing exchange {exchange.name}: {e}")


def main():
    logger.info("Trading Daemon started.")

    # 1. Startup check: is the DB empty?
    db_url = "http://questdb:9000"
    is_empty = True
    try:
        # Check whether the trades table already contains rows
        response = requests.get(f"{db_url}/exec", params={'query': 'select count(*) from trades'}, auth=DB_AUTH, timeout=30)
        if response.status_code == 200:
            data = response.json()
            if data['dataset'] and data['dataset'][0][0] > 0:
                is_empty = False
    except Exception:
        # Table does not exist yet, or the DB is unreachable
        is_empty = True

    if is_empty:
        logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...")
        run_task(historical=True)
    else:
        logger.info("Found existing data in database. Triggering catch-up sync...")
        # Run a normal task to fetch any data missed since the last run
        run_task(historical=False)
        logger.info("Catch-up sync completed. Waiting for scheduled run at 23:00.")

    while True:
        now = datetime.datetime.now()
        # Daily at 23:00
        if now.hour == 23 and now.minute == 0:
            run_task(historical=False)
            # Sleep 61 s to prevent a second run within the same minute
            time.sleep(61)
        # Poll every 30 seconds
        time.sleep(30)


if __name__ == "__main__":
    main()