diff --git a/daemon.py b/daemon.py
index 2b5993a..9883553 100644
--- a/daemon.py
+++ b/daemon.py
@@ -26,33 +26,80 @@ def get_trade_hash(trade):
     key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}"
     return hashlib.md5(key.encode()).hexdigest()
 
-def get_existing_trade_hashes(db_url, exchange_name, since_date):
-    """Holt alle Trade-Hashes für eine Exchange seit einem bestimmten Datum."""
-    hashes = set()
+def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=1000):
+    """Filter new trades in batches to save RAM. Uses batch queries instead of per-trade checks."""
+    if not trades:
+        return []
 
-    # Hole alle Trades seit dem Datum
-    date_str = since_date.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
-    query = f"SELECT exchange, isin, timestamp, price, quantity FROM trades WHERE exchange = '{exchange_name}' AND timestamp >= '{date_str}'"
+    new_trades = []
+    total_batches = (len(trades) + batch_size - 1) // batch_size
 
-    try:
-        response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60)
-        if response.status_code == 200:
-            data = response.json()
-            if data.get('dataset'):
-                for row in data['dataset']:
-                    exchange, isin, ts, price, qty = row
-                    # Konvertiere Timestamp
-                    if isinstance(ts, str):
-                        ts_iso = ts.replace('Z', '+00:00')
-                    else:
-                        ts_iso = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc).isoformat()
+    for batch_idx in range(0, len(trades), batch_size):
+        batch = trades[batch_idx:batch_idx + batch_size]
+        batch_num = (batch_idx // batch_size) + 1
+
+        if batch_num % 10 == 0 or batch_num == 1:
+            logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} trades)...")
+
+        # Group trades by day for more efficient queries
+        trades_by_day = {}
+        for trade in batch:
+            day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
+            if day not in trades_by_day:
+                trades_by_day[day] = []
+            trades_by_day[day].append(trade)
+
+        # Check each day separately
+        for day, day_trades in trades_by_day.items():
+            day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
+            day_end = day + datetime.timedelta(days=1)
+            day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
+
+            # Fetch all existing trades for this day
+            query = f"""
+                SELECT isin, timestamp, price, quantity
+                FROM trades
+                WHERE exchange = '{exchange_name}'
+                AND timestamp >= '{day_start_str}'
+                AND timestamp < '{day_end_str}'
+            """
+
+            try:
+                response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=30)
+                if response.status_code == 200:
+                    data = response.json()
+                    existing_trades = set()
+                    if data.get('dataset'):
+                        for row in data['dataset']:
+                            isin, ts, price, qty = row
+                            # Normalize timestamp for comparison
+                            if isinstance(ts, str):
+                                ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
+                            else:
+                                ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
+                            # Build comparison key (no hashing, direct comparison)
+                            key = (isin, ts_dt.isoformat(), float(price), float(qty))
+                            existing_trades.add(key)
-
-                    key = f"{exchange}|{isin}|{ts_iso}|{price}|{qty}"
-                    hashes.add(hashlib.md5(key.encode()).hexdigest())
-    except Exception as e:
-        logger.warning(f"Could not fetch existing trade hashes: {e}")
+
+                    # Check which trades are new
+                    for trade in day_trades:
+                        trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
+                        if trade_key not in existing_trades:
+                            new_trades.append(trade)
+                else:
+                    # On error: treat all trades as new (safer)
+                    logger.warning(f"Query failed for day {day}, treating all trades as new")
+                    new_trades.extend(day_trades)
+            except Exception as e:
+                # On error: treat all trades as new (safer)
+                logger.warning(f"Error checking trades for day {day}: {e}, treating all trades as new")
+                new_trades.extend(day_trades)
+
+        # Short pause between batches so the DB is not overloaded
+        if batch_idx + batch_size < len(trades):
+            time.sleep(0.05)
 
-    return hashes
+    return new_trades
 
 def get_last_trade_timestamp(db_url, exchange_name):
     # QuestDB query: get the latest timestamp for a specific exchange
@@ -129,27 +176,9 @@ def run_task(historical=False):
             logger.info(f"No trades fetched from {exchange.name}.")
             continue
 
-        # Hash-basierte Deduplizierung - IMMER prüfen!
-        oldest_trade_ts = min(t.timestamp for t in trades)
-        newest_trade_ts = max(t.timestamp for t in trades)
-
-        # Hole Hashes für den Zeitraum der neuen Trades (plus 1 Tag Puffer)
-        check_since = oldest_trade_ts - datetime.timedelta(days=1)
-        existing_hashes = get_existing_trade_hashes(db_url, exchange.name, check_since)
-
-        if existing_hashes:
-            logger.info(f"Found {len(existing_hashes)} existing trade hashes in DB for period")
-
-            # Filtere nur wirklich neue Trades
-            new_trades = []
-            for t in trades:
-                trade_hash = get_trade_hash(t)
-                if trade_hash not in existing_hashes:
-                    new_trades.append(t)
-        else:
-            # Keine existierenden Hashes gefunden - alle Trades sind neu
-            logger.info(f"No existing hashes found - all trades are new")
-            new_trades = trades
+        # Hash-based deduplication - batch processing to save RAM
+        logger.info(f"Filtering {len(trades)} trades for duplicates (batch processing)...")
+        new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=500)
 
         logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")
 