diff --git a/daemon.py b/daemon.py
index edf77aa..5a1d48e 100644
--- a/daemon.py
+++ b/daemon.py
@@ -6,6 +6,7 @@ import requests
from src.exchanges.eix import EIXExchange
from src.exchanges.ls import LSExchange
from src.database.questdb_client import DatabaseClient
+from src.analytics.worker import AnalyticsWorker
logging.basicConfig(
level=logging.INFO,
@@ -27,8 +28,6 @@ def get_last_trade_timestamp(db_url, exchange_name):
data = response.json()
if data['dataset']:
# QuestDB returns timestamp in micros since epoch by default in some views, or ISO
- # Let's assume the timestamp is in the dataset
- # ILP timestamps are stored as designated timestamps.
ts_value = data['dataset'][0][0] # Adjust index based on column order
if isinstance(ts_value, str):
return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
@@ -45,14 +44,8 @@ def run_task(historical=False):
eix = EIXExchange()
ls = LSExchange()
- # Pass last_ts to fetcher to allow smart filtering
- # daemon.py runs daily, so we want to fetch everything since DB state
- # BUT we need to be careful: eix.py's fetch_latest_trades needs 'since_date' argument
- # We can't pass it here directly in the tuple easily because last_ts is calculated inside the loop.
-
- # We will modify the loop below to handle args dynamically
exchanges_to_process = [
- (eix, {'limit': None if historical else 5}), # Default limit 5 for safety if no historical
+ (eix, {'limit': None if historical else 5}),
(ls, {'include_yesterday': historical})
]
@@ -91,6 +84,14 @@ def run_task(historical=False):
except Exception as e:
logger.error(f"Error processing exchange {exchange.name}: {e}")
+def run_analytics(db_host="questdb", db_port=9000):
+    try:
+        worker = AnalyticsWorker(db_host=db_host, db_port=db_port, auth=DB_AUTH)
+ worker.initialize_tables()
+ worker.run_aggregation()
+ except Exception as e:
+ logger.error(f"Analytics aggregation failed: {e}")
+
def main():
logger.info("Trading Daemon started.")
@@ -111,10 +112,12 @@ def main():
if is_empty:
logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...")
run_task(historical=True)
+ run_analytics()
else:
logger.info("Found existing data in database. Triggering catch-up sync...")
# Run a normal task to fetch any missing data since the last run
run_task(historical=False)
+ run_analytics()
logger.info("Catch-up sync completed. Waiting for scheduled run at 23:00.")
while True:
@@ -122,6 +125,7 @@ def main():
# Daily at 23:00
if now.hour == 23 and now.minute == 0:
run_task(historical=False)
+ run_analytics()
# Wait 61s to prevent multiple runs within the same minute
time.sleep(61)
diff --git a/dashboard/public/index.html b/dashboard/public/index.html
index fb2744c..66cc127 100644
--- a/dashboard/public/index.html
+++ b/dashboard/public/index.html
@@ -268,6 +268,10 @@
+
+
+
+
@@ -585,18 +589,36 @@
if (y === 'all') {
// Dual axis for breakdown
// Volume Dataset
+ const volData = labels.map(l => {
+ const row = data.find(r => r[0] === l && r[1] === name);
+ return row ? row[3] : 0; // value_volume is index 3
+ });
+
datasets.push({
label: `${name} (Vol)`,
- data: labels.map(l => {
- const row = data.find(r => r[0] === l && r[1] === name);
- return row ? row[3] : 0; // value_volume is index 3
- }),
+ data: volData,
backgroundColor: `hsla(${hue}, 75%, 50%, 0.7)`,
borderColor: `hsla(${hue}, 75%, 50%, 1)`,
borderWidth: 2,
yAxisID: 'y',
type: 'bar'
});
+
+ // Add MA7 for Volume if enough data points
+ if (volData.length > 7) {
+ const ma7 = calculateMA(volData, 7);
+ datasets.push({
+ label: `${name} (Vol MA7)`,
+ data: ma7,
+ borderColor: `hsla(${hue}, 90%, 80%, 0.8)`,
+ borderWidth: 1.5,
+ borderDash: [5, 5],
+ pointRadius: 0,
+ yAxisID: 'y',
+ type: 'line',
+ tension: 0.4
+ });
+ }
// Count Dataset
datasets.push({
label: `${name} (Cnt)`,
@@ -864,6 +886,22 @@
updateUrlParams();
}
+ function calculateMA(data, period) {
+ let ma = [];
+ for (let i = 0; i < data.length; i++) {
+ if (i < period - 1) {
+ ma.push(null);
+ continue;
+ }
+ let sum = 0;
+ for (let j = 0; j < period; j++) {
+ sum += data[i - j] || 0;
+ }
+ ma.push(sum / period);
+ }
+ return ma;
+ }
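+            // e.g. calculateMA([1, 2, 3, 4, 5, 6, 7, 8], 3)
+            //      -> [null, null, 2, 3, 4, 5, 6, 7]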
+
function fillMetadataTable() {
const tbody = document.getElementById('metadataRows');
tbody.innerHTML = store.metadata.map(r => `
diff --git a/dashboard/server.py b/dashboard/server.py
index e8f7f58..f18bcb3 100644
--- a/dashboard/server.py
+++ b/dashboard/server.py
@@ -114,41 +114,118 @@ async def get_analytics(
"exchange_sector": f"concat({t_prefix}exchange, ' - ', coalesce({m_prefix}sector, 'Unknown'))" if needs_metadata else "'Unknown'"
}
- selected_metric = metrics_map.get(metric, metrics_map["volume"])
- selected_group = groups_map.get(group_by, groups_map["day"])
+    # Decide which table to query: the pre-aggregated analytics_daily covers the
+    # day/month/exchange/sector/continent breakdowns, but it has no per-ISIN rows
+    # (those live in isin_stats_daily). Fall back to raw trades whenever an ISIN
+    # filter or name/ISIN grouping is requested.
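+    # e.g. group_by='exchange', sub_group_by='continent', no ISIN filter -> analytics_daily;
+    #      group_by='name' or an explicit ISIN list -> raw trades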
- query = f"select {selected_group} as label"
+ use_analytics_table = False
- if sub_group_by and sub_group_by in groups_map:
- query += f", {groups_map[sub_group_by]} as sub_label"
-
- if metric == 'all':
- query += f", count(*) as value_count, sum({t_prefix}price * {t_prefix}quantity) as value_volume from trades"
- else:
- query += f", {selected_metric} as value from trades"
- if needs_metadata:
- query += " t left join metadata m on t.isin = m.isin"
+ # Check if we can use the pre-aggregated table
+    if not isins and sub_group_by not in ("isin", "name") and group_by not in ("isin", "name"):
+ use_analytics_table = True
- query += " where 1=1"
+ table_name = "analytics_daily" if use_analytics_table else "trades"
- if date_from:
- query += f" and {t_prefix}timestamp >= '{date_from}'"
- if date_to:
- query += f" and {t_prefix}timestamp <= '{date_to}'"
-
- if isins:
- isins_list = ",".join([f"'{i.strip()}'" for i in isins.split(",")])
- query += f" and {t_prefix}isin in ({isins_list})"
+    # analytics_daily columns: timestamp, exchange, sector, continent, volume, trade_count, avg_price.
+    # The generic query builder is mapped onto this schema for the main views
+    # (exchange/sector/continent breakdowns); everything else keeps using raw trades below.
+
+ if use_analytics_table:
+ # Simplified query for analytics table
+ # Note: timestamps are day-aligned in analytics table
+
+        # Metric mapping for the pre-aggregated table ('all' is handled separately below)
+        metrics_map_opt = {
+            "volume": "sum(volume)",
+            "count": "sum(trade_count)",
+            "avg_price": "avg(avg_price)"  # unweighted mean of daily averages; approximate, but fine for display
+        }
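+        # Illustrative numbers: day A has 1 trade @ 100, day B has 100 trades @ 200 ->
+        # avg(avg_price) = 150, while the trade-weighted average would be ~199.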
+
+ if metric == 'all':
+ metric_expr = "sum(trade_count) as value_count, sum(volume) as value_volume"
+ else:
+ metric_expr = f"{metrics_map_opt.get(metric, 'sum(volume)')} as value"
- if continents and needs_metadata:
- cont_list = ",".join([f"'{c.strip()}'" for c in continents.split(",")])
- query += f" and {m_prefix}continent in ({cont_list})"
+
+ # Group mapping logic
+ # analytics_daily has: timestamp, exchange, sector, continent
+ groups_map_opt = {
+ "day": "timestamp",
+ "month": "date_trunc('month', timestamp)",
+ "exchange": "exchange",
+ "continent": "continent",
+ "sector": "sector",
+ "exchange_continent": "concat(exchange, ' - ', continent)",
+ "exchange_sector": "concat(exchange, ' - ', sector)"
+ }
+
+ sel_group_expr = groups_map_opt.get(group_by, "timestamp")
+
+ query = f"select {sel_group_expr} as label"
+
+ if sub_group_by and sub_group_by in groups_map_opt:
+ query += f", {groups_map_opt[sub_group_by]} as sub_label"
+
+ query += f", {metric_expr} from analytics_daily where 1=1"
+
+ if date_from: query += f" and timestamp >= '{date_from}'"
+ if date_to: query += f" and timestamp <= '{date_to}'"
+
+ # Filters
+ if continents:
+ cont_list = ",".join([f"'{c.strip()}'" for c in continents.split(",")])
+ query += f" and continent in ({cont_list})"
+
+ query += f" group by {sel_group_expr}"
+        if sub_group_by and sub_group_by in groups_map_opt: query += f", {groups_map_opt[sub_group_by]}"
+
+ query += " order by label asc"
+
+    else:
+        # Fall back to the raw trades query (previous logic, now wrapped in this branch)
+ selected_metric = metrics_map.get(metric, metrics_map["volume"])
+ selected_group = groups_map.get(group_by, groups_map["day"])
- query += f" group by {selected_group}"
- if sub_group_by and sub_group_by in groups_map:
- query += f", {groups_map[sub_group_by]}"
-
- query += " order by label asc"
+ query = f"select {selected_group} as label"
+
+ if sub_group_by and sub_group_by in groups_map:
+ query += f", {groups_map[sub_group_by]} as sub_label"
+
+ if metric == 'all':
+ query += f", count(*) as value_count, sum({t_prefix}price * {t_prefix}quantity) as value_volume from trades"
+ else:
+ query += f", {selected_metric} as value from trades"
+
+ if needs_metadata:
+ query += " t left join metadata m on t.isin = m.isin"
+
+ query += " where 1=1"
+
+ if date_from:
+ query += f" and {t_prefix}timestamp >= '{date_from}'"
+ if date_to:
+ query += f" and {t_prefix}timestamp <= '{date_to}'"
+
+ if isins:
+ isins_list = ",".join([f"'{i.strip()}'" for i in isins.split(",")])
+ query += f" and {t_prefix}isin in ({isins_list})"
+
+ if continents and needs_metadata:
+ cont_list = ",".join([f"'{c.strip()}'" for c in continents.split(",")])
+ query += f" and {m_prefix}continent in ({cont_list})"
+
+ query += f" group by {selected_group}"
+ if sub_group_by and sub_group_by in groups_map:
+ query += f", {groups_map[sub_group_by]}"
+
+ query += " order by label asc"
try:
response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH)
diff --git a/src/analytics/worker.py b/src/analytics/worker.py
new file mode 100644
index 0000000..c1f362a
--- /dev/null
+++ b/src/analytics/worker.py
@@ -0,0 +1,168 @@
+import logging
+import requests
+import time
+from datetime import datetime, timedelta
+
+logger = logging.getLogger("AnalyticsWorker")
+
+class AnalyticsWorker:
+ def __init__(self, db_host="questdb", db_port=9000, auth=None):
+ self.db_url = f"http://{db_host}:{db_port}"
+ self.auth = auth
+
+ def execute_query(self, query):
+ try:
+ response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=self.auth)
+ if response.status_code == 200:
+ logger.debug(f"Query executed successfully: {query[:50]}...")
+ return response.json()
+ else:
+ logger.error(f"Query failed: {response.text} - Query: {query}")
+ return None
+ except Exception as e:
+ logger.error(f"DB connection error: {e}")
+ return None
+
+ def initialize_tables(self):
+ """Create necessary tables for pre-aggregation if they don't exist"""
+
+        # 1. analytics_daily: daily stats broken down by exchange, sector and continent.
+        #    A single wide table keeps dashboard queries flexible while still being
+        #    pre-aggregated per (day, exchange, sector, continent).
+        # 2. isin_stats_daily: per-ISIN daily volume, trade count and VWAP.
+        #
+        # analytics_daily columns: timestamp | exchange | sector | continent | volume | trade_count | avg_price
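+        # Example row (illustrative values only):
+        #   2024-05-03T00:00:00Z | EIX | Technology | Europe | 1250000.0 | 342 | 101.37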
+
+ queries = [
+ """
+ CREATE TABLE IF NOT EXISTS analytics_daily (
+ timestamp TIMESTAMP,
+ exchange SYMBOL,
+ sector SYMBOL,
+ continent SYMBOL,
+ volume DOUBLE,
+ trade_count LONG,
+ avg_price DOUBLE
+ ) TIMESTAMP(timestamp) PARTITION BY YEAR;
+ """,
+ """
+ CREATE TABLE IF NOT EXISTS isin_stats_daily (
+ timestamp TIMESTAMP,
+ isin SYMBOL,
+ volume DOUBLE,
+ trade_count LONG,
+ vwap DOUBLE
+ ) TIMESTAMP(timestamp) PARTITION BY YEAR;
+ """
+ ]
+
+ for q in queries:
+ self.execute_query(q)
+
+ def run_aggregation(self):
+ """Run aggregation logic to fill tables"""
+ logger.info("Starting analytics aggregation...")
+
+        # 1. Aggregate into analytics_daily via INSERT INTO ... SELECT.
+        # QuestDB is append-oriented, so re-aggregating a day that already exists in the
+        # table would insert a duplicate row rather than replace it. We therefore only
+        # aggregate days strictly after the last aggregated day (see the dateadd filter
+        # below) and rely on the daemon running once per day after the full import; a
+        # late catch-up run may miss the tail of the last aggregated day, which is the
+        # lesser problem here.
+
+ last_ts = self.get_last_aggregated_ts("analytics_daily")
+ logger.info(f"Last analytics_daily timestamp: {last_ts}")
+
+        # SAMPLE BY only buckets by the designated timestamp and cannot also group by the
+        # extra dimensions (exchange, sector, continent), so we bucket the time axis with
+        # date_trunc('day', ...) and use a standard GROUP BY, joining metadata for
+        # sector/continent.
+
+ query_daily = f"""
+ INSERT INTO analytics_daily
+ SELECT
+ date_trunc('day', t.timestamp) as timestamp,
+ t.exchange,
+ coalesce(m.sector, 'Unknown') as sector,
+ coalesce(m.continent, 'Unknown') as continent,
+ sum(t.price * t.quantity) as volume,
+ count(*) as trade_count,
+ sum(t.price * t.quantity) / sum(t.quantity) as avg_price
+ FROM trades t
+ LEFT JOIN metadata m ON t.isin = m.isin
+        WHERE t.timestamp >= dateadd('d', 1, '{last_ts}'::timestamp)
+ GROUP BY
+ date_trunc('day', t.timestamp),
+ t.exchange,
+ coalesce(m.sector, 'Unknown'),
+ coalesce(m.continent, 'Unknown')
+ """
+
+ start_t = time.time()
+ res = self.execute_query(query_daily)
+ if res:
+ logger.info(f"Updated analytics_daily in {time.time()-start_t:.2f}s")
+
+
+ # 2. Aggregate ISIN stats
+ last_isin_ts = self.get_last_aggregated_ts("isin_stats_daily")
+ logger.info(f"Last isin_stats_daily timestamp: {last_isin_ts}")
+
+ query_isin = f"""
+ INSERT INTO isin_stats_daily
+ SELECT
+ date_trunc('day', timestamp) as timestamp,
+ isin,
+ sum(price * quantity) as volume,
+ count(*) as trade_count,
+ sum(price * quantity) / sum(quantity) as vwap
+ FROM trades
+        WHERE timestamp >= dateadd('d', 1, '{last_isin_ts}'::timestamp)
+ GROUP BY date_trunc('day', timestamp), isin
+ """
+
+ start_t = time.time()
+ res = self.execute_query(query_isin)
+ if res:
+ logger.info(f"Updated isin_stats_daily in {time.time()-start_t:.2f}s")
+
+ def get_last_aggregated_ts(self, table):
+ res = self.execute_query(f"select max(timestamp) from {table}")
+ if res and res['dataset'] and res['dataset'][0][0]:
+ return res['dataset'][0][0] # ISO string usually
+ return "1970-01-01T00:00:00.000000Z"
+
+if __name__ == "__main__":
+ logging.basicConfig(level=logging.INFO)
+ worker = AnalyticsWorker(db_host="localhost", auth=("admin", "quest"))
+ worker.initialize_tables()
+ worker.run_aggregation()