Compare commits


3 Commits

Author SHA1 Message Date
Melchior Reimers
1dc79b8b64 Refactor: improve code quality and clean up project structure
- daemon.py: removed gc.collect(), more robust scheduling (last_run_date instead of a minute check),
  introduced an exchange registry pattern (STREAMING_EXCHANGES/STANDARD_EXCHANGES)
- deutsche_boerse.py: thread-safe User-Agent rotation on rate limits,
  logging instead of print(), holiday check, parsing split into smaller methods
- eix.py: logging instead of print(), specific exception types instead of a bare except
- read.py deleted and replaced by scripts/inspect_gzip.py (streaming-based)
- utility scripts moved into scripts/ (cleanup_duplicates, restore_and_fix, verify_fix)
2026-02-01 08:18:55 +01:00
Melchior Reimers
cf55a0bd06 Fix: analytics worker ALWAYS recalculates today/yesterday
- New force_recalculate_date() method deletes old data before recalculating
- Today and yesterday are recalculated on every hourly check
- Fixes the issue that new trades were not picked up into analytics
2026-01-29 22:36:22 +01:00
Melchior Reimers
9cd84e0855 Fix: streaming processing for EIX to prevent RAM overflow
- EIX now processes files one at a time (not all at once)
- Memory is released after each file (gc.collect)
- Day-based caching for the duplicate check, with cache clearing
- Cuts RAM usage from 8 GB+ to under 500 MB
2026-01-29 16:17:11 +01:00
10 changed files with 770 additions and 439 deletions

daemon.py

@@ -4,6 +4,9 @@ import datetime
import hashlib
import os
import requests
from typing import List, Type
from src.exchanges.base import BaseExchange
from src.exchanges.eix import EIXExchange
from src.exchanges.ls import LSExchange
from src.exchanges.deutsche_boerse import XetraExchange, FrankfurtExchange, QuotrixExchange
@@ -25,230 +28,345 @@ DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
# =============================================================================
# Exchange Registry - Neue Börsen hier hinzufügen
# =============================================================================
# Exchanges die Streaming-Verarbeitung benötigen (große Datenmengen)
STREAMING_EXCHANGES: List[Type[BaseExchange]] = [
EIXExchange,
]
# Standard-Exchanges (normale Batch-Verarbeitung)
STANDARD_EXCHANGES: List[Type[BaseExchange]] = [
# Lang & Schwarz
LSExchange,
# Deutsche Börse
XetraExchange,
FrankfurtExchange,
QuotrixExchange,
# Weitere Börsen
GettexExchange,
StuttgartExchange,
# Börsenag (Düsseldorf, Hamburg, Hannover)
DUSAExchange,
DUSBExchange,
DUSCExchange,
DUSDExchange,
HAMAExchange,
HAMBExchange,
HANAExchange,
HANBExchange,
]
# =============================================================================
# Trades Cache
# =============================================================================
# Cache für existierende Trades pro Tag (wird nach jedem Exchange geleert)
_existing_trades_cache = {}
def get_trade_hash(trade):
"""Erstellt einen eindeutigen Hash für einen Trade."""
key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}"
return hashlib.md5(key.encode()).hexdigest()
def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=1000):
"""Filtert neue Trades in Batches, um RAM zu sparen. Verwendet Batch-Queries statt einzelne Checks."""
def get_existing_trades_for_day(db_url, exchange_name, day):
"""Holt existierende Trades für einen Tag aus der DB (mit Caching)."""
cache_key = f"{exchange_name}_{day.strftime('%Y-%m-%d')}"
if cache_key in _existing_trades_cache:
return _existing_trades_cache[cache_key]
day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
day_end = day + datetime.timedelta(days=1)
day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
query = f"""
SELECT isin, timestamp, price, quantity
FROM trades
WHERE exchange = '{exchange_name}'
AND timestamp >= '{day_start_str}'
AND timestamp < '{day_end_str}'
"""
existing_trades = set()
try:
response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60)
if response.status_code == 200:
data = response.json()
if data.get('dataset'):
for row in data['dataset']:
isin, ts, price, qty = row
if isinstance(ts, str):
ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
else:
ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
key = (isin, ts_dt.isoformat(), float(price), float(qty))
existing_trades.add(key)
except Exception as e:
logger.warning(f"Error fetching existing trades for {day}: {e}")
_existing_trades_cache[cache_key] = existing_trades
return existing_trades
def clear_trades_cache():
"""Leert den Cache für existierende Trades."""
global _existing_trades_cache
_existing_trades_cache = {}
def filter_new_trades_for_day(db_url, exchange_name, trades, day):
"""Filtert neue Trades für einen einzelnen Tag."""
if not trades:
return []
new_trades = []
total_batches = (len(trades) + batch_size - 1) // batch_size
existing = get_existing_trades_for_day(db_url, exchange_name, day)
for batch_idx in range(0, len(trades), batch_size):
batch = trades[batch_idx:batch_idx + batch_size]
batch_num = (batch_idx // batch_size) + 1
if batch_num % 10 == 0 or batch_num == 1:
logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} trades)...")
# Gruppiere Trades nach Tag für effizientere Queries
trades_by_day = {}
for trade in batch:
day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
if day not in trades_by_day:
trades_by_day[day] = []
trades_by_day[day].append(trade)
# Prüfe jeden Tag separat
for day, day_trades in trades_by_day.items():
day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
day_end = day + datetime.timedelta(days=1)
day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
# Hole alle existierenden Trades für diesen Tag
query = f"""
SELECT isin, timestamp, price, quantity
FROM trades
WHERE exchange = '{exchange_name}'
AND timestamp >= '{day_start_str}'
AND timestamp < '{day_end_str}'
"""
try:
response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=30)
if response.status_code == 200:
data = response.json()
existing_trades = set()
if data.get('dataset'):
for row in data['dataset']:
isin, ts, price, qty = row
# Normalisiere Timestamp für Vergleich
if isinstance(ts, str):
ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
else:
ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
# Erstelle Vergleichs-Key (ohne Hash, direkter Vergleich)
key = (isin, ts_dt.isoformat(), float(price), float(qty))
existing_trades.add(key)
# Prüfe welche Trades neu sind
for trade in day_trades:
trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
if trade_key not in existing_trades:
new_trades.append(trade)
else:
# Bei Fehler: alle Trades als neu behandeln (sicherer)
logger.warning(f"Query failed for day {day}, treating all trades as new")
new_trades.extend(day_trades)
except Exception as e:
# Bei Fehler: alle Trades als neu behandeln (sicherer)
logger.warning(f"Error checking trades for day {day}: {e}, treating all trades as new")
new_trades.extend(day_trades)
# Kleine Pause zwischen Batches, um DB nicht zu überlasten
if batch_idx + batch_size < len(trades):
time.sleep(0.05)
new_trades = []
for trade in trades:
trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
if trade_key not in existing:
new_trades.append(trade)
return new_trades
def get_last_trade_timestamp(db_url, exchange_name):
# QuestDB query: get the latest timestamp for a specific exchange
def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=5000):
"""Filtert neue Trades in Batches, gruppiert nach Tag."""
if not trades:
return []
# Gruppiere alle Trades nach Tag
trades_by_day = {}
for trade in trades:
day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
if day not in trades_by_day:
trades_by_day[day] = []
trades_by_day[day].append(trade)
new_trades = []
total_days = len(trades_by_day)
for i, (day, day_trades) in enumerate(sorted(trades_by_day.items()), 1):
if i % 10 == 0 or i == 1:
logger.info(f"Checking day {i}/{total_days}: {day.strftime('%Y-%m-%d')} ({len(day_trades)} trades)...")
new_for_day = filter_new_trades_for_day(db_url, exchange_name, day_trades, day)
new_trades.extend(new_for_day)
# Kleine Pause um DB nicht zu überlasten
if i < total_days:
time.sleep(0.02)
return new_trades
def get_last_trade_timestamp(db_url: str, exchange_name: str) -> datetime.datetime:
"""Holt den Timestamp des letzten Trades für eine Exchange aus QuestDB."""
query = f"trades where exchange = '{exchange_name}' latest by timestamp"
try:
# Using the /exec endpoint to get data
response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH)
if response.status_code == 200:
data = response.json()
if data['dataset']:
# QuestDB returns timestamp in micros since epoch by default in some views, or ISO
# Let's assume the timestamp is in the dataset
# ILP timestamps are stored as designated timestamps.
ts_value = data['dataset'][0][0] # Adjust index based on column order
if data.get('dataset'):
# QuestDB gibt Timestamps in Mikrosekunden oder ISO-Format zurück
ts_value = data['dataset'][0][0]
if isinstance(ts_value, str):
return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
else:
return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
except Exception as e:
logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")
logger.debug(f"Keine existierenden Daten für {exchange_name} oder DB nicht erreichbar: {e}")
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
def process_eix_streaming(db, db_url: str, exchange: BaseExchange, historical: bool = False):
"""Verarbeitet eine Exchange im Streaming-Modus um RAM zu sparen."""
last_ts = get_last_trade_timestamp(db_url, exchange.name)
logger.info(f"Hole Daten von {exchange.name} (Letzter Trade: {last_ts}) - STREAMING...")
# Hole Liste der zu verarbeitenden Dateien
if historical:
files = exchange.get_files_to_process(limit=None, since_date=None)
else:
files = exchange.get_files_to_process(limit=None, since_date=last_ts)
if not files:
logger.info(f"Keine {exchange.name} Dateien zu verarbeiten.")
return
logger.info(f"{len(files)} {exchange.name} Dateien gefunden...")
total_new = 0
total_processed = 0
for i, file_item in enumerate(files, 1):
file_name = file_item.get('fileName', 'unknown').split('/')[-1]
logger.info(f"Verarbeite {exchange.name} Datei {i}/{len(files)}: {file_name}")
trades = exchange.fetch_trades_from_file(file_item)
if not trades:
logger.info(f" Keine Trades in {file_name}")
continue
total_processed += len(trades)
logger.info(f" {len(trades)} Trades geladen, filtere Duplikate...")
new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
if new_trades:
new_trades.sort(key=lambda x: x.timestamp)
db.save_trades(new_trades)
total_new += len(new_trades)
logger.info(f" {len(new_trades)} neue Trades gespeichert (gesamt neu: {total_new})")
else:
logger.info(f" Keine neuen Trades in dieser Datei")
# Referenzen freigeben
del trades
del new_trades
time.sleep(0.1)
logger.info(f"{exchange.name} fertig: {total_new} neue Trades von {total_processed} verarbeitet.")
clear_trades_cache()
def process_standard_exchange(db, db_url: str, exchange: BaseExchange, historical: bool):
"""Verarbeitet einen Standard-Exchange mit Batch-Verarbeitung."""
try:
last_ts = get_last_trade_timestamp(db_url, exchange.name)
logger.info(f"Hole Daten von {exchange.name} (Letzter Trade: {last_ts})...")
trades = exchange.fetch_latest_trades(include_yesterday=historical)
if not trades:
logger.info(f"Keine Trades von {exchange.name} erhalten.")
return
# Deduplizierung
logger.info(f"Filtere {len(trades)} Trades auf Duplikate...")
new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
logger.info(f"Gefunden: {len(trades)} Trades gesamt, {len(new_trades)} sind neu.")
if new_trades:
new_trades.sort(key=lambda x: x.timestamp)
db.save_trades(new_trades)
logger.info(f"{len(new_trades)} neue Trades in QuestDB gespeichert.")
# Referenzen freigeben
del trades
if new_trades:
del new_trades
clear_trades_cache()
except Exception as e:
logger.error(f"Fehler bei Exchange {exchange.name}: {e}")
def run_task(historical=False):
logger.info(f"Starting Trading Data Fetcher task (Historical: {historical})...")
# Initialize exchanges
eix = EIXExchange()
ls = LSExchange()
# Neue Deutsche Börse Exchanges
xetra = XetraExchange()
frankfurt = FrankfurtExchange()
quotrix = QuotrixExchange()
gettex = GettexExchange()
stuttgart = StuttgartExchange()
# Börsenag Exchanges (Düsseldorf, Hamburg, Hannover)
dusa = DUSAExchange()
dusb = DUSBExchange()
dusc = DUSCExchange()
dusd = DUSDExchange()
hama = HAMAExchange()
hamb = HAMBExchange()
hana = HANAExchange()
hanb = HANBExchange()
# Pass last_ts to fetcher to allow smart filtering
# daemon.py runs daily, so we want to fetch everything since DB state
# BUT we need to be careful: eix.py's fetch_latest_trades needs 'since_date' argument
# We can't pass it here directly in the tuple easily because last_ts is calculated inside the loop.
# We will modify the loop below to handle args dynamically
exchanges_to_process = [
(eix, {'limit': None if historical else 5}), # Default limit 5 for safety if no historical
(ls, {'include_yesterday': historical}),
# Deutsche Börse Exchanges
(xetra, {'include_yesterday': historical}),
(frankfurt, {'include_yesterday': historical}),
(quotrix, {'include_yesterday': historical}),
(gettex, {'include_yesterday': historical}),
(stuttgart, {'include_yesterday': historical}),
# Börsenag Exchanges (Düsseldorf, Hamburg, Hannover)
(dusa, {'include_yesterday': historical}),
(dusb, {'include_yesterday': historical}),
(dusc, {'include_yesterday': historical}),
(dusd, {'include_yesterday': historical}),
(hama, {'include_yesterday': historical}),
(hamb, {'include_yesterday': historical}),
(hana, {'include_yesterday': historical}),
(hanb, {'include_yesterday': historical}),
]
"""Haupttask: Holt Trades von allen registrierten Exchanges."""
logger.info(f"Starte Trading Data Fetcher (Historical: {historical})...")
db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
for exchange, args in exchanges_to_process:
try:
db_url = "http://questdb:9000"
last_ts = get_last_trade_timestamp(db_url, exchange.name)
logger.info(f"Fetching data from {exchange.name} (Last trade: {last_ts})...")
# Special handling for EIX to support smart filtering
call_args = args.copy()
if exchange.name == "EIX" and not historical:
call_args['since_date'] = last_ts.replace(tzinfo=datetime.timezone.utc)
# Remove limit if we are filtering by date to ensure we get everything
if 'limit' in call_args:
call_args.pop('limit')
trades = exchange.fetch_latest_trades(**call_args)
if not trades:
logger.info(f"No trades fetched from {exchange.name}.")
continue
# Hash-basierte Deduplizierung - Batch-Verarbeitung um RAM zu sparen
logger.info(f"Filtering {len(trades)} trades for duplicates (batch processing)...")
new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=500)
logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")
if new_trades:
# Sort trades by timestamp before saving (QuestDB likes this)
new_trades.sort(key=lambda x: x.timestamp)
db.save_trades(new_trades)
logger.info(f"Stored {len(new_trades)} new trades in QuestDB.")
except Exception as e:
logger.error(f"Error processing exchange {exchange.name}: {e}")
def main():
logger.info("Trading Daemon started.")
# 1. Startup Check: Ist die DB leer?
db_url = "http://questdb:9000"
is_empty = True
# Streaming-Exchanges verarbeiten (große Datenmengen)
for exchange_class in STREAMING_EXCHANGES:
try:
exchange = exchange_class()
logger.info(f"Verarbeite {exchange.name} im Streaming-Modus...")
process_eix_streaming(db, db_url, exchange, historical=historical)
except Exception as e:
logger.error(f"Fehler bei Streaming-Exchange {exchange_class.__name__}: {e}")
# Standard-Exchanges verarbeiten
for exchange_class in STANDARD_EXCHANGES:
try:
exchange = exchange_class()
process_standard_exchange(db, db_url, exchange, historical)
except Exception as e:
logger.error(f"Fehler bei Exchange {exchange_class.__name__}: {e}")
logger.info("Alle Exchanges verarbeitet.")
def is_database_empty(db_url: str) -> bool:
"""Prüft ob die Datenbank leer ist oder die Tabelle nicht existiert."""
try:
# Prüfe ob bereits Trades in der Tabelle sind
response = requests.get(f"{db_url}/exec", params={'query': 'select count(*) from trades'}, auth=DB_AUTH)
if response.status_code == 200:
data = response.json()
if data['dataset'] and data['dataset'][0][0] > 0:
is_empty = False
if data.get('dataset') and data['dataset'][0][0] > 0:
return False
except Exception:
# Falls Tabelle noch nicht existiert oder DB nicht erreichbar ist
is_empty = True
pass
return True
if is_empty:
logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...")
def calculate_seconds_until_target(target_hour: int, target_minute: int = 0) -> int:
"""Berechnet Sekunden bis zur nächsten Zielzeit."""
now = datetime.datetime.now()
target = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
# Wenn Zielzeit heute schon vorbei ist, nimm morgen
if target <= now:
target += datetime.timedelta(days=1)
return int((target - now).total_seconds())
def main():
logger.info("Trading Daemon gestartet.")
db_url = "http://questdb:9000"
# Startup: Initialer Sync
if is_database_empty(db_url):
logger.info("Datenbank ist leer. Starte initialen historischen Fetch...")
run_task(historical=True)
else:
logger.info("Found existing data in database. Triggering catch-up sync...")
# Run a normal task to fetch any missing data since the last run
logger.info("Existierende Daten gefunden. Starte Catch-up Sync...")
run_task(historical=False)
logger.info("Catch-up sync completed. Waiting for scheduled run at 23:00.")
logger.info("Catch-up Sync abgeschlossen.")
# Scheduling Konfiguration
SCHEDULE_HOUR = 23
SCHEDULE_MINUTE = 0
last_run_date = None
logger.info(f"Warte auf täglichen Run um {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d}...")
while True:
now = datetime.datetime.now()
# Täglich um 23:00 Uhr
if now.hour == 23 and now.minute == 0:
run_task(historical=False)
# Warte 61s, um Mehrfachausführung in derselben Minute zu verhindern
time.sleep(61)
today = now.date()
# Check alle 30 Sekunden
time.sleep(30)
# Prüfe ob wir heute schon gelaufen sind
already_ran_today = (last_run_date == today)
# Prüfe ob wir im Zeitfenster sind (23:00 - 23:59)
in_schedule_window = (now.hour == SCHEDULE_HOUR and now.minute >= SCHEDULE_MINUTE)
if in_schedule_window and not already_ran_today:
logger.info(f"Geplanter Task startet ({now.strftime('%Y-%m-%d %H:%M:%S')})...")
run_task(historical=False)
last_run_date = today
logger.info("Geplanter Task abgeschlossen. Warte auf nächsten Tag...")
# Dynamische Sleep-Zeit: Kurz vor Zielzeit öfter prüfen
seconds_until_target = calculate_seconds_until_target(SCHEDULE_HOUR, SCHEDULE_MINUTE)
if seconds_until_target > 3600:
# Mehr als 1 Stunde: Schlafe 30 Minuten
sleep_time = 1800
elif seconds_until_target > 300:
# 5 Minuten bis 1 Stunde: Schlafe 5 Minuten
sleep_time = 300
else:
# Unter 5 Minuten: Schlafe 30 Sekunden
sleep_time = 30
time.sleep(sleep_time)
if __name__ == "__main__":
main()
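
What the registry buys: adding a venue no longer means touching run_task(). A minimal sketch of registering an additional exchange, assuming only the BaseExchange/Trade interface that daemon.py already imports; ExampleExchange itself is purely hypothetical:

from typing import List
from src.exchanges.base import BaseExchange, Trade  # the same imports daemon.py relies on

class ExampleExchange(BaseExchange):
    """Hypothetical venue, shown only to illustrate registration."""

    @property
    def name(self) -> str:
        return "EXAMPLE"

    def fetch_latest_trades(self, include_yesterday: bool = False, **kwargs) -> List[Trade]:
        # A real implementation returns Trade objects carrying exchange, isin,
        # price, quantity and timestamp, like the parsers further down this page.
        return []

# Appending the class is the whole registration step: run_task() instantiates
# every entry in STANDARD_EXCHANGES and hands it to process_standard_exchange().
STANDARD_EXCHANGES.append(ExampleExchange)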

read.py (deleted)

@@ -1,5 +0,0 @@
import gzip
import json
with gzip.open("DGAT-posttrade-2026-01-29T14_07.json.gz", mode="rt") as f:
data = [json.loads(line) for line in f]
print (str(data))

scripts/inspect_gzip.py (new file)

@@ -0,0 +1,79 @@
#!/usr/bin/env python3
"""
Utility-Script zum Inspizieren von gzip-komprimierten JSON-Dateien.
Verarbeitet Dateien streaming, ohne alles in den RAM zu laden.
Verwendung:
python scripts/inspect_gzip.py <datei.json.gz> [--limit N] [--output datei.json]
"""
import gzip
import json
import argparse
import sys
from pathlib import Path
def inspect_gzip_file(filepath: str, limit: int = None, output_file: str = None):
"""
Liest eine gzip-komprimierte NDJSON-Datei und gibt die Inhalte aus.
Args:
filepath: Pfad zur .json.gz Datei
limit: Maximale Anzahl der auszugebenden Records (None = alle)
output_file: Optional: Ausgabe in Datei statt stdout
"""
path = Path(filepath)
if not path.exists():
print(f"Fehler: Datei '{filepath}' nicht gefunden.", file=sys.stderr)
return 1
count = 0
output = open(output_file, 'w', encoding='utf-8') if output_file else sys.stdout
try:
with gzip.open(filepath, mode='rt', encoding='utf-8') as f:
for line in f:
if not line.strip():
continue
try:
record = json.loads(line)
# Pretty-print einzelner Record
json.dump(record, output, indent=2, ensure_ascii=False)
output.write('\n')
count += 1
if limit and count >= limit:
break
except json.JSONDecodeError as e:
print(f"JSON-Fehler in Zeile {count + 1}: {e}", file=sys.stderr)
continue
print(f"\n--- {count} Records verarbeitet ---", file=sys.stderr)
finally:
if output_file and output != sys.stdout:
output.close()
return 0
def main():
parser = argparse.ArgumentParser(
description='Inspiziert gzip-komprimierte JSON-Dateien (NDJSON-Format)'
)
parser.add_argument('file', help='Pfad zur .json.gz Datei')
parser.add_argument('--limit', '-n', type=int, default=10,
help='Maximale Anzahl der Records (default: 10, 0 = alle)')
parser.add_argument('--output', '-o', type=str,
help='Ausgabe in Datei statt stdout')
args = parser.parse_args()
limit = args.limit if args.limit > 0 else None
return inspect_gzip_file(args.file, limit=limit, output_file=args.output)
if __name__ == '__main__':
sys.exit(main())
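
Besides the CLI entry point, the helper can be imported and called directly. A small sketch, assuming scripts/ is importable from the repository root and reusing the sample file name from the deleted read.py:

from scripts.inspect_gzip import inspect_gzip_file

# Pretty-print the first five NDJSON records of a capture to stdout.
# Returns 0 on success, 1 if the file does not exist.
inspect_gzip_file("DGAT-posttrade-2026-01-29T14_07.json.gz", limit=5)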

Analytics worker (class AnalyticsWorker)

@@ -865,6 +865,39 @@ class AnalyticsWorker:
if i % 10 == 0:
time.sleep(1)
def delete_analytics_for_date(self, date: datetime.date):
"""Löscht alle Analytics-Daten für ein bestimmtes Datum, damit sie neu berechnet werden können."""
date_str = date.strftime('%Y-%m-%d')
next_day = date + datetime.timedelta(days=1)
next_day_str = next_day.strftime('%Y-%m-%d')
tables = ['analytics_custom', 'analytics_exchange_daily', 'analytics_daily_summary']
for table in tables:
try:
# QuestDB DELETE syntax
delete_query = f"DELETE FROM {table} WHERE timestamp >= '{date_str}' AND timestamp < '{next_day_str}'"
response = requests.get(
f"{self.questdb_url}/exec",
params={'query': delete_query},
auth=self.auth,
timeout=30
)
if response.status_code == 200:
logger.debug(f"Deleted old analytics from {table} for {date}")
except Exception as e:
logger.debug(f"Could not delete from {table} for {date}: {e}")
def force_recalculate_date(self, date: datetime.date):
"""Erzwingt Neuberechnung der Analytics für ein Datum (löscht alte Daten zuerst)."""
logger.info(f"Force recalculating analytics for {date}...")
# Lösche alte Analytics-Daten für dieses Datum
self.delete_analytics_for_date(date)
# Berechne neu
self.process_date(date)
def run(self):
"""Hauptschleife des Workers"""
logger.info("Analytics Worker started.")
@@ -874,35 +907,26 @@ class AnalyticsWorker:
logger.error("Failed to connect to QuestDB. Exiting.")
return
# Initiale Berechnung fehlender Tage (inkl. gestern und heute)
# Initiale Berechnung fehlender Tage
logger.info("Checking for missing dates...")
self.process_missing_dates()
# Stelle sicher, dass gestern und heute verarbeitet werden
# IMMER heute und gestern neu berechnen (da neue Trades hinzukommen können)
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
logger.info(f"Ensuring yesterday ({yesterday}) and today ({today}) are processed...")
# Prüfe alle drei Tabellen
existing_custom = self.get_existing_dates('analytics_custom')
existing_exchange = self.get_existing_dates('analytics_exchange_daily')
existing_summary = self.get_existing_dates('analytics_daily_summary')
existing_dates = existing_custom | existing_exchange | existing_summary
logger.info(f"Force recalculating yesterday ({yesterday}) and today ({today}) - new trades may have been added...")
if yesterday not in existing_dates:
logger.info(f"Processing yesterday's data: {yesterday}")
self.process_date(yesterday)
# Gestern immer neu berechnen
self.force_recalculate_date(yesterday)
# Heute wird verarbeitet, wenn es bereits Trades gibt
if today not in existing_dates:
# Prüfe ob es heute schon Trades gibt
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
data = self.query_questdb(query)
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
logger.info(f"Found trades for today ({today}), processing...")
self.process_date(today)
else:
logger.info(f"No trades found for today ({today}) yet, will process later")
# Heute nur wenn es Trades gibt
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
data = self.query_questdb(query)
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
self.force_recalculate_date(today)
else:
logger.info(f"No trades found for today ({today}) yet, will process later")
# Hauptschleife: Prüfe regelmäßig auf fehlende Tage
logger.info("Starting main loop - checking for missing dates every hour...")
@@ -917,32 +941,24 @@ class AnalyticsWorker:
self.process_missing_dates()
last_check_hour = current_hour
# Stelle sicher, dass gestern und heute verarbeitet wurden
# IMMER heute und gestern neu berechnen
today = now.date()
yesterday = today - datetime.timedelta(days=1)
# Prüfe alle drei Tabellen
existing_custom = self.get_existing_dates('analytics_custom')
existing_exchange = self.get_existing_dates('analytics_exchange_daily')
existing_summary = self.get_existing_dates('analytics_daily_summary')
existing_dates = existing_custom | existing_exchange | existing_summary
if yesterday not in existing_dates:
logger.info(f"Processing yesterday's data: {yesterday}")
self.process_date(yesterday)
logger.info(f"Hourly recalculation of yesterday ({yesterday}) and today ({today})...")
self.force_recalculate_date(yesterday)
# Prüfe heute, ob es Trades gibt
if today not in existing_dates:
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
data = self.query_questdb(query)
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
logger.info(f"Found trades for today ({today}), processing...")
self.process_date(today)
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
data = self.query_questdb(query)
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
self.force_recalculate_date(today)
# Prüfe ob es Mitternacht ist (00:00) - verarbeite dann gestern
if now.hour == 0 and now.minute == 0:
yesterday = (now - datetime.timedelta(days=1)).date()
logger.info(f"Midnight reached - processing yesterday's data: {yesterday}")
self.process_date(yesterday)
logger.info(f"Midnight reached - force recalculating yesterday's data: {yesterday}")
self.force_recalculate_date(yesterday)
# Warte 61s, um Mehrfachausführung zu verhindern
time.sleep(61)
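
The same delete-then-recompute path can also be triggered by hand, for example after a backfill. A sketch that assumes AnalyticsWorker can be constructed without arguments (its __init__ is not part of this diff):

import datetime

worker = AnalyticsWorker()  # constructor signature assumed, not shown above
yesterday = datetime.date.today() - datetime.timedelta(days=1)

# Drops that date's rows from all three analytics tables, then reprocesses the date.
worker.force_recalculate_date(yesterday)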

src/exchanges/deutsche_boerse.py

@@ -2,16 +2,20 @@ import requests
import gzip
import json
import io
import re
import time
import logging
import threading
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from .base import BaseExchange, Trade
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
# Rate-Limiting Konfiguration
RATE_LIMIT_DELAY = 0.5 # Sekunden zwischen Requests
RATE_LIMIT_RETRY_DELAY = 5 # Sekunden Wartezeit bei 429
MAX_RETRIES = 3 # Maximale Wiederholungen bei 429
MAX_RETRIES = 5 # Maximale Wiederholungen bei 429
# API URLs für Deutsche Börse
API_URLS = {
@@ -21,17 +25,47 @@ API_URLS = {
}
DOWNLOAD_BASE_URL = "https://mfs.deutsche-boerse.com/api/download"
# Browser User-Agent für Zugriff
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'application/json, application/gzip, */*',
'Referer': 'https://mfs.deutsche-boerse.com/',
}
# Liste von User-Agents für Rotation bei Rate-Limiting
USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:121.0) Gecko/20100101 Firefox/121.0',
'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0',
]
class UserAgentRotator:
"""Thread-safe User-Agent Rotation"""
def __init__(self):
self._index = 0
self._lock = threading.Lock()
def get_headers(self, rotate: bool = False) -> dict:
"""Gibt Headers mit aktuellem User-Agent zurück. Bei rotate=True wird zum nächsten gewechselt."""
with self._lock:
if rotate:
self._index = (self._index + 1) % len(USER_AGENTS)
return {
'User-Agent': USER_AGENTS[self._index],
'Accept': 'application/json, application/gzip, */*',
'Referer': 'https://mfs.deutsche-boerse.com/',
}
# Globale Instanz für User-Agent Rotation
_ua_rotator = UserAgentRotator()
class DeutscheBoerseBase(BaseExchange):
"""Basisklasse für Deutsche Börse Exchanges (Xetra, Frankfurt, Quotrix)"""
# Regex für Dateinamen-Parsing (kompiliert für Performance)
_FILENAME_PATTERN = re.compile(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})')
@property
def base_url(self) -> str:
"""Override in subclasses"""
@@ -46,60 +80,73 @@ class DeutscheBoerseBase(BaseExchange):
"""API URL für die Dateiliste"""
return API_URLS.get(self.name, self.base_url)
def _handle_rate_limit(self, retry: int, context: str) -> None:
"""Zentrale Rate-Limit Behandlung: rotiert User-Agent und wartet."""
_ua_rotator.get_headers(rotate=True)
wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
logger.warning(f"[{self.name}] Rate limited ({context}), rotating User-Agent and waiting {wait_time}s... (retry {retry + 1}/{MAX_RETRIES})")
time.sleep(wait_time)
def _get_file_list(self) -> List[str]:
"""Holt die Dateiliste von der JSON API"""
try:
api_url = self.api_url
print(f"[{self.name}] Fetching file list from: {api_url}")
response = requests.get(api_url, headers=HEADERS, timeout=30)
response.raise_for_status()
data = response.json()
files = data.get('CurrentFiles', [])
print(f"[{self.name}] API returned {len(files)} files")
if files:
print(f"[{self.name}] Sample files: {files[:3]}")
return files
except Exception as e:
print(f"[{self.name}] Error fetching file list from API: {e}")
import traceback
print(f"[{self.name}] Traceback: {traceback.format_exc()}")
return []
api_url = self.api_url
for retry in range(MAX_RETRIES):
try:
headers = _ua_rotator.get_headers(rotate=(retry > 0))
logger.info(f"[{self.name}] Fetching file list from: {api_url}")
response = requests.get(api_url, headers=headers, timeout=30)
if response.status_code == 429:
self._handle_rate_limit(retry, "file list")
continue
response.raise_for_status()
data = response.json()
files = data.get('CurrentFiles', [])
logger.info(f"[{self.name}] API returned {len(files)} files")
if files:
logger.debug(f"[{self.name}] Sample files: {files[:3]}")
return files
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
self._handle_rate_limit(retry, "file list HTTPError")
continue
logger.error(f"[{self.name}] HTTP error fetching file list: {e}")
break
except Exception as e:
logger.exception(f"[{self.name}] Error fetching file list from API: {e}")
break
return []
def _filter_files_for_date(self, files: List[str], target_date: datetime.date) -> List[str]:
"""
Filtert Dateien für ein bestimmtes Datum.
Dateiformat: DETR-posttrade-YYYY-MM-DDTHH_MM.json.gz (mit Unterstrich!)
Dateiformat: DETR-posttrade-YYYY-MM-DDTHH_MM.json.gz
Da Handel bis 22:00 MEZ geht (21:00/20:00 UTC), müssen wir auch
Dateien nach Mitternacht UTC berücksichtigen.
"""
import re
filtered = []
# Für den Vortag: Dateien vom target_date UND vom Folgetag (bis ~02:00 UTC)
target_str = target_date.strftime('%Y-%m-%d')
next_day = target_date + timedelta(days=1)
next_day_str = next_day.strftime('%Y-%m-%d')
for file in files:
# Extrahiere Datum aus Dateiname
# Format: DETR-posttrade-2026-01-26T21_30.json.gz
if target_str in file:
filtered.append(file)
elif next_day_str in file:
# Prüfe ob es eine frühe Datei vom nächsten Tag ist (< 03:00 UTC)
try:
# Finde Timestamp im Dateinamen mit Unterstrich für Minuten
match = re.search(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})', file)
if match:
hour = int(match.group(2))
if hour < 3: # Frühe Morgenstunden gehören noch zum Vortag
filtered.append(file)
except Exception:
pass
match = self._FILENAME_PATTERN.search(file)
if match:
hour = int(match.group(2))
if hour < 3: # Frühe Morgenstunden gehören noch zum Vortag
filtered.append(file)
return filtered
@@ -110,17 +157,14 @@ class DeutscheBoerseBase(BaseExchange):
for retry in range(MAX_RETRIES):
try:
response = requests.get(full_url, headers=HEADERS, timeout=60)
headers = _ua_rotator.get_headers(rotate=(retry > 0))
response = requests.get(full_url, headers=headers, timeout=60)
if response.status_code == 404:
# Datei nicht gefunden - normal für alte Dateien
return []
if response.status_code == 429:
# Rate-Limit erreicht - warten und erneut versuchen
wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
print(f"[{self.name}] Rate limited, waiting {wait_time}s...")
time.sleep(wait_time)
self._handle_rate_limit(retry, "download")
continue
response.raise_for_status()
@@ -130,13 +174,11 @@ class DeutscheBoerseBase(BaseExchange):
content = f.read().decode('utf-8')
if not content.strip():
# Leere Datei
return []
# NDJSON Format: Eine JSON-Zeile pro Trade
lines = content.strip().split('\n')
if not lines or (len(lines) == 1 and not lines[0].strip()):
# Leere Datei
return []
for line in lines:
@@ -147,116 +189,146 @@ class DeutscheBoerseBase(BaseExchange):
trade = self._parse_trade_record(record)
if trade:
trades.append(trade)
except json.JSONDecodeError:
continue
except Exception:
continue
except json.JSONDecodeError as e:
logger.debug(f"[{self.name}] JSON decode error in {filename}: {e}")
except Exception as e:
logger.debug(f"[{self.name}] Error parsing record in {filename}: {e}")
# Erfolg - keine weitere Retry nötig
# Erfolg
break
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
print(f"[{self.name}] Rate limited, waiting {wait_time}s...")
time.sleep(wait_time)
self._handle_rate_limit(retry, "download HTTPError")
continue
elif e.response.status_code != 404:
print(f"[{self.name}] HTTP error downloading {filename}: {e}")
logger.error(f"[{self.name}] HTTP error downloading {filename}: {e}")
break
except Exception as e:
print(f"[{self.name}] Error downloading/parsing {filename}: {e}")
logger.error(f"[{self.name}] Error downloading/parsing {filename}: {e}")
break
return trades
def _parse_timestamp(self, ts_str: str) -> Optional[datetime]:
"""
Parst einen Timestamp-String in ein datetime-Objekt.
Unterstützt Nanosekunden durch Kürzung auf Mikrosekunden.
"""
if not ts_str:
return None
# Ersetze 'Z' durch '+00:00' für ISO-Kompatibilität
ts_str = ts_str.replace('Z', '+00:00')
# Kürze Nanosekunden auf Mikrosekunden (Python max 6 Dezimalstellen)
if '.' in ts_str:
# Split bei '+' oder '-' für Timezone
if '+' in ts_str:
time_part, tz_part = ts_str.rsplit('+', 1)
tz_part = '+' + tz_part
elif ts_str.count('-') > 2: # Negative Timezone
time_part, tz_part = ts_str.rsplit('-', 1)
tz_part = '-' + tz_part
else:
time_part, tz_part = ts_str, ''
if '.' in time_part:
base, frac = time_part.split('.')
frac = frac[:6] # Kürze auf 6 Stellen
ts_str = f"{base}.{frac}{tz_part}"
return datetime.fromisoformat(ts_str)
def _extract_price(self, record: dict) -> Optional[float]:
"""Extrahiert den Preis aus verschiedenen JSON-Formaten."""
# Neues Format
if 'lastTrade' in record:
return float(record['lastTrade'])
# Altes Format mit verschachteltem Pric-Objekt
pric = record.get('Pric')
if pric is None:
return None
if isinstance(pric, (int, float)):
return float(pric)
if isinstance(pric, dict):
# Versuche verschiedene Pfade
if 'Pric' in pric:
inner = pric['Pric']
if isinstance(inner, dict):
amt = inner.get('MntryVal', {}).get('Amt') or inner.get('Amt')
if amt is not None:
return float(amt)
if 'MntryVal' in pric:
amt = pric['MntryVal'].get('Amt')
if amt is not None:
return float(amt)
return None
def _extract_quantity(self, record: dict) -> Optional[float]:
"""Extrahiert die Menge aus verschiedenen JSON-Formaten."""
# Neues Format
if 'lastQty' in record:
return float(record['lastQty'])
# Altes Format
qty = record.get('Qty')
if qty is None:
return None
if isinstance(qty, (int, float)):
return float(qty)
if isinstance(qty, dict):
val = qty.get('Unit') or qty.get('Qty')
if val is not None:
return float(val)
return None
def _parse_trade_record(self, record: dict) -> Optional[Trade]:
"""
Parst einen einzelnen Trade-Record aus dem JSON.
Aktuelles JSON-Format (NDJSON):
{
"messageId": "posttrade",
"sourceName": "GAT",
"isin": "US00123Q1040",
"lastTradeTime": "2026-01-29T14:07:00.419000000Z",
"lastTrade": 10.145,
"lastQty": 500.0,
"currency": "EUR",
...
}
Unterstützte Formate:
- Neues Format: isin, lastTrade, lastQty, lastTradeTime
- Altes Format: FinInstrmId.Id, Pric, Qty, TrdDt/TrdTm
"""
try:
# ISIN extrahieren - neues Format verwendet 'isin' lowercase
isin = record.get('isin') or record.get('ISIN') or record.get('instrumentId') or record.get('FinInstrmId', {}).get('Id', '')
# ISIN extrahieren
isin = (
record.get('isin') or
record.get('ISIN') or
record.get('instrumentId') or
record.get('FinInstrmId', {}).get('Id', '')
)
if not isin:
return None
# Preis extrahieren - neues Format: 'lastTrade'
price = None
if 'lastTrade' in record:
price = float(record['lastTrade'])
elif 'Pric' in record:
pric = record['Pric']
if isinstance(pric, dict):
if 'Pric' in pric:
inner = pric['Pric']
if 'MntryVal' in inner:
price = float(inner['MntryVal'].get('Amt', 0))
elif 'Amt' in inner:
price = float(inner['Amt'])
elif 'MntryVal' in pric:
price = float(pric['MntryVal'].get('Amt', 0))
elif isinstance(pric, (int, float)):
price = float(pric)
# Preis extrahieren
price = self._extract_price(record)
if price is None or price <= 0:
return None
# Menge extrahieren - neues Format: 'lastQty'
quantity = None
if 'lastQty' in record:
quantity = float(record['lastQty'])
elif 'Qty' in record:
qty = record['Qty']
if isinstance(qty, dict):
quantity = float(qty.get('Unit', qty.get('Qty', 0)))
elif isinstance(qty, (int, float)):
quantity = float(qty)
# Menge extrahieren
quantity = self._extract_quantity(record)
if quantity is None or quantity <= 0:
return None
# Timestamp extrahieren - neues Format: 'lastTradeTime'
# Timestamp extrahieren
timestamp = None
if 'lastTradeTime' in record:
ts_str = record['lastTradeTime']
# Format: "2026-01-29T14:07:00.419000000Z"
# Python kann max 6 Dezimalstellen, also kürzen
if '.' in ts_str:
parts = ts_str.replace('Z', '').split('.')
if len(parts) == 2 and len(parts[1]) > 6:
ts_str = parts[0] + '.' + parts[1][:6] + '+00:00'
else:
ts_str = ts_str.replace('Z', '+00:00')
else:
ts_str = ts_str.replace('Z', '+00:00')
timestamp = datetime.fromisoformat(ts_str)
timestamp = self._parse_timestamp(record['lastTradeTime'])
else:
# Fallback für altes Format
trd_dt = record.get('TrdDt', '')
trd_tm = record.get('TrdTm', '00:00:00')
if not trd_dt:
return None
ts_str = f"{trd_dt}T{trd_tm}"
if '.' in ts_str:
parts = ts_str.split('.')
if len(parts[1]) > 6:
ts_str = parts[0] + '.' + parts[1][:6]
timestamp = datetime.fromisoformat(ts_str)
if trd_dt:
timestamp = self._parse_timestamp(f"{trd_dt}T{trd_tm}")
if timestamp is None:
return None
@@ -273,22 +345,41 @@ class DeutscheBoerseBase(BaseExchange):
timestamp=timestamp
)
except Exception as e:
# Debug: Zeige ersten fehlgeschlagenen Record
except (ValueError, TypeError, KeyError) as e:
logger.debug(f"[{self.name}] Failed to parse trade record: {e}")
return None
def _get_last_trading_day(self, from_date: datetime.date) -> datetime.date:
"""
Findet den letzten Handelstag (überspringt Wochenenden).
Findet den letzten Handelstag (überspringt Wochenenden und bekannte Feiertage).
Montag=0, Sonntag=6
"""
# Deutsche Börsen-Feiertage (fixe Daten, jedes Jahr gleich)
# Bewegliche Feiertage (Ostern etc.) müssten jährlich berechnet werden
fixed_holidays = {
(1, 1), # Neujahr
(5, 1), # Tag der Arbeit
(12, 24), # Heiligabend
(12, 25), # 1. Weihnachtstag
(12, 26), # 2. Weihnachtstag
(12, 31), # Silvester
}
date = from_date
# Wenn Samstag (5), gehe zurück zu Freitag
if date.weekday() == 5:
date = date - timedelta(days=1)
# Wenn Sonntag (6), gehe zurück zu Freitag
elif date.weekday() == 6:
date = date - timedelta(days=2)
max_iterations = 10 # Sicherheit gegen Endlosschleife
for _ in range(max_iterations):
# Wochenende überspringen
if date.weekday() == 5: # Samstag
date = date - timedelta(days=1)
elif date.weekday() == 6: # Sonntag
date = date - timedelta(days=2)
# Feiertag überspringen
elif (date.month, date.day) in fixed_holidays:
date = date - timedelta(days=1)
else:
break
return date
def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]:
@@ -304,40 +395,36 @@ class DeutscheBoerseBase(BaseExchange):
# Standard: Vortag
target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()
# Überspringe Wochenenden
# Überspringe Wochenenden und Feiertage
original_date = target_date
target_date = self._get_last_trading_day(target_date)
if target_date != original_date:
print(f"[{self.name}] Skipping weekend: {original_date} -> {target_date}")
logger.info(f"[{self.name}] Adjusted date: {original_date} -> {target_date} (weekend/holiday)")
print(f"[{self.name}] Fetching trades for date: {target_date}")
logger.info(f"[{self.name}] Fetching trades for date: {target_date}")
# Hole Dateiliste von der API
files = self._get_file_list()
if not files:
print(f"[{self.name}] No files available from API")
logger.warning(f"[{self.name}] No files available from API")
return []
# Dateien für Zieldatum filtern
target_files = self._filter_files_for_date(files, target_date)
print(f"[{self.name}] {len(target_files)} files match target date (of {len(files)} total)")
logger.info(f"[{self.name}] {len(target_files)} files match target date (of {len(files)} total)")
if not target_files:
print(f"[{self.name}] No files for target date found")
logger.warning(f"[{self.name}] No files for target date found")
return []
# Alle passenden Dateien herunterladen und parsen (mit Rate-Limiting)
# Alle passenden Dateien herunterladen und parsen
successful = 0
failed = 0
total_files = len(target_files)
if total_files == 0:
print(f"[{self.name}] No files to download for date {target_date}")
return []
print(f"[{self.name}] Starting download of {total_files} files...")
logger.info(f"[{self.name}] Starting download of {total_files} files...")
for i, file in enumerate(target_files):
trades = self._download_and_parse_file(file)
@@ -353,9 +440,9 @@ class DeutscheBoerseBase(BaseExchange):
# Fortschritt alle 100 Dateien
if (i + 1) % 100 == 0:
print(f"[{self.name}] Progress: {i + 1}/{total_files} files, {successful} successful, {len(all_trades)} trades so far")
logger.info(f"[{self.name}] Progress: {i + 1}/{total_files} files, {successful} successful, {len(all_trades)} trades so far")
print(f"[{self.name}] Downloaded {successful} files ({failed} failed/empty), total {len(all_trades)} trades")
logger.info(f"[{self.name}] Downloaded {successful} files ({failed} failed/empty), total {len(all_trades)} trades")
return all_trades
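
A quick standalone check of the compiled filename pattern introduced above, showing which parts of a Deutsche Börse file name it captures:

import re

# Same expression as DeutscheBoerseBase._FILENAME_PATTERN, reproduced for a standalone test.
FILENAME_PATTERN = re.compile(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})')

match = FILENAME_PATTERN.search("DETR-posttrade-2026-01-26T21_30.json.gz")
assert match is not None
date_str, hour, minute = match.group(1), int(match.group(2)), int(match.group(3))
print(date_str, hour, minute)  # 2026-01-26 21 30

# _filter_files_for_date() treats next-day files with hour < 3 as still belonging to the
# previous trading day, because trading runs until 22:00 local time (20:00/21:00 UTC).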

src/exchanges/eix.py

@@ -1,30 +1,38 @@
import requests
import json
from bs4 import BeautifulSoup
from datetime import datetime
from typing import List
import logging
from datetime import datetime, timezone
from typing import List, Generator, Tuple
from .base import BaseExchange, Trade
import csv
import io
logger = logging.getLogger(__name__)
class EIXExchange(BaseExchange):
"""European Investor Exchange - CSV-basierte Trade-Daten."""
API_BASE_URL = "https://european-investor-exchange.com/api"
@property
def name(self) -> str:
return "EIX"
def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None) -> List[Trade]:
# EIX stores its file list in a separate API endpoint
url = "https://european-investor-exchange.com/api/official-trades"
def get_files_to_process(self, limit: int = 1, since_date: datetime = None) -> List[dict]:
"""Holt die Liste der zu verarbeitenden Dateien ohne sie herunterzuladen."""
url = f"{self.API_BASE_URL}/official-trades"
try:
response = requests.get(url, timeout=15)
response.raise_for_status()
files_list = response.json()
except Exception as e:
print(f"Error fetching EIX file list: {e}")
except requests.exceptions.RequestException as e:
logger.error(f"[{self.name}] Fehler beim Abrufen der Dateiliste: {e}")
return []
except ValueError as e:
logger.error(f"[{self.name}] Ungültiges JSON in Dateiliste: {e}")
return []
# Filter files based on date in filename if since_date provided
# Format: "kursblatt/2025/Kursblatt.2025-07-14.1752526803105.csv"
# Filtere Dateien nach Datum wenn since_date angegeben
filtered_files = []
for item in files_list:
file_key = item.get('fileName')
@@ -33,90 +41,118 @@ class EIXExchange(BaseExchange):
if since_date:
try:
# Extract date from filename: Kursblatt.YYYY-MM-DD
parts = file_key.split('/')[-1].split('.')
# parts example: ['Kursblatt', '2025-07-14', '1752526803105', 'csv']
if len(parts) >= 2:
date_str = parts[1]
file_date = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
file_date = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
# Check if file date is newer than since_date (compare dates only)
if file_date.date() > since_date.date():
if file_date.date() >= since_date.date():
filtered_files.append(item)
continue
# If same day, we might need to check it too, but EIX seems to be daily files
if file_date.date() == since_date.date():
filtered_files.append(item)
continue
except Exception:
# If parsing fails, default to including it (safety) or skipping?
# Let's include it if we are not sure
except (ValueError, IndexError) as e:
# Dateiname hat unerwartetes Format - zur Sicherheit einschließen
logger.debug(f"[{self.name}] Konnte Datum nicht aus {file_key} extrahieren: {e}")
filtered_files.append(item)
else:
filtered_files.append(item)
# Sort files to process oldest to newest if doing a sync, or newest to oldest?
# If we have limit=1 (default), we usually want the newest.
# But if we are syncing history (since_date set), we probably want all of them.
filtered_files.append(item)
# Logic: If since_date is set, we ignore limit (or use it as safety cap) and process ALL new files
if since_date:
files_to_process = filtered_files
# Sort by date ? The API list seems chronological.
return filtered_files
else:
# Default behavior: take the last N files (API returns oldest first usually?)
# Let's assume list is chronological.
if limit:
files_to_process = files_list[-limit:]
else:
files_to_process = files_list
return files_list[-limit:]
return files_list
def fetch_trades_from_file(self, file_item: dict) -> List[Trade]:
"""Lädt und parst eine einzelne CSV-Datei."""
file_key = file_item.get('fileName')
if not file_key:
return []
csv_url = f"{self.API_BASE_URL}/trade-file-contents?key={file_key}"
try:
response = requests.get(csv_url, timeout=60)
response.raise_for_status()
return self._parse_csv(response.text)
except requests.exceptions.RequestException as e:
logger.error(f"[{self.name}] Fehler beim Download von {file_key}: {e}")
except Exception as e:
logger.error(f"[{self.name}] Unerwarteter Fehler bei {file_key}: {e}")
return []
def fetch_trades_streaming(self, limit: int = 1, since_date: datetime = None) -> Generator[Tuple[str, List[Trade]], None, None]:
"""
Generator der Trades dateiweise zurückgibt.
Yields: (filename, trades) Tupel
"""
files = self.get_files_to_process(limit=limit, since_date=since_date)
for item in files:
file_key = item.get('fileName', 'unknown')
trades = self.fetch_trades_from_file(item)
if trades:
yield (file_key, trades)
trades = []
count = 0
for item in files_to_process:
file_key = item.get('fileName')
# Download the CSV
csv_url = f"https://european-investor-exchange.com/api/trade-file-contents?key={file_key}"
try:
csv_response = requests.get(csv_url, timeout=20)
if csv_response.status_code == 200:
trades.extend(self._parse_csv(csv_response.text))
count += 1
# Only enforce limit if since_date is NOT set
if not since_date and limit and count >= limit:
break
except Exception as e:
print(f"Error downloading EIX CSV {file_key}: {e}")
return trades
def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None, **kwargs) -> List[Trade]:
"""
Legacy-Methode für Kompatibilität.
WARNUNG: Lädt alle Trades in den Speicher! Für große Datenmengen fetch_trades_streaming() verwenden.
"""
# Für kleine Requests (limit <= 5) normale Verarbeitung
if limit and limit <= 5 and not since_date:
all_trades = []
for filename, trades in self.fetch_trades_streaming(limit=limit, since_date=since_date):
all_trades.extend(trades)
return all_trades
# Für große Requests: Warnung ausgeben und leere Liste zurückgeben
logger.warning(f"[{self.name}] fetch_latest_trades() mit großem Dataset aufgerufen. Verwende Streaming.")
return []
def _parse_csv(self, csv_text: str) -> List[Trade]:
"""Parst CSV-Text zu Trade-Objekten."""
trades = []
parse_errors = 0
f = io.StringIO(csv_text)
# Header: Trading day & Trading time UTC,Instrument Identifier,Quantity,Unit Price,Price Currency,Venue Identifier,Side
reader = csv.DictReader(f, delimiter=',')
for row in reader:
for row_num, row in enumerate(reader, start=2): # Start bei 2 wegen Header
try:
price = float(row['Unit Price'])
quantity = float(row['Quantity'])
isin = row['Instrument Identifier']
symbol = isin # Often symbol is unknown, use ISIN
time_str = row['Trading day & Trading time UTC']
# Format: 2026-01-22T06:30:00.617Z
# Python 3.11+ supports ISO with Z, otherwise we strip Z
# Preis und Menge validieren
if price <= 0 or quantity <= 0:
logger.debug(f"[{self.name}] Zeile {row_num}: Ungültiger Preis/Menge: {price}/{quantity}")
parse_errors += 1
continue
ts_str = time_str.replace('Z', '+00:00')
timestamp = datetime.fromisoformat(ts_str)
trades.append(Trade(
exchange=self.name,
symbol=symbol,
symbol=isin,
isin=isin,
price=price,
quantity=quantity,
timestamp=timestamp
))
except Exception:
continue
except KeyError as e:
logger.debug(f"[{self.name}] Zeile {row_num}: Fehlendes Feld {e}")
parse_errors += 1
except ValueError as e:
logger.debug(f"[{self.name}] Zeile {row_num}: Ungültiger Wert: {e}")
parse_errors += 1
except Exception as e:
logger.warning(f"[{self.name}] Zeile {row_num}: Unerwarteter Fehler: {e}")
parse_errors += 1
if parse_errors > 0:
logger.debug(f"[{self.name}] {parse_errors} Zeilen konnten nicht geparst werden")
return trades
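
A usage sketch of the new streaming API: the generator wraps get_files_to_process() and fetch_trades_from_file(), so only one CSV file's trades are held in memory at a time (the db.save_trades call stands in for whatever persistence the caller uses):

import datetime
from src.exchanges.eix import EIXExchange

exchange = EIXExchange()
since = datetime.datetime(2026, 1, 28, tzinfo=datetime.timezone.utc)

# Yields (filename, trades) per file; with since_date set, get_files_to_process()
# returns every file dated on or after that day and ignores the limit.
for file_key, trades in exchange.fetch_trades_streaming(limit=None, since_date=since):
    print(f"{file_key}: {len(trades)} trades")
    # Persist the batch here, e.g. db.save_trades(trades), then let it go out of scope.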