Fix: Analytics worker now computes all tables per day
Some checks failed: Deployment / deploy-docker (push) has been cancelled
daemon.py (117 changed lines)
@@ -26,33 +26,80 @@ def get_trade_hash(trade):
     key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}"
     return hashlib.md5(key.encode()).hexdigest()
 
-def get_existing_trade_hashes(db_url, exchange_name, since_date):
-    """Fetch all trade hashes for an exchange since a given date."""
-    hashes = set()
-
-    # Fetch all trades since the given date
-    date_str = since_date.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
-    query = f"SELECT exchange, isin, timestamp, price, quantity FROM trades WHERE exchange = '{exchange_name}' AND timestamp >= '{date_str}'"
-
-    try:
-        response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60)
-        if response.status_code == 200:
-            data = response.json()
-            if data.get('dataset'):
-                for row in data['dataset']:
-                    exchange, isin, ts, price, qty = row
-                    # Convert the timestamp
-                    if isinstance(ts, str):
-                        ts_iso = ts.replace('Z', '+00:00')
-                    else:
-                        ts_iso = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc).isoformat()
-                    key = f"{exchange}|{isin}|{ts_iso}|{price}|{qty}"
-                    hashes.add(hashlib.md5(key.encode()).hexdigest())
-    except Exception as e:
-        logger.warning(f"Could not fetch existing trade hashes: {e}")
-    return hashes
+def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=1000):
+    """Filter out already-stored trades in batches to save RAM. Uses batched queries instead of per-trade checks."""
+    if not trades:
+        return []
+
+    new_trades = []
+    total_batches = (len(trades) + batch_size - 1) // batch_size
+
+    for batch_idx in range(0, len(trades), batch_size):
+        batch = trades[batch_idx:batch_idx + batch_size]
+        batch_num = (batch_idx // batch_size) + 1
+
+        if batch_num % 10 == 0 or batch_num == 1:
+            logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} trades)...")
+
+        # Group trades by day for more efficient queries
+        trades_by_day = {}
+        for trade in batch:
+            day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
+            if day not in trades_by_day:
+                trades_by_day[day] = []
+            trades_by_day[day].append(trade)
+
+        # Check each day separately
+        for day, day_trades in trades_by_day.items():
+            day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
+            day_end = day + datetime.timedelta(days=1)
+            day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
+
+            # Fetch all existing trades for this day
+            query = f"""
+                SELECT isin, timestamp, price, quantity
+                FROM trades
+                WHERE exchange = '{exchange_name}'
+                AND timestamp >= '{day_start_str}'
+                AND timestamp < '{day_end_str}'
+            """
+
+            try:
+                response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=30)
+                if response.status_code == 200:
+                    data = response.json()
+                    existing_trades = set()
+                    if data.get('dataset'):
+                        for row in data['dataset']:
+                            isin, ts, price, qty = row
+                            # Normalize the timestamp for comparison
+                            if isinstance(ts, str):
+                                ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
+                            else:
+                                ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
+                            # Build a comparison key (no hash, direct tuple comparison)
+                            key = (isin, ts_dt.isoformat(), float(price), float(qty))
+                            existing_trades.add(key)
+
+                    # Determine which trades are new
+                    for trade in day_trades:
+                        trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
+                        if trade_key not in existing_trades:
+                            new_trades.append(trade)
+                else:
+                    # On error, treat all trades as new (the safer option)
+                    logger.warning(f"Query failed for day {day}, treating all trades as new")
+                    new_trades.extend(day_trades)
+            except Exception as e:
+                # On error, treat all trades as new (the safer option)
+                logger.warning(f"Error checking trades for day {day}: {e}, treating all trades as new")
+                new_trades.extend(day_trades)
+
+        # Short pause between batches so the DB is not overloaded
+        if batch_idx + batch_size < len(trades):
+            time.sleep(0.05)
+
+    return new_trades
 
 def get_last_trade_timestamp(db_url, exchange_name):
     # QuestDB query: get the latest timestamp for a specific exchange
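
The tuple-based comparison above hinges on both timestamp formats the QuestDB /exec endpoint may return (ISO-8601 string or epoch microseconds) normalizing to the same key component. A quick standalone check of that step; the epoch value below is an illustrative one chosen to match the ISO string:

    import datetime

    # Two representations of the same instant, mirroring the two branches
    # of the normalization in filter_new_trades_batch.
    iso = "2024-05-02T12:00:00.000000Z"
    micros = 1714651200000000  # illustrative epoch-microseconds value for the same instant

    a = datetime.datetime.fromisoformat(iso.replace('Z', '+00:00'))
    b = datetime.datetime.fromtimestamp(micros / 1000000, tz=datetime.timezone.utc)

    # Both collapse to one canonical string, so string and numeric
    # timestamps deduplicate against each other.
    assert a.isoformat() == b.isoformat() == "2024-05-02T12:00:00+00:00"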
@@ -129,27 +176,9 @@ def run_task(historical=False):
             logger.info(f"No trades fetched from {exchange.name}.")
             continue
 
-        # Hash-based deduplication - always check!
-        oldest_trade_ts = min(t.timestamp for t in trades)
-        newest_trade_ts = max(t.timestamp for t in trades)
-
-        # Fetch hashes for the period covered by the trades (plus one day of buffer)
-        check_since = oldest_trade_ts - datetime.timedelta(days=1)
-        existing_hashes = get_existing_trade_hashes(db_url, exchange.name, check_since)
-
-        if existing_hashes:
-            logger.info(f"Found {len(existing_hashes)} existing trade hashes in DB for period")
-
-            # Keep only the genuinely new trades
-            new_trades = []
-            for t in trades:
-                trade_hash = get_trade_hash(t)
-                if trade_hash not in existing_hashes:
-                    new_trades.append(t)
-        else:
-            # No existing hashes found - all trades are new
-            logger.info(f"No existing hashes found - all trades are new")
-            new_trades = trades
+        # Hash-based deduplication - batch processing to save RAM
+        logger.info(f"Filtering {len(trades)} trades for duplicates (batch processing)...")
+        new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=500)
 
         logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")
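
For completeness, a minimal offline sketch of the new call path. FakeTrade, the exchange name, the canned response, and the localhost URL are illustrative assumptions, not part of the commit; only filter_new_trades_batch itself comes from daemon.py:

    import datetime
    from dataclasses import dataclass
    from unittest.mock import MagicMock, patch

    import daemon  # the module changed in this commit

    @dataclass
    class FakeTrade:  # hypothetical stand-in for the real trade objects
        exchange: str
        isin: str
        timestamp: datetime.datetime
        price: float
        quantity: float

    ts = datetime.datetime(2024, 5, 2, 12, 0, tzinfo=datetime.timezone.utc)
    trades = [
        FakeTrade("LSX", "DE0007164600", ts, 101.5, 10.0),                                # already stored
        FakeTrade("LSX", "DE0007164600", ts + datetime.timedelta(minutes=1), 101.6, 5.0), # new
    ]

    # Pretend the DB already holds the first trade for that day.
    db_response = MagicMock(status_code=200)
    db_response.json.return_value = {"dataset": [["DE0007164600", ts.isoformat(), 101.5, 10.0]]}

    with patch.object(daemon.requests, "get", return_value=db_response):
        new = daemon.filter_new_trades_batch("http://localhost:9000", "LSX", trades)

    assert len(new) == 1 and new[0].price == 101.6  # only the second trade survives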