This commit is contained in:
65
daemon.py
65
daemon.py
@@ -1,6 +1,7 @@
|
||||
import time
|
||||
import logging
|
||||
import datetime
|
||||
import hashlib
|
||||
import os
|
||||
import requests
|
||||
from src.exchanges.eix import EIXExchange
|
||||
@@ -20,6 +21,39 @@ DB_USER = os.getenv("DB_USER", "admin")
|
||||
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
|
||||
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
|
||||
|
||||
def get_trade_hash(trade):
|
||||
"""Erstellt einen eindeutigen Hash für einen Trade."""
|
||||
key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}"
|
||||
return hashlib.md5(key.encode()).hexdigest()
|
||||
|
||||
def get_existing_trade_hashes(db_url, exchange_name, since_date):
|
||||
"""Holt alle Trade-Hashes für eine Exchange seit einem bestimmten Datum."""
|
||||
hashes = set()
|
||||
|
||||
# Hole alle Trades seit dem Datum
|
||||
date_str = since_date.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
|
||||
query = f"SELECT exchange, isin, timestamp, price, quantity FROM trades WHERE exchange = '{exchange_name}' AND timestamp >= '{date_str}'"
|
||||
|
||||
try:
|
||||
response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
if data.get('dataset'):
|
||||
for row in data['dataset']:
|
||||
exchange, isin, ts, price, qty = row
|
||||
# Konvertiere Timestamp
|
||||
if isinstance(ts, str):
|
||||
ts_iso = ts.replace('Z', '+00:00')
|
||||
else:
|
||||
ts_iso = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc).isoformat()
|
||||
|
||||
key = f"{exchange}|{isin}|{ts_iso}|{price}|{qty}"
|
||||
hashes.add(hashlib.md5(key.encode()).hexdigest())
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not fetch existing trade hashes: {e}")
|
||||
|
||||
return hashes
|
||||
|
||||
def get_last_trade_timestamp(db_url, exchange_name):
|
||||
# QuestDB query: get the latest timestamp for a specific exchange
|
||||
query = f"trades where exchange = '{exchange_name}' latest by timestamp"
|
||||
@@ -79,7 +113,7 @@ def run_task(historical=False):
|
||||
db_url = "http://questdb:9000"
|
||||
last_ts = get_last_trade_timestamp(db_url, exchange.name)
|
||||
|
||||
logger.info(f"Fetching data from {exchange.name} (Filtering trades older than {last_ts})...")
|
||||
logger.info(f"Fetching data from {exchange.name} (Last trade: {last_ts})...")
|
||||
|
||||
# Special handling for EIX to support smart filtering
|
||||
call_args = args.copy()
|
||||
@@ -91,11 +125,30 @@ def run_task(historical=False):
|
||||
|
||||
trades = exchange.fetch_latest_trades(**call_args)
|
||||
|
||||
# Deduplizierung: Nur Trades nehmen, die neuer sind als der letzte in der DB
|
||||
new_trades = [
|
||||
t for t in trades
|
||||
if t.timestamp.replace(tzinfo=datetime.timezone.utc) > last_ts.replace(tzinfo=datetime.timezone.utc)
|
||||
]
|
||||
if not trades:
|
||||
logger.info(f"No trades fetched from {exchange.name}.")
|
||||
continue
|
||||
|
||||
# Hash-basierte Deduplizierung
|
||||
# Hole existierende Hashes für Trades ab dem ältesten neuen Trade
|
||||
oldest_trade_ts = min(t.timestamp for t in trades)
|
||||
|
||||
# Nur prüfen wenn wir nicht einen komplett historischen Sync machen
|
||||
if last_ts > datetime.datetime.min.replace(tzinfo=datetime.timezone.utc):
|
||||
# Hole Hashes der letzten 7 Tage für diese Exchange
|
||||
check_since = oldest_trade_ts - datetime.timedelta(days=1)
|
||||
existing_hashes = get_existing_trade_hashes(db_url, exchange.name, check_since)
|
||||
logger.info(f"Found {len(existing_hashes)} existing trade hashes in DB")
|
||||
|
||||
# Filtere nur wirklich neue Trades
|
||||
new_trades = []
|
||||
for t in trades:
|
||||
trade_hash = get_trade_hash(t)
|
||||
if trade_hash not in existing_hashes:
|
||||
new_trades.append(t)
|
||||
else:
|
||||
# Historischer Sync - keine Deduplizierung nötig
|
||||
new_trades = trades
|
||||
|
||||
logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user