Files
trading-daemon/daemon.py

149 lines
6.2 KiB
Python
Raw Normal View History

2026-01-23 16:30:35 +01:00
import time
import logging
import datetime
import os
import requests
2026-01-23 16:30:35 +01:00
from src.exchanges.eix import EIXExchange
from src.exchanges.ls import LSExchange
2026-01-27 09:59:43 +01:00
from src.exchanges.deutsche_boerse import XetraExchange, FrankfurtExchange, QuotrixExchange
from src.exchanges.gettex import GettexExchange
from src.exchanges.stuttgart import StuttgartExchange
2026-01-23 16:30:35 +01:00
from src.database.questdb_client import DatabaseClient
# Process-wide logging: INFO and above, each record prefixed with the time,
# the logger name and the level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("TradingDaemon")

# Database credentials are read from the environment, with fallbacks used
# when the variables are not set.
DB_USER = os.environ.get("DB_USER", "admin")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "quest")

# (user, password) tuple handed to `requests` as `auth=`; None disables
# authentication when either value is empty.
DB_AUTH = None if not (DB_USER and DB_PASSWORD) else (DB_USER, DB_PASSWORD)
def get_last_trade_timestamp(db_url, exchange_name):
    """Return the timestamp of the newest stored trade for one exchange.

    Asks the database's HTTP ``/exec`` endpoint for the maximum value of the
    ``timestamp`` column in the ``trades`` table, restricted to rows whose
    ``exchange`` column equals ``exchange_name``.

    Args:
        db_url: Base URL of the database HTTP API (``/exec`` is appended).
        exchange_name: Exchange identifier to filter on. It is interpolated
            into the SQL text as-is, so it must be a trusted value.

    Returns:
        A timezone-aware ``datetime`` in UTC. When the table holds no rows
        for the exchange, the request fails, or the response cannot be
        parsed, ``datetime.min`` (UTC) is returned so that callers treat
        every fetched trade as new.
    """
    utc = datetime.timezone.utc
    fallback = datetime.datetime.min.replace(tzinfo=utc)

    # An aggregate yields exactly one row with one column, so the value is
    # always at dataset[0][0] regardless of the table's column order.
    query = f"select max(timestamp) from trades where exchange = '{exchange_name}'"
    try:
        # A timeout keeps an unresponsive database from blocking the daemon
        # forever; `requests` never times out on its own.
        response = requests.get(
            f"{db_url}/exec",
            params={'query': query},
            auth=DB_AUTH,
            timeout=30,
        )
        if response.status_code != 200:
            logger.debug(
                f"Timestamp query for {exchange_name} returned HTTP {response.status_code}"
            )
            return fallback

        rows = response.json().get('dataset') or []
        ts_value = rows[0][0] if rows and rows[0] else None
        if ts_value is None:
            # max() over zero rows is NULL: nothing stored for this exchange.
            return fallback

        if isinstance(ts_value, str):
            # ISO-8601 text; rewrite a trailing 'Z' because older Python
            # versions of fromisoformat() do not accept it.
            parsed = datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=utc)
            return parsed

        # Numeric value: interpreted as microseconds since the Unix epoch.
        return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=utc)
    except Exception as e:
        logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")
    return fallback
def run_task(historical=False):
    """Fetch trades from every configured exchange and store the new ones.

    For each exchange the newest stored timestamp is looked up, the exchange
    is asked for its latest trades, and only trades strictly newer than that
    timestamp are written to the database (sorted by timestamp). A failure
    while processing one exchange is logged and does not stop the others.

    Args:
        historical: When true, EIX is fetched without a limit and the other
            exchanges are asked to include yesterday's data. When false, EIX
            is instead given the last stored timestamp as ``since_date``.
    """
    logger.info(f"Starting Trading Data Fetcher task (Historical: {historical})...")

    utc = datetime.timezone.utc
    questdb_url = "http://questdb:9000"

    # EIX takes a `limit`; in the non-historical case it is replaced by
    # `since_date` inside the loop, once the last stored timestamp is known.
    plan = [(EIXExchange(), {'limit': None if historical else 5})]

    # All remaining exchanges share the same keyword interface.
    for exchange_cls in (
        LSExchange,
        XetraExchange,
        FrankfurtExchange,
        QuotrixExchange,
        GettexExchange,
        StuttgartExchange,
    ):
        plan.append((exchange_cls(), {'include_yesterday': historical}))

    db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)

    for source, base_kwargs in plan:
        try:
            cutoff = get_last_trade_timestamp(questdb_url, source.name)
            logger.info(f"Fetching data from {source.name} (Filtering trades older than {cutoff})...")
            cutoff_utc = cutoff.replace(tzinfo=utc)

            fetch_kwargs = dict(base_kwargs)
            if source.name == "EIX" and not historical:
                # Filter by date instead of by count so nothing is missed.
                fetch_kwargs['since_date'] = cutoff_utc
                fetch_kwargs.pop('limit', None)

            fetched = source.fetch_latest_trades(**fetch_kwargs)

            # De-duplication: keep only trades newer than the newest stored one.
            fresh = []
            for trade in fetched:
                if trade.timestamp.replace(tzinfo=utc) > cutoff_utc:
                    fresh.append(trade)

            logger.info(f"Found {len(fetched)} total trades, {len(fresh)} are new.")

            if not fresh:
                continue

            # Store in chronological order.
            ordered = sorted(fresh, key=lambda trade: trade.timestamp)
            db.save_trades(ordered)
            logger.info(f"Stored {len(ordered)} new trades in QuestDB.")
        except Exception as e:
            logger.error(f"Error processing exchange {source.name}: {e}")
def main():
    """Run the daemon: one sync at startup, then one fetch per day at 23:00.

    At startup the ``trades`` table is counted through the database's HTTP
    ``/exec`` endpoint. If it is empty, missing, or the check fails, a full
    historical fetch is triggered; otherwise a normal catch-up fetch runs.
    Afterwards the function loops forever, checking the local clock every
    30 seconds and running a normal fetch when it reads 23:00.

    This function does not return.
    """
    logger.info("Trading Daemon started.")

    # 1. Startup check: is the database empty?
    db_url = "http://questdb:9000"
    is_empty = True
    try:
        # Check whether the table already contains trades. The timeout keeps
        # an unresponsive database from blocking startup indefinitely.
        response = requests.get(
            f"{db_url}/exec",
            params={'query': 'select count(*) from trades'},
            auth=DB_AUTH,
            timeout=30,
        )
        if response.status_code == 200:
            data = response.json()
            if data['dataset'] and data['dataset'][0][0] > 0:
                is_empty = False
    except Exception as e:
        # Table may not exist yet or the database may be unreachable; treat
        # both like an empty database, but record why.
        logger.warning(f"Startup check failed, assuming empty database: {e}")
        is_empty = True

    if is_empty:
        logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...")
        run_task(historical=True)
    else:
        logger.info("Found existing data in database. Triggering catch-up sync...")
        # Run a normal task to fetch any missing data since the last run.
        run_task(historical=False)
        logger.info("Catch-up sync completed. Waiting for scheduled run at 23:00.")

    while True:
        now = datetime.datetime.now()
        # Daily at 23:00 (local time of the process).
        if now.hour == 23 and now.minute == 0:
            run_task(historical=False)
            # Wait 61s so the task cannot run twice within the same minute.
            time.sleep(61)
        # Poll the clock every 30 seconds.
        time.sleep(30)
2026-01-23 16:30:35 +01:00
# Start the daemon only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()