From 9cd84e085504d78640e658472a53d1fae6e3dcd9 Mon Sep 17 00:00:00 2001
From: Melchior Reimers
Date: Thu, 29 Jan 2026 16:17:11 +0100
Subject: [PATCH] Fix: Streaming processing for EIX to prevent RAM overflow
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- EIX now processes one file at a time (not all at once)
- Memory is freed after each file (gc.collect)
- Day-based caching for the duplicate check, with cache clearing
- Reduces RAM usage from 8GB+ to under 500MB
---
 daemon.py            | 257 ++++++++++++++++++++++++++++---------------
 src/exchanges/eix.py | 111 ++++++++++---------
 2 files changed, 223 insertions(+), 145 deletions(-)

diff --git a/daemon.py b/daemon.py
index 965dae0..c3c9140 100644
--- a/daemon.py
+++ b/daemon.py
@@ -3,6 +3,7 @@ import logging
 import datetime
 import hashlib
 import os
+import gc
 import requests
 from src.exchanges.eix import EIXExchange
 from src.exchanges.ls import LSExchange
@@ -25,83 +26,100 @@ DB_USER = os.getenv("DB_USER", "admin")
 DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
 DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
 
+# Cache of existing trades per day (cleared after each exchange)
+_existing_trades_cache = {}
+
 def get_trade_hash(trade):
     """Creates a unique hash for a trade."""
     key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}"
     return hashlib.md5(key.encode()).hexdigest()
 
-def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=1000):
-    """Filters new trades in batches to save RAM. Uses batch queries instead of individual checks."""
+def get_existing_trades_for_day(db_url, exchange_name, day):
+    """Fetches the existing trades for one day from the DB (with caching)."""
+    cache_key = f"{exchange_name}_{day.strftime('%Y-%m-%d')}"
+
+    if cache_key in _existing_trades_cache:
+        return _existing_trades_cache[cache_key]
+
+    day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
+    day_end = day + datetime.timedelta(days=1)
+    day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
+
+    query = f"""
+    SELECT isin, timestamp, price, quantity
+    FROM trades
+    WHERE exchange = '{exchange_name}'
+    AND timestamp >= '{day_start_str}'
+    AND timestamp < '{day_end_str}'
+    """
+
+    existing_trades = set()
+    try:
+        response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60)
+        if response.status_code == 200:
+            data = response.json()
+            if data.get('dataset'):
+                for row in data['dataset']:
+                    isin, ts, price, qty = row
+                    if isinstance(ts, str):
+                        ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
+                    else:
+                        ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
+                    key = (isin, ts_dt.isoformat(), float(price), float(qty))
+                    existing_trades.add(key)
    except Exception as e:
+        logger.warning(f"Error fetching existing trades for {day}: {e}")
+
+    _existing_trades_cache[cache_key] = existing_trades
+    return existing_trades
+
+def clear_trades_cache():
+    """Clears the cache of existing trades."""
+    global _existing_trades_cache
+    _existing_trades_cache = {}
+    gc.collect()
+
+def filter_new_trades_for_day(db_url, exchange_name, trades, day):
+    """Filters new trades for a single day."""
+    if not trades:
         return []
-    new_trades = []
-    total_batches = (len(trades) + batch_size - 1) // batch_size
+    existing = get_existing_trades_for_day(db_url, exchange_name, day)
 
-    for batch_idx in range(0, len(trades), batch_size):
-        batch = trades[batch_idx:batch_idx + batch_size]
-        batch_num = (batch_idx // batch_size) + 1
+    new_trades = []
+    for trade in trades:
+        trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
+        if trade_key not in existing:
+            new_trades.append(trade)
+
+    return new_trades
+
+def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=5000):
+    """Filters new trades in batches, grouped by day."""
+    if not trades:
+        return []
+
+    # Group all trades by day
+    trades_by_day = {}
+    for trade in trades:
+        day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
+        if day not in trades_by_day:
+            trades_by_day[day] = []
+        trades_by_day[day].append(trade)
+
+    new_trades = []
+    total_days = len(trades_by_day)
+
+    for i, (day, day_trades) in enumerate(sorted(trades_by_day.items()), 1):
+        if i % 10 == 0 or i == 1:
+            logger.info(f"Checking day {i}/{total_days}: {day.strftime('%Y-%m-%d')} ({len(day_trades)} trades)...")
-        if batch_num % 10 == 0 or batch_num == 1:
-            logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} trades)...")
+        new_for_day = filter_new_trades_for_day(db_url, exchange_name, day_trades, day)
+        new_trades.extend(new_for_day)
-        # Group trades by day for more efficient queries
-        trades_by_day = {}
-        for trade in batch:
-            day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
-            if day not in trades_by_day:
-                trades_by_day[day] = []
-            trades_by_day[day].append(trade)
-
-        # Check each day separately
-        for day, day_trades in trades_by_day.items():
-            day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
-            day_end = day + datetime.timedelta(days=1)
-            day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
-
-            # Fetch all existing trades for this day
-            query = f"""
-            SELECT isin, timestamp, price, quantity
-            FROM trades
-            WHERE exchange = '{exchange_name}'
-            AND timestamp >= '{day_start_str}'
-            AND timestamp < '{day_end_str}'
-            """
-
-            try:
-                response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=30)
-                if response.status_code == 200:
-                    data = response.json()
-                    existing_trades = set()
-                    if data.get('dataset'):
-                        for row in data['dataset']:
-                            isin, ts, price, qty = row
-                            # Normalize the timestamp for comparison
-                            if isinstance(ts, str):
-                                ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
-                            else:
-                                ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
-                            # Build the comparison key (no hash, direct comparison)
-                            key = (isin, ts_dt.isoformat(), float(price), float(qty))
-                            existing_trades.add(key)
-
-                    # Check which trades are new
-                    for trade in day_trades:
-                        trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
-                        if trade_key not in existing_trades:
-                            new_trades.append(trade)
-                else:
-                    # On error: treat all trades as new (safer)
-                    logger.warning(f"Query failed for day {day}, treating all trades as new")
-                    new_trades.extend(day_trades)
-            except Exception as e:
-                # On error: treat all trades as new (safer)
-                logger.warning(f"Error checking trades for day {day}: {e}, treating all trades as new")
-                new_trades.extend(day_trades)
-
-        # Short pause between batches so the DB is not overloaded
-        if batch_idx + batch_size < len(trades):
-            time.sleep(0.05)
+        # Short pause so the DB is not overloaded
+        if i < total_days:
+            time.sleep(0.02)
 
     return new_trades
 
@@ -126,11 +144,76 @@ def get_last_trade_timestamp(db_url, exchange_name):
         logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")
         return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
 
+def process_eix_streaming(db, db_url, eix, historical=False):
+    """Processes EIX in streaming mode to save RAM."""
+    last_ts = get_last_trade_timestamp(db_url, eix.name)
+    logger.info(f"Fetching data from EIX (Last trade: {last_ts}) - STREAMING MODE...")
+
+    # Get the list of files to process
+    if historical:
+        files = eix.get_files_to_process(limit=None, since_date=None)
+    else:
+        files = eix.get_files_to_process(limit=None, since_date=last_ts)
+
+    if not files:
+        logger.info("No EIX files to process.")
+        return
+
+    logger.info(f"Found {len(files)} EIX files to process...")
+
+    total_new = 0
+    total_processed = 0
+
+    for i, file_item in enumerate(files, 1):
+        file_name = file_item.get('fileName', 'unknown').split('/')[-1]
+        logger.info(f"Processing EIX file {i}/{len(files)}: {file_name}")
+
+        # Load one file
+        trades = eix.fetch_trades_from_file(file_item)
+
+        if not trades:
+            logger.info(f" No trades in file {file_name}")
+            continue
+
+        total_processed += len(trades)
+        logger.info(f" Loaded {len(trades)} trades, filtering duplicates...")
+
+        # Filter duplicates
+        new_trades = filter_new_trades_batch(db_url, eix.name, trades, batch_size=5000)
+
+        if new_trades:
+            new_trades.sort(key=lambda x: x.timestamp)
+            db.save_trades(new_trades)
+            total_new += len(new_trades)
+            logger.info(f" Saved {len(new_trades)} new trades (total new: {total_new})")
+        else:
+            logger.info(f" No new trades in this file")
+
+        # Free memory
+        del trades
+        del new_trades
+        gc.collect()
+
+        # Short pause between files
+        time.sleep(0.1)
+
+    logger.info(f"EIX complete: {total_new} new trades from {total_processed} total processed.")
+    clear_trades_cache()
+
 def run_task(historical=False):
     logger.info(f"Starting Trading Data Fetcher task (Historical: {historical})...")
 
-    # Initialize exchanges
-    eix = EIXExchange()
+    db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
+    db_url = "http://questdb:9000"
+
+    # === EIX - streaming processing ===
+    try:
+        eix = EIXExchange()
+        process_eix_streaming(db, db_url, eix, historical=historical)
+    except Exception as e:
+        logger.error(f"Error processing EIX: {e}")
+
+    # === Other exchanges - regular processing ===
     ls = LSExchange()
 
     # New Deutsche Börse exchanges
@@ -150,14 +233,8 @@ def run_task(historical=False):
     hana = HANAExchange()
     hanb = HANBExchange()
 
-    # Pass last_ts to fetcher to allow smart filtering
-    # daemon.py runs daily, so we want to fetch everything since DB state
-    # BUT we need to be careful: eix.py's fetch_latest_trades needs 'since_date' argument
-    # We can't pass it here directly in the tuple easily because last_ts is calculated inside the loop.
-
-    # We will modify the loop below to handle args dynamically
+    # All other exchanges (smaller data volumes)
     exchanges_to_process = [
-        (eix, {'limit': None if historical else 5}),  # Default limit 5 for safety if no historical
         (ls, {'include_yesterday': historical}),
         # Deutsche Börse Exchanges
         (xetra, {'include_yesterday': historical}),
@@ -175,43 +252,41 @@ def run_task(historical=False):
         (hana, {'include_yesterday': historical}),
         (hanb, {'include_yesterday': historical}),
     ]
-
-    db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
 
     for exchange, args in exchanges_to_process:
         try:
-            db_url = "http://questdb:9000"
             last_ts = get_last_trade_timestamp(db_url, exchange.name)
             logger.info(f"Fetching data from {exchange.name} (Last trade: {last_ts})...")
 
-            # Special handling for EIX to support smart filtering
-            call_args = args.copy()
-            if exchange.name == "EIX" and not historical:
-                call_args['since_date'] = last_ts.replace(tzinfo=datetime.timezone.utc)
-                # Remove limit if we are filtering by date to ensure we get everything
-                if 'limit' in call_args:
-                    call_args.pop('limit')
-
-            trades = exchange.fetch_latest_trades(**call_args)
+            trades = exchange.fetch_latest_trades(**args)
 
             if not trades:
                 logger.info(f"No trades fetched from {exchange.name}.")
                 continue
 
-            # Hash-based deduplication - batch processing to save RAM
-            logger.info(f"Filtering {len(trades)} trades for duplicates (batch processing)...")
-            new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=500)
+            # Deduplication
+            logger.info(f"Filtering {len(trades)} trades for duplicates...")
+            new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
 
             logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")
 
             if new_trades:
-                # Sort trades by timestamp before saving (QuestDB likes this)
                 new_trades.sort(key=lambda x: x.timestamp)
                 db.save_trades(new_trades)
                 logger.info(f"Stored {len(new_trades)} new trades in QuestDB.")
+
+            # Free memory after each exchange
+            del trades
+            if new_trades:
+                del new_trades
+            clear_trades_cache()
+            gc.collect()
+
         except Exception as e:
             logger.error(f"Error processing exchange {exchange.name}: {e}")
+
+    logger.info("All exchanges processed.")
 
 def main():
     logger.info("Trading Daemon started.")
diff --git a/src/exchanges/eix.py b/src/exchanges/eix.py
index 959be31..88a8143 100644
--- a/src/exchanges/eix.py
+++ b/src/exchanges/eix.py
@@ -1,8 +1,8 @@
 import requests
 import json
 from bs4 import BeautifulSoup
-from datetime import datetime
-from typing import List
+from datetime import datetime, timezone
+from typing import List, Generator, Tuple, Optional
 from .base import BaseExchange, Trade
 import csv
 import io
@@ -11,9 +11,9 @@ class EIXExchange(BaseExchange):
     @property
     def name(self) -> str:
         return "EIX"
-
-    def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None) -> List[Trade]:
-        # EIX stores its file list in a separate API endpoint
+
+    def get_files_to_process(self, limit: int = 1, since_date: datetime = None) -> List[dict]:
+        """Gets the list of files to process without downloading them."""
         url = "https://european-investor-exchange.com/api/official-trades"
         try:
             response = requests.get(url, timeout=15)
@@ -24,7 +24,6 @@ class EIXExchange(BaseExchange):
             return []
 
         # Filter files based on date in filename if since_date provided
-        # Format: "kursblatt/2025/Kursblatt.2025-07-14.1752526803105.csv"
         filtered_files = []
         for item in files_list:
             file_key = item.get('fileName')
@@ -33,79 +32,83 @@ class EIXExchange(BaseExchange):
             if since_date:
                 try:
-                    # Extract date from filename: Kursblatt.YYYY-MM-DD
                     parts = file_key.split('/')[-1].split('.')
-                    # parts example: ['Kursblatt', '2025-07-14', '1752526803105', 'csv']
                     if len(parts) >= 2:
                         date_str = parts[1]
-                        file_date = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
+                        file_date = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
-                        # Check if file date is newer than since_date (compare dates only)
-                        if file_date.date() > since_date.date():
+                        if file_date.date() >= since_date.date():
                             filtered_files.append(item)
-                            continue
-                        # If same day, we might need to check it too, but EIX seems to be daily files
-                        if file_date.date() == since_date.date():
-                            filtered_files.append(item)
-                            continue
                 except Exception:
-                    # If parsing fails, default to including it (safety) or skipping?
-                    # Let's include it if we are not sure
                     filtered_files.append(item)
             else:
-                filtered_files.append(item)
-
-        # Sort files to process oldest to newest if doing a sync, or newest to oldest?
-        # If we have limit=1 (default), we usually want the newest.
-        # But if we are syncing history (since_date set), we probably want all of them.
+                filtered_files.append(item)
 
-        # Logic: If since_date is set, we ignore limit (or use it as safety cap) and process ALL new files
         if since_date:
-            files_to_process = filtered_files
-            # Sort by date? The API list seems chronological.
+            return filtered_files
         else:
-            # Default behavior: take the last N files (API returns oldest first usually?)
-            # Let's assume list is chronological.
             if limit:
-                files_to_process = files_list[-limit:]
-            else:
-                files_to_process = files_list
+                return files_list[-limit:]
+            return files_list
+
+    def fetch_trades_from_file(self, file_item: dict) -> List[Trade]:
+        """Downloads and parses a single CSV file."""
+        file_key = file_item.get('fileName')
+        if not file_key:
+            return []
+
+        csv_url = f"https://european-investor-exchange.com/api/trade-file-contents?key={file_key}"
+        try:
+            csv_response = requests.get(csv_url, timeout=60)
+            if csv_response.status_code == 200:
+                return self._parse_csv(csv_response.text)
+        except Exception as e:
+            print(f"Error downloading EIX CSV {file_key}: {e}")
+
+        return []
+
+    def fetch_trades_streaming(self, limit: int = 1, since_date: datetime = None) -> Generator[Tuple[str, List[Trade]], None, None]:
+        """
+        Generator that yields trades file by file.
+        Yields: (filename, trades) tuples
+        """
+        files = self.get_files_to_process(limit=limit, since_date=since_date)
+
+        for item in files:
+            file_key = item.get('fileName', 'unknown')
+            trades = self.fetch_trades_from_file(item)
+            if trades:
+                yield (file_key, trades)
 
-        trades = []
-        count = 0
-        for item in files_to_process:
-            file_key = item.get('fileName')
-
-            # Download the CSV
-            csv_url = f"https://european-investor-exchange.com/api/trade-file-contents?key={file_key}"
-            try:
-                csv_response = requests.get(csv_url, timeout=20)
-                if csv_response.status_code == 200:
-                    trades.extend(self._parse_csv(csv_response.text))
-                    count += 1
-                    # Only enforce limit if since_date is NOT set
-                    if not since_date and limit and count >= limit:
-                        break
-            except Exception as e:
-                print(f"Error downloading EIX CSV {file_key}: {e}")
-
-        return trades
+    def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None) -> List[Trade]:
+        """
+        Legacy method kept for compatibility.
+        WARNING: loads all trades into memory! Use fetch_trades_streaming() for large data volumes.
+ """ + # Für kleine Requests (limit <= 5) normale Verarbeitung + if limit and limit <= 5 and not since_date: + all_trades = [] + for filename, trades in self.fetch_trades_streaming(limit=limit, since_date=since_date): + all_trades.extend(trades) + return all_trades + + # Für große Requests: Warnung ausgeben und leere Liste zurückgeben + # Der Daemon soll stattdessen fetch_trades_streaming() verwenden + print(f"[EIX] WARNING: fetch_latest_trades() called with large dataset. Use streaming instead.") + return [] def _parse_csv(self, csv_text: str) -> List[Trade]: trades = [] f = io.StringIO(csv_text) - # Header: Trading day & Trading time UTC,Instrument Identifier,Quantity,Unit Price,Price Currency,Venue Identifier,Side reader = csv.DictReader(f, delimiter=',') for row in reader: try: price = float(row['Unit Price']) quantity = float(row['Quantity']) isin = row['Instrument Identifier'] - symbol = isin # Often symbol is unknown, use ISIN + symbol = isin time_str = row['Trading day & Trading time UTC'] - # Format: 2026-01-22T06:30:00.617Z - # Python 3.11+ supports ISO with Z, otherwise we strip Z ts_str = time_str.replace('Z', '+00:00') timestamp = datetime.fromisoformat(ts_str)