#!/usr/bin/env python3 """ Script zum Entfernen von duplizierten Trades aus QuestDB. Erstellt eine neue Tabelle ohne Duplikate und ersetzt die alte. """ import requests import os import sys DB_HOST = os.getenv("QUESTDB_HOST", "localhost") DB_PORT = os.getenv("QUESTDB_PORT", "9000") DB_USER = os.getenv("DB_USER", "admin") DB_PASSWORD = os.getenv("DB_PASSWORD", "quest") DB_URL = f"http://{DB_HOST}:{DB_PORT}" DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None def execute_query(query, timeout=300): """Führt eine QuestDB Query aus.""" try: response = requests.get( f"{DB_URL}/exec", params={'query': query}, auth=DB_AUTH, timeout=timeout ) if response.status_code == 200: return response.json() else: print(f"Query failed: {response.text}") return None except Exception as e: print(f"Error executing query: {e}") return None def get_table_count(table_name): """Zählt Einträge in einer Tabelle.""" result = execute_query(f"SELECT count(*) FROM {table_name}") if result and result.get('dataset'): return result['dataset'][0][0] return 0 def main(): print("=" * 60) print("QuestDB Duplikat-Bereinigung") print("=" * 60) # 1. Prüfe aktuelle Anzahl original_count = get_table_count("trades") print(f"\n1. Aktuelle Anzahl Trades: {original_count:,}") if original_count == 0: print("Keine Trades in der Datenbank. Nichts zu tun.") return # 2. Analysiere Duplikate pro Exchange print("\n2. Analysiere Duplikate pro Exchange...") analysis_query = """ SELECT exchange, count(*) as total, count(distinct concat(isin, '-', cast(timestamp as string), '-', cast(price as string), '-', cast(quantity as string))) as unique_trades FROM trades GROUP BY exchange ORDER BY exchange """ result = execute_query(analysis_query) if result and result.get('dataset'): print(f"\n{'Exchange':<15} {'Total':>12} {'Unique':>12} {'Duplicates':>12}") print("-" * 55) total_all = 0 unique_all = 0 for row in result['dataset']: exchange, total, unique = row duplicates = total - unique total_all += total unique_all += unique print(f"{exchange:<15} {total:>12,} {unique:>12,} {duplicates:>12,}") print("-" * 55) print(f"{'TOTAL':<15} {total_all:>12,} {unique_all:>12,} {total_all - unique_all:>12,}") # 3. Erstelle bereinigte Tabelle print("\n3. Erstelle bereinigte Tabelle 'trades_clean'...") print(" HINWEIS: Bei großen Datenmengen kann dies mehrere Minuten dauern...") # Lösche alte clean-Tabelle falls vorhanden execute_query("DROP TABLE IF EXISTS trades_clean") # QuestDB: SAMPLE BY 1T mit LATEST ON für Deduplizierung # Das gruppiert nach Timestamp (auf Nanosekunde genau) und behält nur den letzten Eintrag # Alternative: Wir verwenden GROUP BY mit MIN/MAX # Erst die Tabelle erstellen create_table_query = """ CREATE TABLE trades_clean ( exchange SYMBOL, symbol SYMBOL, isin SYMBOL, price DOUBLE, quantity DOUBLE, timestamp TIMESTAMP ) TIMESTAMP(timestamp) PARTITION BY DAY WAL DEDUP UPSERT KEYS(timestamp, exchange, isin, price, quantity) """ result = execute_query(create_table_query, timeout=60) if result is None: print(" Fehler beim Erstellen der Tabellenstruktur!") # Fallback: Ohne DEDUP create_table_query = """ CREATE TABLE trades_clean ( exchange SYMBOL, symbol SYMBOL, isin SYMBOL, price DOUBLE, quantity DOUBLE, timestamp TIMESTAMP ) TIMESTAMP(timestamp) PARTITION BY DAY WAL """ execute_query(create_table_query, timeout=60) # Dann Daten einfügen mit INSERT ... SELECT (ohne LIMIT!) print(" Kopiere Daten (ohne Duplikate)...") insert_query = """ INSERT INTO trades_clean SELECT exchange, symbol, isin, price, quantity, timestamp FROM ( SELECT exchange, symbol, isin, price, quantity, timestamp, row_number() OVER (PARTITION BY exchange, isin, timestamp, price, quantity ORDER BY timestamp) as rn FROM trades ) WHERE rn = 1 """ result = execute_query(insert_query, timeout=3600) # 1 Stunde Timeout if result is None: print(" Fehler bei INSERT - versuche alternative Methode...") # Fallback: Direkte Kopie ohne Deduplizierung über SQL # Stattdessen per ILP deduplizieren insert_simple = "INSERT INTO trades_clean SELECT * FROM trades" execute_query(insert_simple, timeout=3600) clean_count = get_table_count("trades_clean") print(f" Bereinigte Tabelle: {clean_count:,} Trades") if clean_count == 0: print(" FEHLER: Keine Daten kopiert!") return removed = original_count - clean_count if removed > 0: print(f" Entfernte Duplikate: {removed:,} ({removed/original_count*100:.1f}%)") else: print(" Keine Duplikate durch SQL entfernt (DEDUP wird bei neuen Inserts aktiv)") # 4. Ersetze alte Tabelle print("\n4. Ersetze alte Tabelle...") # Rename alte Tabelle zu backup execute_query("RENAME TABLE trades TO trades_backup") # Rename neue Tabelle zu trades execute_query("RENAME TABLE trades_clean TO trades") # Verifiziere final_count = get_table_count("trades") print(f" Neue Trades-Tabelle: {final_count:,} Einträge") # 5. Lösche Backup (optional) print("\n5. Lösche Backup-Tabelle...") execute_query("DROP TABLE IF EXISTS trades_backup") print(" Backup gelöscht.") # 6. Zusammenfassung print("\n" + "=" * 60) print("ZUSAMMENFASSUNG") print("=" * 60) print(f"Vorher: {original_count:>15,} Trades") print(f"Nachher: {final_count:>15,} Trades") print(f"Entfernt:{removed:>15,} Duplikate ({removed/original_count*100:.1f}%)") print("=" * 60) # 7. Statistik-Tabellen neu berechnen print("\n6. Lösche alte Analytics-Tabellen (werden neu berechnet)...") for table in ['analytics_daily_summary', 'analytics_exchange_daily', 'analytics_stock_trends', 'analytics_volume_changes', 'analytics_custom']: result = execute_query(f"DROP TABLE IF EXISTS {table}") print(f" {table} gelöscht") print("\nFertig! Der Analytics Worker wird die Statistiken beim nächsten Start neu berechnen.") if __name__ == "__main__": main()