"""Trading Dashboard API.

FastAPI application that serves the dashboard's static files and exposes
read-only analytics endpoints backed by QuestDB's HTTP ``/exec`` endpoint.
"""

import logging
import os
from typing import Any, Dict, Optional

import requests
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles

logger = logging.getLogger(__name__)

app = FastAPI(title="Trading Dashboard API")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve static files
app.mount("/static", StaticFiles(directory="dashboard/public"), name="static")


@app.get("/")
async def read_index():
    """Serve the dashboard's index page."""
    return FileResponse('dashboard/public/index.html')


# QuestDB configuration (overridable through environment variables)
DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
DB_HOST = os.getenv("DB_HOST", "questdb")
DB_URL = f"http://{DB_HOST}:9000"

# Look-back windows accepted by the pre-calculated statistics endpoints.
_SUPPORTED_PERIOD_DAYS = (7, 30, 42, 69, 180, 365)


# Helper functions
def _sql_quote(value: str) -> str:
    """Return *value* as a single-quoted SQL string literal.

    Queries are sent to QuestDB as plain SQL text, so user-supplied strings
    are embedded in the statement. Doubling embedded single quotes keeps the
    input inside the literal instead of letting it terminate the literal and
    inject SQL.
    """
    return "'" + str(value).replace("'", "''") + "'"


def _sql_in_list(csv_values: str) -> str:
    """Convert a comma-separated string into a quoted SQL ``in (...)`` body."""
    return ",".join(_sql_quote(item.strip()) for item in csv_values.split(","))


def _validated_days(days: int) -> int:
    """Return *days* as a non-negative int or raise HTTP 400.

    The value is rendered as ``-{days}`` inside ``dateadd``; a negative number
    would produce ``--N``, which starts a SQL line comment and corrupts the
    query.
    """
    try:
        value = int(days)
    except (TypeError, ValueError):
        raise HTTPException(
            status_code=400,
            detail="Invalid days parameter. Must be a non-negative integer",
        ) from None
    if value < 0:
        raise HTTPException(
            status_code=400,
            detail="Invalid days parameter. Must be a non-negative integer",
        )
    return value


def _require_supported_period(days: int) -> None:
    """Raise HTTP 400 unless *days* is one of the supported periods."""
    if days not in _SUPPORTED_PERIOD_DAYS:
        raise HTTPException(
            status_code=400,
            detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365",
        )


def _result(column_names, dataset=None) -> Dict[str, Any]:
    """Build a response dict in the ``columns``/``dataset`` layout."""
    return {
        'columns': [{'name': name} for name in column_names],
        'dataset': [] if dataset is None else dataset,
    }


def query_questdb(query: str, timeout: int = 10) -> Optional[Dict[str, Any]]:
    """Run *query* against QuestDB's ``/exec`` endpoint and return the JSON body.

    Args:
        query: SQL text to execute.
        timeout: Request timeout in seconds.

    Returns:
        The decoded JSON response.

    Raises:
        HTTPException: With QuestDB's status code when it answers with a
            non-200 status, 504 on timeout, and 500 on any other transport
            or decoding failure.
    """
    # NOTE(review): requests.get is blocking and every caller is an
    # ``async def`` handler, so the event loop is blocked for the duration of
    # the query. Consider plain ``def`` handlers or a thread pool.
    try:
        response = requests.get(
            f"{DB_URL}/exec",
            params={'query': query},
            auth=DB_AUTH,
            timeout=timeout,
        )
        if response.status_code != 200:
            raise HTTPException(
                status_code=response.status_code,
                detail=f"QuestDB error: {response.text}",
            )
        return response.json()
    except requests.exceptions.Timeout as exc:
        raise HTTPException(status_code=504, detail="QuestDB query timeout") from exc
    except (requests.exceptions.RequestException, ValueError) as exc:
        # ValueError covers JSON decoding failures on requests versions whose
        # decode error is not a RequestException subclass.
        raise HTTPException(
            status_code=500,
            detail=f"QuestDB connection error: {str(exc)}",
        ) from exc


def format_questdb_response(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return *data* unchanged, or an empty columns/dataset dict if it is falsy."""
    if not data:
        return {'columns': [], 'dataset': []}
    return data


# API endpoints
@app.get("/api/trades")
async def get_trades(isin: str = None, days: int = 7):
    """Return aggregated trade analysis (not individual trades).

    With ``isin`` the data is aggregated per day from the ``trades`` table;
    otherwise the rows come from ``analytics_exchange_daily``.

    Args:
        isin: Optional ISIN to restrict the aggregation to.
        days: Look-back window in days (non-negative).

    Raises:
        HTTPException: 400 if ``days`` is negative or not an integer.
    """
    days = _validated_days(days)

    if isin:
        # Specific ISIN: aggregate straight from the trades table.
        query = f"""
            select
                date_trunc('day', timestamp) as date,
                count(*) as trade_count,
                sum(price * quantity) as volume,
                avg(price) as avg_price
            from trades
            where isin = {_sql_quote(isin)}
              and timestamp > dateadd('d', -{days}, now())
            group by date
            order by date desc
        """
        empty_columns = ('date', 'trade_count', 'volume', 'avg_price')
    else:
        # Aggregated data from analytics_exchange_daily.
        query = f"""
            select
                timestamp as date,
                exchange,
                trade_count,
                volume
            from analytics_exchange_daily
            where timestamp >= dateadd('d', -{days}, now())
            order by date desc, exchange asc
        """
        empty_columns = ('date', 'exchange', 'trade_count', 'volume')

    data = query_questdb(query)

    if not data or not data.get('dataset'):
        # Return an empty result whose columns match the query that ran.
        if isin:
            logger.warning("No trades found for isin %r in the last %s days.", isin, days)
        else:
            logger.warning("analytics_exchange_daily is empty. Analytics worker should calculate this data.")
        return _result(empty_columns)

    return format_questdb_response(data)


@app.get("/api/metadata")
async def get_metadata():
    """Return every row of the ``metadata`` table."""
    data = query_questdb("select * from metadata")
    return format_questdb_response(data)


@app.get("/api/summary")
async def get_summary(days: int = None):
    """Return total trades and total volume from ``analytics_daily_summary``.

    Args:
        days: Optional look-back window in days; when omitted (or 0) the
            totals cover the whole table.

    Returns:
        A columns/dataset dict with a single ``['All', trades, volume]`` row;
        zeros are used when no data is available.

    Raises:
        HTTPException: 400 if ``days`` is negative or not an integer.
    """
    summary_columns = ('continent', 'trade_count', 'total_volume')

    if days:
        # Period-based summary.
        days = _validated_days(days)
        query = f"""
            select
                sum(total_trades) as total_trades,
                sum(total_volume) as total_volume
            from analytics_daily_summary
            where timestamp >= dateadd('d', -{days}, now())
        """
    else:
        # Totals over all trades.
        query = """
            select
                sum(total_trades) as total_trades,
                sum(total_volume) as total_volume
            from analytics_daily_summary
        """

    data = query_questdb(query)

    if not data or not data.get('dataset'):
        logger.warning("analytics_daily_summary is empty. Analytics worker should calculate this data.")
        return _result(summary_columns, [['All', 0, 0.0]])

    # Null sums are mapped to zero.
    first_row = data['dataset'][0]
    total_trades = first_row[0] if first_row[0] else 0
    total_volume = first_row[1] if first_row[1] else 0.0

    # Shaped for compatibility with the per-continent summary layout.
    return _result(summary_columns, [['All', total_trades, total_volume]])


@app.get("/api/statistics/total-trades")
async def get_total_trades(days: int = None):
    """Return the total number of trades from ``analytics_daily_summary``.

    Args:
        days: Optional look-back window in days; when omitted (or 0) the
            total covers the whole table.

    Raises:
        HTTPException: 400 if ``days`` is negative or not an integer.
    """
    if days:
        days = _validated_days(days)
        query = (
            "select sum(total_trades) as total from analytics_daily_summary "
            f"where timestamp >= dateadd('d', -{days}, now())"
        )
    else:
        query = "select sum(total_trades) as total from analytics_daily_summary"

    data = query_questdb(query)

    if data and data.get('dataset'):
        total = data['dataset'][0][0] if data['dataset'][0][0] else 0
        return {'total_trades': total}

    return {'total_trades': 0}


@app.get("/api/custom-analytics")
async def get_custom_analytics(
    date_from: str,
    date_to: str,
    x_axis: str = "date",
    y_axis: str = "volume",
    group_by: str = "exchange",
    exchanges: str = None
):
    """Flexible analytics endpoint for custom graphs.

    Reads pre-calculated rows from ``analytics_custom``.

    Args:
        date_from: Start date (YYYY-MM-DD).
        date_to: End date (YYYY-MM-DD).
        x_axis: X axis (date, exchange, isin); only "date" is supported.
        y_axis: Y axis (volume, trade_count, avg_price).
        group_by: Grouping (exchange, isin, date).
        exchanges: Optional comma-separated exchanges; only a single exchange
            is supported.

    Raises:
        HTTPException: 400 for an invalid axis or grouping, a non-"date"
            ``x_axis``, or more than one exchange.
    """
    # Validate parameters
    valid_x_axis = ["date", "exchange", "isin"]
    valid_y_axis = ["volume", "trade_count", "avg_price"]
    valid_group_by = ["exchange", "isin", "date"]

    if x_axis not in valid_x_axis:
        raise HTTPException(status_code=400, detail=f"Invalid x_axis. Must be one of: {valid_x_axis}")
    if y_axis not in valid_y_axis:
        raise HTTPException(status_code=400, detail=f"Invalid y_axis. Must be one of: {valid_y_axis}")
    if group_by not in valid_group_by:
        raise HTTPException(status_code=400, detail=f"Invalid group_by. Must be one of: {valid_group_by}")

    # Only the date axis is pre-calculated; everything else is rejected.
    if x_axis != "date":
        raise HTTPException(
            status_code=400,
            detail="x_axis must be 'date' for pre-calculated analytics. Other x_axis values are not supported for performance reasons."
        )

    # Use pre-calculated data from analytics_custom.
    exchange_filter = "all"
    if exchanges:
        exchange_list = [e.strip() for e in exchanges.split(",")]
        if len(exchange_list) != 1:
            # Combinations of several exchanges are not pre-calculated.
            raise HTTPException(
                status_code=400,
                detail="Multiple exchanges are not supported for pre-calculated analytics. Please specify a single exchange or leave empty for all exchanges."
            )
        exchange_filter = exchange_list[0]

    # y_axis and group_by were checked against the whitelists above; the
    # remaining user-supplied values are quoted.
    query = f"""
        select
            timestamp as x_value,
            group_value,
            y_value
        from analytics_custom
        where timestamp >= {_sql_quote(date_from)}
          and timestamp <= {_sql_quote(date_to)}
          and y_axis = '{y_axis}'
          and group_by = '{group_by}'
          and exchange_filter = {_sql_quote(exchange_filter)}
        order by timestamp asc, group_value asc
    """

    data = query_questdb(query, timeout=5)

    if not data or not data.get('dataset'):
        logger.warning("No pre-calculated data found in analytics_custom. Analytics worker should calculate this data.")
        return _result(('x_value', 'group_value', 'y_value'))

    return format_questdb_response(data)


@app.get("/api/statistics/moving-average")
async def get_moving_average(days: int = 7, exchange: str = None):
    """Return moving-average data for trade counts and volume per exchange.

    Args:
        days: Period; one of 7, 30, 42, 69, 180, 365.
        exchange: Optional exchange to filter on.

    Raises:
        HTTPException: 400 if ``days`` is not a supported period.
    """
    _require_supported_period(days)

    # ``days`` is whitelisted above, so it is safe in the column names.
    query = f"""
        select
            timestamp as date,
            exchange,
            trade_count,
            volume,
            ma{days}_count as ma_count,
            ma{days}_volume as ma_volume
        from analytics_exchange_daily
        where timestamp >= dateadd('d', -{days}, now())
    """
    if exchange:
        query += f" and exchange = {_sql_quote(exchange)}"
    query += " order by date asc, exchange asc"

    data = query_questdb(query, timeout=5)

    if not data or not data.get('dataset'):
        logger.warning("analytics_exchange_daily is empty. Analytics worker should calculate this data.")
        return _result(('date', 'exchange', 'trade_count', 'volume', 'ma_count', 'ma_volume'))

    return format_questdb_response(data)


@app.get("/api/statistics/volume-changes")
async def get_volume_changes(days: int = 7):
    """Return changes in volume and trade count per exchange.

    Args:
        days: Period; one of 7, 30, 42, 69, 180, 365.

    Raises:
        HTTPException: 400 if ``days`` is not a supported period.
    """
    _require_supported_period(days)

    # Fetch the most recent rows for the requested period.
    query = f"""
        select
            exchange,
            trade_count,
            volume,
            count_change_pct,
            volume_change_pct,
            trend
        from analytics_volume_changes
        where period_days = {days}
        order by timestamp desc
        limit 20
    """

    data = query_questdb(query, timeout=5)

    if not data or not data.get('dataset'):
        logger.warning(
            "No pre-calculated volume changes found for %s days. Analytics worker should calculate this data.",
            days,
        )
        return _result((
            'exchange', 'trade_count', 'volume',
            'count_change_pct', 'volume_change_pct', 'trend',
        ))

    return format_questdb_response(data)


@app.get("/api/statistics/stock-trends")
async def get_stock_trends(days: int = 7, limit: int = 20):
    """Return trend analysis rows ordered by volume, highest first.

    Args:
        days: Period; one of 7, 30, 42, 69, 180, 365.
        limit: Maximum number of rows to return.

    Raises:
        HTTPException: 400 if ``days`` is not a supported period.
    """
    _require_supported_period(days)

    query = f"""
        select
            timestamp as date,
            isin,
            trade_count,
            volume,
            count_change_pct,
            volume_change_pct
        from analytics_stock_trends
        where period_days = {days}
          and timestamp >= dateadd('d', -{days}, now())
        order by volume desc
        limit {int(limit)}
    """

    data = query_questdb(query, timeout=5)
    return format_questdb_response(data)


@app.get("/api/analytics")
async def get_analytics(
    metric: str = "volume",
    group_by: str = "day",
    sub_group_by: str = None,
    date_from: str = None,
    date_to: str = None,
    isins: str = None,
    continents: str = None
):
    """Analytics endpoint for the report builder.

    Builds an aggregate query over ``trades``, joining ``metadata`` when a
    grouping or filter needs it.

    Args:
        metric: "volume", "count", "avg_price" or "all"; unknown values fall
            back to "volume".
        group_by: Primary grouping key; unknown values fall back to "day".
        sub_group_by: Optional secondary grouping key; ignored if unknown.
        date_from: Optional inclusive lower timestamp bound.
        date_to: Optional inclusive upper timestamp bound.
        isins: Optional comma-separated ISIN filter.
        continents: Optional comma-separated continent filter.
    """
    composite_keys = ["exchange_continent", "exchange_sector"]
    metadata_keys = ["name", "continent", "sector"] + composite_keys
    needs_metadata = any([
        group_by in metadata_keys,
        sub_group_by in metadata_keys,
        continents is not None
    ])

    # Table aliases are only needed (and only valid) when the join is present.
    t_prefix = "t." if needs_metadata else ""
    m_prefix = "m." if needs_metadata else ""

    metrics_map = {
        "volume": f"sum({t_prefix}price * {t_prefix}quantity)",
        "count": "count(*)",
        "avg_price": f"avg({t_prefix}price)"
    }

    groups_map = {
        "day": f"date_trunc('day', {t_prefix}timestamp)",
        "month": f"date_trunc('month', {t_prefix}timestamp)",
        "exchange": f"{t_prefix}exchange",
        "isin": f"{t_prefix}isin",
        "name": f"coalesce({m_prefix}name, {t_prefix}isin)" if needs_metadata else "isin",
        "continent": f"coalesce({m_prefix}continent, 'Unknown')" if needs_metadata else "'Unknown'",
        "sector": f"coalesce({m_prefix}sector, 'Unknown')" if needs_metadata else "'Unknown'",
        "exchange_continent": f"concat({t_prefix}exchange, ' - ', coalesce({m_prefix}continent, 'Unknown'))" if needs_metadata else "'Unknown'",
        "exchange_sector": f"concat({t_prefix}exchange, ' - ', coalesce({m_prefix}sector, 'Unknown'))" if needs_metadata else "'Unknown'"
    }

    # Column expressions come only from the maps above, never from raw input.
    selected_metric = metrics_map.get(metric, metrics_map["volume"])
    selected_group = groups_map.get(group_by, groups_map["day"])
    sub_group = groups_map[sub_group_by] if sub_group_by and sub_group_by in groups_map else None

    query = f"select {selected_group} as label"
    if sub_group:
        query += f", {sub_group} as sub_label"

    if metric == 'all':
        query += f", count(*) as value_count, sum({t_prefix}price * {t_prefix}quantity) as value_volume from trades"
    else:
        query += f", {selected_metric} as value from trades"

    if needs_metadata:
        query += " t left join metadata m on t.isin = m.isin"

    query += " where 1=1"

    if date_from:
        query += f" and {t_prefix}timestamp >= {_sql_quote(date_from)}"
    if date_to:
        query += f" and {t_prefix}timestamp <= {_sql_quote(date_to)}"
    if isins:
        query += f" and {t_prefix}isin in ({_sql_in_list(isins)})"
    if continents and needs_metadata:
        query += f" and {m_prefix}continent in ({_sql_in_list(continents)})"

    query += f" group by {selected_group}"
    if sub_group:
        query += f", {sub_group}"

    query += " order by label asc"

    data = query_questdb(query)
    return format_questdb_response(data)


@app.get("/api/metadata/search")
async def search_metadata(q: str):
    """Case-insensitive substring search over ISIN and name (max. 10 rows)."""
    pattern = _sql_quote(f"%{q}%")
    query = f"select isin, name from metadata where isin ilike {pattern} or name ilike {pattern} limit 10"
    data = query_questdb(query)
    return format_questdb_response(data)


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)