# trading-daemon/src/exchanges/gettex.py
import requests
import gzip
import csv
import io
from datetime import date, datetime, timedelta, timezone
from typing import List, Optional
from .base import BaseExchange, Trade
from bs4 import BeautifulSoup
# Browser User-Agent for access (gettex checks the User-Agent!)
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'de-DE,de;q=0.9,en;q=0.8',
    'Referer': 'https://www.gettex.de/'
}
# gettex download base URLs
GETTEX_PAGE_URL = "https://www.gettex.de/handel/delayed-data/posttrade-data/"

# The download URL is available as a direct link on the gettex website.
# Base URL for fileadmin downloads (found by inspecting the page).
GETTEX_DOWNLOAD_BASE = "https://www.gettex.de/fileadmin/posttrade-data/"
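# A complete download URL is formed as GETTEX_DOWNLOAD_BASE + filename.
# Illustrative example (the filename shown is hypothetical):
#   https://www.gettex.de/fileadmin/posttrade-data/posttrade.20260126.08.00.munc.csv.gz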

class GettexExchange(BaseExchange):
    """
    gettex exchange (Bayerische Börse).
    Combines MUNC and MUND data.
    File format: posttrade.YYYYMMDD.HH.mm.{munc|mund}.csv.gz
    """

    @property
    def name(self) -> str:
        return "GETTEX"
    def _get_file_list_from_page(self) -> List[dict]:
        """
        Parses the gettex page and extracts download links.
        Returns a list of dicts with 'filename' and 'url'.
        """
        files = []
        try:
            response = requests.get(GETTEX_PAGE_URL, headers=HEADERS, timeout=30)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            # Look for links to CSV.gz files
            for link in soup.find_all('a'):
                href = link.get('href', '')
                text = link.get_text(strip=True)
                # Check the href or the link text for posttrade CSV.gz files
                if href and 'posttrade' in href.lower() and '.csv.gz' in href.lower():
                    # Build the full URL
                    if not href.startswith('http'):
                        url = f"https://www.gettex.de{href}" if href.startswith('/') else f"https://www.gettex.de/{href}"
                    else:
                        url = href
                    filename = href.split('/')[-1]
                    files.append({'filename': filename, 'url': url})
                elif text and 'posttrade' in text.lower() and '.csv.gz' in text.lower():
                    # The link text is the filename; href may hold the URL
                    filename = text
                    if href:
                        if not href.startswith('http'):
                            url = f"https://www.gettex.de{href}" if href.startswith('/') else f"https://www.gettex.de/{href}"
                        else:
                            url = href
                    else:
                        # Fallback: try the known fileadmin URL pattern
                        url = f"https://www.gettex.de/fileadmin/posttrade-data/{filename}"
                    files.append({'filename': filename, 'url': url})
            print(f"[GETTEX] Found {len(files)} files on page")
        except Exception as e:
            print(f"[GETTEX] Error fetching page: {e}")
        return files
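    # Shape of the returned entries (the filename shown is illustrative,
    # not taken from a real listing):
    #   {'filename': 'posttrade.20260126.08.00.munc.csv.gz',
    #    'url': 'https://www.gettex.de/fileadmin/posttrade-data/posttrade.20260126.08.00.munc.csv.gz'}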

    def _generate_expected_files(self, target_date: date) -> List[str]:
        """
        Generates expected filenames based on the date.
        gettex publishes files every 15 minutes during trading hours.
        File format: posttrade.YYYYMMDD.HH.mm.{munc|mund}.csv.gz
        """
        files = []
        date_str = target_date.strftime('%Y%m%d')
        # Trading hours: roughly 08:00 - 22:00 CET,
        # i.e. 07:00 - 21:00 UTC in winter / 06:00 - 20:00 UTC in summer.
        # Generate every 15-minute interval.
        for hour in range(6, 23):  # 06:00 - 22:45 UTC (covers both cases)
            for minute in [0, 15, 30, 45]:
                time_str = f"{hour:02d}.{minute:02d}"
                files.append(f"posttrade.{date_str}.{time_str}.munc.csv.gz")
                files.append(f"posttrade.{date_str}.{time_str}.mund.csv.gz")
        # Also include early files from the following day (after midnight UTC)
        next_date = target_date + timedelta(days=1)
        next_date_str = next_date.strftime('%Y%m%d')
        for hour in range(0, 3):  # 00:00 - 02:45 UTC
            for minute in [0, 15, 30, 45]:
                time_str = f"{hour:02d}.{minute:02d}"
                files.append(f"posttrade.{next_date_str}.{time_str}.munc.csv.gz")
                files.append(f"posttrade.{next_date_str}.{time_str}.mund.csv.gz")
        return files
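    # Illustrative example: for date(2026, 1, 26) this yields 160 candidate
    # names, starting with "posttrade.20260126.06.00.munc.csv.gz" and
    # "posttrade.20260126.06.00.mund.csv.gz" (17 hours x 4 slots x 2 venues,
    # plus 3 early hours of the next day x 4 slots x 2 venues).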

    def _download_and_parse_file(self, filename: str) -> List[Trade]:
        """Downloads a CSV.gz file and parses the trades."""
        trades = []
        try:
            # Build the full URL
            url = f"{GETTEX_DOWNLOAD_BASE}{filename}"
            response = requests.get(url, headers=HEADERS, timeout=60)
            if response.status_code == 404:
                # File does not exist - normal for intervals without trading
                return []
            response.raise_for_status()
            # Decompress the gzip payload (served as a .gz file, so requests
            # does not decode it transparently)
            with gzip.GzipFile(fileobj=io.BytesIO(response.content)) as f:
                csv_text = f.read().decode('utf-8')
            # Parse the CSV
            reader = csv.DictReader(io.StringIO(csv_text), delimiter=';')
            for row in reader:
                try:
                    trade = self._parse_csv_row(row)
                    if trade:
                        trades.append(trade)
                except Exception as e:
                    print(f"[GETTEX] Error parsing row: {e}")
                    continue
        except requests.exceptions.HTTPError as e:
            if e.response.status_code != 404:
                print(f"[GETTEX] HTTP error downloading {filename}: {e}")
        except Exception as e:
            print(f"[GETTEX] Error downloading {filename}: {e}")
        return trades

    def _parse_csv_row(self, row: dict) -> Optional[Trade]:
        """
        Parses a CSV row into a Trade.
        Expected columns (RTS format):
        - TrdDtTm: trading date/time
        - ISIN: instrument identifier
        - Pric: price
        - Qty: quantity
        - Ccy: currency
        """
        try:
            # ISIN
            isin = row.get('ISIN', row.get('FinInstrmId', ''))
            if not isin:
                return None
            # Price (normalize a decimal comma)
            price_str = row.get('Pric', row.get('Price', '0'))
            price_str = price_str.replace(',', '.')
            price = float(price_str)
            if price <= 0:
                return None
            # Quantity (normalize a decimal comma)
            qty_str = row.get('Qty', row.get('Quantity', '0'))
            qty_str = qty_str.replace(',', '.')
            quantity = float(qty_str)
            if quantity <= 0:
                return None
            # Timestamp
            ts_str = row.get('TrdDtTm', row.get('TradingDateTime', ''))
            if not ts_str:
                # Fallback: separate date and time fields
                trd_dt = row.get('TrdDt', '')
                trd_tm = row.get('TrdTm', '00:00:00')
                ts_str = f"{trd_dt}T{trd_tm}"
            # Parse the timestamp (UTC)
            ts_str = ts_str.replace('Z', '+00:00')
            if 'T' not in ts_str:
                ts_str = ts_str.replace(' ', 'T')
            timestamp = datetime.fromisoformat(ts_str)
            if timestamp.tzinfo is None:
                timestamp = timestamp.replace(tzinfo=timezone.utc)
            return Trade(
                exchange=self.name,
                symbol=isin,
                isin=isin,
                price=price,
                quantity=quantity,
                timestamp=timestamp
            )
        except Exception as e:
            print(f"[GETTEX] Error parsing CSV row: {e}")
            return None
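    # Illustrative input (values are made up, not real market data):
    #   {'ISIN': 'DE0007164600', 'Pric': '123,45', 'Qty': '10',
    #    'Ccy': 'EUR', 'TrdDtTm': '2026-01-26T08:00:01Z'}
    # would parse to a Trade with price=123.45, quantity=10.0 and a
    # timezone-aware UTC timestamp.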

    def fetch_latest_trades(self, include_yesterday: bool = True, since_date: Optional[datetime] = None) -> List[Trade]:
        """
        Fetches all trades for the target date (the previous day by default).
        """
        all_trades = []
        # Determine the target date
        if since_date:
            target_date = since_date.date() if hasattr(since_date, 'date') else since_date
        else:
            target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()
        print(f"[{self.name}] Fetching trades for date: {target_date}")

        # First try to get the file list from the website
        page_files = self._get_file_list_from_page()
        if page_files:
            # Filter files for the target date
            target_str = target_date.strftime('%Y%m%d')
            next_day = target_date + timedelta(days=1)
            next_day_str = next_day.strftime('%Y%m%d')
            target_files = []
            for f in page_files:
                filename = f['filename']
                # Files from the target date or the early hours of the next day
                if target_str in filename:
                    target_files.append(f)
                elif next_day_str in filename:
                    # Early morning hours (00:00 - 02:45) belong to the previous day
                    try:
                        # Format: posttrade.YYYYMMDD.HH.MM.{munc|mund}.csv.gz
                        parts = filename.split('.')
                        if len(parts) >= 4:
                            hour = int(parts[2])
                            if hour < 3:
                                target_files.append(f)
                    except (ValueError, IndexError):
                        pass
            print(f"[{self.name}] Found {len(target_files)} files for target date from page")
            # Download the files found on the page
            for f in target_files:
                trades = self._download_file_by_url(f['url'], f['filename'])
                if trades:
                    all_trades.extend(trades)

        # Fallback: probe the expected filenames directly
        if not all_trades:
            print(f"[{self.name}] No files from page, trying generated filenames...")
            expected_files = self._generate_expected_files(target_date)
            print(f"[{self.name}] Trying {len(expected_files)} potential files")
            successful_files = 0
            for filename in expected_files:
                trades = self._download_and_parse_file(filename)
                if trades:
                    all_trades.extend(trades)
                    successful_files += 1
            print(f"[{self.name}] Successfully downloaded {successful_files} files")

        print(f"[{self.name}] Total trades fetched: {len(all_trades)}")
        return all_trades
    def _download_file_by_url(self, url: str, filename: str) -> List[Trade]:
        """Downloads a file directly from a URL."""
        trades = []
        try:
            print(f"[{self.name}] Downloading: {url}")
            response = requests.get(url, headers=HEADERS, timeout=60)
            if response.status_code == 404:
                return []
            response.raise_for_status()
            # Decompress the gzip payload
            with gzip.GzipFile(fileobj=io.BytesIO(response.content)) as f:
                csv_text = f.read().decode('utf-8')
            # Parse the CSV
            reader = csv.DictReader(io.StringIO(csv_text), delimiter=';')
            for row in reader:
                try:
                    trade = self._parse_csv_row(row)
                    if trade:
                        trades.append(trade)
                except Exception as e:
                    print(f"[{self.name}] Error parsing row: {e}")
                    continue
            print(f"[{self.name}] Parsed {len(trades)} trades from {filename}")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code != 404:
                print(f"[{self.name}] HTTP error downloading {url}: {e}")
        except Exception as e:
            print(f"[{self.name}] Error downloading {url}: {e}")
        return trades
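

# Minimal manual smoke test - a sketch, not part of the daemon. It assumes
# BaseExchange needs no constructor arguments; because of the relative import
# above, run this through the package rather than as a loose script.
if __name__ == "__main__":
    exchange = GettexExchange()
    trades = exchange.fetch_latest_trades()
    print(f"Fetched {len(trades)} trades from {exchange.name}")
    for trade in trades[:5]:
        print(trade)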