# trading-daemon/dashboard/server.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
import requests
import os
import logging
from typing import Optional, Dict, Any

logger = logging.getLogger(__name__)

app = FastAPI(title="Trading Dashboard API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve static files
app.mount("/static", StaticFiles(directory="dashboard/public"), name="static")


@app.get("/")
async def read_index():
    return FileResponse('dashboard/public/index.html')

# QuestDB configuration
DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
DB_HOST = os.getenv("DB_HOST", "questdb")
DB_URL = f"http://{DB_HOST}:9000"

# Helper functions
def query_questdb(query: str, timeout: int = 10) -> Optional[Dict[str, Any]]:
    """Central helper for querying QuestDB."""
    try:
        response = requests.get(f"{DB_URL}/exec", params={'query': query}, auth=DB_AUTH, timeout=timeout)
        if response.status_code == 200:
            return response.json()
        else:
            raise HTTPException(status_code=response.status_code, detail=f"QuestDB error: {response.text}")
    except requests.exceptions.Timeout:
        raise HTTPException(status_code=504, detail="QuestDB query timeout")
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"QuestDB connection error: {str(e)}")
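# Usage sketch for the helper above (assumes QuestDB is reachable at DB_URL):
#     data = query_questdb("select count(*) from trades")
#     rows = data.get('dataset', []) if data else []
# Caveat: the endpoints below interpolate request parameters into SQL via
# f-strings. The REST /exec endpoint receives the finished query string, so any
# validation or escaping of user input has to happen before the string is built.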

def format_questdb_response(data: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize QuestDB responses into a uniform shape."""
    if not data:
        return {'columns': [], 'dataset': []}
    return data


# API endpoints
@app.get("/api/trades")
async def get_trades(isin: Optional[str] = None, days: int = 7):
    """
    Returns an aggregated analysis of all trades (not individual trades).
    Uses pre-computed data from analytics_exchange_daily.
    """
    if isin:
        # For a specific ISIN: read from the trades table
        query = f"""
        select
            date_trunc('day', timestamp) as date,
            count(*) as trade_count,
            sum(price * quantity) as volume,
            avg(price) as avg_price
        from trades
        where isin = '{isin}'
            and timestamp > dateadd('d', -{days}, now())
        group by date
        order by date desc
        """
    else:
        # Aggregated data from analytics_exchange_daily
        query = f"""
        select
            timestamp as date,
            exchange,
            trade_count,
            volume
        from analytics_exchange_daily
        where timestamp >= dateadd('d', -{days}, now())
        order by date desc, exchange asc
        """
    data = query_questdb(query)
    # If analytics_exchange_daily is empty, return empty data;
    # the analytics worker is responsible for computing it.
    if not data or not data.get('dataset'):
        logger.warning("analytics_exchange_daily is empty. Analytics worker should calculate this data.")
        return {'columns': [{'name': 'date'}, {'name': 'exchange'}, {'name': 'trade_count'}, {'name': 'volume'}], 'dataset': []}
    return format_questdb_response(data)
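# Example requests (hypothetical host/port, matching the uvicorn defaults at the
# bottom of this file; the ISIN is a placeholder):
#   GET http://localhost:8000/api/trades?days=30
#   GET http://localhost:8000/api/trades?isin=DE0007164600&days=7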
@app.get("/api/metadata")
async def get_metadata():
"""Gibt alle Metadata zurück"""
query = "select * from metadata"
data = query_questdb(query)
return format_questdb_response(data)
@app.get("/api/summary")
async def get_summary(days: int = None):
"""
Gibt Zusammenfassung zurück. Nutzt analytics_daily_summary für total_trades.
Optional: days Parameter für Zeitraum-basierte Zusammenfassung.
"""
if days:
# Zeitraum-basierte Zusammenfassung
query = f"""
select
sum(total_trades) as total_trades,
sum(total_volume) as total_volume
from analytics_daily_summary
where timestamp >= dateadd('d', -{days}, now())
"""
else:
# Gesamtzahl aller Trades
query = """
select
sum(total_trades) as total_trades,
sum(total_volume) as total_volume
from analytics_daily_summary
"""
data = query_questdb(query)
# Wenn analytics_daily_summary leer ist, gib leere Daten zurück
# Der Analytics Worker sollte die Daten berechnen
if not data or not data.get('dataset') or not data['dataset']:
logger.warning(f"analytics_daily_summary is empty. Analytics worker should calculate this data.")
return {
'columns': [
{'name': 'continent'},
{'name': 'trade_count'},
{'name': 'total_volume'}
],
'dataset': [['All', 0, 0.0]]
}
if data and data.get('dataset') and data['dataset']:
total_trades = data['dataset'][0][0] if data['dataset'][0][0] else 0
total_volume = data['dataset'][0][1] if data['dataset'][0][1] else 0.0
# Formatiere für Kompatibilität
return {
'columns': [
{'name': 'continent'},
{'name': 'trade_count'},
{'name': 'total_volume'}
],
'dataset': [['All', total_trades, total_volume]]
}
# Wenn keine Daten vorhanden, gib leere Daten zurück
return {
'columns': [
{'name': 'continent'},
{'name': 'trade_count'},
{'name': 'total_volume'}
],
'dataset': [['All', 0, 0.0]]
}
@app.get("/api/statistics/total-trades")
async def get_total_trades(days: int = None):
"""Gibt Gesamtzahl aller Trades zurück (aus analytics_daily_summary). Optional: days Parameter für Zeitraum."""
if days:
query = f"select sum(total_trades) as total from analytics_daily_summary where timestamp >= dateadd('d', -{days}, now())"
else:
query = "select sum(total_trades) as total from analytics_daily_summary"
data = query_questdb(query)
if data and data.get('dataset') and data['dataset']:
total = data['dataset'][0][0] if data['dataset'][0][0] else 0
return {'total_trades': total}
return {'total_trades': 0}
@app.get("/api/custom-analytics")
async def get_custom_analytics(
date_from: str,
date_to: str,
x_axis: str = "date",
y_axis: str = "volume",
group_by: str = "exchange",
exchanges: str = None
):
"""
Flexibler Analytics-Endpunkt für custom Graphen.
Nutzt vorberechnete Daten aus analytics_custom für bessere Performance.
Parameters:
- date_from: Startdatum (YYYY-MM-DD)
- date_to: Enddatum (YYYY-MM-DD)
- x_axis: X-Achse (date, exchange, isin) - aktuell nur "date" unterstützt
- y_axis: Y-Achse (volume, trade_count, avg_price)
- group_by: Gruppierung (exchange, isin, date)
- exchanges: Komma-separierte Liste von Exchanges (optional)
"""
# Validiere Parameter
valid_x_axis = ["date", "exchange", "isin"]
valid_y_axis = ["volume", "trade_count", "avg_price"]
valid_group_by = ["exchange", "isin", "date"]
if x_axis not in valid_x_axis:
raise HTTPException(status_code=400, detail=f"Invalid x_axis. Must be one of: {valid_x_axis}")
if y_axis not in valid_y_axis:
raise HTTPException(status_code=400, detail=f"Invalid y_axis. Must be one of: {valid_y_axis}")
if group_by not in valid_group_by:
raise HTTPException(status_code=400, detail=f"Invalid group_by. Must be one of: {valid_group_by}")
# Für Custom Analytics: x_axis muss "date" sein (wird täglich vorberechnet)
if x_axis != "date":
# Für nicht-date x_axis: gib Fehler zurück, da dies nicht vorberechnet wird
raise HTTPException(
status_code=400,
detail="x_axis must be 'date' for pre-calculated analytics. Other x_axis values are not supported for performance reasons."
)
y_axis_map = {
"volume": "sum(price * quantity)",
"trade_count": "count(*)",
"avg_price": "avg(price)"
}
x_axis_map = {
"exchange": "exchange",
"isin": "isin"
}
group_by_map = {
"exchange": "exchange",
"isin": "isin",
"date": "date_trunc('day', timestamp)"
}
y_metric = y_axis_map[y_axis]
x_label = x_axis_map[x_axis]
group_by_field = group_by_map[group_by]
query = f"""
select
{x_label} as x_value,
{group_by_field} as group_value,
{y_metric} as y_value
from trades
where timestamp >= '{date_from}'
and timestamp <= '{date_to}'
"""
if exchanges:
exchange_list = ",".join([f"'{e.strip()}'" for e in exchanges.split(",")])
query += f" and exchange in ({exchange_list})"
query += f" group by {x_label}, {group_by_field} order by {x_label} asc, {group_by_field} asc"
data = query_questdb(query, timeout=15)
return format_questdb_response(data)
# Nutze vorberechnete Daten aus analytics_custom
exchange_filter = "all"
if exchanges:
# Wenn mehrere Exchanges angegeben, müssen wir kombinieren
# Für jetzt: nutze nur wenn ein Exchange angegeben ist
exchange_list = [e.strip() for e in exchanges.split(",")]
if len(exchange_list) == 1:
exchange_filter = exchange_list[0]
else:
# Bei mehreren Exchanges: gib Fehler zurück, da dies nicht vorberechnet wird
raise HTTPException(
status_code=400,
detail="Multiple exchanges are not supported for pre-calculated analytics. Please specify a single exchange or leave empty for all exchanges."
)
# Query für vorberechnete Daten
query = f"""
select
timestamp as x_value,
group_value,
y_value
from analytics_custom
where timestamp >= '{date_from}'
and timestamp <= '{date_to}'
and y_axis = '{y_axis}'
and group_by = '{group_by}'
and exchange_filter = '{exchange_filter}'
order by timestamp asc, group_value asc
"""
data = query_questdb(query, timeout=5)
if not data or not data.get('dataset'):
# Wenn keine vorberechneten Daten vorhanden, gib leere Daten zurück
logger.warning(f"No pre-calculated data found in analytics_custom. Analytics worker should calculate this data.")
return {
'columns': [
{'name': 'x_value'},
{'name': 'group_value'},
{'name': 'y_value'}
],
'dataset': []
}
return format_questdb_response(data)
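# Example request (hypothetical values):
#   GET /api/custom-analytics?date_from=2026-01-01&date_to=2026-01-27&y_axis=volume&group_by=exchange
# returns the pre-computed daily volume per exchange for that date range.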
@app.get("/api/statistics/moving-average")
async def get_moving_average(days: int = 7, exchange: str = None):
"""
Gibt Moving Average Daten für Tradezahlen und Volumen je Exchange zurück.
Unterstützte Zeiträume: 7, 30, 42, 69, 180, 365 Tage
"""
if days not in [7, 30, 42, 69, 180, 365]:
raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
query = f"""
select
timestamp as date,
exchange,
trade_count,
volume,
ma{days}_count as ma_count,
ma{days}_volume as ma_volume
from analytics_exchange_daily
where timestamp >= dateadd('d', -{days}, now())
"""
if exchange:
query += f" and exchange = '{exchange}'"
query += " order by date asc, exchange asc"
data = query_questdb(query, timeout=5)
# Wenn analytics_exchange_daily leer ist, gib leere Daten zurück
# Der Analytics Worker sollte die Daten berechnen
if not data or not data.get('dataset') or len(data.get('dataset', [])) == 0:
logger.warning(f"analytics_exchange_daily is empty. Analytics worker should calculate this data.")
return {
'columns': [
{'name': 'date'},
{'name': 'exchange'},
{'name': 'trade_count'},
{'name': 'volume'},
{'name': 'ma_count'},
{'name': 'ma_volume'}
],
'dataset': []
}
return format_questdb_response(data)
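# Note: the ma{days}_count / ma{days}_volume columns interpolated above (e.g.
# ma7_count, ma365_volume) are assumed to exist in analytics_exchange_daily,
# one pair per supported period, pre-computed by the analytics worker.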
@app.get("/api/statistics/volume-changes")
async def get_volume_changes(days: int = 7):
"""
Gibt Änderungen in Volumen und Anzahl je Exchange zurück.
Unterstützte Zeiträume: 7, 30, 42, 69, 180, 365 Tage
"""
if days not in [7, 30, 42, 69, 180, 365]:
raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
# Hole die neuesten Daten für den angegebenen Zeitraum
query = f"""
select
exchange,
trade_count,
volume,
count_change_pct,
volume_change_pct,
trend
from analytics_volume_changes
where period_days = {days}
order by timestamp desc
limit 20
"""
data = query_questdb(query, timeout=5)
# Wenn keine vorberechneten Daten vorhanden, gib leere Daten zurück
# Der Analytics Worker sollte die Daten berechnen
if not data or not data.get('dataset'):
logger.warning(f"No pre-calculated volume changes found for {days} days. Analytics worker should calculate this data.")
return {
'columns': [
{'name': 'exchange'},
{'name': 'trade_count'},
{'name': 'volume'},
{'name': 'count_change_pct'},
{'name': 'volume_change_pct'},
{'name': 'trend'}
],
'dataset': []
}
return format_questdb_response(data)
@app.get("/api/statistics/stock-trends")
async def get_stock_trends(days: int = 7, limit: int = 20):
"""
Gibt Trendanalyse für häufig gehandelte Aktien zurück.
Unterstützte Zeiträume: 7, 30, 42, 69, 180, 365 Tage
"""
if days not in [7, 30, 42, 69, 180, 365]:
raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
query = f"""
select
timestamp as date,
isin,
trade_count,
volume,
count_change_pct,
volume_change_pct
from analytics_stock_trends
where period_days = {days}
and timestamp >= dateadd('d', -{days}, now())
order by volume desc
limit {limit}
"""
data = query_questdb(query, timeout=5)
return format_questdb_response(data)
@app.get("/api/analytics")
async def get_analytics(
metric: str = "volume",
group_by: str = "day",
sub_group_by: str = None,
date_from: str = None,
date_to: str = None,
isins: str = None,
continents: str = None
):
"""Analytics Endpunkt für Report Builder"""
composite_keys = ["exchange_continent", "exchange_sector"]
needs_metadata = any([
group_by in ["name", "continent", "sector"] + composite_keys,
sub_group_by in ["name", "continent", "sector"] + composite_keys,
continents is not None
])
t_prefix = "t." if needs_metadata else ""
m_prefix = "m." if needs_metadata else ""
metrics_map = {
"volume": f"sum({t_prefix}price * {t_prefix}quantity)",
"count": f"count(*)",
"avg_price": f"avg({t_prefix}price)"
}
groups_map = {
"day": f"date_trunc('day', {t_prefix}timestamp)",
"month": f"date_trunc('month', {t_prefix}timestamp)",
"exchange": f"{t_prefix}exchange",
"isin": f"{t_prefix}isin",
"name": f"coalesce({m_prefix}name, {t_prefix}isin)" if needs_metadata else "isin",
"continent": f"coalesce({m_prefix}continent, 'Unknown')" if needs_metadata else "'Unknown'",
"sector": f"coalesce({m_prefix}sector, 'Unknown')" if needs_metadata else "'Unknown'",
"exchange_continent": f"concat({t_prefix}exchange, ' - ', coalesce({m_prefix}continent, 'Unknown'))" if needs_metadata else "'Unknown'",
"exchange_sector": f"concat({t_prefix}exchange, ' - ', coalesce({m_prefix}sector, 'Unknown'))" if needs_metadata else "'Unknown'"
}
selected_metric = metrics_map.get(metric, metrics_map["volume"])
selected_group = groups_map.get(group_by, groups_map["day"])
query = f"select {selected_group} as label"
if sub_group_by and sub_group_by in groups_map:
query += f", {groups_map[sub_group_by]} as sub_label"
if metric == 'all':
query += f", count(*) as value_count, sum({t_prefix}price * {t_prefix}quantity) as value_volume from trades"
else:
query += f", {selected_metric} as value from trades"
if needs_metadata:
query += " t left join metadata m on t.isin = m.isin"
query += " where 1=1"
if date_from:
query += f" and {t_prefix}timestamp >= '{date_from}'"
if date_to:
query += f" and {t_prefix}timestamp <= '{date_to}'"
if isins:
isins_list = ",".join([f"'{i.strip()}'" for i in isins.split(",")])
query += f" and {t_prefix}isin in ({isins_list})"
if continents and needs_metadata:
cont_list = ",".join([f"'{c.strip()}'" for c in continents.split(",")])
query += f" and {m_prefix}continent in ({cont_list})"
query += f" group by {selected_group}"
if sub_group_by and sub_group_by in groups_map:
query += f", {groups_map[sub_group_by]}"
query += " order by label asc"
data = query_questdb(query)
return format_questdb_response(data)
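# Worked example (hypothetical values) of the query this endpoint builds:
#   GET /api/analytics?metric=volume&group_by=continent&date_from=2026-01-01
# produces roughly:
#   select coalesce(m.continent, 'Unknown') as label,
#          sum(t.price * t.quantity) as value
#   from trades t left join metadata m on t.isin = m.isin
#   where 1=1 and t.timestamp >= '2026-01-01'
#   group by coalesce(m.continent, 'Unknown') order by label asc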
@app.get("/api/metadata/search")
async def search_metadata(q: str):
"""Case-insensitive search for ISIN or Name"""
query = f"select isin, name from metadata where isin ilike '%{q}%' or name ilike '%{q}%' limit 10"
data = query_questdb(query)
return format_questdb_response(data)
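# Example: GET /api/metadata/search?q=sap matches substrings in both the isin
# and name columns, case-insensitively, capped at 10 rows.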


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)