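"""Trading data daemon.

Fetches trades from several German trading venues (EIX, LS Exchange, Xetra,
Frankfurt, Quotrix, gettex, Stuttgart), deduplicates them against QuestDB via
content hashes, and stores only the new ones. On startup it either performs an
initial historical fetch (empty DB) or a catch-up sync, then repeats a normal
fetch daily at 23:00.
"""
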
import datetime
import hashlib
import logging
import os
import time

import requests

from src.database.questdb_client import DatabaseClient
from src.exchanges.deutsche_boerse import XetraExchange, FrankfurtExchange, QuotrixExchange
from src.exchanges.eix import EIXExchange
from src.exchanges.gettex import GettexExchange
from src.exchanges.ls import LSExchange
from src.exchanges.stuttgart import StuttgartExchange

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("TradingDaemon")

DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
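
# Note: the Trade objects returned by the exchange fetchers are assumed to
# expose .exchange, .isin, .timestamp (a timezone-aware datetime), .price and
# .quantity - the same five columns stored in the QuestDB 'trades' table.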


def get_trade_hash(trade):
    """Build a unique hash for a trade.

    MD5 is used purely as a cheap content fingerprint for deduplication,
    not for any security purpose.
    """
    key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}"
    return hashlib.md5(key.encode()).hexdigest()


def get_existing_trade_hashes(db_url, exchange_name, since_date):
    """Fetch all trade hashes for an exchange since a given date."""
    hashes = set()

    # Fetch all trades since the cutoff date
    date_str = since_date.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
    query = (
        "SELECT exchange, isin, timestamp, price, quantity FROM trades "
        f"WHERE exchange = '{exchange_name}' AND timestamp >= '{date_str}'"
    )

    try:
        response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60)
        if response.status_code == 200:
            data = response.json()
            if data.get('dataset'):
                for row in data['dataset']:
                    exchange, isin, ts, price, qty = row
                    # Normalize the timestamp via datetime.isoformat() so the
                    # key matches the one built by get_trade_hash()
                    if isinstance(ts, str):
                        ts_iso = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00')).isoformat()
                    else:
                        ts_iso = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc).isoformat()

                    key = f"{exchange}|{isin}|{ts_iso}|{price}|{qty}"
                    hashes.add(hashlib.md5(key.encode()).hexdigest())
    except Exception as e:
        logger.warning(f"Could not fetch existing trade hashes: {e}")

    return hashes
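

# For reference: the QuestDB /exec endpoint used above returns JSON shaped
# roughly like the sketch below (an illustration, not the full schema); the
# 'dataset' rows are what the query helpers in this module unpack:
#
#   {
#     "query": "SELECT ...",
#     "columns": [{"name": "exchange", "type": "SYMBOL"}, ...],
#     "dataset": [["EIX", "DE0001234567", "2026-01-23T16:30:35.000000Z", 101.5, 20]],
#     "count": 1
#   }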


def get_last_trade_timestamp(db_url, exchange_name):
    """Return the latest stored trade timestamp for an exchange, or datetime.min (UTC)."""
    # QuestDB query: get the latest timestamp for a specific exchange.
    # max() over the designated timestamp column yields the most recent trade;
    # "latest by timestamp" would group by every distinct timestamp instead.
    query = f"SELECT max(timestamp) FROM trades WHERE exchange = '{exchange_name}'"
    try:
        # Use the /exec endpoint to run the query
        response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60)
        if response.status_code == 200:
            data = response.json()
            if data['dataset'] and data['dataset'][0][0] is not None:
                # QuestDB may return the designated timestamp as an ISO string
                # or as microseconds since the epoch
                ts_value = data['dataset'][0][0]
                if isinstance(ts_value, str):
                    return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
                else:
                    return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
    except Exception as e:
        logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")
    return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)


def run_task(historical=False):
    logger.info(f"Starting Trading Data Fetcher task (Historical: {historical})...")

    # Initialize exchanges
    eix = EIXExchange()
    ls = LSExchange()

    # New Deutsche Börse exchanges
    xetra = XetraExchange()
    frankfurt = FrankfurtExchange()
    quotrix = QuotrixExchange()
    gettex = GettexExchange()
    stuttgart = StuttgartExchange()

    # daemon.py runs daily, so each run should fetch everything since the DB state.
    # eix.py's fetch_latest_trades() accepts a 'since_date' argument for smart
    # filtering, but last_ts is only known inside the loop below, so it cannot be
    # placed in these tuples; the loop builds the call arguments dynamically instead.
    exchanges_to_process = [
        (eix, {'limit': None if historical else 5}),  # default limit of 5 as a safety net on non-historical runs
        (ls, {'include_yesterday': historical}),
        # New exchanges
        (xetra, {'include_yesterday': historical}),
        (frankfurt, {'include_yesterday': historical}),
        (quotrix, {'include_yesterday': historical}),
        (gettex, {'include_yesterday': historical}),
        (stuttgart, {'include_yesterday': historical}),
    ]

    db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)

    for exchange, args in exchanges_to_process:
        try:
            db_url = "http://questdb:9000"
            last_ts = get_last_trade_timestamp(db_url, exchange.name)

            logger.info(f"Fetching data from {exchange.name} (Last trade: {last_ts})...")

            # Special handling for EIX to support smart filtering
            call_args = args.copy()
            if exchange.name == "EIX" and not historical:
                call_args['since_date'] = last_ts  # already timezone-aware (UTC)
                # Drop the limit when filtering by date so nothing is missed
                call_args.pop('limit', None)

            trades = exchange.fetch_latest_trades(**call_args)

            if not trades:
                logger.info(f"No trades fetched from {exchange.name}.")
                continue

            # Hash-based deduplication - always check, even on historical runs!
            oldest_trade_ts = min(t.timestamp for t in trades)

            # Fetch hashes for the period covered by the new trades (plus one day of buffer)
            check_since = oldest_trade_ts - datetime.timedelta(days=1)
            existing_hashes = get_existing_trade_hashes(db_url, exchange.name, check_since)

            if existing_hashes:
                logger.info(f"Found {len(existing_hashes)} existing trade hashes in DB for period")

                # Keep only the trades that are genuinely new
                new_trades = []
                for t in trades:
                    trade_hash = get_trade_hash(t)
                    if trade_hash not in existing_hashes:
                        new_trades.append(t)
            else:
                # No existing hashes found - all trades are new
                logger.info("No existing hashes found - all trades are new")
                new_trades = trades

            logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")

            if new_trades:
                # Sort trades by timestamp before saving (QuestDB prefers ordered ingestion)
                new_trades.sort(key=lambda x: x.timestamp)
                db.save_trades(new_trades)
                logger.info(f"Stored {len(new_trades)} new trades in QuestDB.")
        except Exception as e:
            logger.error(f"Error processing exchange {exchange.name}: {e}")


def main():
    logger.info("Trading Daemon started.")

    # 1. Startup check: is the DB empty?
    db_url = "http://questdb:9000"
    is_empty = True
    try:
        # Check whether the trades table already contains rows
        response = requests.get(f"{db_url}/exec", params={'query': 'select count(*) from trades'}, auth=DB_AUTH, timeout=60)
        if response.status_code == 200:
            data = response.json()
            if data['dataset'] and data['dataset'][0][0] > 0:
                is_empty = False
    except Exception:
        # Table does not exist yet, or the DB is unreachable
        is_empty = True

    if is_empty:
        logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...")
        run_task(historical=True)
    else:
        logger.info("Found existing data in database. Triggering catch-up sync...")
        # Run a normal task to fetch any data missed since the last run
        run_task(historical=False)
        logger.info("Catch-up sync completed. Waiting for scheduled run at 23:00.")

    while True:
        now = datetime.datetime.now()
        # Daily at 23:00
        if now.hour == 23 and now.minute == 0:
            run_task(historical=False)
            # Sleep 61s so the task cannot fire twice within the same minute
            time.sleep(61)

        # Poll every 30 seconds
        time.sleep(30)


if __name__ == "__main__":
    main()
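
# Typical invocation (assumed container deployment, matching the hard-coded
# "questdb" hostname above):
#
#   DB_USER=admin DB_PASSWORD=quest python daemon.py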