performance improvements by pre-defining queries

Melchior Reimers
2026-01-25 17:11:05 +01:00
parent 9772d81f7d
commit 786fef2e71
4 changed files with 329 additions and 42 deletions

src/analytics/worker.py (new file, 168 lines added)

@@ -0,0 +1,168 @@
import logging
import time
from datetime import datetime, timedelta

import requests

logger = logging.getLogger("AnalyticsWorker")


class AnalyticsWorker:
    def __init__(self, db_host="questdb", db_port=9000, auth=None):
        self.db_url = f"http://{db_host}:{db_port}"
        self.auth = auth

    def execute_query(self, query):
        """Run a single SQL statement against QuestDB's HTTP /exec endpoint."""
        try:
            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=self.auth)
            if response.status_code == 200:
                logger.debug(f"Query executed successfully: {query[:50]}...")
                # /exec responds with JSON; result rows live under the 'dataset' key.
                return response.json()
            else:
                logger.error(f"Query failed: {response.text} - Query: {query}")
                return None
        except Exception as e:
            logger.error(f"DB connection error: {e}")
            return None

    def initialize_tables(self):
        """Create the pre-aggregation tables if they don't exist."""
        # Daily stats are pre-aggregated per (day, exchange, sector, continent):
        # a single wide table keeps dashboard queries flexible, while the daily
        # granularity keeps the row count small enough that reads stay cheap.
        #
        # analytics_daily:
        #   timestamp | exchange | sector | continent | volume | trade_count | avg_price
        queries = [
            """
            CREATE TABLE IF NOT EXISTS analytics_daily (
                timestamp TIMESTAMP,
                exchange SYMBOL,
                sector SYMBOL,
                continent SYMBOL,
                volume DOUBLE,
                trade_count LONG,
                avg_price DOUBLE
            ) TIMESTAMP(timestamp) PARTITION BY YEAR;
            """,
            """
            CREATE TABLE IF NOT EXISTS isin_stats_daily (
                timestamp TIMESTAMP,
                isin SYMBOL,
                volume DOUBLE,
                trade_count LONG,
                vwap DOUBLE
            ) TIMESTAMP(timestamp) PARTITION BY YEAR;
            """
        ]
        for q in queries:
            self.execute_query(q)
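
    # Illustrative example of the kind of dashboard query these tables are meant
    # to serve (the exact dashboard query is an assumption, not part of this
    # commit; column names follow the schema above):
    #
    #   SELECT timestamp, sector, sum(volume)
    #   FROM analytics_daily
    #   WHERE timestamp >= dateadd('d', -30, now())
    #   GROUP BY timestamp, sector;
    #
    # Reading a few pre-aggregated rows per day is far cheaper than re-scanning
    # the raw trades table on every request.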

    def run_aggregation(self):
        """Aggregate new trades into the pre-aggregation tables."""
        logger.info("Starting analytics aggregation...")
        # 1. Aggregate into analytics_daily via INSERT INTO ... SELECT.
        # QuestDB's storage model is append-only, so instead of deleting and
        # rewriting partial days we rely on the worker running once a day after
        # the full import. For catch-up runs we only aggregate trades that are
        # newer than the latest timestamp already present in analytics_daily.
        last_ts = self.get_last_aggregated_ts("analytics_daily")
        logger.info(f"Last analytics_daily timestamp: {last_ts}")
        # Note on syntax: QuestDB's SAMPLE BY only buckets by time, so a form
        # like "SAMPLE BY 1d, exchange, m.sector" is not valid SQL here. For
        # multi-dimensional daily buckets we use a standard GROUP BY combined
        # with date_trunc('day', timestamp).
        query_daily = f"""
            INSERT INTO analytics_daily
            SELECT
                date_trunc('day', t.timestamp) as timestamp,
                t.exchange,
                coalesce(m.sector, 'Unknown') as sector,
                coalesce(m.continent, 'Unknown') as continent,
                sum(t.price * t.quantity) as volume,
                count(*) as trade_count,
                sum(t.price * t.quantity) / sum(t.quantity) as avg_price
            FROM trades t
            LEFT JOIN metadata m ON t.isin = m.isin
            WHERE t.timestamp > '{last_ts}'::timestamp
            GROUP BY
                date_trunc('day', t.timestamp),
                t.exchange,
                coalesce(m.sector, 'Unknown'),
                coalesce(m.continent, 'Unknown')
        """
        start_t = time.time()
        res = self.execute_query(query_daily)
        if res:
            logger.info(f"Updated analytics_daily in {time.time()-start_t:.2f}s")
        # 2. Aggregate per-ISIN daily stats (volume, trade count, VWAP).
        last_isin_ts = self.get_last_aggregated_ts("isin_stats_daily")
        logger.info(f"Last isin_stats_daily timestamp: {last_isin_ts}")
        query_isin = f"""
            INSERT INTO isin_stats_daily
            SELECT
                date_trunc('day', timestamp) as timestamp,
                isin,
                sum(price * quantity) as volume,
                count(*) as trade_count,
                sum(price * quantity) / sum(quantity) as vwap
            FROM trades
            WHERE timestamp > '{last_isin_ts}'::timestamp
            GROUP BY date_trunc('day', timestamp), isin
        """
        start_t = time.time()
        res = self.execute_query(query_isin)
        if res:
            logger.info(f"Updated isin_stats_daily in {time.time()-start_t:.2f}s")

    def get_last_aggregated_ts(self, table):
        """Return the latest timestamp already aggregated into `table`."""
        res = self.execute_query(f"select max(timestamp) from {table}")
        if res and res['dataset'] and res['dataset'][0][0]:
            return res['dataset'][0][0]  # ISO-8601 timestamp string
        return "1970-01-01T00:00:00.000000Z"


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    worker = AnalyticsWorker(db_host="localhost", auth=("admin", "quest"))
    worker.initialize_tables()
    worker.run_aggregation()
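
The worker above performs a single aggregation pass per invocation. A minimal scheduling sketch, assuming a plain daily loop is acceptable (rather than cron or the deploy pipeline) and that src/ is on the import path, so the analytics.worker module path is an assumption:

    import logging
    import time

    from analytics.worker import AnalyticsWorker

    if __name__ == "__main__":
        logging.basicConfig(level=logging.INFO)
        worker = AnalyticsWorker(db_host="questdb", auth=("admin", "quest"))
        worker.initialize_tables()
        while True:
            # One pass per day, matching the worker's assumption that it runs
            # after the daily import has completed.
            worker.run_aggregation()
            time.sleep(24 * 60 * 60)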