diff --git a/daemon.py b/daemon.py index c3c9140..842924e 100644 --- a/daemon.py +++ b/daemon.py @@ -3,8 +3,10 @@ import logging import datetime import hashlib import os -import gc import requests +from typing import List, Type + +from src.exchanges.base import BaseExchange from src.exchanges.eix import EIXExchange from src.exchanges.ls import LSExchange from src.exchanges.deutsche_boerse import XetraExchange, FrankfurtExchange, QuotrixExchange @@ -26,6 +28,43 @@ DB_USER = os.getenv("DB_USER", "admin") DB_PASSWORD = os.getenv("DB_PASSWORD", "quest") DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None + +# ============================================================================= +# Exchange Registry - Neue Börsen hier hinzufügen +# ============================================================================= + +# Exchanges die Streaming-Verarbeitung benötigen (große Datenmengen) +STREAMING_EXCHANGES: List[Type[BaseExchange]] = [ + EIXExchange, +] + +# Standard-Exchanges (normale Batch-Verarbeitung) +STANDARD_EXCHANGES: List[Type[BaseExchange]] = [ + # Lang & Schwarz + LSExchange, + # Deutsche Börse + XetraExchange, + FrankfurtExchange, + QuotrixExchange, + # Weitere Börsen + GettexExchange, + StuttgartExchange, + # Börsenag (Düsseldorf, Hamburg, Hannover) + DUSAExchange, + DUSBExchange, + DUSCExchange, + DUSDExchange, + HAMAExchange, + HAMBExchange, + HANAExchange, + HANBExchange, +] + + +# ============================================================================= +# Trades Cache +# ============================================================================= + # Cache für existierende Trades pro Tag (wird nach jedem Exchange geleert) _existing_trades_cache = {} @@ -77,7 +116,6 @@ def clear_trades_cache(): """Leert den Cache für existierende Trades.""" global _existing_trades_cache _existing_trades_cache = {} - gc.collect() def filter_new_trades_for_day(db_url, exchange_name, trades, day): """Filtert neue Trades für einen einzelnen Tag.""" @@ -123,207 +161,212 @@ def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=5000): return new_trades -def get_last_trade_timestamp(db_url, exchange_name): - # QuestDB query: get the latest timestamp for a specific exchange +def get_last_trade_timestamp(db_url: str, exchange_name: str) -> datetime.datetime: + """Holt den Timestamp des letzten Trades für eine Exchange aus QuestDB.""" query = f"trades where exchange = '{exchange_name}' latest by timestamp" try: - # Using the /exec endpoint to get data response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH) if response.status_code == 200: data = response.json() - if data['dataset']: - # QuestDB returns timestamp in micros since epoch by default in some views, or ISO - # Let's assume the timestamp is in the dataset - # ILP timestamps are stored as designated timestamps. 
- ts_value = data['dataset'][0][0] # Adjust index based on column order + if data.get('dataset'): + # QuestDB gibt Timestamps in Mikrosekunden oder ISO-Format zurück + ts_value = data['dataset'][0][0] if isinstance(ts_value, str): return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00')) else: return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc) except Exception as e: - logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}") + logger.debug(f"Keine existierenden Daten für {exchange_name} oder DB nicht erreichbar: {e}") return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) -def process_eix_streaming(db, db_url, eix, historical=False): - """Verarbeitet EIX in Streaming-Modus um RAM zu sparen.""" - last_ts = get_last_trade_timestamp(db_url, eix.name) - logger.info(f"Fetching data from EIX (Last trade: {last_ts}) - STREAMING MODE...") +def process_eix_streaming(db, db_url: str, exchange: BaseExchange, historical: bool = False): + """Verarbeitet eine Exchange im Streaming-Modus um RAM zu sparen.""" + last_ts = get_last_trade_timestamp(db_url, exchange.name) + logger.info(f"Hole Daten von {exchange.name} (Letzter Trade: {last_ts}) - STREAMING...") # Hole Liste der zu verarbeitenden Dateien if historical: - files = eix.get_files_to_process(limit=None, since_date=None) + files = exchange.get_files_to_process(limit=None, since_date=None) else: - files = eix.get_files_to_process(limit=None, since_date=last_ts) + files = exchange.get_files_to_process(limit=None, since_date=last_ts) if not files: - logger.info("No EIX files to process.") + logger.info(f"Keine {exchange.name} Dateien zu verarbeiten.") return - logger.info(f"Found {len(files)} EIX files to process...") + logger.info(f"{len(files)} {exchange.name} Dateien gefunden...") total_new = 0 total_processed = 0 for i, file_item in enumerate(files, 1): file_name = file_item.get('fileName', 'unknown').split('/')[-1] - logger.info(f"Processing EIX file {i}/{len(files)}: {file_name}") + logger.info(f"Verarbeite {exchange.name} Datei {i}/{len(files)}: {file_name}") - # Lade eine Datei - trades = eix.fetch_trades_from_file(file_item) + trades = exchange.fetch_trades_from_file(file_item) if not trades: - logger.info(f" No trades in file {file_name}") + logger.info(f" Keine Trades in {file_name}") continue total_processed += len(trades) - logger.info(f" Loaded {len(trades)} trades, filtering duplicates...") + logger.info(f" {len(trades)} Trades geladen, filtere Duplikate...") - # Filtere Duplikate - new_trades = filter_new_trades_batch(db_url, eix.name, trades, batch_size=5000) + new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000) if new_trades: new_trades.sort(key=lambda x: x.timestamp) db.save_trades(new_trades) total_new += len(new_trades) - logger.info(f" Saved {len(new_trades)} new trades (total new: {total_new})") + logger.info(f" {len(new_trades)} neue Trades gespeichert (gesamt neu: {total_new})") else: - logger.info(f" No new trades in this file") + logger.info(f" Keine neuen Trades in dieser Datei") - # Speicher freigeben + # Referenzen freigeben del trades del new_trades - gc.collect() - # Kurze Pause zwischen Dateien time.sleep(0.1) - logger.info(f"EIX complete: {total_new} new trades from {total_processed} total processed.") + logger.info(f"{exchange.name} fertig: {total_new} neue Trades von {total_processed} verarbeitet.") clear_trades_cache() +def process_standard_exchange(db, db_url: str, exchange: BaseExchange, historical: 
bool): + """Verarbeitet einen Standard-Exchange mit Batch-Verarbeitung.""" + try: + last_ts = get_last_trade_timestamp(db_url, exchange.name) + logger.info(f"Hole Daten von {exchange.name} (Letzter Trade: {last_ts})...") + + trades = exchange.fetch_latest_trades(include_yesterday=historical) + + if not trades: + logger.info(f"Keine Trades von {exchange.name} erhalten.") + return + + # Deduplizierung + logger.info(f"Filtere {len(trades)} Trades auf Duplikate...") + new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000) + + logger.info(f"Gefunden: {len(trades)} Trades gesamt, {len(new_trades)} sind neu.") + + if new_trades: + new_trades.sort(key=lambda x: x.timestamp) + db.save_trades(new_trades) + logger.info(f"{len(new_trades)} neue Trades in QuestDB gespeichert.") + + # Referenzen freigeben + del trades + if new_trades: + del new_trades + clear_trades_cache() + + except Exception as e: + logger.error(f"Fehler bei Exchange {exchange.name}: {e}") + + def run_task(historical=False): - logger.info(f"Starting Trading Data Fetcher task (Historical: {historical})...") + """Haupttask: Holt Trades von allen registrierten Exchanges.""" + logger.info(f"Starte Trading Data Fetcher (Historical: {historical})...") db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD) db_url = "http://questdb:9000" - # === EIX - Streaming Verarbeitung === - try: - eix = EIXExchange() - process_eix_streaming(db, db_url, eix, historical=historical) - except Exception as e: - logger.error(f"Error processing EIX: {e}") - - # === Andere Exchanges - normale Verarbeitung === - ls = LSExchange() - - # Neue Deutsche Börse Exchanges - xetra = XetraExchange() - frankfurt = FrankfurtExchange() - quotrix = QuotrixExchange() - gettex = GettexExchange() - stuttgart = StuttgartExchange() - - # Börsenag Exchanges (Düsseldorf, Hamburg, Hannover) - dusa = DUSAExchange() - dusb = DUSBExchange() - dusc = DUSCExchange() - dusd = DUSDExchange() - hama = HAMAExchange() - hamb = HAMBExchange() - hana = HANAExchange() - hanb = HANBExchange() - - # Alle anderen Exchanges (kleinere Datenmengen) - exchanges_to_process = [ - (ls, {'include_yesterday': historical}), - # Deutsche Börse Exchanges - (xetra, {'include_yesterday': historical}), - (frankfurt, {'include_yesterday': historical}), - (quotrix, {'include_yesterday': historical}), - (gettex, {'include_yesterday': historical}), - (stuttgart, {'include_yesterday': historical}), - # Börsenag Exchanges (Düsseldorf, Hamburg, Hannover) - (dusa, {'include_yesterday': historical}), - (dusb, {'include_yesterday': historical}), - (dusc, {'include_yesterday': historical}), - (dusd, {'include_yesterday': historical}), - (hama, {'include_yesterday': historical}), - (hamb, {'include_yesterday': historical}), - (hana, {'include_yesterday': historical}), - (hanb, {'include_yesterday': historical}), - ] - - for exchange, args in exchanges_to_process: + # Streaming-Exchanges verarbeiten (große Datenmengen) + for exchange_class in STREAMING_EXCHANGES: try: - last_ts = get_last_trade_timestamp(db_url, exchange.name) - - logger.info(f"Fetching data from {exchange.name} (Last trade: {last_ts})...") - - trades = exchange.fetch_latest_trades(**args) - - if not trades: - logger.info(f"No trades fetched from {exchange.name}.") - continue - - # Deduplizierung - logger.info(f"Filtering {len(trades)} trades for duplicates...") - new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000) - - logger.info(f"Found {len(trades)} total trades, 
{len(new_trades)} are new.") - - if new_trades: - new_trades.sort(key=lambda x: x.timestamp) - db.save_trades(new_trades) - logger.info(f"Stored {len(new_trades)} new trades in QuestDB.") - - # Speicher freigeben nach jedem Exchange - del trades - if new_trades: - del new_trades - clear_trades_cache() - gc.collect() - + exchange = exchange_class() + logger.info(f"Verarbeite {exchange.name} im Streaming-Modus...") + process_eix_streaming(db, db_url, exchange, historical=historical) except Exception as e: - logger.error(f"Error processing exchange {exchange.name}: {e}") + logger.error(f"Fehler bei Streaming-Exchange {exchange_class.__name__}: {e}") - logger.info("All exchanges processed.") + # Standard-Exchanges verarbeiten + for exchange_class in STANDARD_EXCHANGES: + try: + exchange = exchange_class() + process_standard_exchange(db, db_url, exchange, historical) + except Exception as e: + logger.error(f"Fehler bei Exchange {exchange_class.__name__}: {e}") + + logger.info("Alle Exchanges verarbeitet.") -def main(): - logger.info("Trading Daemon started.") - - # 1. Startup Check: Ist die DB leer? - db_url = "http://questdb:9000" - is_empty = True +def is_database_empty(db_url: str) -> bool: + """Prüft ob die Datenbank leer ist oder die Tabelle nicht existiert.""" try: - # Prüfe ob bereits Trades in der Tabelle sind response = requests.get(f"{db_url}/exec", params={'query': 'select count(*) from trades'}, auth=DB_AUTH) if response.status_code == 200: data = response.json() - if data['dataset'] and data['dataset'][0][0] > 0: - is_empty = False + if data.get('dataset') and data['dataset'][0][0] > 0: + return False except Exception: - # Falls Tabelle noch nicht existiert oder DB nicht erreichbar ist - is_empty = True + pass + return True - if is_empty: - logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...") + +def calculate_seconds_until_target(target_hour: int, target_minute: int = 0) -> int: + """Berechnet Sekunden bis zur nächsten Zielzeit.""" + now = datetime.datetime.now() + target = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0) + + # Wenn Zielzeit heute schon vorbei ist, nimm morgen + if target <= now: + target += datetime.timedelta(days=1) + + return int((target - now).total_seconds()) + + +def main(): + logger.info("Trading Daemon gestartet.") + + db_url = "http://questdb:9000" + + # Startup: Initialer Sync + if is_database_empty(db_url): + logger.info("Datenbank ist leer. Starte initialen historischen Fetch...") run_task(historical=True) else: - logger.info("Found existing data in database. Triggering catch-up sync...") - # Run a normal task to fetch any missing data since the last run + logger.info("Existierende Daten gefunden. Starte Catch-up Sync...") run_task(historical=False) - logger.info("Catch-up sync completed. 
Waiting for scheduled run at 23:00.") + logger.info("Catch-up Sync abgeschlossen.") + + # Scheduling Konfiguration + SCHEDULE_HOUR = 23 + SCHEDULE_MINUTE = 0 + last_run_date = None + + logger.info(f"Warte auf täglichen Run um {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d}...") while True: now = datetime.datetime.now() - # Täglich um 23:00 Uhr - if now.hour == 23 and now.minute == 0: - run_task(historical=False) - # Warte 61s, um Mehrfachausführung in derselben Minute zu verhindern - time.sleep(61) + today = now.date() - # Check alle 30 Sekunden - time.sleep(30) + # Prüfe ob wir heute schon gelaufen sind + already_ran_today = (last_run_date == today) + + # Prüfe ob wir im Zeitfenster sind (23:00 - 23:59) + in_schedule_window = (now.hour == SCHEDULE_HOUR and now.minute >= SCHEDULE_MINUTE) + + if in_schedule_window and not already_ran_today: + logger.info(f"Geplanter Task startet ({now.strftime('%Y-%m-%d %H:%M:%S')})...") + run_task(historical=False) + last_run_date = today + logger.info("Geplanter Task abgeschlossen. Warte auf nächsten Tag...") + + # Dynamische Sleep-Zeit: Kurz vor Zielzeit öfter prüfen + seconds_until_target = calculate_seconds_until_target(SCHEDULE_HOUR, SCHEDULE_MINUTE) + + if seconds_until_target > 3600: + # Mehr als 1 Stunde: Schlafe 30 Minuten + sleep_time = 1800 + elif seconds_until_target > 300: + # 5 Minuten bis 1 Stunde: Schlafe 5 Minuten + sleep_time = 300 + else: + # Unter 5 Minuten: Schlafe 30 Sekunden + sleep_time = 30 + + time.sleep(sleep_time) + if __name__ == "__main__": main() diff --git a/read.py b/read.py deleted file mode 100644 index dcc84ff..0000000 --- a/read.py +++ /dev/null @@ -1,5 +0,0 @@ -import gzip -import json -with gzip.open("DGAT-posttrade-2026-01-29T14_07.json.gz", mode="rt") as f: - data = [json.loads(line) for line in f] - print (str(data)) diff --git a/cleanup_duplicates.py b/scripts/cleanup_duplicates.py similarity index 100% rename from cleanup_duplicates.py rename to scripts/cleanup_duplicates.py diff --git a/scripts/inspect_gzip.py b/scripts/inspect_gzip.py new file mode 100644 index 0000000..08b3bed --- /dev/null +++ b/scripts/inspect_gzip.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +""" +Utility-Script zum Inspizieren von gzip-komprimierten JSON-Dateien. +Verarbeitet Dateien streaming, ohne alles in den RAM zu laden. + +Verwendung: + python scripts/inspect_gzip.py [--limit N] [--output datei.json] +""" +import gzip +import json +import argparse +import sys +from pathlib import Path + + +def inspect_gzip_file(filepath: str, limit: int = None, output_file: str = None): + """ + Liest eine gzip-komprimierte NDJSON-Datei und gibt die Inhalte aus. 
+ + Args: + filepath: Pfad zur .json.gz Datei + limit: Maximale Anzahl der auszugebenden Records (None = alle) + output_file: Optional: Ausgabe in Datei statt stdout + """ + path = Path(filepath) + if not path.exists(): + print(f"Fehler: Datei '{filepath}' nicht gefunden.", file=sys.stderr) + return 1 + + count = 0 + output = open(output_file, 'w', encoding='utf-8') if output_file else sys.stdout + + try: + with gzip.open(filepath, mode='rt', encoding='utf-8') as f: + for line in f: + if not line.strip(): + continue + + try: + record = json.loads(line) + # Pretty-print einzelner Record + json.dump(record, output, indent=2, ensure_ascii=False) + output.write('\n') + count += 1 + + if limit and count >= limit: + break + + except json.JSONDecodeError as e: + print(f"JSON-Fehler in Zeile {count + 1}: {e}", file=sys.stderr) + continue + + print(f"\n--- {count} Records verarbeitet ---", file=sys.stderr) + + finally: + if output_file and output != sys.stdout: + output.close() + + return 0 + + +def main(): + parser = argparse.ArgumentParser( + description='Inspiziert gzip-komprimierte JSON-Dateien (NDJSON-Format)' + ) + parser.add_argument('file', help='Pfad zur .json.gz Datei') + parser.add_argument('--limit', '-n', type=int, default=10, + help='Maximale Anzahl der Records (default: 10, 0 = alle)') + parser.add_argument('--output', '-o', type=str, + help='Ausgabe in Datei statt stdout') + + args = parser.parse_args() + + limit = args.limit if args.limit > 0 else None + return inspect_gzip_file(args.file, limit=limit, output_file=args.output) + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/restore_and_fix.py b/scripts/restore_and_fix.py similarity index 100% rename from restore_and_fix.py rename to scripts/restore_and_fix.py diff --git a/test_sector_fetch.py b/scripts/test_sector_fetch.py similarity index 100% rename from test_sector_fetch.py rename to scripts/test_sector_fetch.py diff --git a/verify_fix.py b/scripts/verify_fix.py similarity index 100% rename from verify_fix.py rename to scripts/verify_fix.py diff --git a/src/exchanges/deutsche_boerse.py b/src/exchanges/deutsche_boerse.py index 49d455a..90f4703 100644 --- a/src/exchanges/deutsche_boerse.py +++ b/src/exchanges/deutsche_boerse.py @@ -2,16 +2,20 @@ import requests import gzip import json import io +import re import time +import logging +import threading from datetime import datetime, timedelta, timezone from typing import List, Optional from .base import BaseExchange, Trade -from bs4 import BeautifulSoup + +logger = logging.getLogger(__name__) # Rate-Limiting Konfiguration RATE_LIMIT_DELAY = 0.5 # Sekunden zwischen Requests RATE_LIMIT_RETRY_DELAY = 5 # Sekunden Wartezeit bei 429 -MAX_RETRIES = 3 # Maximale Wiederholungen bei 429 +MAX_RETRIES = 5 # Maximale Wiederholungen bei 429 # API URLs für Deutsche Börse API_URLS = { @@ -21,17 +25,47 @@ API_URLS = { } DOWNLOAD_BASE_URL = "https://mfs.deutsche-boerse.com/api/download" -# Browser User-Agent für Zugriff -HEADERS = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', - 'Accept': 'application/json, application/gzip, */*', - 'Referer': 'https://mfs.deutsche-boerse.com/', -} +# Liste von User-Agents für Rotation bei Rate-Limiting +USER_AGENTS = [ + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36', + 
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0', + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15', + 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0', + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:121.0) Gecko/20100101 Firefox/121.0', + 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0', +] + + +class UserAgentRotator: + """Thread-safe User-Agent Rotation""" + + def __init__(self): + self._index = 0 + self._lock = threading.Lock() + + def get_headers(self, rotate: bool = False) -> dict: + """Gibt Headers mit aktuellem User-Agent zurück. Bei rotate=True wird zum nächsten gewechselt.""" + with self._lock: + if rotate: + self._index = (self._index + 1) % len(USER_AGENTS) + return { + 'User-Agent': USER_AGENTS[self._index], + 'Accept': 'application/json, application/gzip, */*', + 'Referer': 'https://mfs.deutsche-boerse.com/', + } + +# Globale Instanz für User-Agent Rotation +_ua_rotator = UserAgentRotator() class DeutscheBoerseBase(BaseExchange): """Basisklasse für Deutsche Börse Exchanges (Xetra, Frankfurt, Quotrix)""" + # Regex für Dateinamen-Parsing (kompiliert für Performance) + _FILENAME_PATTERN = re.compile(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})') + @property def base_url(self) -> str: """Override in subclasses""" @@ -46,60 +80,73 @@ class DeutscheBoerseBase(BaseExchange): """API URL für die Dateiliste""" return API_URLS.get(self.name, self.base_url) + def _handle_rate_limit(self, retry: int, context: str) -> None: + """Zentrale Rate-Limit Behandlung: rotiert User-Agent und wartet.""" + _ua_rotator.get_headers(rotate=True) + wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1) + logger.warning(f"[{self.name}] Rate limited ({context}), rotating User-Agent and waiting {wait_time}s... 
(retry {retry + 1}/{MAX_RETRIES})") + time.sleep(wait_time) + def _get_file_list(self) -> List[str]: """Holt die Dateiliste von der JSON API""" - try: - api_url = self.api_url - print(f"[{self.name}] Fetching file list from: {api_url}") - response = requests.get(api_url, headers=HEADERS, timeout=30) - response.raise_for_status() - - data = response.json() - files = data.get('CurrentFiles', []) - - print(f"[{self.name}] API returned {len(files)} files") - if files: - print(f"[{self.name}] Sample files: {files[:3]}") - return files - - except Exception as e: - print(f"[{self.name}] Error fetching file list from API: {e}") - import traceback - print(f"[{self.name}] Traceback: {traceback.format_exc()}") - return [] + api_url = self.api_url + + for retry in range(MAX_RETRIES): + try: + headers = _ua_rotator.get_headers(rotate=(retry > 0)) + logger.info(f"[{self.name}] Fetching file list from: {api_url}") + response = requests.get(api_url, headers=headers, timeout=30) + + if response.status_code == 429: + self._handle_rate_limit(retry, "file list") + continue + + response.raise_for_status() + + data = response.json() + files = data.get('CurrentFiles', []) + + logger.info(f"[{self.name}] API returned {len(files)} files") + if files: + logger.debug(f"[{self.name}] Sample files: {files[:3]}") + return files + + except requests.exceptions.HTTPError as e: + if e.response.status_code == 429: + self._handle_rate_limit(retry, "file list HTTPError") + continue + logger.error(f"[{self.name}] HTTP error fetching file list: {e}") + break + except Exception as e: + logger.exception(f"[{self.name}] Error fetching file list from API: {e}") + break + + return [] def _filter_files_for_date(self, files: List[str], target_date: datetime.date) -> List[str]: """ Filtert Dateien für ein bestimmtes Datum. - Dateiformat: DETR-posttrade-YYYY-MM-DDTHH_MM.json.gz (mit Unterstrich!) + Dateiformat: DETR-posttrade-YYYY-MM-DDTHH_MM.json.gz Da Handel bis 22:00 MEZ geht (21:00/20:00 UTC), müssen wir auch Dateien nach Mitternacht UTC berücksichtigen. 
""" - import re filtered = [] - # Für den Vortag: Dateien vom target_date UND vom Folgetag (bis ~02:00 UTC) target_str = target_date.strftime('%Y-%m-%d') next_day = target_date + timedelta(days=1) next_day_str = next_day.strftime('%Y-%m-%d') for file in files: - # Extrahiere Datum aus Dateiname - # Format: DETR-posttrade-2026-01-26T21_30.json.gz if target_str in file: filtered.append(file) elif next_day_str in file: # Prüfe ob es eine frühe Datei vom nächsten Tag ist (< 03:00 UTC) - try: - # Finde Timestamp im Dateinamen mit Unterstrich für Minuten - match = re.search(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})', file) - if match: - hour = int(match.group(2)) - if hour < 3: # Frühe Morgenstunden gehören noch zum Vortag - filtered.append(file) - except Exception: - pass + match = self._FILENAME_PATTERN.search(file) + if match: + hour = int(match.group(2)) + if hour < 3: # Frühe Morgenstunden gehören noch zum Vortag + filtered.append(file) return filtered @@ -110,17 +157,14 @@ class DeutscheBoerseBase(BaseExchange): for retry in range(MAX_RETRIES): try: - response = requests.get(full_url, headers=HEADERS, timeout=60) + headers = _ua_rotator.get_headers(rotate=(retry > 0)) + response = requests.get(full_url, headers=headers, timeout=60) if response.status_code == 404: - # Datei nicht gefunden - normal für alte Dateien return [] if response.status_code == 429: - # Rate-Limit erreicht - warten und erneut versuchen - wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1) - print(f"[{self.name}] Rate limited, waiting {wait_time}s...") - time.sleep(wait_time) + self._handle_rate_limit(retry, "download") continue response.raise_for_status() @@ -130,13 +174,11 @@ class DeutscheBoerseBase(BaseExchange): content = f.read().decode('utf-8') if not content.strip(): - # Leere Datei return [] # NDJSON Format: Eine JSON-Zeile pro Trade lines = content.strip().split('\n') if not lines or (len(lines) == 1 and not lines[0].strip()): - # Leere Datei return [] for line in lines: @@ -147,116 +189,146 @@ class DeutscheBoerseBase(BaseExchange): trade = self._parse_trade_record(record) if trade: trades.append(trade) - except json.JSONDecodeError: - continue - except Exception: - continue + except json.JSONDecodeError as e: + logger.debug(f"[{self.name}] JSON decode error in {filename}: {e}") + except Exception as e: + logger.debug(f"[{self.name}] Error parsing record in {filename}: {e}") - # Erfolg - keine weitere Retry nötig + # Erfolg break except requests.exceptions.HTTPError as e: if e.response.status_code == 429: - wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1) - print(f"[{self.name}] Rate limited, waiting {wait_time}s...") - time.sleep(wait_time) + self._handle_rate_limit(retry, "download HTTPError") continue elif e.response.status_code != 404: - print(f"[{self.name}] HTTP error downloading {filename}: {e}") + logger.error(f"[{self.name}] HTTP error downloading {filename}: {e}") break except Exception as e: - print(f"[{self.name}] Error downloading/parsing {filename}: {e}") + logger.error(f"[{self.name}] Error downloading/parsing {filename}: {e}") break return trades + def _parse_timestamp(self, ts_str: str) -> Optional[datetime]: + """ + Parst einen Timestamp-String in ein datetime-Objekt. + Unterstützt Nanosekunden durch Kürzung auf Mikrosekunden. + """ + if not ts_str: + return None + + # Ersetze 'Z' durch '+00:00' für ISO-Kompatibilität + ts_str = ts_str.replace('Z', '+00:00') + + # Kürze Nanosekunden auf Mikrosekunden (Python max 6 Dezimalstellen) + if '.' 
in ts_str: + # Split bei '+' oder '-' für Timezone + if '+' in ts_str: + time_part, tz_part = ts_str.rsplit('+', 1) + tz_part = '+' + tz_part + elif ts_str.count('-') > 2: # Negative Timezone + time_part, tz_part = ts_str.rsplit('-', 1) + tz_part = '-' + tz_part + else: + time_part, tz_part = ts_str, '' + + if '.' in time_part: + base, frac = time_part.split('.') + frac = frac[:6] # Kürze auf 6 Stellen + ts_str = f"{base}.{frac}{tz_part}" + + return datetime.fromisoformat(ts_str) + + def _extract_price(self, record: dict) -> Optional[float]: + """Extrahiert den Preis aus verschiedenen JSON-Formaten.""" + # Neues Format + if 'lastTrade' in record: + return float(record['lastTrade']) + + # Altes Format mit verschachteltem Pric-Objekt + pric = record.get('Pric') + if pric is None: + return None + + if isinstance(pric, (int, float)): + return float(pric) + + if isinstance(pric, dict): + # Versuche verschiedene Pfade + if 'Pric' in pric: + inner = pric['Pric'] + if isinstance(inner, dict): + amt = inner.get('MntryVal', {}).get('Amt') or inner.get('Amt') + if amt is not None: + return float(amt) + if 'MntryVal' in pric: + amt = pric['MntryVal'].get('Amt') + if amt is not None: + return float(amt) + + return None + + def _extract_quantity(self, record: dict) -> Optional[float]: + """Extrahiert die Menge aus verschiedenen JSON-Formaten.""" + # Neues Format + if 'lastQty' in record: + return float(record['lastQty']) + + # Altes Format + qty = record.get('Qty') + if qty is None: + return None + + if isinstance(qty, (int, float)): + return float(qty) + + if isinstance(qty, dict): + val = qty.get('Unit') or qty.get('Qty') + if val is not None: + return float(val) + + return None + def _parse_trade_record(self, record: dict) -> Optional[Trade]: """ Parst einen einzelnen Trade-Record aus dem JSON. - Aktuelles JSON-Format (NDJSON): - { - "messageId": "posttrade", - "sourceName": "GAT", - "isin": "US00123Q1040", - "lastTradeTime": "2026-01-29T14:07:00.419000000Z", - "lastTrade": 10.145, - "lastQty": 500.0, - "currency": "EUR", - ... 
- } + Unterstützte Formate: + - Neues Format: isin, lastTrade, lastQty, lastTradeTime + - Altes Format: FinInstrmId.Id, Pric, Qty, TrdDt/TrdTm """ try: - # ISIN extrahieren - neues Format verwendet 'isin' lowercase - isin = record.get('isin') or record.get('ISIN') or record.get('instrumentId') or record.get('FinInstrmId', {}).get('Id', '') + # ISIN extrahieren + isin = ( + record.get('isin') or + record.get('ISIN') or + record.get('instrumentId') or + record.get('FinInstrmId', {}).get('Id', '') + ) if not isin: return None - # Preis extrahieren - neues Format: 'lastTrade' - price = None - if 'lastTrade' in record: - price = float(record['lastTrade']) - elif 'Pric' in record: - pric = record['Pric'] - if isinstance(pric, dict): - if 'Pric' in pric: - inner = pric['Pric'] - if 'MntryVal' in inner: - price = float(inner['MntryVal'].get('Amt', 0)) - elif 'Amt' in inner: - price = float(inner['Amt']) - elif 'MntryVal' in pric: - price = float(pric['MntryVal'].get('Amt', 0)) - elif isinstance(pric, (int, float)): - price = float(pric) - + # Preis extrahieren + price = self._extract_price(record) if price is None or price <= 0: return None - # Menge extrahieren - neues Format: 'lastQty' - quantity = None - if 'lastQty' in record: - quantity = float(record['lastQty']) - elif 'Qty' in record: - qty = record['Qty'] - if isinstance(qty, dict): - quantity = float(qty.get('Unit', qty.get('Qty', 0))) - elif isinstance(qty, (int, float)): - quantity = float(qty) - + # Menge extrahieren + quantity = self._extract_quantity(record) if quantity is None or quantity <= 0: return None - # Timestamp extrahieren - neues Format: 'lastTradeTime' + # Timestamp extrahieren timestamp = None if 'lastTradeTime' in record: - ts_str = record['lastTradeTime'] - # Format: "2026-01-29T14:07:00.419000000Z" - # Python kann max 6 Dezimalstellen, also kürzen - if '.' in ts_str: - parts = ts_str.replace('Z', '').split('.') - if len(parts) == 2 and len(parts[1]) > 6: - ts_str = parts[0] + '.' + parts[1][:6] + '+00:00' - else: - ts_str = ts_str.replace('Z', '+00:00') - else: - ts_str = ts_str.replace('Z', '+00:00') - timestamp = datetime.fromisoformat(ts_str) + timestamp = self._parse_timestamp(record['lastTradeTime']) else: # Fallback für altes Format trd_dt = record.get('TrdDt', '') trd_tm = record.get('TrdTm', '00:00:00') - - if not trd_dt: - return None - - ts_str = f"{trd_dt}T{trd_tm}" - if '.' in ts_str: - parts = ts_str.split('.') - if len(parts[1]) > 6: - ts_str = parts[0] + '.' + parts[1][:6] - - timestamp = datetime.fromisoformat(ts_str) + if trd_dt: + timestamp = self._parse_timestamp(f"{trd_dt}T{trd_tm}") if timestamp is None: return None @@ -273,22 +345,41 @@ class DeutscheBoerseBase(BaseExchange): timestamp=timestamp ) - except Exception as e: - # Debug: Zeige ersten fehlgeschlagenen Record + except (ValueError, TypeError, KeyError) as e: + logger.debug(f"[{self.name}] Failed to parse trade record: {e}") return None def _get_last_trading_day(self, from_date: datetime.date) -> datetime.date: """ - Findet den letzten Handelstag (überspringt Wochenenden). + Findet den letzten Handelstag (überspringt Wochenenden und bekannte Feiertage). Montag=0, Sonntag=6 """ + # Deutsche Börsen-Feiertage (fixe Daten, jedes Jahr gleich) + # Bewegliche Feiertage (Ostern etc.) müssten jährlich berechnet werden + fixed_holidays = { + (1, 1), # Neujahr + (5, 1), # Tag der Arbeit + (12, 24), # Heiligabend + (12, 25), # 1. Weihnachtstag + (12, 26), # 2. 
Weihnachtstag + (12, 31), # Silvester + } + date = from_date - # Wenn Samstag (5), gehe zurück zu Freitag - if date.weekday() == 5: - date = date - timedelta(days=1) - # Wenn Sonntag (6), gehe zurück zu Freitag - elif date.weekday() == 6: - date = date - timedelta(days=2) + max_iterations = 10 # Sicherheit gegen Endlosschleife + + for _ in range(max_iterations): + # Wochenende überspringen + if date.weekday() == 5: # Samstag + date = date - timedelta(days=1) + elif date.weekday() == 6: # Sonntag + date = date - timedelta(days=2) + # Feiertag überspringen + elif (date.month, date.day) in fixed_holidays: + date = date - timedelta(days=1) + else: + break + return date def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]: @@ -304,40 +395,36 @@ class DeutscheBoerseBase(BaseExchange): # Standard: Vortag target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date() - # Überspringe Wochenenden + # Überspringe Wochenenden und Feiertage original_date = target_date target_date = self._get_last_trading_day(target_date) if target_date != original_date: - print(f"[{self.name}] Skipping weekend: {original_date} -> {target_date}") + logger.info(f"[{self.name}] Adjusted date: {original_date} -> {target_date} (weekend/holiday)") - print(f"[{self.name}] Fetching trades for date: {target_date}") + logger.info(f"[{self.name}] Fetching trades for date: {target_date}") # Hole Dateiliste von der API files = self._get_file_list() if not files: - print(f"[{self.name}] No files available from API") + logger.warning(f"[{self.name}] No files available from API") return [] # Dateien für Zieldatum filtern target_files = self._filter_files_for_date(files, target_date) - print(f"[{self.name}] {len(target_files)} files match target date (of {len(files)} total)") + logger.info(f"[{self.name}] {len(target_files)} files match target date (of {len(files)} total)") if not target_files: - print(f"[{self.name}] No files for target date found") + logger.warning(f"[{self.name}] No files for target date found") return [] - # Alle passenden Dateien herunterladen und parsen (mit Rate-Limiting) + # Alle passenden Dateien herunterladen und parsen successful = 0 failed = 0 total_files = len(target_files) - if total_files == 0: - print(f"[{self.name}] No files to download for date {target_date}") - return [] - - print(f"[{self.name}] Starting download of {total_files} files...") + logger.info(f"[{self.name}] Starting download of {total_files} files...") for i, file in enumerate(target_files): trades = self._download_and_parse_file(file) @@ -353,9 +440,9 @@ class DeutscheBoerseBase(BaseExchange): # Fortschritt alle 100 Dateien if (i + 1) % 100 == 0: - print(f"[{self.name}] Progress: {i + 1}/{total_files} files, {successful} successful, {len(all_trades)} trades so far") + logger.info(f"[{self.name}] Progress: {i + 1}/{total_files} files, {successful} successful, {len(all_trades)} trades so far") - print(f"[{self.name}] Downloaded {successful} files ({failed} failed/empty), total {len(all_trades)} trades") + logger.info(f"[{self.name}] Downloaded {successful} files ({failed} failed/empty), total {len(all_trades)} trades") return all_trades diff --git a/src/exchanges/eix.py b/src/exchanges/eix.py index 88a8143..89d03f7 100644 --- a/src/exchanges/eix.py +++ b/src/exchanges/eix.py @@ -1,29 +1,38 @@ import requests -import json -from bs4 import BeautifulSoup +import logging from datetime import datetime, timezone -from typing import List, Generator, Tuple, Optional +from typing import 
List, Generator, Tuple from .base import BaseExchange, Trade import csv import io +logger = logging.getLogger(__name__) + + class EIXExchange(BaseExchange): + """European Investor Exchange - CSV-basierte Trade-Daten.""" + + API_BASE_URL = "https://european-investor-exchange.com/api" + @property def name(self) -> str: return "EIX" def get_files_to_process(self, limit: int = 1, since_date: datetime = None) -> List[dict]: """Holt die Liste der zu verarbeitenden Dateien ohne sie herunterzuladen.""" - url = "https://european-investor-exchange.com/api/official-trades" + url = f"{self.API_BASE_URL}/official-trades" try: response = requests.get(url, timeout=15) response.raise_for_status() files_list = response.json() - except Exception as e: - print(f"Error fetching EIX file list: {e}") + except requests.exceptions.RequestException as e: + logger.error(f"[{self.name}] Fehler beim Abrufen der Dateiliste: {e}") + return [] + except ValueError as e: + logger.error(f"[{self.name}] Ungültiges JSON in Dateiliste: {e}") return [] - # Filter files based on date in filename if since_date provided + # Filtere Dateien nach Datum wenn since_date angegeben filtered_files = [] for item in files_list: file_key = item.get('fileName') @@ -39,7 +48,9 @@ class EIXExchange(BaseExchange): if file_date.date() >= since_date.date(): filtered_files.append(item) - except Exception: + except (ValueError, IndexError) as e: + # Dateiname hat unerwartetes Format - zur Sicherheit einschließen + logger.debug(f"[{self.name}] Konnte Datum nicht aus {file_key} extrahieren: {e}") filtered_files.append(item) else: filtered_files.append(item) @@ -57,13 +68,15 @@ class EIXExchange(BaseExchange): if not file_key: return [] - csv_url = f"https://european-investor-exchange.com/api/trade-file-contents?key={file_key}" + csv_url = f"{self.API_BASE_URL}/trade-file-contents?key={file_key}" try: - csv_response = requests.get(csv_url, timeout=60) - if csv_response.status_code == 200: - return self._parse_csv(csv_response.text) + response = requests.get(csv_url, timeout=60) + response.raise_for_status() + return self._parse_csv(response.text) + except requests.exceptions.RequestException as e: + logger.error(f"[{self.name}] Fehler beim Download von {file_key}: {e}") except Exception as e: - print(f"Error downloading EIX CSV {file_key}: {e}") + logger.error(f"[{self.name}] Unerwarteter Fehler bei {file_key}: {e}") return [] @@ -80,7 +93,7 @@ class EIXExchange(BaseExchange): if trades: yield (file_key, trades) - def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None) -> List[Trade]: + def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None, **kwargs) -> List[Trade]: """ Legacy-Methode für Kompatibilität. WARNUNG: Lädt alle Trades in den Speicher! Für große Datenmengen fetch_trades_streaming() verwenden. @@ -93,33 +106,53 @@ class EIXExchange(BaseExchange): return all_trades # Für große Requests: Warnung ausgeben und leere Liste zurückgeben - # Der Daemon soll stattdessen fetch_trades_streaming() verwenden - print(f"[EIX] WARNING: fetch_latest_trades() called with large dataset. Use streaming instead.") + logger.warning(f"[{self.name}] fetch_latest_trades() mit großem Dataset aufgerufen. 
Verwende Streaming.") return [] def _parse_csv(self, csv_text: str) -> List[Trade]: + """Parst CSV-Text zu Trade-Objekten.""" trades = [] + parse_errors = 0 + f = io.StringIO(csv_text) reader = csv.DictReader(f, delimiter=',') - for row in reader: + + for row_num, row in enumerate(reader, start=2): # Start bei 2 wegen Header try: price = float(row['Unit Price']) quantity = float(row['Quantity']) isin = row['Instrument Identifier'] - symbol = isin time_str = row['Trading day & Trading time UTC'] + # Preis und Menge validieren + if price <= 0 or quantity <= 0: + logger.debug(f"[{self.name}] Zeile {row_num}: Ungültiger Preis/Menge: {price}/{quantity}") + parse_errors += 1 + continue + ts_str = time_str.replace('Z', '+00:00') timestamp = datetime.fromisoformat(ts_str) trades.append(Trade( exchange=self.name, - symbol=symbol, + symbol=isin, isin=isin, price=price, quantity=quantity, timestamp=timestamp )) - except Exception: - continue + + except KeyError as e: + logger.debug(f"[{self.name}] Zeile {row_num}: Fehlendes Feld {e}") + parse_errors += 1 + except ValueError as e: + logger.debug(f"[{self.name}] Zeile {row_num}: Ungültiger Wert: {e}") + parse_errors += 1 + except Exception as e: + logger.warning(f"[{self.name}] Zeile {row_num}: Unerwarteter Fehler: {e}") + parse_errors += 1 + + if parse_errors > 0: + logger.debug(f"[{self.name}] {parse_errors} Zeilen konnten nicht geparst werden") + return trades