updated

Melchior Reimers
2026-01-25 17:36:29 +01:00
parent 4f4d734643
commit 33f5c90fce
8 changed files with 996 additions and 355 deletions


@@ -0,0 +1 @@
# Analytics module for trading data aggregation


@@ -1,168 +1,446 @@
import datetime
import logging
import os
import time
from typing import Dict, List, Optional

import pandas as pd
import requests

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("AnalyticsWorker")

DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
DB_HOST = os.getenv("DB_HOST", "questdb")
DB_URL = f"http://{DB_HOST}:9000"

# Supported aggregation periods in days
TIME_PERIODS = [7, 30, 42, 69, 180, 365]
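# Assumed setup (sketch, not a definitive deployment config): QuestDB's HTTP
# interface on port 9000 serves both the SQL REST endpoint (/exec) and the
# InfluxDB line protocol write endpoint (/write); the admin/quest fallback
# matches QuestDB's default HTTP credentials. Adjust via the DB_* environment
# variables for other deployments.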

class AnalyticsWorker:
    def __init__(self):
        self.last_processed_timestamp = None
        self.db_url = DB_URL
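    # The QuestDB /exec endpoint answers with JSON roughly of the form
    #   {"columns": [{"name": "last_ts", "type": "TIMESTAMP"}, ...],
    #    "dataset": [["2026-01-24T00:00:00.000000Z", ...], ...], "count": 1}
    # (illustrative values); the helpers below only read 'columns' and 'dataset'.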
    def get_last_processed_timestamp(self) -> Optional[datetime.datetime]:
        """Return the latest timestamp already present in the analytics table."""
        try:
            query = "select max(timestamp) as last_ts from analytics_exchange_daily"
            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
            if response.status_code == 200:
                data = response.json()
                if data.get('dataset') and len(data['dataset']) > 0 and data['dataset'][0][0]:
                    ts_value = data['dataset'][0][0]
                    if isinstance(ts_value, str):
                        return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
                    elif isinstance(ts_value, (int, float)):
                        # QuestDB returns timestamps in microseconds
                        return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
        except Exception as e:
            logger.debug(f"Could not get last processed timestamp: {e}")
        return None
    def get_new_trades(self, since: Optional[datetime.datetime] = None) -> List[Dict]:
        """Fetch trades that arrived since the last processing run."""
        if since:
            since_str = since.strftime('%Y-%m-%d %H:%M:%S')
            query = f"select timestamp, exchange, isin, price, quantity from trades where timestamp > '{since_str}' order by timestamp asc"
        else:
            # First run: only look at the last 7 days
            query = "select timestamp, exchange, isin, price, quantity from trades where timestamp > dateadd('d', -7, now()) order by timestamp asc"
        try:
            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
            if response.status_code == 200:
                data = response.json()
                columns = data.get('columns', [])
                dataset = data.get('dataset', [])
                trades = []
                for row in dataset:
                    trade = {}
                    for i, col in enumerate(columns):
                        trade[col['name']] = row[i]
                    trades.append(trade)
                return trades
        except Exception as e:
            logger.error(f"Error fetching new trades: {e}")
        return []
    def calculate_exchange_daily_aggregations(self, days_back: int = 365) -> List[Dict]:
        """Compute daily per-exchange aggregations with moving averages."""
        end_date = datetime.datetime.now(datetime.timezone.utc)
        start_date = end_date - datetime.timedelta(days=days_back)
        query = f"""
            select
                date_trunc('day', timestamp) as date,
                exchange,
                count(*) as trade_count,
                sum(price * quantity) as volume
            from trades
            where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
            group by date, exchange
            order by date asc, exchange asc
        """
        try:
            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
            if response.status_code == 200:
                data = response.json()
                columns = data.get('columns', [])
                dataset = data.get('dataset', [])
                results = []
                for row in dataset:
                    result = {}
                    for i, col in enumerate(columns):
                        result[col['name']] = row[i]
                    results.append(result)
                # Compute moving averages for all configured periods
                df = pd.DataFrame(results)
                if df.empty:
                    return []
                # Sort by date and exchange before the rolling-window calculation
                df['date'] = pd.to_datetime(df['date'])
                df = df.sort_values(['date', 'exchange'])
                # MA per period
                for period in TIME_PERIODS:
                    df[f'ma{period}_count'] = df.groupby('exchange')['trade_count'].transform(
                        lambda x: x.rolling(window=period, min_periods=1).mean()
                    )
                    df[f'ma{period}_volume'] = df.groupby('exchange')['volume'].transform(
                        lambda x: x.rolling(window=period, min_periods=1).mean()
                    )
                # Convert back to a list of dicts
                return df.to_dict('records')
        except Exception as e:
            logger.error(f"Error calculating exchange daily aggregations: {e}")
        return []
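    # Each returned record carries the raw daily figures plus one moving average
    # per configured period, e.g. (illustrative values only):
    #   {'date': Timestamp('2026-01-24'), 'exchange': 'XETRA', 'trade_count': 1200,
    #    'volume': 3.4e6, 'ma7_count': 1150.0, 'ma7_volume': 3.1e6, ..., 'ma365_volume': 2.8e6}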
    def calculate_stock_trends(self, days: int = 365) -> List[Dict]:
        """Compute per-ISIN trend data with percentage changes."""
        end_date = datetime.datetime.now(datetime.timezone.utc)
        start_date = end_date - datetime.timedelta(days=days)
        # Current period
        query_current = f"""
            select
                date_trunc('day', timestamp) as date,
                isin,
                count(*) as trade_count,
                sum(price * quantity) as volume
            from trades
            where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
            group by date, isin
            order by date asc, isin asc
        """
        try:
            response = requests.get(f"{self.db_url}/exec", params={'query': query_current}, auth=DB_AUTH)
            if response.status_code == 200:
                data = response.json()
                columns = data.get('columns', [])
                dataset = data.get('dataset', [])
                results = []
                for row in dataset:
                    result = {}
                    for i, col in enumerate(columns):
                        result[col['name']] = row[i]
                    results.append(result)
                if not results:
                    return []
                df = pd.DataFrame(results)
                df['date'] = pd.to_datetime(df['date'])
                # Aggregate per ISIN over the whole period
                df_agg = df.groupby('isin').agg({
                    'trade_count': 'sum',
                    'volume': 'sum'
                }).reset_index()
                # Compute changes: for each ISIN, compare the second half of the
                # window with the first half
                trends = []
                for isin in df_agg['isin'].unique():
                    isin_data = df[df['isin'] == isin].sort_values('date')
                    # Split into two halves for comparison
                    mid_point = len(isin_data) // 2
                    if mid_point > 0:
                        first_half = isin_data.iloc[:mid_point]
                        second_half = isin_data.iloc[mid_point:]
                        first_count = first_half['trade_count'].sum()
                        first_volume = first_half['volume'].sum()
                        second_count = second_half['trade_count'].sum()
                        second_volume = second_half['volume'].sum()
                        count_change = ((second_count - first_count) / first_count * 100) if first_count > 0 else 0
                        volume_change = ((second_volume - first_volume) / first_volume * 100) if first_volume > 0 else 0
                    else:
                        count_change = 0
                        volume_change = 0
                    total_count = isin_data['trade_count'].sum()
                    total_volume = isin_data['volume'].sum()
                    trends.append({
                        'isin': isin,
                        'date': isin_data['date'].max(),
                        'trade_count': total_count,
                        'volume': total_volume,
                        'count_change_pct': count_change,
                        'volume_change_pct': volume_change
                    })
                return trends
        except Exception as e:
            logger.error(f"Error calculating stock trends: {e}")
        return []
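    # The change figures compare the two halves of the window: e.g. 100 trades in
    # the first half and 130 in the second half yield count_change_pct = 30.0.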
    def calculate_volume_changes(self, days: int = 365) -> List[Dict]:
        """Compute volume and trade-count changes per exchange."""
        end_date = datetime.datetime.now(datetime.timezone.utc)
        start_date = end_date - datetime.timedelta(days=days)
        query = f"""
            select
                date_trunc('day', timestamp) as date,
                exchange,
                count(*) as trade_count,
                sum(price * quantity) as volume
            from trades
            where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
            group by date, exchange
            order by date asc, exchange asc
        """
        try:
            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
            if response.status_code == 200:
                data = response.json()
                columns = data.get('columns', [])
                dataset = data.get('dataset', [])
                results = []
                for row in dataset:
                    result = {}
                    for i, col in enumerate(columns):
                        result[col['name']] = row[i]
                    results.append(result)
                if not results:
                    return []
                df = pd.DataFrame(results)
                df['date'] = pd.to_datetime(df['date'])
                df = df.sort_values(['date', 'exchange'])
                # Compute changes per exchange
                changes = []
                for exchange in df['exchange'].unique():
                    exchange_data = df[df['exchange'] == exchange].sort_values('date')
                    # Split into two halves
                    mid_point = len(exchange_data) // 2
                    if mid_point > 0:
                        first_half = exchange_data.iloc[:mid_point]
                        second_half = exchange_data.iloc[mid_point:]
                        first_count = first_half['trade_count'].sum()
                        first_volume = first_half['volume'].sum()
                        second_count = second_half['trade_count'].sum()
                        second_volume = second_half['volume'].sum()
                        count_change = ((second_count - first_count) / first_count * 100) if first_count > 0 else 0
                        volume_change = ((second_volume - first_volume) / first_volume * 100) if first_volume > 0 else 0
                        # Classify the trend
                        if count_change > 5 and volume_change > 5:
                            trend = "mehr_trades_mehr_volumen"
                        elif count_change > 5 and volume_change < -5:
                            trend = "mehr_trades_weniger_volumen"
                        elif count_change < -5 and volume_change > 5:
                            trend = "weniger_trades_mehr_volumen"
                        elif count_change < -5 and volume_change < -5:
                            trend = "weniger_trades_weniger_volumen"
                        else:
                            trend = "stabil"
                    else:
                        count_change = 0
                        volume_change = 0
                        trend = "stabil"
                    total_count = exchange_data['trade_count'].sum()
                    total_volume = exchange_data['volume'].sum()
                    changes.append({
                        'date': exchange_data['date'].max(),
                        'exchange': exchange,
                        'trade_count': total_count,
                        'volume': total_volume,
                        'count_change_pct': count_change,
                        'volume_change_pct': volume_change,
                        'trend': trend
                    })
                return changes
        except Exception as e:
            logger.error(f"Error calculating volume changes: {e}")
        return []
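    # The writer below uses QuestDB's InfluxDB-compatible line protocol over HTTP
    # (/write). A generated line looks roughly like this (illustrative values only):
    #   analytics_volume_changes,exchange=XETRA,trend=stabil trade_count=1200,volume=3400000.0,count_change_pct=2.1,volume_change_pct=-1.3,period_days=30 1769299200000000000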
    def save_analytics_data(self, table_name: str, data: List[Dict]):
        """Persist aggregated data to QuestDB via the line protocol endpoint."""
        if not data:
            return
        lines = []
        for row in data:
            try:
                # Convert the date to a nanosecond timestamp
                if 'date' in row:
                    if isinstance(row['date'], str):
                        dt = datetime.datetime.fromisoformat(row['date'].replace('Z', '+00:00'))
                    elif isinstance(row['date'], pd.Timestamp):
                        dt = row['date'].to_pydatetime()
                    else:
                        dt = row['date']
                    timestamp_ns = int(dt.timestamp() * 1e9)
                else:
                    timestamp_ns = int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1e9)
                # Build the ILP line
                tags = []
                fields = []
                # Exchange as tag
                if 'exchange' in row and row['exchange']:
                    exchange = str(row['exchange']).replace(' ', '\\ ').replace(',', '\\,')
                    tags.append(f"exchange={exchange}")
                # ISIN as tag
                if 'isin' in row and row['isin']:
                    isin = str(row['isin']).replace(' ', '\\ ').replace(',', '\\,')
                    tags.append(f"isin={isin}")
                # Trend as tag
                if 'trend' in row and row['trend']:
                    trend = str(row['trend']).replace(' ', '\\ ').replace(',', '\\,')
                    tags.append(f"trend={trend}")
                # Numeric fields
                for key, value in row.items():
                    if key in ['date', 'exchange', 'isin', 'trend']:
                        continue
                    if value is not None:
                        if isinstance(value, (int, float)):
                            fields.append(f"{key}={value}")
                        elif isinstance(value, str):
                            # Quote string fields
                            escaped = value.replace('"', '\\"').replace(' ', '\\ ')
                            fields.append(f'{key}="{escaped}"')
                if tags and fields:
                    line = f"{table_name},{','.join(tags)} {','.join(fields)} {timestamp_ns}"
                    lines.append(line)
            except Exception as e:
                logger.error(f"Error formatting row for {table_name}: {e}, row: {row}")
                continue
        if not lines:
            return
        payload = "\n".join(lines) + "\n"
        try:
            response = requests.post(
                f"{self.db_url}/write",
                data=payload,
                params={'precision': 'ns'},
                auth=DB_AUTH
            )
            if response.status_code not in [200, 204]:
                logger.error(f"Error saving to {table_name}: {response.text}")
            else:
                logger.info(f"Saved {len(lines)} rows to {table_name}")
        except Exception as e:
            logger.error(f"Error connecting to QuestDB: {e}")
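    # The orchestration below fans out into three target tables:
    # analytics_exchange_daily, analytics_stock_trends and analytics_volume_changes;
    # the latter two are written once per configured period with a period_days field.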
    def process_all_analytics(self):
        """Recompute all analytics for every configured period."""
        logger.info("Starting analytics processing...")
        # 1. Exchange daily aggregations (all periods)
        logger.info("Calculating exchange daily aggregations...")
        exchange_data = self.calculate_exchange_daily_aggregations(days_back=365)
        if exchange_data:
            self.save_analytics_data('analytics_exchange_daily', exchange_data)
        # 2. Stock trends (all periods)
        logger.info("Calculating stock trends...")
        for days in TIME_PERIODS:
            trends = self.calculate_stock_trends(days=days)
            if trends:
                # Attach the period as a field
                for trend in trends:
                    trend['period_days'] = days
                self.save_analytics_data('analytics_stock_trends', trends)
        # 3. Volume changes (all periods)
        logger.info("Calculating volume changes...")
        for days in TIME_PERIODS:
            changes = self.calculate_volume_changes(days=days)
            if changes:
                # Attach the period as a field
                for change in changes:
                    change['period_days'] = days
                self.save_analytics_data('analytics_volume_changes', changes)
        logger.info("Analytics processing completed.")
    def run(self):
        """Main worker loop."""
        logger.info("Analytics Worker started.")
        # Initial processing
        self.process_all_analytics()
        self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
        # Polling loop
        while True:
            try:
                # Check for new trades
                last_ts = self.get_last_processed_timestamp()
                new_trades = self.get_new_trades(since=last_ts)
                if new_trades:
                    logger.info(f"Found {len(new_trades)} new trades, reprocessing analytics...")
                    self.process_all_analytics()
                    self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
                else:
                    logger.debug("No new trades found.")
                # Wait 30 seconds before the next check
                time.sleep(30)
            except Exception as e:
                logger.error(f"Error in worker loop: {e}")
                time.sleep(60)  # Back off longer on errors

def main():
    worker = AnalyticsWorker()
    worker.run()


if __name__ == "__main__":
    main()