Fix: streaming processing for EIX to prevent RAM overflow

- EIX now processes one file at a time instead of loading everything at once (sketched below)
- Memory is released after each file (gc.collect)
- Day-based caching for the duplicate check, with cache clearing
- Cuts RAM usage from 8 GB+ to under 500 MB
Melchior Reimers
2026-01-29 16:17:11 +01:00
parent f325941e24
commit 9cd84e0855
2 changed files with 223 additions and 145 deletions
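
In essence, the change replaces "load everything, then deduplicate" with a per-file loop: load one file, check its trades against a per-day cache of keys already stored in the DB, save only the new rows, then release everything before the next file. A minimal sketch of that pattern (load_file, existing_keys_for_day, store and the key() method are illustrative placeholders, not the actual daemon.py helpers):

import gc
from collections import defaultdict

def stream_files(files, load_file, existing_keys_for_day, store):
    """Process one file at a time so only a single file's trades sit in RAM."""
    day_cache = {}  # day -> set of keys already stored in the DB
    for file_item in files:
        trades = load_file(file_item)           # load exactly one file
        by_day = defaultdict(list)
        for trade in trades:
            by_day[trade.timestamp.date()].append(trade)
        new_trades = []
        for day, day_trades in by_day.items():
            if day not in day_cache:            # one DB query per day, then cached
                day_cache[day] = existing_keys_for_day(day)
            new_trades.extend(t for t in day_trades if t.key() not in day_cache[day])
        if new_trades:
            store(new_trades)
        del trades, new_trades, by_day          # drop references before the next file
        gc.collect()                            # mirrors the explicit gc.collect() in daemon.py
    day_cache.clear()                           # mirrors clear_trades_cache()

The per-day cache means each calendar day is queried at most once per run, and clearing it afterwards (clear_trades_cache in the diff below) keeps the cache itself from becoming the next memory problem.
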

daemon.py (257 changes)

@@ -3,6 +3,7 @@ import logging
import datetime
import hashlib
import os
import gc
import requests
from src.exchanges.eix import EIXExchange
from src.exchanges.ls import LSExchange
@@ -25,83 +26,100 @@ DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
# Cache of existing trades per day (cleared after each exchange)
_existing_trades_cache = {}
def get_trade_hash(trade):
"""Erstellt einen eindeutigen Hash für einen Trade."""
key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}"
return hashlib.md5(key.encode()).hexdigest()
- def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=1000):
- """Filters new trades in batches to save RAM. Uses batch queries instead of individual checks."""
def get_existing_trades_for_day(db_url, exchange_name, day):
"""Holt existierende Trades für einen Tag aus der DB (mit Caching)."""
cache_key = f"{exchange_name}_{day.strftime('%Y-%m-%d')}"
if cache_key in _existing_trades_cache:
return _existing_trades_cache[cache_key]
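# Format the day boundaries as ISO-8601 strings for the QuestDB query below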
day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
day_end = day + datetime.timedelta(days=1)
day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
query = f"""
SELECT isin, timestamp, price, quantity
FROM trades
WHERE exchange = '{exchange_name}'
AND timestamp >= '{day_start_str}'
AND timestamp < '{day_end_str}'
"""
existing_trades = set()
try:
response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60)
if response.status_code == 200:
data = response.json()
if data.get('dataset'):
for row in data['dataset']:
isin, ts, price, qty = row
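# Normalize the timestamp: handle both ISO strings and epoch microseconds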
if isinstance(ts, str):
ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
else:
ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
key = (isin, ts_dt.isoformat(), float(price), float(qty))
existing_trades.add(key)
except Exception as e:
logger.warning(f"Error fetching existing trades for {day}: {e}")
_existing_trades_cache[cache_key] = existing_trades
return existing_trades
def clear_trades_cache():
"""Leert den Cache für existierende Trades."""
global _existing_trades_cache
_existing_trades_cache = {}
gc.collect()
def filter_new_trades_for_day(db_url, exchange_name, trades, day):
"""Filtert neue Trades für einen einzelnen Tag."""
if not trades:
return []
new_trades = []
- total_batches = (len(trades) + batch_size - 1) // batch_size
existing = get_existing_trades_for_day(db_url, exchange_name, day)
- for batch_idx in range(0, len(trades), batch_size):
- batch = trades[batch_idx:batch_idx + batch_size]
- batch_num = (batch_idx // batch_size) + 1
new_trades = []
for trade in trades:
trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
if trade_key not in existing:
new_trades.append(trade)
return new_trades
def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=5000):
"""Filtert neue Trades in Batches, gruppiert nach Tag."""
if not trades:
return []
# Group all trades by day
trades_by_day = {}
for trade in trades:
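# Truncate to midnight so every trade of a calendar day maps to the same key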
day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
if day not in trades_by_day:
trades_by_day[day] = []
trades_by_day[day].append(trade)
new_trades = []
total_days = len(trades_by_day)
for i, (day, day_trades) in enumerate(sorted(trades_by_day.items()), 1):
if i % 10 == 0 or i == 1:
logger.info(f"Checking day {i}/{total_days}: {day.strftime('%Y-%m-%d')} ({len(day_trades)} trades)...")
- if batch_num % 10 == 0 or batch_num == 1:
- logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} trades)...")
new_for_day = filter_new_trades_for_day(db_url, exchange_name, day_trades, day)
new_trades.extend(new_for_day)
- # Group trades by day for more efficient queries
- trades_by_day = {}
- for trade in batch:
- day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
- if day not in trades_by_day:
- trades_by_day[day] = []
- trades_by_day[day].append(trade)
- # Check each day separately
- for day, day_trades in trades_by_day.items():
- day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
- day_end = day + datetime.timedelta(days=1)
- day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
- # Fetch all existing trades for this day
- query = f"""
- SELECT isin, timestamp, price, quantity
- FROM trades
- WHERE exchange = '{exchange_name}'
- AND timestamp >= '{day_start_str}'
- AND timestamp < '{day_end_str}'
- """
- try:
- response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=30)
- if response.status_code == 200:
- data = response.json()
- existing_trades = set()
- if data.get('dataset'):
- for row in data['dataset']:
- isin, ts, price, qty = row
- # Normalize timestamp for comparison
- if isinstance(ts, str):
- ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
- else:
- ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
- # Build comparison key (no hash, direct comparison)
- key = (isin, ts_dt.isoformat(), float(price), float(qty))
- existing_trades.add(key)
- # Check which trades are new
- for trade in day_trades:
- trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
- if trade_key not in existing_trades:
- new_trades.append(trade)
- else:
- # On error: treat all trades as new (safer)
- logger.warning(f"Query failed for day {day}, treating all trades as new")
- new_trades.extend(day_trades)
- except Exception as e:
- # On error: treat all trades as new (safer)
- logger.warning(f"Error checking trades for day {day}: {e}, treating all trades as new")
- new_trades.extend(day_trades)
- # Small pause between batches to avoid overloading the DB
- if batch_idx + batch_size < len(trades):
- time.sleep(0.05)
# Small pause to avoid overloading the DB
if i < total_days:
time.sleep(0.02)
return new_trades
@@ -126,11 +144,76 @@ def get_last_trade_timestamp(db_url, exchange_name):
logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
def process_eix_streaming(db, db_url, eix, historical=False):
"""Verarbeitet EIX in Streaming-Modus um RAM zu sparen."""
last_ts = get_last_trade_timestamp(db_url, eix.name)
logger.info(f"Fetching data from EIX (Last trade: {last_ts}) - STREAMING MODE...")
# Fetch the list of files to process
if historical:
files = eix.get_files_to_process(limit=None, since_date=None)
else:
files = eix.get_files_to_process(limit=None, since_date=last_ts)
if not files:
logger.info("No EIX files to process.")
return
logger.info(f"Found {len(files)} EIX files to process...")
total_new = 0
total_processed = 0
for i, file_item in enumerate(files, 1):
file_name = file_item.get('fileName', 'unknown').split('/')[-1]
logger.info(f"Processing EIX file {i}/{len(files)}: {file_name}")
# Load a single file
trades = eix.fetch_trades_from_file(file_item)
if not trades:
logger.info(f" No trades in file {file_name}")
continue
total_processed += len(trades)
logger.info(f" Loaded {len(trades)} trades, filtering duplicates...")
# Filter out duplicates
new_trades = filter_new_trades_batch(db_url, eix.name, trades, batch_size=5000)
if new_trades:
new_trades.sort(key=lambda x: x.timestamp)
db.save_trades(new_trades)
total_new += len(new_trades)
logger.info(f" Saved {len(new_trades)} new trades (total new: {total_new})")
else:
logger.info(f" No new trades in this file")
# Free memory
del trades
del new_trades
gc.collect()
# Short pause between files
time.sleep(0.1)
logger.info(f"EIX complete: {total_new} new trades from {total_processed} total processed.")
clear_trades_cache()
def run_task(historical=False):
logger.info(f"Starting Trading Data Fetcher task (Historical: {historical})...")
# Initialize exchanges
- eix = EIXExchange()
db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
db_url = "http://questdb:9000"
# === EIX - streaming processing ===
try:
eix = EIXExchange()
process_eix_streaming(db, db_url, eix, historical=historical)
except Exception as e:
logger.error(f"Error processing EIX: {e}")
# === Other exchanges - normal processing ===
ls = LSExchange()
# New Deutsche Börse exchanges
@@ -150,14 +233,8 @@ def run_task(historical=False):
hana = HANAExchange()
hanb = HANBExchange()
- # Pass last_ts to fetcher to allow smart filtering
- # daemon.py runs daily, so we want to fetch everything since DB state
- # BUT we need to be careful: eix.py's fetch_latest_trades needs 'since_date' argument
- # We can't pass it here directly in the tuple easily because last_ts is calculated inside the loop.
- # We will modify the loop below to handle args dynamically
# All other exchanges (smaller data volumes)
exchanges_to_process = [
- (eix, {'limit': None if historical else 5}), # Default limit 5 for safety if no historical
(ls, {'include_yesterday': historical}),
# Deutsche Börse Exchanges
(xetra, {'include_yesterday': historical}),
@@ -175,43 +252,41 @@ def run_task(historical=False):
(hana, {'include_yesterday': historical}),
(hanb, {'include_yesterday': historical}),
]
- db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
for exchange, args in exchanges_to_process:
try:
db_url = "http://questdb:9000"
last_ts = get_last_trade_timestamp(db_url, exchange.name)
logger.info(f"Fetching data from {exchange.name} (Last trade: {last_ts})...")
- # Special handling for EIX to support smart filtering
- call_args = args.copy()
- if exchange.name == "EIX" and not historical:
- call_args['since_date'] = last_ts.replace(tzinfo=datetime.timezone.utc)
- # Remove limit if we are filtering by date to ensure we get everything
- if 'limit' in call_args:
- call_args.pop('limit')
- trades = exchange.fetch_latest_trades(**call_args)
trades = exchange.fetch_latest_trades(**args)
if not trades:
logger.info(f"No trades fetched from {exchange.name}.")
continue
- # Hash-based deduplication - batch processing to save RAM
- logger.info(f"Filtering {len(trades)} trades for duplicates (batch processing)...")
- new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=500)
# Deduplication
logger.info(f"Filtering {len(trades)} trades for duplicates...")
new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")
if new_trades:
# Sort trades by timestamp before saving (QuestDB likes this)
new_trades.sort(key=lambda x: x.timestamp)
db.save_trades(new_trades)
logger.info(f"Stored {len(new_trades)} new trades in QuestDB.")
# Free memory after each exchange
del trades
if new_trades:
del new_trades
clear_trades_cache()
gc.collect()
except Exception as e:
logger.error(f"Error processing exchange {exchange.name}: {e}")
logger.info("All exchanges processed.")
def main():
logger.info("Trading Daemon started.")