This commit is contained in:
@@ -26,6 +26,31 @@ class AnalyticsWorker:
|
||||
self.last_processed_timestamp = None
|
||||
self.db_url = DB_URL
|
||||
|
||||
def query_questdb(self, query: str, timeout: int = 30) -> Optional[Dict]:
|
||||
"""Zentrale QuestDB-Abfrage-Funktion"""
|
||||
try:
|
||||
response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=timeout)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
# Prüfe auf Fehler in der JSON-Antwort (z.B. "table does not exist")
|
||||
if isinstance(data, dict) and 'error' in data:
|
||||
# Für "table does not exist" Fehler, geben wir None zurück (wird als leere Tabelle behandelt)
|
||||
if 'does not exist' in str(data.get('error', '')):
|
||||
return None
|
||||
logger.debug(f"QuestDB query error: {data.get('error')}")
|
||||
return None
|
||||
return data
|
||||
else:
|
||||
# Bei 400/404 Fehlern (z.B. Tabelle existiert nicht), geben wir None zurück
|
||||
if response.status_code in [400, 404]:
|
||||
logger.debug(f"QuestDB query failed (table may not exist): {response.status_code}")
|
||||
return None
|
||||
logger.error(f"QuestDB query failed: {response.status_code} - {response.text}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.debug(f"Error querying QuestDB: {e}")
|
||||
return None
|
||||
|
||||
def wait_for_questdb(self, max_retries: int = 30, retry_delay: int = 2):
|
||||
"""Wartet bis QuestDB verfügbar ist"""
|
||||
logger.info("Waiting for QuestDB to be available...")
|
||||
@@ -545,6 +570,31 @@ class AnalyticsWorker:
|
||||
|
||||
logger.info(f"Completed processing for {date}")
|
||||
|
||||
def get_existing_dates(self, table_name: str) -> set:
|
||||
"""Holt alle bereits berechneten Daten aus einer Analytics-Tabelle"""
|
||||
query = f"select distinct date_trunc('day', timestamp) as date from {table_name}"
|
||||
data = self.query_questdb(query)
|
||||
if not data:
|
||||
# Tabelle existiert noch nicht - das ist OK beim ersten Start
|
||||
return set()
|
||||
|
||||
# Prüfe auf Fehler in der Antwort (z.B. "table does not exist")
|
||||
if isinstance(data, dict) and 'error' in data:
|
||||
logger.debug(f"Table {table_name} does not exist yet: {data.get('error')}")
|
||||
return set()
|
||||
|
||||
if not data.get('dataset'):
|
||||
return set()
|
||||
|
||||
dates = set()
|
||||
for row in data['dataset']:
|
||||
if row and row[0]:
|
||||
if isinstance(row[0], str):
|
||||
dates.add(datetime.datetime.fromisoformat(row[0].replace('Z', '+00:00')).date())
|
||||
elif isinstance(row[0], (int, float)):
|
||||
dates.add(datetime.datetime.fromtimestamp(row[0] / 1000000, tz=datetime.timezone.utc).date())
|
||||
return dates
|
||||
|
||||
def get_missing_dates(self) -> List[datetime.date]:
|
||||
"""Ermittelt fehlende Tage, die noch berechnet werden müssen"""
|
||||
# Hole das Datum des ersten Trades
|
||||
@@ -561,7 +611,7 @@ class AnalyticsWorker:
|
||||
first_date = datetime.datetime.fromtimestamp(first_date_value / 1000000, tz=datetime.timezone.utc).date()
|
||||
|
||||
# Hole bereits berechnete Daten
|
||||
existing_dates = self.get_existing_dates('analytics_daily_summary')
|
||||
existing_dates = self.get_existing_dates('analytics_custom')
|
||||
|
||||
# Generiere alle Tage vom ersten Trade bis gestern
|
||||
yesterday = datetime.date.today() - datetime.timedelta(days=1)
|
||||
|
||||
Reference in New Issue
Block a user