Files
trading-daemon/cleanup_duplicates.py
Melchior Reimers 22b09669c1
All checks were successful
Deployment / deploy-docker (push) Successful in 18s
updated dashboard
2026-01-27 10:48:11 +01:00

155 lines
5.0 KiB
Python

#!/usr/bin/env python3
"""
Script zum Entfernen von duplizierten Trades aus QuestDB.
Erstellt eine neue Tabelle ohne Duplikate und ersetzt die alte.
"""
import requests
import os
import sys
# Connection settings for the QuestDB HTTP endpoint, overridable through
# environment variables; the literals below are the fallbacks used when a
# variable is not set.
DB_HOST = os.environ.get("QUESTDB_HOST", "localhost")
DB_PORT = os.environ.get("QUESTDB_PORT", "9000")
DB_USER = os.environ.get("DB_USER", "admin")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "quest")

# Base URL that every request in this script is sent to.
DB_URL = f"http://{DB_HOST}:{DB_PORT}"

# Credentials tuple handed to `requests` as `auth=`; stays None (no auth)
# when either the user or the password is empty.
DB_AUTH = None
if DB_USER and DB_PASSWORD:
    DB_AUTH = (DB_USER, DB_PASSWORD)
def execute_query(query, timeout=300):
    """Run a SQL statement against the QuestDB HTTP ``/exec`` endpoint.

    Args:
        query: SQL text, sent as the ``query`` URL parameter of a GET request.
        timeout: Timeout in seconds passed through to ``requests.get``.

    Returns:
        The decoded JSON response body when the server answers with HTTP 200.
        ``None`` for any other status code or when any exception is raised
        while sending the request or decoding the body; in both failure
        cases a message is printed instead of an exception being propagated.
    """
    try:
        reply = requests.get(
            f"{DB_URL}/exec",
            params={'query': query},
            auth=DB_AUTH,
            timeout=timeout,
        )
        # Guard clause: anything but 200 is reported and treated as failure.
        if reply.status_code != 200:
            print(f"Query failed: {reply.text}")
            return None
        return reply.json()
    except Exception as exc:
        # Deliberately broad: callers rely on None, never on an exception.
        print(f"Error executing query: {exc}")
        return None
def get_table_count(table_name):
    """Return the number of rows in ``table_name``.

    Args:
        table_name: Table name, interpolated verbatim into the SQL text.

    Returns:
        The first cell of the first result row of ``SELECT count(*)``.
        Returns 0 when the query fails or yields no dataset, so a failed
        query is indistinguishable from an empty table for the caller.
    """
    payload = execute_query(f"SELECT count(*) FROM {table_name}")
    if not payload:
        return 0
    rows = payload.get('dataset')
    if not rows:
        return 0
    return rows[0][0]
def _report_duplicates_per_exchange():
    """Print a per-exchange table of total, unique and duplicate trade counts."""
    # NOTE: uniqueness here is keyed on (isin, timestamp, price, quantity)
    # per exchange, whereas the cleanup itself uses DISTINCT over those
    # columns plus `symbol`, so these figures are an estimate of what the
    # cleanup will remove, not a guarantee.
    analysis_query = """
        SELECT
            exchange,
            count(*) as total,
            count(distinct concat(isin, '-', cast(timestamp as string), '-', cast(price as string), '-', cast(quantity as string))) as unique_trades
        FROM trades
        GROUP BY exchange
        ORDER BY exchange
    """
    result = execute_query(analysis_query)
    if not (result and result.get('dataset')):
        # Purely informational step: a failed analysis must not block cleanup
        # (execute_query has already printed the error).
        return

    print(f"\n{'Exchange':<15} {'Total':>12} {'Unique':>12} {'Duplicates':>12}")
    print("-" * 55)
    total_all = 0
    unique_all = 0
    for exchange, total, unique in result['dataset']:
        total_all += total
        unique_all += unique
        # str() so a NULL exchange (None) does not break the format spec.
        print(f"{str(exchange):<15} {total:>12,} {unique:>12,} {total - unique:>12,}")
    print("-" * 55)
    print(f"{'TOTAL':<15} {total_all:>12,} {unique_all:>12,} {total_all - unique_all:>12,}")


def _swap_in_clean_table(clean_count):
    """Rename ``trades_clean`` over ``trades``, parking the old table as ``trades_backup``.

    Args:
        clean_count: Row count previously measured for ``trades_clean``; the
            swapped-in table must hold at least this many rows.

    Returns:
        The row count of the new ``trades`` table on success, or ``None`` if
        a rename failed or the result could not be verified. This helper
        never drops a table, so on ``None`` the original data still exists
        under ``trades`` or ``trades_backup``.
    """
    if execute_query("RENAME TABLE trades TO trades_backup") is None:
        print(" Fehler: 'trades' konnte nicht in 'trades_backup' umbenannt werden "
              "(existiert 'trades_backup' bereits?).")
        print(" Abbruch: 'trades' ist unverändert, 'trades_clean' bleibt erhalten.")
        return None

    if execute_query("RENAME TABLE trades_clean TO trades") is None:
        print(" Fehler: 'trades_clean' konnte nicht in 'trades' umbenannt werden. Versuche Rollback...")
        # Put the original table back so the system is not left without `trades`.
        if execute_query("RENAME TABLE trades_backup TO trades") is None:
            print(" Rollback fehlgeschlagen! Die Originaldaten liegen in 'trades_backup'.")
        else:
            print(" Rollback erfolgreich: 'trades' wurde wiederhergestellt.")
        return None

    final_count = get_table_count("trades")
    print(f" Neue Trades-Tabelle: {final_count:,} Einträge")
    # get_table_count returns 0 on failure, so this also catches an
    # unreadable table; in either case the backup must survive.
    if final_count < clean_count:
        print(f" Warnung: Neue Tabelle hat weniger Einträge als erwartet ({clean_count:,}). "
              "Backup 'trades_backup' bleibt erhalten.")
        return None
    return final_count


def _drop_analytics_tables():
    """Drop the derived analytics tables and report the outcome per table."""
    for table in ['analytics_daily_summary', 'analytics_exchange_daily',
                  'analytics_stock_trends', 'analytics_volume_changes', 'analytics_custom']:
        if execute_query(f"DROP TABLE IF EXISTS {table}") is None:
            print(f" {table} konnte nicht gelöscht werden")
        else:
            print(f" {table} gelöscht")


def main():
    """Remove duplicate rows from the QuestDB ``trades`` table.

    Steps: count the rows, print a per-exchange duplicate analysis, build
    ``trades_clean`` via ``SELECT DISTINCT``, swap it in for ``trades``, drop
    the backup, print a summary and drop the analytics tables.

    Every destructive step is checked before the next one runs: the backup
    table is only dropped after the swapped-in table has been verified, and
    any failure aborts with the original data still present under ``trades``
    or ``trades_backup``. Returns ``None`` in all cases.
    """
    print("=" * 60)
    print("QuestDB Duplikat-Bereinigung")
    print("=" * 60)

    # 1. Current row count (0 also covers "count query failed").
    original_count = get_table_count("trades")
    print(f"\n1. Aktuelle Anzahl Trades: {original_count:,}")
    if original_count == 0:
        print("Keine Trades in der Datenbank. Nichts zu tun.")
        return

    # 2. Informational duplicate analysis.
    print("\n2. Analysiere Duplikate pro Exchange...")
    _report_duplicates_per_exchange()

    # 3. Build the de-duplicated table.
    print("\n3. Erstelle bereinigte Tabelle 'trades_clean'...")
    # Remove leftovers from an earlier run; if this fails, the CREATE below
    # fails too and is caught there.
    execute_query("DROP TABLE IF EXISTS trades_clean")
    # NOTE(review): only these six columns are carried over; confirm that
    # `trades` has no further columns that would be lost.
    create_clean_query = """
        CREATE TABLE trades_clean AS (
            SELECT DISTINCT
                exchange,
                symbol,
                isin,
                price,
                quantity,
                timestamp
            FROM trades
        ) TIMESTAMP(timestamp) PARTITION BY DAY WAL
    """
    if execute_query(create_clean_query, timeout=600) is None:
        print("Fehler beim Erstellen der bereinigten Tabelle!")
        return

    clean_count = get_table_count("trades_clean")
    print(f" Bereinigte Tabelle erstellt: {clean_count:,} Trades")
    # DISTINCT over a non-empty table cannot be empty, so 0 means the table
    # is unreadable or not populated; swapping it in would wipe `trades`.
    # NOTE(review): the table is created as WAL; if QuestDB applies WAL data
    # asynchronously the count may lag behind - confirm against QuestDB docs.
    if clean_count == 0:
        print("Fehler: Bereinigte Tabelle ist leer oder nicht lesbar. "
              "Abbruch; 'trades' bleibt unverändert.")
        return
    removed = original_count - clean_count
    print(f" Entfernte Duplikate: {removed:,} ({removed/original_count*100:.1f}%)")

    # 4. Swap the clean table in; abort without dropping anything on failure.
    print("\n4. Ersetze alte Tabelle...")
    final_count = _swap_in_clean_table(clean_count)
    if final_count is None:
        return

    # 5. The swap is verified, so the backup is now safe to remove.
    print("\n5. Lösche Backup-Tabelle...")
    if execute_query("DROP TABLE IF EXISTS trades_backup") is None:
        print(" Backup konnte nicht gelöscht werden; 'trades_backup' bleibt bestehen.")
    else:
        print(" Backup gelöscht.")

    # Summary.
    print("\n" + "=" * 60)
    print("ZUSAMMENFASSUNG")
    print("=" * 60)
    print(f"Vorher: {original_count:>15,} Trades")
    print(f"Nachher: {final_count:>15,} Trades")
    print(f"Entfernt:{removed:>15,} Duplikate ({removed/original_count*100:.1f}%)")
    print("=" * 60)

    # 6. Derived analytics are stale after the cleanup; drop them.
    print("\n6. Lösche alte Analytics-Tabellen (werden neu berechnet)...")
    _drop_analytics_tables()
    print("\nFertig! Der Analytics Worker wird die Statistiken beim nächsten Start neu berechnen.")
if __name__ == "__main__":
    # Run the cleanup only when executed as a script, not when imported.
    main()