From df478b0ea4acae3e127eaf6d0462f5bae807cdd1 Mon Sep 17 00:00:00 2001
From: Melchior Reimers
Date: Sun, 25 Jan 2026 18:31:13 +0100
Subject: [PATCH] Serve custom analytics from pre-calculated daily data

The custom analytics endpoint now reads pre-calculated rows from the
analytics_custom table instead of aggregating the trades table on every
request. The analytics worker computes all combinations of y-axis,
grouping, and exchange filter once per day at midnight, and backfills
missing days on startup. The endpoint falls back to a direct query for
non-date x-axes, multi-exchange filters, and days that have not been
pre-calculated yet.
---
 dashboard/server.py     | 142 +++++++++++++++++++++------
 src/analytics/worker.py | 209 +++++++++++++++++++++++++++++++++++-----
 2 files changed, 294 insertions(+), 57 deletions(-)
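Reviewer note (illustrative, not part of the patch): the route decorator for
get_custom_analytics sits outside the hunks below, so the path and host in
this example are assumptions. A request on the pre-calculated fast path could
look like:

  GET http://localhost:8000/api/analytics/custom?date_from=2026-01-01&date_to=2026-01-24&x_axis=date&y_axis=volume&group_by=exchange

With x_axis=date and at most one exchange, the endpoint answers from
analytics_custom (5s timeout); every other combination falls back to a
direct query against trades (15s timeout).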
diff --git a/dashboard/server.py b/dashboard/server.py
index fe59e71..c1c8abe 100644
--- a/dashboard/server.py
+++ b/dashboard/server.py
@@ -169,11 +169,12 @@ async def get_custom_analytics(
 ):
     """
     Flexible analytics endpoint for custom graphs.
+    Uses pre-calculated data from analytics_custom for better performance.
 
     Parameters:
     - date_from: start date (YYYY-MM-DD)
     - date_to: end date (YYYY-MM-DD)
-    - x_axis: x-axis (date, exchange, isin)
+    - x_axis: x-axis (date, exchange, isin) - only "date" uses pre-calculated data
     - y_axis: y-axis (volume, trade_count, avg_price)
     - group_by: grouping (exchange, isin, date)
     - exchanges: comma-separated list of exchanges (optional)
@@ -190,46 +191,123 @@ async def get_custom_analytics(
     if group_by not in valid_group_by:
         raise HTTPException(status_code=400, detail=f"Invalid group_by. Must be one of: {valid_group_by}")
 
-    # Build the query
-    y_axis_map = {
-        "volume": "sum(price * quantity)",
-        "trade_count": "count(*)",
-        "avg_price": "avg(price)"
-    }
+    # For custom analytics the x_axis must be "date" (pre-calculated daily)
+    if x_axis != "date":
+        # Fall back to a direct query for non-date x-axes
+        y_axis_map = {
+            "volume": "sum(price * quantity)",
+            "trade_count": "count(*)",
+            "avg_price": "avg(price)"
+        }
+        x_axis_map = {
+            "exchange": "exchange",
+            "isin": "isin"
+        }
+        group_by_map = {
+            "exchange": "exchange",
+            "isin": "isin",
+            "date": "date_trunc('day', timestamp)"
+        }
+
+        y_metric = y_axis_map[y_axis]
+        x_label = x_axis_map[x_axis]
+        group_by_field = group_by_map[group_by]
+
+        query = f"""
+            select
+                {x_label} as x_value,
+                {group_by_field} as group_value,
+                {y_metric} as y_value
+            from trades
+            where timestamp >= '{date_from}'
+            and timestamp <= '{date_to}'
+        """
+
+        if exchanges:
+            exchange_list = ",".join([f"'{e.strip()}'" for e in exchanges.split(",")])
+            query += f" and exchange in ({exchange_list})"
+
+        query += f" group by {x_label}, {group_by_field} order by {x_label} asc, {group_by_field} asc"
+
+        data = query_questdb(query, timeout=15)
+        return format_questdb_response(data)
 
-    x_axis_map = {
-        "date": "date_trunc('day', timestamp)",
-        "exchange": "exchange",
-        "isin": "isin"
-    }
-
-    group_by_map = {
-        "exchange": "exchange",
-        "isin": "isin",
-        "date": "date_trunc('day', timestamp)"
-    }
-
-    y_metric = y_axis_map[y_axis]
-    x_label = x_axis_map[x_axis]
-    group_by_field = group_by_map[group_by]
+    # Use the pre-calculated data from analytics_custom
+    exchange_filter = "all"
+    if exchanges:
+        # Several exchanges would have to be combined from per-exchange rows.
+        # For now the pre-calculated path is only used for a single exchange.
+        exchange_list = [e.strip() for e in exchanges.split(",")]
+        if len(exchange_list) == 1:
+            exchange_filter = exchange_list[0]
+        # Multiple exchanges: fall back to a direct query
+        else:
+            query = f"""
+                select
+                    date_trunc('day', timestamp) as x_value,
+                    {"date_trunc('day', timestamp)" if group_by == 'date' else group_by} as group_value,
+                    {'sum(price * quantity)' if y_axis == 'volume' else 'count(*)' if y_axis == 'trade_count' else 'avg(price)'} as y_value
+                from trades
+                where timestamp >= '{date_from}'
+                and timestamp <= '{date_to}'
+                and exchange in ({','.join([f"'{e}'" for e in exchange_list])})
+                group by date_trunc('day', timestamp), {"date_trunc('day', timestamp)" if group_by == 'date' else group_by}
+                order by x_value asc, group_value asc
+            """
+            data = query_questdb(query, timeout=15)
+            return format_questdb_response(data)
 
+    # Query the pre-calculated data
     query = f"""
         select
-            {x_label} as x_value,
-            {group_by_field} as group_value,
-            {y_metric} as y_value
-        from trades
+            timestamp as x_value,
+            group_value,
+            y_value
+        from analytics_custom
         where timestamp >= '{date_from}'
         and timestamp <= '{date_to}'
+        and y_axis = '{y_axis}'
+        and group_by = '{group_by}'
+        and exchange_filter = '{exchange_filter}'
+        order by timestamp asc, group_value asc
     """
 
-    if exchanges:
-        exchange_list = ",".join([f"'{e.strip()}'" for e in exchanges.split(",")])
-        query += f" and exchange in ({exchange_list})"
+    data = query_questdb(query, timeout=5)
+    if not data or not data.get('dataset'):
+        # Fallback: query trades directly when no pre-calculated data exists
+        logger.warning("No pre-calculated data found, falling back to direct query")
+        y_axis_map = {
+            "volume": "sum(price * quantity)",
+            "trade_count": "count(*)",
+            "avg_price": "avg(price)"
+        }
+        group_by_map = {
+            "exchange": "exchange",
+            "isin": "isin",
+            "date": "date_trunc('day', timestamp)"
+        }
+
+        y_metric = y_axis_map[y_axis]
+        group_by_field = group_by_map[group_by]
+
+        query = f"""
+            select
+                date_trunc('day', timestamp) as x_value,
+                {group_by_field} as group_value,
+                {y_metric} as y_value
+            from trades
+            where timestamp >= '{date_from}'
+            and timestamp <= '{date_to}'
+        """
+
+        if exchanges:
+            exchange_list = ",".join([f"'{e.strip()}'" for e in exchanges.split(",")])
+            query += f" and exchange in ({exchange_list})"
+
+        query += f" group by date_trunc('day', timestamp), {group_by_field} order by x_value asc, group_value asc"
+
+        data = query_questdb(query, timeout=15)
 
-    query += f" group by {x_label}, {group_by_field} order by {x_label} asc, {group_by_field} asc"
-
-    data = query_questdb(query, timeout=15)
     return format_questdb_response(data)
 
 @app.get("/api/statistics/moving-average")
diff --git a/src/analytics/worker.py b/src/analytics/worker.py
index d9051b3..907c8c9 100644
--- a/src/analytics/worker.py
+++ b/src/analytics/worker.py
@@ -330,9 +330,18 @@ class AnalyticsWorker:
                     dt = datetime.datetime.fromisoformat(row['date'].replace('Z', '+00:00'))
                 elif isinstance(row['date'], pd.Timestamp):
                     dt = row['date'].to_pydatetime()
+                elif isinstance(row['date'], datetime.date) and not isinstance(row['date'], datetime.datetime):
+                    dt = datetime.datetime.combine(row['date'], datetime.time(), tzinfo=datetime.timezone.utc)
                 else:
                     dt = row['date']
                 timestamp_ns = int(dt.timestamp() * 1e9)
+            elif 'x_value' in row and isinstance(row['x_value'], (datetime.datetime, datetime.date)):
+                # For custom analytics: use x_value as the timestamp
+                if isinstance(row['x_value'], datetime.datetime):
+                    dt = row['x_value']
+                else:
+                    dt = datetime.datetime.combine(row['x_value'], datetime.time(), tzinfo=datetime.timezone.utc)
+                timestamp_ns = int(dt.timestamp() * 1e9)
             else:
                 timestamp_ns = int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1e9)
 
@@ -355,9 +364,30 @@ class AnalyticsWorker:
                 trend = str(row['trend']).replace(' ', '\\ ').replace(',', '\\,')
                 tags.append(f"trend={trend}")
 
+            # Custom analytics tags
+            if 'y_axis' in row and row['y_axis']:
+                tags.append(f"y_axis={row['y_axis']}")
+            if 'group_by' in row and row['group_by']:
+                tags.append(f"group_by={row['group_by']}")
+            if 'exchange_filter' in row and row['exchange_filter']:
+                exchange_filter = str(row['exchange_filter']).replace(' ', '\\ ').replace(',', '\\,')
+                tags.append(f"exchange_filter={exchange_filter}")
+
+            # Custom analytics fields (only for the analytics_custom table)
+            if table_name == 'analytics_custom':
+                # x_value is not stored as a separate field - timestamp_ns already carries it
+                # group_value as a string field (quoted ILP strings only need '\' and '"' escaped)
+                if 'group_value' in row and row['group_value'] is not None:
+                    group_val = str(row['group_value']).replace('\\', '\\\\').replace('"', '\\"')
+                    fields.append(f'group_value="{group_val}"')
+
+                # y_value as a numeric field
+                if 'y_value' in row and row['y_value'] is not None:
+                    fields.append(f"y_value={row['y_value']}")
+
             # Numeric fields (period_days must be stored as a field, not as a tag)
             for key, value in row.items():
-                if key in ['date', 'exchange', 'isin', 'trend', 'exchanges']:
+                if key in ['date', 'exchange', 'isin', 'trend', 'exchanges', 'y_axis', 'group_by', 'exchange_filter', 'x_value', 'group_value', 'y_value']:
                     continue
                 if value is not None:
                     if isinstance(value, (int, float)):
@@ -398,6 +428,78 @@ class AnalyticsWorker:
         except Exception as e:
             logger.error(f"Error connecting to QuestDB: {e}")
 
+    def calculate_custom_analytics_daily(self, date: datetime.date) -> List[Dict]:
+        """Calculates custom analytics for one day - every combination of y-axis and grouping"""
+        date_str = date.strftime('%Y-%m-%d')
+        results = []
+
+        # Fetch all available exchanges
+        exchanges_query = "select distinct exchange from trades"
+        exchanges_data = self.query_questdb(exchanges_query)
+        all_exchanges = []
+        if exchanges_data and exchanges_data.get('dataset'):
+            all_exchanges = [row[0] for row in exchanges_data['dataset'] if row[0]]
+
+        # Combinations: y-axis x grouping x exchange filter
+        y_axes = ['volume', 'trade_count', 'avg_price']
+        group_bys = ['exchange', 'isin', 'date']
+        exchange_filters = [None] + all_exchanges  # None = all exchanges
+
+        for y_axis in y_axes:
+            for group_by in group_bys:
+                for exchange_filter in exchange_filters:
+                    # Calculate this combination for the given day
+                    y_metric_map = {
+                        'volume': 'sum(price * quantity)',
+                        'trade_count': 'count(*)',
+                        'avg_price': 'avg(price)'
+                    }
+
+                    group_by_map = {
+                        'exchange': 'exchange',
+                        'isin': 'isin',
+                        'date': "date_trunc('day', timestamp)"
+                    }
+
+                    y_metric = y_metric_map[y_axis]
+                    group_by_field = group_by_map[group_by]
+
+                    query = f"""
+                        select
+                            date_trunc('day', timestamp) as x_value,
+                            {group_by_field} as group_value,
+                            {y_metric} as y_value
+                        from trades
+                        where date_trunc('day', timestamp) = '{date_str}'
+                    """
+
+                    if exchange_filter:
+                        query += f" and exchange = '{exchange_filter}'"
+
+                    query += f" group by date_trunc('day', timestamp), {group_by_field}"
+
+                    data = self.query_questdb(query)
+                    if data and data.get('dataset'):
+                        for row in data['dataset']:
+                            # x_value is already a date/timestamp
+                            x_val = row[0]
+                            if isinstance(x_val, str):
+                                x_val = datetime.datetime.fromisoformat(x_val.replace('Z', '+00:00'))
+                            elif isinstance(x_val, (int, float)):
+                                x_val = datetime.datetime.fromtimestamp(x_val / 1000000, tz=datetime.timezone.utc)
+
+                            results.append({
+                                'date': date,
+                                'y_axis': y_axis,
+                                'group_by': group_by,
+                                'exchange_filter': exchange_filter or 'all',
+                                'x_value': x_val,
+                                'group_value': row[1] if row[1] else '',
+                                'y_value': row[2] if row[2] is not None else 0
+                            })
+
+        return results
+
     def process_all_analytics(self):
         """Processes all analytics for all time ranges"""
         logger.info("Starting analytics processing...")
@@ -430,6 +532,65 @@ class AnalyticsWorker:
 
         logger.info("Analytics processing completed.")
 
+    def process_date(self, date: datetime.date):
+        """Processes all analytics for a specific day"""
+        logger.info(f"Processing analytics for {date}")
+
+        # Custom analytics (the most important calculation for performance)
+        logger.info(f"Calculating custom analytics for {date}...")
+        custom_data = self.calculate_custom_analytics_daily(date)
+        if custom_data:
+            self.save_analytics_data('analytics_custom', custom_data)
+            logger.info(f"Saved {len(custom_data)} custom analytics rows for {date}")
+
+        logger.info(f"Completed processing for {date}")
+
+    def get_missing_dates(self) -> List[datetime.date]:
+        """Determines the missing days that still have to be calculated"""
+        # Fetch the date of the first trade
+        query = "select min(date_trunc('day', timestamp)) as first_date from trades"
+        data = self.query_questdb(query)
+        if not data or not data.get('dataset') or not data['dataset'][0][0]:
+            logger.info("No trades found in database")
+            return []
+
+        first_date_value = data['dataset'][0][0]
+        if isinstance(first_date_value, str):
+            first_date = datetime.datetime.fromisoformat(first_date_value.replace('Z', '+00:00')).date()
+        else:
+            first_date = datetime.datetime.fromtimestamp(first_date_value / 1000000, tz=datetime.timezone.utc).date()
+
+        # Fetch the days already calculated (process_date writes analytics_custom)
+        existing_dates = self.get_existing_dates('analytics_custom')
+
+        # Generate all days from the first trade up to yesterday
+        yesterday = datetime.date.today() - datetime.timedelta(days=1)
+        all_dates = []
+        current = first_date
+        while current <= yesterday:
+            all_dates.append(current)
+            current += datetime.timedelta(days=1)
+
+        # Find the missing days
+        missing_dates = [d for d in all_dates if d not in existing_dates]
+        logger.info(f"Found {len(missing_dates)} missing dates to calculate (from {len(all_dates)} total dates)")
+        return sorted(missing_dates)
+
+    def process_missing_dates(self):
+        """Calculates all missing days"""
+        missing_dates = self.get_missing_dates()
+        if not missing_dates:
+            logger.info("No missing dates to process")
+            return
+
+        logger.info(f"Processing {len(missing_dates)} missing dates...")
+        for i, date in enumerate(missing_dates, 1):
+            logger.info(f"Processing date {i}/{len(missing_dates)}: {date}")
+            self.process_date(date)
+            # Short pause every 10 dates
+            if i % 10 == 0:
+                time.sleep(1)
+
     def run(self):
         """Main loop of the worker"""
         logger.info("Analytics Worker started.")
@@ -439,32 +600,30 @@
             logger.error("Failed to connect to QuestDB. Exiting.")
             return
 
-        # Initial processing
-        self.process_all_analytics()
-        self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
+        # Initial calculation of the missing days
+        logger.info("Checking for missing dates...")
+        self.process_missing_dates()
 
-        # Polling loop
+        # Main loop: wait for midnight
+        logger.info("Waiting for midnight to process yesterday's data...")
         while True:
-            try:
-                # Check for new trades
-                last_ts = self.get_last_processed_timestamp()
-                new_trades = self.get_new_trades(since=last_ts)
-
-                if new_trades:
-                    logger.info(f"Found {len(new_trades)} new trades, reprocessing analytics...")
-                    self.process_all_analytics()
-                    self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
-                else:
-                    logger.debug("No new trades found.")
-
-                # Wait 30 seconds before the next check
-                time.sleep(30)
-            except requests.exceptions.ConnectionError as e:
-                logger.warning(f"Connection error to QuestDB, retrying in 60s: {e}")
-                time.sleep(60)  # Longer pause on connection errors
-            except Exception as e:
-                logger.error(f"Error in worker loop: {e}", exc_info=True)
-                time.sleep(60)  # Longer pause on errors
+            now = datetime.datetime.now()
+
+            # Check whether it is midnight (00:00)
+            if now.hour == 0 and now.minute == 0:
+                yesterday = (now - datetime.timedelta(days=1)).date()
+                logger.info(f"Processing yesterday's data: {yesterday}")
+                self.process_date(yesterday)
+                # Wait 61s so the same minute is not processed twice
+                time.sleep(61)
+
+            # Also check for missing days (every 6 hours; hour 0 is covered above)
+            elif now.hour % 6 == 0 and now.minute == 0:
+                logger.info("Checking for missing dates...")
+                self.process_missing_dates()
+                time.sleep(61)
+
+            time.sleep(30)
 
 def main():
     worker = AnalyticsWorker()
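-- 
Note (illustrative, not part of the patch): with the tag and field handling
above, one analytics_custom row should serialize to an ILP line roughly like
the following; the exchange name, values, and timestamp are made up:

  analytics_custom,y_axis=volume,group_by=exchange,exchange_filter=all group_value="XETRA",y_value=1234.5 1737763200000000000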