diff --git a/src/analytics/worker.py b/src/analytics/worker.py
index 9cfc22d..d9051b3 100644
--- a/src/analytics/worker.py
+++ b/src/analytics/worker.py
@@ -3,9 +3,8 @@ import logging
 import datetime
 import os
 import requests
 from typing import Dict, List, Optional
 import pandas as pd
-import json
 
 logging.basicConfig(
     level=logging.INFO,
@@ -24,8 +23,9 @@ TIME_PERIODS = [7, 30, 42, 69, 180, 365]
 
 class AnalyticsWorker:
     def __init__(self):
+        self.last_processed_timestamp = None
         self.db_url = DB_URL
-    
+
     def wait_for_questdb(self, max_retries: int = 30, retry_delay: int = 2):
         """Waits until QuestDB is available"""
         logger.info("Waiting for QuestDB to be available...")
@@ -40,317 +40,281 @@ class AnalyticsWorker:
                 time.sleep(retry_delay)
         logger.error("QuestDB did not become available after waiting")
         return False
-    
-    def query_questdb(self, query: str, timeout: int = 30) -> Optional[Dict]:
-        """Central QuestDB query helper"""
+
+    def get_last_processed_timestamp(self) -> Optional[datetime.datetime]:
+        """Fetches the most recently processed timestamp from the analytics table"""
         try:
-            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=timeout)
+            query = "select max(timestamp) as last_ts from analytics_exchange_daily"
+            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=30)
             if response.status_code == 200:
-                return response.json()
-            else:
-                logger.error(f"QuestDB query failed: {response.status_code} - {response.text}")
-                return None
+                data = response.json()
+                if data.get('dataset') and data['dataset'][0][0]:
+                    ts_value = data['dataset'][0][0]
+                    if isinstance(ts_value, str):
+                        return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
+                    elif isinstance(ts_value, (int, float)):
+                        # QuestDB returns timestamps in microseconds
+                        return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
         except Exception as e:
-            logger.error(f"Error querying QuestDB: {e}")
-            return None
+            logger.debug(f"Could not get last processed timestamp: {e}")
+        return None
 
-    def get_existing_dates(self, table_name: str) -> set:
-        """Fetches the set of dates already computed in an analytics table"""
-        query = f"select distinct date_trunc('day', timestamp) as date from {table_name}"
-        data = self.query_questdb(query)
-        if not data or not data.get('dataset'):
-            return set()
-        
-        dates = set()
-        for row in data['dataset']:
-            if row[0]:
-                if isinstance(row[0], str):
-                    dates.add(datetime.datetime.fromisoformat(row[0].replace('Z', '+00:00')).date())
-                elif isinstance(row[0], (int, float)):
-                    dates.add(datetime.datetime.fromtimestamp(row[0] / 1000000, tz=datetime.timezone.utc).date())
-        return dates
-    
-    def get_missing_dates(self) -> List[datetime.date]:
-        """Determines the missing days that still need to be computed"""
-        # Fetch the date of the first trade
-        query = "select min(date_trunc('day', timestamp)) as first_date from trades"
-        data = self.query_questdb(query)
-        if not data or not data.get('dataset') or not data['dataset'][0][0]:
-            logger.info("No trades found in database")
-            return []
-        
-        first_date_value = data['dataset'][0][0]
-        if isinstance(first_date_value, str):
-            first_date = datetime.datetime.fromisoformat(first_date_value.replace('Z', '+00:00')).date()
+    def get_new_trades(self, since: Optional[datetime.datetime] = None) -> List[Dict]:
+        """Fetches new trades since the last processing time"""
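+        # The `since` filter below is rendered with second precision as a naive
+        # literal, which QuestDB parses as UTC, e.g. (hypothetical value):
+        #   where timestamp > '2024-01-15 09:30:00'
+        # Sub-second precision is lost, so trades from the boundary second can be
+        # picked up again; that is harmless here because the analytics are always
+        # recomputed in full rather than appended incrementally.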
+        if since:
+            since_str = since.strftime('%Y-%m-%d %H:%M:%S')
+            query = f"select timestamp, exchange, isin, price, quantity from trades where timestamp > '{since_str}' order by timestamp asc"
         else:
-            first_date = datetime.datetime.fromtimestamp(first_date_value / 1000000, tz=datetime.timezone.utc).date()
+            # First run: only the last 7 days
+            query = "select timestamp, exchange, isin, price, quantity from trades where timestamp > dateadd('d', -7, now()) order by timestamp asc"
 
-        # Fetch the dates that have already been computed
-        existing_dates = self.get_existing_dates('analytics_daily_summary')
-        
-        # Generate every day from the first trade up to yesterday
-        yesterday = datetime.date.today() - datetime.timedelta(days=1)
-        all_dates = []
-        current = first_date
-        while current <= yesterday:
-            all_dates.append(current)
-            current += datetime.timedelta(days=1)
-        
-        # Find the missing days
-        missing_dates = [d for d in all_dates if d not in existing_dates]
-        logger.info(f"Found {len(missing_dates)} missing dates to calculate (from {len(all_dates)} total dates)")
-        return sorted(missing_dates)
+        try:
+            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=30)
+            if response.status_code == 200:
+                data = response.json()
+                columns = data.get('columns', [])
+                dataset = data.get('dataset', [])
+                
+                trades = []
+                for row in dataset:
+                    trade = {}
+                    for i, col in enumerate(columns):
+                        trade[col['name']] = row[i]
+                    trades.append(trade)
+                return trades
+        except Exception as e:
+            logger.error(f"Error fetching new trades: {e}")
+        return []
 
-    def calculate_daily_summary(self, date: datetime.date) -> Optional[Dict]:
-        """Computes the daily summary for a single day"""
-        date_str = date.strftime('%Y-%m-%d')
-        query = f"""
-            select
-                count(*) as total_trades,
-                sum(price * quantity) as total_volume,
-                exchange,
-                count(*) as exchange_trades
-            from trades
-            where date_trunc('day', timestamp) = '{date_str}'
-            group by exchange
-        """
-        
-        data = self.query_questdb(query)
-        if not data or not data.get('dataset'):
-            return None
-        
-        total_trades = 0
-        total_volume = 0.0
-        exchanges = {}
-        
-        for row in data['dataset']:
-            exchange = row[2]
-            trades = row[3] if row[3] else 0
-            volume = row[1] if row[1] else 0.0
-            
-            total_trades += trades
-            total_volume += volume
-            exchanges[exchange] = {'trades': trades, 'volume': volume}
-        
-        return {
-            'date': date,
-            'total_trades': total_trades,
-            'total_volume': total_volume,
-            'exchanges': json.dumps(exchanges)
-        }
-    
-    def calculate_exchange_daily(self, date: datetime.date) -> List[Dict]:
-        """Computes daily per-exchange statistics with moving averages"""
-        date_str = date.strftime('%Y-%m-%d')
-        
-        # Fetch the data for this day
+    def calculate_exchange_daily_aggregations(self, days_back: int = 365) -> List[Dict]:
+        """Computes daily aggregations per exchange with moving averages"""
+        end_date = datetime.datetime.now(datetime.timezone.utc)
+        start_date = end_date - datetime.timedelta(days=days_back)
+        
         query = f"""
             select
+                date_trunc('day', timestamp) as date,
                 exchange,
                 count(*) as trade_count,
                 sum(price * quantity) as volume
             from trades
-            where date_trunc('day', timestamp) = '{date_str}'
-            group by exchange
+            where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
+            group by date, exchange
+            order by date asc, exchange asc
         """
         
-        data = self.query_questdb(query)
-        if not data or not data.get('dataset'):
-            return []
-        
-        results = []
-        for row in data['dataset']:
-            exchange = row[0]
-            trade_count = row[1] if row[1] else 0
-            volume = row[2] if row[2] else 0.0
-            
-            # Compute moving averages for all periods
-            ma_values = {}
-            for period in TIME_PERIODS:
-                # Fetch data for the last N days including today
-                end_date = date
-                start_date = end_date - datetime.timedelta(days=period-1)
+        try:
+            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=30)
+            if response.status_code == 200:
+                data = response.json()
+                columns = data.get('columns', [])
+                dataset = data.get('dataset', [])
                 
-                ma_query = f"""
-                    select
-                        count(*) as ma_count,
-                        sum(price * quantity) as ma_volume
-                    from trades
-                    where exchange = '{exchange}'
-                    and date_trunc('day', timestamp) >= '{start_date.strftime('%Y-%m-%d')}'
-                    and date_trunc('day', timestamp) <= '{end_date.strftime('%Y-%m-%d')}'
-                """
+                results = []
+                for row in dataset:
+                    result = {}
+                    for i, col in enumerate(columns):
+                        result[col['name']] = row[i]
+                    results.append(result)
                 
-                ma_data = self.query_questdb(ma_query)
-                if ma_data and ma_data.get('dataset') and ma_data['dataset'][0]:
-                    ma_values[f'ma{period}_count'] = ma_data['dataset'][0][0] if ma_data['dataset'][0][0] else 0
-                    ma_values[f'ma{period}_volume'] = ma_data['dataset'][0][1] if ma_data['dataset'][0][1] else 0.0
-                else:
-                    ma_values[f'ma{period}_count'] = 0
-                    ma_values[f'ma{period}_volume'] = 0.0
-            
-            results.append({
-                'date': date,
-                'exchange': exchange,
-                'trade_count': trade_count,
-                'volume': volume,
-                **ma_values
-            })
-        
-        return results
+                # Compute moving averages for all periods
+                df = pd.DataFrame(results)
+                if df.empty:
+                    return []
+                
+                # Sort by date within each exchange so the rolling windows line up
+                df['date'] = pd.to_datetime(df['date'])
+                df = df.sort_values(['date', 'exchange'])
+                
+                # Compute the MA for each period
+                for period in TIME_PERIODS:
+                    df[f'ma{period}_count'] = df.groupby('exchange')['trade_count'].transform(
+                        lambda x: x.rolling(window=period, min_periods=1).mean()
+                    )
+                    df[f'ma{period}_volume'] = df.groupby('exchange')['volume'].transform(
+                        lambda x: x.rolling(window=period, min_periods=1).mean()
+                    )
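+                
+                # Worked example (hypothetical data): daily trade_count values of
+                # 10, 20, 30 for one exchange yield ma7_count 10.0, 15.0, 20.0;
+                # min_periods=1 shrinks the window at the start of the series
+                # instead of emitting NaN.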
+                
+                # Convert back to a list of dicts
+                return df.to_dict('records')
+        except Exception as e:
+            logger.error(f"Error calculating exchange daily aggregations: {e}")
+        return []
 
-    def calculate_stock_trends(self, date: datetime.date) -> List[Dict]:
-        """Computes stock trends for all periods for a single day"""
-        results = []
+    def calculate_stock_trends(self, days: int = 365) -> List[Dict]:
+        """Computes trend data per ISIN with percentage changes"""
+        end_date = datetime.datetime.now(datetime.timezone.utc)
+        start_date = end_date - datetime.timedelta(days=days)
         
-        for period in TIME_PERIODS:
-            end_date = date
-            start_date = end_date - datetime.timedelta(days=period-1)
-            
-            # Current period
-            query = f"""
-                select
-                    isin,
-                    count(*) as trade_count,
-                    sum(price * quantity) as volume
-                from trades
-                where date_trunc('day', timestamp) >= '{start_date.strftime('%Y-%m-%d')}'
-                and date_trunc('day', timestamp) <= '{end_date.strftime('%Y-%m-%d')}'
-                group by isin
-            """
-            
-            data = self.query_questdb(query)
-            if not data or not data.get('dataset'):
-                continue
-            
-            for row in data['dataset']:
-                isin = row[0]
-                current_count = row[1] if row[1] else 0
-                current_volume = row[2] if row[2] else 0.0
-                
-                # Previous period for comparison
-                prev_start = start_date - datetime.timedelta(days=period)
-                prev_end = start_date - datetime.timedelta(days=1)
-                
-                prev_query = f"""
-                    select
-                        count(*) as trade_count,
-                        sum(price * quantity) as volume
-                    from trades
-                    where isin = '{isin}'
-                    and date_trunc('day', timestamp) >= '{prev_start.strftime('%Y-%m-%d')}'
-                    and date_trunc('day', timestamp) <= '{prev_end.strftime('%Y-%m-%d')}'
-                """
-                
-                prev_data = self.query_questdb(prev_query)
-                prev_count = 0
-                prev_volume = 0.0
-                
-                if prev_data and prev_data.get('dataset') and prev_data['dataset'][0]:
-                    prev_count = prev_data['dataset'][0][0] if prev_data['dataset'][0][0] else 0
-                    prev_volume = prev_data['dataset'][0][1] if prev_data['dataset'][0][1] else 0.0
-                
-                # Compute changes
-                count_change_pct = ((current_count - prev_count) / prev_count * 100) if prev_count > 0 else 0
-                volume_change_pct = ((current_volume - prev_volume) / prev_volume * 100) if prev_volume > 0 else 0
-                
-                results.append({
-                    'date': date,
-                    'period_days': period,
-                    'isin': isin,
-                    'trade_count': current_count,
-                    'volume': current_volume,
-                    'count_change_pct': count_change_pct,
-                    'volume_change_pct': volume_change_pct
-                })
+        # Current period
+        query_current = f"""
+            select
+                date_trunc('day', timestamp) as date,
+                isin,
+                count(*) as trade_count,
+                sum(price * quantity) as volume
+            from trades
+            where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
+            group by date, isin
+            order by date asc, isin asc
+        """
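+        # One row per (day, isin); a hypothetical result row looks like:
+        #   ["2024-01-15T00:00:00.000000Z", "DE0001234567", 42, 12345.67]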
 
-        return results
+        try:
+            response = requests.get(f"{self.db_url}/exec", params={'query': query_current}, auth=DB_AUTH, timeout=30)
+            if response.status_code == 200:
+                data = response.json()
+                columns = data.get('columns', [])
+                dataset = data.get('dataset', [])
+                
+                results = []
+                for row in dataset:
+                    result = {}
+                    for i, col in enumerate(columns):
+                        result[col['name']] = row[i]
+                    results.append(result)
+                
+                if not results:
+                    return []
+                
+                df = pd.DataFrame(results)
+                df['date'] = pd.to_datetime(df['date'])
+                
+                # Aggregate per ISIN over the whole period
+                df_agg = df.groupby('isin').agg({
+                    'trade_count': 'sum',
+                    'volume': 'sum'
+                }).reset_index()
+                
+                # Compute changes: compare the current with the previous sub-period
+                # For each ISIN: current sub-period vs. previous sub-period
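+                # (the halves are split by row count, which approximates two equal
+                # sub-periods only for ISINs that trade on most days)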
+                trends = []
+                for isin in df_agg['isin'].unique():
+                    isin_data = df[df['isin'] == isin].sort_values('date')
+                    
+                    # Split into two halves for comparison
+                    mid_point = len(isin_data) // 2
+                    if mid_point > 0:
+                        first_half = isin_data.iloc[:mid_point]
+                        second_half = isin_data.iloc[mid_point:]
+                        
+                        first_count = first_half['trade_count'].sum()
+                        first_volume = first_half['volume'].sum()
+                        second_count = second_half['trade_count'].sum()
+                        second_volume = second_half['volume'].sum()
+                        
+                        count_change = ((second_count - first_count) / first_count * 100) if first_count > 0 else 0
+                        volume_change = ((second_volume - first_volume) / first_volume * 100) if first_volume > 0 else 0
+                    else:
+                        count_change = 0
+                        volume_change = 0
+                    
+                    total_count = isin_data['trade_count'].sum()
+                    total_volume = isin_data['volume'].sum()
+                    
+                    trends.append({
+                        'isin': isin,
+                        'date': isin_data['date'].max(),
+                        'trade_count': total_count,
+                        'volume': total_volume,
+                        'count_change_pct': count_change,
+                        'volume_change_pct': volume_change
+                    })
+                
+                return trends
+        except Exception as e:
+            logger.error(f"Error calculating stock trends: {e}")
+        return []
 
-    def calculate_volume_changes(self, date: datetime.date) -> List[Dict]:
-        """Computes volume changes for all periods for a single day"""
-        results = []
+    def calculate_volume_changes(self, days: int = 365) -> List[Dict]:
+        """Computes volume and trade-count changes per exchange"""
+        end_date = datetime.datetime.now(datetime.timezone.utc)
+        start_date = end_date - datetime.timedelta(days=days)
        
-        for period in TIME_PERIODS:
-            end_date = date
-            start_date = end_date - datetime.timedelta(days=period-1)
-            
-            # Fetch all exchanges
-            exchanges_query = "select distinct exchange from trades"
-            exchanges_data = self.query_questdb(exchanges_query)
-            if not exchanges_data or not exchanges_data.get('dataset'):
-                continue
-            
-            for exchange_row in exchanges_data['dataset']:
-                exchange = exchange_row[0]
-                
-                # Current period
-                query = f"""
-                    select
-                        count(*) as trade_count,
-                        sum(price * quantity) as volume
-                    from trades
-                    where exchange = '{exchange}'
-                    and date_trunc('day', timestamp) >= '{start_date.strftime('%Y-%m-%d')}'
-                    and date_trunc('day', timestamp) <= '{end_date.strftime('%Y-%m-%d')}'
-                """
-                
-                data = self.query_questdb(query)
-                if not data or not data.get('dataset') or not data['dataset'][0]:
-                    continue
-                
-                current_count = data['dataset'][0][0] if data['dataset'][0][0] else 0
-                current_volume = data['dataset'][0][1] if data['dataset'][0][1] else 0.0
-                
-                # Previous period
-                prev_start = start_date - datetime.timedelta(days=period)
-                prev_end = start_date - datetime.timedelta(days=1)
-                
-                prev_query = f"""
-                    select
-                        count(*) as trade_count,
-                        sum(price * quantity) as volume
-                    from trades
-                    where exchange = '{exchange}'
-                    and date_trunc('day', timestamp) >= '{prev_start.strftime('%Y-%m-%d')}'
-                    and date_trunc('day', timestamp) <= '{prev_end.strftime('%Y-%m-%d')}'
-                """
-                
-                prev_data = self.query_questdb(prev_query)
-                prev_count = 0
-                prev_volume = 0.0
-                
-                if prev_data and prev_data.get('dataset') and prev_data['dataset'][0]:
-                    prev_count = prev_data['dataset'][0][0] if prev_data['dataset'][0][0] else 0
-                    prev_volume = prev_data['dataset'][0][1] if prev_data['dataset'][0][1] else 0.0
-                
-                # Compute changes
-                count_change_pct = ((current_count - prev_count) / prev_count * 100) if prev_count > 0 else 0
-                volume_change_pct = ((current_volume - prev_volume) / prev_volume * 100) if prev_volume > 0 else 0
-                
-                # Determine the trend
-                if count_change_pct > 5 and volume_change_pct > 5:
-                    trend = "mehr_trades_mehr_volumen"
-                elif count_change_pct > 5 and volume_change_pct < -5:
-                    trend = "mehr_trades_weniger_volumen"
-                elif count_change_pct < -5 and volume_change_pct > 5:
-                    trend = "weniger_trades_mehr_volumen"
-                elif count_change_pct < -5 and volume_change_pct < -5:
-                    trend = "weniger_trades_weniger_volumen"
-                else:
-                    trend = "stabil"
-                
-                results.append({
-                    'date': date,
-                    'period_days': period,
-                    'exchange': exchange,
-                    'trade_count': current_count,
-                    'volume': current_volume,
-                    'count_change_pct': count_change_pct,
-                    'volume_change_pct': volume_change_pct,
-                    'trend': trend
-                })
+        query = f"""
+            select
+                date_trunc('day', timestamp) as date,
+                exchange,
+                count(*) as trade_count,
+                sum(price * quantity) as volume
+            from trades
+            where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
+            group by date, exchange
+            order by date asc, exchange asc
+        """
 
-        return results
+        try:
+            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=30)
+            if response.status_code == 200:
+                data = response.json()
+                columns = data.get('columns', [])
+                dataset = data.get('dataset', [])
+                
+                results = []
+                for row in dataset:
+                    result = {}
+                    for i, col in enumerate(columns):
+                        result[col['name']] = row[i]
+                    results.append(result)
+                
+                if not results:
+                    return []
+                
+                df = pd.DataFrame(results)
+                df['date'] = pd.to_datetime(df['date'])
+                df = df.sort_values(['date', 'exchange'])
+                
+                # Compute changes per exchange
+                changes = []
+                for exchange in df['exchange'].unique():
+                    exchange_data = df[df['exchange'] == exchange].sort_values('date')
+                    
+                    # Split into two halves
+                    mid_point = len(exchange_data) // 2
+                    if mid_point > 0:
+                        first_half = exchange_data.iloc[:mid_point]
+                        second_half = exchange_data.iloc[mid_point:]
+                        
+                        first_count = first_half['trade_count'].sum()
+                        first_volume = first_half['volume'].sum()
+                        second_count = second_half['trade_count'].sum()
+                        second_volume = second_half['volume'].sum()
+                        
+                        count_change = ((second_count - first_count) / first_count * 100) if first_count > 0 else 0
+                        volume_change = ((second_volume - first_volume) / first_volume * 100) if first_volume > 0 else 0
+                        
+                        # Determine the trend
+                        if count_change > 5 and volume_change > 5:
+                            trend = "mehr_trades_mehr_volumen"
+                        elif count_change > 5 and volume_change < -5:
+                            trend = "mehr_trades_weniger_volumen"
+                        elif count_change < -5 and volume_change > 5:
+                            trend = "weniger_trades_mehr_volumen"
+                        elif count_change < -5 and volume_change < -5:
+                            trend = "weniger_trades_weniger_volumen"
+                        else:
+                            trend = "stabil"
+                    else:
+                        count_change = 0
+                        volume_change = 0
+                        trend = "stabil"
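+                    
+                    # Example (hypothetical values): count_change = +12 and
+                    # volume_change = -8 classify as "mehr_trades_weniger_volumen";
+                    # whenever either change stays within +/-5 percent the
+                    # classification falls back to "stabil".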
+                    
+                    total_count = exchange_data['trade_count'].sum()
+                    total_volume = exchange_data['volume'].sum()
+                    
+                    changes.append({
+                        'date': exchange_data['date'].max(),
+                        'exchange': exchange,
+                        'trade_count': total_count,
+                        'volume': total_volume,
+                        'count_change_pct': count_change,
+                        'volume_change_pct': volume_change,
+                        'trend': trend
+                    })
+                
+                return changes
+        except Exception as e:
+            logger.error(f"Error calculating volume changes: {e}")
+        return []
     
     def save_analytics_data(self, table_name: str, data: List[Dict]):
         """Persists the aggregated data to QuestDB via ILP"""
@@ -362,10 +326,10 @@ class AnalyticsWorker:
             try:
                 # Convert the date to a timestamp
                 if 'date' in row:
-                    if isinstance(row['date'], datetime.date):
-                        dt = datetime.datetime.combine(row['date'], datetime.time.min).replace(tzinfo=datetime.timezone.utc)
-                    elif isinstance(row['date'], str):
+                    if isinstance(row['date'], str):
                         dt = datetime.datetime.fromisoformat(row['date'].replace('Z', '+00:00'))
+                    elif isinstance(row['date'], pd.Timestamp):
+                        dt = row['date'].to_pydatetime()
                     else:
                         dt = row['date']
                     timestamp_ns = int(dt.timestamp() * 1e9)
@@ -386,32 +350,30 @@ class AnalyticsWorker:
                     isin = str(row['isin']).replace(' ', '\\ ').replace(',', '\\,')
                     tags.append(f"isin={isin}")
                 
-                # Period as a tag
-                if 'period_days' in row and row['period_days']:
-                    tags.append(f"period_days={row['period_days']}")
-                
                 # Trend as a tag
                 if 'trend' in row and row['trend']:
                     trend = str(row['trend']).replace(' ', '\\ ').replace(',', '\\,')
                     tags.append(f"trend={trend}")
                 
-                # Numeric fields
+                # Numeric fields (period_days must be stored as a field, not a tag)
                 for key, value in row.items():
-                    if key in ['date', 'exchange', 'isin', 'trend', 'period_days', 'exchanges']:
+                    if key in ['date', 'exchange', 'isin', 'trend', 'exchanges']:
                        continue
                     if value is not None:
                         if isinstance(value, (int, float)):
                             fields.append(f"{key}={value}")
                         elif isinstance(value, str):
-                            escaped = value.replace('"', '\\"').replace(' ', '\\ ')
+                            # Quote string fields; inside the quotes only backslashes
+                            # and double quotes need escaping in ILP
+                            escaped = value.replace('\\', '\\\\').replace('"', '\\"')
                             fields.append(f'{key}="{escaped}"')
+                        else:
+                            # pandas aggregations yield numpy scalars; numpy integers
+                            # are not Python ints, so coerce anything float()-able
+                            # instead of silently dropping the field
+                            try:
+                                fields.append(f"{key}={float(value)}")
+                            except (TypeError, ValueError):
+                                pass
                 
-                # Exchanges as a JSON field
-                if 'exchanges' in row and row['exchanges']:
-                    fields.append(f'exchanges="{row["exchanges"]}"')
-                
-                if tags and fields:
-                    line = f"{table_name},{','.join(tags)} {','.join(fields)} {timestamp_ns}"
+                # Build the line even when no tags are present (fields only)
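+                # A hypothetical rendered line for analytics_exchange_daily
+                # ("XETRA" and the numbers are made-up values):
+                #   analytics_exchange_daily,exchange=XETRA trade_count=120.0,volume=54321.5 1705276800000000000
+                # i.e. measurement,tags fields timestamp-in-nanoseconds.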
+                if fields:
+                    if tags:
+                        line = f"{table_name},{','.join(tags)} {','.join(fields)} {timestamp_ns}"
+                    else:
+                        # Some tables have no tags, only fields
+                        line = f"{table_name} {','.join(fields)} {timestamp_ns}"
                     lines.append(line)
             except Exception as e:
                 logger.error(f"Error formatting row for {table_name}: {e}, row: {row}")
@@ -436,46 +398,37 @@ class AnalyticsWorker:
         except Exception as e:
             logger.error(f"Error connecting to QuestDB: {e}")
     
-    def process_date(self, date: datetime.date):
-        """Processes all analytics for a specific day"""
-        logger.info(f"Processing analytics for {date}")
+    def process_all_analytics(self):
+        """Processes all analytics for all periods"""
+        logger.info("Starting analytics processing...")
         
-        # 1. Daily Summary
-        summary = self.calculate_daily_summary(date)
-        if summary:
-            self.save_analytics_data('analytics_daily_summary', [summary])
-        
-        # 2. Exchange Daily
-        exchange_data = self.calculate_exchange_daily(date)
+        # 1. Exchange daily aggregations (the rolling windows cover all periods)
+        logger.info("Calculating exchange daily aggregations...")
+        exchange_data = self.calculate_exchange_daily_aggregations(days_back=365)
         if exchange_data:
             self.save_analytics_data('analytics_exchange_daily', exchange_data)
         
-        # 3. Stock Trends
-        stock_trends = self.calculate_stock_trends(date)
-        if stock_trends:
-            self.save_analytics_data('analytics_stock_trends', stock_trends)
+        # 2. Stock trends (one pass per period)
+        logger.info("Calculating stock trends...")
+        for days in TIME_PERIODS:
+            trends = self.calculate_stock_trends(days=days)
+            if trends:
+                # Attach the period (stored as a field, not a tag)
+                for trend in trends:
+                    trend['period_days'] = days
+                self.save_analytics_data('analytics_stock_trends', trends)
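+        # All periods share one table; period_days is written as a numeric field
+        # (see save_analytics_data), so consumers can filter on it, e.g. with
+        # `where period_days = 30`.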
 
-        # 4. Volume Changes
-        volume_changes = self.calculate_volume_changes(date)
-        if volume_changes:
-            self.save_analytics_data('analytics_volume_changes', volume_changes)
+        # 3. Volume changes (one pass per period)
+        logger.info("Calculating volume changes...")
+        for days in TIME_PERIODS:
+            changes = self.calculate_volume_changes(days=days)
+            if changes:
+                # Attach the period (stored as a field, not a tag)
+                for change in changes:
+                    change['period_days'] = days
+                self.save_analytics_data('analytics_volume_changes', changes)
 
-        logger.info(f"Completed processing for {date}")
-    
-    def process_missing_dates(self):
-        """Computes all missing days"""
-        missing_dates = self.get_missing_dates()
-        if not missing_dates:
-            logger.info("No missing dates to process")
-            return
-        
-        logger.info(f"Processing {len(missing_dates)} missing dates...")
-        for i, date in enumerate(missing_dates, 1):
-            logger.info(f"Processing date {i}/{len(missing_dates)}: {date}")
-            self.process_date(date)
-            # Brief pause between the calculations
-            if i % 10 == 0:
-                time.sleep(1)
+        logger.info("Analytics processing completed.")
     
     def run(self):
         """Main loop of the worker"""
@@ -486,30 +439,32 @@ class AnalyticsWorker:
             logger.error("Failed to connect to QuestDB. Exiting.")
             return
         
-        # Initial computation of the missing days
-        logger.info("Checking for missing dates...")
-        self.process_missing_dates()
+        # Initial processing
+        self.process_all_analytics()
+        self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
         
-        # Main loop: wait for midnight
-        logger.info("Waiting for midnight to process yesterday's data...")
+        # Polling loop
         while True:
-            now = datetime.datetime.now()
-            
-            # Check whether it is midnight (00:00)
-            if now.hour == 0 and now.minute == 0:
-                yesterday = (now - datetime.timedelta(days=1)).date()
-                logger.info(f"Processing yesterday's data: {yesterday}")
-                self.process_date(yesterday)
-                # Wait 61 s to prevent running more than once
-                time.sleep(61)
-            
-            # Also check for missing days (every 6 hours)
-            if now.hour % 6 == 0 and now.minute == 0:
-                logger.info("Checking for missing dates...")
-                self.process_missing_dates()
-                time.sleep(61)
-            
-            time.sleep(30)
+            try:
+                # Check for new trades; use the in-memory marker from the last
+                # completed run, because the analytics timestamps are day-truncated
+                # and would re-match today's trades on every iteration
+                last_ts = self.last_processed_timestamp or self.get_last_processed_timestamp()
+                new_trades = self.get_new_trades(since=last_ts)
+                
+                if new_trades:
+                    logger.info(f"Found {len(new_trades)} new trades, reprocessing analytics...")
+                    self.process_all_analytics()
+                    self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
+                else:
+                    logger.debug("No new trades found.")
+                
+                # Wait 30 seconds before the next check
+                time.sleep(30)
+            except requests.exceptions.ConnectionError as e:
+                logger.warning(f"Connection error to QuestDB, retrying in 60s: {e}")
+                time.sleep(60)  # Longer pause on connection errors
+            except Exception as e:
+                logger.error(f"Error in worker loop: {e}", exc_info=True)
+                time.sleep(60)  # Longer pause on any other error
 
 def main():
     worker = AnalyticsWorker()