From 4fd93541a2e8786ee8372fd6ac07107e7425fb25 Mon Sep 17 00:00:00 2001 From: Melchior Reimers Date: Tue, 27 Jan 2026 13:57:07 +0100 Subject: [PATCH] Fix: Analytics Worker berechnet jetzt alle Tabellen pro Tag --- dashboard/public/index.html | 7 +- src/analytics/worker.py | 184 ++++++++++++++++++++++++++++++++++-- 2 files changed, 181 insertions(+), 10 deletions(-) diff --git a/dashboard/public/index.html b/dashboard/public/index.html index 21c5236..da83119 100644 --- a/dashboard/public/index.html +++ b/dashboard/public/index.html @@ -665,9 +665,14 @@ const groupIdx = columns.findIndex(c => c.name === 'group_value'); const yIdx = columns.findIndex(c => c.name === 'y_value'); - const groups = [...new Set(data.map(r => r[groupIdx]))]; + const groups = [...new Set(data.map(r => r[groupIdx]).filter(g => g && g !== '' && g !== 'NONE'))]; const dates = [...new Set(data.map(r => r[xIdx]))].sort(); + if (groups.length === 0) { + console.log('No valid groups found in data'); + return; + } + // Erweiterte Farben für mehr Exchanges (EIX, LS, XETRA, FRA, GETTEX, STU, QUOTRIX) const colors = ['#38bdf8', '#f43f5e', '#10b981', '#fbbf24', '#8b5cf6', '#f97316', '#ec4899', '#14b8a6', '#84cc16', '#a855f7']; const datasets = groups.map((group, idx) => ({ diff --git a/src/analytics/worker.py b/src/analytics/worker.py index 6691c26..438adbc 100644 --- a/src/analytics/worker.py +++ b/src/analytics/worker.py @@ -557,6 +557,140 @@ class AnalyticsWorker: logger.info("Analytics processing completed.") + def calculate_exchange_daily_for_date(self, date: datetime.date) -> List[Dict]: + """Berechnet exchange_daily Aggregationen für einen einzelnen Tag mit Moving Averages""" + date_str = date.strftime('%Y-%m-%d') + + # Hole Daten für diesen Tag + query = f""" + select + date_trunc('day', timestamp) as date, + exchange, + count(*) as trade_count, + sum(price * quantity) as volume + from trades + where date_trunc('day', timestamp) = '{date_str}' + group by date, exchange + order by exchange asc + """ + + try: + data = self.query_questdb(query) + if not data or not data.get('dataset'): + return [] + + columns = data.get('columns', []) + dataset = data.get('dataset', []) + + results = [] + for row in dataset: + result = {} + for i, col in enumerate(columns): + result[col['name']] = row[i] + results.append(result) + + if not results: + return [] + + # Hole historische Daten für MA-Berechnung (letzte 365 Tage) + end_date = datetime.datetime.combine(date, datetime.time.max).replace(tzinfo=datetime.timezone.utc) + start_date = end_date - datetime.timedelta(days=365) + + hist_query = f""" + select + date_trunc('day', timestamp) as date, + exchange, + count(*) as trade_count, + sum(price * quantity) as volume + from trades + where timestamp >= '{start_date.strftime('%Y-%m-%d')}' + and timestamp <= '{end_date.strftime('%Y-%m-%d')}' + group by date, exchange + order by date asc, exchange asc + """ + + hist_data = self.query_questdb(hist_query) + if hist_data and hist_data.get('dataset'): + hist_columns = hist_data.get('columns', []) + hist_dataset = hist_data.get('dataset', []) + + hist_df = pd.DataFrame([ + {hist_columns[i]['name']: row[i] for i in range(len(hist_columns))} + for row in hist_dataset + ]) + + if not hist_df.empty: + hist_df['date'] = pd.to_datetime(hist_df['date']) + hist_df = hist_df.sort_values(['date', 'exchange']) + + # Berechne MA für jeden Zeitraum + for period in TIME_PERIODS: + hist_df[f'ma{period}_count'] = hist_df.groupby('exchange')['trade_count'].transform( + lambda x: x.rolling(window=period, min_periods=1).mean() + ) + hist_df[f'ma{period}_volume'] = hist_df.groupby('exchange')['volume'].transform( + lambda x: x.rolling(window=period, min_periods=1).mean() + ) + + # Filtere nur den aktuellen Tag + target_date = pd.to_datetime(date_str) + day_df = hist_df[hist_df['date'] == target_date] + + if not day_df.empty: + # Konvertiere date zurück zu datetime für ILP + result_list = day_df.to_dict('records') + for r in result_list: + if isinstance(r.get('date'), pd.Timestamp): + r['date'] = r['date'].to_pydatetime() + return result_list + + # Fallback: Nur aktuelle Daten ohne MA + df = pd.DataFrame(results) + df['date'] = pd.to_datetime(df['date']) + for period in TIME_PERIODS: + df[f'ma{period}_count'] = df['trade_count'] + df[f'ma{period}_volume'] = df['volume'] + + result_list = df.to_dict('records') + # Konvertiere date zurück zu datetime für ILP + for r in result_list: + if isinstance(r.get('date'), pd.Timestamp): + r['date'] = r['date'].to_pydatetime() + return result_list + except Exception as e: + logger.error(f"Error calculating exchange daily for {date}: {e}") + return [] + + def calculate_daily_summary_for_date(self, date: datetime.date) -> Optional[Dict]: + """Berechnet daily_summary für einen einzelnen Tag""" + date_str = date.strftime('%Y-%m-%d') + + query = f""" + select + count(*) as total_trades, + sum(price * quantity) as total_volume, + count(distinct isin) as unique_assets + from trades + where date_trunc('day', timestamp) = '{date_str}' + """ + + try: + data = self.query_questdb(query) + if data and data.get('dataset') and data['dataset']: + row = data['dataset'][0] + columns = data.get('columns', []) + + result = { + 'date': date, + 'total_trades': row[0] if row[0] else 0, + 'total_volume': row[1] if row[1] else 0.0, + 'unique_assets': row[2] if len(row) > 2 and row[2] else 0 + } + return result + except Exception as e: + logger.error(f"Error calculating daily summary for {date}: {e}") + return None + def process_date(self, date: datetime.date): """Verarbeitet alle Analytics für einen bestimmten Tag""" logger.info(f"Processing analytics for {date}") @@ -570,9 +704,9 @@ class AnalyticsWorker: trade_count = check_data['dataset'][0][0] if trade_count == 0: - logger.info(f"No trades found for {date}, creating empty analytics entry") - # Erstelle einen leeren Eintrag, damit der Tag als "verarbeitet" gilt - empty_entry = [{ + logger.info(f"No trades found for {date}, creating empty analytics entries") + # Erstelle leere Einträge für alle Tabellen + empty_custom = [{ 'date': date, 'y_axis': 'volume', 'group_by': 'exchange', @@ -581,18 +715,50 @@ class AnalyticsWorker: 'group_value': '', 'y_value': 0 }] - self.save_analytics_data('analytics_custom', empty_entry) - logger.info(f"Saved empty analytics entry for {date}") + self.save_analytics_data('analytics_custom', empty_custom) + + # Leere exchange_daily Einträge + empty_exchange = [{ + 'date': datetime.datetime.combine(date, datetime.time.min).replace(tzinfo=datetime.timezone.utc), + 'exchange': 'NONE', + 'trade_count': 0, + 'volume': 0.0, + **{f'ma{period}_count': 0 for period in TIME_PERIODS}, + **{f'ma{period}_volume': 0.0 for period in TIME_PERIODS} + }] + self.save_analytics_data('analytics_exchange_daily', empty_exchange) + + # Leere daily_summary + empty_summary = { + 'date': date, + 'total_trades': 0, + 'total_volume': 0.0, + 'unique_assets': 0 + } + self.save_analytics_data('analytics_daily_summary', [empty_summary]) + + logger.info(f"Saved empty analytics entries for {date}") else: - # Custom Analytics (wichtigste Berechnung für Performance) + # 1. Exchange Daily Aggregations + logger.info(f"Calculating exchange daily aggregations for {date}...") + exchange_data = self.calculate_exchange_daily_for_date(date) + if exchange_data: + self.save_analytics_data('analytics_exchange_daily', exchange_data) + logger.info(f"Saved {len(exchange_data)} exchange daily rows for {date}") + + # 2. Daily Summary + logger.info(f"Calculating daily summary for {date}...") + summary_data = self.calculate_daily_summary_for_date(date) + if summary_data: + self.save_analytics_data('analytics_daily_summary', [summary_data]) + logger.info(f"Saved daily summary for {date}") + + # 3. Custom Analytics logger.info(f"Calculating custom analytics for {date} ({trade_count} trades)...") custom_data = self.calculate_custom_analytics_daily(date) if custom_data: self.save_analytics_data('analytics_custom', custom_data) logger.info(f"Saved {len(custom_data)} custom analytics rows for {date}") - else: - # Auch wenn keine Daten zurückkommen, erstelle leeren Eintrag - logger.warning(f"No custom analytics data returned for {date} despite {trade_count} trades") logger.info(f"Completed processing for {date}")