Files
trading-daemon/restore_and_fix.py
Melchior Reimers b25bab2288
All checks were successful
Deployment / deploy-docker (push) Successful in 18s
updated dashboard
2026-01-27 11:00:55 +01:00

126 lines
4.3 KiB
Python

#!/usr/bin/env python3
"""
Script zum Wiederherstellen und korrekten Bereinigen der Trades.
"""
import requests
import os
import sys
DB_HOST = os.getenv("QUESTDB_HOST", "localhost")
DB_PORT = os.getenv("QUESTDB_PORT", "9000")
DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_URL = f"http://{DB_HOST}:{DB_PORT}"
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
def execute_query(query, timeout=300):
"""Führt eine QuestDB Query aus."""
try:
response = requests.get(
f"{DB_URL}/exec",
params={'query': query, 'count': 'true'},
auth=DB_AUTH,
timeout=timeout
)
if response.status_code == 200:
return response.json()
else:
print(f"Query failed: {response.text[:500]}")
return None
except Exception as e:
print(f"Error executing query: {e}")
return None
def get_table_count(table_name):
"""Zählt Einträge in einer Tabelle."""
result = execute_query(f"SELECT count(*) FROM {table_name}")
if result and result.get('dataset'):
return result['dataset'][0][0]
return 0
def table_exists(table_name):
"""Prüft ob eine Tabelle existiert."""
result = execute_query(f"SELECT count(*) FROM {table_name} LIMIT 1")
return result is not None
def main():
print("=" * 60)
print("QuestDB Daten-Wiederherstellung und Bereinigung")
print("=" * 60)
# 1. Prüfe aktuellen Stand
current_count = get_table_count("trades")
print(f"\n1. Aktuelle Trades-Tabelle: {current_count:,} Einträge")
# 2. Prüfe ob Backup existiert
backup_exists = table_exists("trades_backup")
if backup_exists:
backup_count = get_table_count("trades_backup")
print(f" Backup-Tabelle gefunden: {backup_count:,} Einträge")
if backup_count > current_count:
print("\n2. Backup hat mehr Daten - Wiederherstellung möglich!")
response = input(" Backup wiederherstellen? (j/n): ")
if response.lower() == 'j':
print(" Lösche aktuelle Tabelle...")
execute_query("DROP TABLE trades")
print(" Benenne Backup um...")
execute_query("RENAME TABLE trades_backup TO trades")
new_count = get_table_count("trades")
print(f" Wiederhergestellt: {new_count:,} Trades")
else:
print(" Backup hat weniger/gleich viele Daten - keine Wiederherstellung nötig")
else:
print(" Kein Backup gefunden!")
# 3. Zeige Statistik pro Exchange
print("\n3. Trades pro Exchange:")
result = execute_query("""
SELECT exchange, count(*) as cnt
FROM trades
GROUP BY exchange
ORDER BY cnt DESC
""")
if result and result.get('dataset'):
for row in result['dataset']:
print(f" {row[0]}: {row[1]:,}")
# 4. Aktiviere DEDUP für zukünftige Inserts
print("\n4. Prüfe DEDUP-Status...")
# QuestDB: DEDUP kann nur bei Tabellenerstellung gesetzt werden
# Wir können aber eine neue Tabelle mit DEDUP erstellen
print("\n5. Empfehlung:")
print(" - Die Deduplizierung sollte im daemon.py erfolgen (bereits implementiert)")
print(" - Der Hash-basierte Check verhindert zukünftige Duplikate")
print(" - Für bestehende Duplikate: Manuelles Cleanup in Batches")
# 6. Zeige Duplikat-Analyse für eine Exchange
print("\n6. Stichproben-Analyse für Duplikate...")
result = execute_query("""
SELECT exchange, isin, timestamp, price, quantity, count(*) as cnt
FROM trades
WHERE exchange = 'EIX'
GROUP BY exchange, isin, timestamp, price, quantity
HAVING count(*) > 1
LIMIT 10
""", timeout=120)
if result and result.get('dataset') and len(result['dataset']) > 0:
print(" Gefundene Duplikate (Beispiele):")
for row in result['dataset'][:5]:
print(f" {row[0]} | {row[1]} | {row[2]} | {row[3]} | {row[4]} | {row[5]}x")
else:
print(" Keine Duplikate in EIX gefunden (oder Query timeout)")
print("\n" + "=" * 60)
print("Fertig!")
if __name__ == "__main__":
main()