import requests
import gzip
import csv
import io
from datetime import date, datetime, timedelta, timezone
from typing import List, Optional

from bs4 import BeautifulSoup

from .base import BaseExchange, Trade

# Browser User-Agent for access (gettex checks the User-Agent!)
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'de-DE,de;q=0.9,en;q=0.8',
    'Referer': 'https://www.gettex.de/'
}

# gettex download base URLs
GETTEX_PAGE_URL = "https://www.gettex.de/handel/delayed-data/posttrade-data/"
# The download URL is exposed on the gettex web page as a direct link.
# Base URL for fileadmin downloads (found through page analysis).
GETTEX_DOWNLOAD_BASE = "https://www.gettex.de/fileadmin/posttrade-data/"


class GettexExchange(BaseExchange):
    """
    gettex Exchange (Bayerische Börse / Bavarian Stock Exchange).

    Combines MUNC and MUND data feeds.
    File format: posttrade.YYYYMMDD.HH.mm.{munc|mund}.csv.gz
    """

    @property
    def name(self) -> str:
        return "GETTEX"

    @staticmethod
    def _absolute_url(href: str) -> str:
        """Turn a (possibly relative) href from the gettex page into an absolute URL."""
        if href.startswith('http'):
            return href
        return f"https://www.gettex.de{href}" if href.startswith('/') else f"https://www.gettex.de/{href}"

    def _get_file_list_from_page(self) -> List[dict]:
        """
        Parse the gettex page and extract download links.

        Returns:
            List of dicts with 'filename' and 'url' keys; empty on any error
            (errors are logged, this method is best-effort).
        """
        files: List[dict] = []
        try:
            response = requests.get(GETTEX_PAGE_URL, headers=HEADERS, timeout=30)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            # Look for anchors pointing at posttrade CSV.gz files.
            for link in soup.find_all('a'):
                href = link.get('href', '')
                text = link.get_text(strip=True)
                # Check the href (preferred) or the link text for posttrade CSV.gz files.
                if href and 'posttrade' in href.lower() and '.csv.gz' in href.lower():
                    url = self._absolute_url(href)
                    filename = href.split('/')[-1]
                    files.append({'filename': filename, 'url': url})
                elif text and 'posttrade' in text.lower() and '.csv.gz' in text.lower():
                    # The link text is the filename; the href may carry the URL.
                    filename = text
                    if href:
                        url = self._absolute_url(href)
                    else:
                        # Fallback: use the well-known fileadmin download location.
                        # FIX: this previously interpolated the literal string
                        # "(unknown)" instead of the filename, producing dead URLs.
                        url = f"{GETTEX_DOWNLOAD_BASE}{filename}"
                    files.append({'filename': filename, 'url': url})
            print(f"[GETTEX] Found {len(files)} files on page")
        except Exception as e:
            print(f"[GETTEX] Error fetching page: {e}")
        return files

    def _generate_expected_files(self, target_date: date) -> List[str]:
        """
        Generate expected filenames based on the date.

        gettex publishes files every 15 minutes during trading hours.
        File format: posttrade.YYYYMMDD.HH.mm.{munc|mund}.csv.gz

        Includes early-morning files of the following UTC day, which still
        belong to *target_date*'s trading session.
        """
        files: List[str] = []
        date_str = target_date.strftime('%Y%m%d')
        # Trading hours: roughly 08:00 - 22:00 CET
        # In UTC: 07:00 - 21:00 (winter) / 06:00 - 20:00 (summer)
        # Generate every 15-minute interval with some margin.
        for hour in range(6, 23):  # 06:00 - 22:45 UTC (covering both DST cases)
            for minute in (0, 15, 30, 45):
                time_str = f"{hour:02d}.{minute:02d}"
                files.append(f"posttrade.{date_str}.{time_str}.munc.csv.gz")
                files.append(f"posttrade.{date_str}.{time_str}.mund.csv.gz")
        # Also early files of the following day (after midnight UTC).
        next_date = target_date + timedelta(days=1)
        next_date_str = next_date.strftime('%Y%m%d')
        for hour in range(0, 3):  # 00:00 - 02:45 UTC
            for minute in (0, 15, 30, 45):
                time_str = f"{hour:02d}.{minute:02d}"
                files.append(f"posttrade.{next_date_str}.{time_str}.munc.csv.gz")
                files.append(f"posttrade.{next_date_str}.{time_str}.mund.csv.gz")
        return files

    def _parse_gz_content(self, content: bytes, filename: str) -> List[Trade]:
        """
        Decompress gzipped CSV bytes and parse every line into a Trade.

        The trade date is recovered from *filename*
        (format: posttrade.YYYYMMDD.HH.MM.xxx.csv.gz) since the CSV rows
        only carry a time of day. Malformed lines are skipped silently.
        """
        with gzip.GzipFile(fileobj=io.BytesIO(content)) as f:
            csv_text = f.read().decode('utf-8')
        lines = csv_text.strip().split('\n')
        if not lines:
            return []
        # Extract the date portion (YYYYMMDD) from the filename.
        date_str = None
        parts = filename.split('.')
        if len(parts) >= 4:
            date_str = parts[1]  # YYYYMMDD
        # gettex CSV has NO header!
        # Format: ISIN,time,currency,price,quantity
        # e.g.: DE000BAY0017,09:15:03.638460,EUR,45.775,22
        trades: List[Trade] = []
        for line in lines:
            if not line.strip():
                continue
            try:
                trade = self._parse_headerless_csv_line(line, date_str)
                if trade:
                    trades.append(trade)
            except Exception:
                # Best-effort feed parsing: skip unparseable lines.
                continue
        return trades

    def _download_and_parse_file(self, filename: str) -> List[Trade]:
        """
        Download a CSV.gz file from the well-known fileadmin location
        and parse its trades.

        FIX: the URL previously interpolated a literal "(unknown)" instead
        of the filename, so these downloads always 404'd.
        """
        url = f"{GETTEX_DOWNLOAD_BASE}{filename}"
        return self._download_file_by_url(url, filename)

    def _download_file_by_url(self, url: str, filename: str) -> List[Trade]:
        """
        Download one gzipped CSV file directly from *url* and parse it.

        Returns an empty list on HTTP 404 (no trading during that interval
        is normal) and on any other error (logged, best-effort).
        """
        trades: List[Trade] = []
        try:
            response = requests.get(url, headers=HEADERS, timeout=60)
            if response.status_code == 404:
                # File does not exist - normal for intervals without trading.
                return []
            response.raise_for_status()
            trades = self._parse_gz_content(response.content, filename)
            if trades:
                print(f"[{self.name}] Parsed {len(trades)} trades from {filename}")
        except requests.exceptions.HTTPError as e:
            # e.response can be None for some transport-level failures.
            if e.response is None or e.response.status_code != 404:
                print(f"[{self.name}] HTTP error downloading {url}: {e}")
        except Exception as e:
            print(f"[{self.name}] Error downloading {url}: {e}")
        return trades

    def _parse_headerless_csv_line(self, line: str, date_str: Optional[str] = None) -> Optional[Trade]:
        """
        Parse a headerless CSV line in the gettex format.

        Format: ISIN,time,currency,price,quantity
        e.g.:   DE000BAY0017,09:15:03.638460,EUR,45.775,22

        Args:
            line: One raw CSV line.
            date_str: Trade date as YYYYMMDD (taken from the filename);
                falls back to today's UTC date when absent/invalid.

        Returns:
            A Trade, or None if the line fails validation.
        """
        try:
            parts = line.strip().split(',')
            if len(parts) < 5:
                return None
            isin = parts[0].strip()
            time_str = parts[1].strip()
            # currency = parts[2].strip()  # not needed
            price_str = parts[3].strip()
            qty_str = parts[4].strip()
            # Validation: an ISIN is always 12 characters.
            if not isin or len(isin) != 12:
                return None
            price = float(price_str)
            quantity = float(qty_str)
            if price <= 0 or quantity <= 0:
                return None
            # Build the timestamp.
            # date_str is YYYYMMDD, time_str is HH:MM:SS.ffffff
            if date_str and len(date_str) == 8:
                year = date_str[:4]
                month = date_str[4:6]
                day = date_str[6:8]
                date_part = f"{year}-{month}-{day}"
            else:
                # Fallback: today (UTC).
                date_part = datetime.now(timezone.utc).strftime('%Y-%m-%d')
            # Parse the time (e.g. 09:15:03.638460).
            ts_str = f"{date_part}T{time_str}"
            # Truncate microseconds if the fraction is too long for fromisoformat.
            if '.' in ts_str:
                base, frac = ts_str.rsplit('.', 1)
                if len(frac) > 6:
                    frac = frac[:6]
                ts_str = f"{base}.{frac}"
            timestamp = datetime.fromisoformat(ts_str)
            timestamp = timestamp.replace(tzinfo=timezone.utc)
            return Trade(
                exchange=self.name,
                symbol=isin,
                isin=isin,
                price=price,
                quantity=quantity,
                timestamp=timestamp
            )
        except Exception:
            return None

    def _parse_csv_row(self, row: dict) -> Optional[Trade]:
        """
        Parse one CSV row (as a dict) into a Trade.

        Supported columns (RTS1/RTS2 format, various spellings):
            - ISIN / FinInstrmId / Isin: instrument identifier
            - Pric / Price / pric: price
            - Qty / Quantity / qty: quantity
            - TrdDtTm / TradingDateTime / TrdgDtTm: trading date/time
            - TrdDt / TradingDate: trading date
            - TrdTm / TradingTime: trading time

        Returns:
            A Trade, or None if any mandatory field is missing/invalid.
        """
        try:
            # ISIN - try various column names.
            isin = None
            for key in ['ISIN', 'Isin', 'isin', 'FinInstrmId', 'FinInstrmId.Id', 'Id']:
                if key in row and row[key]:
                    isin = str(row[key]).strip()
                    break
            if not isin:
                return None
            # Price - try various column names (decimal comma tolerated).
            price = None
            for key in ['Pric', 'Price', 'pric', 'price', 'Pric.Pric.MntryVal.Amt', 'TradPric']:
                if key in row and row[key]:
                    price_str = str(row[key]).replace(',', '.').strip()
                    try:
                        price = float(price_str)
                        if price > 0:
                            break
                    except ValueError:
                        continue
            if not price or price <= 0:
                return None
            # Quantity - try various column names.
            quantity = None
            for key in ['Qty', 'Quantity', 'qty', 'quantity', 'TradQty', 'Qty.Unit']:
                if key in row and row[key]:
                    qty_str = str(row[key]).replace(',', '.').strip()
                    try:
                        quantity = float(qty_str)
                        if quantity > 0:
                            break
                    except ValueError:
                        continue
            if not quantity or quantity <= 0:
                return None
            # Timestamp - try various formats.
            ts_str = None
            # First try a combined date-time field.
            for key in ['TrdDtTm', 'TradingDateTime', 'TrdgDtTm', 'Timestamp', 'timestamp']:
                if key in row and row[key]:
                    ts_str = str(row[key]).strip()
                    break
            # Otherwise combine separate date and time fields.
            if not ts_str:
                trd_dt = None
                trd_tm = '00:00:00'
                for key in ['TrdDt', 'TradingDate', 'Date', 'date']:
                    if key in row and row[key]:
                        trd_dt = str(row[key]).strip()
                        break
                for key in ['TrdTm', 'TradingTime', 'Time', 'time']:
                    if key in row and row[key]:
                        trd_tm = str(row[key]).strip()
                        break
                if trd_dt:
                    ts_str = f"{trd_dt}T{trd_tm}"
            if not ts_str:
                return None
            # Parse the timestamp (UTC).
            ts_str = ts_str.replace('Z', '+00:00')
            if 'T' not in ts_str:
                ts_str = ts_str.replace(' ', 'T')
            # Trim sub-second precision to 6 digits (fromisoformat's limit),
            # preserving any trailing UTC offset.
            if '.' in ts_str:
                parts = ts_str.split('.')
                if len(parts) > 1:
                    ms_part = parts[1].split('+')[0].split('-')[0]
                    if len(ms_part) > 6:
                        ts_str = parts[0] + '.' + ms_part[:6]
                        if '+' in parts[1]:
                            ts_str += '+' + parts[1].split('+')[1]
                        elif '-' in parts[1][1:]:
                            ts_str += '-' + parts[1].split('-')[-1]
            timestamp = datetime.fromisoformat(ts_str)
            if timestamp.tzinfo is None:
                timestamp = timestamp.replace(tzinfo=timezone.utc)
            return Trade(
                exchange=self.name,
                symbol=isin,
                isin=isin,
                price=price,
                quantity=quantity,
                timestamp=timestamp
            )
        except Exception:
            # Malformed rows are simply dropped.
            return None

    def _get_last_trading_day(self, from_date: date) -> date:
        """
        Find the most recent trading day at or before *from_date*
        (skips weekends; Monday=0, Sunday=6).
        """
        d = from_date
        # Saturday (5) -> go back to Friday.
        if d.weekday() == 5:
            d = d - timedelta(days=1)
        # Sunday (6) -> go back to Friday.
        elif d.weekday() == 6:
            d = d - timedelta(days=2)
        return d

    def fetch_latest_trades(self, include_yesterday: bool = True,
                            since_date: Optional[datetime] = None) -> List[Trade]:
        """
        Fetch all trades from the last trading day (skipping weekends).

        Args:
            include_yesterday: Kept for interface compatibility (unused here).
            since_date: Explicit target date/datetime; defaults to yesterday (UTC).

        Returns:
            All trades parsed for the resolved trading day.
        """
        all_trades: List[Trade] = []
        # Resolve the target date.
        if since_date:
            target_date = since_date.date() if hasattr(since_date, 'date') else since_date
        else:
            target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()
        # Skip weekends.
        original_date = target_date
        target_date = self._get_last_trading_day(target_date)
        if target_date != original_date:
            print(f"[{self.name}] Skipping weekend: {original_date} -> {target_date}")
        print(f"[{self.name}] Fetching trades for date: {target_date}")
        # First, try files discovered on the web page.
        page_files = self._get_file_list_from_page()
        if page_files:
            # Filter files for the target date.
            target_str = target_date.strftime('%Y%m%d')
            next_day = target_date + timedelta(days=1)
            next_day_str = next_day.strftime('%Y%m%d')
            target_files = []
            for f in page_files:
                filename = f['filename']
                # Files of the target date, or the early hours of the next day.
                if target_str in filename:
                    target_files.append(f)
                elif next_day_str in filename:
                    # Early morning hours (00:00 - 02:45 UTC) belong to the previous day.
                    try:
                        # Format: posttrade.YYYYMMDD.HH.MM.{munc|mund}.csv.gz
                        parts = filename.split('.')
                        if len(parts) >= 4:
                            hour = int(parts[2])
                            if hour < 3:
                                target_files.append(f)
                    except (ValueError, IndexError):
                        # Unexpected filename layout - ignore this file.
                        pass
            print(f"[{self.name}] Found {len(target_files)} files for target date from page")
            # Download the discovered files.
            for f in target_files:
                trades = self._download_file_by_url(f['url'], f['filename'])
                if trades:
                    all_trades.extend(trades)
        # Fallback: probe generated filenames at the well-known location.
        if not all_trades:
            print(f"[{self.name}] No files from page, trying generated filenames...")
            expected_files = self._generate_expected_files(target_date)
            print(f"[{self.name}] Trying {len(expected_files)} potential files")
            successful_files = 0
            for filename in expected_files:
                trades = self._download_and_parse_file(filename)
                if trades:
                    all_trades.extend(trades)
                    successful_files += 1
            print(f"[{self.name}] Successfully downloaded {successful_files} files")
        print(f"[{self.name}] Total trades fetched: {len(all_trades)}")
        return all_trades