Fix: Analytics Worker berechnet heute/gestern IMMER neu
Some checks failed
Deployment / deploy-docker (push) Has been cancelled
Some checks failed
Deployment / deploy-docker (push) Has been cancelled
- Neue force_recalculate_date() Methode löscht alte Daten vor Neuberechnung - Heute und gestern werden bei jedem Stunden-Check neu berechnet - Behebt Problem, dass neue Trades nicht in Analytics aufgenommen wurden
This commit is contained in:
@@ -865,6 +865,39 @@ class AnalyticsWorker:
|
|||||||
if i % 10 == 0:
|
if i % 10 == 0:
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
def delete_analytics_for_date(self, date: datetime.date):
|
||||||
|
"""Löscht alle Analytics-Daten für ein bestimmtes Datum, damit sie neu berechnet werden können."""
|
||||||
|
date_str = date.strftime('%Y-%m-%d')
|
||||||
|
next_day = date + datetime.timedelta(days=1)
|
||||||
|
next_day_str = next_day.strftime('%Y-%m-%d')
|
||||||
|
|
||||||
|
tables = ['analytics_custom', 'analytics_exchange_daily', 'analytics_daily_summary']
|
||||||
|
|
||||||
|
for table in tables:
|
||||||
|
try:
|
||||||
|
# QuestDB DELETE syntax
|
||||||
|
delete_query = f"DELETE FROM {table} WHERE timestamp >= '{date_str}' AND timestamp < '{next_day_str}'"
|
||||||
|
response = requests.get(
|
||||||
|
f"{self.questdb_url}/exec",
|
||||||
|
params={'query': delete_query},
|
||||||
|
auth=self.auth,
|
||||||
|
timeout=30
|
||||||
|
)
|
||||||
|
if response.status_code == 200:
|
||||||
|
logger.debug(f"Deleted old analytics from {table} for {date}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Could not delete from {table} for {date}: {e}")
|
||||||
|
|
||||||
|
def force_recalculate_date(self, date: datetime.date):
|
||||||
|
"""Erzwingt Neuberechnung der Analytics für ein Datum (löscht alte Daten zuerst)."""
|
||||||
|
logger.info(f"Force recalculating analytics for {date}...")
|
||||||
|
|
||||||
|
# Lösche alte Analytics-Daten für dieses Datum
|
||||||
|
self.delete_analytics_for_date(date)
|
||||||
|
|
||||||
|
# Berechne neu
|
||||||
|
self.process_date(date)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Hauptschleife des Workers"""
|
"""Hauptschleife des Workers"""
|
||||||
logger.info("Analytics Worker started.")
|
logger.info("Analytics Worker started.")
|
||||||
@@ -874,35 +907,26 @@ class AnalyticsWorker:
|
|||||||
logger.error("Failed to connect to QuestDB. Exiting.")
|
logger.error("Failed to connect to QuestDB. Exiting.")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Initiale Berechnung fehlender Tage (inkl. gestern und heute)
|
# Initiale Berechnung fehlender Tage
|
||||||
logger.info("Checking for missing dates...")
|
logger.info("Checking for missing dates...")
|
||||||
self.process_missing_dates()
|
self.process_missing_dates()
|
||||||
|
|
||||||
# Stelle sicher, dass gestern und heute verarbeitet werden
|
# IMMER heute und gestern neu berechnen (da neue Trades hinzukommen können)
|
||||||
today = datetime.date.today()
|
today = datetime.date.today()
|
||||||
yesterday = today - datetime.timedelta(days=1)
|
yesterday = today - datetime.timedelta(days=1)
|
||||||
|
|
||||||
logger.info(f"Ensuring yesterday ({yesterday}) and today ({today}) are processed...")
|
logger.info(f"Force recalculating yesterday ({yesterday}) and today ({today}) - new trades may have been added...")
|
||||||
# Prüfe alle drei Tabellen
|
|
||||||
existing_custom = self.get_existing_dates('analytics_custom')
|
|
||||||
existing_exchange = self.get_existing_dates('analytics_exchange_daily')
|
|
||||||
existing_summary = self.get_existing_dates('analytics_daily_summary')
|
|
||||||
existing_dates = existing_custom | existing_exchange | existing_summary
|
|
||||||
|
|
||||||
if yesterday not in existing_dates:
|
# Gestern immer neu berechnen
|
||||||
logger.info(f"Processing yesterday's data: {yesterday}")
|
self.force_recalculate_date(yesterday)
|
||||||
self.process_date(yesterday)
|
|
||||||
|
|
||||||
# Heute wird verarbeitet, wenn es bereits Trades gibt
|
# Heute nur wenn es Trades gibt
|
||||||
if today not in existing_dates:
|
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
|
||||||
# Prüfe ob es heute schon Trades gibt
|
data = self.query_questdb(query)
|
||||||
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
|
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
|
||||||
data = self.query_questdb(query)
|
self.force_recalculate_date(today)
|
||||||
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
|
else:
|
||||||
logger.info(f"Found trades for today ({today}), processing...")
|
logger.info(f"No trades found for today ({today}) yet, will process later")
|
||||||
self.process_date(today)
|
|
||||||
else:
|
|
||||||
logger.info(f"No trades found for today ({today}) yet, will process later")
|
|
||||||
|
|
||||||
# Hauptschleife: Prüfe regelmäßig auf fehlende Tage
|
# Hauptschleife: Prüfe regelmäßig auf fehlende Tage
|
||||||
logger.info("Starting main loop - checking for missing dates every hour...")
|
logger.info("Starting main loop - checking for missing dates every hour...")
|
||||||
@@ -917,32 +941,24 @@ class AnalyticsWorker:
|
|||||||
self.process_missing_dates()
|
self.process_missing_dates()
|
||||||
last_check_hour = current_hour
|
last_check_hour = current_hour
|
||||||
|
|
||||||
# Stelle sicher, dass gestern und heute verarbeitet wurden
|
# IMMER heute und gestern neu berechnen
|
||||||
today = now.date()
|
today = now.date()
|
||||||
yesterday = today - datetime.timedelta(days=1)
|
yesterday = today - datetime.timedelta(days=1)
|
||||||
# Prüfe alle drei Tabellen
|
|
||||||
existing_custom = self.get_existing_dates('analytics_custom')
|
|
||||||
existing_exchange = self.get_existing_dates('analytics_exchange_daily')
|
|
||||||
existing_summary = self.get_existing_dates('analytics_daily_summary')
|
|
||||||
existing_dates = existing_custom | existing_exchange | existing_summary
|
|
||||||
|
|
||||||
if yesterday not in existing_dates:
|
logger.info(f"Hourly recalculation of yesterday ({yesterday}) and today ({today})...")
|
||||||
logger.info(f"Processing yesterday's data: {yesterday}")
|
self.force_recalculate_date(yesterday)
|
||||||
self.process_date(yesterday)
|
|
||||||
|
|
||||||
# Prüfe heute, ob es Trades gibt
|
# Prüfe heute, ob es Trades gibt
|
||||||
if today not in existing_dates:
|
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
|
||||||
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
|
data = self.query_questdb(query)
|
||||||
data = self.query_questdb(query)
|
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
|
||||||
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
|
self.force_recalculate_date(today)
|
||||||
logger.info(f"Found trades for today ({today}), processing...")
|
|
||||||
self.process_date(today)
|
|
||||||
|
|
||||||
# Prüfe ob es Mitternacht ist (00:00) - verarbeite dann gestern
|
# Prüfe ob es Mitternacht ist (00:00) - verarbeite dann gestern
|
||||||
if now.hour == 0 and now.minute == 0:
|
if now.hour == 0 and now.minute == 0:
|
||||||
yesterday = (now - datetime.timedelta(days=1)).date()
|
yesterday = (now - datetime.timedelta(days=1)).date()
|
||||||
logger.info(f"Midnight reached - processing yesterday's data: {yesterday}")
|
logger.info(f"Midnight reached - force recalculating yesterday's data: {yesterday}")
|
||||||
self.process_date(yesterday)
|
self.force_recalculate_date(yesterday)
|
||||||
# Warte 61s, um Mehrfachausführung zu verhindern
|
# Warte 61s, um Mehrfachausführung zu verhindern
|
||||||
time.sleep(61)
|
time.sleep(61)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user