From a07319d9578ebc1a1e73f90479f205ff292d47ad Mon Sep 17 00:00:00 2001 From: Melchior Reimers Date: Thu, 29 Jan 2026 16:00:09 +0100 Subject: [PATCH] Fix: Analytics Worker berechnet jetzt alle Tabellen pro Tag --- daemon.py | 25 +++++++++- dashboard/public/index.html | 54 ++++++++++++++++++-- src/exchanges/deutsche_boerse.py | 84 ++++++++++++++++++++------------ 3 files changed, 129 insertions(+), 34 deletions(-) diff --git a/daemon.py b/daemon.py index 9883553..965dae0 100644 --- a/daemon.py +++ b/daemon.py @@ -9,6 +9,10 @@ from src.exchanges.ls import LSExchange from src.exchanges.deutsche_boerse import XetraExchange, FrankfurtExchange, QuotrixExchange from src.exchanges.gettex import GettexExchange from src.exchanges.stuttgart import StuttgartExchange +from src.exchanges.boersenag import ( + DUSAExchange, DUSBExchange, DUSCExchange, DUSDExchange, + HAMAExchange, HAMBExchange, HANAExchange, HANBExchange +) from src.database.questdb_client import DatabaseClient logging.basicConfig( @@ -136,6 +140,16 @@ def run_task(historical=False): gettex = GettexExchange() stuttgart = StuttgartExchange() + # Börsenag Exchanges (Düsseldorf, Hamburg, Hannover) + dusa = DUSAExchange() + dusb = DUSBExchange() + dusc = DUSCExchange() + dusd = DUSDExchange() + hama = HAMAExchange() + hamb = HAMBExchange() + hana = HANAExchange() + hanb = HANBExchange() + # Pass last_ts to fetcher to allow smart filtering # daemon.py runs daily, so we want to fetch everything since DB state # BUT we need to be careful: eix.py's fetch_latest_trades needs 'since_date' argument @@ -145,12 +159,21 @@ def run_task(historical=False): exchanges_to_process = [ (eix, {'limit': None if historical else 5}), # Default limit 5 for safety if no historical (ls, {'include_yesterday': historical}), - # Neue Exchanges + # Deutsche Börse Exchanges (xetra, {'include_yesterday': historical}), (frankfurt, {'include_yesterday': historical}), (quotrix, {'include_yesterday': historical}), (gettex, {'include_yesterday': historical}), (stuttgart, {'include_yesterday': historical}), + # Börsenag Exchanges (Düsseldorf, Hamburg, Hannover) + (dusa, {'include_yesterday': historical}), + (dusb, {'include_yesterday': historical}), + (dusc, {'include_yesterday': historical}), + (dusd, {'include_yesterday': historical}), + (hama, {'include_yesterday': historical}), + (hamb, {'include_yesterday': historical}), + (hana, {'include_yesterday': historical}), + (hanb, {'include_yesterday': historical}), ] db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD) diff --git a/dashboard/public/index.html b/dashboard/public/index.html index e9bb03f..50157de 100644 --- a/dashboard/public/index.html +++ b/dashboard/public/index.html @@ -102,6 +102,38 @@

Quotrix

+
+

Düsseldorf Reg. (DUSA)

+
+
+
+

Düsseldorf Frei. (DUSB)

+
+
+
+

Quotrix Reg. (DUSC)

+
+
+
+

Quotrix Frei. (DUSD)

+
+
+
+

Hamburg Reg. (HAMA)

+
+
+
+

Hamburg Frei. (HAMB)

+
+
+
+

Hannover Reg. (HANA)

+
+
+
+

Hannover Frei. (HANB)

+
+
@@ -151,7 +183,7 @@
- +
@@ -283,7 +315,15 @@ 'XETRA': { canvasId: 'maChartXETRA' }, 'FRA': { canvasId: 'maChartFRA' }, 'STU': { canvasId: 'maChartSTU' }, - 'QUOTRIX': { canvasId: 'maChartQUOTRIX' } + 'QUOTRIX': { canvasId: 'maChartQUOTRIX' }, + 'DUSA': { canvasId: 'maChartDUSA' }, + 'DUSB': { canvasId: 'maChartDUSB' }, + 'DUSC': { canvasId: 'maChartDUSC' }, + 'DUSD': { canvasId: 'maChartDUSD' }, + 'HAMA': { canvasId: 'maChartHAMA' }, + 'HAMB': { canvasId: 'maChartHAMB' }, + 'HANA': { canvasId: 'maChartHANA' }, + 'HANB': { canvasId: 'maChartHANB' } }; Object.values(exchangeGroups).forEach(config => { const canvas = document.getElementById(config.canvasId); @@ -325,7 +365,15 @@ 'XETRA': { exchanges: ['XETRA'], canvasId: 'maChartXETRA' }, 'FRA': { exchanges: ['FRA'], canvasId: 'maChartFRA' }, 'STU': { exchanges: ['STU'], canvasId: 'maChartSTU' }, - 'QUOTRIX': { exchanges: ['QUOTRIX'], canvasId: 'maChartQUOTRIX' } + 'QUOTRIX': { exchanges: ['QUOTRIX'], canvasId: 'maChartQUOTRIX' }, + 'DUSA': { exchanges: ['DUSA'], canvasId: 'maChartDUSA' }, + 'DUSB': { exchanges: ['DUSB'], canvasId: 'maChartDUSB' }, + 'DUSC': { exchanges: ['DUSC'], canvasId: 'maChartDUSC' }, + 'DUSD': { exchanges: ['DUSD'], canvasId: 'maChartDUSD' }, + 'HAMA': { exchanges: ['HAMA'], canvasId: 'maChartHAMA' }, + 'HAMB': { exchanges: ['HAMB'], canvasId: 'maChartHAMB' }, + 'HANA': { exchanges: ['HANA'], canvasId: 'maChartHANA' }, + 'HANB': { exchanges: ['HANB'], canvasId: 'maChartHANB' } }; // Alle Daten nach Datum sortieren diff --git a/src/exchanges/deutsche_boerse.py b/src/exchanges/deutsche_boerse.py index 7df35f8..49d455a 100644 --- a/src/exchanges/deutsche_boerse.py +++ b/src/exchanges/deutsche_boerse.py @@ -173,25 +173,30 @@ class DeutscheBoerseBase(BaseExchange): def _parse_trade_record(self, record: dict) -> Optional[Trade]: """ Parst einen einzelnen Trade-Record aus dem JSON. - Deutsche Börse verwendet RTS1/RTS2 Format. - Wichtige Felder: - - TrdDt: Trading Date (YYYY-MM-DD) - - TrdTm: Trading Time (HH:MM:SS.ffffff) - - ISIN: Instrument Identifier - - FinInstrmId.Id: Alternative ISIN Feld - - Pric.Pric.MntryVal.Amt: Preis - - Qty.Unit: Menge + Aktuelles JSON-Format (NDJSON): + { + "messageId": "posttrade", + "sourceName": "GAT", + "isin": "US00123Q1040", + "lastTradeTime": "2026-01-29T14:07:00.419000000Z", + "lastTrade": 10.145, + "lastQty": 500.0, + "currency": "EUR", + ... + } """ try: - # ISIN extrahieren - isin = record.get('ISIN') or record.get('FinInstrmId', {}).get('Id', '') + # ISIN extrahieren - neues Format verwendet 'isin' lowercase + isin = record.get('isin') or record.get('ISIN') or record.get('instrumentId') or record.get('FinInstrmId', {}).get('Id', '') if not isin: return None - # Preis extrahieren (verschiedene mögliche Pfade) + # Preis extrahieren - neues Format: 'lastTrade' price = None - if 'Pric' in record: + if 'lastTrade' in record: + price = float(record['lastTrade']) + elif 'Pric' in record: pric = record['Pric'] if isinstance(pric, dict): if 'Pric' in pric: @@ -208,9 +213,11 @@ class DeutscheBoerseBase(BaseExchange): if price is None or price <= 0: return None - # Menge extrahieren + # Menge extrahieren - neues Format: 'lastQty' quantity = None - if 'Qty' in record: + if 'lastQty' in record: + quantity = float(record['lastQty']) + elif 'Qty' in record: qty = record['Qty'] if isinstance(qty, dict): quantity = float(qty.get('Unit', qty.get('Qty', 0))) @@ -220,29 +227,46 @@ class DeutscheBoerseBase(BaseExchange): if quantity is None or quantity <= 0: return None - # Timestamp extrahieren - trd_dt = record.get('TrdDt', '') - trd_tm = record.get('TrdTm', '00:00:00') + # Timestamp extrahieren - neues Format: 'lastTradeTime' + timestamp = None + if 'lastTradeTime' in record: + ts_str = record['lastTradeTime'] + # Format: "2026-01-29T14:07:00.419000000Z" + # Python kann max 6 Dezimalstellen, also kürzen + if '.' in ts_str: + parts = ts_str.replace('Z', '').split('.') + if len(parts) == 2 and len(parts[1]) > 6: + ts_str = parts[0] + '.' + parts[1][:6] + '+00:00' + else: + ts_str = ts_str.replace('Z', '+00:00') + else: + ts_str = ts_str.replace('Z', '+00:00') + timestamp = datetime.fromisoformat(ts_str) + else: + # Fallback für altes Format + trd_dt = record.get('TrdDt', '') + trd_tm = record.get('TrdTm', '00:00:00') + + if not trd_dt: + return None + + ts_str = f"{trd_dt}T{trd_tm}" + if '.' in ts_str: + parts = ts_str.split('.') + if len(parts[1]) > 6: + ts_str = parts[0] + '.' + parts[1][:6] + + timestamp = datetime.fromisoformat(ts_str) - if not trd_dt: + if timestamp is None: return None - # Kombiniere Datum und Zeit - ts_str = f"{trd_dt}T{trd_tm}" - # Entferne Mikrosekunden wenn zu lang - if '.' in ts_str: - parts = ts_str.split('.') - if len(parts[1]) > 6: - ts_str = parts[0] + '.' + parts[1][:6] - - # Parse als UTC (Deutsche Börse liefert UTC) - timestamp = datetime.fromisoformat(ts_str) if timestamp.tzinfo is None: timestamp = timestamp.replace(tzinfo=timezone.utc) return Trade( exchange=self.name, - symbol=isin, # Symbol = ISIN + symbol=isin, isin=isin, price=price, quantity=quantity, @@ -250,7 +274,7 @@ class DeutscheBoerseBase(BaseExchange): ) except Exception as e: - print(f"Error parsing record: {e}") + # Debug: Zeige ersten fehlgeschlagenen Record return None def _get_last_trading_day(self, from_date: datetime.date) -> datetime.date: