diff --git a/daemon.py b/daemon.py
index edf77aa..1aeaadb 100644
--- a/daemon.py
+++ b/daemon.py
@@ -5,6 +5,9 @@ import os
 import requests
 from src.exchanges.eix import EIXExchange
 from src.exchanges.ls import LSExchange
+from src.exchanges.deutsche_boerse import XetraExchange, FrankfurtExchange, QuotrixExchange
+from src.exchanges.gettex import GettexExchange
+from src.exchanges.stuttgart import StuttgartExchange
 from src.database.questdb_client import DatabaseClient
 
 logging.basicConfig(
@@ -45,6 +48,13 @@ def run_task(historical=False):
     eix = EIXExchange()
     ls = LSExchange()
 
+    # New Deutsche Börse exchanges
+    xetra = XetraExchange()
+    frankfurt = FrankfurtExchange()
+    quotrix = QuotrixExchange()
+    gettex = GettexExchange()
+    stuttgart = StuttgartExchange()
+
     # Pass last_ts to fetcher to allow smart filtering
     # daemon.py runs daily, so we want to fetch everything since DB state
     # BUT we need to be careful: eix.py's fetch_latest_trades needs 'since_date' argument
@@ -53,7 +63,13 @@
     # We will modify the loop below to handle args dynamically
     exchanges_to_process = [
         (eix, {'limit': None if historical else 5}),  # Default limit 5 for safety if no historical
-        (ls, {'include_yesterday': historical})
+        (ls, {'include_yesterday': historical}),
+        # New exchanges
+        (xetra, {'include_yesterday': historical}),
+        (frankfurt, {'include_yesterday': historical}),
+        (quotrix, {'include_yesterday': historical}),
+        (gettex, {'include_yesterday': historical}),
+        (stuttgart, {'include_yesterday': historical}),
     ]
 
     db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
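
The hunk above only registers the new exchanges; the fetch loop itself sits outside the diff context. A minimal sketch of how such a loop can consume the (exchange, kwargs) tuples, assuming each exchange exposes fetch_latest_trades(**kwargs) and a name property as in the new modules below (the persistence step is omitted because DatabaseClient's API is not shown in this diff):

    for exchange, kwargs in exchanges_to_process:
        try:
            trades = exchange.fetch_latest_trades(**kwargs)
            logging.info("%s: fetched %d trades", exchange.name, len(trades))
            # hand `trades` to the DatabaseClient here (not part of this sketch)
        except Exception:
            logging.exception("Fetching trades from %s failed", exchange.name)
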
diff --git a/dashboard/public/index.html b/dashboard/public/index.html
index 3ca7308..2fb66e7 100644
--- a/dashboard/public/index.html
+++ b/dashboard/public/index.html
@@ -121,7 +121,7 @@
-
+
@@ -261,7 +261,8 @@
             const dates = [...new Set(data.map(r => r[dateIdx]))].sort();
             const datasets = [];
-            const colors = ['#38bdf8', '#f43f5e', '#10b981', '#fbbf24', '#8b5cf6'];
+            // Extended colors for more exchanges (EIX, LS, XETRA, FRA, GETTEX, STU, QUOTRIX)
+            const colors = ['#38bdf8', '#f43f5e', '#10b981', '#fbbf24', '#8b5cf6', '#f97316', '#ec4899', '#14b8a6', '#84cc16', '#a855f7'];
 
             exchanges.forEach((exchange, idx) => {
                 datasets.push({
@@ -579,7 +580,8 @@
             const groups = [...new Set(data.map(r => r[groupIdx]))];
             const dates = [...new Set(data.map(r => r[xIdx]))].sort();
-            const colors = ['#38bdf8', '#f43f5e', '#10b981', '#fbbf24', '#8b5cf6', '#f97316', '#ec4899'];
+            // Extended colors for more exchanges (EIX, LS, XETRA, FRA, GETTEX, STU, QUOTRIX)
+            const colors = ['#38bdf8', '#f43f5e', '#10b981', '#fbbf24', '#8b5cf6', '#f97316', '#ec4899', '#14b8a6', '#84cc16', '#a855f7'];
 
             const datasets = groups.map((group, idx) => ({
                 label: group || 'Unknown',
                 data: dates.map(d => {
diff --git a/requirements.txt b/requirements.txt
index 5fbc4bd..c1bf673 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,6 @@
 requests
 beautifulsoup4
+lxml
 fastapi
 uvicorn
 pandas
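
lxml is presumably pulled in as an additional parser backend for BeautifulSoup, which the new scrapers use heavily; they currently pass 'html.parser' explicitly, but with lxml installed the faster parser can be selected. A minimal sketch (the HTML snippet is made up for illustration):

    from bs4 import BeautifulSoup

    html = '<a href="DETR-posttrade-2024-03-01T21:30:00.json.gz">file</a>'
    soup = BeautifulSoup(html, "lxml")  # parser provided by the lxml package from requirements.txt
    print([a.get("href") for a in soup.find_all("a")])
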
diff --git a/src/analytics/worker.py b/src/analytics/worker.py
index 40790ca..1be6d40 100644
--- a/src/analytics/worker.py
+++ b/src/analytics/worker.py
@@ -650,30 +650,60 @@ class AnalyticsWorker:
             logger.error("Failed to connect to QuestDB. Exiting.")
             return
 
-        # Initial calculation of missing days
+        # Initial calculation of missing days (incl. yesterday and today)
         logger.info("Checking for missing dates...")
         self.process_missing_dates()
 
-        # Main loop: wait for midnight
-        logger.info("Waiting for midnight to process yesterday's data...")
+        # Make sure that yesterday and today get processed
+        today = datetime.date.today()
+        yesterday = today - datetime.timedelta(days=1)
+
+        logger.info(f"Ensuring yesterday ({yesterday}) and today ({today}) are processed...")
+        existing_dates = self.get_existing_dates('analytics_custom')
+
+        if yesterday not in existing_dates:
+            logger.info(f"Processing yesterday's data: {yesterday}")
+            self.process_date(yesterday)
+
+        # Today is only processed once trades exist (normally at the end of the day),
+        # but we check anyway whether there is data
+        if today not in existing_dates:
+            # Check whether there are already trades for today
+            query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
+            data = self.query_questdb(query)
+            if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
+                logger.info(f"Found trades for today ({today}), processing...")
+                self.process_date(today)
+
+        # Main loop: check regularly for missing days
+        logger.info("Starting main loop - checking for missing dates every hour...")
+        last_check_hour = -1
         while True:
             now = datetime.datetime.now()
+            current_hour = now.hour
 
-            # Check whether it is midnight (00:00)
+            # Check for missing days once per hour
+            if current_hour != last_check_hour:
+                logger.info(f"Hourly check for missing dates (hour: {current_hour})...")
+                self.process_missing_dates()
+                last_check_hour = current_hour
+
+            # Make sure yesterday has been processed
+            yesterday = (now - datetime.timedelta(days=1)).date()
+            existing_dates = self.get_existing_dates('analytics_custom')
+            if yesterday not in existing_dates:
+                logger.info(f"Processing yesterday's data: {yesterday}")
+                self.process_date(yesterday)
+
+            # Check whether it is midnight (00:00) - then process yesterday
             if now.hour == 0 and now.minute == 0:
                 yesterday = (now - datetime.timedelta(days=1)).date()
-                logger.info(f"Processing yesterday's data: {yesterday}")
+                logger.info(f"Midnight reached - processing yesterday's data: {yesterday}")
                 self.process_date(yesterday)
                 # Wait 61s to prevent running more than once
                 time.sleep(61)
 
-            # Also check for missing days (every 6 hours)
-            if now.hour % 6 == 0 and now.minute == 0:
-                logger.info("Checking for missing dates...")
-                self.process_missing_dates()
-                time.sleep(61)
-
-            time.sleep(30)
+            time.sleep(60)  # Check once a minute
 
 
 def main():
     worker = AnalyticsWorker()
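
The today-check above relies on query_questdb returning QuestDB's REST /exec response shape ({"columns": [...], "dataset": [[...]], ...}). A hedged sketch of what such a count query looks like against QuestDB's HTTP endpoint on port 9000; the actual query_questdb helper lives elsewhere in worker.py and is not part of this diff:

    import datetime
    import requests

    def count_trades_for(day: datetime.date, host: str = "questdb", port: int = 9000) -> int:
        # QuestDB's /exec endpoint returns JSON with "columns" and "dataset" keys
        query = f"select count(*) from trades where date_trunc('day', timestamp) = '{day}'"
        resp = requests.get(f"http://{host}:{port}/exec", params={"query": query}, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        return int(data["dataset"][0][0]) if data.get("dataset") else 0
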
+ """ + filtered = [] + + # Für den Vortag: Dateien vom target_date UND vom Folgetag (bis ~02:00 UTC) + target_str = target_date.strftime('%Y-%m-%d') + next_day = target_date + timedelta(days=1) + next_day_str = next_day.strftime('%Y-%m-%d') + + for file in files: + # Extrahiere Datum aus Dateiname + # Format: posttrade-2026-01-26T21:30:00.json.gz + if target_str in file: + filtered.append(file) + elif next_day_str in file: + # Prüfe ob es eine frühe Datei vom nächsten Tag ist (< 03:00 UTC) + try: + # Finde Timestamp im Dateinamen + parts = file.split('posttrade-') + if len(parts) > 1: + ts_part = parts[1].split('.json.gz')[0] + file_dt = datetime.fromisoformat(ts_part) + if file_dt.hour < 3: # Frühe Morgenstunden gehören noch zum Vortag + filtered.append(file) + except Exception: + pass + + return filtered + + def _download_and_parse_file(self, file_url: str) -> List[Trade]: + """Lädt eine JSON.gz Datei herunter und parst die Trades""" + trades = [] + + try: + # Vollständige URL erstellen + if not file_url.startswith('http'): + full_url = f"{self.base_url.rstrip('/')}/{file_url.lstrip('/')}" + else: + full_url = file_url + + response = requests.get(full_url, headers=HEADERS, timeout=60) + response.raise_for_status() + + # Gzip entpacken + with gzip.GzipFile(fileobj=io.BytesIO(response.content)) as f: + json_data = json.load(f) + + # Trades parsen + # Deutsche Börse JSON Format (RTS1/RTS2): + # Typische Felder: TrdDt, TrdTm, ISIN, Pric, Qty, TrdCcy, etc. + for record in json_data: + try: + trade = self._parse_trade_record(record) + if trade: + trades.append(trade) + except Exception as e: + print(f"Error parsing trade record: {e}") + continue + + except Exception as e: + print(f"Error downloading/parsing {file_url}: {e}") + + return trades + + def _parse_trade_record(self, record: dict) -> Optional[Trade]: + """ + Parst einen einzelnen Trade-Record aus dem JSON. + Deutsche Börse verwendet RTS1/RTS2 Format. + + Wichtige Felder: + - TrdDt: Trading Date (YYYY-MM-DD) + - TrdTm: Trading Time (HH:MM:SS.ffffff) + - ISIN: Instrument Identifier + - FinInstrmId.Id: Alternative ISIN Feld + - Pric.Pric.MntryVal.Amt: Preis + - Qty.Unit: Menge + """ + try: + # ISIN extrahieren + isin = record.get('ISIN') or record.get('FinInstrmId', {}).get('Id', '') + if not isin: + return None + + # Preis extrahieren (verschiedene mögliche Pfade) + price = None + if 'Pric' in record: + pric = record['Pric'] + if isinstance(pric, dict): + if 'Pric' in pric: + inner = pric['Pric'] + if 'MntryVal' in inner: + price = float(inner['MntryVal'].get('Amt', 0)) + elif 'Amt' in inner: + price = float(inner['Amt']) + elif 'MntryVal' in pric: + price = float(pric['MntryVal'].get('Amt', 0)) + elif isinstance(pric, (int, float)): + price = float(pric) + + if price is None or price <= 0: + return None + + # Menge extrahieren + quantity = None + if 'Qty' in record: + qty = record['Qty'] + if isinstance(qty, dict): + quantity = float(qty.get('Unit', qty.get('Qty', 0))) + elif isinstance(qty, (int, float)): + quantity = float(qty) + + if quantity is None or quantity <= 0: + return None + + # Timestamp extrahieren + trd_dt = record.get('TrdDt', '') + trd_tm = record.get('TrdTm', '00:00:00') + + if not trd_dt: + return None + + # Kombiniere Datum und Zeit + ts_str = f"{trd_dt}T{trd_tm}" + # Entferne Mikrosekunden wenn zu lang + if '.' in ts_str: + parts = ts_str.split('.') + if len(parts[1]) > 6: + ts_str = parts[0] + '.' 
+
+    def _parse_trade_record(self, record: dict) -> Optional[Trade]:
+        """
+        Parses a single trade record from the JSON.
+        Deutsche Börse uses the RTS1/RTS2 format.
+
+        Important fields:
+        - TrdDt: trading date (YYYY-MM-DD)
+        - TrdTm: trading time (HH:MM:SS.ffffff)
+        - ISIN: instrument identifier
+        - FinInstrmId.Id: alternative ISIN field
+        - Pric.Pric.MntryVal.Amt: price
+        - Qty.Unit: quantity
+        """
+        try:
+            # Extract ISIN
+            isin = record.get('ISIN') or record.get('FinInstrmId', {}).get('Id', '')
+            if not isin:
+                return None
+
+            # Extract price (several possible paths)
+            price = None
+            if 'Pric' in record:
+                pric = record['Pric']
+                if isinstance(pric, dict):
+                    if 'Pric' in pric:
+                        inner = pric['Pric']
+                        if 'MntryVal' in inner:
+                            price = float(inner['MntryVal'].get('Amt', 0))
+                        elif 'Amt' in inner:
+                            price = float(inner['Amt'])
+                    elif 'MntryVal' in pric:
+                        price = float(pric['MntryVal'].get('Amt', 0))
+                elif isinstance(pric, (int, float)):
+                    price = float(pric)
+
+            if price is None or price <= 0:
+                return None
+
+            # Extract quantity
+            quantity = None
+            if 'Qty' in record:
+                qty = record['Qty']
+                if isinstance(qty, dict):
+                    quantity = float(qty.get('Unit', qty.get('Qty', 0)))
+                elif isinstance(qty, (int, float)):
+                    quantity = float(qty)
+
+            if quantity is None or quantity <= 0:
+                return None
+
+            # Extract timestamp
+            trd_dt = record.get('TrdDt', '')
+            trd_tm = record.get('TrdTm', '00:00:00')
+
+            if not trd_dt:
+                return None
+
+            # Combine date and time
+            ts_str = f"{trd_dt}T{trd_tm}"
+            # Trim microseconds if too long
+            if '.' in ts_str:
+                parts = ts_str.split('.')
+                if len(parts[1]) > 6:
+                    ts_str = parts[0] + '.' + parts[1][:6]
+
+            # Parse as UTC (Deutsche Börse delivers UTC)
+            timestamp = datetime.fromisoformat(ts_str)
+            if timestamp.tzinfo is None:
+                timestamp = timestamp.replace(tzinfo=timezone.utc)
+
+            return Trade(
+                exchange=self.name,
+                symbol=isin,  # symbol = ISIN
+                isin=isin,
+                price=price,
+                quantity=quantity,
+                timestamp=timestamp
+            )
+
+        except Exception as e:
+            print(f"Error parsing record: {e}")
+            return None
+
+    def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]:
+        """
+        Fetches all trades from the previous day (or since since_date).
+        """
+        all_trades = []
+
+        # Determine the target date
+        if since_date:
+            target_date = since_date.date() if hasattr(since_date, 'date') else since_date
+        else:
+            # Default: previous day
+            target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()
+
+        print(f"[{self.name}] Fetching trades for date: {target_date}")
+
+        # Fetch the file list
+        files = self._get_file_list()
+        print(f"[{self.name}] Found {len(files)} total files")
+
+        # Filter files for the target date
+        target_files = self._filter_files_for_date(files, target_date)
+        print(f"[{self.name}] {len(target_files)} files match target date")
+
+        # Download and parse all matching files
+        for file in target_files:
+            trades = self._download_and_parse_file(file)
+            all_trades.extend(trades)
+            print(f"[{self.name}] Parsed {len(trades)} trades from {file}")
+
+        print(f"[{self.name}] Total trades fetched: {len(all_trades)}")
+        return all_trades
+
+
+class XetraExchange(DeutscheBoerseBase):
+    """Xetra (Deutsche Börse) - DETR"""
+
+    @property
+    def base_url(self) -> str:
+        return "https://mfs.deutsche-boerse.com/DETR-posttrade"
+
+    @property
+    def name(self) -> str:
+        return "XETRA"
+
+
+class FrankfurtExchange(DeutscheBoerseBase):
+    """Börse Frankfurt - DFRA"""
+
+    @property
+    def base_url(self) -> str:
+        return "https://mfs.deutsche-boerse.com/DFRA-posttrade"
+
+    @property
+    def name(self) -> str:
+        return "FRA"
+
+
+class QuotrixExchange(DeutscheBoerseBase):
+    """Quotrix (Düsseldorf/Tradegate) - DGAT"""
+
+    @property
+    def base_url(self) -> str:
+        return "https://mfs.deutsche-boerse.com/DGAT-posttrade"
+
+    @property
+    def name(self) -> str:
+        return "QUOTRIX"
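
A usage sketch for the new module, run against a synthetic record shaped the way _parse_trade_record expects (illustrative values only; a real fetch additionally needs network access to mfs.deutsche-boerse.com):

    from src.exchanges.deutsche_boerse import XetraExchange

    xetra = XetraExchange()
    record = {
        "TrdDt": "2024-03-01",
        "TrdTm": "14:32:11.123456",
        "ISIN": "DE0007164600",
        "Pric": {"Pric": {"MntryVal": {"Amt": "172.34"}}},
        "Qty": {"Unit": "50"},
    }
    print(xetra._parse_trade_record(record))  # Trade with price=172.34, quantity=50.0, UTC timestamp

    # Full download of yesterday's files:
    # trades = xetra.fetch_latest_trades()
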
diff --git a/src/exchanges/gettex.py b/src/exchanges/gettex.py
new file mode 100644
index 0000000..afc5cf3
--- /dev/null
+++ b/src/exchanges/gettex.py
@@ -0,0 +1,229 @@
+import requests
+import gzip
+import csv
+import io
+from datetime import date, datetime, timedelta, timezone
+from typing import List, Optional
+from .base import BaseExchange, Trade
+from bs4 import BeautifulSoup
+
+# Browser user agent for access (gettex checks the user agent!)
+HEADERS = {
+    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
+    'Accept-Language': 'de-DE,de;q=0.9,en;q=0.8',
+    'Referer': 'https://www.gettex.de/'
+}
+
+# gettex download base URLs
+GETTEX_PAGE_URL = "https://www.gettex.de/handel/delayed-data/posttrade-data/"
+GETTEX_DOWNLOAD_BASE = "https://erdk.bayerische-boerse.de:8000/delayed-data/MUNC-MUND/posttrade/"
+
+
+class GettexExchange(BaseExchange):
+    """
+    gettex exchange (Bayerische Börse).
+    Combines MUNC and MUND data.
+
+    File name format: posttrade.YYYYMMDD.HH.mm.{munc|mund}.csv.gz
+    """
+
+    @property
+    def name(self) -> str:
+        return "GETTEX"
+
+    def _get_file_list_from_page(self) -> List[str]:
+        """
+        Parses the gettex page and extracts download links.
+        """
+        files = []
+
+        try:
+            response = requests.get(GETTEX_PAGE_URL, headers=HEADERS, timeout=30)
+            response.raise_for_status()
+
+            soup = BeautifulSoup(response.text, 'html.parser')
+
+            # Look for links to CSV.gz files
+            for link in soup.find_all('a'):
+                href = link.get('href', '')
+                if href and 'posttrade' in href.lower() and href.endswith('.csv.gz'):
+                    files.append(href)
+
+            # If no links were found, try an alternative structure
+            if not files:
+                # Sometimes links are hidden in data attributes
+                for elem in soup.find_all(attrs={'data-href': True}):
+                    href = elem.get('data-href', '')
+                    if 'posttrade' in href.lower() and href.endswith('.csv.gz'):
+                        files.append(href)
+
+        except Exception as e:
+            print(f"[GETTEX] Error fetching page: {e}")
+
+        return files
+
+    def _generate_expected_files(self, target_date: date) -> List[str]:
+        """
+        Generates expected file names based on the date.
+        gettex publishes files every 15 minutes during trading.
+
+        File name format: posttrade.YYYYMMDD.HH.mm.{munc|mund}.csv.gz
+        """
+        files = []
+        date_str = target_date.strftime('%Y%m%d')
+
+        # Trading hours: approx. 08:00 - 22:00 CET
+        # In UTC: 07:00 - 21:00 (winter) / 06:00 - 20:00 (summer)
+        # Generate for every 15-minute interval
+
+        for hour in range(6, 23):  # 06:00 - 22:45 UTC (covering)
+            for minute in [0, 15, 30, 45]:
+                time_str = f"{hour:02d}.{minute:02d}"
+                files.append(f"posttrade.{date_str}.{time_str}.munc.csv.gz")
+                files.append(f"posttrade.{date_str}.{time_str}.mund.csv.gz")
+
+        # Also early files from the following day (after midnight UTC)
+        next_date = target_date + timedelta(days=1)
+        next_date_str = next_date.strftime('%Y%m%d')
+        for hour in range(0, 3):  # 00:00 - 02:45 UTC
+            for minute in [0, 15, 30, 45]:
+                time_str = f"{hour:02d}.{minute:02d}"
+                files.append(f"posttrade.{next_date_str}.{time_str}.munc.csv.gz")
+                files.append(f"posttrade.{next_date_str}.{time_str}.mund.csv.gz")
+
+        return files
+
+    def _download_and_parse_file(self, filename: str) -> List[Trade]:
+        """Downloads a CSV.gz file and parses the trades"""
+        trades = []
+
+        try:
+            # Full URL
+            url = f"{GETTEX_DOWNLOAD_BASE}{filename}"
+
+            response = requests.get(url, headers=HEADERS, timeout=60)
+
+            if response.status_code == 404:
+                # File does not exist - normal for periods without trading
+                return []
+
+            response.raise_for_status()
+
+            # Decompress gzip
+            with gzip.GzipFile(fileobj=io.BytesIO(response.content)) as f:
+                csv_text = f.read().decode('utf-8')
+
+            # Parse CSV
+            reader = csv.DictReader(io.StringIO(csv_text), delimiter=';')
+
+            for row in reader:
+                try:
+                    trade = self._parse_csv_row(row)
+                    if trade:
+                        trades.append(trade)
+                except Exception as e:
+                    print(f"[GETTEX] Error parsing row: {e}")
+                    continue
+
+        except requests.exceptions.HTTPError as e:
+            if e.response.status_code != 404:
+                print(f"[GETTEX] HTTP error downloading {filename}: {e}")
+        except Exception as e:
+            print(f"[GETTEX] Error downloading {filename}: {e}")
+
+        return trades
+
+    def _parse_csv_row(self, row: dict) -> Optional[Trade]:
+        """
+        Parses a CSV row into a Trade.
+
+        Expected columns (RTS format):
+        - TrdDtTm: trading date/time
+        - ISIN: instrument identifier
+        - Pric: price
+        - Qty: quantity
+        - Ccy: currency
+        """
+        try:
+            # ISIN
+            isin = row.get('ISIN', row.get('FinInstrmId', ''))
+            if not isin:
+                return None
+
+            # Price
+            price_str = row.get('Pric', row.get('Price', '0'))
+            price_str = price_str.replace(',', '.')
+            price = float(price_str)
+
+            if price <= 0:
+                return None
+
+            # Quantity
+            qty_str = row.get('Qty', row.get('Quantity', '0'))
+            qty_str = qty_str.replace(',', '.')
+            quantity = float(qty_str)
+
+            if quantity <= 0:
+                return None
+
+            # Timestamp
+            ts_str = row.get('TrdDtTm', row.get('TradingDateTime', ''))
+            if not ts_str:
+                # Fallback: separate fields
+                trd_dt = row.get('TrdDt', '')
+                trd_tm = row.get('TrdTm', '00:00:00')
+                ts_str = f"{trd_dt}T{trd_tm}"
+
+            # Parse the timestamp (UTC)
+            ts_str = ts_str.replace('Z', '+00:00')
+            if 'T' not in ts_str:
+                ts_str = ts_str.replace(' ', 'T')
+
+            timestamp = datetime.fromisoformat(ts_str)
+            if timestamp.tzinfo is None:
+                timestamp = timestamp.replace(tzinfo=timezone.utc)
+
+            return Trade(
+                exchange=self.name,
+                symbol=isin,
+                isin=isin,
+                price=price,
+                quantity=quantity,
+                timestamp=timestamp
+            )
+
+        except Exception as e:
+            print(f"[GETTEX] Error parsing CSV row: {e}")
+            return None
+
+    def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]:
+        """
+        Fetches all trades from the previous day.
+        """
+        all_trades = []
+
+        # Determine the target date
+        if since_date:
+            target_date = since_date.date() if hasattr(since_date, 'date') else since_date
+        else:
+            target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()
+
+        print(f"[{self.name}] Fetching trades for date: {target_date}")
+
+        # Generate expected file names
+        expected_files = self._generate_expected_files(target_date)
+        print(f"[{self.name}] Trying {len(expected_files)} potential files")
+
+        # Try to download the files
+        successful_files = 0
+        for filename in expected_files:
+            trades = self._download_and_parse_file(filename)
+            if trades:
+                all_trades.extend(trades)
+                successful_files += 1
+
+        print(f"[{self.name}] Successfully downloaded {successful_files} files")
+        print(f"[{self.name}] Total trades fetched: {len(all_trades)}")
+
+        return all_trades
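
Because gettex file names are guessed rather than scraped, it helps to see what _generate_expected_files produces; a small sketch, assuming the package is importable:

    from datetime import date
    from src.exchanges.gettex import GettexExchange

    gettex = GettexExchange()
    candidates = gettex._generate_expected_files(date(2024, 3, 1))
    print(len(candidates))  # 160: 06:00-22:45 UTC plus 00:00-02:45 UTC of the next day, munc + mund
    print(candidates[:2])   # ['posttrade.20240301.06.00.munc.csv.gz', 'posttrade.20240301.06.00.mund.csv.gz']
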
+ """ + files = [] + + try: + response = requests.get(STUTTGART_PAGE_URL, headers=HEADERS, timeout=30) + response.raise_for_status() + + soup = BeautifulSoup(response.text, 'html.parser') + + # Suche nach Download-Links + # Börse Stuttgart verwendet oft bestimmte CSS-Klassen oder data-Attribute + for link in soup.find_all('a'): + href = link.get('href', '') + + # Prüfe auf typische Dateiendungen + if href and ('posttrade' in href.lower() or 'post-trade' in href.lower()): + if href.endswith('.gz') or href.endswith('.json') or href.endswith('.csv'): + # Vollständige URL erstellen + if not href.startswith('http'): + if href.startswith('/'): + href = f"https://www.boerse-stuttgart.de{href}" + else: + href = f"https://www.boerse-stuttgart.de/{href}" + files.append(href) + + # Alternative: Suche nach JavaScript-generierten Links + if not files: + # Manchmal sind Links in Script-Tags versteckt + for script in soup.find_all('script'): + script_text = script.string or '' + if 'posttrade' in script_text.lower(): + # Versuche URLs zu extrahieren + import re + urls = re.findall(r'https?://[^\s\'"<>]+posttrade[^\s\'"<>]+\.(?:gz|json|csv)', script_text, re.IGNORECASE) + files.extend(urls) + + # Fallback: Versuche bekannte URL-Muster + if not files: + files = self._generate_expected_urls() + + except Exception as e: + print(f"[STU] Error fetching page: {e}") + files = self._generate_expected_urls() + + return files + + def _generate_expected_urls(self) -> List[str]: + """ + Generiert erwartete Download-URLs basierend auf bekannten Mustern. + Börse Stuttgart verwendet typischerweise ähnliche Formate wie andere Deutsche Börsen. + """ + files = [] + + # Versuche verschiedene URL-Muster + base_patterns = [ + "https://www.boerse-stuttgart.de/api/v1/delayed-data/xstf-post-trade/", + "https://www.boerse-stuttgart.de/downloads/delayed-data/", + "https://mfs.boerse-stuttgart.de/XSTF-posttrade/", + ] + + # Für die letzten 3 Tage + for days_ago in range(1, 4): + target_date = datetime.now(timezone.utc) - timedelta(days=days_ago) + date_str = target_date.strftime('%Y-%m-%d') + date_str_compact = target_date.strftime('%Y%m%d') + + for base in base_patterns: + files.append(f"{base}posttrade-{date_str}.json.gz") + files.append(f"{base}posttrade.{date_str_compact}.json.gz") + files.append(f"{base}xstf-posttrade-{date_str}.json.gz") + + return files + + def _filter_files_for_date(self, files: List[str], target_date: datetime.date) -> List[str]: + """Filtert Dateien für ein bestimmtes Datum""" + filtered = [] + target_str = target_date.strftime('%Y-%m-%d') + target_str_compact = target_date.strftime('%Y%m%d') + + # Auch Dateien vom Folgetag (frühe Morgenstunden) + next_day = target_date + timedelta(days=1) + next_day_str = next_day.strftime('%Y-%m-%d') + next_day_compact = next_day.strftime('%Y%m%d') + + for file in files: + file_lower = file.lower() + if target_str in file_lower or target_str_compact in file_lower: + filtered.append(file) + elif next_day_str in file_lower or next_day_compact in file_lower: + # Prüfe ob frühe Morgenstunde + if 'T00' in file or 'T01' in file or 'T02' in file: + filtered.append(file) + # Für kompakte Formate + elif '.00.' in file or '.01.' in file or '.02.' 
+
+    def _download_and_parse_file(self, url: str) -> List[Trade]:
+        """Downloads a file and parses the trades"""
+        trades = []
+
+        try:
+            response = requests.get(url, headers=HEADERS, timeout=60)
+
+            if response.status_code == 404:
+                return []
+
+            response.raise_for_status()
+
+            content = response.content
+
+            # Check whether it is gzip
+            if url.endswith('.gz'):
+                try:
+                    with gzip.GzipFile(fileobj=io.BytesIO(content)) as f:
+                        content = f.read()
+                except Exception:
+                    pass  # Maybe not actually gzip
+
+            # Try to parse as JSON
+            if url.endswith('.json') or url.endswith('.json.gz'):
+                try:
+                    data = json.loads(content)
+                    if isinstance(data, list):
+                        for record in data:
+                            trade = self._parse_json_record(record)
+                            if trade:
+                                trades.append(trade)
+                        return trades
+                except json.JSONDecodeError:
+                    pass
+
+            # Try to parse as CSV
+            try:
+                text = content.decode('utf-8') if isinstance(content, bytes) else content
+                reader = csv.DictReader(io.StringIO(text), delimiter=';')
+                for row in reader:
+                    trade = self._parse_csv_row(row)
+                    if trade:
+                        trades.append(trade)
+            except Exception:
+                # Try with comma as the delimiter
+                try:
+                    text = content.decode('utf-8') if isinstance(content, bytes) else content
+                    reader = csv.DictReader(io.StringIO(text), delimiter=',')
+                    for row in reader:
+                        trade = self._parse_csv_row(row)
+                        if trade:
+                            trades.append(trade)
+                except Exception as e:
+                    print(f"[STU] Could not parse {url}: {e}")
+
+        except requests.exceptions.HTTPError as e:
+            if e.response.status_code != 404:
+                print(f"[STU] HTTP error downloading {url}: {e}")
+        except Exception as e:
+            print(f"[STU] Error downloading {url}: {e}")
+
+        return trades
+
+    def _parse_json_record(self, record: dict) -> Optional[Trade]:
+        """Parses a JSON record into a Trade"""
+        try:
+            # ISIN
+            isin = record.get('ISIN') or record.get('FinInstrmId', {}).get('Id', '')
+            if not isin:
+                return None
+
+            # Price (several possible structures)
+            price = None
+            if 'Pric' in record:
+                pric = record['Pric']
+                if isinstance(pric, dict):
+                    if 'Pric' in pric:
+                        inner = pric['Pric']
+                        if isinstance(inner, dict):
+                            price = float(inner.get('MntryVal', {}).get('Amt', 0) or inner.get('Amt', 0))
+                        else:
+                            price = float(inner)
+                    elif 'MntryVal' in pric:
+                        price = float(pric['MntryVal'].get('Amt', 0))
+                    elif 'Amt' in pric:
+                        price = float(pric['Amt'])
+                else:
+                    price = float(pric)
+            elif 'Price' in record:
+                price = float(str(record['Price']).replace(',', '.'))
+
+            if not price or price <= 0:
+                return None
+
+            # Quantity
+            quantity = None
+            if 'Qty' in record:
+                qty = record['Qty']
+                if isinstance(qty, dict):
+                    quantity = float(qty.get('Unit', qty.get('Qty', 0)))
+                else:
+                    quantity = float(qty)
+            elif 'Quantity' in record:
+                quantity = float(str(record['Quantity']).replace(',', '.'))
+
+            if not quantity or quantity <= 0:
+                return None
+
+            # Timestamp
+            ts_str = record.get('TrdDtTm', '')
+            if not ts_str:
+                trd_dt = record.get('TrdDt', '')
+                trd_tm = record.get('TrdTm', '00:00:00')
+                if trd_dt:
+                    ts_str = f"{trd_dt}T{trd_tm}"
+
+            if not ts_str:
+                return None
+
+            ts_str = ts_str.replace('Z', '+00:00')
+            timestamp = datetime.fromisoformat(ts_str)
+            if timestamp.tzinfo is None:
+                timestamp = timestamp.replace(tzinfo=timezone.utc)
+
+            return Trade(
+                exchange=self.name,
+                symbol=isin,
+                isin=isin,
+                price=price,
+                quantity=quantity,
+                timestamp=timestamp
+            )
+
+        except Exception as e:
+            print(f"[STU] Error parsing JSON record: {e}")
+            return None
+
+    def _parse_csv_row(self, row: dict) -> Optional[Trade]:
+        """Parses a CSV row into a Trade"""
+        try:
+            # ISIN
+            isin = row.get('ISIN', row.get('FinInstrmId', ''))
+            if not isin:
+                return None
+
+            # Price
+            price_str = row.get('Pric', row.get('Price', '0'))
+            price_str = str(price_str).replace(',', '.')
+            price = float(price_str)
+
+            if price <= 0:
+                return None
+
+            # Quantity
+            qty_str = row.get('Qty', row.get('Quantity', '0'))
+            qty_str = str(qty_str).replace(',', '.')
+            quantity = float(qty_str)
+
+            if quantity <= 0:
+                return None
+
+            # Timestamp
+            ts_str = row.get('TrdDtTm', row.get('TradingDateTime', ''))
+            if not ts_str:
+                trd_dt = row.get('TrdDt', '')
+                trd_tm = row.get('TrdTm', '00:00:00')
+                if trd_dt:
+                    ts_str = f"{trd_dt}T{trd_tm}"
+
+            if not ts_str:
+                return None
+
+            ts_str = ts_str.replace('Z', '+00:00')
+            if 'T' not in ts_str:
+                ts_str = ts_str.replace(' ', 'T')
+
+            timestamp = datetime.fromisoformat(ts_str)
+            if timestamp.tzinfo is None:
+                timestamp = timestamp.replace(tzinfo=timezone.utc)
+
+            return Trade(
+                exchange=self.name,
+                symbol=isin,
+                isin=isin,
+                price=price,
+                quantity=quantity,
+                timestamp=timestamp
+            )
+
+        except Exception as e:
+            print(f"[STU] Error parsing CSV row: {e}")
+            return None
+
+    def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]:
+        """
+        Fetches all trades from the previous day.
+        """
+        all_trades = []
+
+        # Determine the target date
+        if since_date:
+            target_date = since_date.date() if hasattr(since_date, 'date') else since_date
+        else:
+            target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()
+
+        print(f"[{self.name}] Fetching trades for date: {target_date}")
+
+        # Fetch download links
+        all_links = self._get_download_links()
+        print(f"[{self.name}] Found {len(all_links)} potential download links")
+
+        # Filter by date
+        target_links = self._filter_files_for_date(all_links, target_date)
+
+        if not target_links:
+            # Fallback: try all links
+            target_links = all_links
+
+        print(f"[{self.name}] Trying {len(target_links)} files for target date")
+
+        # Download and parse the files
+        successful = 0
+        for url in target_links:
+            trades = self._download_and_parse_file(url)
+            if trades:
+                all_trades.extend(trades)
+                successful += 1
+                print(f"[{self.name}] Parsed {len(trades)} trades from {url}")
+
+        print(f"[{self.name}] Successfully processed {successful} files")
+        print(f"[{self.name}] Total trades fetched: {len(all_trades)}")
+
+        return all_trades
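
When the Börse Stuttgart page yields no scrapeable links, StuttgartExchange falls back to guessed URLs; a short sketch of that fallback path (the base patterns in _generate_expected_urls are unverified guesses, so most of these requests are expected to return 404):

    from src.exchanges.stuttgart import StuttgartExchange

    stu = StuttgartExchange()
    guesses = stu._generate_expected_urls()
    print(len(guesses))  # 27: 3 base patterns x 3 days x 3 file-name variants
    print(guesses[0])    # first guessed URL for yesterday
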