From cf55a0bd06fe43e1c95dc61207772d3910df92f7 Mon Sep 17 00:00:00 2001
From: Melchior Reimers
Date: Thu, 29 Jan 2026 22:36:22 +0100
Subject: [PATCH] Fix: analytics worker ALWAYS recalculates today/yesterday
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- New force_recalculate_date() method deletes old data before recalculating
- Today and yesterday are recalculated on every hourly check
- Fixes the issue that new trades were not picked up by the analytics
---
 src/analytics/worker.py | 92 ++++++++++++++++++++++++-----------------
 1 file changed, 54 insertions(+), 38 deletions(-)

diff --git a/src/analytics/worker.py b/src/analytics/worker.py
index 49253af..a0810be 100644
--- a/src/analytics/worker.py
+++ b/src/analytics/worker.py
@@ -865,6 +865,39 @@ class AnalyticsWorker:
             if i % 10 == 0:
                 time.sleep(1)
 
+    def delete_analytics_for_date(self, date: datetime.date):
+        """Delete all analytics data for a given date so it can be recalculated."""
+        date_str = date.strftime('%Y-%m-%d')
+        next_day = date + datetime.timedelta(days=1)
+        next_day_str = next_day.strftime('%Y-%m-%d')
+
+        tables = ['analytics_custom', 'analytics_exchange_daily', 'analytics_daily_summary']
+
+        for table in tables:
+            try:
+                # QuestDB DELETE syntax
+                delete_query = f"DELETE FROM {table} WHERE timestamp >= '{date_str}' AND timestamp < '{next_day_str}'"
+                response = requests.get(
+                    f"{self.questdb_url}/exec",
+                    params={'query': delete_query},
+                    auth=self.auth,
+                    timeout=30
+                )
+                if response.status_code == 200:
+                    logger.debug(f"Deleted old analytics from {table} for {date}")
+            except Exception as e:
+                logger.debug(f"Could not delete from {table} for {date}: {e}")
+
+    def force_recalculate_date(self, date: datetime.date):
+        """Force recalculation of the analytics for a date (deletes old data first)."""
+        logger.info(f"Force recalculating analytics for {date}...")
+
+        # Delete the old analytics data for this date
+        self.delete_analytics_for_date(date)
+
+        # Recalculate
+        self.process_date(date)
+
     def run(self):
         """Main loop of the worker"""
         logger.info("Analytics Worker started.")
@@ -874,35 +907,26 @@ class AnalyticsWorker:
             logger.error("Failed to connect to QuestDB. Exiting.")
             return
 
-        # Initial calculation of missing dates (incl. yesterday and today)
+        # Initial calculation of missing dates
         logger.info("Checking for missing dates...")
        self.process_missing_dates()
 
-        # Make sure yesterday and today get processed
+        # ALWAYS recalculate today and yesterday (new trades may have been added)
         today = datetime.date.today()
         yesterday = today - datetime.timedelta(days=1)
-        logger.info(f"Ensuring yesterday ({yesterday}) and today ({today}) are processed...")
 
-        # Check all three tables
-        existing_custom = self.get_existing_dates('analytics_custom')
-        existing_exchange = self.get_existing_dates('analytics_exchange_daily')
-        existing_summary = self.get_existing_dates('analytics_daily_summary')
-        existing_dates = existing_custom | existing_exchange | existing_summary
+        logger.info(f"Force recalculating yesterday ({yesterday}) and today ({today}) - new trades may have been added...")
 
-        if yesterday not in existing_dates:
-            logger.info(f"Processing yesterday's data: {yesterday}")
-            self.process_date(yesterday)
+        # Always recalculate yesterday
+        self.force_recalculate_date(yesterday)
 
-        # Today is processed once there are trades for it
-        if today not in existing_dates:
-            # Check whether there are already trades for today
-            query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
-            data = self.query_questdb(query)
-            if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
-                logger.info(f"Found trades for today ({today}), processing...")
-                self.process_date(today)
-            else:
-                logger.info(f"No trades found for today ({today}) yet, will process later")
+        # Today only if there are trades
+        query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
+        data = self.query_questdb(query)
+        if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
+            self.force_recalculate_date(today)
+        else:
+            logger.info(f"No trades found for today ({today}) yet, will process later")
 
         # Main loop: periodically check for missing dates
         logger.info("Starting main loop - checking for missing dates every hour...")
@@ -917,32 +941,24 @@ class AnalyticsWorker:
                     self.process_missing_dates()
                     last_check_hour = current_hour
 
-                # Make sure yesterday and today were processed
+                # ALWAYS recalculate today and yesterday
                 today = now.date()
                 yesterday = today - datetime.timedelta(days=1)
 
-                # Check all three tables
-                existing_custom = self.get_existing_dates('analytics_custom')
-                existing_exchange = self.get_existing_dates('analytics_exchange_daily')
-                existing_summary = self.get_existing_dates('analytics_daily_summary')
-                existing_dates = existing_custom | existing_exchange | existing_summary
-
-                if yesterday not in existing_dates:
-                    logger.info(f"Processing yesterday's data: {yesterday}")
-                    self.process_date(yesterday)
+                logger.info(f"Hourly recalculation of yesterday ({yesterday}) and today ({today})...")
+                self.force_recalculate_date(yesterday)
 
                 # Check whether there are trades for today
-                if today not in existing_dates:
-                    query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
-                    data = self.query_questdb(query)
-                    if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
-                        logger.info(f"Found trades for today ({today}), processing...")
-                        self.process_date(today)
+                query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
+                data = self.query_questdb(query)
+                if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
+                    self.force_recalculate_date(today)
 
                 # Check whether it is midnight (00:00) - then process yesterday
                 if now.hour == 0 and now.minute == 0:
                     yesterday = (now - datetime.timedelta(days=1)).date()
-                    logger.info(f"Midnight reached - processing yesterday's data: {yesterday}")
-                    self.process_date(yesterday)
+                    logger.info(f"Midnight reached - force recalculating yesterday's data: {yesterday}")
+                    self.force_recalculate_date(yesterday)
 
                     # Wait 61s to prevent multiple executions
                     time.sleep(61)
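
Note: below is a minimal standalone sketch of the delete-then-recompute step this patch introduces, useful for trying the queries against QuestDB outside the worker. The table names, the /exec HTTP call, and the half-open date interval mirror the patch; the base URL, the credentials, and the DROP PARTITION fallback are assumptions (the fallback targets QuestDB builds that reject row-level DELETE and requires the analytics tables to be partitioned by DAY).

import datetime
import requests

# Placeholder connection details - adjust to the actual deployment.
QUESTDB_URL = "http://localhost:9000"
AUTH = ("admin", "quest")

ANALYTICS_TABLES = ["analytics_custom", "analytics_exchange_daily", "analytics_daily_summary"]


def exec_sql(query: str) -> requests.Response:
    """Run a statement via QuestDB's HTTP /exec endpoint, as the worker does."""
    return requests.get(f"{QUESTDB_URL}/exec", params={"query": query}, auth=AUTH, timeout=30)


def delete_analytics_day(table: str, day: datetime.date) -> None:
    """Remove one calendar day of analytics rows so that day can be rebuilt."""
    start = day.strftime("%Y-%m-%d")
    end = (day + datetime.timedelta(days=1)).strftime("%Y-%m-%d")
    # Half-open interval [start, end) covers the whole day without touching the next one.
    resp = exec_sql(f"DELETE FROM {table} WHERE timestamp >= '{start}' AND timestamp < '{end}'")
    if resp.status_code != 200:
        # Assumption: on QuestDB versions without row-level DELETE, dropping the
        # day's partition is the documented alternative (requires PARTITION BY DAY).
        exec_sql(f"ALTER TABLE {table} DROP PARTITION LIST '{start}'")


if __name__ == "__main__":
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    for table in ANALYTICS_TABLES:
        delete_analytics_day(table, yesterday)
    # The worker would now call process_date(yesterday) to rebuild the analytics.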