This commit is contained in:
@@ -26,6 +26,21 @@ class AnalyticsWorker:
|
||||
self.last_processed_timestamp = None
|
||||
self.db_url = DB_URL
|
||||
|
||||
def wait_for_questdb(self, max_retries: int = 30, retry_delay: int = 2):
|
||||
"""Wartet bis QuestDB verfügbar ist"""
|
||||
logger.info("Waiting for QuestDB to be available...")
|
||||
for i in range(max_retries):
|
||||
try:
|
||||
response = requests.get(f"{self.db_url}/exec", params={'query': 'select 1'}, auth=DB_AUTH, timeout=2)
|
||||
if response.status_code == 200:
|
||||
logger.info("QuestDB is available!")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.debug(f"QuestDB not ready yet (attempt {i+1}/{max_retries}): {e}")
|
||||
time.sleep(retry_delay)
|
||||
logger.error("QuestDB did not become available after waiting")
|
||||
return False
|
||||
|
||||
def get_last_processed_timestamp(self) -> Optional[datetime.datetime]:
|
||||
"""Holt den letzten verarbeiteten Timestamp aus der Analytics-Tabelle"""
|
||||
try:
|
||||
@@ -414,6 +429,11 @@ class AnalyticsWorker:
|
||||
"""Hauptschleife des Workers"""
|
||||
logger.info("Analytics Worker started.")
|
||||
|
||||
# Warte auf QuestDB
|
||||
if not self.wait_for_questdb():
|
||||
logger.error("Failed to connect to QuestDB. Exiting.")
|
||||
return
|
||||
|
||||
# Initiale Verarbeitung
|
||||
self.process_all_analytics()
|
||||
self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
|
||||
@@ -434,8 +454,11 @@ class AnalyticsWorker:
|
||||
|
||||
# Warte 30 Sekunden vor nächster Prüfung
|
||||
time.sleep(30)
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
logger.warning(f"Connection error to QuestDB, retrying in 60s: {e}")
|
||||
time.sleep(60) # Längere Pause bei Verbindungsfehler
|
||||
except Exception as e:
|
||||
logger.error(f"Error in worker loop: {e}")
|
||||
logger.error(f"Error in worker loop: {e}", exc_info=True)
|
||||
time.sleep(60) # Längere Pause bei Fehler
|
||||
|
||||
def main():
|
||||
|
||||
Reference in New Issue
Block a user