Files
trading-daemon/src/exchanges/deutsche_boerse.py
Melchior Reimers 228212cbab
All checks were successful
Deployment / deploy-docker (push) Successful in 17s
updated dashboard
2026-01-27 11:09:52 +01:00

352 lines
12 KiB
Python

import gzip
import io
import json
import re
import time
from datetime import date, datetime, timedelta, timezone
from typing import List, Optional

import requests
from bs4 import BeautifulSoup

from .base import BaseExchange, Trade
# Rate-limiting configuration
RATE_LIMIT_DELAY = 0.5  # seconds to pause between two consecutive requests
RATE_LIMIT_RETRY_DELAY = 5  # seconds to wait after an HTTP 429 response
MAX_RETRIES = 3  # maximum number of retries on HTTP 429

# File-list API endpoints of Deutsche Börse, keyed by exchange name
API_URLS = dict(
    XETRA='https://mfs.deutsche-boerse.com/api/DETR-posttrade',
    FRA='https://mfs.deutsche-boerse.com/api/DFRA-posttrade',
    QUOTRIX='https://mfs.deutsche-boerse.com/api/DGAT-posttrade',
)

# Prefix under which the individual data files are downloaded
DOWNLOAD_BASE_URL = 'https://mfs.deutsche-boerse.com/api/download'

# Browser-like request headers used to access the service
HEADERS = {
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/120.0.0.0 Safari/537.36'
    ),
    'Accept': 'application/json, application/gzip, */*',
    'Referer': 'https://mfs.deutsche-boerse.com/',
}
class DeutscheBoerseBase(BaseExchange):
    """Base class for the Deutsche Börse exchanges (Xetra, Frankfurt, Quotrix).

    Subclasses supply ``name`` and ``base_url``. This class fetches the file
    list from the JSON API, selects the files belonging to one trading day,
    downloads them with rate limiting and converts every usable record into
    a ``Trade``.
    """

    # Pulls date, hour and minute out of file names such as
    # ``DETR-posttrade-2026-01-26T21_30.json.gz`` (minutes follow an underscore).
    _FILE_TIMESTAMP_RE = re.compile(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})')

    # Splits a ``TrdTm`` value into clock time, optional fractional seconds
    # and an optional zone designator (``Z`` or a numeric UTC offset).
    _TRADE_TIME_RE = re.compile(
        r'^(?P<clock>\d{2}:\d{2}:\d{2})'
        r'(?:\.(?P<frac>\d+))?'
        r'(?P<zone>[Zz]|[+-]\d{2}:?\d{2})?$'
    )

    # First two bytes of every gzip stream.
    _GZIP_MAGIC = b'\x1f\x8b'

    # Files of the following day stamped before this UTC hour still count
    # towards the target trading day.
    _EARLY_HOUR_CUTOFF = 3

    @property
    def base_url(self) -> str:
        """Fallback URL of the exchange; must be overridden in subclasses."""
        raise NotImplementedError

    @property
    def name(self) -> str:
        """Exchange identifier, also the key into ``API_URLS``; must be overridden."""
        raise NotImplementedError

    @property
    def api_url(self) -> str:
        """Return the file-list API URL, falling back to ``base_url``."""
        return API_URLS.get(self.name, self.base_url)

    def _get_file_list(self) -> List[str]:
        """Fetch the list of available files from the JSON API.

        Returns:
            The value stored under ``CurrentFiles`` in the response, or an
            empty list when the request or its decoding fails.
        """
        try:
            response = requests.get(self.api_url, headers=HEADERS, timeout=30)
            response.raise_for_status()
            data = response.json()
            files = data.get('CurrentFiles', [])
            print(f"[{self.name}] API returned {len(files)} files")
            return files
        except Exception as e:
            # Deliberate best-effort boundary: report the problem and let the
            # caller carry on with "no files".
            print(f"[{self.name}] Error fetching file list from API: {e}")
            return []

    def _filter_files_for_date(self, files: List[str], target_date: date) -> List[str]:
        """Select the files that belong to ``target_date``.

        File names look like ``DETR-posttrade-YYYY-MM-DDTHH_MM.json.gz``.
        Because trading runs until 22:00 CET (21:00/20:00 UTC), files stamped
        shortly after midnight UTC on the following day are included too.

        Args:
            files: File names as returned by the file-list API.
            target_date: Trading day to select.

        Returns:
            Matching file names in their original order.
        """
        target_str = target_date.strftime('%Y-%m-%d')
        next_day_str = (target_date + timedelta(days=1)).strftime('%Y-%m-%d')

        # NOTE(review): early-morning files of ``target_date`` itself are kept
        # as well, so consecutive daily runs may see those files twice.
        filtered = []
        for file in files:
            if target_str in file:
                filtered.append(file)
            elif next_day_str in file:
                match = self._FILE_TIMESTAMP_RE.search(file)
                if match and int(match.group(2)) < self._EARLY_HOUR_CUTOFF:
                    filtered.append(file)
        return filtered

    def _download_and_parse_file(self, filename: str) -> List[Trade]:
        """Download one ``.json.gz`` file and parse the trades it contains.

        HTTP 429 responses are retried up to ``MAX_RETRIES`` attempts with a
        linearly growing pause; HTTP 404 is treated as "no data".

        Args:
            filename: File name as listed by the file-list API.

        Returns:
            The parsed trades; an empty list on 404, on exhausted retries or
            on any download/parse error (which is reported via ``print``).
        """
        full_url = f"{DOWNLOAD_BASE_URL}/{filename}"
        for retry in range(MAX_RETRIES):
            try:
                response = requests.get(full_url, headers=HEADERS, timeout=60)
                if response.status_code == 404:
                    # File not found - normal for old files
                    return []
                if response.status_code == 429:
                    # Rate limit hit: back off, but do not sleep when no
                    # further attempt will follow.
                    if retry < MAX_RETRIES - 1:
                        wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
                        print(f"[{self.name}] Rate limited, waiting {wait_time}s...")
                        time.sleep(wait_time)
                    continue
                response.raise_for_status()
                return self._parse_ndjson(self._decompress(response.content))
            except requests.exceptions.HTTPError as e:
                # 404 and 429 were handled above, so this is a genuine error.
                print(f"[{self.name}] HTTP error downloading {filename}: {e}")
                return []
            except Exception as e:
                print(f"[{self.name}] Error downloading/parsing {filename}: {e}")
                return []
        print(f"[{self.name}] Giving up on {filename}: still rate limited after {MAX_RETRIES} attempts")
        return []

    @classmethod
    def _decompress(cls, payload: bytes) -> str:
        """Return the UTF-8 text of ``payload``, gunzipping it when needed."""
        # The HTTP layer may already have removed a gzip content encoding;
        # only decompress when the gzip signature is actually present.
        if payload[:2] == cls._GZIP_MAGIC:
            payload = gzip.decompress(payload)
        return payload.decode('utf-8')

    def _parse_ndjson(self, content: str) -> List[Trade]:
        """Parse NDJSON text (one JSON record per line) into trades."""
        trades = []
        for line in content.split('\n'):
            if not line.strip():
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                # Skip malformed lines, keep the rest of the file.
                continue
            trade = self._parse_trade_record(record)
            if trade is not None:
                trades.append(trade)
        return trades

    @staticmethod
    def _extract_isin(record: dict) -> str:
        """Return the ISIN from ``ISIN`` or ``FinInstrmId.Id`` ('' if absent)."""
        isin = record.get('ISIN')
        if isin:
            return isin
        instrument = record.get('FinInstrmId')
        if isinstance(instrument, dict):
            return instrument.get('Id', '')
        return ''

    @staticmethod
    def _extract_price(record: dict) -> Optional[float]:
        """Return the price from the supported ``Pric`` layouts, else None."""
        pric = record.get('Pric')
        if isinstance(pric, dict):
            if 'Pric' in pric:
                inner = pric['Pric']
                if not isinstance(inner, dict):
                    return None
                if 'MntryVal' in inner:
                    return float(inner['MntryVal'].get('Amt', 0))
                if 'Amt' in inner:
                    return float(inner['Amt'])
                return None
            if 'MntryVal' in pric:
                return float(pric['MntryVal'].get('Amt', 0))
            return None
        if isinstance(pric, (int, float)):
            return float(pric)
        return None

    @staticmethod
    def _extract_quantity(record: dict) -> Optional[float]:
        """Return the quantity from ``Qty`` (plain number or dict), else None."""
        qty = record.get('Qty')
        if isinstance(qty, dict):
            return float(qty.get('Unit', qty.get('Qty', 0)))
        if isinstance(qty, (int, float)):
            return float(qty)
        return None

    @staticmethod
    def _is_positive_finite(value: Optional[float]) -> bool:
        """Return True for a usable amount: not None, not NaN/inf and > 0."""
        return value is not None and 0 < value < float('inf')

    @classmethod
    def _parse_timestamp(cls, trd_dt: str, trd_tm: str) -> datetime:
        """Combine trade date and time into a timezone-aware ``datetime``.

        Fractional seconds are normalised to exactly six digits and a ``Z``
        or numeric offset suffix is preserved, so parsing does not depend on
        the Python version and a zone designator is never cut off together
        with surplus fraction digits. Timestamps without a zone are treated
        as UTC.

        Raises:
            ValueError: If the combined value is not a valid ISO timestamp.
        """
        match = cls._TRADE_TIME_RE.match(str(trd_tm))
        if match:
            fraction = (match.group('frac') or '')[:6].ljust(6, '0')
            zone = match.group('zone') or ''
            if zone in ('Z', 'z'):
                zone = '+00:00'
            elif zone and ':' not in zone:
                zone = f"{zone[:3]}:{zone[3:]}"  # +0100 -> +01:00
            ts_str = f"{trd_dt}T{match.group('clock')}.{fraction}{zone}"
        else:
            # Unrecognised layout: fall back to trimming an over-long
            # fraction and let ``fromisoformat`` decide.
            ts_str = f"{trd_dt}T{trd_tm}"
            if '.' in ts_str:
                parts = ts_str.split('.')
                if len(parts[1]) > 6:
                    ts_str = parts[0] + '.' + parts[1][:6]

        timestamp = datetime.fromisoformat(ts_str)
        if timestamp.tzinfo is None:
            # Parse as UTC (per the original author: Deutsche Börse delivers UTC)
            timestamp = timestamp.replace(tzinfo=timezone.utc)
        return timestamp

    def _parse_trade_record(self, record: dict) -> Optional[Trade]:
        """Convert a single JSON trade record into a ``Trade``.

        Fields read from the record:
        - TrdDt: trading date (YYYY-MM-DD)
        - TrdTm: trading time (HH:MM:SS.ffffff), defaults to 00:00:00
        - ISIN or FinInstrmId.Id: instrument identifier
        - Pric.Pric.MntryVal.Amt (and the shorter variants): price
        - Qty.Unit / Qty.Qty / Qty: quantity

        Returns:
            The trade, or None when a required field is missing, price or
            quantity is not a positive finite number, or parsing fails
            (failures are reported via ``print``; nothing is raised).
        """
        try:
            isin = self._extract_isin(record)
            if not isin:
                return None

            price = self._extract_price(record)
            if not self._is_positive_finite(price):
                return None

            quantity = self._extract_quantity(record)
            if not self._is_positive_finite(quantity):
                return None

            trd_dt = record.get('TrdDt', '')
            if not trd_dt:
                return None
            timestamp = self._parse_timestamp(trd_dt, record.get('TrdTm', '00:00:00'))

            return Trade(
                exchange=self.name,
                symbol=isin,  # symbol = ISIN
                isin=isin,
                price=price,
                quantity=quantity,
                timestamp=timestamp,
            )
        except Exception as e:
            # Best effort: one bad record must not abort the whole file.
            print(f"Error parsing record: {e}")
            return None

    def _get_last_trading_day(self, from_date: date) -> date:
        """Return ``from_date``, moved back to Friday if it is on a weekend.

        Only Saturdays and Sundays are skipped; public holidays are not
        taken into account.
        """
        # weekday(): Monday=0 ... Friday=4, Saturday=5, Sunday=6
        days_past_friday = from_date.weekday() - 4
        if days_past_friday > 0:
            return from_date - timedelta(days=days_past_friday)
        return from_date

    def fetch_latest_trades(self, include_yesterday: bool = True, since_date: Optional[datetime] = None) -> List[Trade]:
        """Fetch all trades of the last trading day (weekends are skipped).

        Args:
            include_yesterday: Currently unused; kept so existing callers
                keep working.
            since_date: Day to fetch (a ``datetime`` or ``date``). When
                omitted, the previous day in UTC is used.

        Returns:
            All trades parsed from the files of the target day; an empty
            list when no file list or no matching files are available.
        """
        # Determine the target date
        if since_date:
            target_date = since_date.date() if hasattr(since_date, 'date') else since_date
        else:
            # Default: previous day
            target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()

        # Skip weekends
        original_date = target_date
        target_date = self._get_last_trading_day(target_date)
        if target_date != original_date:
            print(f"[{self.name}] Skipping weekend: {original_date} -> {target_date}")
        print(f"[{self.name}] Fetching trades for date: {target_date}")

        files = self._get_file_list()
        if not files:
            print(f"[{self.name}] No files available from API")
            return []

        target_files = self._filter_files_for_date(files, target_date)
        print(f"[{self.name}] {len(target_files)} files match target date (of {len(files)} total)")
        if not target_files:
            print(f"[{self.name}] No files for target date found")
            return []

        # Download and parse every matching file, pausing between requests
        all_trades = []
        successful = 0  # counts files that yielded at least one trade
        total_files = len(target_files)
        for position, file in enumerate(target_files, start=1):
            trades = self._download_and_parse_file(file)
            if trades:
                all_trades.extend(trades)
                successful += 1
            # Rate limiting: no pause needed after the last file
            if position < total_files:
                time.sleep(RATE_LIMIT_DELAY)
            # Progress report every 100 files
            if position % 100 == 0:
                print(f"[{self.name}] Progress: {position}/{total_files} files, {len(all_trades)} trades so far")

        print(f"[{self.name}] Downloaded {successful} files, total {len(all_trades)} trades")
        return all_trades
class XetraExchange(DeutscheBoerseBase):
    """Xetra exchange of Deutsche Börse, published under the DETR prefix."""

    @property
    def name(self) -> str:
        """Return the identifier of this exchange."""
        return 'XETRA'

    @property
    def base_url(self) -> str:
        """Return the DETR post-trade URL of this exchange."""
        return 'https://mfs.deutsche-boerse.com/DETR-posttrade'
class FrankfurtExchange(DeutscheBoerseBase):
    """Börse Frankfurt, published under the DFRA prefix."""

    @property
    def name(self) -> str:
        """Return the identifier of this exchange."""
        return 'FRA'

    @property
    def base_url(self) -> str:
        """Return the DFRA post-trade URL of this exchange."""
        return 'https://mfs.deutsche-boerse.com/DFRA-posttrade'
class QuotrixExchange(DeutscheBoerseBase):
    """Quotrix (Düsseldorf/Tradegate), published under the DGAT prefix."""

    @property
    def name(self) -> str:
        """Return the identifier of this exchange."""
        return 'QUOTRIX'

    @property
    def base_url(self) -> str:
        """Return the DGAT post-trade URL of this exchange."""
        return 'https://mfs.deutsche-boerse.com/DGAT-posttrade'