diff --git a/dashboard/public/index.html b/dashboard/public/index.html
index 21c5236..da83119 100644
--- a/dashboard/public/index.html
+++ b/dashboard/public/index.html
@@ -665,9 +665,14 @@
const groupIdx = columns.findIndex(c => c.name === 'group_value');
const yIdx = columns.findIndex(c => c.name === 'y_value');
- const groups = [...new Set(data.map(r => r[groupIdx]))];
+ const groups = [...new Set(data.map(r => r[groupIdx]).filter(g => g && g !== 'NONE'))];
const dates = [...new Set(data.map(r => r[xIdx]))].sort();
+ if (groups.length === 0) {
+ console.log('No valid groups found in data');
+ return;
+ }
+
// Extended colors for more exchanges (EIX, LS, XETRA, FRA, GETTEX, STU, QUOTRIX)
const colors = ['#38bdf8', '#f43f5e', '#10b981', '#fbbf24', '#8b5cf6', '#f97316', '#ec4899', '#14b8a6', '#84cc16', '#a855f7'];
const datasets = groups.map((group, idx) => ({
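
For reference, a minimal Python sketch of the dashboard's dedupe-and-filter step (the helper name and the placeholder set are illustrative, not part of the patch; Python is used to match the backend code below):

```python
# Mirrors the dashboard's group extraction: distinct group values,
# skipping empties and the 'NONE' sentinel written for trade-free days.
PLACEHOLDERS = {"", "NONE"}  # assumption: values the chart should never plot

def extract_groups(rows, columns):
    group_idx = next(i for i, c in enumerate(columns) if c["name"] == "group_value")
    seen = []
    for row in rows:
        value = row[group_idx]
        if value and value not in PLACEHOLDERS and value not in seen:
            seen.append(value)
    return seen

columns = [{"name": "group_value"}, {"name": "y_value"}]
rows = [["XETRA", 10.0], ["NONE", 0.0], ["XETRA", 12.5], [None, 3.0]]
assert extract_groups(rows, columns) == ["XETRA"]
```
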
diff --git a/src/analytics/worker.py b/src/analytics/worker.py
index 6691c26..438adbc 100644
--- a/src/analytics/worker.py
+++ b/src/analytics/worker.py
@@ -557,6 +557,140 @@ class AnalyticsWorker:
logger.info("Analytics processing completed.")
+ def calculate_exchange_daily_for_date(self, date: datetime.date) -> List[Dict]:
+ """Berechnet exchange_daily Aggregationen für einen einzelnen Tag mit Moving Averages"""
+ date_str = date.strftime('%Y-%m-%d')
+
+ # Fetch this day's trades
+ query = f"""
+ select
+ date_trunc('day', timestamp) as date,
+ exchange,
+ count(*) as trade_count,
+ sum(price * quantity) as volume
+ from trades
+ where date_trunc('day', timestamp) = '{date_str}'
+ group by date, exchange
+ order by exchange asc
+ """
+
+ try:
+ data = self.query_questdb(query)
+ if not data or not data.get('dataset'):
+ return []
+
+ columns = data.get('columns', [])
+ dataset = data.get('dataset', [])
+
+ results = []
+ for row in dataset:
+ result = {}
+ for i, col in enumerate(columns):
+ result[col['name']] = row[i]
+ results.append(result)
+
+ if not results:
+ return []
+
+ # Fetch history for the MA window: the 365 days up to and including the target day
+ start_date = date - datetime.timedelta(days=365)
+ next_day = date + datetime.timedelta(days=1)  # half-open upper bound keeps the whole target day
+
+ hist_query = f"""
+ select
+ date_trunc('day', timestamp) as date,
+ exchange,
+ count(*) as trade_count,
+ sum(price * quantity) as volume
+ from trades
+ where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
+ and timestamp < '{next_day.strftime('%Y-%m-%d')}'
+ group by date, exchange
+ order by date asc, exchange asc
+ """
+
+ hist_data = self.query_questdb(hist_query)
+ if hist_data and hist_data.get('dataset'):
+ hist_columns = hist_data.get('columns', [])
+ hist_dataset = hist_data.get('dataset', [])
+
+ hist_df = pd.DataFrame([
+ {hist_columns[i]['name']: row[i] for i in range(len(hist_columns))}
+ for row in hist_dataset
+ ])
+
+ if not hist_df.empty:
+ hist_df['date'] = pd.to_datetime(hist_df['date'], utc=True)  # QuestDB timestamps are UTC
+ hist_df = hist_df.sort_values(['date', 'exchange'])
+
+ # Compute moving averages for each configured period
+ for period in TIME_PERIODS:
+ hist_df[f'ma{period}_count'] = hist_df.groupby('exchange')['trade_count'].transform(
+ lambda x: x.rolling(window=period, min_periods=1).mean()
+ )
+ hist_df[f'ma{period}_volume'] = hist_df.groupby('exchange')['volume'].transform(
+ lambda x: x.rolling(window=period, min_periods=1).mean()
+ )
+
+ # Keep only the target day's rows
+ target_date = pd.to_datetime(date_str, utc=True)
+ day_df = hist_df[hist_df['date'] == target_date]
+
+ if not day_df.empty:
+ # Convert date back to a Python datetime for ILP ingestion
+ result_list = day_df.to_dict('records')
+ for r in result_list:
+ if isinstance(r.get('date'), pd.Timestamp):
+ r['date'] = r['date'].to_pydatetime()
+ return result_list
+
+ # Fallback: no history available, seed the MA columns with the day's raw values
+ df = pd.DataFrame(results)
+ df['date'] = pd.to_datetime(df['date'])
+ for period in TIME_PERIODS:
+ df[f'ma{period}_count'] = df['trade_count']
+ df[f'ma{period}_volume'] = df['volume']
+
+ result_list = df.to_dict('records')
+ # Convert date back to a Python datetime for ILP ingestion
+ for r in result_list:
+ if isinstance(r.get('date'), pd.Timestamp):
+ r['date'] = r['date'].to_pydatetime()
+ return result_list
+ except Exception as e:
+ logger.error(f"Error calculating exchange daily for {date}: {e}")
+ return []
+
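
The groupby/transform pattern above is the heart of the MA computation. A self-contained sketch of the same idea, assuming `TIME_PERIODS` is a module-level list of day counts (its real values are not shown in this diff):

```python
import pandas as pd

TIME_PERIODS = [7, 30]  # assumption: the worker's real periods are configured elsewhere

df = pd.DataFrame({
    "date": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-01", "2024-01-02"], utc=True),
    "exchange": ["XETRA", "XETRA", "GETTEX", "GETTEX"],
    "trade_count": [100, 200, 10, 30],
}).sort_values(["date", "exchange"])

for period in TIME_PERIODS:
    # min_periods=1 yields a value before the window fills,
    # matching the worker's choice above (no leading NaNs).
    df[f"ma{period}_count"] = df.groupby("exchange")["trade_count"].transform(
        lambda s: s.rolling(window=period, min_periods=1).mean()
    )

# XETRA's ma7_count on 2024-01-02 is (100 + 200) / 2 = 150.0
print(df[["date", "exchange", "trade_count", "ma7_count"]])
```

Note that `rolling(window=period)` counts rows, not calendar days, so dates missing from the history shrink the effective lookback window.
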
+ def calculate_daily_summary_for_date(self, date: datetime.date) -> Optional[Dict]:
+ """Berechnet daily_summary für einen einzelnen Tag"""
+ date_str = date.strftime('%Y-%m-%d')
+
+ query = f"""
+ select
+ count(*) as total_trades,
+ sum(price * quantity) as total_volume,
+ count(distinct isin) as unique_assets
+ from trades
+ where date_trunc('day', timestamp) = '{date_str}'
+ """
+
+ try:
+ data = self.query_questdb(query)
+ if data and data.get('dataset') and data['dataset']:
+ row = data['dataset'][0]
+
+ result = {
+ 'date': date,
+ 'total_trades': row[0] or 0,
+ 'total_volume': row[1] or 0.0,
+ 'unique_assets': row[2] or 0
+ }
+ return result
+ except Exception as e:
+ logger.error(f"Error calculating daily summary for {date}: {e}")
+ return None
+
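
`query_questdb` is called throughout but not defined in this diff. A minimal sketch of what such a helper might look like, assuming QuestDB's HTTP `/exec` endpoint (default port 9000) and the `requests` library:

```python
import requests

QUESTDB_URL = "http://localhost:9000/exec"  # assumption: host/port come from config

def query_questdb(query: str, timeout: float = 30.0) -> dict:
    """Run SQL via QuestDB's REST API and return the parsed JSON.

    The payload carries 'columns' ([{'name': ..., 'type': ...}, ...]) and
    'dataset' (a list of row arrays) -- the exact shape the column/row
    zipping in the methods above expects.
    """
    resp = requests.get(QUESTDB_URL, params={"query": query}, timeout=timeout)
    resp.raise_for_status()
    return resp.json()
```

Routing the SQL through `params` also handles URL-encoding; the date literals interpolated into the queries above come from `strftime`, so they are well-formed by construction.
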
def process_date(self, date: datetime.date):
"""Verarbeitet alle Analytics für einen bestimmten Tag"""
logger.info(f"Processing analytics for {date}")
@@ -570,9 +704,9 @@ class AnalyticsWorker:
trade_count = check_data['dataset'][0][0]
if trade_count == 0:
- logger.info(f"No trades found for {date}, creating empty analytics entry")
- # Create an empty entry so the day counts as "processed"
- empty_entry = [{
+ logger.info(f"No trades found for {date}, creating empty analytics entries")
+ # Create empty entries for every analytics table
+ empty_custom = [{
'date': date,
'y_axis': 'volume',
'group_by': 'exchange',
@@ -581,18 +715,50 @@ class AnalyticsWorker:
'group_value': '',
'y_value': 0
}]
- self.save_analytics_data('analytics_custom', empty_entry)
- logger.info(f"Saved empty analytics entry for {date}")
+ self.save_analytics_data('analytics_custom', empty_custom)
+
+ # Empty exchange_daily entry (the 'NONE' sentinel is what the dashboard filter skips)
+ empty_exchange = [{
+ 'date': datetime.datetime.combine(date, datetime.time.min).replace(tzinfo=datetime.timezone.utc),
+ 'exchange': 'NONE',
+ 'trade_count': 0,
+ 'volume': 0.0,
+ **{f'ma{period}_count': 0 for period in TIME_PERIODS},
+ **{f'ma{period}_volume': 0.0 for period in TIME_PERIODS}
+ }]
+ self.save_analytics_data('analytics_exchange_daily', empty_exchange)
+
+ # Empty daily_summary entry
+ empty_summary = {
+ 'date': date,
+ 'total_trades': 0,
+ 'total_volume': 0.0,
+ 'unique_assets': 0
+ }
+ self.save_analytics_data('analytics_daily_summary', [empty_summary])
+
+ logger.info(f"Saved empty analytics entries for {date}")
else:
- # Custom analytics (the most performance-critical computation)
+ # 1. Exchange Daily Aggregations
+ logger.info(f"Calculating exchange daily aggregations for {date}...")
+ exchange_data = self.calculate_exchange_daily_for_date(date)
+ if exchange_data:
+ self.save_analytics_data('analytics_exchange_daily', exchange_data)
+ logger.info(f"Saved {len(exchange_data)} exchange daily rows for {date}")
+
+ # 2. Daily Summary
+ logger.info(f"Calculating daily summary for {date}...")
+ summary_data = self.calculate_daily_summary_for_date(date)
+ if summary_data:
+ self.save_analytics_data('analytics_daily_summary', [summary_data])
+ logger.info(f"Saved daily summary for {date}")
+
+ # 3. Custom Analytics
logger.info(f"Calculating custom analytics for {date} ({trade_count} trades)...")
custom_data = self.calculate_custom_analytics_daily(date)
if custom_data:
self.save_analytics_data('analytics_custom', custom_data)
logger.info(f"Saved {len(custom_data)} custom analytics rows for {date}")
- else:
- # Even if no data comes back, create an empty entry
- logger.warning(f"No custom analytics data returned for {date} despite {trade_count} trades")
logger.info(f"Completed processing for {date}")