import requests import gzip import json import csv import io from datetime import datetime, timedelta, timezone from typing import List, Optional from .base import BaseExchange, Trade from bs4 import BeautifulSoup # Browser User-Agent HEADERS = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'de-DE,de;q=0.9,en;q=0.8', 'Referer': 'https://www.boerse-stuttgart.de/' } # Börse Stuttgart URLs STUTTGART_PAGE_URL = "https://www.boerse-stuttgart.de/de-de/fuer-geschaeftspartner/reports/mifir-ii-delayed-data/xstf-post-trade/" class StuttgartExchange(BaseExchange): """ Börse Stuttgart (XSTF) MiFIR II Delayed Data Post-Trade """ @property def name(self) -> str: return "STU" def _get_download_links(self) -> List[str]: """ Parst die Börse Stuttgart Seite und extrahiert Download-Links. """ files = [] try: response = requests.get(STUTTGART_PAGE_URL, headers=HEADERS, timeout=30) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') # Suche nach Download-Links # Börse Stuttgart verwendet oft bestimmte CSS-Klassen oder data-Attribute for link in soup.find_all('a'): href = link.get('href', '') # Prüfe auf typische Dateiendungen if href and ('posttrade' in href.lower() or 'post-trade' in href.lower()): if href.endswith('.gz') or href.endswith('.json') or href.endswith('.csv'): # Vollständige URL erstellen if not href.startswith('http'): if href.startswith('/'): href = f"https://www.boerse-stuttgart.de{href}" else: href = f"https://www.boerse-stuttgart.de/{href}" files.append(href) # Alternative: Suche nach JavaScript-generierten Links if not files: # Manchmal sind Links in Script-Tags versteckt for script in soup.find_all('script'): script_text = script.string or '' if 'posttrade' in script_text.lower(): # Versuche URLs zu extrahieren import re urls = re.findall(r'https?://[^\s\'"<>]+posttrade[^\s\'"<>]+\.(?:gz|json|csv)', script_text, re.IGNORECASE) files.extend(urls) # Fallback: Versuche bekannte URL-Muster if not files: files = self._generate_expected_urls() except Exception as e: print(f"[STU] Error fetching page: {e}") files = self._generate_expected_urls() return files def _generate_expected_urls(self) -> List[str]: """ Generiert erwartete Download-URLs basierend auf bekannten Mustern. Börse Stuttgart verwendet typischerweise ähnliche Formate wie andere Deutsche Börsen. """ files = [] # Versuche verschiedene URL-Muster base_patterns = [ "https://www.boerse-stuttgart.de/api/v1/delayed-data/xstf-post-trade/", "https://www.boerse-stuttgart.de/downloads/delayed-data/", "https://mfs.boerse-stuttgart.de/XSTF-posttrade/", ] # Für die letzten 3 Tage for days_ago in range(1, 4): target_date = datetime.now(timezone.utc) - timedelta(days=days_ago) date_str = target_date.strftime('%Y-%m-%d') date_str_compact = target_date.strftime('%Y%m%d') for base in base_patterns: files.append(f"{base}posttrade-{date_str}.json.gz") files.append(f"{base}posttrade.{date_str_compact}.json.gz") files.append(f"{base}xstf-posttrade-{date_str}.json.gz") return files def _filter_files_for_date(self, files: List[str], target_date: datetime.date) -> List[str]: """Filtert Dateien für ein bestimmtes Datum""" filtered = [] target_str = target_date.strftime('%Y-%m-%d') target_str_compact = target_date.strftime('%Y%m%d') # Auch Dateien vom Folgetag (frühe Morgenstunden) next_day = target_date + timedelta(days=1) next_day_str = next_day.strftime('%Y-%m-%d') next_day_compact = next_day.strftime('%Y%m%d') for file in files: file_lower = file.lower() if target_str in file_lower or target_str_compact in file_lower: filtered.append(file) elif next_day_str in file_lower or next_day_compact in file_lower: # Prüfe ob frühe Morgenstunde if 'T00' in file or 'T01' in file or 'T02' in file: filtered.append(file) # Für kompakte Formate elif '.00.' in file or '.01.' in file or '.02.' in file: filtered.append(file) return filtered def _download_and_parse_file(self, url: str) -> List[Trade]: """Lädt eine Datei herunter und parst die Trades""" trades = [] try: response = requests.get(url, headers=HEADERS, timeout=60) if response.status_code == 404: return [] response.raise_for_status() content = response.content # Prüfe ob Gzip if url.endswith('.gz'): try: with gzip.GzipFile(fileobj=io.BytesIO(content)) as f: content = f.read() except Exception: pass # Vielleicht nicht wirklich gzip # Versuche als JSON zu parsen if url.endswith('.json') or url.endswith('.json.gz'): try: data = json.loads(content) if isinstance(data, list): for record in data: trade = self._parse_json_record(record) if trade: trades.append(trade) return trades except json.JSONDecodeError: pass # Versuche als CSV zu parsen try: text = content.decode('utf-8') if isinstance(content, bytes) else content reader = csv.DictReader(io.StringIO(text), delimiter=';') for row in reader: trade = self._parse_csv_row(row) if trade: trades.append(trade) except Exception: # Versuche mit Komma als Delimiter try: text = content.decode('utf-8') if isinstance(content, bytes) else content reader = csv.DictReader(io.StringIO(text), delimiter=',') for row in reader: trade = self._parse_csv_row(row) if trade: trades.append(trade) except Exception as e: print(f"[STU] Could not parse {url}: {e}") except requests.exceptions.HTTPError as e: if e.response.status_code != 404: print(f"[STU] HTTP error downloading {url}: {e}") except Exception as e: print(f"[STU] Error downloading {url}: {e}") return trades def _parse_json_record(self, record: dict) -> Optional[Trade]: """Parst einen JSON-Record zu einem Trade""" try: # ISIN isin = record.get('ISIN') or record.get('FinInstrmId', {}).get('Id', '') if not isin: return None # Preis (verschiedene mögliche Strukturen) price = None if 'Pric' in record: pric = record['Pric'] if isinstance(pric, dict): if 'Pric' in pric: inner = pric['Pric'] if isinstance(inner, dict): price = float(inner.get('MntryVal', {}).get('Amt', 0) or inner.get('Amt', 0)) else: price = float(inner) elif 'MntryVal' in pric: price = float(pric['MntryVal'].get('Amt', 0)) elif 'Amt' in pric: price = float(pric['Amt']) else: price = float(pric) elif 'Price' in record: price = float(str(record['Price']).replace(',', '.')) if not price or price <= 0: return None # Menge quantity = None if 'Qty' in record: qty = record['Qty'] if isinstance(qty, dict): quantity = float(qty.get('Unit', qty.get('Qty', 0))) else: quantity = float(qty) elif 'Quantity' in record: quantity = float(str(record['Quantity']).replace(',', '.')) if not quantity or quantity <= 0: return None # Timestamp ts_str = record.get('TrdDtTm', '') if not ts_str: trd_dt = record.get('TrdDt', '') trd_tm = record.get('TrdTm', '00:00:00') if trd_dt: ts_str = f"{trd_dt}T{trd_tm}" if not ts_str: return None ts_str = ts_str.replace('Z', '+00:00') timestamp = datetime.fromisoformat(ts_str) if timestamp.tzinfo is None: timestamp = timestamp.replace(tzinfo=timezone.utc) return Trade( exchange=self.name, symbol=isin, isin=isin, price=price, quantity=quantity, timestamp=timestamp ) except Exception as e: print(f"[STU] Error parsing JSON record: {e}") return None def _parse_csv_row(self, row: dict) -> Optional[Trade]: """Parst eine CSV-Zeile zu einem Trade""" try: # ISIN isin = row.get('ISIN', row.get('FinInstrmId', '')) if not isin: return None # Preis price_str = row.get('Pric', row.get('Price', '0')) price_str = str(price_str).replace(',', '.') price = float(price_str) if price <= 0: return None # Menge qty_str = row.get('Qty', row.get('Quantity', '0')) qty_str = str(qty_str).replace(',', '.') quantity = float(qty_str) if quantity <= 0: return None # Timestamp ts_str = row.get('TrdDtTm', row.get('TradingDateTime', '')) if not ts_str: trd_dt = row.get('TrdDt', '') trd_tm = row.get('TrdTm', '00:00:00') if trd_dt: ts_str = f"{trd_dt}T{trd_tm}" if not ts_str: return None ts_str = ts_str.replace('Z', '+00:00') if 'T' not in ts_str: ts_str = ts_str.replace(' ', 'T') timestamp = datetime.fromisoformat(ts_str) if timestamp.tzinfo is None: timestamp = timestamp.replace(tzinfo=timezone.utc) return Trade( exchange=self.name, symbol=isin, isin=isin, price=price, quantity=quantity, timestamp=timestamp ) except Exception as e: print(f"[STU] Error parsing CSV row: {e}") return None def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]: """ Holt alle Trades vom Vortag. """ all_trades = [] # Zieldatum bestimmen if since_date: target_date = since_date.date() if hasattr(since_date, 'date') else since_date else: target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date() print(f"[{self.name}] Fetching trades for date: {target_date}") # Download-Links holen all_links = self._get_download_links() print(f"[{self.name}] Found {len(all_links)} potential download links") # Nach Datum filtern target_links = self._filter_files_for_date(all_links, target_date) if not target_links: # Fallback: Versuche alle Links target_links = all_links print(f"[{self.name}] Trying {len(target_links)} files for target date") # Dateien herunterladen und parsen successful = 0 for url in target_links: trades = self._download_and_parse_file(url) if trades: all_trades.extend(trades) successful += 1 print(f"[{self.name}] Parsed {len(trades)} trades from {url}") print(f"[{self.name}] Successfully processed {successful} files") print(f"[{self.name}] Total trades fetched: {len(all_trades)}") return all_trades