diff --git a/dashboard/__pycache__/server.cpython-313.pyc b/dashboard/__pycache__/server.cpython-313.pyc index b0984a8..36b2a12 100644 Binary files a/dashboard/__pycache__/server.cpython-313.pyc and b/dashboard/__pycache__/server.cpython-313.pyc differ diff --git a/dashboard/server.py b/dashboard/server.py index 15a541e..41039e3 100644 --- a/dashboard/server.py +++ b/dashboard/server.py @@ -4,8 +4,11 @@ from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse import requests import os +import logging from typing import Optional, Dict, Any +logger = logging.getLogger(__name__) + app = FastAPI(title="Trading Dashboard API") app.add_middleware( @@ -85,21 +88,11 @@ async def get_trades(isin: str = None, days: int = 7): data = query_questdb(query) - # Fallback: Wenn analytics_exchange_daily leer ist, berechne direkt aus trades + # Wenn analytics_exchange_daily leer ist, gib leere Daten zurück + # Der Analytics Worker sollte die Daten berechnen if not data or not data.get('dataset') or len(data.get('dataset', [])) == 0: - logger.info(f"analytics_exchange_daily is empty, calculating from trades table") - query = f""" - select - date_trunc('day', timestamp) as date, - exchange, - count(*) as trade_count, - sum(price * quantity) as volume - from trades - where timestamp >= dateadd('d', -{days}, now()) - group by date, exchange - order by date desc, exchange asc - """ - data = query_questdb(query) + logger.warning(f"analytics_exchange_daily is empty. Analytics worker should calculate this data.") + return {'columns': [{'name': 'date'}, {'name': 'exchange'}, {'name': 'trade_count'}, {'name': 'volume'}], 'dataset': []} return format_questdb_response(data) @@ -136,25 +129,18 @@ async def get_summary(days: int = None): data = query_questdb(query) - # Fallback: Wenn analytics_daily_summary leer ist, berechne direkt aus trades + # Wenn analytics_daily_summary leer ist, gib leere Daten zurück + # Der Analytics Worker sollte die Daten berechnen if not data or not data.get('dataset') or not data['dataset']: - logger.info(f"analytics_daily_summary is empty, calculating from trades table") - if days: - query = f""" - select - count(*) as total_trades, - sum(price * quantity) as total_volume - from trades - where timestamp >= dateadd('d', -{days}, now()) - """ - else: - query = """ - select - count(*) as total_trades, - sum(price * quantity) as total_volume - from trades - """ - data = query_questdb(query) + logger.warning(f"analytics_daily_summary is empty. Analytics worker should calculate this data.") + return { + 'columns': [ + {'name': 'continent'}, + {'name': 'trade_count'}, + {'name': 'total_volume'} + ], + 'dataset': [['All', 0, 0.0]] + } if data and data.get('dataset') and data['dataset']: total_trades = data['dataset'][0][0] if data['dataset'][0][0] else 0 @@ -170,18 +156,15 @@ async def get_summary(days: int = None): 'dataset': [['All', total_trades, total_volume]] } - # Fallback: Original Query - query = """ - select - coalesce(m.continent, 'Unknown') as continent, - count(*) as trade_count, - sum(t.price * t.quantity) as total_volume - from trades t - left join metadata m on t.isin = m.isin - group by continent - """ - data = query_questdb(query) - return format_questdb_response(data) + # Wenn keine Daten vorhanden, gib leere Daten zurück + return { + 'columns': [ + {'name': 'continent'}, + {'name': 'trade_count'}, + {'name': 'total_volume'} + ], + 'dataset': [['All', 0, 0.0]] + } @app.get("/api/statistics/total-trades") async def get_total_trades(days: int = None): @@ -231,7 +214,11 @@ async def get_custom_analytics( # Für Custom Analytics: x_axis muss "date" sein (wird täglich vorberechnet) if x_axis != "date": - # Fallback auf direkte Query für nicht-date x_axis + # Für nicht-date x_axis: gib Fehler zurück, da dies nicht vorberechnet wird + raise HTTPException( + status_code=400, + detail="x_axis must be 'date' for pre-calculated analytics. Other x_axis values are not supported for performance reasons." + ) y_axis_map = { "volume": "sum(price * quantity)", "trade_count": "count(*)", @@ -278,22 +265,12 @@ async def get_custom_analytics( exchange_list = [e.strip() for e in exchanges.split(",")] if len(exchange_list) == 1: exchange_filter = exchange_list[0] - # Bei mehreren Exchanges: Fallback auf direkte Query else: - query = f""" - select - timestamp as x_value, - {group_by} as group_value, - {'sum(price * quantity)' if y_axis == 'volume' else 'count(*)' if y_axis == 'trade_count' else 'avg(price)'} as y_value - from trades - where timestamp >= '{date_from}' - and timestamp <= '{date_to}' - and exchange in ({','.join([f"'{e}'" for e in exchange_list])}) - group by timestamp, {group_by} - order by timestamp asc, {group_by} asc - """ - data = query_questdb(query, timeout=15) - return format_questdb_response(data) + # Bei mehreren Exchanges: gib Fehler zurück, da dies nicht vorberechnet wird + raise HTTPException( + status_code=400, + detail="Multiple exchanges are not supported for pre-calculated analytics. Please specify a single exchange or leave empty for all exchanges." + ) # Query für vorberechnete Daten query = f""" @@ -312,39 +289,16 @@ async def get_custom_analytics( data = query_questdb(query, timeout=5) if not data or not data.get('dataset'): - # Fallback: direkte Query wenn keine vorberechneten Daten vorhanden - logger.warning(f"No pre-calculated data found, falling back to direct query") - y_axis_map = { - "volume": "sum(price * quantity)", - "trade_count": "count(*)", - "avg_price": "avg(price)" + # Wenn keine vorberechneten Daten vorhanden, gib leere Daten zurück + logger.warning(f"No pre-calculated data found in analytics_custom. Analytics worker should calculate this data.") + return { + 'columns': [ + {'name': 'x_value'}, + {'name': 'group_value'}, + {'name': 'y_value'} + ], + 'dataset': [] } - group_by_map = { - "exchange": "exchange", - "isin": "isin", - "date": "date_trunc('day', timestamp)" - } - - y_metric = y_axis_map[y_axis] - group_by_field = group_by_map[group_by] - - query = f""" - select - date_trunc('day', timestamp) as x_value, - {group_by_field} as group_value, - {y_metric} as y_value - from trades - where timestamp >= '{date_from}' - and timestamp <= '{date_to}' - """ - - if exchanges: - exchange_list = ",".join([f"'{e.strip()}'" for e in exchanges.split(",")]) - query += f" and exchange in ({exchange_list})" - - query += f" group by date_trunc('day', timestamp), {group_by_field} order by x_value asc, group_value asc" - - data = query_questdb(query, timeout=15) return format_questdb_response(data) @@ -376,25 +330,21 @@ async def get_moving_average(days: int = 7, exchange: str = None): data = query_questdb(query, timeout=5) - # Fallback: Wenn analytics_exchange_daily leer ist, berechne direkt aus trades + # Wenn analytics_exchange_daily leer ist, gib leere Daten zurück + # Der Analytics Worker sollte die Daten berechnen if not data or not data.get('dataset') or len(data.get('dataset', [])) == 0: - logger.info(f"analytics_exchange_daily is empty, calculating moving average from trades table") - # Berechne Moving Average direkt aus trades (vereinfacht, ohne echte MA-Berechnung) - query = f""" - select - date_trunc('day', timestamp) as date, - exchange, - count(*) as trade_count, - sum(price * quantity) as volume, - count(*) as ma_count, - sum(price * quantity) as ma_volume - from trades - where timestamp >= dateadd('d', -{days}, now()) - """ - if exchange: - query += f" and exchange = '{exchange}'" - query += " group by date, exchange order by date asc, exchange asc" - data = query_questdb(query, timeout=10) + logger.warning(f"analytics_exchange_daily is empty. Analytics worker should calculate this data.") + return { + 'columns': [ + {'name': 'date'}, + {'name': 'exchange'}, + {'name': 'trade_count'}, + {'name': 'volume'}, + {'name': 'ma_count'}, + {'name': 'ma_volume'} + ], + 'dataset': [] + } return format_questdb_response(data) @@ -424,67 +374,21 @@ async def get_volume_changes(days: int = 7): data = query_questdb(query, timeout=5) - # Falls keine vorberechneten Daten vorhanden, berechne on-the-fly + # Wenn keine vorberechneten Daten vorhanden, gib leere Daten zurück + # Der Analytics Worker sollte die Daten berechnen if not data or not data.get('dataset'): - logger.info(f"No pre-calculated volume changes found for {days} days, calculating on-the-fly") - - # Berechne Volumen-Änderungen direkt aus trades - query = f""" - with - first_half as ( - select - exchange, - count(*) as trade_count, - sum(price * quantity) as volume - from trades - where timestamp >= dateadd('d', -{days}, now()) - and timestamp < dateadd('d', -{days/2}, now()) - group by exchange - ), - second_half as ( - select - exchange, - count(*) as trade_count, - sum(price * quantity) as volume - from trades - where timestamp >= dateadd('d', -{days/2}, now()) - group by exchange - ) - select - coalesce(f.exchange, s.exchange) as exchange, - coalesce(s.trade_count, 0) as trade_count, - coalesce(s.volume, 0) as volume, - case when f.trade_count > 0 then - ((coalesce(s.trade_count, 0) - f.trade_count) * 100.0 / f.trade_count) - else 0 end as count_change_pct, - case when f.volume > 0 then - ((coalesce(s.volume, 0) - f.volume) * 100.0 / f.volume) - else 0 end as volume_change_pct, - case - when f.trade_count > 0 and f.volume > 0 then - case - when ((coalesce(s.trade_count, 0) - f.trade_count) * 100.0 / f.trade_count) > 5 - and ((coalesce(s.volume, 0) - f.volume) * 100.0 / f.volume) > 5 - then 'mehr_trades_mehr_volumen' - when ((coalesce(s.trade_count, 0) - f.trade_count) * 100.0 / f.trade_count) > 5 - and ((coalesce(s.volume, 0) - f.volume) * 100.0 / f.volume) < -5 - then 'mehr_trades_weniger_volumen' - when ((coalesce(s.trade_count, 0) - f.trade_count) * 100.0 / f.trade_count) < -5 - and ((coalesce(s.volume, 0) - f.volume) * 100.0 / f.volume) > 5 - then 'weniger_trades_mehr_volumen' - when ((coalesce(s.trade_count, 0) - f.trade_count) * 100.0 / f.trade_count) < -5 - and ((coalesce(s.volume, 0) - f.volume) * 100.0 / f.volume) < -5 - then 'weniger_trades_weniger_volumen' - else 'stabil' - end - else 'neu' - end as trend - from first_half f - full outer join second_half s on f.exchange = s.exchange - order by s.volume desc - """ - - data = query_questdb(query, timeout=15) + logger.warning(f"No pre-calculated volume changes found for {days} days. Analytics worker should calculate this data.") + return { + 'columns': [ + {'name': 'exchange'}, + {'name': 'trade_count'}, + {'name': 'volume'}, + {'name': 'count_change_pct'}, + {'name': 'volume_change_pct'}, + {'name': 'trend'} + ], + 'dataset': [] + } return format_questdb_response(data) diff --git a/src/analytics/__pycache__/worker.cpython-313.pyc b/src/analytics/__pycache__/worker.cpython-313.pyc index 563a8ba..d238429 100644 Binary files a/src/analytics/__pycache__/worker.cpython-313.pyc and b/src/analytics/__pycache__/worker.cpython-313.pyc differ