From b4b96b96dc2a97d39a0857ec10cc828236fd7eb7 Mon Sep 17 00:00:00 2001
From: Melchior Reimers
Date: Sun, 25 Jan 2026 18:02:20 +0100
Subject: [PATCH] updated

---
 dashboard/public/index.html      | 1287 ------------------------------
 dashboard/server.py              |  377 +++++----
 src/analytics/worker.py          |  674 ++++++++--------
 systemd/analytics-worker.service |   18 -
 4 files changed, 549 insertions(+), 1807 deletions(-)
 delete mode 100644 dashboard/public/index.html
 delete mode 100644 systemd/analytics-worker.service

diff --git a/dashboard/public/index.html b/dashboard/public/index.html
deleted file mode 100644
index d95faa4..0000000
--- a/dashboard/public/index.html
+++ /dev/null
@@ -1,1287 +0,0 @@
[The 1287 deleted lines of single-page dashboard markup are omitted here; only the
page's headings survive extraction: the "Trading Intelligence Hub" title, the Market
Overview cards (Volume (7d), Total Trades, Assets), the Live Price Feed and Regional
Distribution panels, and an "Advanced Statistics" section with charts for "Moving
average: trade counts & volume per exchange", "Trading volume & count changes", and
"Trend analysis: frequently traded stocks".]
\ No newline at end of file
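Note: with the static page gone from this repo, the FastAPI routes below are the remaining
contract. A minimal sketch of how a client would consume them; the host and port are
assumptions (uvicorn defaults), and "XETRA" is a placeholder exchange name:

    import requests

    BASE = "http://localhost:8000"  # assumed uvicorn default; adjust per deployment

    # Aggregated totals, served from the precomputed analytics_daily_summary table
    summary = requests.get(f"{BASE}/api/summary", timeout=10).json()
    print(summary["dataset"])  # e.g. [['All', 123456, 9876543.21]]

    # Moving averages per exchange; only the precomputed windows are accepted
    resp = requests.get(
        f"{BASE}/api/statistics/moving-average",
        params={"days": 30, "exchange": "XETRA"},  # placeholder exchange
        timeout=10,
    )
    resp.raise_for_status()
    for row in resp.json()["dataset"]:
        print(row)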
diff --git a/dashboard/server.py b/dashboard/server.py
index 9de2bc0..8641d1e 100644
--- a/dashboard/server.py
+++ b/dashboard/server.py
@@ -1,10 +1,10 @@
-from fastapi import FastAPI, HTTPException, Depends
+from fastapi import FastAPI, HTTPException
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.staticfiles import StaticFiles
 from fastapi.responses import FileResponse
 import requests
 import os
-import pandas as pd
+from typing import Optional, Dict, Any
 
 app = FastAPI(title="Trading Dashboard API")
 
@@ -22,97 +22,210 @@ app.mount("/static", StaticFiles(directory="dashboard/public"), name="static")
 async def read_index():
     return FileResponse('dashboard/public/index.html')
 
+# QuestDB configuration
 DB_USER = os.getenv("DB_USER", "admin")
 DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
 DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
 DB_HOST = os.getenv("DB_HOST", "questdb")
+DB_URL = f"http://{DB_HOST}:9000"
 
-@app.get("/api/trades")
-async def get_trades(isin: str = None, days: int = 7, limit: int = 1000):
-    """
-    Gibt Trades zurück. Standardmäßig limitiert auf 1000 für Performance.
-    Für Dashboard-Übersicht werden nur die neuesten Trades benötigt.
-    """
-    query = f"select * from trades where timestamp > dateadd('d', -{days}, now())"
-    if isin:
-        query += f" and isin = '{isin}'"
-    query += f" order by timestamp desc limit {limit}"
-
+# Helper functions
+def query_questdb(query: str, timeout: int = 10) -> Optional[Dict[str, Any]]:
+    """Central QuestDB query helper"""
     try:
-        response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH)
+        response = requests.get(f"{DB_URL}/exec", params={'query': query}, auth=DB_AUTH, timeout=timeout)
         if response.status_code == 200:
             return response.json()
-        throw_http_error(response)
-    except Exception as e:
-        raise HTTPException(status_code=500, detail=str(e))
+        else:
+            raise HTTPException(status_code=response.status_code, detail=f"QuestDB error: {response.text}")
+    except requests.exceptions.Timeout:
+        raise HTTPException(status_code=504, detail="QuestDB query timeout")
+    except requests.exceptions.RequestException as e:
+        raise HTTPException(status_code=500, detail=f"QuestDB connection error: {str(e)}")
+
+def format_questdb_response(data: Dict[str, Any]) -> Dict[str, Any]:
+    """Normalizes QuestDB responses into a consistent shape"""
+    if not data:
+        return {'columns': [], 'dataset': []}
+    return data
+
+# API endpoints
+@app.get("/api/trades")
+async def get_trades(isin: Optional[str] = None, days: int = 7):
+    """
+    Returns an aggregated trade analysis (not individual trades).
+    Uses precomputed data from analytics_exchange_daily.
+    """
+    if isin:
+        # For a specific ISIN, aggregate directly from the trades table
+        query = f"""
+        select
+            date_trunc('day', timestamp) as date,
+            count(*) as trade_count,
+            sum(price * quantity) as volume,
+            avg(price) as avg_price
+        from trades
+        where isin = '{isin}'
+          and timestamp > dateadd('d', -{days}, now())
+        group by date
+        order by date desc
+        """
+    else:
+        # Otherwise serve the precomputed analytics_exchange_daily rows
+        query = f"""
+        select
+            timestamp as date,
+            exchange,
+            trade_count,
+            volume
+        from analytics_exchange_daily
+        where timestamp >= dateadd('d', -{days}, now())
+        order by date desc, exchange asc
+        """
+
+    data = query_questdb(query)
+    return format_questdb_response(data)
 
 @app.get("/api/metadata")
 async def get_metadata():
+    """Returns all metadata rows"""
     query = "select * from metadata"
-    try:
-        response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH)
-        if response.status_code == 200:
-            return response.json()
-        throw_http_error(response)
-    except Exception as e:
-        raise HTTPException(status_code=500, detail=str(e))
+    data = query_questdb(query)
+    return format_questdb_response(data)
 
 @app.get("/api/summary")
-async def get_summary(days: int = 7):
+async def get_summary():
     """
-    Gibt Zusammenfassung zurück. Optimiert für schnelle Abfrage.
-    Falls vorberechnete Daten verfügbar sind, verwende diese.
+    Returns the overall summary. Uses analytics_daily_summary for total_trades,
+    so the count covers all trades rather than a recent window.
+    """
+    # Overall totals from analytics_daily_summary
+    query = """
+    select
+        sum(total_trades) as total_trades,
+        sum(total_volume) as total_volume
+    from analytics_daily_summary
     """
-    # Versuche zuerst, aus analytics_exchange_daily zu aggregieren (schneller)
-    # Falls das nicht funktioniert, falle auf die ursprüngliche Query zurück
-    try:
-        # Aggregiere aus analytics_exchange_daily für die letzten N Tage
-        # Dies ist schneller als eine JOIN-Query auf alle Trades
-        query = f"""
-        select
-            'All' as continent,
-            sum(trade_count) as trade_count,
-            sum(volume) as total_volume
-        from analytics_exchange_daily
-        where timestamp >= dateadd('d', -{days}, now())
-        """
-        response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH, timeout=5)
-        if response.status_code == 200:
-            data = response.json()
-            # Wenn Daten vorhanden, verwende diese
-            if data.get('dataset') and len(data['dataset']) > 0:
-                # Formatiere für Kompatibilität mit dem Frontend
-                result = {
-                    'columns': [
-                        {'name': 'continent'},
-                        {'name': 'trade_count'},
-                        {'name': 'total_volume'}
-                    ],
-                    'dataset': [[row[0], row[1], row[2]] for row in data['dataset']]
-                }
-                return result
-    except Exception:
-        # Fallback auf ursprüngliche Query
-        pass
-    # Fallback: Original Query mit Limit für Performance
-    query = f"""
+    data = query_questdb(query)
+    if data and data.get('dataset') and data['dataset']:
+        total_trades = data['dataset'][0][0] if data['dataset'][0][0] else 0
+        total_volume = data['dataset'][0][1] if data['dataset'][0][1] else 0.0
+
+        # Shape the result for frontend compatibility
+        return {
+            'columns': [
+                {'name': 'continent'},
+                {'name': 'trade_count'},
+                {'name': 'total_volume'}
+            ],
+            'dataset': [['All', total_trades, total_volume]]
+        }
+
+    # Fallback: original live query against the trades table
+    query = """
     select
         coalesce(m.continent, 'Unknown') as continent,
         count(*) as trade_count,
         sum(t.price * t.quantity) as total_volume
     from trades t
     left join metadata m on t.isin = m.isin
-    where t.timestamp > dateadd('d', -{days}, now())
     group by continent
     """
-    try:
-        response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH, timeout=10)
-        if response.status_code == 200:
-            return response.json()
-        throw_http_error(response)
-    except Exception as e:
-        raise HTTPException(status_code=500, detail=str(e))
+    data = query_questdb(query)
+    return format_questdb_response(data)
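Note: every endpoint above returns the raw /exec JSON shape (a list of column
descriptors plus positional dataset rows), and the hand-built summary result imitates
that shape. A client-side sketch with invented values, zipping the two back into dicts:

    summary = {
        "columns": [{"name": "continent"}, {"name": "trade_count"}, {"name": "total_volume"}],
        "dataset": [["All", 123456, 9876543.21]],
    }
    # Pair each column name with its positional value, as a frontend would
    rows = [
        {col["name"]: value for col, value in zip(summary["columns"], row)}
        for row in summary["dataset"]
    ]
    print(rows)  # [{'continent': 'All', 'trade_count': 123456, 'total_volume': 9876543.21}]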
 
+@app.get("/api/statistics/total-trades")
+async def get_total_trades():
+    """Returns the all-time trade count (from analytics_daily_summary)"""
+    query = "select sum(total_trades) as total from analytics_daily_summary"
+    data = query_questdb(query)
+    if data and data.get('dataset') and data['dataset']:
+        total = data['dataset'][0][0] if data['dataset'][0][0] else 0
+        return {'total_trades': total}
+    return {'total_trades': 0}
+
+@app.get("/api/statistics/moving-average")
+async def get_moving_average(days: int = 7, exchange: Optional[str] = None):
+    """
+    Returns moving-average data for trade counts and volume per exchange.
+    Supported windows: 7, 30, 42, 69, 180, 365 days
+    """
+    if days not in [7, 30, 42, 69, 180, 365]:
+        raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
+
+    query = f"""
+    select
+        timestamp as date,
+        exchange,
+        trade_count,
+        volume,
+        ma{days}_count as ma_count,
+        ma{days}_volume as ma_volume
+    from analytics_exchange_daily
+    where timestamp >= dateadd('d', -{days}, now())
+    """
+
+    if exchange:
+        query += f" and exchange = '{exchange}'"
+
+    query += " order by date asc, exchange asc"
+
+    data = query_questdb(query, timeout=5)
+    return format_questdb_response(data)
+
+@app.get("/api/statistics/volume-changes")
+async def get_volume_changes(days: int = 7):
+    """
+    Returns volume and trade-count changes per exchange.
+    Supported windows: 7, 30, 42, 69, 180, 365 days
+    """
+    if days not in [7, 30, 42, 69, 180, 365]:
+        raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
+
+    query = f"""
+    select
+        timestamp as date,
+        exchange,
+        trade_count,
+        volume,
+        count_change_pct,
+        volume_change_pct,
+        trend
+    from analytics_volume_changes
+    where period_days = {days}
+      and timestamp >= dateadd('d', -{days}, now())
+    order by date desc, exchange asc
+    """
+
+    data = query_questdb(query, timeout=5)
+    return format_questdb_response(data)
+
+@app.get("/api/statistics/stock-trends")
+async def get_stock_trends(days: int = 7, limit: int = 20):
+    """
+    Returns the trend analysis for frequently traded stocks.
+    Supported windows: 7, 30, 42, 69, 180, 365 days
+    """
+    if days not in [7, 30, 42, 69, 180, 365]:
+        raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
+
+    query = f"""
+    select
+        timestamp as date,
+        isin,
+        trade_count,
+        volume,
+        count_change_pct,
+        volume_change_pct
+    from analytics_stock_trends
+    where period_days = {days}
+      and timestamp >= dateadd('d', -{days}, now())
+    order by volume desc
+    limit {limit}
+    """
+
+    data = query_questdb(query, timeout=5)
+    return format_questdb_response(data)
 
 @app.get("/api/analytics")
 async def get_analytics(
@@ -124,8 +237,7 @@ async def get_analytics(
     isins: str = None,
     continents: str = None
 ):
-    # Determine if we need to join metadata
-    # Determine if we need to join metadata
+    """Analytics endpoint for the report builder"""
     composite_keys = ["exchange_continent", "exchange_sector"]
     needs_metadata = any([
         group_by in ["name", "continent", "sector"] + composite_keys,
@@ -133,7 +245,6 @@ async def get_analytics(
         continents is not None
     ])
 
-    # Use prefixes only if joining
     t_prefix = "t." if needs_metadata else ""
     m_prefix = "m." if needs_metadata else ""
 
@@ -191,129 +302,15 @@ async def get_analytics(
 
     query += " order by label asc"
 
-    try:
-        response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH)
-        if response.status_code == 200:
-            return response.json()
-        throw_http_error(response)
-    except Exception as e:
-        raise HTTPException(status_code=500, detail=str(e))
+    data = query_questdb(query)
+    return format_questdb_response(data)
 
 @app.get("/api/metadata/search")
 async def search_metadata(q: str):
-    # Case-insensitive search for ISIN or Name
+    """Case-insensitive search for ISIN or name"""
     query = f"select isin, name from metadata where isin ilike '%{q}%' or name ilike '%{q}%' limit 10"
-    try:
-        response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH)
-        if response.status_code == 200:
-            return response.json()
-        throw_http_error(response)
-    except Exception as e:
-        raise HTTPException(status_code=500, detail=str(e))
-
-@app.get("/api/statistics/moving-average")
-async def get_moving_average(days: int = 7, exchange: str = None):
-    """
-    Gibt Moving Average Daten für Tradezahlen und Volumen je Exchange zurück.
-    Unterstützte Zeiträume: 7, 30, 42, 69, 180, 365 Tage
-    Verwendet vorberechnete Daten aus analytics_exchange_daily für schnelle Antwortzeiten.
-    """
-    if days not in [7, 30, 42, 69, 180, 365]:
-        raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
-
-    # Hole Daten aus der vorberechneten analytics_exchange_daily Tabelle
-    query = f"""
-    select
-        timestamp as date,
-        exchange,
-        trade_count,
-        volume,
-        ma{days}_count as ma_count,
-        ma{days}_volume as ma_volume
-    from analytics_exchange_daily
-    where timestamp >= dateadd('d', -{days}, now())
-    """
-
-    if exchange:
-        query += f" and exchange = '{exchange}'"
-
-    query += " order by date asc, exchange asc"
-
-    try:
-        response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH, timeout=5)
-        if response.status_code == 200:
-            return response.json()
-        throw_http_error(response)
-    except Exception as e:
-        raise HTTPException(status_code=500, detail=str(e))
-
-@app.get("/api/statistics/volume-changes")
-async def get_volume_changes(days: int = 7):
-    """
-    Gibt Änderungen in Volumen und Anzahl je Exchange zurück.
-    Unterstützte Zeiträume: 7, 30, 42, 69, 180, 365 Tage
-    Verwendet vorberechnete Daten aus analytics_volume_changes für schnelle Antwortzeiten.
-    """
-    if days not in [7, 30, 42, 69, 180, 365]:
-        raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
-
-    query = f"""
-    select
-        timestamp as date,
-        exchange,
-        trade_count,
-        volume,
-        count_change_pct,
-        volume_change_pct,
-        trend
-    from analytics_volume_changes
-    where period_days = {days}
-    order by date desc, exchange asc
-    """
-
-    try:
-        response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH, timeout=5)
-        if response.status_code == 200:
-            return response.json()
-        throw_http_error(response)
-    except Exception as e:
-        raise HTTPException(status_code=500, detail=str(e))
-
-@app.get("/api/statistics/stock-trends")
-async def get_stock_trends(days: int = 7, limit: int = 20):
-    """
-    Gibt Trendanalyse für häufig gehandelte Aktien zurück.
-    Unterstützte Zeiträume: 7, 30, 42, 69, 180, 365 Tage
-    Verwendet vorberechnete Daten aus analytics_stock_trends für schnelle Antwortzeiten.
-    """
-    if days not in [7, 30, 42, 69, 180, 365]:
-        raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
-
-    # Hole Top-Aktien nach Volumen für den Zeitraum
-    query = f"""
-    select
-        timestamp as date,
-        isin,
-        trade_count,
-        volume,
-        count_change_pct,
-        volume_change_pct
-    from analytics_stock_trends
-    where period_days = {days}
-    order by volume desc
-    limit {limit}
-    """
-
-    try:
-        response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH, timeout=5)
-        if response.status_code == 200:
-            return response.json()
-        throw_http_error(response)
-    except Exception as e:
-        raise HTTPException(status_code=500, detail=str(e))
-
-def throw_http_error(res):
-    raise HTTPException(status_code=res.status_code, detail=f"QuestDB error: {res.text}")
+    data = query_questdb(query)
+    return format_questdb_response(data)
 
 if __name__ == "__main__":
     import uvicorn
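Note: query_questdb() wraps QuestDB's REST interface. For reference, this is what it does
on the wire; credentials and host mirror the defaults above, and the exact column
metadata in the response may vary by QuestDB version:

    import requests

    # QuestDB answers GET /exec with {"query": ..., "columns": [...], "dataset": [[...], ...]}
    resp = requests.get(
        "http://questdb:9000/exec",
        params={"query": "select count(*) from trades"},
        auth=("admin", "quest"),
        timeout=10,
    )
    resp.raise_for_status()
    payload = resp.json()
    print(payload["columns"])  # e.g. [{'name': 'count', 'type': 'LONG'}]
    print(payload["dataset"])  # e.g. [[123456]]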
diff --git a/src/analytics/worker.py b/src/analytics/worker.py
index 77820f2..9cfc22d 100644
--- a/src/analytics/worker.py
+++ b/src/analytics/worker.py
@@ -3,8 +3,9 @@ import logging
 import datetime
 import os
 import requests
-from typing import Dict, List, Tuple, Optional
+from typing import Dict, List, Optional
 import pandas as pd
+import json
 
 logging.basicConfig(
     level=logging.INFO,
@@ -23,9 +24,8 @@ TIME_PERIODS = [7, 30, 42, 69, 180, 365]
 
 class AnalyticsWorker:
     def __init__(self):
-        self.last_processed_timestamp = None
         self.db_url = DB_URL
-
+
     def wait_for_questdb(self, max_retries: int = 30, retry_delay: int = 2):
         """Wartet bis QuestDB verfügbar ist"""
         logger.info("Waiting for QuestDB to be available...")
@@ -40,281 +40,317 @@ class AnalyticsWorker:
                 time.sleep(retry_delay)
         logger.error("QuestDB did not become available after waiting")
         return False
-
-    def get_last_processed_timestamp(self) -> Optional[datetime.datetime]:
-        """Holt den letzten verarbeiteten Timestamp aus der Analytics-Tabelle"""
-        try:
-            query = "select max(timestamp) as last_ts from analytics_exchange_daily"
-            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
-            if response.status_code == 200:
-                data = response.json()
-                if data.get('dataset') and data['dataset'] and len(data['dataset']) > 0 and data['dataset'][0][0]:
-                    ts_value = data['dataset'][0][0]
-                    if isinstance(ts_value, str):
-                        return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
-                    elif isinstance(ts_value, (int, float)):
-                        # QuestDB gibt Timestamps in Mikrosekunden zurück
-                        return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
-        except Exception as e:
-            logger.debug(f"Could not get last processed timestamp: {e}")
-        return None
-
-    def get_new_trades(self, since: Optional[datetime.datetime] = None) -> List[Dict]:
-        """Holt neue Trades seit dem letzten Verarbeitungszeitpunkt"""
-        if since:
-            since_str = since.strftime('%Y-%m-%d %H:%M:%S')
-            query = f"select timestamp, exchange, isin, price, quantity from trades where timestamp > '{since_str}' order by timestamp asc"
-        else:
-            # Erste Ausführung: nur die letzten 7 Tage
-            query = f"select timestamp, exchange, isin, price, quantity from trades where timestamp > dateadd('d', -7, now()) order by timestamp asc"
-
-        try:
-            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
-            if response.status_code == 200:
-                data = response.json()
-                columns = data.get('columns', [])
-                dataset = data.get('dataset', [])
-
-                trades = []
-                for row in dataset:
-                    trade = {}
-                    for i, col in enumerate(columns):
-                        trade[col['name']] = row[i]
-                    trades.append(trade)
-                return trades
-        except Exception as e:
-            logger.error(f"Error fetching new trades: {e}")
-        return []
+    def query_questdb(self, query: str, timeout: int = 30) -> Optional[Dict]:
+        """Central QuestDB query helper"""
+        try:
+            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=timeout)
+            if response.status_code == 200:
+                return response.json()
+            else:
+                logger.error(f"QuestDB query failed: {response.status_code} - {response.text}")
+                return None
+        except Exception as e:
+            logger.error(f"Error querying QuestDB: {e}")
+            return None
+
+    def get_existing_dates(self, table_name: str) -> set:
+        """Returns the set of days already present in an analytics table"""
+        query = f"select distinct date_trunc('day', timestamp) as date from {table_name}"
+        data = self.query_questdb(query)
+        if not data or not data.get('dataset'):
+            return set()
+
+        dates = set()
+        for row in data['dataset']:
+            if row[0]:
+                if isinstance(row[0], str):
+                    dates.add(datetime.datetime.fromisoformat(row[0].replace('Z', '+00:00')).date())
+                elif isinstance(row[0], (int, float)):
+                    # QuestDB returns timestamps in microseconds
+                    dates.add(datetime.datetime.fromtimestamp(row[0] / 1000000, tz=datetime.timezone.utc).date())
+        return dates
+
+    def get_missing_dates(self) -> List[datetime.date]:
+        """Determines which days still need to be calculated"""
+        # Date of the first trade
+        query = "select min(date_trunc('day', timestamp)) as first_date from trades"
+        data = self.query_questdb(query)
+        if not data or not data.get('dataset') or not data['dataset'][0][0]:
+            logger.info("No trades found in database")
+            return []
+
+        first_date_value = data['dataset'][0][0]
+        if isinstance(first_date_value, str):
+            first_date = datetime.datetime.fromisoformat(first_date_value.replace('Z', '+00:00')).date()
+        else:
+            first_date = datetime.datetime.fromtimestamp(first_date_value / 1000000, tz=datetime.timezone.utc).date()
+
+        # Days already computed
+        existing_dates = self.get_existing_dates('analytics_daily_summary')
+
+        # Generate every day from the first trade through yesterday
+        yesterday = datetime.date.today() - datetime.timedelta(days=1)
+        all_dates = []
+        current = first_date
+        while current <= yesterday:
+            all_dates.append(current)
+            current += datetime.timedelta(days=1)
+
+        # Keep only the days without precomputed rows
+        missing_dates = [d for d in all_dates if d not in existing_dates]
+        logger.info(f"Found {len(missing_dates)} missing dates to calculate (from {len(all_dates)} total dates)")
+        return sorted(missing_dates)
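Note: the backfill logic above is easiest to see in isolation. A self-contained sketch;
yesterday is passed in explicitly here to keep the example deterministic, whereas the
worker derives it from datetime.date.today():

    import datetime

    def missing_dates(first_date, existing, yesterday):
        # Every day from the first trade through yesterday that has no precomputed row
        all_days = []
        d = first_date
        while d <= yesterday:
            all_days.append(d)
            d += datetime.timedelta(days=1)
        return sorted(day for day in all_days if day not in existing)

    existing = {datetime.date(2026, 1, 20), datetime.date(2026, 1, 22)}
    print(missing_dates(datetime.date(2026, 1, 20), existing, datetime.date(2026, 1, 24)))
    # [datetime.date(2026, 1, 21), datetime.date(2026, 1, 23), datetime.date(2026, 1, 24)]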
logger.error(f"Error calculating exchange daily aggregations: {e}") - return [] + ma_data = self.query_questdb(ma_query) + if ma_data and ma_data.get('dataset') and ma_data['dataset'][0]: + ma_values[f'ma{period}_count'] = ma_data['dataset'][0][0] if ma_data['dataset'][0][0] else 0 + ma_values[f'ma{period}_volume'] = ma_data['dataset'][0][1] if ma_data['dataset'][0][1] else 0.0 + else: + ma_values[f'ma{period}_count'] = 0 + ma_values[f'ma{period}_volume'] = 0.0 + + results.append({ + 'date': date, + 'exchange': exchange, + 'trade_count': trade_count, + 'volume': volume, + **ma_values + }) + + return results - def calculate_stock_trends(self, days: int = 365) -> List[Dict]: - """Berechnet Trenddaten je ISIN mit Änderungsprozenten""" - end_date = datetime.datetime.now(datetime.timezone.utc) - start_date = end_date - datetime.timedelta(days=days) + def calculate_stock_trends(self, date: datetime.date) -> List[Dict]: + """Berechnet Stock-Trends für alle Zeiträume für einen Tag""" + results = [] - # Aktuelle Periode - query_current = f""" - select - date_trunc('day', timestamp) as date, - isin, - count(*) as trade_count, - sum(price * quantity) as volume - from trades - where timestamp >= '{start_date.strftime('%Y-%m-%d')}' - group by date, isin - order by date asc, isin asc - """ + for period in TIME_PERIODS: + end_date = date + start_date = end_date - datetime.timedelta(days=period-1) + + # Aktuelle Periode + query = f""" + select + isin, + count(*) as trade_count, + sum(price * quantity) as volume + from trades + where date_trunc('day', timestamp) >= '{start_date.strftime('%Y-%m-%d')}' + and date_trunc('day', timestamp) <= '{end_date.strftime('%Y-%m-%d')}' + group by isin + """ + + data = self.query_questdb(query) + if not data or not data.get('dataset'): + continue + + for row in data['dataset']: + isin = row[0] + current_count = row[1] if row[1] else 0 + current_volume = row[2] if row[2] else 0.0 + + # Vorherige Periode für Vergleich + prev_start = start_date - datetime.timedelta(days=period) + prev_end = start_date - datetime.timedelta(days=1) + + prev_query = f""" + select + count(*) as trade_count, + sum(price * quantity) as volume + from trades + where isin = '{isin}' + and date_trunc('day', timestamp) >= '{prev_start.strftime('%Y-%m-%d')}' + and date_trunc('day', timestamp) <= '{prev_end.strftime('%Y-%m-%d')}' + """ + + prev_data = self.query_questdb(prev_query) + prev_count = 0 + prev_volume = 0.0 + + if prev_data and prev_data.get('dataset') and prev_data['dataset'][0]: + prev_count = prev_data['dataset'][0][0] if prev_data['dataset'][0][0] else 0 + prev_volume = prev_data['dataset'][0][1] if prev_data['dataset'][0][1] else 0.0 + + # Berechne Änderungen + count_change_pct = ((current_count - prev_count) / prev_count * 100) if prev_count > 0 else 0 + volume_change_pct = ((current_volume - prev_volume) / prev_volume * 100) if prev_volume > 0 else 0 + + results.append({ + 'date': date, + 'period_days': period, + 'isin': isin, + 'trade_count': current_count, + 'volume': current_volume, + 'count_change_pct': count_change_pct, + 'volume_change_pct': volume_change_pct + }) - try: - response = requests.get(f"{self.db_url}/exec", params={'query': query_current}, auth=DB_AUTH) - if response.status_code == 200: - data = response.json() - columns = data.get('columns', []) - dataset = data.get('dataset', []) - - results = [] - for row in dataset: - result = {} - for i, col in enumerate(columns): - result[col['name']] = row[i] - results.append(result) - - if not results: - return [] - - df = 
pd.DataFrame(results) - df['date'] = pd.to_datetime(df['date']) - - # Aggregiere je ISIN über den gesamten Zeitraum - df_agg = df.groupby('isin').agg({ - 'trade_count': 'sum', - 'volume': 'sum' - }).reset_index() - - # Berechne Änderungen: Vergleich mit vorheriger Periode - # Für jede ISIN: aktueller Zeitraum vs. vorheriger Zeitraum - trends = [] - for isin in df_agg['isin'].unique(): - isin_data = df[df['isin'] == isin].sort_values('date') - - # Teile in zwei Hälften für Vergleich - mid_point = len(isin_data) // 2 - if mid_point > 0: - first_half = isin_data.iloc[:mid_point] - second_half = isin_data.iloc[mid_point:] - - first_count = first_half['trade_count'].sum() - first_volume = first_half['volume'].sum() - second_count = second_half['trade_count'].sum() - second_volume = second_half['volume'].sum() - - count_change = ((second_count - first_count) / first_count * 100) if first_count > 0 else 0 - volume_change = ((second_volume - first_volume) / first_volume * 100) if first_volume > 0 else 0 - else: - count_change = 0 - volume_change = 0 - - total_count = isin_data['trade_count'].sum() - total_volume = isin_data['volume'].sum() - - trends.append({ - 'isin': isin, - 'date': isin_data['date'].max(), - 'trade_count': total_count, - 'volume': total_volume, - 'count_change_pct': count_change, - 'volume_change_pct': volume_change - }) - - return trends - except Exception as e: - logger.error(f"Error calculating stock trends: {e}") - return [] + return results - def calculate_volume_changes(self, days: int = 365) -> List[Dict]: - """Berechnet Volumen- und Anzahl-Änderungen je Exchange""" - end_date = datetime.datetime.now(datetime.timezone.utc) - start_date = end_date - datetime.timedelta(days=days) + def calculate_volume_changes(self, date: datetime.date) -> List[Dict]: + """Berechnet Volumen-Änderungen für alle Zeiträume für einen Tag""" + results = [] - query = f""" - select - date_trunc('day', timestamp) as date, - exchange, - count(*) as trade_count, - sum(price * quantity) as volume - from trades - where timestamp >= '{start_date.strftime('%Y-%m-%d')}' - group by date, exchange - order by date asc, exchange asc - """ + for period in TIME_PERIODS: + end_date = date + start_date = end_date - datetime.timedelta(days=period-1) + + # Hole alle Exchanges + exchanges_query = "select distinct exchange from trades" + exchanges_data = self.query_questdb(exchanges_query) + if not exchanges_data or not exchanges_data.get('dataset'): + continue + + for exchange_row in exchanges_data['dataset']: + exchange = exchange_row[0] + + # Aktuelle Periode + query = f""" + select + count(*) as trade_count, + sum(price * quantity) as volume + from trades + where exchange = '{exchange}' + and date_trunc('day', timestamp) >= '{start_date.strftime('%Y-%m-%d')}' + and date_trunc('day', timestamp) <= '{end_date.strftime('%Y-%m-%d')}' + """ + + data = self.query_questdb(query) + if not data or not data.get('dataset') or not data['dataset'][0]: + continue + + current_count = data['dataset'][0][0] if data['dataset'][0][0] else 0 + current_volume = data['dataset'][0][1] if data['dataset'][0][1] else 0.0 + + # Vorherige Periode + prev_start = start_date - datetime.timedelta(days=period) + prev_end = start_date - datetime.timedelta(days=1) + + prev_query = f""" + select + count(*) as trade_count, + sum(price * quantity) as volume + from trades + where exchange = '{exchange}' + and date_trunc('day', timestamp) >= '{prev_start.strftime('%Y-%m-%d')}' + and date_trunc('day', timestamp) <= '{prev_end.strftime('%Y-%m-%d')}' + 
""" + + prev_data = self.query_questdb(prev_query) + prev_count = 0 + prev_volume = 0.0 + + if prev_data and prev_data.get('dataset') and prev_data['dataset'][0]: + prev_count = prev_data['dataset'][0][0] if prev_data['dataset'][0][0] else 0 + prev_volume = prev_data['dataset'][0][1] if prev_data['dataset'][0][1] else 0.0 + + # Berechne Änderungen + count_change_pct = ((current_count - prev_count) / prev_count * 100) if prev_count > 0 else 0 + volume_change_pct = ((current_volume - prev_volume) / prev_volume * 100) if prev_volume > 0 else 0 + + # Bestimme Trend + if count_change_pct > 5 and volume_change_pct > 5: + trend = "mehr_trades_mehr_volumen" + elif count_change_pct > 5 and volume_change_pct < -5: + trend = "mehr_trades_weniger_volumen" + elif count_change_pct < -5 and volume_change_pct > 5: + trend = "weniger_trades_mehr_volumen" + elif count_change_pct < -5 and volume_change_pct < -5: + trend = "weniger_trades_weniger_volumen" + else: + trend = "stabil" + + results.append({ + 'date': date, + 'period_days': period, + 'exchange': exchange, + 'trade_count': current_count, + 'volume': current_volume, + 'count_change_pct': count_change_pct, + 'volume_change_pct': volume_change_pct, + 'trend': trend + }) - try: - response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH) - if response.status_code == 200: - data = response.json() - columns = data.get('columns', []) - dataset = data.get('dataset', []) - - results = [] - for row in dataset: - result = {} - for i, col in enumerate(columns): - result[col['name']] = row[i] - results.append(result) - - if not results: - return [] - - df = pd.DataFrame(results) - df['date'] = pd.to_datetime(df['date']) - df = df.sort_values(['date', 'exchange']) - - # Berechne Änderungen je Exchange - changes = [] - for exchange in df['exchange'].unique(): - exchange_data = df[df['exchange'] == exchange].sort_values('date') - - # Teile in zwei Hälften - mid_point = len(exchange_data) // 2 - if mid_point > 0: - first_half = exchange_data.iloc[:mid_point] - second_half = exchange_data.iloc[mid_point:] - - first_count = first_half['trade_count'].sum() - first_volume = first_half['volume'].sum() - second_count = second_half['trade_count'].sum() - second_volume = second_half['volume'].sum() - - count_change = ((second_count - first_count) / first_count * 100) if first_count > 0 else 0 - volume_change = ((second_volume - first_volume) / first_volume * 100) if first_volume > 0 else 0 - - # Bestimme Trend - if count_change > 5 and volume_change > 5: - trend = "mehr_trades_mehr_volumen" - elif count_change > 5 and volume_change < -5: - trend = "mehr_trades_weniger_volumen" - elif count_change < -5 and volume_change > 5: - trend = "weniger_trades_mehr_volumen" - elif count_change < -5 and volume_change < -5: - trend = "weniger_trades_weniger_volumen" - else: - trend = "stabil" - else: - count_change = 0 - volume_change = 0 - trend = "stabil" - - total_count = exchange_data['trade_count'].sum() - total_volume = exchange_data['volume'].sum() - - changes.append({ - 'date': exchange_data['date'].max(), - 'exchange': exchange, - 'trade_count': total_count, - 'volume': total_volume, - 'count_change_pct': count_change, - 'volume_change_pct': volume_change, - 'trend': trend - }) - - return changes - except Exception as e: - logger.error(f"Error calculating volume changes: {e}") - return [] + return results def save_analytics_data(self, table_name: str, data: List[Dict]): """Speichert aggregierte Daten in QuestDB via ILP""" @@ -326,10 +362,10 @@ class 
@@ -326,10 +362,10 @@ class AnalyticsWorker:
         try:
             # Konvertiere Datum zu Timestamp
             if 'date' in row:
-                if isinstance(row['date'], str):
+                if isinstance(row['date'], datetime.date):
+                    dt = datetime.datetime.combine(row['date'], datetime.time.min).replace(tzinfo=datetime.timezone.utc)
+                elif isinstance(row['date'], str):
                     dt = datetime.datetime.fromisoformat(row['date'].replace('Z', '+00:00'))
-                elif isinstance(row['date'], pd.Timestamp):
-                    dt = row['date'].to_pydatetime()
                 else:
                     dt = row['date']
                 timestamp_ns = int(dt.timestamp() * 1e9)
@@ -350,6 +386,10 @@ class AnalyticsWorker:
                 isin = str(row['isin']).replace(' ', '\\ ').replace(',', '\\,')
                 tags.append(f"isin={isin}")
 
+            # Window length as a tag
+            if 'period_days' in row and row['period_days']:
+                tags.append(f"period_days={row['period_days']}")
+
             # Trend als Tag
             if 'trend' in row and row['trend']:
                 trend = str(row['trend']).replace(' ', '\\ ').replace(',', '\\,')
@@ -357,16 +397,19 @@ class AnalyticsWorker:
 
             # Numerische Felder
             for key, value in row.items():
-                if key in ['date', 'exchange', 'isin', 'trend']:
+                if key in ['date', 'exchange', 'isin', 'trend', 'period_days', 'exchanges']:
                     continue
                 if value is not None:
                     if isinstance(value, (int, float)):
                         fields.append(f"{key}={value}")
                     elif isinstance(value, str):
-                        # String-Felder in Anführungszeichen
                         escaped = value.replace('"', '\\"').replace(' ', '\\ ')
                         fields.append(f'{key}="{escaped}"')
 
+            # Exchanges JSON as a string field; its quotes must be escaped,
+            # otherwise the embedded JSON breaks the ILP line
+            if 'exchanges' in row and row['exchanges']:
+                escaped_exchanges = str(row['exchanges']).replace('"', '\\"')
+                fields.append(f'exchanges="{escaped_exchanges}"')
+
             if tags and fields:
                 line = f"{table_name},{','.join(tags)} {','.join(fields)} {timestamp_ns}"
                 lines.append(line)
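Note: for reference, one save_analytics_data() row serializes to a single InfluxDB
line-protocol record like the following. Values are invented, and this assumes the
unchanged send path (not shown in this hunk) still speaks ILP to QuestDB, whose TCP
ILP listener defaults to port 9009:

    import datetime

    date = datetime.date(2026, 1, 24)
    dt = datetime.datetime.combine(date, datetime.time.min).replace(tzinfo=datetime.timezone.utc)
    ts_ns = int(dt.timestamp() * 1e9)

    tags = ["exchange=XETRA", "period_days=7"]          # tag set (symbols)
    fields = ["trade_count=1432", "volume=8765432.1",   # numeric fields
              'trend="stabil"']                         # string field, quoted
    print(f"analytics_volume_changes,{','.join(tags)} {','.join(fields)} {ts_ns}")
    # analytics_volume_changes,exchange=XETRA,period_days=7 trade_count=1432,volume=8765432.1,trend="stabil" 1769212800000000000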
@@ -393,37 +436,46 @@ class AnalyticsWorker:
         except Exception as e:
             logger.error(f"Error connecting to QuestDB: {e}")
 
-    def process_all_analytics(self):
-        """Verarbeitet alle Analytics für alle Zeiträume"""
-        logger.info("Starting analytics processing...")
+    def process_date(self, date: datetime.date):
+        """Runs all analytics calculations for one specific day"""
+        logger.info(f"Processing analytics for {date}")
 
-        # 1. Exchange Daily Aggregations (für alle Zeiträume)
-        logger.info("Calculating exchange daily aggregations...")
-        exchange_data = self.calculate_exchange_daily_aggregations(days_back=365)
+        # 1. Daily summary
+        summary = self.calculate_daily_summary(date)
+        if summary:
+            self.save_analytics_data('analytics_daily_summary', [summary])
+
+        # 2. Exchange daily
+        exchange_data = self.calculate_exchange_daily(date)
         if exchange_data:
             self.save_analytics_data('analytics_exchange_daily', exchange_data)
 
-        # 2. Stock Trends (für alle Zeiträume)
-        logger.info("Calculating stock trends...")
-        for days in TIME_PERIODS:
-            trends = self.calculate_stock_trends(days=days)
-            if trends:
-                # Füge Zeitraum als Tag hinzu
-                for trend in trends:
-                    trend['period_days'] = days
-                self.save_analytics_data('analytics_stock_trends', trends)
+        # 3. Stock trends
+        stock_trends = self.calculate_stock_trends(date)
+        if stock_trends:
+            self.save_analytics_data('analytics_stock_trends', stock_trends)
 
-        # 3. Volume Changes (für alle Zeiträume)
-        logger.info("Calculating volume changes...")
-        for days in TIME_PERIODS:
-            changes = self.calculate_volume_changes(days=days)
-            if changes:
-                # Füge Zeitraum als Tag hinzu
-                for change in changes:
-                    change['period_days'] = days
-                self.save_analytics_data('analytics_volume_changes', changes)
+        # 4. Volume changes
+        volume_changes = self.calculate_volume_changes(date)
+        if volume_changes:
+            self.save_analytics_data('analytics_volume_changes', volume_changes)
 
-        logger.info("Analytics processing completed.")
+        logger.info(f"Completed processing for {date}")
+
+    def process_missing_dates(self):
+        """Backfills all missing days"""
+        missing_dates = self.get_missing_dates()
+        if not missing_dates:
+            logger.info("No missing dates to process")
+            return
+
+        logger.info(f"Processing {len(missing_dates)} missing dates...")
+        for i, date in enumerate(missing_dates, 1):
+            logger.info(f"Processing date {i}/{len(missing_dates)}: {date}")
+            self.process_date(date)
+            # Brief pause after every ten backfilled days
+            if i % 10 == 0:
+                time.sleep(1)
 
     def run(self):
         """Hauptschleife des Workers"""
@@ -434,32 +486,30 @@ class AnalyticsWorker:
             logger.error("Failed to connect to QuestDB. Exiting.")
             return
 
-        # Initiale Verarbeitung
-        self.process_all_analytics()
-        self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
+        # Initial backfill of any missing days
+        logger.info("Checking for missing dates...")
+        self.process_missing_dates()
 
-        # Polling-Schleife
+        # Main loop: wait for midnight
+        logger.info("Waiting for midnight to process yesterday's data...")
         while True:
-            try:
-                # Prüfe auf neue Trades
-                last_ts = self.get_last_processed_timestamp()
-                new_trades = self.get_new_trades(since=last_ts)
-
-                if new_trades:
-                    logger.info(f"Found {len(new_trades)} new trades, reprocessing analytics...")
-                    self.process_all_analytics()
-                    self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
-                else:
-                    logger.debug("No new trades found.")
-
-                # Warte 30 Sekunden vor nächster Prüfung
-                time.sleep(30)
-            except requests.exceptions.ConnectionError as e:
-                logger.warning(f"Connection error to QuestDB, retrying in 60s: {e}")
-                time.sleep(60)  # Längere Pause bei Verbindungsfehler
-            except Exception as e:
-                logger.error(f"Error in worker loop: {e}", exc_info=True)
-                time.sleep(60)  # Längere Pause bei Fehler
+            now = datetime.datetime.now()
+
+            # At midnight (00:00), process yesterday's data
+            if now.hour == 0 and now.minute == 0:
+                yesterday = (now - datetime.timedelta(days=1)).date()
+                logger.info(f"Processing yesterday's data: {yesterday}")
+                self.process_date(yesterday)
+                # Sleep 61s so the same minute is not processed twice
+                time.sleep(61)
+
+            # Every six hours, also backfill any missing days
+            if now.hour % 6 == 0 and now.minute == 0:
+                logger.info("Checking for missing dates...")
+                self.process_missing_dates()
+                time.sleep(61)
+
+            time.sleep(30)
 
 def main():
     worker = AnalyticsWorker()
diff --git a/systemd/analytics-worker.service b/systemd/analytics-worker.service
deleted file mode 100644
index 9f1f4d0..0000000
--- a/systemd/analytics-worker.service
+++ /dev/null
@@ -1,18 +0,0 @@
-[Unit]
-Description=Trading Analytics Worker
-After=network.target questdb.service
-
-[Service]
-Type=simple
-User=melchiorreimers
-WorkingDirectory=/Users/melchiorreimers/Documents/trading_daemon
-Environment="PYTHONUNBUFFERED=1"
-Environment="DB_USER=admin"
-Environment="DB_PASSWORD=quest"
-Environment="DB_HOST=localhost"
-ExecStart=/usr/bin/python3 -m src.analytics.worker
-Restart=always
-RestartSec=10
-
-[Install]
-WantedBy=multi-user.target
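Note: with the systemd unit deleted, nothing in this patch starts the worker anymore.
The manual equivalent of the removed unit's ExecStart, with its environment inlined
(all values taken verbatim from the deleted file), is:

    DB_USER=admin DB_PASSWORD=quest DB_HOST=localhost PYTHONUNBUFFERED=1 python3 -m src.analytics.worker

Presumably the worker now runs alongside QuestDB in a container setup (the server
already defaults DB_HOST to "questdb"), but that is an assumption; the patch itself
does not say.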