Fix: Analytics worker now calculates all tables per day

Melchior Reimers
2026-01-27 14:12:26 +01:00
parent 4fd93541a2
commit 459c24fcd3
3 changed files with 74 additions and 170 deletions


@@ -4,8 +4,11 @@ from fastapi.staticfiles import StaticFiles
 from fastapi.responses import FileResponse
 import requests
 import os
+import logging
 from typing import Optional, Dict, Any
 
+logger = logging.getLogger(__name__)
+
 app = FastAPI(title="Trading Dashboard API")
 app.add_middleware(
@@ -85,21 +88,11 @@ async def get_trades(isin: str = None, days: int = 7):
     data = query_questdb(query)
 
-    # Fallback: if analytics_exchange_daily is empty, calculate directly from trades
+    # If analytics_exchange_daily is empty, return empty data
+    # The analytics worker should calculate the data
     if not data or not data.get('dataset') or len(data.get('dataset', [])) == 0:
-        logger.info(f"analytics_exchange_daily is empty, calculating from trades table")
-        query = f"""
-        select
-            date_trunc('day', timestamp) as date,
-            exchange,
-            count(*) as trade_count,
-            sum(price * quantity) as volume
-        from trades
-        where timestamp >= dateadd('d', -{days}, now())
-        group by date, exchange
-        order by date desc, exchange asc
-        """
-        data = query_questdb(query)
+        logger.warning(f"analytics_exchange_daily is empty. Analytics worker should calculate this data.")
+        return {'columns': [{'name': 'date'}, {'name': 'exchange'}, {'name': 'trade_count'}, {'name': 'volume'}], 'dataset': []}
 
     return format_questdb_response(data)
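
With the fallback removed, the dashboard now depends entirely on the analytics worker keeping analytics_exchange_daily populated. A minimal sketch of the worker-side aggregation, assuming QuestDB's REST /exec endpoint and an insert-into-select job; the table and column names come from the diff above, while the function name and endpoint URL are illustrative:

    import requests

    QUESTDB_URL = "http://questdb:9000/exec"  # assumed QuestDB REST endpoint

    def refresh_exchange_daily(days: int = 1) -> None:
        # Re-aggregate the last N days of trades into the pre-calculated table
        query = f"""
        insert into analytics_exchange_daily
        select
            date_trunc('day', timestamp) as date,
            exchange,
            count(*) as trade_count,
            sum(price * quantity) as volume
        from trades
        where timestamp >= dateadd('d', -{days}, now())
        group by date, exchange
        """
        requests.get(QUESTDB_URL, params={"query": query}, timeout=30)
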
@@ -136,25 +129,18 @@ async def get_summary(days: int = None):
     data = query_questdb(query)
 
-    # Fallback: if analytics_daily_summary is empty, calculate directly from trades
+    # If analytics_daily_summary is empty, return empty data
+    # The analytics worker should calculate the data
     if not data or not data.get('dataset') or not data['dataset']:
-        logger.info(f"analytics_daily_summary is empty, calculating from trades table")
-        if days:
-            query = f"""
-            select
-                count(*) as total_trades,
-                sum(price * quantity) as total_volume
-            from trades
-            where timestamp >= dateadd('d', -{days}, now())
-            """
-        else:
-            query = """
-            select
-                count(*) as total_trades,
-                sum(price * quantity) as total_volume
-            from trades
-            """
-        data = query_questdb(query)
+        logger.warning(f"analytics_daily_summary is empty. Analytics worker should calculate this data.")
+        return {
+            'columns': [
+                {'name': 'continent'},
+                {'name': 'trade_count'},
+                {'name': 'total_volume'}
+            ],
+            'dataset': [['All', 0, 0.0]]
+        }
 
     if data and data.get('dataset') and data['dataset']:
         total_trades = data['dataset'][0][0] if data['dataset'][0][0] else 0
@@ -170,18 +156,15 @@ async def get_summary(days: int = None):
             'dataset': [['All', total_trades, total_volume]]
         }
 
-    # Fallback: original query
-    query = """
-    select
-        coalesce(m.continent, 'Unknown') as continent,
-        count(*) as trade_count,
-        sum(t.price * t.quantity) as total_volume
-    from trades t
-    left join metadata m on t.isin = m.isin
-    group by continent
-    """
-    data = query_questdb(query)
-
-    return format_questdb_response(data)
+    # If no data is available, return empty data
+    return {
+        'columns': [
+            {'name': 'continent'},
+            {'name': 'trade_count'},
+            {'name': 'total_volume'}
+        ],
+        'dataset': [['All', 0, 0.0]]
+    }
 
 @app.get("/api/statistics/total-trades")
 async def get_total_trades(days: int = None):
@@ -231,7 +214,11 @@ async def get_custom_analytics(
     # For custom analytics: x_axis must be "date" (pre-calculated daily)
     if x_axis != "date":
-        # Fallback to a direct query for non-date x_axis
+        # For non-date x_axis: return an error, since this is not pre-calculated
+        raise HTTPException(
+            status_code=400,
+            detail="x_axis must be 'date' for pre-calculated analytics. Other x_axis values are not supported for performance reasons."
+        )
 
     y_axis_map = {
         "volume": "sum(price * quantity)",
         "trade_count": "count(*)",
@@ -278,22 +265,12 @@ async def get_custom_analytics(
         exchange_list = [e.strip() for e in exchanges.split(",")]
         if len(exchange_list) == 1:
             exchange_filter = exchange_list[0]
-        # For multiple exchanges: fallback to a direct query
         else:
-            query = f"""
-            select
-                timestamp as x_value,
-                {group_by} as group_value,
-                {'sum(price * quantity)' if y_axis == 'volume' else 'count(*)' if y_axis == 'trade_count' else 'avg(price)'} as y_value
-            from trades
-            where timestamp >= '{date_from}'
-                and timestamp <= '{date_to}'
-                and exchange in ({','.join([f"'{e}'" for e in exchange_list])})
-            group by timestamp, {group_by}
-            order by timestamp asc, {group_by} asc
-            """
-            data = query_questdb(query, timeout=15)
-            return format_questdb_response(data)
+            # For multiple exchanges: return an error, since this is not pre-calculated
+            raise HTTPException(
+                status_code=400,
+                detail="Multiple exchanges are not supported for pre-calculated analytics. Please specify a single exchange or leave empty for all exchanges."
+            )
 
     # Query for pre-calculated data
     query = f"""
@@ -312,39 +289,16 @@ async def get_custom_analytics(
     data = query_questdb(query, timeout=5)
 
     if not data or not data.get('dataset'):
-        # Fallback: direct query if no pre-calculated data is available
-        logger.warning(f"No pre-calculated data found, falling back to direct query")
-        y_axis_map = {
-            "volume": "sum(price * quantity)",
-            "trade_count": "count(*)",
-            "avg_price": "avg(price)"
-        }
-        group_by_map = {
-            "exchange": "exchange",
-            "isin": "isin",
-            "date": "date_trunc('day', timestamp)"
-        }
-        y_metric = y_axis_map[y_axis]
-        group_by_field = group_by_map[group_by]
-        query = f"""
-        select
-            date_trunc('day', timestamp) as x_value,
-            {group_by_field} as group_value,
-            {y_metric} as y_value
-        from trades
-        where timestamp >= '{date_from}'
-            and timestamp <= '{date_to}'
-        """
-        if exchanges:
-            exchange_list = ",".join([f"'{e.strip()}'" for e in exchanges.split(",")])
-            query += f" and exchange in ({exchange_list})"
-        query += f" group by date_trunc('day', timestamp), {group_by_field} order by x_value asc, group_value asc"
-        data = query_questdb(query, timeout=15)
+        # If no pre-calculated data is available, return empty data
+        logger.warning(f"No pre-calculated data found in analytics_custom. Analytics worker should calculate this data.")
+        return {
+            'columns': [
+                {'name': 'x_value'},
+                {'name': 'group_value'},
+                {'name': 'y_value'}
+            ],
+            'dataset': []
+        }
 
     return format_questdb_response(data)
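
Since the empty responses keep the columns list intact, consumers can run the same code path for empty and populated results. A small sketch of that invariant:

    def rows_as_dicts(payload: dict) -> list[dict]:
        # Zip each dataset row against the declared column names
        names = [c["name"] for c in payload["columns"]]
        return [dict(zip(names, row)) for row in payload["dataset"]]

    # An empty 'dataset' simply yields an empty list instead of a crash
    assert rows_as_dicts({"columns": [{"name": "x_value"}], "dataset": []}) == []
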
@@ -376,25 +330,21 @@ async def get_moving_average(days: int = 7, exchange: str = None):
     data = query_questdb(query, timeout=5)
 
-    # Fallback: if analytics_exchange_daily is empty, calculate directly from trades
+    # If analytics_exchange_daily is empty, return empty data
+    # The analytics worker should calculate the data
     if not data or not data.get('dataset') or len(data.get('dataset', [])) == 0:
-        logger.info(f"analytics_exchange_daily is empty, calculating moving average from trades table")
-        # Calculate moving average directly from trades (simplified, without a real MA calculation)
-        query = f"""
-        select
-            date_trunc('day', timestamp) as date,
-            exchange,
-            count(*) as trade_count,
-            sum(price * quantity) as volume,
-            count(*) as ma_count,
-            sum(price * quantity) as ma_volume
-        from trades
-        where timestamp >= dateadd('d', -{days}, now())
-        """
-        if exchange:
-            query += f" and exchange = '{exchange}'"
-        query += " group by date, exchange order by date asc, exchange asc"
-        data = query_questdb(query, timeout=10)
+        logger.warning(f"analytics_exchange_daily is empty. Analytics worker should calculate this data.")
+        return {
+            'columns': [
+                {'name': 'date'},
+                {'name': 'exchange'},
+                {'name': 'trade_count'},
+                {'name': 'volume'},
+                {'name': 'ma_count'},
+                {'name': 'ma_volume'}
+            ],
+            'dataset': []
+        }
 
     return format_questdb_response(data)
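
Worth noting: the removed fallback never computed a real moving average; it simply aliased the plain count/sum as ma_count/ma_volume. If the analytics worker takes this over, a genuine trailing-window average could look like this sketch (window size and per-exchange grouping are assumptions):

    from collections import deque

    def moving_average(values: list[float], window: int = 7) -> list[float]:
        # Average over a trailing window; shorter prefix windows at the start
        buf: deque = deque(maxlen=window)
        out = []
        for v in values:
            buf.append(v)
            out.append(sum(buf) / len(buf))
        return out

    print(moving_average([10.0, 20.0, 30.0, 40.0], window=2))  # [10.0, 15.0, 25.0, 35.0]
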
@@ -424,67 +374,21 @@ async def get_volume_changes(days: int = 7):
     data = query_questdb(query, timeout=5)
 
-    # If no pre-calculated data is available, calculate on-the-fly
+    # If no pre-calculated data is available, return empty data
+    # The analytics worker should calculate the data
    if not data or not data.get('dataset'):
-        logger.info(f"No pre-calculated volume changes found for {days} days, calculating on-the-fly")
-
-        # Calculate volume changes directly from trades
-        query = f"""
-        with
-        first_half as (
-            select
-                exchange,
-                count(*) as trade_count,
-                sum(price * quantity) as volume
-            from trades
-            where timestamp >= dateadd('d', -{days}, now())
-                and timestamp < dateadd('d', -{days/2}, now())
-            group by exchange
-        ),
-        second_half as (
-            select
-                exchange,
-                count(*) as trade_count,
-                sum(price * quantity) as volume
-            from trades
-            where timestamp >= dateadd('d', -{days/2}, now())
-            group by exchange
-        )
-        select
-            coalesce(f.exchange, s.exchange) as exchange,
-            coalesce(s.trade_count, 0) as trade_count,
-            coalesce(s.volume, 0) as volume,
-            case when f.trade_count > 0 then
-                ((coalesce(s.trade_count, 0) - f.trade_count) * 100.0 / f.trade_count)
-            else 0 end as count_change_pct,
-            case when f.volume > 0 then
-                ((coalesce(s.volume, 0) - f.volume) * 100.0 / f.volume)
-            else 0 end as volume_change_pct,
-            case
-                when f.trade_count > 0 and f.volume > 0 then
-                    case
-                        when ((coalesce(s.trade_count, 0) - f.trade_count) * 100.0 / f.trade_count) > 5
-                            and ((coalesce(s.volume, 0) - f.volume) * 100.0 / f.volume) > 5
-                        then 'mehr_trades_mehr_volumen'
-                        when ((coalesce(s.trade_count, 0) - f.trade_count) * 100.0 / f.trade_count) > 5
-                            and ((coalesce(s.volume, 0) - f.volume) * 100.0 / f.volume) < -5
-                        then 'mehr_trades_weniger_volumen'
-                        when ((coalesce(s.trade_count, 0) - f.trade_count) * 100.0 / f.trade_count) < -5
-                            and ((coalesce(s.volume, 0) - f.volume) * 100.0 / f.volume) > 5
-                        then 'weniger_trades_mehr_volumen'
-                        when ((coalesce(s.trade_count, 0) - f.trade_count) * 100.0 / f.trade_count) < -5
-                            and ((coalesce(s.volume, 0) - f.volume) * 100.0 / f.volume) < -5
-                        then 'weniger_trades_weniger_volumen'
-                        else 'stabil'
-                    end
-                else 'neu'
-            end as trend
-        from first_half f
-        full outer join second_half s on f.exchange = s.exchange
-        order by s.volume desc
-        """
-        data = query_questdb(query, timeout=15)
+        logger.warning(f"No pre-calculated volume changes found for {days} days. Analytics worker should calculate this data.")
+        return {
+            'columns': [
+                {'name': 'exchange'},
+                {'name': 'trade_count'},
+                {'name': 'volume'},
+                {'name': 'count_change_pct'},
+                {'name': 'volume_change_pct'},
+                {'name': 'trend'}
+            ],
+            'dataset': []
+        }
 
     return format_questdb_response(data)
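
The deleted SQL also carried the trend classification (±5% thresholds, four direction labels plus 'stabil' and 'neu'). If the analytics worker now owns this logic, a direct Python translation might look like this sketch; has_history stands in for the old f.trade_count > 0 and f.volume > 0 check:

    def classify_trend(count_change_pct: float, volume_change_pct: float,
                       has_history: bool) -> str:
        # Mirrors the CASE expression from the removed query, thresholds at +/-5%
        if not has_history:
            return 'neu'
        if count_change_pct > 5 and volume_change_pct > 5:
            return 'mehr_trades_mehr_volumen'
        if count_change_pct > 5 and volume_change_pct < -5:
            return 'mehr_trades_weniger_volumen'
        if count_change_pct < -5 and volume_change_pct > 5:
            return 'weniger_trades_mehr_volumen'
        if count_change_pct < -5 and volume_change_pct < -5:
            return 'weniger_trades_weniger_volumen'
        return 'stabil'

    print(classify_trend(12.0, -8.0, True))  # mehr_trades_weniger_volumen
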