diff --git a/__pycache__/daemon.cpython-313.pyc b/__pycache__/daemon.cpython-313.pyc
new file mode 100644
index 0000000..083d856
Binary files /dev/null and b/__pycache__/daemon.cpython-313.pyc differ
diff --git a/cleanup_duplicates.py b/cleanup_duplicates.py
new file mode 100644
index 0000000..96a763f
--- /dev/null
+++ b/cleanup_duplicates.py
@@ -0,0 +1,154 @@
+#!/usr/bin/env python3
+"""
+Script for removing duplicated trades from QuestDB.
+Creates a new table without duplicates and replaces the old one.
+"""
+
+import requests
+import os
+import sys
+
+DB_HOST = os.getenv("QUESTDB_HOST", "localhost")
+DB_PORT = os.getenv("QUESTDB_PORT", "9000")
+DB_USER = os.getenv("DB_USER", "admin")
+DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
+
+DB_URL = f"http://{DB_HOST}:{DB_PORT}"
+DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
+
+def execute_query(query, timeout=300):
+    """Executes a QuestDB query."""
+    try:
+        response = requests.get(
+            f"{DB_URL}/exec",
+            params={'query': query},
+            auth=DB_AUTH,
+            timeout=timeout
+        )
+        if response.status_code == 200:
+            return response.json()
+        else:
+            print(f"Query failed: {response.text}")
+            return None
+    except Exception as e:
+        print(f"Error executing query: {e}")
+        return None
+
+def get_table_count(table_name):
+    """Counts the rows in a table."""
+    result = execute_query(f"SELECT count(*) FROM {table_name}")
+    if result and result.get('dataset'):
+        return result['dataset'][0][0]
+    return 0
+
+def main():
+    print("=" * 60)
+    print("QuestDB duplicate cleanup")
+    print("=" * 60)
+
+    # 1. Check the current count
+    original_count = get_table_count("trades")
+    print(f"\n1. Current number of trades: {original_count:,}")
+
+    if original_count == 0:
+        print("No trades in the database. Nothing to do.")
+        return
+
+    # 2. Analyze duplicates per exchange
+    print("\n2. Analyzing duplicates per exchange...")
+
+    analysis_query = """
+    SELECT
+        exchange,
+        count(*) as total,
+        count(distinct concat(isin, '-', cast(timestamp as string), '-', cast(price as string), '-', cast(quantity as string))) as unique_trades
+    FROM trades
+    GROUP BY exchange
+    ORDER BY exchange
+    """
+
+    result = execute_query(analysis_query)
+    if result and result.get('dataset'):
+        print(f"\n{'Exchange':<15} {'Total':>12} {'Unique':>12} {'Duplicates':>12}")
+        print("-" * 55)
+        total_all = 0
+        unique_all = 0
+        for row in result['dataset']:
+            exchange, total, unique = row
+            duplicates = total - unique
+            total_all += total
+            unique_all += unique
+            print(f"{exchange:<15} {total:>12,} {unique:>12,} {duplicates:>12,}")
+        print("-" * 55)
+        print(f"{'TOTAL':<15} {total_all:>12,} {unique_all:>12,} {total_all - unique_all:>12,}")
+
+    # 3. Create the cleaned table
+    print("\n3. Creating cleaned table 'trades_clean'...")
+
+    # Drop the old clean table if it exists
+    execute_query("DROP TABLE IF EXISTS trades_clean")
+
+    # Create a new table with DISTINCT over all relevant fields
+    # QuestDB: we build the new table via SELECT DISTINCT
+    create_clean_query = """
+    CREATE TABLE trades_clean AS (
+        SELECT DISTINCT
+            exchange,
+            symbol,
+            isin,
+            price,
+            quantity,
+            timestamp
+        FROM trades
+    ) TIMESTAMP(timestamp) PARTITION BY DAY WAL
+    """
+
+    result = execute_query(create_clean_query, timeout=600)
+    if result is None:
+        print("Error creating the cleaned table!")
+        return
+
+    clean_count = get_table_count("trades_clean")
+    print(f"   Cleaned table created: {clean_count:,} trades")
+
+    removed = original_count - clean_count
+    print(f"   Removed duplicates: {removed:,} ({removed/original_count*100:.1f}%)")
+
+    # 4. Replace the old table
+    print("\n4. Replacing the old table...")
+
+    # Rename the old table to backup
+    execute_query("RENAME TABLE trades TO trades_backup")
+
+    # Rename the new table to trades
+    execute_query("RENAME TABLE trades_clean TO trades")
+
+    # Verify
+    final_count = get_table_count("trades")
+    print(f"   New trades table: {final_count:,} rows")
+
+    # 5. Drop the backup (optional)
+    print("\n5. Dropping backup table...")
+    execute_query("DROP TABLE IF EXISTS trades_backup")
+    print("   Backup dropped.")
+
+    # 6. Summary
+    print("\n" + "=" * 60)
+    print("SUMMARY")
+    print("=" * 60)
+    print(f"Before:  {original_count:>15,} trades")
+    print(f"After:   {final_count:>15,} trades")
+    print(f"Removed: {removed:>15,} duplicates ({removed/original_count*100:.1f}%)")
+    print("=" * 60)
+
+    # 7. Recompute the statistics tables
+    print("\n6. Dropping old analytics tables (they will be recomputed)...")
+    for table in ['analytics_daily_summary', 'analytics_exchange_daily',
+                  'analytics_stock_trends', 'analytics_volume_changes', 'analytics_custom']:
+        result = execute_query(f"DROP TABLE IF EXISTS {table}")
+        print(f"    {table} dropped")
+
+    print("\nDone! The analytics worker will recompute the statistics on its next start.")
+
+if __name__ == "__main__":
+    main()
diff --git a/daemon.py b/daemon.py
index 1aeaadb..1402edf 100644
--- a/daemon.py
+++ b/daemon.py
@@ -1,6 +1,7 @@
 import time
 import logging
 import datetime
+import hashlib
 import os
 import requests
 from src.exchanges.eix import EIXExchange
@@ -20,6 +21,39 @@ DB_USER = os.getenv("DB_USER", "admin")
 DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
 DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
 
+def get_trade_hash(trade):
+    """Creates a unique hash for a trade."""
+    key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}"
+    return hashlib.md5(key.encode()).hexdigest()
+
+def get_existing_trade_hashes(db_url, exchange_name, since_date):
+    """Fetches all trade hashes for an exchange since a given date."""
+    hashes = set()
+
+    # Fetch all trades since the given date
+    date_str = since_date.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
+    query = f"SELECT exchange, isin, timestamp, price, quantity FROM trades WHERE exchange = '{exchange_name}' AND timestamp >= '{date_str}'"
+
+    try:
+        response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60)
+        if response.status_code == 200:
+            data = response.json()
+            if data.get('dataset'):
+                for row in data['dataset']:
+                    exchange, isin, ts, price, qty = row
+                    # Convert the timestamp to an ISO string
+                    if isinstance(ts, str):
+                        ts_iso = ts.replace('Z', '+00:00')
+                    else:
+                        ts_iso = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc).isoformat()
+
+                    key = f"{exchange}|{isin}|{ts_iso}|{price}|{qty}"
+                    hashes.add(hashlib.md5(key.encode()).hexdigest())
+    except Exception as e:
+        logger.warning(f"Could not fetch existing trade hashes: {e}")
+
+    return hashes
+
 def get_last_trade_timestamp(db_url, exchange_name):
     # QuestDB query: get the latest timestamp for a specific exchange
     query = f"trades where exchange = '{exchange_name}' latest by timestamp"
@@ -79,7 +113,7 @@ def run_task(historical=False):
         db_url = "http://questdb:9000"
         last_ts = get_last_trade_timestamp(db_url, exchange.name)
 
-        logger.info(f"Fetching data from {exchange.name} (Filtering trades older than {last_ts})...")
+        logger.info(f"Fetching data from {exchange.name} (Last trade: {last_ts})...")
 
         # Special handling for EIX to support smart filtering
         call_args = args.copy()
@@ -91,11 +125,30 @@
 
         trades = exchange.fetch_latest_trades(**call_args)
 
-        # Deduplizierung: Nur Trades nehmen, die neuer sind als der letzte in der DB
-        new_trades = [
-            t for t in trades
-            if t.timestamp.replace(tzinfo=datetime.timezone.utc) > last_ts.replace(tzinfo=datetime.timezone.utc)
-        ]
+        if not trades:
+            logger.info(f"No trades fetched from {exchange.name}.")
+            continue
+
+        # Hash-based deduplication
+        # Fetch existing hashes for trades starting from the oldest new trade
+        oldest_trade_ts = min(t.timestamp for t in trades)
+
+        # Only check when this is not a full historical sync
+        if last_ts > datetime.datetime.min.replace(tzinfo=datetime.timezone.utc):
+            # Fetch hashes from one day before the oldest new trade for this exchange
+            check_since = oldest_trade_ts - datetime.timedelta(days=1)
+            existing_hashes = get_existing_trade_hashes(db_url, exchange.name, check_since)
+            logger.info(f"Found {len(existing_hashes)} existing trade hashes in DB")
+
+            # Keep only genuinely new trades
+            new_trades = []
+            for t in trades:
+                trade_hash = get_trade_hash(t)
+                if trade_hash not in existing_hashes:
+                    new_trades.append(t)
+        else:
+            # Historical sync - no deduplication needed
+            new_trades = trades
 
         logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")
diff --git a/src/exchanges/__pycache__/deutsche_boerse.cpython-313.pyc b/src/exchanges/__pycache__/deutsche_boerse.cpython-313.pyc
index 8cb1cc7..c69c103 100644
Binary files a/src/exchanges/__pycache__/deutsche_boerse.cpython-313.pyc and b/src/exchanges/__pycache__/deutsche_boerse.cpython-313.pyc differ
diff --git a/src/exchanges/__pycache__/gettex.cpython-313.pyc b/src/exchanges/__pycache__/gettex.cpython-313.pyc
index 947d402..577fd8f 100644
Binary files a/src/exchanges/__pycache__/gettex.cpython-313.pyc and b/src/exchanges/__pycache__/gettex.cpython-313.pyc differ
diff --git a/src/exchanges/__pycache__/stuttgart.cpython-313.pyc b/src/exchanges/__pycache__/stuttgart.cpython-313.pyc
index c3bda68..7982c38 100644
Binary files a/src/exchanges/__pycache__/stuttgart.cpython-313.pyc and b/src/exchanges/__pycache__/stuttgart.cpython-313.pyc differ
diff --git a/src/exchanges/deutsche_boerse.py b/src/exchanges/deutsche_boerse.py
index 6c386b3..7a85211 100644
--- a/src/exchanges/deutsche_boerse.py
+++ b/src/exchanges/deutsche_boerse.py
@@ -39,22 +39,21 @@ class DeutscheBoerseBase(BaseExchange):
             # Debug: Response-Länge
             print(f"[{self.name}] Response length: {len(html_text)} chars")
 
-            # Primär: Regex-basierte Extraktion (zuverlässiger)
-            # Pattern: PREFIX-posttrade-YYYY-MM-DDTHH_MM.json.gz
-            # Das Prefix wird aus der base_url extrahiert (z.B. DETR, DFRA, DGAT)
+            # Extract the prefix from base_url (e.g. DETR, DFRA, DGAT)
             prefix_match = re.search(r'/([A-Z]{4})-posttrade', self.base_url)
-            if prefix_match:
-                prefix = prefix_match.group(1)
-                # Suche nach Dateinamen mit diesem Prefix
-                pattern = f'{prefix}-posttrade-\\d{{4}}-\\d{{2}}-\\d{{2}}T\\d{{2}}_\\d{{2}}\\.json\\.gz'
-            else:
-                # Generisches Pattern
-                pattern = r'[A-Z]{4}-posttrade-\d{4}-\d{2}-\d{2}T\d{2}_\d{2}\.json\.gz'
+            prefix = prefix_match.group(1) if prefix_match else '[A-Z]{4}'
+
+            # Pattern: PREFIX-posttrade-YYYY-MM-DDTHH_MM.json.gz
+            # Important: file names appear as link text, not only in href
+            pattern = f'{prefix}-posttrade-\\d{{4}}-\\d{{2}}-\\d{{2}}T\\d{{2}}_\\d{{2}}\\.json\\.gz'
 
             matches = re.findall(pattern, html_text)
             files = list(set(matches))
 
-            # Sekundär: BeautifulSoup für Links (falls Regex nichts findet)
+            if files:
+                print(f"[{self.name}] Found {len(files)} files via regex")
+
+            # Fallback: BeautifulSoup for links and text
             if not files:
                 soup = BeautifulSoup(html_text, 'html.parser')
                 all_links = soup.find_all('a')
@@ -64,24 +63,19 @@ class DeutscheBoerseBase(BaseExchange):
                     href = link.get('href', '')
                     text = link.get_text(strip=True)
 
-                    # Prüfe href und Text für posttrade Dateien
-                    if href and 'posttrade' in href.lower() and '.json.gz' in href.lower():
-                        # Extrahiere nur den Dateinamen
+                    # Check the link text first (file names are often shown as link text)
+                    if text and 'posttrade' in text.lower() and '.json.gz' in text.lower():
+                        files.append(text)
+                    # Then check the href
+                    elif href and 'posttrade' in href.lower() and '.json.gz' in href.lower():
                         filename = href.split('/')[-1] if '/' in href else href
                         files.append(filename)
-                    elif text and 'posttrade' in text.lower() and '.json.gz' in text.lower():
-                        files.append(text)
-
-            # Tertiär: Suche nach jedem "posttrade" im HTML und extrahiere Dateinamen
-            if not files:
-                # Allgemeineres Pattern für beliebige Dateinamen mit "posttrade"
-                general_pattern = r'[\w-]*posttrade[\w-]*\d{4}[-_]\d{2}[-_]\d{2}[T_]\d{2}[_:]\d{2}\.json\.gz'
-                matches = re.findall(general_pattern, html_text, re.IGNORECASE)
-                files = list(set(matches))
+
+                files = list(set(files))
 
                 if files:
-                    print(f"[{self.name}] Found {len(files)} files via general pattern")
+                    print(f"[{self.name}] Found {len(files)} files via BeautifulSoup")
 
-            print(f"[{self.name}] Found {len(files)} files via regex/soup")
+            print(f"[{self.name}] Total files found: {len(files)}")
             return files
 
         except Exception as e:
             print(f"Error fetching file list from {self.base_url}: {e}")
@@ -277,9 +271,23 @@ class DeutscheBoerseBase(BaseExchange):
 
         return files
 
+    def _get_last_trading_day(self, from_date: datetime.date) -> datetime.date:
+        """
+        Finds the last trading day (skips weekends).
+        Monday=0, Sunday=6
+        """
+        date = from_date
+        # If Saturday (5), go back to Friday
+        if date.weekday() == 5:
+            date = date - timedelta(days=1)
+        # If Sunday (6), go back to Friday
+        elif date.weekday() == 6:
+            date = date - timedelta(days=2)
+        return date
+
     def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]:
         """
-        Holt alle Trades vom Vortag (oder seit since_date).
+        Fetches all trades from the last trading day (skips weekends).
         """
 
         all_trades = []
@@ -290,6 +298,13 @@
             # Standard: Vortag
             target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()
 
+        # Skip weekends
+        original_date = target_date
+        target_date = self._get_last_trading_day(target_date)
+
+        if target_date != original_date:
+            print(f"[{self.name}] Skipping weekend: {original_date} -> {target_date}")
+
         print(f"[{self.name}] Fetching trades for date: {target_date}")
 
         # Erst versuchen, Dateiliste von der Seite zu holen
diff --git a/src/exchanges/gettex.py b/src/exchanges/gettex.py
index 7a15e69..74da159 100644
--- a/src/exchanges/gettex.py
+++ b/src/exchanges/gettex.py
@@ -285,9 +285,23 @@ class GettexExchange(BaseExchange):
             # Nur bei den ersten paar Fehlern loggen
             return None
 
+    def _get_last_trading_day(self, from_date) -> datetime.date:
+        """
+        Finds the last trading day (skips weekends).
+        Monday=0, Sunday=6
+        """
+        date = from_date
+        # If Saturday (5), go back to Friday
+        if date.weekday() == 5:
+            date = date - timedelta(days=1)
+        # If Sunday (6), go back to Friday
+        elif date.weekday() == 6:
+            date = date - timedelta(days=2)
+        return date
+
     def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]:
         """
-        Holt alle Trades vom Vortag.
+        Fetches all trades from the last trading day (skips weekends).
""" all_trades = [] @@ -297,6 +311,13 @@ class GettexExchange(BaseExchange): else: target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date() + # Überspringe Wochenenden + original_date = target_date + target_date = self._get_last_trading_day(target_date) + + if target_date != original_date: + print(f"[{self.name}] Skipping weekend: {original_date} -> {target_date}") + print(f"[{self.name}] Fetching trades for date: {target_date}") # Versuche zuerst, Dateien von der Webseite zu laden diff --git a/src/exchanges/stuttgart.py b/src/exchanges/stuttgart.py index 7f11dd0..e9d6207 100644 --- a/src/exchanges/stuttgart.py +++ b/src/exchanges/stuttgart.py @@ -334,9 +334,23 @@ class StuttgartExchange(BaseExchange): print(f"[STU] Error parsing CSV row: {e}") return None + def _get_last_trading_day(self, from_date) -> datetime.date: + """ + Findet den letzten Handelstag (überspringt Wochenenden). + Montag=0, Sonntag=6 + """ + date = from_date + # Wenn Samstag (5), gehe zurück zu Freitag + if date.weekday() == 5: + date = date - timedelta(days=1) + # Wenn Sonntag (6), gehe zurück zu Freitag + elif date.weekday() == 6: + date = date - timedelta(days=2) + return date + def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]: """ - Holt alle Trades vom Vortag. + Holt alle Trades vom letzten Handelstag (überspringt Wochenenden). """ all_trades = [] @@ -346,6 +360,13 @@ class StuttgartExchange(BaseExchange): else: target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date() + # Überspringe Wochenenden + original_date = target_date + target_date = self._get_last_trading_day(target_date) + + if target_date != original_date: + print(f"[{self.name}] Skipping weekend: {original_date} -> {target_date}") + print(f"[{self.name}] Fetching trades for date: {target_date}") # Download-Links holen