From 786fef2e71bf028ba13d50400a8640b46906a7bd Mon Sep 17 00:00:00 2001 From: Melchior Reimers Date: Sun, 25 Jan 2026 17:11:05 +0100 Subject: [PATCH] performance improvements by pre-defining queries --- daemon.py | 22 +++-- dashboard/public/index.html | 46 +++++++++- dashboard/server.py | 135 ++++++++++++++++++++++------- src/analytics/worker.py | 168 ++++++++++++++++++++++++++++++++++++ 4 files changed, 329 insertions(+), 42 deletions(-) create mode 100644 src/analytics/worker.py diff --git a/daemon.py b/daemon.py index edf77aa..5a1d48e 100644 --- a/daemon.py +++ b/daemon.py @@ -6,6 +6,7 @@ import requests from src.exchanges.eix import EIXExchange from src.exchanges.ls import LSExchange from src.database.questdb_client import DatabaseClient +from src.analytics.worker import AnalyticsWorker logging.basicConfig( level=logging.INFO, @@ -27,8 +28,6 @@ def get_last_trade_timestamp(db_url, exchange_name): data = response.json() if data['dataset']: # QuestDB returns timestamp in micros since epoch by default in some views, or ISO - # Let's assume the timestamp is in the dataset - # ILP timestamps are stored as designated timestamps. ts_value = data['dataset'][0][0] # Adjust index based on column order if isinstance(ts_value, str): return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00')) @@ -45,14 +44,8 @@ def run_task(historical=False): eix = EIXExchange() ls = LSExchange() - # Pass last_ts to fetcher to allow smart filtering - # daemon.py runs daily, so we want to fetch everything since DB state - # BUT we need to be careful: eix.py's fetch_latest_trades needs 'since_date' argument - # We can't pass it here directly in the tuple easily because last_ts is calculated inside the loop. - - # We will modify the loop below to handle args dynamically exchanges_to_process = [ - (eix, {'limit': None if historical else 5}), # Default limit 5 for safety if no historical + (eix, {'limit': None if historical else 5}), (ls, {'include_yesterday': historical}) ] @@ -91,6 +84,14 @@ def run_task(historical=False): except Exception as e: logger.error(f"Error processing exchange {exchange.name}: {e}") +def run_analytics(db_url="questdb", db_port=9000): + try: + worker = AnalyticsWorker(db_host=db_url, db_port=db_port, auth=DB_AUTH) + worker.initialize_tables() + worker.run_aggregation() + except Exception as e: + logger.error(f"Analytics aggregation failed: {e}") + def main(): logger.info("Trading Daemon started.") @@ -111,10 +112,12 @@ def main(): if is_empty: logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...") run_task(historical=True) + run_analytics() else: logger.info("Found existing data in database. Triggering catch-up sync...") # Run a normal task to fetch any missing data since the last run run_task(historical=False) + run_analytics() logger.info("Catch-up sync completed. Waiting for scheduled run at 23:00.") while True: @@ -122,6 +125,7 @@ def main(): # Täglich um 23:00 Uhr if now.hour == 23 and now.minute == 0: run_task(historical=False) + run_analytics() # Warte 61s, um Mehrfachausführung in derselben Minute zu verhindern time.sleep(61) diff --git a/dashboard/public/index.html b/dashboard/public/index.html index fb2744c..66cc127 100644 --- a/dashboard/public/index.html +++ b/dashboard/public/index.html @@ -268,6 +268,10 @@ + + + + @@ -585,18 +589,36 @@ if (y === 'all') { // Dual axis for breakdown // Volume Dataset + const volData = labels.map(l => { + const row = data.find(r => r[0] === l && r[1] === name); + return row ? 
row[3] : 0; // value_volume is index 3
+            });
+
             datasets.push({
                 label: `${name} (Vol)`,
-                data: labels.map(l => {
-                    const row = data.find(r => r[0] === l && r[1] === name);
-                    return row ? row[3] : 0; // value_volume is index 3
-                }),
+                data: volData,
                 backgroundColor: `hsla(${hue}, 75%, 50%, 0.7)`,
                 borderColor: `hsla(${hue}, 75%, 50%, 1)`,
                 borderWidth: 2,
                 yAxisID: 'y',
                 type: 'bar'
             });
+
+            // Add a 7-day moving average line for volume when there are enough data points
+            if (volData.length > 7) {
+                const ma7 = calculateMA(volData, 7);
+                datasets.push({
+                    label: `${name} (Vol MA7)`,
+                    data: ma7,
+                    borderColor: `hsla(${hue}, 90%, 80%, 0.8)`,
+                    borderWidth: 1.5,
+                    borderDash: [5, 5],
+                    pointRadius: 0,
+                    yAxisID: 'y',
+                    type: 'line',
+                    tension: 0.4
+                });
+            }
 
             // Count Dataset
             datasets.push({
                 label: `${name} (Cnt)`,
@@ -864,6 +886,22 @@
         updateUrlParams();
     }
 
+    // Trailing moving average; returns null for the first (period - 1) points so the
+    // chart shows a gap instead of a misleading partial average.
+    function calculateMA(data, period) {
+        let ma = [];
+        for (let i = 0; i < data.length; i++) {
+            if (i < period - 1) {
+                ma.push(null);
+                continue;
+            }
+            let sum = 0;
+            for (let j = 0; j < period; j++) {
+                sum += data[i - j] || 0;
+            }
+            ma.push(sum / period);
+        }
+        return ma;
+    }
+
     function fillMetadataTable() {
         const tbody = document.getElementById('metadataRows');
         tbody.innerHTML = store.metadata.map(r => `
diff --git a/dashboard/server.py b/dashboard/server.py
index e8f7f58..f18bcb3 100644
--- a/dashboard/server.py
+++ b/dashboard/server.py
@@ -114,41 +114,118 @@ async def get_analytics(
         "exchange_sector": f"concat({t_prefix}exchange, ' - ', coalesce({m_prefix}sector, 'Unknown'))" if needs_metadata else "'Unknown'"
     }
 
-    selected_metric = metrics_map.get(metric, metrics_map["volume"])
-    selected_group = groups_map.get(group_by, groups_map["day"])
+    # Pick the source table. Day/month/exchange/sector/continent breakdowns can be
+    # answered from the pre-aggregated analytics_daily table
+    # (columns: timestamp, exchange, sector, continent, volume, trade_count, avg_price).
+    # Requests that need per-ISIN data (ISIN filters, ISIN or name groupings) still
+    # have to scan the raw trades table.
 
-    query = f"select {selected_group} as label"
+    use_analytics_table = False
 
-    if sub_group_by and sub_group_by in groups_map:
-        query += f", {groups_map[sub_group_by]} as sub_label"
-        
-    if metric == 'all':
-        query += f", count(*) as value_count, sum({t_prefix}price * {t_prefix}quantity) as value_volume from trades"
-    else:
-        query += f", {selected_metric} as value from trades"
-    
-    if needs_metadata:
-        query += " t left join metadata m on t.isin = m.isin"
+    if not isins and group_by not in ("isin", "name") and sub_group_by not in ("isin", "name"):
+        use_analytics_table = True
 
-    query += " where 1=1"
-
-    if date_from:
-        query += f" and {t_prefix}timestamp >= '{date_from}'"
-    if date_to:
-        query += f" and {t_prefix}timestamp <= '{date_to}'"
-    
-    if isins:
-        isins_list = ",".join([f"'{i.strip()}'" for i in isins.split(",")])
-        query += f" and {t_prefix}isin in ({isins_list})"
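+    # Illustrative example (fast path): group_by='exchange', metric='volume' with no
+    # filters ends up running roughly
+    #   select exchange as label, sum(volume) as value from analytics_daily
+    #   where 1=1 group by exchange order by label asc
+    # i.e. it reads a bounded number of pre-aggregated rows per day instead of every trade.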
+    if use_analytics_table:
+        # Pre-aggregated path. Timestamps in analytics_daily are day-aligned.
+
+        # Metric mapping for the analytics table ('all' is handled explicitly below)
+        metrics_map_opt = {
+            "volume": "sum(volume)",
+            "count": "sum(trade_count)",
+            "avg_price": "avg(avg_price)",  # unweighted mean of daily averages; close enough for display
+        }
+
+        if metric == 'all':
+            metric_expr = "sum(trade_count) as value_count, sum(volume) as value_volume"
+        else:
+            metric_expr = f"{metrics_map_opt.get(metric, 'sum(volume)')} as value"
 
-    if continents and needs_metadata:
-        cont_list = ",".join([f"'{c.strip()}'" for c in continents.split(",")])
-        query += f" and {m_prefix}continent in ({cont_list})"
+
+        # Group mapping for the analytics table (timestamp, exchange, sector, continent)
+        groups_map_opt = {
+            "day": "timestamp",
+            "month": "date_trunc('month', timestamp)",
+            "exchange": "exchange",
+            "continent": "continent",
+            "sector": "sector",
+            "exchange_continent": "concat(exchange, ' - ', continent)",
+            "exchange_sector": "concat(exchange, ' - ', sector)"
+        }
+
+        sel_group_expr = groups_map_opt.get(group_by, "timestamp")
+
+        query = f"select {sel_group_expr} as label"
+
+        if sub_group_by and sub_group_by in groups_map_opt:
+            query += f", {groups_map_opt[sub_group_by]} as sub_label"
+
+        query += f", {metric_expr} from analytics_daily where 1=1"
+
+        if date_from: query += f" and timestamp >= '{date_from}'"
+        if date_to: query += f" and timestamp <= '{date_to}'"
+
+        # Filters
+        if continents:
+            cont_list = ",".join([f"'{c.strip()}'" for c in continents.split(",")])
+            query += f" and continent in ({cont_list})"
+
+        query += f" group by {sel_group_expr}"
+        if sub_group_by and sub_group_by in groups_map_opt:
+            query += f", {groups_map_opt[sub_group_by]}"
+
+        query += " order by label asc"
+
+    else:
+        # Fall back to the original raw-trades query (unchanged logic, re-indented)
+        selected_metric = metrics_map.get(metric, metrics_map["volume"])
+        selected_group = groups_map.get(group_by, groups_map["day"])
 
-    query += f" group by {selected_group}"
-    if sub_group_by and sub_group_by in groups_map:
-        query += f", {groups_map[sub_group_by]}"
-        
-    query += " order by label asc"
+        query = f"select {selected_group} as label"
+
+        if sub_group_by and sub_group_by in groups_map:
+            query += f", {groups_map[sub_group_by]} as sub_label"
+
+        if metric == 'all':
+            query += f", count(*) as value_count, sum({t_prefix}price * {t_prefix}quantity) as value_volume from trades"
+        else:
+            query += f", {selected_metric} as value from trades"
+
+        if needs_metadata:
+            query += " t left join metadata m on t.isin = m.isin"
+
+        query += " where 1=1"
+
+        if date_from:
+            query += f" and {t_prefix}timestamp >= '{date_from}'"
+        if date_to:
+            query += f" and {t_prefix}timestamp <= '{date_to}'"
+
+        if isins:
+            isins_list = ",".join([f"'{i.strip()}'" for i in isins.split(",")])
+            query += f" and {t_prefix}isin in ({isins_list})"
+
+        if continents and needs_metadata:
+            cont_list = ",".join([f"'{c.strip()}'" for c in continents.split(",")])
+            query += f" and {m_prefix}continent in ({cont_list})"
+
+        query += f" group by {selected_group}"
+        if sub_group_by and sub_group_by in groups_map:
+            query += f", {groups_map[sub_group_by]}"
+
+        query += " order by label asc"
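+    # Both branches yield the same result shape (label[, sub_label] plus either value or
+    # value_count/value_volume), so the dashboard frontend does not need to know which
+    # table served the request.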
 
     try:
         response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH)
diff --git a/src/analytics/worker.py b/src/analytics/worker.py
new file mode 100644
index 0000000..c1f362a
--- /dev/null
+++ b/src/analytics/worker.py
@@ -0,0 +1,168 @@
+import logging
+import requests
+import time
+
+logger = logging.getLogger("AnalyticsWorker")
+
+class AnalyticsWorker:
+    def __init__(self, db_host="questdb", db_port=9000, auth=None):
+        self.db_url = f"http://{db_host}:{db_port}"
+        self.auth = auth
+
+    def execute_query(self, query):
+        try:
+            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=self.auth)
+            if response.status_code == 200:
+                logger.debug(f"Query executed successfully: {query[:50]}...")
+                return response.json()
+            else:
+                logger.error(f"Query failed: {response.text} - Query: {query}")
+                return None
+        except Exception as e:
+            logger.error(f"DB connection error: {e}")
+            return None
+
+    def initialize_tables(self):
+        """Create the pre-aggregation tables if they don't exist."""
+
+        # analytics_daily: one row per (day, exchange, sector, continent) with summed
+        # volume, trade count and average price -- the main source for dashboard breakdowns.
+        # isin_stats_daily: one row per (day, isin) with volume, trade count and VWAP.
+        queries = [
+            """
+            CREATE TABLE IF NOT EXISTS analytics_daily (
+                timestamp TIMESTAMP,
+                exchange SYMBOL,
+                sector SYMBOL,
+                continent SYMBOL,
+                volume DOUBLE,
+                trade_count LONG,
+                avg_price DOUBLE
+            ) TIMESTAMP(timestamp) PARTITION BY YEAR;
+            """,
+            """
+            CREATE TABLE IF NOT EXISTS isin_stats_daily (
+                timestamp TIMESTAMP,
+                isin SYMBOL,
+                volume DOUBLE,
+                trade_count LONG,
+                vwap DOUBLE
+            ) TIMESTAMP(timestamp) PARTITION BY YEAR;
+            """
+        ]
+
+        for q in queries:
+            self.execute_query(q)
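+
+    # Note (optional, depends on the QuestDB version in use): declaring these as WAL
+    # tables with DEDUP UPSERT KEYS(timestamp, exchange, sector, continent) -- and
+    # (timestamp, isin) for isin_stats_daily -- would make re-running the aggregation
+    # idempotent. The code below does not assume that feature and simply appends.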
+
+    def run_aggregation(self):
+        """Incrementally aggregate new trades into the pre-aggregation tables."""
+        logger.info("Starting analytics aggregation...")
+
+        # 1. Aggregate into analytics_daily via INSERT INTO ... SELECT.
+        # Incremental strategy: read the latest day already present in analytics_daily
+        # and only aggregate full days after it. The tables are append-only here, so
+        # re-aggregating an existing day would duplicate its rows; the trade-off is that
+        # trades arriving later for an already-aggregated day are not picked up.
+        last_ts = self.get_last_aggregated_ts("analytics_daily")
+        logger.info(f"Last analytics_daily timestamp: {last_ts}")
+
+        # QuestDB's SAMPLE BY only buckets by time, so the extra dimensions
+        # (exchange, sector, continent) are handled with a plain GROUP BY on
+        # date_trunc('day', timestamp).
+        query_daily = f"""
+        INSERT INTO analytics_daily
+        SELECT
+            date_trunc('day', t.timestamp) as timestamp,
+            t.exchange,
+            coalesce(m.sector, 'Unknown') as sector,
+            coalesce(m.continent, 'Unknown') as continent,
+            sum(t.price * t.quantity) as volume,
+            count(*) as trade_count,
+            sum(t.price * t.quantity) / sum(t.quantity) as avg_price
+        FROM trades t
+        LEFT JOIN metadata m ON t.isin = m.isin
+        WHERE t.timestamp >= dateadd('d', 1, '{last_ts}'::timestamp)
+        GROUP BY
+            date_trunc('day', t.timestamp),
+            t.exchange,
+            coalesce(m.sector, 'Unknown'),
+            coalesce(m.continent, 'Unknown')
+        """
+
+        start_t = time.time()
+        res = self.execute_query(query_daily)
+        if res:
+            logger.info(f"Updated analytics_daily in {time.time()-start_t:.2f}s")
+
+        # 2. Aggregate per-ISIN daily stats into isin_stats_daily.
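+        # Note: nothing in this patch reads isin_stats_daily yet; it is populated here so
+        # that ISIN-filtered dashboard views can later be served from a pre-aggregated
+        # table as well.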
+        last_isin_ts = self.get_last_aggregated_ts("isin_stats_daily")
+        logger.info(f"Last isin_stats_daily timestamp: {last_isin_ts}")
+
+        # Same incremental scheme as above: only aggregate full days after the last one.
+        query_isin = f"""
+        INSERT INTO isin_stats_daily
+        SELECT
+            date_trunc('day', timestamp) as timestamp,
+            isin,
+            sum(price * quantity) as volume,
+            count(*) as trade_count,
+            sum(price * quantity) / sum(quantity) as vwap
+        FROM trades
+        WHERE timestamp >= dateadd('d', 1, '{last_isin_ts}'::timestamp)
+        GROUP BY date_trunc('day', timestamp), isin
+        """
+
+        start_t = time.time()
+        res = self.execute_query(query_isin)
+        if res:
+            logger.info(f"Updated isin_stats_daily in {time.time()-start_t:.2f}s")
+
+    def get_last_aggregated_ts(self, table):
+        """Return the latest aggregated timestamp of `table` as an ISO string, or the
+        epoch if the table is still empty."""
+        res = self.execute_query(f"select max(timestamp) from {table}")
+        if res and res['dataset'] and res['dataset'][0][0]:
+            return res['dataset'][0][0]  # ISO timestamp string
+        return "1970-01-01T00:00:00.000000Z"
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+    worker = AnalyticsWorker(db_host="localhost", auth=("admin", "quest"))
+    worker.initialize_tables()
+    worker.run_aggregation()
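+
+# Usage sketch (assumption: QuestDB reachable on localhost:9000 with the admin/quest
+# credentials used above):
+#
+#   python src/analytics/worker.py    # one-off backfill of the aggregate tables
+#
+# After the backfill, daemon.py keeps the tables current by calling run_analytics()
+# after each import run.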