updated
All checks were successful
Deployment / deploy-docker (push) Successful in 16s

Melchior Reimers
2026-01-25 18:02:20 +01:00
parent 64ffd9aa32
commit b4b96b96dc
4 changed files with 549 additions and 1807 deletions

File diff suppressed because it is too large

View File

@@ -1,10 +1,10 @@
-from fastapi import FastAPI, HTTPException, Depends
+from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
import requests
import os
-import pandas as pd
+from typing import Optional, Dict, Any

app = FastAPI(title="Trading Dashboard API")
@@ -22,97 +22,210 @@ app.mount("/static", StaticFiles(directory="dashboard/public"), name="static")
async def read_index():
    return FileResponse('dashboard/public/index.html')

+# QuestDB configuration
DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
DB_HOST = os.getenv("DB_HOST", "questdb")
+DB_URL = f"http://{DB_HOST}:9000"

-@app.get("/api/trades")
-async def get_trades(isin: str = None, days: int = 7, limit: int = 1000):
-    """
-    Returns trades. Limited to 1000 by default for performance.
-    The dashboard overview only needs the most recent trades.
-    """
-    query = f"select * from trades where timestamp > dateadd('d', -{days}, now())"
-    if isin:
-        query += f" and isin = '{isin}'"
-    query += f" order by timestamp desc limit {limit}"
-    try:
-        response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH)
-        if response.status_code == 200:
-            return response.json()
-        throw_http_error(response)
-    except Exception as e:
-        raise HTTPException(status_code=500, detail=str(e))
+# Helper functions
+def query_questdb(query: str, timeout: int = 10) -> Optional[Dict[str, Any]]:
+    """Central QuestDB query helper"""
+    try:
+        response = requests.get(f"{DB_URL}/exec", params={'query': query}, auth=DB_AUTH, timeout=timeout)
+        if response.status_code == 200:
+            return response.json()
+        else:
+            raise HTTPException(status_code=response.status_code, detail=f"QuestDB error: {response.text}")
+    except requests.exceptions.Timeout:
+        raise HTTPException(status_code=504, detail="QuestDB query timeout")
+    except requests.exceptions.RequestException as e:
+        raise HTTPException(status_code=500, detail=f"QuestDB connection error: {str(e)}")
+
+def format_questdb_response(data: Dict[str, Any]) -> Dict[str, Any]:
+    """Uniform formatting of QuestDB responses"""
+    if not data:
+        return {'columns': [], 'dataset': []}
+    return data
+
+# API endpoints
+@app.get("/api/trades")
+async def get_trades(isin: str = None, days: int = 7):
+    """
+    Returns an aggregated analysis of all trades (not individual trades).
+    Uses precomputed data from analytics_exchange_daily.
+    """
+    if isin:
+        # For a specific ISIN: read from the trades table
+        query = f"""
+            select
+                date_trunc('day', timestamp) as date,
+                count(*) as trade_count,
+                sum(price * quantity) as volume,
+                avg(price) as avg_price
+            from trades
+            where isin = '{isin}'
+            and timestamp > dateadd('d', -{days}, now())
+            group by date
+            order by date desc
+        """
+    else:
+        # Aggregated data from analytics_exchange_daily
+        query = f"""
+            select
+                timestamp as date,
+                exchange,
+                trade_count,
+                volume
+            from analytics_exchange_daily
+            where timestamp >= dateadd('d', -{days}, now())
+            order by date desc, exchange asc
+        """
+    data = query_questdb(query)
+    return format_questdb_response(data)

@app.get("/api/metadata")
async def get_metadata():
+    """Returns all metadata"""
    query = "select * from metadata"
-    try:
-        response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH)
-        if response.status_code == 200:
-            return response.json()
-        throw_http_error(response)
-    except Exception as e:
-        raise HTTPException(status_code=500, detail=str(e))
+    data = query_questdb(query)
+    return format_questdb_response(data)

@app.get("/api/summary")
-async def get_summary(days: int = 7):
+async def get_summary():
    """
-    Returns a summary. Optimized for fast queries.
-    Uses precomputed data where available.
+    Returns a summary. Uses analytics_daily_summary for total_trades (all trades).
    """
-    # Try aggregating from analytics_exchange_daily first (faster);
-    # if that does not work, fall back to the original query
-    try:
-        # Aggregate from analytics_exchange_daily for the last N days.
-        # This is faster than a JOIN query over all trades.
-        query = f"""
-            select
-                'All' as continent,
-                sum(trade_count) as trade_count,
-                sum(volume) as total_volume
-            from analytics_exchange_daily
-            where timestamp >= dateadd('d', -{days}, now())
-        """
-        response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH, timeout=5)
-        if response.status_code == 200:
-            data = response.json()
-            # If data is available, use it
-            if data.get('dataset') and len(data['dataset']) > 0:
-                # Format for compatibility with the frontend
-                result = {
-                    'columns': [
-                        {'name': 'continent'},
-                        {'name': 'trade_count'},
-                        {'name': 'total_volume'}
-                    ],
-                    'dataset': [[row[0], row[1], row[2]] for row in data['dataset']]
-                }
-                return result
-    except Exception:
-        # Fall back to the original query
-        pass
-    # Fallback: original query with a limit for performance
-    query = f"""
-        select
-            coalesce(m.continent, 'Unknown') as continent,
-            count(*) as trade_count,
-            sum(t.price * t.quantity) as total_volume
-        from trades t
-        left join metadata m on t.isin = m.isin
-        where t.timestamp > dateadd('d', -{days}, now())
-        group by continent
-    """
-    try:
-        response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH, timeout=10)
-        if response.status_code == 200:
-            return response.json()
-        throw_http_error(response)
-    except Exception as e:
-        raise HTTPException(status_code=500, detail=str(e))
+    # Fetch the total count of all trades from analytics_daily_summary
+    query = """
+        select
+            sum(total_trades) as total_trades,
+            sum(total_volume) as total_volume
+        from analytics_daily_summary
+    """
+    data = query_questdb(query)
+    if data and data.get('dataset') and data['dataset']:
+        total_trades = data['dataset'][0][0] if data['dataset'][0][0] else 0
+        total_volume = data['dataset'][0][1] if data['dataset'][0][1] else 0.0
+        # Format for compatibility
+        return {
+            'columns': [
+                {'name': 'continent'},
+                {'name': 'trade_count'},
+                {'name': 'total_volume'}
+            ],
+            'dataset': [['All', total_trades, total_volume]]
+        }
+    # Fallback: original query
+    query = """
+        select
+            coalesce(m.continent, 'Unknown') as continent,
+            count(*) as trade_count,
+            sum(t.price * t.quantity) as total_volume
+        from trades t
+        left join metadata m on t.isin = m.isin
+        group by continent
+    """
+    data = query_questdb(query)
+    return format_questdb_response(data)
+
+@app.get("/api/statistics/total-trades")
+async def get_total_trades():
+    """Returns the total number of all trades (from analytics_daily_summary)"""
+    query = "select sum(total_trades) as total from analytics_daily_summary"
+    data = query_questdb(query)
+    if data and data.get('dataset') and data['dataset']:
+        total = data['dataset'][0][0] if data['dataset'][0][0] else 0
+        return {'total_trades': total}
+    return {'total_trades': 0}
+
+@app.get("/api/statistics/moving-average")
+async def get_moving_average(days: int = 7, exchange: str = None):
+    """
+    Returns moving-average data for trade counts and volume per exchange.
+    Supported periods: 7, 30, 42, 69, 180, 365 days
+    """
+    if days not in [7, 30, 42, 69, 180, 365]:
+        raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
+    query = f"""
+        select
+            timestamp as date,
+            exchange,
+            trade_count,
+            volume,
+            ma{days}_count as ma_count,
+            ma{days}_volume as ma_volume
+        from analytics_exchange_daily
+        where timestamp >= dateadd('d', -{days}, now())
+    """
+    if exchange:
+        query += f" and exchange = '{exchange}'"
+    query += " order by date asc, exchange asc"
+    data = query_questdb(query, timeout=5)
+    return format_questdb_response(data)
+
+@app.get("/api/statistics/volume-changes")
+async def get_volume_changes(days: int = 7):
+    """
+    Returns changes in volume and trade count per exchange.
+    Supported periods: 7, 30, 42, 69, 180, 365 days
+    """
+    if days not in [7, 30, 42, 69, 180, 365]:
+        raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
+    query = f"""
+        select
+            timestamp as date,
+            exchange,
+            trade_count,
+            volume,
+            count_change_pct,
+            volume_change_pct,
+            trend
+        from analytics_volume_changes
+        where period_days = {days}
+        and timestamp >= dateadd('d', -{days}, now())
+        order by date desc, exchange asc
+    """
+    data = query_questdb(query, timeout=5)
+    return format_questdb_response(data)
+
+@app.get("/api/statistics/stock-trends")
+async def get_stock_trends(days: int = 7, limit: int = 20):
+    """
+    Returns trend analysis for frequently traded stocks.
+    Supported periods: 7, 30, 42, 69, 180, 365 days
+    """
+    if days not in [7, 30, 42, 69, 180, 365]:
+        raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
+    query = f"""
+        select
+            timestamp as date,
+            isin,
+            trade_count,
+            volume,
+            count_change_pct,
+            volume_change_pct
+        from analytics_stock_trends
+        where period_days = {days}
+        and timestamp >= dateadd('d', -{days}, now())
+        order by volume desc
+        limit {limit}
+    """
+    data = query_questdb(query, timeout=5)
+    return format_questdb_response(data)
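
Note: QuestDB's /exec endpoint returns column metadata plus positional row arrays, and format_questdb_response passes that shape through to the frontend unchanged. A minimal sketch of the JSON involved (column names follow analytics_exchange_daily; the values and the exchange are invented for illustration):

# Illustrative /exec response shape as consumed by format_questdb_response;
# all values below are made up.
example_response = {
    'columns': [
        {'name': 'date', 'type': 'TIMESTAMP'},
        {'name': 'exchange', 'type': 'SYMBOL'},
        {'name': 'trade_count', 'type': 'LONG'},
        {'name': 'volume', 'type': 'DOUBLE'},
    ],
    'dataset': [
        ['2026-01-24T00:00:00.000000Z', 'XETRA', 1523, 8421337.5],
    ],
}
assert example_response['dataset'][0][2] == 1523  # rows are positional arrays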
@app.get("/api/analytics") @app.get("/api/analytics")
async def get_analytics( async def get_analytics(
@@ -124,8 +237,7 @@ async def get_analytics(
isins: str = None, isins: str = None,
continents: str = None continents: str = None
): ):
# Determine if we need to join metadata """Analytics Endpunkt für Report Builder"""
# Determine if we need to join metadata
composite_keys = ["exchange_continent", "exchange_sector"] composite_keys = ["exchange_continent", "exchange_sector"]
needs_metadata = any([ needs_metadata = any([
group_by in ["name", "continent", "sector"] + composite_keys, group_by in ["name", "continent", "sector"] + composite_keys,
@@ -133,7 +245,6 @@ async def get_analytics(
continents is not None continents is not None
]) ])
# Use prefixes only if joining
t_prefix = "t." if needs_metadata else "" t_prefix = "t." if needs_metadata else ""
m_prefix = "m." if needs_metadata else "" m_prefix = "m." if needs_metadata else ""
@@ -191,129 +302,15 @@ async def get_analytics(
query += " order by label asc" query += " order by label asc"
try: data = query_questdb(query)
response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH) return format_questdb_response(data)
if response.status_code == 200:
return response.json()
throw_http_error(response)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/metadata/search") @app.get("/api/metadata/search")
async def search_metadata(q: str): async def search_metadata(q: str):
# Case-insensitive search for ISIN or Name """Case-insensitive search for ISIN or Name"""
query = f"select isin, name from metadata where isin ilike '%{q}%' or name ilike '%{q}%' limit 10" query = f"select isin, name from metadata where isin ilike '%{q}%' or name ilike '%{q}%' limit 10"
try: data = query_questdb(query)
response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH) return format_questdb_response(data)
if response.status_code == 200:
return response.json()
throw_http_error(response)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/statistics/moving-average")
async def get_moving_average(days: int = 7, exchange: str = None):
"""
Gibt Moving Average Daten für Tradezahlen und Volumen je Exchange zurück.
Unterstützte Zeiträume: 7, 30, 42, 69, 180, 365 Tage
Verwendet vorberechnete Daten aus analytics_exchange_daily für schnelle Antwortzeiten.
"""
if days not in [7, 30, 42, 69, 180, 365]:
raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
# Hole Daten aus der vorberechneten analytics_exchange_daily Tabelle
query = f"""
select
timestamp as date,
exchange,
trade_count,
volume,
ma{days}_count as ma_count,
ma{days}_volume as ma_volume
from analytics_exchange_daily
where timestamp >= dateadd('d', -{days}, now())
"""
if exchange:
query += f" and exchange = '{exchange}'"
query += " order by date asc, exchange asc"
try:
response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH, timeout=5)
if response.status_code == 200:
return response.json()
throw_http_error(response)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/statistics/volume-changes")
async def get_volume_changes(days: int = 7):
"""
Gibt Änderungen in Volumen und Anzahl je Exchange zurück.
Unterstützte Zeiträume: 7, 30, 42, 69, 180, 365 Tage
Verwendet vorberechnete Daten aus analytics_volume_changes für schnelle Antwortzeiten.
"""
if days not in [7, 30, 42, 69, 180, 365]:
raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
query = f"""
select
timestamp as date,
exchange,
trade_count,
volume,
count_change_pct,
volume_change_pct,
trend
from analytics_volume_changes
where period_days = {days}
order by date desc, exchange asc
"""
try:
response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH, timeout=5)
if response.status_code == 200:
return response.json()
throw_http_error(response)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/statistics/stock-trends")
async def get_stock_trends(days: int = 7, limit: int = 20):
"""
Gibt Trendanalyse für häufig gehandelte Aktien zurück.
Unterstützte Zeiträume: 7, 30, 42, 69, 180, 365 Tage
Verwendet vorberechnete Daten aus analytics_stock_trends für schnelle Antwortzeiten.
"""
if days not in [7, 30, 42, 69, 180, 365]:
raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365")
# Hole Top-Aktien nach Volumen für den Zeitraum
query = f"""
select
timestamp as date,
isin,
trade_count,
volume,
count_change_pct,
volume_change_pct
from analytics_stock_trends
where period_days = {days}
order by volume desc
limit {limit}
"""
try:
response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH, timeout=5)
if response.status_code == 200:
return response.json()
throw_http_error(response)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
def throw_http_error(res):
raise HTTPException(status_code=res.status_code, detail=f"QuestDB error: {res.text}")
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn
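
For a quick smoke test of the reworked endpoints, something like the following should work (a sketch; the host, port, and example exchange are assumptions, not part of this commit):

# Sketch: exercise the new statistics endpoints.
# BASE and the exchange name are illustrative assumptions.
import requests

BASE = "http://localhost:8000"

total = requests.get(f"{BASE}/api/statistics/total-trades", timeout=10).json()
ma = requests.get(
    f"{BASE}/api/statistics/moving-average",
    params={"days": 30, "exchange": "XETRA"},
    timeout=10,
).json()

print(total["total_trades"], len(ma.get("dataset", [])))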

View File

@@ -3,8 +3,9 @@ import logging
import datetime
import os
import requests
-from typing import Dict, List, Tuple, Optional
+from typing import Dict, List, Optional
import pandas as pd
+import json

logging.basicConfig(
    level=logging.INFO,
@@ -23,9 +24,8 @@ TIME_PERIODS = [7, 30, 42, 69, 180, 365]
class AnalyticsWorker:
    def __init__(self):
-        self.last_processed_timestamp = None
        self.db_url = DB_URL

    def wait_for_questdb(self, max_retries: int = 30, retry_delay: int = 2):
        """Waits until QuestDB becomes available"""
        logger.info("Waiting for QuestDB to be available...")
@@ -40,281 +40,317 @@
                time.sleep(retry_delay)
        logger.error("QuestDB did not become available after waiting")
        return False

-    def get_last_processed_timestamp(self) -> Optional[datetime.datetime]:
-        """Fetches the last processed timestamp from the analytics table"""
-        try:
-            query = "select max(timestamp) as last_ts from analytics_exchange_daily"
-            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
-            if response.status_code == 200:
-                data = response.json()
-                if data.get('dataset') and data['dataset'] and len(data['dataset']) > 0 and data['dataset'][0][0]:
-                    ts_value = data['dataset'][0][0]
-                    if isinstance(ts_value, str):
-                        return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
-                    elif isinstance(ts_value, (int, float)):
-                        # QuestDB returns timestamps in microseconds
-                        return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
-        except Exception as e:
-            logger.debug(f"Could not get last processed timestamp: {e}")
-        return None
-
-    def get_new_trades(self, since: Optional[datetime.datetime] = None) -> List[Dict]:
-        """Fetches new trades since the last processing time"""
-        if since:
-            since_str = since.strftime('%Y-%m-%d %H:%M:%S')
-            query = f"select timestamp, exchange, isin, price, quantity from trades where timestamp > '{since_str}' order by timestamp asc"
-        else:
-            # First run: only the last 7 days
-            query = f"select timestamp, exchange, isin, price, quantity from trades where timestamp > dateadd('d', -7, now()) order by timestamp asc"
-        try:
-            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
-            if response.status_code == 200:
-                data = response.json()
-                columns = data.get('columns', [])
-                dataset = data.get('dataset', [])
-                trades = []
-                for row in dataset:
-                    trade = {}
-                    for i, col in enumerate(columns):
-                        trade[col['name']] = row[i]
-                    trades.append(trade)
-                return trades
-        except Exception as e:
-            logger.error(f"Error fetching new trades: {e}")
-        return []
-
-    def calculate_exchange_daily_aggregations(self, days_back: int = 365) -> List[Dict]:
-        """Computes daily aggregations per exchange with moving averages"""
-        end_date = datetime.datetime.now(datetime.timezone.utc)
-        start_date = end_date - datetime.timedelta(days=days_back)
-        query = f"""
-            select
-                date_trunc('day', timestamp) as date,
-                exchange,
-                count(*) as trade_count,
-                sum(price * quantity) as volume
-            from trades
-            where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
-            group by date, exchange
-            order by date asc, exchange asc
-        """
-        try:
-            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
-            if response.status_code == 200:
-                data = response.json()
-                columns = data.get('columns', [])
-                dataset = data.get('dataset', [])
-                results = []
-                for row in dataset:
-                    result = {}
-                    for i, col in enumerate(columns):
-                        result[col['name']] = row[i]
-                    results.append(result)
-                # Compute moving averages for all periods
-                df = pd.DataFrame(results)
-                if df.empty:
-                    return []
-                # Pivot for easier MA computation
-                df['date'] = pd.to_datetime(df['date'])
-                df = df.sort_values(['date', 'exchange'])
-                # Compute the MA for each period
-                for period in TIME_PERIODS:
-                    df[f'ma{period}_count'] = df.groupby('exchange')['trade_count'].transform(
-                        lambda x: x.rolling(window=period, min_periods=1).mean()
-                    )
-                    df[f'ma{period}_volume'] = df.groupby('exchange')['volume'].transform(
-                        lambda x: x.rolling(window=period, min_periods=1).mean()
-                    )
-                # Convert back to a list of dicts
-                return df.to_dict('records')
-        except Exception as e:
-            logger.error(f"Error calculating exchange daily aggregations: {e}")
-        return []
-
-    def calculate_stock_trends(self, days: int = 365) -> List[Dict]:
-        """Computes trend data per ISIN with change percentages"""
-        end_date = datetime.datetime.now(datetime.timezone.utc)
-        start_date = end_date - datetime.timedelta(days=days)
-        # Current period
-        query_current = f"""
-            select
-                date_trunc('day', timestamp) as date,
-                isin,
-                count(*) as trade_count,
-                sum(price * quantity) as volume
-            from trades
-            where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
-            group by date, isin
-            order by date asc, isin asc
-        """
-        try:
-            response = requests.get(f"{self.db_url}/exec", params={'query': query_current}, auth=DB_AUTH)
-            if response.status_code == 200:
-                data = response.json()
-                columns = data.get('columns', [])
-                dataset = data.get('dataset', [])
-                results = []
-                for row in dataset:
-                    result = {}
-                    for i, col in enumerate(columns):
-                        result[col['name']] = row[i]
-                    results.append(result)
-                if not results:
-                    return []
-                df = pd.DataFrame(results)
-                df['date'] = pd.to_datetime(df['date'])
-                # Aggregate per ISIN over the whole period
-                df_agg = df.groupby('isin').agg({
-                    'trade_count': 'sum',
-                    'volume': 'sum'
-                }).reset_index()
-                # Compute changes: comparison with the previous period
-                # For each ISIN: current period vs. previous period
-                trends = []
-                for isin in df_agg['isin'].unique():
-                    isin_data = df[df['isin'] == isin].sort_values('date')
-                    # Split into two halves for comparison
-                    mid_point = len(isin_data) // 2
-                    if mid_point > 0:
-                        first_half = isin_data.iloc[:mid_point]
-                        second_half = isin_data.iloc[mid_point:]
-                        first_count = first_half['trade_count'].sum()
-                        first_volume = first_half['volume'].sum()
-                        second_count = second_half['trade_count'].sum()
-                        second_volume = second_half['volume'].sum()
-                        count_change = ((second_count - first_count) / first_count * 100) if first_count > 0 else 0
-                        volume_change = ((second_volume - first_volume) / first_volume * 100) if first_volume > 0 else 0
-                    else:
-                        count_change = 0
-                        volume_change = 0
-                    total_count = isin_data['trade_count'].sum()
-                    total_volume = isin_data['volume'].sum()
-                    trends.append({
-                        'isin': isin,
-                        'date': isin_data['date'].max(),
-                        'trade_count': total_count,
-                        'volume': total_volume,
-                        'count_change_pct': count_change,
-                        'volume_change_pct': volume_change
-                    })
-                return trends
-        except Exception as e:
-            logger.error(f"Error calculating stock trends: {e}")
-        return []
-
-    def calculate_volume_changes(self, days: int = 365) -> List[Dict]:
-        """Computes volume and trade-count changes per exchange"""
-        end_date = datetime.datetime.now(datetime.timezone.utc)
-        start_date = end_date - datetime.timedelta(days=days)
-        query = f"""
-            select
-                date_trunc('day', timestamp) as date,
-                exchange,
-                count(*) as trade_count,
-                sum(price * quantity) as volume
-            from trades
-            where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
-            group by date, exchange
-            order by date asc, exchange asc
-        """
-        try:
-            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
-            if response.status_code == 200:
-                data = response.json()
-                columns = data.get('columns', [])
-                dataset = data.get('dataset', [])
-                results = []
-                for row in dataset:
-                    result = {}
-                    for i, col in enumerate(columns):
-                        result[col['name']] = row[i]
-                    results.append(result)
-                if not results:
-                    return []
-                df = pd.DataFrame(results)
-                df['date'] = pd.to_datetime(df['date'])
-                df = df.sort_values(['date', 'exchange'])
-                # Compute changes per exchange
-                changes = []
-                for exchange in df['exchange'].unique():
-                    exchange_data = df[df['exchange'] == exchange].sort_values('date')
-                    # Split into two halves
-                    mid_point = len(exchange_data) // 2
-                    if mid_point > 0:
-                        first_half = exchange_data.iloc[:mid_point]
-                        second_half = exchange_data.iloc[mid_point:]
-                        first_count = first_half['trade_count'].sum()
-                        first_volume = first_half['volume'].sum()
-                        second_count = second_half['trade_count'].sum()
-                        second_volume = second_half['volume'].sum()
-                        count_change = ((second_count - first_count) / first_count * 100) if first_count > 0 else 0
-                        volume_change = ((second_volume - first_volume) / first_volume * 100) if first_volume > 0 else 0
-                        # Determine the trend
-                        if count_change > 5 and volume_change > 5:
-                            trend = "mehr_trades_mehr_volumen"
-                        elif count_change > 5 and volume_change < -5:
-                            trend = "mehr_trades_weniger_volumen"
-                        elif count_change < -5 and volume_change > 5:
-                            trend = "weniger_trades_mehr_volumen"
-                        elif count_change < -5 and volume_change < -5:
-                            trend = "weniger_trades_weniger_volumen"
-                        else:
-                            trend = "stabil"
-                    else:
-                        count_change = 0
-                        volume_change = 0
-                        trend = "stabil"
-                    total_count = exchange_data['trade_count'].sum()
-                    total_volume = exchange_data['volume'].sum()
-                    changes.append({
-                        'date': exchange_data['date'].max(),
-                        'exchange': exchange,
-                        'trade_count': total_count,
-                        'volume': total_volume,
-                        'count_change_pct': count_change,
-                        'volume_change_pct': volume_change,
-                        'trend': trend
-                    })
-                return changes
-        except Exception as e:
-            logger.error(f"Error calculating volume changes: {e}")
-        return []
+    def query_questdb(self, query: str, timeout: int = 30) -> Optional[Dict]:
+        """Central QuestDB query helper"""
+        try:
+            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=timeout)
+            if response.status_code == 200:
+                return response.json()
+            else:
+                logger.error(f"QuestDB query failed: {response.status_code} - {response.text}")
+                return None
+        except Exception as e:
+            logger.error(f"Error querying QuestDB: {e}")
+            return None
+
+    def get_existing_dates(self, table_name: str) -> set:
+        """Fetches all already-computed dates from an analytics table"""
+        query = f"select distinct date_trunc('day', timestamp) as date from {table_name}"
+        data = self.query_questdb(query)
+        if not data or not data.get('dataset'):
+            return set()
+        dates = set()
+        for row in data['dataset']:
+            if row[0]:
+                if isinstance(row[0], str):
+                    dates.add(datetime.datetime.fromisoformat(row[0].replace('Z', '+00:00')).date())
+                elif isinstance(row[0], (int, float)):
+                    dates.add(datetime.datetime.fromtimestamp(row[0] / 1000000, tz=datetime.timezone.utc).date())
+        return dates
+
+    def get_missing_dates(self) -> List[datetime.date]:
+        """Determines missing days that still need to be computed"""
+        # Fetch the date of the first trade
+        query = "select min(date_trunc('day', timestamp)) as first_date from trades"
+        data = self.query_questdb(query)
+        if not data or not data.get('dataset') or not data['dataset'][0][0]:
+            logger.info("No trades found in database")
+            return []
+        first_date_value = data['dataset'][0][0]
+        if isinstance(first_date_value, str):
+            first_date = datetime.datetime.fromisoformat(first_date_value.replace('Z', '+00:00')).date()
+        else:
+            first_date = datetime.datetime.fromtimestamp(first_date_value / 1000000, tz=datetime.timezone.utc).date()
+        # Fetch dates that have already been computed
+        existing_dates = self.get_existing_dates('analytics_daily_summary')
+        # Generate all days from the first trade up to yesterday
+        yesterday = datetime.date.today() - datetime.timedelta(days=1)
+        all_dates = []
+        current = first_date
+        while current <= yesterday:
+            all_dates.append(current)
+            current += datetime.timedelta(days=1)
+        # Find the missing days
+        missing_dates = [d for d in all_dates if d not in existing_dates]
+        logger.info(f"Found {len(missing_dates)} missing dates to calculate (from {len(all_dates)} total dates)")
+        return sorted(missing_dates)
+
+    def calculate_daily_summary(self, date: datetime.date) -> Optional[Dict]:
+        """Computes the daily summary for one day"""
+        date_str = date.strftime('%Y-%m-%d')
+        query = f"""
+            select
+                count(*) as total_trades,
+                sum(price * quantity) as total_volume,
+                exchange,
+                count(*) as exchange_trades
+            from trades
+            where date_trunc('day', timestamp) = '{date_str}'
+            group by exchange
+        """
+        data = self.query_questdb(query)
+        if not data or not data.get('dataset'):
+            return None
+        total_trades = 0
+        total_volume = 0.0
+        exchanges = {}
+        for row in data['dataset']:
+            exchange = row[2]
+            trades = row[3] if row[3] else 0
+            volume = row[1] if row[1] else 0.0
+            total_trades += trades
+            total_volume += volume
+            exchanges[exchange] = {'trades': trades, 'volume': volume}
+        return {
+            'date': date,
+            'total_trades': total_trades,
+            'total_volume': total_volume,
+            'exchanges': json.dumps(exchanges)
+        }
+
+    def calculate_exchange_daily(self, date: datetime.date) -> List[Dict]:
+        """Computes daily exchange statistics with moving averages"""
+        date_str = date.strftime('%Y-%m-%d')
+        # Fetch the data for this day
+        query = f"""
+            select
+                exchange,
+                count(*) as trade_count,
+                sum(price * quantity) as volume
+            from trades
+            where date_trunc('day', timestamp) = '{date_str}'
+            group by exchange
+        """
+        data = self.query_questdb(query)
+        if not data or not data.get('dataset'):
+            return []
+        results = []
+        for row in data['dataset']:
+            exchange = row[0]
+            trade_count = row[1] if row[1] else 0
+            volume = row[2] if row[2] else 0.0
+            # Compute moving averages for all periods
+            ma_values = {}
+            for period in TIME_PERIODS:
+                # Fetch the data of the last N days including today
+                end_date = date
+                start_date = end_date - datetime.timedelta(days=period-1)
+                ma_query = f"""
+                    select
+                        count(*) as ma_count,
+                        sum(price * quantity) as ma_volume
+                    from trades
+                    where exchange = '{exchange}'
+                    and date_trunc('day', timestamp) >= '{start_date.strftime('%Y-%m-%d')}'
+                    and date_trunc('day', timestamp) <= '{end_date.strftime('%Y-%m-%d')}'
+                """
+                ma_data = self.query_questdb(ma_query)
+                if ma_data and ma_data.get('dataset') and ma_data['dataset'][0]:
+                    ma_values[f'ma{period}_count'] = ma_data['dataset'][0][0] if ma_data['dataset'][0][0] else 0
+                    ma_values[f'ma{period}_volume'] = ma_data['dataset'][0][1] if ma_data['dataset'][0][1] else 0.0
+                else:
+                    ma_values[f'ma{period}_count'] = 0
+                    ma_values[f'ma{period}_volume'] = 0.0
+            results.append({
+                'date': date,
+                'exchange': exchange,
+                'trade_count': trade_count,
+                'volume': volume,
+                **ma_values
+            })
+        return results
+
+    def calculate_stock_trends(self, date: datetime.date) -> List[Dict]:
+        """Computes stock trends for all periods for one day"""
+        results = []
+        for period in TIME_PERIODS:
+            end_date = date
+            start_date = end_date - datetime.timedelta(days=period-1)
+            # Current period
+            query = f"""
+                select
+                    isin,
+                    count(*) as trade_count,
+                    sum(price * quantity) as volume
+                from trades
+                where date_trunc('day', timestamp) >= '{start_date.strftime('%Y-%m-%d')}'
+                and date_trunc('day', timestamp) <= '{end_date.strftime('%Y-%m-%d')}'
+                group by isin
+            """
+            data = self.query_questdb(query)
+            if not data or not data.get('dataset'):
+                continue
+            for row in data['dataset']:
+                isin = row[0]
+                current_count = row[1] if row[1] else 0
+                current_volume = row[2] if row[2] else 0.0
+                # Previous period for comparison
+                prev_start = start_date - datetime.timedelta(days=period)
+                prev_end = start_date - datetime.timedelta(days=1)
+                prev_query = f"""
+                    select
+                        count(*) as trade_count,
+                        sum(price * quantity) as volume
+                    from trades
+                    where isin = '{isin}'
+                    and date_trunc('day', timestamp) >= '{prev_start.strftime('%Y-%m-%d')}'
+                    and date_trunc('day', timestamp) <= '{prev_end.strftime('%Y-%m-%d')}'
+                """
+                prev_data = self.query_questdb(prev_query)
+                prev_count = 0
+                prev_volume = 0.0
+                if prev_data and prev_data.get('dataset') and prev_data['dataset'][0]:
+                    prev_count = prev_data['dataset'][0][0] if prev_data['dataset'][0][0] else 0
+                    prev_volume = prev_data['dataset'][0][1] if prev_data['dataset'][0][1] else 0.0
+                # Compute the changes
+                count_change_pct = ((current_count - prev_count) / prev_count * 100) if prev_count > 0 else 0
+                volume_change_pct = ((current_volume - prev_volume) / prev_volume * 100) if prev_volume > 0 else 0
+                results.append({
+                    'date': date,
+                    'period_days': period,
+                    'isin': isin,
+                    'trade_count': current_count,
+                    'volume': current_volume,
+                    'count_change_pct': count_change_pct,
+                    'volume_change_pct': volume_change_pct
+                })
+        return results
+
+    def calculate_volume_changes(self, date: datetime.date) -> List[Dict]:
+        """Computes volume changes for all periods for one day"""
+        results = []
+        for period in TIME_PERIODS:
+            end_date = date
+            start_date = end_date - datetime.timedelta(days=period-1)
+            # Fetch all exchanges
+            exchanges_query = "select distinct exchange from trades"
+            exchanges_data = self.query_questdb(exchanges_query)
+            if not exchanges_data or not exchanges_data.get('dataset'):
+                continue
+            for exchange_row in exchanges_data['dataset']:
+                exchange = exchange_row[0]
+                # Current period
+                query = f"""
+                    select
+                        count(*) as trade_count,
+                        sum(price * quantity) as volume
+                    from trades
+                    where exchange = '{exchange}'
+                    and date_trunc('day', timestamp) >= '{start_date.strftime('%Y-%m-%d')}'
+                    and date_trunc('day', timestamp) <= '{end_date.strftime('%Y-%m-%d')}'
+                """
+                data = self.query_questdb(query)
+                if not data or not data.get('dataset') or not data['dataset'][0]:
+                    continue
+                current_count = data['dataset'][0][0] if data['dataset'][0][0] else 0
+                current_volume = data['dataset'][0][1] if data['dataset'][0][1] else 0.0
+                # Previous period
+                prev_start = start_date - datetime.timedelta(days=period)
+                prev_end = start_date - datetime.timedelta(days=1)
+                prev_query = f"""
+                    select
+                        count(*) as trade_count,
+                        sum(price * quantity) as volume
+                    from trades
+                    where exchange = '{exchange}'
+                    and date_trunc('day', timestamp) >= '{prev_start.strftime('%Y-%m-%d')}'
+                    and date_trunc('day', timestamp) <= '{prev_end.strftime('%Y-%m-%d')}'
+                """
+                prev_data = self.query_questdb(prev_query)
+                prev_count = 0
+                prev_volume = 0.0
+                if prev_data and prev_data.get('dataset') and prev_data['dataset'][0]:
+                    prev_count = prev_data['dataset'][0][0] if prev_data['dataset'][0][0] else 0
+                    prev_volume = prev_data['dataset'][0][1] if prev_data['dataset'][0][1] else 0.0
+                # Compute the changes
+                count_change_pct = ((current_count - prev_count) / prev_count * 100) if prev_count > 0 else 0
+                volume_change_pct = ((current_volume - prev_volume) / prev_volume * 100) if prev_volume > 0 else 0
+                # Determine the trend
+                if count_change_pct > 5 and volume_change_pct > 5:
+                    trend = "mehr_trades_mehr_volumen"
+                elif count_change_pct > 5 and volume_change_pct < -5:
+                    trend = "mehr_trades_weniger_volumen"
+                elif count_change_pct < -5 and volume_change_pct > 5:
+                    trend = "weniger_trades_mehr_volumen"
+                elif count_change_pct < -5 and volume_change_pct < -5:
+                    trend = "weniger_trades_weniger_volumen"
+                else:
+                    trend = "stabil"
+                results.append({
+                    'date': date,
+                    'period_days': period,
+                    'exchange': exchange,
+                    'trade_count': current_count,
+                    'volume': current_volume,
+                    'count_change_pct': count_change_pct,
+                    'volume_change_pct': volume_change_pct,
+                    'trend': trend
+                })
+        return results

    def save_analytics_data(self, table_name: str, data: List[Dict]):
        """Stores aggregated data in QuestDB via ILP"""
@@ -326,10 +362,10 @@
        try:
            # Convert date to timestamp
            if 'date' in row:
-                if isinstance(row['date'], str):
+                if isinstance(row['date'], datetime.date):
+                    dt = datetime.datetime.combine(row['date'], datetime.time.min).replace(tzinfo=datetime.timezone.utc)
+                elif isinstance(row['date'], str):
                    dt = datetime.datetime.fromisoformat(row['date'].replace('Z', '+00:00'))
-                elif isinstance(row['date'], pd.Timestamp):
-                    dt = row['date'].to_pydatetime()
                else:
                    dt = row['date']
                timestamp_ns = int(dt.timestamp() * 1e9)
@@ -350,6 +386,10 @@
            isin = str(row['isin']).replace(' ', '\\ ').replace(',', '\\,')
            tags.append(f"isin={isin}")

+            # Period as a tag
+            if 'period_days' in row and row['period_days']:
+                tags.append(f"period_days={row['period_days']}")
+
            # Trend as a tag
            if 'trend' in row and row['trend']:
                trend = str(row['trend']).replace(' ', '\\ ').replace(',', '\\,')
@@ -357,16 +397,19 @@
            # Numeric fields
            for key, value in row.items():
-                if key in ['date', 'exchange', 'isin', 'trend']:
+                if key in ['date', 'exchange', 'isin', 'trend', 'period_days', 'exchanges']:
                    continue
                if value is not None:
                    if isinstance(value, (int, float)):
                        fields.append(f"{key}={value}")
                    elif isinstance(value, str):
-                        # String fields in quotes
                        escaped = value.replace('"', '\\"').replace(' ', '\\ ')
                        fields.append(f'{key}="{escaped}"')

+            # Exchanges as a JSON field
+            if 'exchanges' in row and row['exchanges']:
+                fields.append(f'exchanges="{row["exchanges"]}"')
+
            if tags and fields:
                line = f"{table_name},{','.join(tags)} {','.join(fields)} {timestamp_ns}"
                lines.append(line)
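
Given the tag and field assembly above, one row from calculate_volume_changes serializes to an ILP line along these lines (a sketch; all values are invented):

# Sketch of the ILP line save_analytics_data builds for one
# analytics_volume_changes row; the values below are made up.
table_name = "analytics_volume_changes"
tags = ["exchange=XETRA", "period_days=7", "trend=stabil"]
fields = ["trade_count=1523", "volume=8421337.5",
          "count_change_pct=2.4", "volume_change_pct=-1.1"]
timestamp_ns = 1769299200000000000  # 2026-01-25 00:00:00 UTC in nanoseconds

line = f"{table_name},{','.join(tags)} {','.join(fields)} {timestamp_ns}"
print(line)
# analytics_volume_changes,exchange=XETRA,period_days=7,trend=stabil trade_count=1523,volume=8421337.5,count_change_pct=2.4,volume_change_pct=-1.1 1769299200000000000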
@@ -393,37 +436,46 @@
        except Exception as e:
            logger.error(f"Error connecting to QuestDB: {e}")

-    def process_all_analytics(self):
-        """Processes all analytics for all periods"""
-        logger.info("Starting analytics processing...")
-        # 1. Exchange daily aggregations (for all periods)
-        logger.info("Calculating exchange daily aggregations...")
-        exchange_data = self.calculate_exchange_daily_aggregations(days_back=365)
-        if exchange_data:
-            self.save_analytics_data('analytics_exchange_daily', exchange_data)
-        # 2. Stock trends (for all periods)
-        logger.info("Calculating stock trends...")
-        for days in TIME_PERIODS:
-            trends = self.calculate_stock_trends(days=days)
-            if trends:
-                # Add the period as a tag
-                for trend in trends:
-                    trend['period_days'] = days
-                self.save_analytics_data('analytics_stock_trends', trends)
-        # 3. Volume changes (for all periods)
-        logger.info("Calculating volume changes...")
-        for days in TIME_PERIODS:
-            changes = self.calculate_volume_changes(days=days)
-            if changes:
-                # Add the period as a tag
-                for change in changes:
-                    change['period_days'] = days
-                self.save_analytics_data('analytics_volume_changes', changes)
-        logger.info("Analytics processing completed.")
+    def process_date(self, date: datetime.date):
+        """Processes all analytics for a specific day"""
+        logger.info(f"Processing analytics for {date}")
+        # 1. Daily summary
+        summary = self.calculate_daily_summary(date)
+        if summary:
+            self.save_analytics_data('analytics_daily_summary', [summary])
+        # 2. Exchange daily
+        exchange_data = self.calculate_exchange_daily(date)
+        if exchange_data:
+            self.save_analytics_data('analytics_exchange_daily', exchange_data)
+        # 3. Stock trends
+        stock_trends = self.calculate_stock_trends(date)
+        if stock_trends:
+            self.save_analytics_data('analytics_stock_trends', stock_trends)
+        # 4. Volume changes
+        volume_changes = self.calculate_volume_changes(date)
+        if volume_changes:
+            self.save_analytics_data('analytics_volume_changes', volume_changes)
+        logger.info(f"Completed processing for {date}")
+
+    def process_missing_dates(self):
+        """Calculates all missing days"""
+        missing_dates = self.get_missing_dates()
+        if not missing_dates:
+            logger.info("No missing dates to process")
+            return
+        logger.info(f"Processing {len(missing_dates)} missing dates...")
+        for i, date in enumerate(missing_dates, 1):
+            logger.info(f"Processing date {i}/{len(missing_dates)}: {date}")
+            self.process_date(date)
+            # Short pause between the calculations
+            if i % 10 == 0:
+                time.sleep(1)
@@ -434,32 +486,30 @@
            logger.error("Failed to connect to QuestDB. Exiting.")
            return

-        # Initial processing
-        self.process_all_analytics()
-        self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
-        # Polling loop
+        # Initial calculation of missing days
+        logger.info("Checking for missing dates...")
+        self.process_missing_dates()
+        # Main loop: wait for midnight
+        logger.info("Waiting for midnight to process yesterday's data...")
        while True:
-            try:
-                # Check for new trades
-                last_ts = self.get_last_processed_timestamp()
-                new_trades = self.get_new_trades(since=last_ts)
-                if new_trades:
-                    logger.info(f"Found {len(new_trades)} new trades, reprocessing analytics...")
-                    self.process_all_analytics()
-                    self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
-                else:
-                    logger.debug("No new trades found.")
-                # Wait 30 seconds before the next check
-                time.sleep(30)
-            except requests.exceptions.ConnectionError as e:
-                logger.warning(f"Connection error to QuestDB, retrying in 60s: {e}")
-                time.sleep(60)  # Longer pause on connection errors
-            except Exception as e:
-                logger.error(f"Error in worker loop: {e}", exc_info=True)
-                time.sleep(60)  # Longer pause on errors
+            now = datetime.datetime.now()
+            # Check whether it is midnight (00:00)
+            if now.hour == 0 and now.minute == 0:
+                yesterday = (now - datetime.timedelta(days=1)).date()
+                logger.info(f"Processing yesterday's data: {yesterday}")
+                self.process_date(yesterday)
+                # Wait 61s to prevent running more than once
+                time.sleep(61)
+            # Also check for missing days (every 6 hours)
+            if now.hour % 6 == 0 and now.minute == 0:
+                logger.info("Checking for missing dates...")
+                self.process_missing_dates()
+                time.sleep(61)
+            time.sleep(30)

def main():
    worker = AnalyticsWorker()
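
With the switch from continuous reprocessing to per-day calculation, a one-off backfill can also be run by hand. A minimal sketch (the module path is taken from the systemd unit removed below; everything else is an assumption):

# Sketch: manual backfill of all missing days.
# Import path from the old unit file's ExecStart; adjust to your layout.
from src.analytics.worker import AnalyticsWorker

worker = AnalyticsWorker()
if worker.wait_for_questdb():
    worker.process_missing_dates()  # computes each missing day exactly once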

View File

@@ -1,18 +0,0 @@
-[Unit]
-Description=Trading Analytics Worker
-After=network.target questdb.service
-
-[Service]
-Type=simple
-User=melchiorreimers
-WorkingDirectory=/Users/melchiorreimers/Documents/trading_daemon
-Environment="PYTHONUNBUFFERED=1"
-Environment="DB_USER=admin"
-Environment="DB_PASSWORD=quest"
-Environment="DB_HOST=localhost"
-ExecStart=/usr/bin/python3 -m src.analytics.worker
-Restart=always
-RestartSec=10
-
-[Install]
-WantedBy=multi-user.target