Fix: Analytics Worker berechnet jetzt alle Tabellen pro Tag
All checks were successful
Deployment / deploy-docker (push) Successful in 16s

This commit is contained in:
Melchior Reimers
2026-01-27 13:57:07 +01:00
parent d66f261bcc
commit 4fd93541a2
2 changed files with 181 additions and 10 deletions

View File

@@ -665,9 +665,14 @@
const groupIdx = columns.findIndex(c => c.name === 'group_value'); const groupIdx = columns.findIndex(c => c.name === 'group_value');
const yIdx = columns.findIndex(c => c.name === 'y_value'); const yIdx = columns.findIndex(c => c.name === 'y_value');
const groups = [...new Set(data.map(r => r[groupIdx]))]; const groups = [...new Set(data.map(r => r[groupIdx]).filter(g => g && g !== '' && g !== 'NONE'))];
const dates = [...new Set(data.map(r => r[xIdx]))].sort(); const dates = [...new Set(data.map(r => r[xIdx]))].sort();
if (groups.length === 0) {
console.log('No valid groups found in data');
return;
}
// Erweiterte Farben für mehr Exchanges (EIX, LS, XETRA, FRA, GETTEX, STU, QUOTRIX) // Erweiterte Farben für mehr Exchanges (EIX, LS, XETRA, FRA, GETTEX, STU, QUOTRIX)
const colors = ['#38bdf8', '#f43f5e', '#10b981', '#fbbf24', '#8b5cf6', '#f97316', '#ec4899', '#14b8a6', '#84cc16', '#a855f7']; const colors = ['#38bdf8', '#f43f5e', '#10b981', '#fbbf24', '#8b5cf6', '#f97316', '#ec4899', '#14b8a6', '#84cc16', '#a855f7'];
const datasets = groups.map((group, idx) => ({ const datasets = groups.map((group, idx) => ({

View File

@@ -557,6 +557,140 @@ class AnalyticsWorker:
logger.info("Analytics processing completed.") logger.info("Analytics processing completed.")
def calculate_exchange_daily_for_date(self, date: datetime.date) -> List[Dict]:
"""Berechnet exchange_daily Aggregationen für einen einzelnen Tag mit Moving Averages"""
date_str = date.strftime('%Y-%m-%d')
# Hole Daten für diesen Tag
query = f"""
select
date_trunc('day', timestamp) as date,
exchange,
count(*) as trade_count,
sum(price * quantity) as volume
from trades
where date_trunc('day', timestamp) = '{date_str}'
group by date, exchange
order by exchange asc
"""
try:
data = self.query_questdb(query)
if not data or not data.get('dataset'):
return []
columns = data.get('columns', [])
dataset = data.get('dataset', [])
results = []
for row in dataset:
result = {}
for i, col in enumerate(columns):
result[col['name']] = row[i]
results.append(result)
if not results:
return []
# Hole historische Daten für MA-Berechnung (letzte 365 Tage)
end_date = datetime.datetime.combine(date, datetime.time.max).replace(tzinfo=datetime.timezone.utc)
start_date = end_date - datetime.timedelta(days=365)
hist_query = f"""
select
date_trunc('day', timestamp) as date,
exchange,
count(*) as trade_count,
sum(price * quantity) as volume
from trades
where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
and timestamp <= '{end_date.strftime('%Y-%m-%d')}'
group by date, exchange
order by date asc, exchange asc
"""
hist_data = self.query_questdb(hist_query)
if hist_data and hist_data.get('dataset'):
hist_columns = hist_data.get('columns', [])
hist_dataset = hist_data.get('dataset', [])
hist_df = pd.DataFrame([
{hist_columns[i]['name']: row[i] for i in range(len(hist_columns))}
for row in hist_dataset
])
if not hist_df.empty:
hist_df['date'] = pd.to_datetime(hist_df['date'])
hist_df = hist_df.sort_values(['date', 'exchange'])
# Berechne MA für jeden Zeitraum
for period in TIME_PERIODS:
hist_df[f'ma{period}_count'] = hist_df.groupby('exchange')['trade_count'].transform(
lambda x: x.rolling(window=period, min_periods=1).mean()
)
hist_df[f'ma{period}_volume'] = hist_df.groupby('exchange')['volume'].transform(
lambda x: x.rolling(window=period, min_periods=1).mean()
)
# Filtere nur den aktuellen Tag
target_date = pd.to_datetime(date_str)
day_df = hist_df[hist_df['date'] == target_date]
if not day_df.empty:
# Konvertiere date zurück zu datetime für ILP
result_list = day_df.to_dict('records')
for r in result_list:
if isinstance(r.get('date'), pd.Timestamp):
r['date'] = r['date'].to_pydatetime()
return result_list
# Fallback: Nur aktuelle Daten ohne MA
df = pd.DataFrame(results)
df['date'] = pd.to_datetime(df['date'])
for period in TIME_PERIODS:
df[f'ma{period}_count'] = df['trade_count']
df[f'ma{period}_volume'] = df['volume']
result_list = df.to_dict('records')
# Konvertiere date zurück zu datetime für ILP
for r in result_list:
if isinstance(r.get('date'), pd.Timestamp):
r['date'] = r['date'].to_pydatetime()
return result_list
except Exception as e:
logger.error(f"Error calculating exchange daily for {date}: {e}")
return []
def calculate_daily_summary_for_date(self, date: datetime.date) -> Optional[Dict]:
"""Berechnet daily_summary für einen einzelnen Tag"""
date_str = date.strftime('%Y-%m-%d')
query = f"""
select
count(*) as total_trades,
sum(price * quantity) as total_volume,
count(distinct isin) as unique_assets
from trades
where date_trunc('day', timestamp) = '{date_str}'
"""
try:
data = self.query_questdb(query)
if data and data.get('dataset') and data['dataset']:
row = data['dataset'][0]
columns = data.get('columns', [])
result = {
'date': date,
'total_trades': row[0] if row[0] else 0,
'total_volume': row[1] if row[1] else 0.0,
'unique_assets': row[2] if len(row) > 2 and row[2] else 0
}
return result
except Exception as e:
logger.error(f"Error calculating daily summary for {date}: {e}")
return None
def process_date(self, date: datetime.date): def process_date(self, date: datetime.date):
"""Verarbeitet alle Analytics für einen bestimmten Tag""" """Verarbeitet alle Analytics für einen bestimmten Tag"""
logger.info(f"Processing analytics for {date}") logger.info(f"Processing analytics for {date}")
@@ -570,9 +704,9 @@ class AnalyticsWorker:
trade_count = check_data['dataset'][0][0] trade_count = check_data['dataset'][0][0]
if trade_count == 0: if trade_count == 0:
logger.info(f"No trades found for {date}, creating empty analytics entry") logger.info(f"No trades found for {date}, creating empty analytics entries")
# Erstelle einen leeren Eintrag, damit der Tag als "verarbeitet" gilt # Erstelle leere Einträge für alle Tabellen
empty_entry = [{ empty_custom = [{
'date': date, 'date': date,
'y_axis': 'volume', 'y_axis': 'volume',
'group_by': 'exchange', 'group_by': 'exchange',
@@ -581,18 +715,50 @@ class AnalyticsWorker:
'group_value': '', 'group_value': '',
'y_value': 0 'y_value': 0
}] }]
self.save_analytics_data('analytics_custom', empty_entry) self.save_analytics_data('analytics_custom', empty_custom)
logger.info(f"Saved empty analytics entry for {date}")
# Leere exchange_daily Einträge
empty_exchange = [{
'date': datetime.datetime.combine(date, datetime.time.min).replace(tzinfo=datetime.timezone.utc),
'exchange': 'NONE',
'trade_count': 0,
'volume': 0.0,
**{f'ma{period}_count': 0 for period in TIME_PERIODS},
**{f'ma{period}_volume': 0.0 for period in TIME_PERIODS}
}]
self.save_analytics_data('analytics_exchange_daily', empty_exchange)
# Leere daily_summary
empty_summary = {
'date': date,
'total_trades': 0,
'total_volume': 0.0,
'unique_assets': 0
}
self.save_analytics_data('analytics_daily_summary', [empty_summary])
logger.info(f"Saved empty analytics entries for {date}")
else: else:
# Custom Analytics (wichtigste Berechnung für Performance) # 1. Exchange Daily Aggregations
logger.info(f"Calculating exchange daily aggregations for {date}...")
exchange_data = self.calculate_exchange_daily_for_date(date)
if exchange_data:
self.save_analytics_data('analytics_exchange_daily', exchange_data)
logger.info(f"Saved {len(exchange_data)} exchange daily rows for {date}")
# 2. Daily Summary
logger.info(f"Calculating daily summary for {date}...")
summary_data = self.calculate_daily_summary_for_date(date)
if summary_data:
self.save_analytics_data('analytics_daily_summary', [summary_data])
logger.info(f"Saved daily summary for {date}")
# 3. Custom Analytics
logger.info(f"Calculating custom analytics for {date} ({trade_count} trades)...") logger.info(f"Calculating custom analytics for {date} ({trade_count} trades)...")
custom_data = self.calculate_custom_analytics_daily(date) custom_data = self.calculate_custom_analytics_daily(date)
if custom_data: if custom_data:
self.save_analytics_data('analytics_custom', custom_data) self.save_analytics_data('analytics_custom', custom_data)
logger.info(f"Saved {len(custom_data)} custom analytics rows for {date}") logger.info(f"Saved {len(custom_data)} custom analytics rows for {date}")
else:
# Auch wenn keine Daten zurückkommen, erstelle leeren Eintrag
logger.warning(f"No custom analytics data returned for {date} despite {trade_count} trades")
logger.info(f"Completed processing for {date}") logger.info(f"Completed processing for {date}")