diff --git a/dashboard/public/index.html b/dashboard/public/index.html
deleted file mode 100644
index d95faa4..0000000
--- a/dashboard/public/index.html
+++ /dev/null
@@ -1,1287 +0,0 @@
-[1,287 lines of dashboard markup deleted: the "Trading Intelligence Hub" single-page UI with its "Regional Distribution", "Erweiterte Statistiken", "Moving Average: Tradezahlen & Volumen je Exchange", "Tradingvolumen & Anzahl Änderungen", "Trendanalyse: Häufig gehandelte Aktien", "Report: Market Activity" and "ANALYTICS" panels]
diff --git a/dashboard/server.py b/dashboard/server.py
index 9de2bc0..8641d1e 100644
--- a/dashboard/server.py
+++ b/dashboard/server.py
@@ -1,10 +1,10 @@
-from fastapi import FastAPI, HTTPException, Depends
+from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
import requests
import os
-import pandas as pd
+from typing import Optional, Dict, Any
app = FastAPI(title="Trading Dashboard API")
@@ -22,97 +22,210 @@ app.mount("/static", StaticFiles(directory="dashboard/public"), name="static")
async def read_index():
return FileResponse('dashboard/public/index.html')
+# QuestDB configuration
DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
DB_HOST = os.getenv("DB_HOST", "questdb")
+DB_URL = f"http://{DB_HOST}:9000"
-@app.get("/api/trades")
-async def get_trades(isin: str = None, days: int = 7, limit: int = 1000):
- """
- Returns trades. Limited to 1000 by default for performance.
- The dashboard overview only needs the most recent trades.
- """
- query = f"select * from trades where timestamp > dateadd('d', -{days}, now())"
- if isin:
- query += f" and isin = '{isin}'"
- query += f" order by timestamp desc limit {limit}"
-
+# Helper functions
+def query_questdb(query: str, timeout: int = 10) -> Optional[Dict[str, Any]]:
+ """Zentrale QuestDB-Abfrage-Funktion"""
try:
- response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH)
+ response = requests.get(f"{DB_URL}/exec", params={'query': query}, auth=DB_AUTH, timeout=timeout)
if response.status_code == 200:
return response.json()
- throw_http_error(response)
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
+ else:
+ raise HTTPException(status_code=response.status_code, detail=f"QuestDB error: {response.text}")
+ except requests.exceptions.Timeout:
+ raise HTTPException(status_code=504, detail="QuestDB query timeout")
+ except requests.exceptions.RequestException as e:
+ raise HTTPException(status_code=500, detail=f"QuestDB connection error: {str(e)}")
+
+def format_questdb_response(data: Dict[str, Any]) -> Dict[str, Any]:
+ """Einheitliche Formatierung der QuestDB-Antworten"""
+ if not data:
+ return {'columns': [], 'dataset': []}
+ return data
+
+# API endpoints
+@app.get("/api/trades")
+async def get_trades(isin: str = None, days: int = 7):
+ """
+ Returns an aggregated analysis of all trades (not individual trades).
+ Uses precomputed data from analytics_exchange_daily.
+ """
+ if isin:
+ # For a specific ISIN: read from the trades table
+ query = f"""
+ select
+ date_trunc('day', timestamp) as date,
+ count(*) as trade_count,
+ sum(price * quantity) as volume,
+ avg(price) as avg_price
+ from trades
+ where isin = '{isin}'
+ and timestamp > dateadd('d', -{days}, now())
+ group by date
+ order by date desc
+ """
+ else:
+ # Aggregated data from analytics_exchange_daily
+ query = f"""
+ select
+ timestamp as date,
+ exchange,
+ trade_count,
+ volume
+ from analytics_exchange_daily
+ where timestamp >= dateadd('d', -{days}, now())
+ order by date desc, exchange asc
+ """
+
+ data = query_questdb(query)
+ return format_questdb_response(data)
@app.get("/api/metadata")
async def get_metadata():
+ """Gibt alle Metadata zurück"""
query = "select * from metadata"
- try:
- response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH)
- if response.status_code == 200:
- return response.json()
- throw_http_error(response)
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
+ data = query_questdb(query)
+ return format_questdb_response(data)
@app.get("/api/summary")
-async def get_summary(days: int = 7):
+async def get_summary():
"""
- Returns a summary. Optimized for fast queries.
- Uses precomputed data when available.
+ Returns a summary. Uses analytics_daily_summary for total_trades (all trades).
+ """
+ # Fetch the total number of trades from analytics_daily_summary
+ query = """
+ select
+ sum(total_trades) as total_trades,
+ sum(total_volume) as total_volume
+ from analytics_daily_summary
"""
- # First try to aggregate from analytics_exchange_daily (faster)
- # If that does not work, fall back to the original query
- try:
- # Aggregate from analytics_exchange_daily for the last N days
- # This is faster than a JOIN query over all trades
- query = f"""
- select
- 'All' as continent,
- sum(trade_count) as trade_count,
- sum(volume) as total_volume
- from analytics_exchange_daily
- where timestamp >= dateadd('d', -{days}, now())
- """
- response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH, timeout=5)
- if response.status_code == 200:
- data = response.json()
- # If data is available, use it
- if data.get('dataset') and len(data['dataset']) > 0:
- # Format for compatibility with the frontend
- result = {
- 'columns': [
- {'name': 'continent'},
- {'name': 'trade_count'},
- {'name': 'total_volume'}
- ],
- 'dataset': [[row[0], row[1], row[2]] for row in data['dataset']]
- }
- return result
- except Exception:
- # Fall back to the original query
- pass
- # Fallback: original query with a limit for performance
- query = f"""
+ data = query_questdb(query)
+ if data and data.get('dataset') and data['dataset']:
+ total_trades = data['dataset'][0][0] if data['dataset'][0][0] else 0
+ total_volume = data['dataset'][0][1] if data['dataset'][0][1] else 0.0
+
+ # Format for compatibility
+ return {
+ 'columns': [
+ {'name': 'continent'},
+ {'name': 'trade_count'},
+ {'name': 'total_volume'}
+ ],
+ 'dataset': [['All', total_trades, total_volume]]
+ }
+
+ # Fallback: original query
+ query = """
select
coalesce(m.continent, 'Unknown') as continent,
count(*) as trade_count,
sum(t.price * t.quantity) as total_volume
from trades t
left join metadata m on t.isin = m.isin
- where t.timestamp > dateadd('d', -{days}, now())
group by continent
"""
- try:
- response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH, timeout=10)
- if response.status_code == 200:
- return response.json()
- throw_http_error(response)
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
+ data = query_questdb(query)
+ return format_questdb_response(data)
+
+@app.get("/api/statistics/total-trades")
+async def get_total_trades():
+ """Gibt Gesamtzahl aller Trades zurück (aus analytics_daily_summary)"""
+ query = "select sum(total_trades) as total from analytics_daily_summary"
+ data = query_questdb(query)
+ if data and data.get('dataset') and data['dataset']:
+ total = data['dataset'][0][0] if data['dataset'][0][0] else 0
+ return {'total_trades': total}
+ return {'total_trades': 0}
+
+@app.get("/api/statistics/moving-average")
+async def get_moving_average(days: int = 7, exchange: str = None):
+ """
+ Returns moving-average data for trade counts and volume per exchange.
+ Supported periods: 7, 30, 42, 69, 180, 365 days
+ """
+ if days not in [7, 30, 42, 69, 180, 365]:
+ raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
+
+ query = f"""
+ select
+ timestamp as date,
+ exchange,
+ trade_count,
+ volume,
+ ma{days}_count as ma_count,
+ ma{days}_volume as ma_volume
+ from analytics_exchange_daily
+ where timestamp >= dateadd('d', -{days}, now())
+ """
+
+ if exchange:
+ query += f" and exchange = '{exchange}'"
+
+ query += " order by date asc, exchange asc"
+
+ data = query_questdb(query, timeout=5)
+ return format_questdb_response(data)
+
+@app.get("/api/statistics/volume-changes")
+async def get_volume_changes(days: int = 7):
+ """
+ Returns changes in volume and trade count per exchange.
+ Supported periods: 7, 30, 42, 69, 180, 365 days
+ """
+ if days not in [7, 30, 42, 69, 180, 365]:
+ raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
+
+ query = f"""
+ select
+ timestamp as date,
+ exchange,
+ trade_count,
+ volume,
+ count_change_pct,
+ volume_change_pct,
+ trend
+ from analytics_volume_changes
+ where period_days = {days}
+ and timestamp >= dateadd('d', -{days}, now())
+ order by date desc, exchange asc
+ """
+
+ data = query_questdb(query, timeout=5)
+ return format_questdb_response(data)
+
+@app.get("/api/statistics/stock-trends")
+async def get_stock_trends(days: int = 7, limit: int = 20):
+ """
+ Returns a trend analysis for frequently traded stocks.
+ Supported periods: 7, 30, 42, 69, 180, 365 days
+ """
+ if days not in [7, 30, 42, 69, 180, 365]:
+ raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
+
+ query = f"""
+ select
+ timestamp as date,
+ isin,
+ trade_count,
+ volume,
+ count_change_pct,
+ volume_change_pct
+ from analytics_stock_trends
+ where period_days = {days}
+ and timestamp >= dateadd('d', -{days}, now())
+ order by volume desc
+ limit {limit}
+ """
+
+ data = query_questdb(query, timeout=5)
+ return format_questdb_response(data)
@app.get("/api/analytics")
async def get_analytics(
@@ -124,8 +237,7 @@ async def get_analytics(
isins: str = None,
continents: str = None
):
- # Determine if we need to join metadata
- # Determine if we need to join metadata
+ """Analytics Endpunkt für Report Builder"""
composite_keys = ["exchange_continent", "exchange_sector"]
needs_metadata = any([
group_by in ["name", "continent", "sector"] + composite_keys,
@@ -133,7 +245,6 @@ async def get_analytics(
continents is not None
])
- # Use prefixes only if joining
t_prefix = "t." if needs_metadata else ""
m_prefix = "m." if needs_metadata else ""
@@ -191,129 +302,15 @@ async def get_analytics(
query += " order by label asc"
- try:
- response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH)
- if response.status_code == 200:
- return response.json()
- throw_http_error(response)
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
+ data = query_questdb(query)
+ return format_questdb_response(data)
@app.get("/api/metadata/search")
async def search_metadata(q: str):
- # Case-insensitive search for ISIN or Name
+ """Case-insensitive search for ISIN or Name"""
query = f"select isin, name from metadata where isin ilike '%{q}%' or name ilike '%{q}%' limit 10"
- try:
- response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH)
- if response.status_code == 200:
- return response.json()
- throw_http_error(response)
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
-
-@app.get("/api/statistics/moving-average")
-async def get_moving_average(days: int = 7, exchange: str = None):
- """
- Returns moving-average data for trade counts and volume per exchange.
- Supported periods: 7, 30, 42, 69, 180, 365 days
- Uses precomputed data from analytics_exchange_daily for fast response times.
- """
- if days not in [7, 30, 42, 69, 180, 365]:
- raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
-
- # Fetch data from the precomputed analytics_exchange_daily table
- query = f"""
- select
- timestamp as date,
- exchange,
- trade_count,
- volume,
- ma{days}_count as ma_count,
- ma{days}_volume as ma_volume
- from analytics_exchange_daily
- where timestamp >= dateadd('d', -{days}, now())
- """
-
- if exchange:
- query += f" and exchange = '{exchange}'"
-
- query += " order by date asc, exchange asc"
-
- try:
- response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH, timeout=5)
- if response.status_code == 200:
- return response.json()
- throw_http_error(response)
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
-
-@app.get("/api/statistics/volume-changes")
-async def get_volume_changes(days: int = 7):
- """
- Returns changes in volume and trade count per exchange.
- Supported periods: 7, 30, 42, 69, 180, 365 days
- Uses precomputed data from analytics_volume_changes for fast response times.
- """
- if days not in [7, 30, 42, 69, 180, 365]:
- raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
-
- query = f"""
- select
- timestamp as date,
- exchange,
- trade_count,
- volume,
- count_change_pct,
- volume_change_pct,
- trend
- from analytics_volume_changes
- where period_days = {days}
- order by date desc, exchange asc
- """
-
- try:
- response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH, timeout=5)
- if response.status_code == 200:
- return response.json()
- throw_http_error(response)
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
-
-@app.get("/api/statistics/stock-trends")
-async def get_stock_trends(days: int = 7, limit: int = 20):
- """
- Returns a trend analysis for frequently traded stocks.
- Supported periods: 7, 30, 42, 69, 180, 365 days
- Uses precomputed data from analytics_stock_trends for fast response times.
- """
- if days not in [7, 30, 42, 69, 180, 365]:
- raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
-
- # Fetch top stocks by volume for the period
- query = f"""
- select
- timestamp as date,
- isin,
- trade_count,
- volume,
- count_change_pct,
- volume_change_pct
- from analytics_stock_trends
- where period_days = {days}
- order by volume desc
- limit {limit}
- """
-
- try:
- response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH, timeout=5)
- if response.status_code == 200:
- return response.json()
- throw_http_error(response)
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
-
-def throw_http_error(res):
- raise HTTPException(status_code=res.status_code, detail=f"QuestDB error: {res.text}")
+ data = query_questdb(query)
+ return format_questdb_response(data)
if __name__ == "__main__":
import uvicorn
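For reference, a minimal client-side sketch of the new statistics endpoints added above (paths, parameters, and the columns/dataset response shape are taken from the diff; the base URL and the "XETRA" exchange value are illustrative assumptions, not from the repo):

    import requests  # same HTTP client the server itself uses

    BASE = "http://localhost:8000"  # assumed local address of the FastAPI app

    # Moving averages of trade count and volume per exchange over the last 30 days
    resp = requests.get(f"{BASE}/api/statistics/moving-average",
                        params={"days": 30, "exchange": "XETRA"}, timeout=10)
    resp.raise_for_status()

    # Responses keep QuestDB's /exec shape: {"columns": [...], "dataset": [[...], ...]}
    for date, exchange, trade_count, volume, ma_count, ma_volume in resp.json().get("dataset", []):
        print(date, exchange, ma_count, ma_volume)

Endpoints that accept a days parameter reject anything outside 7, 30, 42, 69, 180 and 365 with a 400.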
diff --git a/src/analytics/worker.py b/src/analytics/worker.py
index 77820f2..9cfc22d 100644
--- a/src/analytics/worker.py
+++ b/src/analytics/worker.py
@@ -3,8 +3,9 @@ import logging
import datetime
import os
import requests
-from typing import Dict, List, Tuple, Optional
+from typing import Dict, List, Optional
import pandas as pd
+import json
logging.basicConfig(
level=logging.INFO,
@@ -23,9 +24,8 @@ TIME_PERIODS = [7, 30, 42, 69, 180, 365]
class AnalyticsWorker:
def __init__(self):
- self.last_processed_timestamp = None
self.db_url = DB_URL
-
+
def wait_for_questdb(self, max_retries: int = 30, retry_delay: int = 2):
"""Wartet bis QuestDB verfügbar ist"""
logger.info("Waiting for QuestDB to be available...")
@@ -40,281 +40,317 @@ class AnalyticsWorker:
time.sleep(retry_delay)
logger.error("QuestDB did not become available after waiting")
return False
-
- def get_last_processed_timestamp(self) -> Optional[datetime.datetime]:
- """Holt den letzten verarbeiteten Timestamp aus der Analytics-Tabelle"""
- try:
- query = "select max(timestamp) as last_ts from analytics_exchange_daily"
- response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
- if response.status_code == 200:
- data = response.json()
- if data.get('dataset') and data['dataset'] and len(data['dataset']) > 0 and data['dataset'][0][0]:
- ts_value = data['dataset'][0][0]
- if isinstance(ts_value, str):
- return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
- elif isinstance(ts_value, (int, float)):
- # QuestDB returns timestamps in microseconds
- return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
- except Exception as e:
- logger.debug(f"Could not get last processed timestamp: {e}")
- return None
- def get_new_trades(self, since: Optional[datetime.datetime] = None) -> List[Dict]:
- """Holt neue Trades seit dem letzten Verarbeitungszeitpunkt"""
- if since:
- since_str = since.strftime('%Y-%m-%d %H:%M:%S')
- query = f"select timestamp, exchange, isin, price, quantity from trades where timestamp > '{since_str}' order by timestamp asc"
+ def query_questdb(self, query: str, timeout: int = 30) -> Optional[Dict]:
+ """Zentrale QuestDB-Abfrage-Funktion"""
+ try:
+ response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=timeout)
+ if response.status_code == 200:
+ return response.json()
+ else:
+ logger.error(f"QuestDB query failed: {response.status_code} - {response.text}")
+ return None
+ except Exception as e:
+ logger.error(f"Error querying QuestDB: {e}")
+ return None
+
+ def get_existing_dates(self, table_name: str) -> set:
+ """Holt alle bereits berechneten Daten aus einer Analytics-Tabelle"""
+ query = f"select distinct date_trunc('day', timestamp) as date from {table_name}"
+ data = self.query_questdb(query)
+ if not data or not data.get('dataset'):
+ return set()
+
+ dates = set()
+ for row in data['dataset']:
+ if row[0]:
+ if isinstance(row[0], str):
+ dates.add(datetime.datetime.fromisoformat(row[0].replace('Z', '+00:00')).date())
+ elif isinstance(row[0], (int, float)):
+ dates.add(datetime.datetime.fromtimestamp(row[0] / 1000000, tz=datetime.timezone.utc).date())
+ return dates
+
+ def get_missing_dates(self) -> List[datetime.date]:
+ """Ermittelt fehlende Tage, die noch berechnet werden müssen"""
+ # Hole das Datum des ersten Trades
+ query = "select min(date_trunc('day', timestamp)) as first_date from trades"
+ data = self.query_questdb(query)
+ if not data or not data.get('dataset') or not data['dataset'][0][0]:
+ logger.info("No trades found in database")
+ return []
+
+ first_date_value = data['dataset'][0][0]
+ if isinstance(first_date_value, str):
+ first_date = datetime.datetime.fromisoformat(first_date_value.replace('Z', '+00:00')).date()
else:
- # First run: only the last 7 days
- query = f"select timestamp, exchange, isin, price, quantity from trades where timestamp > dateadd('d', -7, now()) order by timestamp asc"
+ first_date = datetime.datetime.fromtimestamp(first_date_value / 1000000, tz=datetime.timezone.utc).date()
- try:
- response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
- if response.status_code == 200:
- data = response.json()
- columns = data.get('columns', [])
- dataset = data.get('dataset', [])
-
- trades = []
- for row in dataset:
- trade = {}
- for i, col in enumerate(columns):
- trade[col['name']] = row[i]
- trades.append(trade)
- return trades
- except Exception as e:
- logger.error(f"Error fetching new trades: {e}")
- return []
+ # Fetch the dates that have already been computed
+ existing_dates = self.get_existing_dates('analytics_daily_summary')
+
+ # Generate all days from the first trade through yesterday
+ yesterday = datetime.date.today() - datetime.timedelta(days=1)
+ all_dates = []
+ current = first_date
+ while current <= yesterday:
+ all_dates.append(current)
+ current += datetime.timedelta(days=1)
+
+ # Find the missing days
+ missing_dates = [d for d in all_dates if d not in existing_dates]
+ logger.info(f"Found {len(missing_dates)} missing dates to calculate (from {len(all_dates)} total dates)")
+ return sorted(missing_dates)
- def calculate_exchange_daily_aggregations(self, days_back: int = 365) -> List[Dict]:
- """Berechnet tägliche Aggregationen je Exchange mit Moving Averages"""
- end_date = datetime.datetime.now(datetime.timezone.utc)
- start_date = end_date - datetime.timedelta(days=days_back)
-
+ def calculate_daily_summary(self, date: datetime.date) -> Optional[Dict]:
+ """Berechnet tägliche Zusammenfassung für einen Tag"""
+ date_str = date.strftime('%Y-%m-%d')
+ query = f"""
+ select
+ count(*) as total_trades,
+ sum(price * quantity) as total_volume,
+ exchange,
+ count(*) as exchange_trades
+ from trades
+ where date_trunc('day', timestamp) = '{date_str}'
+ group by exchange
+ """
+
+ data = self.query_questdb(query)
+ if not data or not data.get('dataset'):
+ return None
+
+ total_trades = 0
+ total_volume = 0.0
+ exchanges = {}
+
+ for row in data['dataset']:
+ exchange = row[2]
+ trades = row[3] if row[3] else 0
+ volume = row[1] if row[1] else 0.0
+
+ total_trades += trades
+ total_volume += volume
+ exchanges[exchange] = {'trades': trades, 'volume': volume}
+
+ return {
+ 'date': date,
+ 'total_trades': total_trades,
+ 'total_volume': total_volume,
+ 'exchanges': json.dumps(exchanges)
+ }
+
+ def calculate_exchange_daily(self, date: datetime.date) -> List[Dict]:
+ """Berechnet tägliche Exchange-Statistiken mit Moving Averages"""
+ date_str = date.strftime('%Y-%m-%d')
+
+ # Hole Daten für diesen Tag
query = f"""
select
- date_trunc('day', timestamp) as date,
exchange,
count(*) as trade_count,
sum(price * quantity) as volume
from trades
- where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
- group by date, exchange
- order by date asc, exchange asc
+ where date_trunc('day', timestamp) = '{date_str}'
+ group by exchange
"""
- try:
- response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
- if response.status_code == 200:
- data = response.json()
- columns = data.get('columns', [])
- dataset = data.get('dataset', [])
+ data = self.query_questdb(query)
+ if not data or not data.get('dataset'):
+ return []
+
+ results = []
+ for row in data['dataset']:
+ exchange = row[0]
+ trade_count = row[1] if row[1] else 0
+ volume = row[2] if row[2] else 0.0
+
+ # Compute moving averages for all periods
+ ma_values = {}
+ for period in TIME_PERIODS:
+ # Fetch data for the last N days, including the target day
+ end_date = date
+ start_date = end_date - datetime.timedelta(days=period-1)
- results = []
- for row in dataset:
- result = {}
- for i, col in enumerate(columns):
- result[col['name']] = row[i]
- results.append(result)
+ ma_query = f"""
+ select
+ count(*) as ma_count,
+ sum(price * quantity) as ma_volume
+ from trades
+ where exchange = '{exchange}'
+ and date_trunc('day', timestamp) >= '{start_date.strftime('%Y-%m-%d')}'
+ and date_trunc('day', timestamp) <= '{end_date.strftime('%Y-%m-%d')}'
+ """
- # Compute moving averages for all periods
- df = pd.DataFrame(results)
- if df.empty:
- return []
-
- # Pivot for simpler MA calculation
- df['date'] = pd.to_datetime(df['date'])
- df = df.sort_values(['date', 'exchange'])
-
- # Compute the MA for each period
- for period in TIME_PERIODS:
- df[f'ma{period}_count'] = df.groupby('exchange')['trade_count'].transform(
- lambda x: x.rolling(window=period, min_periods=1).mean()
- )
- df[f'ma{period}_volume'] = df.groupby('exchange')['volume'].transform(
- lambda x: x.rolling(window=period, min_periods=1).mean()
- )
-
- # Convert back to a list of dicts
- return df.to_dict('records')
- except Exception as e:
- logger.error(f"Error calculating exchange daily aggregations: {e}")
- return []
+ ma_data = self.query_questdb(ma_query)
+ if ma_data and ma_data.get('dataset') and ma_data['dataset'][0]:
+ # Divide the window totals by the period length so the stored value is a daily moving average
+ ma_values[f'ma{period}_count'] = (ma_data['dataset'][0][0] or 0) / period
+ ma_values[f'ma{period}_volume'] = (ma_data['dataset'][0][1] or 0.0) / period
+ else:
+ ma_values[f'ma{period}_count'] = 0
+ ma_values[f'ma{period}_volume'] = 0.0
+
+ results.append({
+ 'date': date,
+ 'exchange': exchange,
+ 'trade_count': trade_count,
+ 'volume': volume,
+ **ma_values
+ })
+
+ return results
- def calculate_stock_trends(self, days: int = 365) -> List[Dict]:
- """Berechnet Trenddaten je ISIN mit Änderungsprozenten"""
- end_date = datetime.datetime.now(datetime.timezone.utc)
- start_date = end_date - datetime.timedelta(days=days)
+ def calculate_stock_trends(self, date: datetime.date) -> List[Dict]:
+ """Berechnet Stock-Trends für alle Zeiträume für einen Tag"""
+ results = []
- # Current period
- query_current = f"""
- select
- date_trunc('day', timestamp) as date,
- isin,
- count(*) as trade_count,
- sum(price * quantity) as volume
- from trades
- where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
- group by date, isin
- order by date asc, isin asc
- """
+ for period in TIME_PERIODS:
+ end_date = date
+ start_date = end_date - datetime.timedelta(days=period-1)
+
+ # Current period
+ query = f"""
+ select
+ isin,
+ count(*) as trade_count,
+ sum(price * quantity) as volume
+ from trades
+ where date_trunc('day', timestamp) >= '{start_date.strftime('%Y-%m-%d')}'
+ and date_trunc('day', timestamp) <= '{end_date.strftime('%Y-%m-%d')}'
+ group by isin
+ """
+
+ data = self.query_questdb(query)
+ if not data or not data.get('dataset'):
+ continue
+
+ for row in data['dataset']:
+ isin = row[0]
+ current_count = row[1] if row[1] else 0
+ current_volume = row[2] if row[2] else 0.0
+
+ # Previous period for comparison
+ prev_start = start_date - datetime.timedelta(days=period)
+ prev_end = start_date - datetime.timedelta(days=1)
+
+ prev_query = f"""
+ select
+ count(*) as trade_count,
+ sum(price * quantity) as volume
+ from trades
+ where isin = '{isin}'
+ and date_trunc('day', timestamp) >= '{prev_start.strftime('%Y-%m-%d')}'
+ and date_trunc('day', timestamp) <= '{prev_end.strftime('%Y-%m-%d')}'
+ """
+
+ prev_data = self.query_questdb(prev_query)
+ prev_count = 0
+ prev_volume = 0.0
+
+ if prev_data and prev_data.get('dataset') and prev_data['dataset'][0]:
+ prev_count = prev_data['dataset'][0][0] if prev_data['dataset'][0][0] else 0
+ prev_volume = prev_data['dataset'][0][1] if prev_data['dataset'][0][1] else 0.0
+
+ # Compute the changes
+ count_change_pct = ((current_count - prev_count) / prev_count * 100) if prev_count > 0 else 0
+ volume_change_pct = ((current_volume - prev_volume) / prev_volume * 100) if prev_volume > 0 else 0
+
+ results.append({
+ 'date': date,
+ 'period_days': period,
+ 'isin': isin,
+ 'trade_count': current_count,
+ 'volume': current_volume,
+ 'count_change_pct': count_change_pct,
+ 'volume_change_pct': volume_change_pct
+ })
- try:
- response = requests.get(f"{self.db_url}/exec", params={'query': query_current}, auth=DB_AUTH)
- if response.status_code == 200:
- data = response.json()
- columns = data.get('columns', [])
- dataset = data.get('dataset', [])
-
- results = []
- for row in dataset:
- result = {}
- for i, col in enumerate(columns):
- result[col['name']] = row[i]
- results.append(result)
-
- if not results:
- return []
-
- df = pd.DataFrame(results)
- df['date'] = pd.to_datetime(df['date'])
-
- # Aggregate per ISIN over the whole period
- df_agg = df.groupby('isin').agg({
- 'trade_count': 'sum',
- 'volume': 'sum'
- }).reset_index()
-
- # Compute changes: compare with the previous period
- # For each ISIN: current period vs. previous period
- trends = []
- for isin in df_agg['isin'].unique():
- isin_data = df[df['isin'] == isin].sort_values('date')
-
- # Split into two halves for comparison
- mid_point = len(isin_data) // 2
- if mid_point > 0:
- first_half = isin_data.iloc[:mid_point]
- second_half = isin_data.iloc[mid_point:]
-
- first_count = first_half['trade_count'].sum()
- first_volume = first_half['volume'].sum()
- second_count = second_half['trade_count'].sum()
- second_volume = second_half['volume'].sum()
-
- count_change = ((second_count - first_count) / first_count * 100) if first_count > 0 else 0
- volume_change = ((second_volume - first_volume) / first_volume * 100) if first_volume > 0 else 0
- else:
- count_change = 0
- volume_change = 0
-
- total_count = isin_data['trade_count'].sum()
- total_volume = isin_data['volume'].sum()
-
- trends.append({
- 'isin': isin,
- 'date': isin_data['date'].max(),
- 'trade_count': total_count,
- 'volume': total_volume,
- 'count_change_pct': count_change,
- 'volume_change_pct': volume_change
- })
-
- return trends
- except Exception as e:
- logger.error(f"Error calculating stock trends: {e}")
- return []
+ return results
- def calculate_volume_changes(self, days: int = 365) -> List[Dict]:
- """Berechnet Volumen- und Anzahl-Änderungen je Exchange"""
- end_date = datetime.datetime.now(datetime.timezone.utc)
- start_date = end_date - datetime.timedelta(days=days)
+ def calculate_volume_changes(self, date: datetime.date) -> List[Dict]:
+ """Berechnet Volumen-Änderungen für alle Zeiträume für einen Tag"""
+ results = []
- query = f"""
- select
- date_trunc('day', timestamp) as date,
- exchange,
- count(*) as trade_count,
- sum(price * quantity) as volume
- from trades
- where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
- group by date, exchange
- order by date asc, exchange asc
- """
+ for period in TIME_PERIODS:
+ end_date = date
+ start_date = end_date - datetime.timedelta(days=period-1)
+
+ # Fetch all exchanges
+ exchanges_query = "select distinct exchange from trades"
+ exchanges_data = self.query_questdb(exchanges_query)
+ if not exchanges_data or not exchanges_data.get('dataset'):
+ continue
+
+ for exchange_row in exchanges_data['dataset']:
+ exchange = exchange_row[0]
+
+ # Current period
+ query = f"""
+ select
+ count(*) as trade_count,
+ sum(price * quantity) as volume
+ from trades
+ where exchange = '{exchange}'
+ and date_trunc('day', timestamp) >= '{start_date.strftime('%Y-%m-%d')}'
+ and date_trunc('day', timestamp) <= '{end_date.strftime('%Y-%m-%d')}'
+ """
+
+ data = self.query_questdb(query)
+ if not data or not data.get('dataset') or not data['dataset'][0]:
+ continue
+
+ current_count = data['dataset'][0][0] if data['dataset'][0][0] else 0
+ current_volume = data['dataset'][0][1] if data['dataset'][0][1] else 0.0
+
+ # Previous period
+ prev_start = start_date - datetime.timedelta(days=period)
+ prev_end = start_date - datetime.timedelta(days=1)
+
+ prev_query = f"""
+ select
+ count(*) as trade_count,
+ sum(price * quantity) as volume
+ from trades
+ where exchange = '{exchange}'
+ and date_trunc('day', timestamp) >= '{prev_start.strftime('%Y-%m-%d')}'
+ and date_trunc('day', timestamp) <= '{prev_end.strftime('%Y-%m-%d')}'
+ """
+
+ prev_data = self.query_questdb(prev_query)
+ prev_count = 0
+ prev_volume = 0.0
+
+ if prev_data and prev_data.get('dataset') and prev_data['dataset'][0]:
+ prev_count = prev_data['dataset'][0][0] if prev_data['dataset'][0][0] else 0
+ prev_volume = prev_data['dataset'][0][1] if prev_data['dataset'][0][1] else 0.0
+
+ # Compute the changes
+ count_change_pct = ((current_count - prev_count) / prev_count * 100) if prev_count > 0 else 0
+ volume_change_pct = ((current_volume - prev_volume) / prev_volume * 100) if prev_volume > 0 else 0
+
+ # Determine the trend
+ if count_change_pct > 5 and volume_change_pct > 5:
+ trend = "mehr_trades_mehr_volumen"
+ elif count_change_pct > 5 and volume_change_pct < -5:
+ trend = "mehr_trades_weniger_volumen"
+ elif count_change_pct < -5 and volume_change_pct > 5:
+ trend = "weniger_trades_mehr_volumen"
+ elif count_change_pct < -5 and volume_change_pct < -5:
+ trend = "weniger_trades_weniger_volumen"
+ else:
+ trend = "stabil"
+
+ results.append({
+ 'date': date,
+ 'period_days': period,
+ 'exchange': exchange,
+ 'trade_count': current_count,
+ 'volume': current_volume,
+ 'count_change_pct': count_change_pct,
+ 'volume_change_pct': volume_change_pct,
+ 'trend': trend
+ })
- try:
- response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
- if response.status_code == 200:
- data = response.json()
- columns = data.get('columns', [])
- dataset = data.get('dataset', [])
-
- results = []
- for row in dataset:
- result = {}
- for i, col in enumerate(columns):
- result[col['name']] = row[i]
- results.append(result)
-
- if not results:
- return []
-
- df = pd.DataFrame(results)
- df['date'] = pd.to_datetime(df['date'])
- df = df.sort_values(['date', 'exchange'])
-
- # Compute changes per exchange
- changes = []
- for exchange in df['exchange'].unique():
- exchange_data = df[df['exchange'] == exchange].sort_values('date')
-
- # Split into two halves
- mid_point = len(exchange_data) // 2
- if mid_point > 0:
- first_half = exchange_data.iloc[:mid_point]
- second_half = exchange_data.iloc[mid_point:]
-
- first_count = first_half['trade_count'].sum()
- first_volume = first_half['volume'].sum()
- second_count = second_half['trade_count'].sum()
- second_volume = second_half['volume'].sum()
-
- count_change = ((second_count - first_count) / first_count * 100) if first_count > 0 else 0
- volume_change = ((second_volume - first_volume) / first_volume * 100) if first_volume > 0 else 0
-
- # Determine the trend
- if count_change > 5 and volume_change > 5:
- trend = "mehr_trades_mehr_volumen"
- elif count_change > 5 and volume_change < -5:
- trend = "mehr_trades_weniger_volumen"
- elif count_change < -5 and volume_change > 5:
- trend = "weniger_trades_mehr_volumen"
- elif count_change < -5 and volume_change < -5:
- trend = "weniger_trades_weniger_volumen"
- else:
- trend = "stabil"
- else:
- count_change = 0
- volume_change = 0
- trend = "stabil"
-
- total_count = exchange_data['trade_count'].sum()
- total_volume = exchange_data['volume'].sum()
-
- changes.append({
- 'date': exchange_data['date'].max(),
- 'exchange': exchange,
- 'trade_count': total_count,
- 'volume': total_volume,
- 'count_change_pct': count_change,
- 'volume_change_pct': volume_change,
- 'trend': trend
- })
-
- return changes
- except Exception as e:
- logger.error(f"Error calculating volume changes: {e}")
- return []
+ return results
def save_analytics_data(self, table_name: str, data: List[Dict]):
"""Speichert aggregierte Daten in QuestDB via ILP"""
@@ -326,10 +362,10 @@ class AnalyticsWorker:
try:
# Convert the date to a timestamp
if 'date' in row:
- if isinstance(row['date'], str):
+ if isinstance(row['date'], datetime.date):
+ dt = datetime.datetime.combine(row['date'], datetime.time.min).replace(tzinfo=datetime.timezone.utc)
+ elif isinstance(row['date'], str):
dt = datetime.datetime.fromisoformat(row['date'].replace('Z', '+00:00'))
- elif isinstance(row['date'], pd.Timestamp):
- dt = row['date'].to_pydatetime()
else:
dt = row['date']
timestamp_ns = int(dt.timestamp() * 1e9)
@@ -350,6 +386,10 @@ class AnalyticsWorker:
isin = str(row['isin']).replace(' ', '\\ ').replace(',', '\\,')
tags.append(f"isin={isin}")
+ # Period as a tag
+ if 'period_days' in row and row['period_days']:
+ tags.append(f"period_days={row['period_days']}")
+
# Trend as a tag
if 'trend' in row and row['trend']:
trend = str(row['trend']).replace(' ', '\\ ').replace(',', '\\,')
@@ -357,16 +397,19 @@ class AnalyticsWorker:
# Numeric fields
for key, value in row.items():
- if key in ['date', 'exchange', 'isin', 'trend']:
+ if key in ['date', 'exchange', 'isin', 'trend', 'period_days', 'exchanges']:
continue
if value is not None:
if isinstance(value, (int, float)):
fields.append(f"{key}={value}")
elif isinstance(value, str):
- # String fields go in quotes
escaped = value.replace('"', '\\"').replace(' ', '\\ ')
fields.append(f'{key}="{escaped}"')
+ # Exchanges as a JSON string field (escape embedded quotes for ILP)
+ if 'exchanges' in row and row['exchanges']:
+ escaped_exchanges = str(row['exchanges']).replace('"', '\\"')
+ fields.append(f'exchanges="{escaped_exchanges}"')
+
if tags and fields:
line = f"{table_name},{','.join(tags)} {','.join(fields)} {timestamp_ns}"
lines.append(line)
@@ -393,37 +436,46 @@ class AnalyticsWorker:
except Exception as e:
logger.error(f"Error connecting to QuestDB: {e}")
- def process_all_analytics(self):
- """Verarbeitet alle Analytics für alle Zeiträume"""
- logger.info("Starting analytics processing...")
+ def process_date(self, date: datetime.date):
+ """Verarbeitet alle Analytics für einen bestimmten Tag"""
+ logger.info(f"Processing analytics for {date}")
- # 1. Exchange Daily Aggregations (für alle Zeiträume)
- logger.info("Calculating exchange daily aggregations...")
- exchange_data = self.calculate_exchange_daily_aggregations(days_back=365)
+ # 1. Daily Summary
+ summary = self.calculate_daily_summary(date)
+ if summary:
+ self.save_analytics_data('analytics_daily_summary', [summary])
+
+ # 2. Exchange Daily
+ exchange_data = self.calculate_exchange_daily(date)
if exchange_data:
self.save_analytics_data('analytics_exchange_daily', exchange_data)
- # 2. Stock Trends (für alle Zeiträume)
- logger.info("Calculating stock trends...")
- for days in TIME_PERIODS:
- trends = self.calculate_stock_trends(days=days)
- if trends:
- # Add the period as a tag
- for trend in trends:
- trend['period_days'] = days
- self.save_analytics_data('analytics_stock_trends', trends)
+ # 3. Stock Trends
+ stock_trends = self.calculate_stock_trends(date)
+ if stock_trends:
+ self.save_analytics_data('analytics_stock_trends', stock_trends)
- # 3. Volume Changes (für alle Zeiträume)
- logger.info("Calculating volume changes...")
- for days in TIME_PERIODS:
- changes = self.calculate_volume_changes(days=days)
- if changes:
- # Add the period as a tag
- for change in changes:
- change['period_days'] = days
- self.save_analytics_data('analytics_volume_changes', changes)
+ # 4. Volume Changes
+ volume_changes = self.calculate_volume_changes(date)
+ if volume_changes:
+ self.save_analytics_data('analytics_volume_changes', volume_changes)
- logger.info("Analytics processing completed.")
+ logger.info(f"Completed processing for {date}")
+
+ def process_missing_dates(self):
+ """Berechnet alle fehlenden Tage"""
+ missing_dates = self.get_missing_dates()
+ if not missing_dates:
+ logger.info("No missing dates to process")
+ return
+
+ logger.info(f"Processing {len(missing_dates)} missing dates...")
+ for i, date in enumerate(missing_dates, 1):
+ logger.info(f"Processing date {i}/{len(missing_dates)}: {date}")
+ self.process_date(date)
+ # Short pause between calculations
+ if i % 10 == 0:
+ time.sleep(1)
def run(self):
"""Hauptschleife des Workers"""
@@ -434,32 +486,30 @@ class AnalyticsWorker:
logger.error("Failed to connect to QuestDB. Exiting.")
return
- # Initial processing
- self.process_all_analytics()
- self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
+ # Initial computation of missing days
+ logger.info("Checking for missing dates...")
+ self.process_missing_dates()
- # Polling loop
+ # Main loop: wait for midnight
+ logger.info("Waiting for midnight to process yesterday's data...")
while True:
- try:
- # Check for new trades
- last_ts = self.get_last_processed_timestamp()
- new_trades = self.get_new_trades(since=last_ts)
-
- if new_trades:
- logger.info(f"Found {len(new_trades)} new trades, reprocessing analytics...")
- self.process_all_analytics()
- self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
- else:
- logger.debug("No new trades found.")
-
- # Wait 30 seconds before the next check
- time.sleep(30)
- except requests.exceptions.ConnectionError as e:
- logger.warning(f"Connection error to QuestDB, retrying in 60s: {e}")
- time.sleep(60)  # Longer pause on connection errors
- except Exception as e:
- logger.error(f"Error in worker loop: {e}", exc_info=True)
- time.sleep(60)  # Longer pause on errors
+ now = datetime.datetime.now()
+
+ # Check whether it is midnight (00:00)
+ if now.hour == 0 and now.minute == 0:
+ yesterday = (now - datetime.timedelta(days=1)).date()
+ logger.info(f"Processing yesterday's data: {yesterday}")
+ self.process_date(yesterday)
+ # Wait 61s so the same minute is not processed twice
+ time.sleep(61)
+
+ # Also check for missing days (every 6 hours)
+ if now.hour % 6 == 0 and now.minute == 0:
+ logger.info("Checking for missing dates...")
+ self.process_missing_dates()
+ time.sleep(61)
+
+ time.sleep(30)
def main():
worker = AnalyticsWorker()
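To make the ILP output of save_analytics_data concrete, here is a sketch of the line it assembles for a single analytics_exchange_daily row (the values and the "XETRA" exchange are illustrative; the tag/field split and the nanosecond timestamp follow the code above):

    import datetime

    row = {"date": datetime.date(2024, 1, 2), "exchange": "XETRA",
           "trade_count": 1200, "volume": 3450000.0,
           "ma7_count": 1100.0, "ma7_volume": 3200000.0}

    # Midnight UTC of the processed day, in nanoseconds, as in save_analytics_data()
    dt = datetime.datetime.combine(row["date"], datetime.time.min,
                                   tzinfo=datetime.timezone.utc)
    timestamp_ns = int(dt.timestamp() * 1e9)

    tags = f"exchange={row['exchange']}"
    fields = ",".join(f"{k}={v}" for k, v in row.items() if k not in ("date", "exchange"))
    line = f"analytics_exchange_daily,{tags} {fields} {timestamp_ns}"
    # analytics_exchange_daily,exchange=XETRA trade_count=1200,volume=3450000.0,... 1704153600000000000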
diff --git a/systemd/analytics-worker.service b/systemd/analytics-worker.service
deleted file mode 100644
index 9f1f4d0..0000000
--- a/systemd/analytics-worker.service
+++ /dev/null
@@ -1,18 +0,0 @@
-[Unit]
-Description=Trading Analytics Worker
-After=network.target questdb.service
-
-[Service]
-Type=simple
-User=melchiorreimers
-WorkingDirectory=/Users/melchiorreimers/Documents/trading_daemon
-Environment="PYTHONUNBUFFERED=1"
-Environment="DB_USER=admin"
-Environment="DB_PASSWORD=quest"
-Environment="DB_HOST=localhost"
-ExecStart=/usr/bin/python3 -m src.analytics.worker
-Restart=always
-RestartSec=10
-
-[Install]
-WantedBy=multi-user.target
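
With the systemd unit removed, this diff does not say how the worker is started going forward. For a one-off backfill it can still be driven directly; a minimal sketch, assuming the module path from the deleted ExecStart (src.analytics.worker) is importable and DB_USER, DB_PASSWORD and DB_HOST are exported the way the unit used to do:

    # Recompute every analytics day that is still missing, then exit.
    from src.analytics.worker import AnalyticsWorker

    worker = AnalyticsWorker()
    if worker.wait_for_questdb():      # retries until QuestDB is reachable, False if it never comes up
        worker.process_missing_dates()  # fills every day from the first trade up to yesterday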