@@ -169,11 +169,12 @@ async def get_custom_analytics(
):
    """
    Flexible analytics endpoint for custom graphs.
    Uses pre-calculated data from analytics_custom for better performance.

    Parameters:
    - date_from: start date (YYYY-MM-DD)
    - date_to: end date (YYYY-MM-DD)
-    - x_axis: X-axis (date, exchange, isin)
+    - x_axis: X-axis (date, exchange, isin) - currently only "date" is supported
    - y_axis: Y-axis (volume, trade_count, avg_price)
    - group_by: grouping (exchange, isin, date)
    - exchanges: comma-separated list of exchanges (optional)
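    Example (illustrative only - the route path and parameter values below are assumed,
    they are not shown in this excerpt):
        GET /api/analytics/custom?date_from=2025-01-01&date_to=2025-01-31&x_axis=date&y_axis=volume&group_by=exchange&exchanges=gettex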
@@ -190,19 +191,18 @@ async def get_custom_analytics(
    if group_by not in valid_group_by:
        raise HTTPException(status_code=400, detail=f"Invalid group_by. Must be one of: {valid_group_by}")

    # Build the query
    # For custom analytics, x_axis must be "date" (it is pre-calculated daily)
    if x_axis != "date":
        # Fall back to a direct query for a non-date x_axis
        y_axis_map = {
            "volume": "sum(price * quantity)",
            "trade_count": "count(*)",
            "avg_price": "avg(price)"
        }

        x_axis_map = {
            "date": "date_trunc('day', timestamp)",
            "exchange": "exchange",
            "isin": "isin"
        }

        group_by_map = {
            "exchange": "exchange",
            "isin": "isin",
@@ -232,6 +232,84 @@ async def get_custom_analytics(
        data = query_questdb(query, timeout=15)
        return format_questdb_response(data)

    # Use the pre-calculated data from analytics_custom
    exchange_filter = "all"
    if exchanges:
        # If several exchanges are given, the results would have to be combined.
        # For now: only use the pre-calculated data when exactly one exchange is given.
        exchange_list = [e.strip() for e in exchanges.split(",")]
        if len(exchange_list) == 1:
            exchange_filter = exchange_list[0]
        # With several exchanges: fall back to a direct query
        else:
            query = f"""
                select
                    timestamp as x_value,
                    {group_by} as group_value,
                    {'sum(price * quantity)' if y_axis == 'volume' else 'count(*)' if y_axis == 'trade_count' else 'avg(price)'} as y_value
                from trades
                where timestamp >= '{date_from}'
                  and timestamp <= '{date_to}'
                  and exchange in ({','.join([f"'{e}'" for e in exchange_list])})
                group by timestamp, {group_by}
                order by timestamp asc, {group_by} asc
            """
            data = query_questdb(query, timeout=15)
            return format_questdb_response(data)

    # Query the pre-calculated data
    query = f"""
        select
            timestamp as x_value,
            group_value,
            y_value
        from analytics_custom
        where timestamp >= '{date_from}'
          and timestamp <= '{date_to}'
          and y_axis = '{y_axis}'
          and group_by = '{group_by}'
          and exchange_filter = '{exchange_filter}'
        order by timestamp asc, group_value asc
    """

    data = query_questdb(query, timeout=5)
    if not data or not data.get('dataset'):
        # Fallback: direct query when no pre-calculated data is available
        logger.warning("No pre-calculated data found, falling back to direct query")
        y_axis_map = {
            "volume": "sum(price * quantity)",
            "trade_count": "count(*)",
            "avg_price": "avg(price)"
        }
        group_by_map = {
            "exchange": "exchange",
            "isin": "isin",
            "date": "date_trunc('day', timestamp)"
        }

        y_metric = y_axis_map[y_axis]
        group_by_field = group_by_map[group_by]

        query = f"""
            select
                date_trunc('day', timestamp) as x_value,
                {group_by_field} as group_value,
                {y_metric} as y_value
            from trades
            where timestamp >= '{date_from}'
              and timestamp <= '{date_to}'
        """

        if exchanges:
            exchange_list = ",".join([f"'{e.strip()}'" for e in exchanges.split(",")])
            query += f" and exchange in ({exchange_list})"

        query += f" group by date_trunc('day', timestamp), {group_by_field} order by x_value asc, group_value asc"

        data = query_questdb(query, timeout=15)

    return format_questdb_response(data)


@app.get("/api/statistics/moving-average")
async def get_moving_average(days: int = 7, exchange: str = None):
    """
@@ -330,9 +330,18 @@ class AnalyticsWorker:
                    dt = datetime.datetime.fromisoformat(row['date'].replace('Z', '+00:00'))
                elif isinstance(row['date'], pd.Timestamp):
                    dt = row['date'].to_pydatetime()
                elif isinstance(row['date'], datetime.date):
                    dt = datetime.datetime.combine(row['date'], datetime.time(), tzinfo=datetime.timezone.utc)
                else:
                    dt = row['date']
                timestamp_ns = int(dt.timestamp() * 1e9)
            elif 'x_value' in row and isinstance(row['x_value'], (datetime.datetime, datetime.date)):
                # For custom analytics: use x_value as the timestamp
                if isinstance(row['x_value'], datetime.date):
                    dt = datetime.datetime.combine(row['x_value'], datetime.time(), tzinfo=datetime.timezone.utc)
                else:
                    dt = row['x_value']
                timestamp_ns = int(dt.timestamp() * 1e9)
            else:
                timestamp_ns = int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1e9)
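            # Worked example (illustrative, not from the commit): for a day bucket of
            # 2025-01-01T00:00:00Z, dt.timestamp() == 1735689600 and therefore
            # timestamp_ns == 1735689600000000000.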
@@ -355,9 +364,30 @@ class AnalyticsWorker:
                trend = str(row['trend']).replace(' ', '\\ ').replace(',', '\\,')
                tags.append(f"trend={trend}")

            # Custom analytics tags
            if 'y_axis' in row and row['y_axis']:
                tags.append(f"y_axis={row['y_axis']}")
            if 'group_by' in row and row['group_by']:
                tags.append(f"group_by={row['group_by']}")
            if 'exchange_filter' in row and row['exchange_filter']:
                exchange_filter = str(row['exchange_filter']).replace(' ', '\\ ').replace(',', '\\,')
                tags.append(f"exchange_filter={exchange_filter}")

            # Custom analytics fields (only for the analytics_custom table)
            if table_name == 'analytics_custom':
                # x_value is not stored as a separate field - timestamp_ns already carries the x_value
                # group_value as a string field
                if 'group_value' in row and row['group_value'] is not None:
                    group_val = str(row['group_value']).replace('"', '\\"').replace(' ', '\\ ')
                    fields.append(f'group_value="{group_val}"')

                # y_value as a numeric field
                if 'y_value' in row and row['y_value'] is not None:
                    fields.append(f"y_value={row['y_value']}")

            # Numeric fields (period_days has to be stored as a field, not as a tag)
            for key, value in row.items():
-                if key in ['date', 'exchange', 'isin', 'trend', 'exchanges']:
+                if key in ['date', 'exchange', 'isin', 'trend', 'exchanges', 'y_axis', 'group_by', 'exchange_filter', 'x_value', 'group_value', 'y_value']:
                    continue
                if value is not None:
                    if isinstance(value, (int, float)):
@@ -398,6 +428,78 @@ class AnalyticsWorker:
        except Exception as e:
            logger.error(f"Error connecting to QuestDB: {e}")

    def calculate_custom_analytics_daily(self, date: datetime.date) -> List[Dict]:
        """Calculates custom analytics for one day - for all combinations of y-axis and grouping"""
        date_str = date.strftime('%Y-%m-%d')
        results = []

        # Fetch all available exchanges
        exchanges_query = "select distinct exchange from trades"
        exchanges_data = self.query_questdb(exchanges_query)
        all_exchanges = []
        if exchanges_data and exchanges_data.get('dataset'):
            all_exchanges = [row[0] for row in exchanges_data['dataset'] if row[0]]

        # Combinations: y-axis x grouping x exchange filter
        y_axes = ['volume', 'trade_count', 'avg_price']
        group_bys = ['exchange', 'isin', 'date']
        exchange_filters = [None] + all_exchanges  # None = all exchanges
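        # Illustration (assumed numbers, not from the commit): with e.g. 4 distinct exchanges the
        # loops below run 3 y-axes * 3 groupings * (4 + 1) exchange filters = 45 daily aggregations.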

        for y_axis in y_axes:
            for group_by in group_bys:
                for exchange_filter in exchange_filters:
                    # Calculate for this day
                    y_metric_map = {
                        'volume': 'sum(price * quantity)',
                        'trade_count': 'count(*)',
                        'avg_price': 'avg(price)'
                    }

                    group_by_map = {
                        'exchange': 'exchange',
                        'isin': 'isin',
                        'date': "date_trunc('day', timestamp)"
                    }

                    y_metric = y_metric_map[y_axis]
                    group_by_field = group_by_map[group_by]

                    query = f"""
                        select
                            date_trunc('day', timestamp) as x_value,
                            {group_by_field} as group_value,
                            {y_metric} as y_value
                        from trades
                        where date_trunc('day', timestamp) = '{date_str}'
                    """

                    if exchange_filter:
                        query += f" and exchange = '{exchange_filter}'"

                    query += f" group by date_trunc('day', timestamp), {group_by_field}"

                    data = self.query_questdb(query)
                    if data and data.get('dataset'):
                        for row in data['dataset']:
                            # x_value is already a date/timestamp
                            x_val = row[0]
                            if isinstance(x_val, str):
                                x_val = datetime.datetime.fromisoformat(x_val.replace('Z', '+00:00'))
                            elif isinstance(x_val, (int, float)):
                                x_val = datetime.datetime.fromtimestamp(x_val / 1000000, tz=datetime.timezone.utc)

                            results.append({
                                'date': date,
                                'y_axis': y_axis,
                                'group_by': group_by,
                                'exchange_filter': exchange_filter or 'all',
                                'x_value': x_val,
                                'group_value': row[1] if row[1] else '',
                                'y_value': row[2] if row[2] else 0
                            })

        return results

    def process_all_analytics(self):
        """Processes all analytics for all time ranges"""
        logger.info("Starting analytics processing...")
@@ -430,6 +532,65 @@ class AnalyticsWorker:

        logger.info("Analytics processing completed.")

    def process_date(self, date: datetime.date):
        """Processes all analytics for a specific day"""
        logger.info(f"Processing analytics for {date}")

        # Custom analytics (the most important calculation for performance)
        logger.info(f"Calculating custom analytics for {date}...")
        custom_data = self.calculate_custom_analytics_daily(date)
        if custom_data:
            self.save_analytics_data('analytics_custom', custom_data)
            logger.info(f"Saved {len(custom_data)} custom analytics rows for {date}")

        logger.info(f"Completed processing for {date}")

    def get_missing_dates(self) -> List[datetime.date]:
        """Determines the missing days that still have to be calculated"""
        # Fetch the date of the first trade
        query = "select min(date_trunc('day', timestamp)) as first_date from trades"
        data = self.query_questdb(query)
        if not data or not data.get('dataset') or not data['dataset'][0][0]:
            logger.info("No trades found in database")
            return []

        first_date_value = data['dataset'][0][0]
        if isinstance(first_date_value, str):
            first_date = datetime.datetime.fromisoformat(first_date_value.replace('Z', '+00:00')).date()
        else:
            first_date = datetime.datetime.fromtimestamp(first_date_value / 1000000, tz=datetime.timezone.utc).date()

        # Fetch the days that have already been calculated
        existing_dates = self.get_existing_dates('analytics_daily_summary')

        # Generate all days from the first trade up to yesterday
        yesterday = datetime.date.today() - datetime.timedelta(days=1)
        all_dates = []
        current = first_date
        while current <= yesterday:
            all_dates.append(current)
            current += datetime.timedelta(days=1)

        # Find the missing days
        missing_dates = [d for d in all_dates if d not in existing_dates]
        logger.info(f"Found {len(missing_dates)} missing dates to calculate (from {len(all_dates)} total dates)")
        return sorted(missing_dates)

    def process_missing_dates(self):
        """Calculates all missing days"""
        missing_dates = self.get_missing_dates()
        if not missing_dates:
            logger.info("No missing dates to process")
            return

        logger.info(f"Processing {len(missing_dates)} missing dates...")
        for i, date in enumerate(missing_dates, 1):
            logger.info(f"Processing date {i}/{len(missing_dates)}: {date}")
            self.process_date(date)
            # Small pause between the calculations
            if i % 10 == 0:
                time.sleep(1)

    def run(self):
        """Main loop of the worker"""
        logger.info("Analytics Worker started.")
@@ -439,32 +600,30 @@ class AnalyticsWorker:
            logger.error("Failed to connect to QuestDB. Exiting.")
            return

-        # Initial processing
-        self.process_all_analytics()
-        self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
+        # Initial calculation of any missing days
+        logger.info("Checking for missing dates...")
+        self.process_missing_dates()

-        # Polling loop
+        # Main loop: wait for midnight
+        logger.info("Waiting for midnight to process yesterday's data...")
        while True:
            try:
-                # Check for new trades
-                last_ts = self.get_last_processed_timestamp()
-                new_trades = self.get_new_trades(since=last_ts)
+                now = datetime.datetime.now()

-                if new_trades:
-                    logger.info(f"Found {len(new_trades)} new trades, reprocessing analytics...")
-                    self.process_all_analytics()
-                    self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
-                else:
-                    logger.debug("No new trades found.")
+                # Check whether it is midnight (00:00)
+                if now.hour == 0 and now.minute == 0:
+                    yesterday = (now - datetime.timedelta(days=1)).date()
+                    logger.info(f"Processing yesterday's data: {yesterday}")
+                    self.process_date(yesterday)
+                    # Wait 61s to prevent running more than once within the same minute
+                    time.sleep(61)

+                # Also check for missing days (every 6 hours)
+                if now.hour % 6 == 0 and now.minute == 0:
+                    logger.info("Checking for missing dates...")
+                    self.process_missing_dates()
+                    time.sleep(61)

                # Wait 30 seconds before the next check
                time.sleep(30)
            except requests.exceptions.ConnectionError as e:
                logger.warning(f"Connection error to QuestDB, retrying in 60s: {e}")
                time.sleep(60)  # longer pause on connection errors
            except Exception as e:
                logger.error(f"Error in worker loop: {e}", exc_info=True)
                time.sleep(60)  # longer pause on other errors

def main():
    worker = AnalyticsWorker()