Compare commits: f325941e24 ... main

3 Commits

| Author | SHA1 | Date |
|---|---|---|
|  | 1dc79b8b64 |  |
|  | cf55a0bd06 |  |
|  | 9cd84e0855 |  |
daemon.py (414)
@@ -4,6 +4,9 @@ import datetime
 import hashlib
 import os
 import requests
+from typing import List, Type
+
+from src.exchanges.base import BaseExchange
 from src.exchanges.eix import EIXExchange
 from src.exchanges.ls import LSExchange
 from src.exchanges.deutsche_boerse import XetraExchange, FrankfurtExchange, QuotrixExchange
@@ -25,41 +28,62 @@ DB_USER = os.getenv("DB_USER", "admin")
 DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
 DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
 
 
+# =============================================================================
+# Exchange Registry - Neue Börsen hier hinzufügen
+# =============================================================================
+
+# Exchanges die Streaming-Verarbeitung benötigen (große Datenmengen)
+STREAMING_EXCHANGES: List[Type[BaseExchange]] = [
+    EIXExchange,
+]
+
+# Standard-Exchanges (normale Batch-Verarbeitung)
+STANDARD_EXCHANGES: List[Type[BaseExchange]] = [
+    # Lang & Schwarz
+    LSExchange,
+    # Deutsche Börse
+    XetraExchange,
+    FrankfurtExchange,
+    QuotrixExchange,
+    # Weitere Börsen
+    GettexExchange,
+    StuttgartExchange,
+    # Börsenag (Düsseldorf, Hamburg, Hannover)
+    DUSAExchange,
+    DUSBExchange,
+    DUSCExchange,
+    DUSDExchange,
+    HAMAExchange,
+    HAMBExchange,
+    HANAExchange,
+    HANBExchange,
+]
+
+
+# =============================================================================
+# Trades Cache
+# =============================================================================
+
+# Cache für existierende Trades pro Tag (wird nach jedem Exchange geleert)
+_existing_trades_cache = {}
 
 def get_trade_hash(trade):
     """Erstellt einen eindeutigen Hash für einen Trade."""
     key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}"
     return hashlib.md5(key.encode()).hexdigest()
 
-def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=1000):
+def get_existing_trades_for_day(db_url, exchange_name, day):
-    """Filtert neue Trades in Batches, um RAM zu sparen. Verwendet Batch-Queries statt einzelne Checks."""
+    """Holt existierende Trades für einen Tag aus der DB (mit Caching)."""
-    if not trades:
+    cache_key = f"{exchange_name}_{day.strftime('%Y-%m-%d')}"
-        return []
 
-    new_trades = []
+    if cache_key in _existing_trades_cache:
-    total_batches = (len(trades) + batch_size - 1) // batch_size
+        return _existing_trades_cache[cache_key]
 
-    for batch_idx in range(0, len(trades), batch_size):
-        batch = trades[batch_idx:batch_idx + batch_size]
-        batch_num = (batch_idx // batch_size) + 1
-
-        if batch_num % 10 == 0 or batch_num == 1:
-            logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} trades)...")
-
-        # Gruppiere Trades nach Tag für effizientere Queries
-        trades_by_day = {}
-        for trade in batch:
-            day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
-            if day not in trades_by_day:
-                trades_by_day[day] = []
-            trades_by_day[day].append(trade)
-
-        # Prüfe jeden Tag separat
-        for day, day_trades in trades_by_day.items():
     day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
     day_end = day + datetime.timedelta(days=1)
     day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
 
-    # Hole alle existierenden Trades für diesen Tag
     query = f"""
         SELECT isin, timestamp, price, quantity
         FROM trades
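The deduplication introduced here hinges on two keys: an (isin, timestamp, price, quantity) tuple that identifies a trade, and an exchange-plus-day string that scopes the cache of already-stored trades. A minimal standalone sketch of that keying, with a namedtuple standing in for the project's Trade class (an assumption for illustration; the ISIN is made up):

```python
import datetime
from collections import namedtuple

# Stand-in for the project's Trade class (assumption; field names follow the diff)
Trade = namedtuple("Trade", "exchange isin timestamp price quantity")

_existing_trades_cache = {}


def trade_key(trade):
    """Comparable identity of a trade, mirroring the key used in the diff."""
    return (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))


def cache_key(exchange_name, day):
    """One cache entry per exchange and calendar day."""
    return f"{exchange_name}_{day.strftime('%Y-%m-%d')}"


t = Trade("LS", "DE0001234567",
          datetime.datetime(2026, 1, 29, 14, 7, tzinfo=datetime.timezone.utc),
          10.145, 500.0)
day = t.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
existing = _existing_trades_cache.setdefault(cache_key(t.exchange, day), set())
print(trade_key(t) not in existing)  # True: not seen yet, so it would be saved
```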
@@ -68,187 +92,281 @@ def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=1000):
         AND timestamp < '{day_end_str}'
     """
 
+    existing_trades = set()
     try:
-        response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=30)
+        response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60)
         if response.status_code == 200:
             data = response.json()
-            existing_trades = set()
             if data.get('dataset'):
                 for row in data['dataset']:
                     isin, ts, price, qty = row
-                    # Normalisiere Timestamp für Vergleich
                     if isinstance(ts, str):
                         ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
                     else:
                         ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
-                    # Erstelle Vergleichs-Key (ohne Hash, direkter Vergleich)
                     key = (isin, ts_dt.isoformat(), float(price), float(qty))
                     existing_trades.add(key)
 
-            # Prüfe welche Trades neu sind
-            for trade in day_trades:
-                trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
-                if trade_key not in existing_trades:
-                    new_trades.append(trade)
-        else:
-            # Bei Fehler: alle Trades als neu behandeln (sicherer)
-            logger.warning(f"Query failed for day {day}, treating all trades as new")
-            new_trades.extend(day_trades)
     except Exception as e:
-        # Bei Fehler: alle Trades als neu behandeln (sicherer)
+        logger.warning(f"Error fetching existing trades for {day}: {e}")
-        logger.warning(f"Error checking trades for day {day}: {e}, treating all trades as new")
-        new_trades.extend(day_trades)
 
-    # Kleine Pause zwischen Batches, um DB nicht zu überlasten
+    _existing_trades_cache[cache_key] = existing_trades
-    if batch_idx + batch_size < len(trades):
+    return existing_trades
-        time.sleep(0.05)
 
+def clear_trades_cache():
+    """Leert den Cache für existierende Trades."""
+    global _existing_trades_cache
+    _existing_trades_cache = {}
+
+
+def filter_new_trades_for_day(db_url, exchange_name, trades, day):
+    """Filtert neue Trades für einen einzelnen Tag."""
+    if not trades:
+        return []
+
+    existing = get_existing_trades_for_day(db_url, exchange_name, day)
+
+    new_trades = []
+    for trade in trades:
+        trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
+        if trade_key not in existing:
+            new_trades.append(trade)
+
     return new_trades
 
-def get_last_trade_timestamp(db_url, exchange_name):
+def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=5000):
-    # QuestDB query: get the latest timestamp for a specific exchange
+    """Filtert neue Trades in Batches, gruppiert nach Tag."""
+    if not trades:
+        return []
+
+    # Gruppiere alle Trades nach Tag
+    trades_by_day = {}
+    for trade in trades:
+        day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
+        if day not in trades_by_day:
+            trades_by_day[day] = []
+        trades_by_day[day].append(trade)
+
+    new_trades = []
+    total_days = len(trades_by_day)
+
+    for i, (day, day_trades) in enumerate(sorted(trades_by_day.items()), 1):
+        if i % 10 == 0 or i == 1:
+            logger.info(f"Checking day {i}/{total_days}: {day.strftime('%Y-%m-%d')} ({len(day_trades)} trades)...")
+
+        new_for_day = filter_new_trades_for_day(db_url, exchange_name, day_trades, day)
+        new_trades.extend(new_for_day)
+
+        # Kleine Pause um DB nicht zu überlasten
+        if i < total_days:
+            time.sleep(0.02)
+
+    return new_trades
+
+
+def get_last_trade_timestamp(db_url: str, exchange_name: str) -> datetime.datetime:
+    """Holt den Timestamp des letzten Trades für eine Exchange aus QuestDB."""
     query = f"trades where exchange = '{exchange_name}' latest by timestamp"
     try:
-        # Using the /exec endpoint to get data
         response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH)
         if response.status_code == 200:
             data = response.json()
-            if data['dataset']:
+            if data.get('dataset'):
-                # QuestDB returns timestamp in micros since epoch by default in some views, or ISO
+                # QuestDB gibt Timestamps in Mikrosekunden oder ISO-Format zurück
-                # Let's assume the timestamp is in the dataset
+                ts_value = data['dataset'][0][0]
-                # ILP timestamps are stored as designated timestamps.
-                ts_value = data['dataset'][0][0]  # Adjust index based on column order
                 if isinstance(ts_value, str):
                     return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
                 else:
                     return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
     except Exception as e:
-        logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")
+        logger.debug(f"Keine existierenden Daten für {exchange_name} oder DB nicht erreichbar: {e}")
     return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
 
-def run_task(historical=False):
+def process_eix_streaming(db, db_url: str, exchange: BaseExchange, historical: bool = False):
-    logger.info(f"Starting Trading Data Fetcher task (Historical: {historical})...")
+    """Verarbeitet eine Exchange im Streaming-Modus um RAM zu sparen."""
 
-    # Initialize exchanges
-    eix = EIXExchange()
-    ls = LSExchange()
-
-    # Neue Deutsche Börse Exchanges
-    xetra = XetraExchange()
-    frankfurt = FrankfurtExchange()
-    quotrix = QuotrixExchange()
-    gettex = GettexExchange()
-    stuttgart = StuttgartExchange()
-
-    # Börsenag Exchanges (Düsseldorf, Hamburg, Hannover)
-    dusa = DUSAExchange()
-    dusb = DUSBExchange()
-    dusc = DUSCExchange()
-    dusd = DUSDExchange()
-    hama = HAMAExchange()
-    hamb = HAMBExchange()
-    hana = HANAExchange()
-    hanb = HANBExchange()
-
-    # Pass last_ts to fetcher to allow smart filtering
-    # daemon.py runs daily, so we want to fetch everything since DB state
-    # BUT we need to be careful: eix.py's fetch_latest_trades needs 'since_date' argument
-    # We can't pass it here directly in the tuple easily because last_ts is calculated inside the loop.
-
-    # We will modify the loop below to handle args dynamically
-    exchanges_to_process = [
-        (eix, {'limit': None if historical else 5}), # Default limit 5 for safety if no historical
-        (ls, {'include_yesterday': historical}),
-        # Deutsche Börse Exchanges
-        (xetra, {'include_yesterday': historical}),
-        (frankfurt, {'include_yesterday': historical}),
-        (quotrix, {'include_yesterday': historical}),
-        (gettex, {'include_yesterday': historical}),
-        (stuttgart, {'include_yesterday': historical}),
-        # Börsenag Exchanges (Düsseldorf, Hamburg, Hannover)
-        (dusa, {'include_yesterday': historical}),
-        (dusb, {'include_yesterday': historical}),
-        (dusc, {'include_yesterday': historical}),
-        (dusd, {'include_yesterday': historical}),
-        (hama, {'include_yesterday': historical}),
-        (hamb, {'include_yesterday': historical}),
-        (hana, {'include_yesterday': historical}),
-        (hanb, {'include_yesterday': historical}),
-    ]
-
-    db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
-
-    for exchange, args in exchanges_to_process:
-        try:
-            db_url = "http://questdb:9000"
     last_ts = get_last_trade_timestamp(db_url, exchange.name)
+    logger.info(f"Hole Daten von {exchange.name} (Letzter Trade: {last_ts}) - STREAMING...")
 
-            logger.info(f"Fetching data from {exchange.name} (Last trade: {last_ts})...")
+    # Hole Liste der zu verarbeitenden Dateien
+    if historical:
+        files = exchange.get_files_to_process(limit=None, since_date=None)
+    else:
+        files = exchange.get_files_to_process(limit=None, since_date=last_ts)
 
-            # Special handling for EIX to support smart filtering
+    if not files:
-            call_args = args.copy()
+        logger.info(f"Keine {exchange.name} Dateien zu verarbeiten.")
-            if exchange.name == "EIX" and not historical:
+        return
-                call_args['since_date'] = last_ts.replace(tzinfo=datetime.timezone.utc)
-                # Remove limit if we are filtering by date to ensure we get everything
-                if 'limit' in call_args:
-                    call_args.pop('limit')
 
-            trades = exchange.fetch_latest_trades(**call_args)
+    logger.info(f"{len(files)} {exchange.name} Dateien gefunden...")
 
+    total_new = 0
+    total_processed = 0
 
+    for i, file_item in enumerate(files, 1):
+        file_name = file_item.get('fileName', 'unknown').split('/')[-1]
+        logger.info(f"Verarbeite {exchange.name} Datei {i}/{len(files)}: {file_name}")
 
+        trades = exchange.fetch_trades_from_file(file_item)
 
         if not trades:
-            logger.info(f"No trades fetched from {exchange.name}.")
+            logger.info(f" Keine Trades in {file_name}")
             continue
 
-        # Hash-basierte Deduplizierung - Batch-Verarbeitung um RAM zu sparen
+        total_processed += len(trades)
-        logger.info(f"Filtering {len(trades)} trades for duplicates (batch processing)...")
+        logger.info(f" {len(trades)} Trades geladen, filtere Duplikate...")
-        new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=500)
 
-        logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")
+        new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
 
         if new_trades:
-            # Sort trades by timestamp before saving (QuestDB likes this)
             new_trades.sort(key=lambda x: x.timestamp)
             db.save_trades(new_trades)
-            logger.info(f"Stored {len(new_trades)} new trades in QuestDB.")
+            total_new += len(new_trades)
-        except Exception as e:
+            logger.info(f" {len(new_trades)} neue Trades gespeichert (gesamt neu: {total_new})")
-            logger.error(f"Error processing exchange {exchange.name}: {e}")
+        else:
+            logger.info(f" Keine neuen Trades in dieser Datei")
 
-def main():
+        # Referenzen freigeben
-    logger.info("Trading Daemon started.")
+        del trades
+        del new_trades
 
-    # 1. Startup Check: Ist die DB leer?
+        time.sleep(0.1)
-    db_url = "http://questdb:9000"
 
-    is_empty = True
+    logger.info(f"{exchange.name} fertig: {total_new} neue Trades von {total_processed} verarbeitet.")
+    clear_trades_cache()
 
+def process_standard_exchange(db, db_url: str, exchange: BaseExchange, historical: bool):
+    """Verarbeitet einen Standard-Exchange mit Batch-Verarbeitung."""
+    try:
+        last_ts = get_last_trade_timestamp(db_url, exchange.name)
+        logger.info(f"Hole Daten von {exchange.name} (Letzter Trade: {last_ts})...")
+
+        trades = exchange.fetch_latest_trades(include_yesterday=historical)
+
+        if not trades:
+            logger.info(f"Keine Trades von {exchange.name} erhalten.")
+            return
+
+        # Deduplizierung
+        logger.info(f"Filtere {len(trades)} Trades auf Duplikate...")
+        new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
+
+        logger.info(f"Gefunden: {len(trades)} Trades gesamt, {len(new_trades)} sind neu.")
+
+        if new_trades:
+            new_trades.sort(key=lambda x: x.timestamp)
+            db.save_trades(new_trades)
+            logger.info(f"{len(new_trades)} neue Trades in QuestDB gespeichert.")
+
+        # Referenzen freigeben
+        del trades
+        if new_trades:
+            del new_trades
+        clear_trades_cache()
+
+    except Exception as e:
+        logger.error(f"Fehler bei Exchange {exchange.name}: {e}")
+
+
+def run_task(historical=False):
+    """Haupttask: Holt Trades von allen registrierten Exchanges."""
+    logger.info(f"Starte Trading Data Fetcher (Historical: {historical})...")
+
+    db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
+    db_url = "http://questdb:9000"
+
+    # Streaming-Exchanges verarbeiten (große Datenmengen)
+    for exchange_class in STREAMING_EXCHANGES:
+        try:
+            exchange = exchange_class()
+            logger.info(f"Verarbeite {exchange.name} im Streaming-Modus...")
+            process_eix_streaming(db, db_url, exchange, historical=historical)
+        except Exception as e:
+            logger.error(f"Fehler bei Streaming-Exchange {exchange_class.__name__}: {e}")
+
+    # Standard-Exchanges verarbeiten
+    for exchange_class in STANDARD_EXCHANGES:
+        try:
+            exchange = exchange_class()
+            process_standard_exchange(db, db_url, exchange, historical)
+        except Exception as e:
+            logger.error(f"Fehler bei Exchange {exchange_class.__name__}: {e}")
+
+    logger.info("Alle Exchanges verarbeitet.")
+
+
+def is_database_empty(db_url: str) -> bool:
+    """Prüft ob die Datenbank leer ist oder die Tabelle nicht existiert."""
     try:
-        # Prüfe ob bereits Trades in der Tabelle sind
         response = requests.get(f"{db_url}/exec", params={'query': 'select count(*) from trades'}, auth=DB_AUTH)
         if response.status_code == 200:
             data = response.json()
-            if data['dataset'] and data['dataset'][0][0] > 0:
+            if data.get('dataset') and data['dataset'][0][0] > 0:
-                is_empty = False
+                return False
     except Exception:
-        # Falls Tabelle noch nicht existiert oder DB nicht erreichbar ist
+        pass
-        is_empty = True
+    return True
 
-    if is_empty:
-        logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...")
+def calculate_seconds_until_target(target_hour: int, target_minute: int = 0) -> int:
+    """Berechnet Sekunden bis zur nächsten Zielzeit."""
+    now = datetime.datetime.now()
+    target = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
+
+    # Wenn Zielzeit heute schon vorbei ist, nimm morgen
+    if target <= now:
+        target += datetime.timedelta(days=1)
+
+    return int((target - now).total_seconds())
+
+
+def main():
+    logger.info("Trading Daemon gestartet.")
+
+    db_url = "http://questdb:9000"
+
+    # Startup: Initialer Sync
+    if is_database_empty(db_url):
+        logger.info("Datenbank ist leer. Starte initialen historischen Fetch...")
         run_task(historical=True)
     else:
-        logger.info("Found existing data in database. Triggering catch-up sync...")
+        logger.info("Existierende Daten gefunden. Starte Catch-up Sync...")
-        # Run a normal task to fetch any missing data since the last run
         run_task(historical=False)
-        logger.info("Catch-up sync completed. Waiting for scheduled run at 23:00.")
+        logger.info("Catch-up Sync abgeschlossen.")
 
+    # Scheduling Konfiguration
+    SCHEDULE_HOUR = 23
+    SCHEDULE_MINUTE = 0
+    last_run_date = None
+
+    logger.info(f"Warte auf täglichen Run um {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d}...")
+
     while True:
         now = datetime.datetime.now()
-        # Täglich um 23:00 Uhr
+        today = now.date()
-        if now.hour == 23 and now.minute == 0:
-            run_task(historical=False)
+        # Prüfe ob wir heute schon gelaufen sind
-            # Warte 61s, um Mehrfachausführung in derselben Minute zu verhindern
+        already_ran_today = (last_run_date == today)
-            time.sleep(61)
 
+        # Prüfe ob wir im Zeitfenster sind (23:00 - 23:59)
+        in_schedule_window = (now.hour == SCHEDULE_HOUR and now.minute >= SCHEDULE_MINUTE)
+
+        if in_schedule_window and not already_ran_today:
+            logger.info(f"Geplanter Task startet ({now.strftime('%Y-%m-%d %H:%M:%S')})...")
+            run_task(historical=False)
+            last_run_date = today
+            logger.info("Geplanter Task abgeschlossen. Warte auf nächsten Tag...")
+
+        # Dynamische Sleep-Zeit: Kurz vor Zielzeit öfter prüfen
+        seconds_until_target = calculate_seconds_until_target(SCHEDULE_HOUR, SCHEDULE_MINUTE)
+
+        if seconds_until_target > 3600:
+            # Mehr als 1 Stunde: Schlafe 30 Minuten
+            sleep_time = 1800
+        elif seconds_until_target > 300:
+            # 5 Minuten bis 1 Stunde: Schlafe 5 Minuten
+            sleep_time = 300
+        else:
+            # Unter 5 Minuten: Schlafe 30 Sekunden
+            sleep_time = 30
+
+        time.sleep(sleep_time)
 
-        # Check alle 30 Sekunden
-        time.sleep(30)
 
 if __name__ == "__main__":
     main()
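The new scheduler no longer polls every 30 seconds; it computes the distance to the target time and sleeps in coarser steps the further away it is. A minimal standalone sketch of that logic, using only the standard library (the 23:00 target mirrors SCHEDULE_HOUR above; the printed text is illustrative):

```python
import datetime


def calculate_seconds_until_target(target_hour: int, target_minute: int = 0) -> int:
    """Seconds until the next occurrence of target_hour:target_minute (today or tomorrow)."""
    now = datetime.datetime.now()
    target = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
    if target <= now:
        target += datetime.timedelta(days=1)
    return int((target - now).total_seconds())


def adaptive_sleep_seconds(seconds_until_target: int) -> int:
    """The closer the target, the shorter the polling interval."""
    if seconds_until_target > 3600:
        return 1800  # more than an hour away: check every 30 minutes
    if seconds_until_target > 300:
        return 300   # within the hour: check every 5 minutes
    return 30        # final stretch: check every 30 seconds


if __name__ == "__main__":
    remaining = calculate_seconds_until_target(23, 0)
    print(f"next run in {remaining}s, next check in {adaptive_sleep_seconds(remaining)}s")
```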
read.py — 5 lines (file deleted)
@@ -1,5 +0,0 @@
-import gzip
-import json
-with gzip.open("DGAT-posttrade-2026-01-29T14_07.json.gz", mode="rt") as f:
-    data = [json.loads(line) for line in f]
-print (str(data))
scripts/inspect_gzip.py — 79 lines (new file)
@@ -0,0 +1,79 @@
+#!/usr/bin/env python3
+"""
+Utility-Script zum Inspizieren von gzip-komprimierten JSON-Dateien.
+Verarbeitet Dateien streaming, ohne alles in den RAM zu laden.
+
+Verwendung:
+    python scripts/inspect_gzip.py <datei.json.gz> [--limit N] [--output datei.json]
+"""
+import gzip
+import json
+import argparse
+import sys
+from pathlib import Path
+
+
+def inspect_gzip_file(filepath: str, limit: int = None, output_file: str = None):
+    """
+    Liest eine gzip-komprimierte NDJSON-Datei und gibt die Inhalte aus.
+
+    Args:
+        filepath: Pfad zur .json.gz Datei
+        limit: Maximale Anzahl der auszugebenden Records (None = alle)
+        output_file: Optional: Ausgabe in Datei statt stdout
+    """
+    path = Path(filepath)
+    if not path.exists():
+        print(f"Fehler: Datei '{filepath}' nicht gefunden.", file=sys.stderr)
+        return 1
+
+    count = 0
+    output = open(output_file, 'w', encoding='utf-8') if output_file else sys.stdout
+
+    try:
+        with gzip.open(filepath, mode='rt', encoding='utf-8') as f:
+            for line in f:
+                if not line.strip():
+                    continue
+
+                try:
+                    record = json.loads(line)
+                    # Pretty-print einzelner Record
+                    json.dump(record, output, indent=2, ensure_ascii=False)
+                    output.write('\n')
+                    count += 1
+
+                    if limit and count >= limit:
+                        break
+
+                except json.JSONDecodeError as e:
+                    print(f"JSON-Fehler in Zeile {count + 1}: {e}", file=sys.stderr)
+                    continue
+
+        print(f"\n--- {count} Records verarbeitet ---", file=sys.stderr)
+
+    finally:
+        if output_file and output != sys.stdout:
+            output.close()
+
+    return 0
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description='Inspiziert gzip-komprimierte JSON-Dateien (NDJSON-Format)'
+    )
+    parser.add_argument('file', help='Pfad zur .json.gz Datei')
+    parser.add_argument('--limit', '-n', type=int, default=10,
+                        help='Maximale Anzahl der Records (default: 10, 0 = alle)')
+    parser.add_argument('--output', '-o', type=str,
+                        help='Ausgabe in Datei statt stdout')
+
+    args = parser.parse_args()
+
+    limit = args.limit if args.limit > 0 else None
+    return inspect_gzip_file(args.file, limit=limit, output_file=args.output)
+
+
+if __name__ == '__main__':
+    sys.exit(main())
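Typical invocations of the new helper script (file names are examples; the first matches the hard-coded file the deleted read.py used). Importing the function directly assumes the repository root is on sys.path:

```python
# Command-line usage (examples):
#   python scripts/inspect_gzip.py DGAT-posttrade-2026-01-29T14_07.json.gz               # first 10 records
#   python scripts/inspect_gzip.py DGAT-posttrade-2026-01-29T14_07.json.gz -n 0 -o out.json  # all records into a file

# Programmatic usage (assumes the repo root is importable):
from scripts.inspect_gzip import inspect_gzip_file

inspect_gzip_file("DGAT-posttrade-2026-01-29T14_07.json.gz", limit=5)
```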
@@ -865,6 +865,39 @@ class AnalyticsWorker:
             if i % 10 == 0:
                 time.sleep(1)
 
+    def delete_analytics_for_date(self, date: datetime.date):
+        """Löscht alle Analytics-Daten für ein bestimmtes Datum, damit sie neu berechnet werden können."""
+        date_str = date.strftime('%Y-%m-%d')
+        next_day = date + datetime.timedelta(days=1)
+        next_day_str = next_day.strftime('%Y-%m-%d')
+
+        tables = ['analytics_custom', 'analytics_exchange_daily', 'analytics_daily_summary']
+
+        for table in tables:
+            try:
+                # QuestDB DELETE syntax
+                delete_query = f"DELETE FROM {table} WHERE timestamp >= '{date_str}' AND timestamp < '{next_day_str}'"
+                response = requests.get(
+                    f"{self.questdb_url}/exec",
+                    params={'query': delete_query},
+                    auth=self.auth,
+                    timeout=30
+                )
+                if response.status_code == 200:
+                    logger.debug(f"Deleted old analytics from {table} for {date}")
+            except Exception as e:
+                logger.debug(f"Could not delete from {table} for {date}: {e}")
+
+    def force_recalculate_date(self, date: datetime.date):
+        """Erzwingt Neuberechnung der Analytics für ein Datum (löscht alte Daten zuerst)."""
+        logger.info(f"Force recalculating analytics for {date}...")
+
+        # Lösche alte Analytics-Daten für dieses Datum
+        self.delete_analytics_for_date(date)
+
+        # Berechne neu
+        self.process_date(date)
+
     def run(self):
         """Hauptschleife des Workers"""
         logger.info("Analytics Worker started.")
@@ -874,33 +907,24 @@ class AnalyticsWorker:
             logger.error("Failed to connect to QuestDB. Exiting.")
             return
 
-        # Initiale Berechnung fehlender Tage (inkl. gestern und heute)
+        # Initiale Berechnung fehlender Tage
         logger.info("Checking for missing dates...")
         self.process_missing_dates()
 
-        # Stelle sicher, dass gestern und heute verarbeitet werden
+        # IMMER heute und gestern neu berechnen (da neue Trades hinzukommen können)
         today = datetime.date.today()
         yesterday = today - datetime.timedelta(days=1)
 
-        logger.info(f"Ensuring yesterday ({yesterday}) and today ({today}) are processed...")
+        logger.info(f"Force recalculating yesterday ({yesterday}) and today ({today}) - new trades may have been added...")
-        # Prüfe alle drei Tabellen
-        existing_custom = self.get_existing_dates('analytics_custom')
-        existing_exchange = self.get_existing_dates('analytics_exchange_daily')
-        existing_summary = self.get_existing_dates('analytics_daily_summary')
-        existing_dates = existing_custom | existing_exchange | existing_summary
 
-        if yesterday not in existing_dates:
+        # Gestern immer neu berechnen
-            logger.info(f"Processing yesterday's data: {yesterday}")
+        self.force_recalculate_date(yesterday)
-            self.process_date(yesterday)
 
-        # Heute wird verarbeitet, wenn es bereits Trades gibt
+        # Heute nur wenn es Trades gibt
-        if today not in existing_dates:
-            # Prüfe ob es heute schon Trades gibt
         query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
         data = self.query_questdb(query)
         if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
-            logger.info(f"Found trades for today ({today}), processing...")
+            self.force_recalculate_date(today)
-            self.process_date(today)
         else:
             logger.info(f"No trades found for today ({today}) yet, will process later")
 
@@ -917,32 +941,24 @@ class AnalyticsWorker:
                 self.process_missing_dates()
                 last_check_hour = current_hour
 
-                # Stelle sicher, dass gestern und heute verarbeitet wurden
+                # IMMER heute und gestern neu berechnen
                 today = now.date()
                 yesterday = today - datetime.timedelta(days=1)
-                # Prüfe alle drei Tabellen
-                existing_custom = self.get_existing_dates('analytics_custom')
-                existing_exchange = self.get_existing_dates('analytics_exchange_daily')
-                existing_summary = self.get_existing_dates('analytics_daily_summary')
-                existing_dates = existing_custom | existing_exchange | existing_summary
 
-                if yesterday not in existing_dates:
+                logger.info(f"Hourly recalculation of yesterday ({yesterday}) and today ({today})...")
-                    logger.info(f"Processing yesterday's data: {yesterday}")
+                self.force_recalculate_date(yesterday)
-                    self.process_date(yesterday)
 
                 # Prüfe heute, ob es Trades gibt
-                if today not in existing_dates:
                 query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
                 data = self.query_questdb(query)
                 if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
-                    logger.info(f"Found trades for today ({today}), processing...")
+                    self.force_recalculate_date(today)
-                    self.process_date(today)
 
             # Prüfe ob es Mitternacht ist (00:00) - verarbeite dann gestern
             if now.hour == 0 and now.minute == 0:
                 yesterday = (now - datetime.timedelta(days=1)).date()
-                logger.info(f"Midnight reached - processing yesterday's data: {yesterday}")
+                logger.info(f"Midnight reached - force recalculating yesterday's data: {yesterday}")
-                self.process_date(yesterday)
+                self.force_recalculate_date(yesterday)
                 # Warte 61s, um Mehrfachausführung zu verhindern
                 time.sleep(61)
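force_recalculate_date follows a delete-then-recompute pattern over a half-open one-day window, which keeps re-running it idempotent. A sketch of the window arithmetic and the queries it produces (table names as in the diff; whether row-level DELETE is available depends on the QuestDB version, which is why the worker wraps the call in try/except):

```python
import datetime


def day_window(date):
    """Half-open [start, end) window covering one calendar day."""
    start = date.strftime('%Y-%m-%d')
    end = (date + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    return start, end


start, end = day_window(datetime.date(2026, 1, 29))
tables = ['analytics_custom', 'analytics_exchange_daily', 'analytics_daily_summary']
for table in tables:
    print(f"DELETE FROM {table} WHERE timestamp >= '{start}' AND timestamp < '{end}'")
# e.g. DELETE FROM analytics_custom WHERE timestamp >= '2026-01-29' AND timestamp < '2026-01-30'
```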
@@ -2,16 +2,20 @@ import requests
 import gzip
 import json
 import io
+import re
 import time
+import logging
+import threading
 from datetime import datetime, timedelta, timezone
 from typing import List, Optional
 from .base import BaseExchange, Trade
-from bs4 import BeautifulSoup
+
+logger = logging.getLogger(__name__)
 
 # Rate-Limiting Konfiguration
 RATE_LIMIT_DELAY = 0.5  # Sekunden zwischen Requests
 RATE_LIMIT_RETRY_DELAY = 5  # Sekunden Wartezeit bei 429
-MAX_RETRIES = 3  # Maximale Wiederholungen bei 429
+MAX_RETRIES = 5  # Maximale Wiederholungen bei 429
 
 # API URLs für Deutsche Börse
 API_URLS = {
@@ -21,17 +25,47 @@ API_URLS = {
 }
 DOWNLOAD_BASE_URL = "https://mfs.deutsche-boerse.com/api/download"
 
-# Browser User-Agent für Zugriff
+# Liste von User-Agents für Rotation bei Rate-Limiting
-HEADERS = {
+USER_AGENTS = [
-    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0',
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
+    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0',
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:121.0) Gecko/20100101 Firefox/121.0',
+    'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0',
+]
+
+
+class UserAgentRotator:
+    """Thread-safe User-Agent Rotation"""
+
+    def __init__(self):
+        self._index = 0
+        self._lock = threading.Lock()
+
+    def get_headers(self, rotate: bool = False) -> dict:
+        """Gibt Headers mit aktuellem User-Agent zurück. Bei rotate=True wird zum nächsten gewechselt."""
+        with self._lock:
+            if rotate:
+                self._index = (self._index + 1) % len(USER_AGENTS)
+            return {
+                'User-Agent': USER_AGENTS[self._index],
                 'Accept': 'application/json, application/gzip, */*',
                 'Referer': 'https://mfs.deutsche-boerse.com/',
             }
 
+# Globale Instanz für User-Agent Rotation
+_ua_rotator = UserAgentRotator()
 
 
 class DeutscheBoerseBase(BaseExchange):
     """Basisklasse für Deutsche Börse Exchanges (Xetra, Frankfurt, Quotrix)"""
 
+    # Regex für Dateinamen-Parsing (kompiliert für Performance)
+    _FILENAME_PATTERN = re.compile(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})')
+
     @property
     def base_url(self) -> str:
         """Override in subclasses"""
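How the rotator is meant to be used further down in this diff: keep the current User-Agent on the first attempt and switch only after a 429. A condensed retry sketch using the _ua_rotator instance defined above (the URL argument is a placeholder; the real code also sleeps between attempts via _handle_rate_limit):

```python
import requests


def fetch_with_rotation(url: str, max_retries: int = 5, timeout: int = 30):
    """Retry on HTTP 429, rotating the User-Agent before each retry."""
    for retry in range(max_retries):
        headers = _ua_rotator.get_headers(rotate=(retry > 0))  # rotate only after a failure
        response = requests.get(url, headers=headers, timeout=timeout)
        if response.status_code == 429:
            continue  # the production code also backs off here
        response.raise_for_status()
        return response
    return None
```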
@@ -46,60 +80,73 @@ class DeutscheBoerseBase(BaseExchange):
         """API URL für die Dateiliste"""
         return API_URLS.get(self.name, self.base_url)
 
+    def _handle_rate_limit(self, retry: int, context: str) -> None:
+        """Zentrale Rate-Limit Behandlung: rotiert User-Agent und wartet."""
+        _ua_rotator.get_headers(rotate=True)
+        wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
+        logger.warning(f"[{self.name}] Rate limited ({context}), rotating User-Agent and waiting {wait_time}s... (retry {retry + 1}/{MAX_RETRIES})")
+        time.sleep(wait_time)
+
     def _get_file_list(self) -> List[str]:
         """Holt die Dateiliste von der JSON API"""
-        try:
         api_url = self.api_url
-        print(f"[{self.name}] Fetching file list from: {api_url}")
-        response = requests.get(api_url, headers=HEADERS, timeout=30)
+        for retry in range(MAX_RETRIES):
+            try:
+                headers = _ua_rotator.get_headers(rotate=(retry > 0))
+                logger.info(f"[{self.name}] Fetching file list from: {api_url}")
+                response = requests.get(api_url, headers=headers, timeout=30)
+
+                if response.status_code == 429:
+                    self._handle_rate_limit(retry, "file list")
+                    continue
+
                 response.raise_for_status()
 
                 data = response.json()
                 files = data.get('CurrentFiles', [])
 
-                print(f"[{self.name}] API returned {len(files)} files")
+                logger.info(f"[{self.name}] API returned {len(files)} files")
                 if files:
-                    print(f"[{self.name}] Sample files: {files[:3]}")
+                    logger.debug(f"[{self.name}] Sample files: {files[:3]}")
                 return files
 
+            except requests.exceptions.HTTPError as e:
+                if e.response.status_code == 429:
+                    self._handle_rate_limit(retry, "file list HTTPError")
+                    continue
+                logger.error(f"[{self.name}] HTTP error fetching file list: {e}")
+                break
             except Exception as e:
-                print(f"[{self.name}] Error fetching file list from API: {e}")
+                logger.exception(f"[{self.name}] Error fetching file list from API: {e}")
-                import traceback
+                break
-                print(f"[{self.name}] Traceback: {traceback.format_exc()}")
         return []
 
     def _filter_files_for_date(self, files: List[str], target_date: datetime.date) -> List[str]:
         """
         Filtert Dateien für ein bestimmtes Datum.
-        Dateiformat: DETR-posttrade-YYYY-MM-DDTHH_MM.json.gz (mit Unterstrich!)
+        Dateiformat: DETR-posttrade-YYYY-MM-DDTHH_MM.json.gz
 
         Da Handel bis 22:00 MEZ geht (21:00/20:00 UTC), müssen wir auch
         Dateien nach Mitternacht UTC berücksichtigen.
         """
-        import re
         filtered = []
 
-        # Für den Vortag: Dateien vom target_date UND vom Folgetag (bis ~02:00 UTC)
         target_str = target_date.strftime('%Y-%m-%d')
         next_day = target_date + timedelta(days=1)
         next_day_str = next_day.strftime('%Y-%m-%d')
 
         for file in files:
-            # Extrahiere Datum aus Dateiname
-            # Format: DETR-posttrade-2026-01-26T21_30.json.gz
             if target_str in file:
                 filtered.append(file)
             elif next_day_str in file:
                 # Prüfe ob es eine frühe Datei vom nächsten Tag ist (< 03:00 UTC)
-                try:
+                match = self._FILENAME_PATTERN.search(file)
-                    # Finde Timestamp im Dateinamen mit Unterstrich für Minuten
-                    match = re.search(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})', file)
                 if match:
                     hour = int(match.group(2))
                     if hour < 3:  # Frühe Morgenstunden gehören noch zum Vortag
                         filtered.append(file)
-            except Exception:
-                pass
 
         return filtered
 
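The date filter relies on the compiled filename pattern: files carrying the target date always match, and files from the following day only count if their hour is before 03:00 UTC. A small standalone check of that rule (file names are illustrative but follow the DETR posttrade naming quoted elsewhere in this diff):

```python
import re

FILENAME_PATTERN = re.compile(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})')


def belongs_to(file: str, target: str, next_day: str) -> bool:
    """Mirror of _filter_files_for_date: same day, or next-day files before 03:00 UTC."""
    if target in file:
        return True
    if next_day in file:
        m = FILENAME_PATTERN.search(file)
        return bool(m) and int(m.group(2)) < 3
    return False


assert belongs_to("DETR-posttrade-2026-01-26T21_30.json.gz", "2026-01-26", "2026-01-27")
assert belongs_to("DETR-posttrade-2026-01-27T01_15.json.gz", "2026-01-26", "2026-01-27")
assert not belongs_to("DETR-posttrade-2026-01-27T08_00.json.gz", "2026-01-26", "2026-01-27")
```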
@@ -110,17 +157,14 @@ class DeutscheBoerseBase(BaseExchange):
 
         for retry in range(MAX_RETRIES):
             try:
-                response = requests.get(full_url, headers=HEADERS, timeout=60)
+                headers = _ua_rotator.get_headers(rotate=(retry > 0))
+                response = requests.get(full_url, headers=headers, timeout=60)
 
                 if response.status_code == 404:
-                    # Datei nicht gefunden - normal für alte Dateien
                     return []
 
                 if response.status_code == 429:
-                    # Rate-Limit erreicht - warten und erneut versuchen
+                    self._handle_rate_limit(retry, "download")
-                    wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
-                    print(f"[{self.name}] Rate limited, waiting {wait_time}s...")
-                    time.sleep(wait_time)
                     continue
 
                 response.raise_for_status()
@@ -130,13 +174,11 @@ class DeutscheBoerseBase(BaseExchange):
                     content = f.read().decode('utf-8')
 
                 if not content.strip():
-                    # Leere Datei
                     return []
 
                 # NDJSON Format: Eine JSON-Zeile pro Trade
                 lines = content.strip().split('\n')
                 if not lines or (len(lines) == 1 and not lines[0].strip()):
-                    # Leere Datei
                     return []
 
                 for line in lines:
@@ -147,116 +189,146 @@ class DeutscheBoerseBase(BaseExchange):
                             trade = self._parse_trade_record(record)
                             if trade:
                                 trades.append(trade)
-                        except json.JSONDecodeError:
+                        except json.JSONDecodeError as e:
-                            continue
+                            logger.debug(f"[{self.name}] JSON decode error in {filename}: {e}")
-                        except Exception:
+                        except Exception as e:
-                            continue
+                            logger.debug(f"[{self.name}] Error parsing record in {filename}: {e}")
 
-                # Erfolg - keine weitere Retry nötig
+                # Erfolg
                 break
 
             except requests.exceptions.HTTPError as e:
                 if e.response.status_code == 429:
-                    wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
+                    self._handle_rate_limit(retry, "download HTTPError")
-                    print(f"[{self.name}] Rate limited, waiting {wait_time}s...")
-                    time.sleep(wait_time)
                     continue
                 elif e.response.status_code != 404:
-                    print(f"[{self.name}] HTTP error downloading {filename}: {e}")
+                    logger.error(f"[{self.name}] HTTP error downloading {filename}: {e}")
                     break
             except Exception as e:
-                print(f"[{self.name}] Error downloading/parsing {filename}: {e}")
+                logger.error(f"[{self.name}] Error downloading/parsing {filename}: {e}")
                 break
 
         return trades
 
+    def _parse_timestamp(self, ts_str: str) -> Optional[datetime]:
+        """
+        Parst einen Timestamp-String in ein datetime-Objekt.
+        Unterstützt Nanosekunden durch Kürzung auf Mikrosekunden.
+        """
+        if not ts_str:
+            return None
+
+        # Ersetze 'Z' durch '+00:00' für ISO-Kompatibilität
+        ts_str = ts_str.replace('Z', '+00:00')
+
+        # Kürze Nanosekunden auf Mikrosekunden (Python max 6 Dezimalstellen)
+        if '.' in ts_str:
+            # Split bei '+' oder '-' für Timezone
+            if '+' in ts_str:
+                time_part, tz_part = ts_str.rsplit('+', 1)
+                tz_part = '+' + tz_part
+            elif ts_str.count('-') > 2:  # Negative Timezone
+                time_part, tz_part = ts_str.rsplit('-', 1)
+                tz_part = '-' + tz_part
+            else:
+                time_part, tz_part = ts_str, ''
+
+            if '.' in time_part:
+                base, frac = time_part.split('.')
+                frac = frac[:6]  # Kürze auf 6 Stellen
+                ts_str = f"{base}.{frac}{tz_part}"
+
+        return datetime.fromisoformat(ts_str)
+
+    def _extract_price(self, record: dict) -> Optional[float]:
+        """Extrahiert den Preis aus verschiedenen JSON-Formaten."""
+        # Neues Format
+        if 'lastTrade' in record:
+            return float(record['lastTrade'])
+
+        # Altes Format mit verschachteltem Pric-Objekt
+        pric = record.get('Pric')
+        if pric is None:
+            return None
+
+        if isinstance(pric, (int, float)):
+            return float(pric)
+
+        if isinstance(pric, dict):
+            # Versuche verschiedene Pfade
+            if 'Pric' in pric:
+                inner = pric['Pric']
+                if isinstance(inner, dict):
+                    amt = inner.get('MntryVal', {}).get('Amt') or inner.get('Amt')
+                    if amt is not None:
+                        return float(amt)
+            if 'MntryVal' in pric:
+                amt = pric['MntryVal'].get('Amt')
+                if amt is not None:
+                    return float(amt)
+
+        return None
+
+    def _extract_quantity(self, record: dict) -> Optional[float]:
+        """Extrahiert die Menge aus verschiedenen JSON-Formaten."""
+        # Neues Format
+        if 'lastQty' in record:
+            return float(record['lastQty'])
+
+        # Altes Format
+        qty = record.get('Qty')
+        if qty is None:
+            return None
+
+        if isinstance(qty, (int, float)):
+            return float(qty)
+
+        if isinstance(qty, dict):
+            val = qty.get('Unit') or qty.get('Qty')
+            if val is not None:
+                return float(val)
+
+        return None
+
     def _parse_trade_record(self, record: dict) -> Optional[Trade]:
         """
         Parst einen einzelnen Trade-Record aus dem JSON.
 
-        Aktuelles JSON-Format (NDJSON):
+        Unterstützte Formate:
-        {
+        - Neues Format: isin, lastTrade, lastQty, lastTradeTime
-            "messageId": "posttrade",
+        - Altes Format: FinInstrmId.Id, Pric, Qty, TrdDt/TrdTm
-            "sourceName": "GAT",
-            "isin": "US00123Q1040",
-            "lastTradeTime": "2026-01-29T14:07:00.419000000Z",
-            "lastTrade": 10.145,
-            "lastQty": 500.0,
-            "currency": "EUR",
-            ...
-        }
         """
         try:
-            # ISIN extrahieren - neues Format verwendet 'isin' lowercase
+            # ISIN extrahieren
-            isin = record.get('isin') or record.get('ISIN') or record.get('instrumentId') or record.get('FinInstrmId', {}).get('Id', '')
+            isin = (
+                record.get('isin') or
+                record.get('ISIN') or
+                record.get('instrumentId') or
+                record.get('FinInstrmId', {}).get('Id', '')
+            )
             if not isin:
                 return None
 
-            # Preis extrahieren - neues Format: 'lastTrade'
+            # Preis extrahieren
-            price = None
+            price = self._extract_price(record)
-            if 'lastTrade' in record:
-                price = float(record['lastTrade'])
-            elif 'Pric' in record:
-                pric = record['Pric']
-                if isinstance(pric, dict):
-                    if 'Pric' in pric:
-                        inner = pric['Pric']
-                        if 'MntryVal' in inner:
-                            price = float(inner['MntryVal'].get('Amt', 0))
-                        elif 'Amt' in inner:
-                            price = float(inner['Amt'])
-                    elif 'MntryVal' in pric:
-                        price = float(pric['MntryVal'].get('Amt', 0))
-                elif isinstance(pric, (int, float)):
-                    price = float(pric)
-
             if price is None or price <= 0:
                 return None
 
-            # Menge extrahieren - neues Format: 'lastQty'
+            # Menge extrahieren
-            quantity = None
+            quantity = self._extract_quantity(record)
-            if 'lastQty' in record:
-                quantity = float(record['lastQty'])
-            elif 'Qty' in record:
-                qty = record['Qty']
-                if isinstance(qty, dict):
-                    quantity = float(qty.get('Unit', qty.get('Qty', 0)))
-                elif isinstance(qty, (int, float)):
-                    quantity = float(qty)
-
             if quantity is None or quantity <= 0:
                 return None
 
-            # Timestamp extrahieren - neues Format: 'lastTradeTime'
+            # Timestamp extrahieren
             timestamp = None
             if 'lastTradeTime' in record:
-                ts_str = record['lastTradeTime']
+                timestamp = self._parse_timestamp(record['lastTradeTime'])
-                # Format: "2026-01-29T14:07:00.419000000Z"
-                # Python kann max 6 Dezimalstellen, also kürzen
-                if '.' in ts_str:
-                    parts = ts_str.replace('Z', '').split('.')
-                    if len(parts) == 2 and len(parts[1]) > 6:
-                        ts_str = parts[0] + '.' + parts[1][:6] + '+00:00'
-                    else:
-                        ts_str = ts_str.replace('Z', '+00:00')
-                else:
-                    ts_str = ts_str.replace('Z', '+00:00')
-                timestamp = datetime.fromisoformat(ts_str)
             else:
                 # Fallback für altes Format
                 trd_dt = record.get('TrdDt', '')
                 trd_tm = record.get('TrdTm', '00:00:00')
+                if trd_dt:
-                if not trd_dt:
+                    timestamp = self._parse_timestamp(f"{trd_dt}T{trd_tm}")
-                    return None
-
-                ts_str = f"{trd_dt}T{trd_tm}"
-                if '.' in ts_str:
-                    parts = ts_str.split('.')
-                    if len(parts[1]) > 6:
-                        ts_str = parts[0] + '.' + parts[1][:6]
-
-                timestamp = datetime.fromisoformat(ts_str)
 
             if timestamp is None:
                 return None
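The new _parse_timestamp exists because datetime.fromisoformat accepts at most six fractional digits while the feed delivers nine. A trimmed-down, UTC-only check of that truncation (the example value is the one quoted in the old docstring):

```python
from datetime import datetime, timezone


def parse_ns_timestamp(ts: str) -> datetime:
    """Trim nanoseconds to microseconds, then parse as ISO 8601 (simplified, UTC-only)."""
    ts = ts.replace('Z', '+00:00')
    if '.' in ts:
        head, tail = ts.split('.', 1)
        frac, tz = tail[:-6], tail[-6:]   # '+00:00' is always the last 6 characters here
        ts = f"{head}.{frac[:6]}{tz}"
    return datetime.fromisoformat(ts)


dt = parse_ns_timestamp("2026-01-29T14:07:00.419000000Z")
assert dt == datetime(2026, 1, 29, 14, 7, 0, 419000, tzinfo=timezone.utc)
```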
@@ -273,22 +345,41 @@ class DeutscheBoerseBase(BaseExchange):
                 timestamp=timestamp
             )

-        except Exception as e:
-            # Debug: Zeige ersten fehlgeschlagenen Record
+        except (ValueError, TypeError, KeyError) as e:
+            logger.debug(f"[{self.name}] Failed to parse trade record: {e}")
             return None

     def _get_last_trading_day(self, from_date: datetime.date) -> datetime.date:
         """
-        Findet den letzten Handelstag (überspringt Wochenenden).
+        Findet den letzten Handelstag (überspringt Wochenenden und bekannte Feiertage).
         Montag=0, Sonntag=6
         """
+        # Deutsche Börsen-Feiertage (fixe Daten, jedes Jahr gleich)
+        # Bewegliche Feiertage (Ostern etc.) müssten jährlich berechnet werden
+        fixed_holidays = {
+            (1, 1),   # Neujahr
+            (5, 1),   # Tag der Arbeit
+            (12, 24), # Heiligabend
+            (12, 25), # 1. Weihnachtstag
+            (12, 26), # 2. Weihnachtstag
+            (12, 31), # Silvester
+        }
+
         date = from_date
-        # Wenn Samstag (5), gehe zurück zu Freitag
-        if date.weekday() == 5:
-            date = date - timedelta(days=1)
-        # Wenn Sonntag (6), gehe zurück zu Freitag
-        elif date.weekday() == 6:
-            date = date - timedelta(days=2)
+        max_iterations = 10  # Sicherheit gegen Endlosschleife
+
+        for _ in range(max_iterations):
+            # Wochenende überspringen
+            if date.weekday() == 5:  # Samstag
+                date = date - timedelta(days=1)
+            elif date.weekday() == 6:  # Sonntag
+                date = date - timedelta(days=2)
+            # Feiertag überspringen
+            elif (date.month, date.day) in fixed_holidays:
+                date = date - timedelta(days=1)
+            else:
+                break

         return date

     def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]:
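For illustration, the new loop can be restated outside the class and checked against a couple of concrete dates; this mirrors the logic in the hunk above and, as its comment notes, covers only fixed-date holidays. The assertions assume the 2025/2026 calendar.

from datetime import date, timedelta

FIXED_HOLIDAYS = {(1, 1), (5, 1), (12, 24), (12, 25), (12, 26), (12, 31)}

def last_trading_day(day: date, max_iterations: int = 10) -> date:
    """Standalone mirror of the loop above: step backwards over Saturdays,
    Sundays and fixed-date holidays until a plausible trading day is reached."""
    for _ in range(max_iterations):
        if day.weekday() == 5:        # Samstag
            day -= timedelta(days=1)
        elif day.weekday() == 6:      # Sonntag
            day -= timedelta(days=2)
        elif (day.month, day.day) in FIXED_HOLIDAYS:
            day -= timedelta(days=1)
        else:
            break
    return day

# Neujahr 2026 (a Thursday) walks back over Silvester to Tuesday, 2025-12-30
assert last_trading_day(date(2026, 1, 1)) == date(2025, 12, 30)
# An ordinary Sunday falls back to the preceding Friday
assert last_trading_day(date(2026, 1, 4)) == date(2026, 1, 2)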
@@ -304,40 +395,36 @@ class DeutscheBoerseBase(BaseExchange):
         # Standard: Vortag
         target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()

-        # Überspringe Wochenenden
+        # Überspringe Wochenenden und Feiertage
         original_date = target_date
         target_date = self._get_last_trading_day(target_date)

         if target_date != original_date:
-            print(f"[{self.name}] Skipping weekend: {original_date} -> {target_date}")
+            logger.info(f"[{self.name}] Adjusted date: {original_date} -> {target_date} (weekend/holiday)")

-        print(f"[{self.name}] Fetching trades for date: {target_date}")
+        logger.info(f"[{self.name}] Fetching trades for date: {target_date}")

         # Hole Dateiliste von der API
         files = self._get_file_list()

         if not files:
-            print(f"[{self.name}] No files available from API")
+            logger.warning(f"[{self.name}] No files available from API")
             return []

         # Dateien für Zieldatum filtern
         target_files = self._filter_files_for_date(files, target_date)
-        print(f"[{self.name}] {len(target_files)} files match target date (of {len(files)} total)")
+        logger.info(f"[{self.name}] {len(target_files)} files match target date (of {len(files)} total)")

         if not target_files:
-            print(f"[{self.name}] No files for target date found")
+            logger.warning(f"[{self.name}] No files for target date found")
             return []

-        # Alle passenden Dateien herunterladen und parsen (mit Rate-Limiting)
+        # Alle passenden Dateien herunterladen und parsen
         successful = 0
         failed = 0
         total_files = len(target_files)

-        if total_files == 0:
-            print(f"[{self.name}] No files to download for date {target_date}")
-            return []
-
-        print(f"[{self.name}] Starting download of {total_files} files...")
+        logger.info(f"[{self.name}] Starting download of {total_files} files...")

         for i, file in enumerate(target_files):
             trades = self._download_and_parse_file(file)
@@ -353,9 +440,9 @@ class DeutscheBoerseBase(BaseExchange):

             # Fortschritt alle 100 Dateien
             if (i + 1) % 100 == 0:
-                print(f"[{self.name}] Progress: {i + 1}/{total_files} files, {successful} successful, {len(all_trades)} trades so far")
+                logger.info(f"[{self.name}] Progress: {i + 1}/{total_files} files, {successful} successful, {len(all_trades)} trades so far")

-        print(f"[{self.name}] Downloaded {successful} files ({failed} failed/empty), total {len(all_trades)} trades")
+        logger.info(f"[{self.name}] Downloaded {successful} files ({failed} failed/empty), total {len(all_trades)} trades")
         return all_trades
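The hunks above replace print() with module-level loggers. For those logger.info()/logger.warning() calls to be visible, the daemon entry point has to configure a handler; the exact configuration is not part of this diff, so the sketch below (basicConfig with a simple format) is only an assumed example.

import logging

# Assumed daemon-side setup; the real daemon.py may configure logging differently.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)

logging.getLogger("src.exchanges").info("example: [Xetra] Fetching trades for date: 2025-12-30")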
src/exchanges/eix.py
@@ -1,30 +1,38 @@
 import requests
-import json
-from bs4 import BeautifulSoup
-from datetime import datetime
-from typing import List
+import logging
+from datetime import datetime, timezone
+from typing import List, Generator, Tuple
 from .base import BaseExchange, Trade
 import csv
 import io

+logger = logging.getLogger(__name__)
+

 class EIXExchange(BaseExchange):
+    """European Investor Exchange - CSV-basierte Trade-Daten."""
+
+    API_BASE_URL = "https://european-investor-exchange.com/api"
+
     @property
     def name(self) -> str:
         return "EIX"

-    def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None) -> List[Trade]:
-        # EIX stores its file list in a separate API endpoint
-        url = "https://european-investor-exchange.com/api/official-trades"
+    def get_files_to_process(self, limit: int = 1, since_date: datetime = None) -> List[dict]:
+        """Holt die Liste der zu verarbeitenden Dateien ohne sie herunterzuladen."""
+        url = f"{self.API_BASE_URL}/official-trades"
         try:
             response = requests.get(url, timeout=15)
             response.raise_for_status()
             files_list = response.json()
-        except Exception as e:
-            print(f"Error fetching EIX file list: {e}")
+        except requests.exceptions.RequestException as e:
+            logger.error(f"[{self.name}] Fehler beim Abrufen der Dateiliste: {e}")
+            return []
+        except ValueError as e:
+            logger.error(f"[{self.name}] Ungültiges JSON in Dateiliste: {e}")
             return []

-        # Filter files based on date in filename if since_date provided
-        # Format: "kursblatt/2025/Kursblatt.2025-07-14.1752526803105.csv"
+        # Filtere Dateien nach Datum wenn since_date angegeben
         filtered_files = []
         for item in files_list:
             file_key = item.get('fileName')
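The next hunk filters this file list by the date embedded in each fileName; a small sketch of that extraction for the Kursblatt naming scheme quoted in the removed comment (the helper name date_from_file_key is made up for illustration):

from datetime import datetime, timezone
from typing import Optional

def date_from_file_key(file_key: str) -> Optional[datetime]:
    """Extract the trading date from keys like
    'kursblatt/2025/Kursblatt.2025-07-14.1752526803105.csv'."""
    parts = file_key.split('/')[-1].split('.')
    # parts -> ['Kursblatt', '2025-07-14', '1752526803105', 'csv']
    if len(parts) < 2:
        return None
    try:
        return datetime.strptime(parts[1], "%Y-%m-%d").replace(tzinfo=timezone.utc)
    except ValueError:
        return None

print(date_from_file_key("kursblatt/2025/Kursblatt.2025-07-14.1752526803105.csv"))
# -> 2025-07-14 00:00:00+00:00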
@@ -33,90 +41,118 @@ class EIXExchange(BaseExchange):

             if since_date:
                 try:
-                    # Extract date from filename: Kursblatt.YYYY-MM-DD
                     parts = file_key.split('/')[-1].split('.')
-                    # parts example: ['Kursblatt', '2025-07-14', '1752526803105', 'csv']
                     if len(parts) >= 2:
                         date_str = parts[1]
-                        file_date = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
+                        file_date = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)

-                        # Check if file date is newer than since_date (compare dates only)
-                        if file_date.date() > since_date.date():
+                        if file_date.date() >= since_date.date():
                             filtered_files.append(item)
-                            continue
-                        # If same day, we might need to check it too, but EIX seems to be daily files
-                        if file_date.date() == since_date.date():
-                            filtered_files.append(item)
-                            continue
-                except Exception:
-                    # If parsing fails, default to including it (safety) or skipping?
-                    # Let's include it if we are not sure
+                except (ValueError, IndexError) as e:
+                    # Dateiname hat unerwartetes Format - zur Sicherheit einschließen
+                    logger.debug(f"[{self.name}] Konnte Datum nicht aus {file_key} extrahieren: {e}")
                     filtered_files.append(item)
             else:
                 filtered_files.append(item)

-        # Sort files to process oldest to newest if doing a sync, or newest to oldest?
-        # If we have limit=1 (default), we usually want the newest.
-        # But if we are syncing history (since_date set), we probably want all of them.
-
-        # Logic: If since_date is set, we ignore limit (or use it as safety cap) and process ALL new files
         if since_date:
-            files_to_process = filtered_files
-            # Sort by date ? The API list seems chronological.
+            return filtered_files
         else:
-            # Default behavior: take the last N files (API returns oldest first usually?)
-            # Let's assume list is chronological.
             if limit:
-                files_to_process = files_list[-limit:]
-            else:
-                files_to_process = files_list
-
-        trades = []
-        count = 0
-        for item in files_to_process:
-            file_key = item.get('fileName')
-
-            # Download the CSV
-            csv_url = f"https://european-investor-exchange.com/api/trade-file-contents?key={file_key}"
-            try:
-                csv_response = requests.get(csv_url, timeout=20)
-                if csv_response.status_code == 200:
-                    trades.extend(self._parse_csv(csv_response.text))
-                    count += 1
-                    # Only enforce limit if since_date is NOT set
-                    if not since_date and limit and count >= limit:
-                        break
-            except Exception as e:
-                print(f"Error downloading EIX CSV {file_key}: {e}")
-
-        return trades
+                return files_list[-limit:]
+            return files_list
+
+    def fetch_trades_from_file(self, file_item: dict) -> List[Trade]:
+        """Lädt und parst eine einzelne CSV-Datei."""
+        file_key = file_item.get('fileName')
+        if not file_key:
+            return []
+
+        csv_url = f"{self.API_BASE_URL}/trade-file-contents?key={file_key}"
+        try:
+            response = requests.get(csv_url, timeout=60)
+            response.raise_for_status()
+            return self._parse_csv(response.text)
+        except requests.exceptions.RequestException as e:
+            logger.error(f"[{self.name}] Fehler beim Download von {file_key}: {e}")
+        except Exception as e:
+            logger.error(f"[{self.name}] Unerwarteter Fehler bei {file_key}: {e}")
+        return []
+
+    def fetch_trades_streaming(self, limit: int = 1, since_date: datetime = None) -> Generator[Tuple[str, List[Trade]], None, None]:
+        """
+        Generator der Trades dateiweise zurückgibt.
+        Yields: (filename, trades) Tupel
+        """
+        files = self.get_files_to_process(limit=limit, since_date=since_date)
+
+        for item in files:
+            file_key = item.get('fileName', 'unknown')
+            trades = self.fetch_trades_from_file(item)
+            if trades:
+                yield (file_key, trades)
+
+    def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None, **kwargs) -> List[Trade]:
+        """
+        Legacy-Methode für Kompatibilität.
+        WARNUNG: Lädt alle Trades in den Speicher! Für große Datenmengen fetch_trades_streaming() verwenden.
+        """
+        # Für kleine Requests (limit <= 5) normale Verarbeitung
+        if limit and limit <= 5 and not since_date:
+            all_trades = []
+            for filename, trades in self.fetch_trades_streaming(limit=limit, since_date=since_date):
+                all_trades.extend(trades)
+            return all_trades
+
+        # Für große Requests: Warnung ausgeben und leere Liste zurückgeben
+        logger.warning(f"[{self.name}] fetch_latest_trades() mit großem Dataset aufgerufen. Verwende Streaming.")
+        return []

     def _parse_csv(self, csv_text: str) -> List[Trade]:
+        """Parst CSV-Text zu Trade-Objekten."""
         trades = []
+        parse_errors = 0

         f = io.StringIO(csv_text)
-        # Header: Trading day & Trading time UTC,Instrument Identifier,Quantity,Unit Price,Price Currency,Venue Identifier,Side
         reader = csv.DictReader(f, delimiter=',')
-        for row in reader:
+        for row_num, row in enumerate(reader, start=2):  # Start bei 2 wegen Header
             try:
                 price = float(row['Unit Price'])
                 quantity = float(row['Quantity'])
                 isin = row['Instrument Identifier']
-                symbol = isin  # Often symbol is unknown, use ISIN
                 time_str = row['Trading day & Trading time UTC']

-                # Format: 2026-01-22T06:30:00.617Z
-                # Python 3.11+ supports ISO with Z, otherwise we strip Z
+                # Preis und Menge validieren
+                if price <= 0 or quantity <= 0:
+                    logger.debug(f"[{self.name}] Zeile {row_num}: Ungültiger Preis/Menge: {price}/{quantity}")
+                    parse_errors += 1
+                    continue
+
                 ts_str = time_str.replace('Z', '+00:00')
                 timestamp = datetime.fromisoformat(ts_str)

                 trades.append(Trade(
                     exchange=self.name,
-                    symbol=symbol,
+                    symbol=isin,
                     isin=isin,
                     price=price,
                     quantity=quantity,
                     timestamp=timestamp
                 ))
-            except Exception:
-                continue
+            except KeyError as e:
+                logger.debug(f"[{self.name}] Zeile {row_num}: Fehlendes Feld {e}")
+                parse_errors += 1
+            except ValueError as e:
+                logger.debug(f"[{self.name}] Zeile {row_num}: Ungültiger Wert: {e}")
+                parse_errors += 1
+            except Exception as e:
+                logger.warning(f"[{self.name}] Zeile {row_num}: Unerwarteter Fehler: {e}")
+                parse_errors += 1

+        if parse_errors > 0:
+            logger.debug(f"[{self.name}] {parse_errors} Zeilen konnten nicht geparst werden")
+
         return trades
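A sketch of how a caller could consume the new fetch_trades_streaming() generator so that only one file's trades are held in memory at a time; persist_batch() is a placeholder and not part of this changeset.

from datetime import datetime, timezone, timedelta

from src.exchanges.eix import EIXExchange

def persist_batch(trades):
    # Placeholder for the daemon's actual insert/deduplication step
    print(f"would write {len(trades)} trades to the database")

exchange = EIXExchange()
since = datetime.now(timezone.utc) - timedelta(days=7)

for filename, trades in exchange.fetch_trades_streaming(since_date=since):
    # Each file is downloaded, parsed, handed over and released before the next one
    persist_batch(trades)
    print(f"processed {filename}: {len(trades)} trades")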