From 33f5c90fce72b840fe465e53e0e115daa1d3f6c9 Mon Sep 17 00:00:00 2001 From: Melchior Reimers Date: Sun, 25 Jan 2026 17:36:29 +0100 Subject: [PATCH] updated --- Dockerfile.analytics | 10 + daemon.py | 22 +- dashboard/public/index.html | 434 ++++++++++++++++++++--- dashboard/server.py | 284 +++++++--------- docker-compose.yml | 14 + src/analytics/__init__.py | 1 + src/analytics/worker.py | 568 +++++++++++++++++++++++-------- systemd/analytics-worker.service | 18 + 8 files changed, 996 insertions(+), 355 deletions(-) create mode 100644 Dockerfile.analytics create mode 100644 src/analytics/__init__.py create mode 100644 systemd/analytics-worker.service diff --git a/Dockerfile.analytics b/Dockerfile.analytics new file mode 100644 index 0000000..e35770f --- /dev/null +++ b/Dockerfile.analytics @@ -0,0 +1,10 @@ +FROM python:3.12-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +CMD ["python", "-m", "src.analytics.worker"] diff --git a/daemon.py b/daemon.py index 5a1d48e..edf77aa 100644 --- a/daemon.py +++ b/daemon.py @@ -6,7 +6,6 @@ import requests from src.exchanges.eix import EIXExchange from src.exchanges.ls import LSExchange from src.database.questdb_client import DatabaseClient -from src.analytics.worker import AnalyticsWorker logging.basicConfig( level=logging.INFO, @@ -28,6 +27,8 @@ def get_last_trade_timestamp(db_url, exchange_name): data = response.json() if data['dataset']: # QuestDB returns timestamp in micros since epoch by default in some views, or ISO + # Let's assume the timestamp is in the dataset + # ILP timestamps are stored as designated timestamps. ts_value = data['dataset'][0][0] # Adjust index based on column order if isinstance(ts_value, str): return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00')) @@ -44,8 +45,14 @@ def run_task(historical=False): eix = EIXExchange() ls = LSExchange() + # Pass last_ts to fetcher to allow smart filtering + # daemon.py runs daily, so we want to fetch everything since DB state + # BUT we need to be careful: eix.py's fetch_latest_trades needs 'since_date' argument + # We can't pass it here directly in the tuple easily because last_ts is calculated inside the loop. + + # We will modify the loop below to handle args dynamically exchanges_to_process = [ - (eix, {'limit': None if historical else 5}), + (eix, {'limit': None if historical else 5}), # Default limit 5 for safety if no historical (ls, {'include_yesterday': historical}) ] @@ -84,14 +91,6 @@ def run_task(historical=False): except Exception as e: logger.error(f"Error processing exchange {exchange.name}: {e}") -def run_analytics(db_url="questdb", db_port=9000): - try: - worker = AnalyticsWorker(db_host=db_url, db_port=db_port, auth=DB_AUTH) - worker.initialize_tables() - worker.run_aggregation() - except Exception as e: - logger.error(f"Analytics aggregation failed: {e}") - def main(): logger.info("Trading Daemon started.") @@ -112,12 +111,10 @@ def main(): if is_empty: logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...") run_task(historical=True) - run_analytics() else: logger.info("Found existing data in database. Triggering catch-up sync...") # Run a normal task to fetch any missing data since the last run run_task(historical=False) - run_analytics() logger.info("Catch-up sync completed. 
Waiting for scheduled run at 23:00.") while True: @@ -125,7 +122,6 @@ def main(): # Täglich um 23:00 Uhr if now.hour == 23 and now.minute == 0: run_task(historical=False) - run_analytics() # Warte 61s, um Mehrfachausführung in derselben Minute zu verhindern time.sleep(61) diff --git a/dashboard/public/index.html b/dashboard/public/index.html index 66cc127..e987b36 100644 --- a/dashboard/public/index.html +++ b/dashboard/public/index.html @@ -251,6 +251,40 @@
+
+        <!-- Erweiterte Statistiken -->
+        <div>
+            <div>
+                <h2>Erweiterte Statistiken</h2>
+                <select id="statisticsPeriod" onchange="loadStatistics()">
+                    <option value="7">7 Tage</option>
+                    <option value="30">30 Tage</option>
+                    <option value="42">42 Tage</option>
+                    <option value="69">69 Tage</option>
+                    <option value="180">180 Tage</option>
+                    <option value="365">365 Tage</option>
+                </select>
+            </div>
+
+            <div>
+                <h3>Moving Average: Tradezahlen & Volumen je Exchange</h3>
+                <canvas id="movingAverageChart"></canvas>
+            </div>
+
+            <div>
+                <h3>Tradingvolumen & Anzahl Änderungen</h3>
+                <canvas id="volumeChangesChart"></canvas>
+            </div>
+
+            <div>
+                <h3>Trendanalyse: Häufig gehandelte Aktien</h3>
+                <canvas id="stockTrendsChart"></canvas>
+                <div id="stockTrendsTable"></div>
+            </div>
+        </div>
@@ -268,10 +302,6 @@ - - - - @@ -534,6 +564,10 @@ document.getElementById('statIsins').innerText = store.metadata.length.toLocaleString(); renderDashboardCharts(); fillMetadataTable(); + // Lade Statistiken neu wenn Dashboard aktualisiert wird + if (window.activeView === 'dashboard') { + loadStatistics(); + } } function setChartType(type) { currentChartType = type; renderAnalyticsReport(); } @@ -589,36 +623,18 @@ if (y === 'all') { // Dual axis for breakdown // Volume Dataset - const volData = labels.map(l => { - const row = data.find(r => r[0] === l && r[1] === name); - return row ? row[3] : 0; // value_volume is index 3 - }); - datasets.push({ label: `${name} (Vol)`, - data: volData, + data: labels.map(l => { + const row = data.find(r => r[0] === l && r[1] === name); + return row ? row[3] : 0; // value_volume is index 3 + }), backgroundColor: `hsla(${hue}, 75%, 50%, 0.7)`, borderColor: `hsla(${hue}, 75%, 50%, 1)`, borderWidth: 2, yAxisID: 'y', type: 'bar' }); - - // Add MA7 for Volume if enough data points - if (volData.length > 7) { - const ma7 = calculateMA(volData, 7); - datasets.push({ - label: `${name} (Vol MA7)`, - data: ma7, - borderColor: `hsla(${hue}, 90%, 80%, 0.8)`, - borderWidth: 1.5, - borderDash: [5, 5], - pointRadius: 0, - yAxisID: 'y', - type: 'line', - tension: 0.4 - }); - } // Count Dataset datasets.push({ label: `${name} (Cnt)`, @@ -886,22 +902,6 @@ updateUrlParams(); } - function calculateMA(data, period) { - let ma = []; - for (let i = 0; i < data.length; i++) { - if (i < period - 1) { - ma.push(null); - continue; - } - let sum = 0; - for (let j = 0; j < period; j++) { - sum += data[i - j] || 0; - } - ma.push(sum / period); - } - return ma; - } - function fillMetadataTable() { const tbody = document.getElementById('metadataRows'); tbody.innerHTML = store.metadata.map(r => ` @@ -931,7 +931,355 @@ rows.forEach(r => r.style.display = r.innerText.toLowerCase().includes(q.toLowerCase()) ? 
'' : 'none'); } - window.onload = async () => { await fetchData(); syncStateFromUrl(); setInterval(fetchData, 30000); }; + async function loadStatistics() { + const days = document.getElementById('statisticsPeriod').value; + await Promise.all([ + loadMovingAverage(days), + loadVolumeChanges(days), + loadStockTrends(days) + ]); + } + + async function loadMovingAverage(days) { + try { + const res = await fetch(`${API}/statistics/moving-average?days=${days}`).then(r => r.json()); + const data = res.dataset || []; + const columns = res.columns || []; + + if (!data.length) { + console.log('No moving average data available'); + return; + } + + const ctx = document.getElementById('movingAverageChart').getContext('2d'); + + if (charts.movingAverage) charts.movingAverage.destroy(); + + // Finde Spaltenindizes + const dateIdx = columns.findIndex(c => c.name === 'date' || c.name === 'timestamp'); + const exchangeIdx = columns.findIndex(c => c.name === 'exchange'); + const countIdx = columns.findIndex(c => c.name === 'trade_count'); + const volumeIdx = columns.findIndex(c => c.name === 'volume'); + const maCountIdx = columns.findIndex(c => c.name === 'ma_count'); + const maVolumeIdx = columns.findIndex(c => c.name === 'ma_volume'); + + // Gruppiere nach Exchange + const exchanges = [...new Set(data.map(r => r[exchangeIdx]))]; + const dates = [...new Set(data.map(r => r[dateIdx]))].sort(); + + const datasets = []; + const colors = ['#38bdf8', '#f43f5e', '#10b981', '#fbbf24', '#8b5cf6']; + + // Trade Count Datasets + exchanges.forEach((exchange, idx) => { + datasets.push({ + label: `${exchange} - Trade Count`, + data: dates.map(d => { + const row = data.find(r => r[dateIdx] === d && r[exchangeIdx] === exchange); + return row ? (row[countIdx] || 0) : 0; + }), + borderColor: colors[idx % colors.length], + backgroundColor: colors[idx % colors.length] + '33', + borderWidth: 2, + yAxisID: 'y', + tension: 0.3 + }); + }); + + // Moving Average Datasets + exchanges.forEach((exchange, idx) => { + datasets.push({ + label: `${exchange} - MA Count`, + data: dates.map(d => { + const row = data.find(r => r[dateIdx] === d && r[exchangeIdx] === exchange); + return row ? (row[maCountIdx] || 0) : 0; + }), + borderColor: colors[idx % colors.length], + backgroundColor: 'transparent', + borderWidth: 2, + borderDash: [5, 5], + yAxisID: 'y', + tension: 0.3 + }); + }); + + // Volume Datasets + exchanges.forEach((exchange, idx) => { + datasets.push({ + label: `${exchange} - Volume`, + data: dates.map(d => { + const row = data.find(r => r[dateIdx] === d && r[exchangeIdx] === exchange); + return row ? 
(row[volumeIdx] || 0) : 0; + }), + borderColor: colors[(idx + 2) % colors.length], + backgroundColor: colors[(idx + 2) % colors.length] + '33', + borderWidth: 2, + yAxisID: 'y1', + tension: 0.3 + }); + }); + + charts.movingAverage = new Chart(ctx, { + type: 'line', + data: { + labels: dates.map(d => new Date(d).toLocaleDateString()), + datasets: datasets + }, + options: { + responsive: true, + maintainAspectRatio: false, + interaction: { mode: 'index', intersect: false }, + scales: { + y: { + type: 'linear', + display: true, + position: 'left', + title: { display: true, text: 'Trade Count', color: '#94a3b8' }, + grid: { color: 'rgba(255,255,255,0.05)' }, + ticks: { color: '#64748b' } + }, + y1: { + type: 'linear', + display: true, + position: 'right', + title: { display: true, text: 'Volume (€)', color: '#94a3b8' }, + grid: { drawOnChartArea: false }, + ticks: { color: '#64748b' } + }, + x: { + grid: { display: false }, + ticks: { color: '#64748b' } + } + }, + plugins: { + legend: { + display: true, + position: 'bottom', + labels: { color: '#94a3b8', boxWidth: 12, usePointStyle: true, padding: 15 } + }, + tooltip: { + backgroundColor: '#1e293b', + titleColor: '#38bdf8', + bodyColor: '#e2e8f0', + borderColor: 'rgba(255,255,255,0.1)', + borderWidth: 1 + } + } + } + }); + } catch (err) { + console.error('Error loading moving average:', err); + } + } + + async function loadVolumeChanges(days) { + try { + const res = await fetch(`${API}/statistics/volume-changes?days=${days}`).then(r => r.json()); + const data = res.dataset || []; + const columns = res.columns || []; + + if (!data.length) { + console.log('No volume changes data available'); + return; + } + + const ctx = document.getElementById('volumeChangesChart').getContext('2d'); + + if (charts.volumeChanges) charts.volumeChanges.destroy(); + + // Finde Spaltenindizes + const exchangeIdx = columns.findIndex(c => c.name === 'exchange'); + const countChangeIdx = columns.findIndex(c => c.name === 'count_change_pct'); + const volumeChangeIdx = columns.findIndex(c => c.name === 'volume_change_pct'); + const trendIdx = columns.findIndex(c => c.name === 'trend'); + + const exchanges = data.map(r => r[exchangeIdx]); + const countChanges = data.map(r => r[countChangeIdx] || 0); + const volumeChanges = data.map(r => r[volumeChangeIdx] || 0); + + charts.volumeChanges = new Chart(ctx, { + type: 'bar', + data: { + labels: exchanges, + datasets: [ + { + label: 'Anzahl Änderung (%)', + data: countChanges, + backgroundColor: '#38bdf866', + borderColor: '#38bdf8', + borderWidth: 2, + yAxisID: 'y' + }, + { + label: 'Volumen Änderung (%)', + data: volumeChanges, + backgroundColor: '#fbbf2466', + borderColor: '#fbbf24', + borderWidth: 2, + yAxisID: 'y' + } + ] + }, + options: { + responsive: true, + maintainAspectRatio: false, + scales: { + y: { + type: 'linear', + display: true, + title: { display: true, text: 'Änderung (%)', color: '#94a3b8' }, + grid: { color: 'rgba(255,255,255,0.05)' }, + ticks: { color: '#64748b' } + }, + x: { + grid: { display: false }, + ticks: { color: '#64748b' } + } + }, + plugins: { + legend: { + display: true, + position: 'top', + labels: { color: '#94a3b8', boxWidth: 12, usePointStyle: true, padding: 15 } + }, + tooltip: { + backgroundColor: '#1e293b', + titleColor: '#38bdf8', + bodyColor: '#e2e8f0', + borderColor: 'rgba(255,255,255,0.1)', + borderWidth: 1, + callbacks: { + afterLabel: (context) => { + const idx = context.dataIndex; + const trend = data[idx][trendIdx]; // trend + return `Trend: ${trend || 'N/A'}`; + } + } + } + } + 
} + }); + } catch (err) { + console.error('Error loading volume changes:', err); + } + } + + async function loadStockTrends(days) { + try { + const res = await fetch(`${API}/statistics/stock-trends?days=${days}&limit=20`).then(r => r.json()); + const data = res.dataset || []; + const columns = res.columns || []; + + if (!data.length) { + console.log('No stock trends data available'); + return; + } + + const ctx = document.getElementById('stockTrendsChart').getContext('2d'); + const tableContainer = document.getElementById('stockTrendsTable'); + + if (charts.stockTrends) charts.stockTrends.destroy(); + + // Finde Spaltenindizes + const isinIdx = columns.findIndex(c => c.name === 'isin'); + const volumeIdx = columns.findIndex(c => c.name === 'volume'); + const countIdx = columns.findIndex(c => c.name === 'trade_count'); + const countChangeIdx = columns.findIndex(c => c.name === 'count_change_pct'); + const volumeChangeIdx = columns.findIndex(c => c.name === 'volume_change_pct'); + + // Sortiere nach Volumen + const sorted = [...data].sort((a, b) => (b[volumeIdx] || 0) - (a[volumeIdx] || 0)).slice(0, 10); + + const isins = sorted.map(r => r[isinIdx]); + const volumes = sorted.map(r => r[volumeIdx] || 0); + const countChanges = sorted.map(r => r[countChangeIdx] || 0); + const volumeChanges = sorted.map(r => r[volumeChangeIdx] || 0); + + charts.stockTrends = new Chart(ctx, { + type: 'bar', + data: { + labels: isins.map(i => i.substring(0, 12) + '...'), + datasets: [ + { + label: 'Volumen (€)', + data: volumes, + backgroundColor: '#10b98166', + borderColor: '#10b981', + borderWidth: 2, + yAxisID: 'y' + } + ] + }, + options: { + responsive: true, + maintainAspectRatio: false, + scales: { + y: { + type: 'linear', + display: true, + title: { display: true, text: 'Volumen (€)', color: '#94a3b8' }, + grid: { color: 'rgba(255,255,255,0.05)' }, + ticks: { color: '#64748b' } + }, + x: { + grid: { display: false }, + ticks: { color: '#64748b', maxRotation: 45, minRotation: 45 } + } + }, + plugins: { + legend: { + display: true, + position: 'top', + labels: { color: '#94a3b8', boxWidth: 12, usePointStyle: true, padding: 15 } + }, + tooltip: { + backgroundColor: '#1e293b', + titleColor: '#38bdf8', + bodyColor: '#e2e8f0', + borderColor: 'rgba(255,255,255,0.1)', + borderWidth: 1 + } + } + } + }); + + // Erstelle Tabelle + tableContainer.innerHTML = ` + + + + + + + + + + + + ${sorted.map(r => ` + + + + + + + + `).join('')} + +
+                        <tr><th>ISIN</th><th>Trades</th><th>Volumen (€)</th><th>Anzahl Δ (%)</th><th>Volumen Δ (%)</th></tr>
+                        <tr><td>${r[isinIdx]}</td><td>${(r[countIdx] || 0).toLocaleString()}</td><td>€${((r[volumeIdx] || 0) / 1e6).toFixed(2)}M</td><td>${((r[countChangeIdx] || 0)).toFixed(2)}%</td><td>${((r[volumeChangeIdx] || 0)).toFixed(2)}%</td></tr>
+ `; + } catch (err) { + console.error('Error loading stock trends:', err); + } + } + + window.onload = async () => { + await fetchData(); + syncStateFromUrl(); + setInterval(fetchData, 30000); + // Lade Statistiken beim Start + setTimeout(() => loadStatistics(), 1000); + }; diff --git a/dashboard/server.py b/dashboard/server.py index f34c464..46d0c19 100644 --- a/dashboard/server.py +++ b/dashboard/server.py @@ -114,168 +114,46 @@ async def get_analytics( "exchange_sector": f"concat({t_prefix}exchange, ' - ', coalesce({m_prefix}sector, 'Unknown'))" if needs_metadata else "'Unknown'" } - # Determine table based on granularity and needs - # For day/month aggregation without ISIN specific filtering, use analytics_daily - # But analytics_daily doesn't have individual ISINs (except via another table) - # So if ISIN filter is off, use analytics_daily + selected_metric = metrics_map.get(metric, metrics_map["volume"]) + selected_group = groups_map.get(group_by, groups_map["day"]) - use_analytics_table = False + query = f"select {selected_group} as label" - # Check if we can use the pre-aggregated table - if not isins and not sub_group_by == "isin" and group_by != "isin" and group_by != "name": - use_analytics_table = True - - table_name = "analytics_daily" if use_analytics_table else "trades" - - # If using analytics table, columns might be named differently? - # analytics_daily: timestamp, exchange, sector, continent, volume, trade_count, avg_price - - # We need to map our generic query builder to this table - # This might be tricky if column names don't align exactly or if we need dynamic mapping. - # To keep it safe for now, let's just stick to 'trades' but hint towards optimization. - # Actually, let's implement IT for the main view (Exchange/Continent breakdown) - - if use_analytics_table: - # Simplified query for analytics table - # Note: timestamps are day-aligned in analytics table - - # Adjust metric mapping for analytics table - metrics_map_opt = { - "volume": "sum(volume)", - "count": "sum(trade_count)", - "avg_price": "avg(avg_price)", # Not mathematically perfect but close for display - "all": "count(*) as value_count, sum(volume) as value_volume" # Wait, 'all' needs specific handling - } - - if metric == 'all': - metric_expr = "sum(trade_count) as value_count, sum(volume) as value_volume" - else: - metric_expr = f"{metrics_map_opt.get(metric, 'sum(volume)')} as value" - - - # Group mapping logic - # analytics_daily has: timestamp, exchange, sector, continent - groups_map_opt = { - "day": "timestamp", - "month": "date_trunc('month', timestamp)", - "exchange": "exchange", - "continent": "continent", - "sector": "sector", - "exchange_continent": "concat(exchange, ' - ', continent)", - "exchange_sector": "concat(exchange, ' - ', sector)" - } - - sel_group_expr = groups_map_opt.get(group_by, "timestamp") - - query = f"select {sel_group_expr} as label" - - if sub_group_by and sub_group_by in groups_map_opt: - query += f", {groups_map_opt[sub_group_by]} as sub_label" - - query += f", {metric_expr} from analytics_daily where 1=1" - - if date_from: query += f" and timestamp >= '{date_from}'" - if date_to: query += f" and timestamp <= '{date_to}'" - - # Filters - if continents: - cont_list = ",".join([f"'{c.strip()}'" for c in continents.split(",")]) - query += f" and continent in ({cont_list})" - - query += f" group by {sel_group_expr}" - if sub_group_by: query += f", {groups_map_opt[sub_group_by]}" - - query += " order by label asc" - + if sub_group_by and sub_group_by in groups_map: + 
query += f", {groups_map[sub_group_by]} as sub_label" + + if metric == 'all': + query += f", count(*) as value_count, sum({t_prefix}price * {t_prefix}quantity) as value_volume from trades" else: - # Fallback to RAW TRADES query (existing logic) - # ... (keep existing logic but indented/wrapped) - selected_metric = metrics_map.get(metric, metrics_map["volume"]) - selected_group = groups_map.get(group_by, groups_map["day"]) - - query = f"select {selected_group} as label" - - if sub_group_by and sub_group_by in groups_map: - query += f", {groups_map[sub_group_by]} as sub_label" - - if metric == 'all': - query += f", count(*) as value_count, sum({t_prefix}price * {t_prefix}quantity) as value_volume from trades" - else: - query += f", {selected_metric} as value from trades" - - if needs_metadata: - query += " t left join metadata m on t.isin = m.isin" - - query += " where 1=1" - - if date_from: - query += f" and {t_prefix}timestamp >= '{date_from}'" - if date_to: - query += f" and {t_prefix}timestamp <= '{date_to}'" - - if isins: - isins_list = ",".join([f"'{i.strip()}'" for i in isins.split(",")]) - query += f" and {t_prefix}isin in ({isins_list})" - - if continents and needs_metadata: - cont_list = ",".join([f"'{c.strip()}'" for c in continents.split(",")]) - query += f" and {m_prefix}continent in ({cont_list})" - - query += f" group by {selected_group}" - if sub_group_by and sub_group_by in groups_map: - query += f", {groups_map[sub_group_by]}" - - query += " order by label asc" + query += f", {selected_metric} as value from trades" + if needs_metadata: + query += " t left join metadata m on t.isin = m.isin" + + query += " where 1=1" + + if date_from: + query += f" and {t_prefix}timestamp >= '{date_from}'" + if date_to: + query += f" and {t_prefix}timestamp <= '{date_to}'" + + if isins: + isins_list = ",".join([f"'{i.strip()}'" for i in isins.split(",")]) + query += f" and {t_prefix}isin in ({isins_list})" + + if continents and needs_metadata: + cont_list = ",".join([f"'{c.strip()}'" for c in continents.split(",")]) + query += f" and {m_prefix}continent in ({cont_list})" + + query += f" group by {selected_group}" + if sub_group_by and sub_group_by in groups_map: + query += f", {groups_map[sub_group_by]}" + + query += " order by label asc" - print(f"Executing Query: {query}") try: response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH) if response.status_code == 200: return response.json() - - print(f"DEBUG: Query Failed: {response.text}") - - if use_analytics_table: - print("DEBUG: Analytics query failed, falling back to RAW trades query...") - - selected_metric = metrics_map.get(metric, metrics_map["volume"]) - selected_group = groups_map.get(group_by, groups_map["day"]) - - raw_query = f"select {selected_group} as label" - - if sub_group_by and sub_group_by in groups_map: - raw_query += f", {groups_map[sub_group_by]} as sub_label" - - if metric == 'all': - raw_query += f", count(*) as value_count, sum({t_prefix}price * {t_prefix}quantity) as value_volume from trades" - else: - raw_query += f", {selected_metric} as value from trades" - - if needs_metadata: - raw_query += " t left join metadata m on t.isin = m.isin" - - raw_query += " where 1=1" - - if date_from: raw_query += f" and {t_prefix}timestamp >= '{date_from}'" - if date_to: raw_query += f" and {t_prefix}timestamp <= '{date_to}'" - if isins: - isins_list = ",".join([f"'{i.strip()}'" for i in isins.split(",")]) - raw_query += f" and {t_prefix}isin in ({isins_list})" - if continents and 
needs_metadata: - cont_list = ",".join([f"'{c.strip()}'" for c in continents.split(",")]) - raw_query += f" and {m_prefix}continent in ({cont_list})" - - raw_query += f" group by {selected_group}" - if sub_group_by and sub_group_by in groups_map: - raw_query += f", {groups_map[sub_group_by]}" - - raw_query += " order by label asc" - - print(f"Executing Fallback Query: {raw_query}") - fb_response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': raw_query}, auth=DB_AUTH) - if fb_response.status_code == 200: - return fb_response.json() - throw_http_error(response) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @@ -292,6 +170,104 @@ async def search_metadata(q: str): except Exception as e: raise HTTPException(status_code=500, detail=str(e)) +@app.get("/api/statistics/moving-average") +async def get_moving_average(days: int = 7, exchange: str = None): + """ + Gibt Moving Average Daten für Tradezahlen und Volumen je Exchange zurück. + Unterstützte Zeiträume: 7, 30, 42, 69, 180, 365 Tage + """ + if days not in [7, 30, 42, 69, 180, 365]: + raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365") + + # Hole Daten aus der vorberechneten analytics_exchange_daily Tabelle + query = f""" + select + timestamp as date, + exchange, + trade_count, + volume, + ma{days}_count as ma_count, + ma{days}_volume as ma_volume + from analytics_exchange_daily + where timestamp >= dateadd('d', -{days}, now()) + """ + + if exchange: + query += f" and exchange = '{exchange}'" + + query += " order by date asc, exchange asc" + + try: + response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH) + if response.status_code == 200: + return response.json() + throw_http_error(response) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/api/statistics/volume-changes") +async def get_volume_changes(days: int = 7): + """ + Gibt Änderungen in Volumen und Anzahl je Exchange zurück. + Unterstützte Zeiträume: 7, 30, 42, 69, 180, 365 Tage + """ + if days not in [7, 30, 42, 69, 180, 365]: + raise HTTPException(status_code=400, detail="Invalid days parameter. Must be one of: 7, 30, 42, 69, 180, 365") + + query = f""" + select + timestamp as date, + exchange, + trade_count, + volume, + count_change_pct, + volume_change_pct, + trend + from analytics_volume_changes + where period_days = {days} + order by date desc, exchange asc + """ + + try: + response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH) + if response.status_code == 200: + return response.json() + throw_http_error(response) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/api/statistics/stock-trends") +async def get_stock_trends(days: int = 7, limit: int = 20): + """ + Gibt Trendanalyse für häufig gehandelte Aktien zurück. + Unterstützte Zeiträume: 7, 30, 42, 69, 180, 365 Tage + """ + if days not in [7, 30, 42, 69, 180, 365]: + raise HTTPException(status_code=400, detail="Invalid days parameter. 
Must be one of: 7, 30, 42, 69, 180, 365") + + # Hole Top-Aktien nach Volumen für den Zeitraum + query = f""" + select + timestamp as date, + isin, + trade_count, + volume, + count_change_pct, + volume_change_pct + from analytics_stock_trends + where period_days = {days} + order by volume desc + limit {limit} + """ + + try: + response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH) + if response.status_code == 200: + return response.json() + throw_http_error(response) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + def throw_http_error(res): raise HTTPException(status_code=res.status_code, detail=f"QuestDB error: {res.text}") diff --git a/docker-compose.yml b/docker-compose.yml index d527f56..3b1deb2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -60,5 +60,19 @@ services: - DB_PASSWORD=${DB_PASSWORD:-quest} - DB_HOST=questdb + analytics_worker: + build: + context: . + dockerfile: Dockerfile.analytics + container_name: analytics_worker + depends_on: + - questdb + restart: always + environment: + - PYTHONUNBUFFERED=1 + - DB_USER=${DB_USER:-admin} + - DB_PASSWORD=${DB_PASSWORD:-quest} + - DB_HOST=questdb + volumes: questdb_data: diff --git a/src/analytics/__init__.py b/src/analytics/__init__.py new file mode 100644 index 0000000..c82d339 --- /dev/null +++ b/src/analytics/__init__.py @@ -0,0 +1 @@ +# Analytics module for trading data aggregation diff --git a/src/analytics/worker.py b/src/analytics/worker.py index c1f362a..af91641 100644 --- a/src/analytics/worker.py +++ b/src/analytics/worker.py @@ -1,168 +1,446 @@ -import logging -import requests import time -from datetime import datetime, timedelta +import logging +import datetime +import os +import requests +from typing import Dict, List, Tuple, Optional +import pandas as pd +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) logger = logging.getLogger("AnalyticsWorker") +DB_USER = os.getenv("DB_USER", "admin") +DB_PASSWORD = os.getenv("DB_PASSWORD", "quest") +DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None +DB_HOST = os.getenv("DB_HOST", "questdb") +DB_URL = f"http://{DB_HOST}:9000" + +# Unterstützte Zeiträume +TIME_PERIODS = [7, 30, 42, 69, 180, 365] + class AnalyticsWorker: - def __init__(self, db_host="questdb", db_port=9000, auth=None): - self.db_url = f"http://{db_host}:{db_port}" - self.auth = auth - - def execute_query(self, query): + def __init__(self): + self.last_processed_timestamp = None + self.db_url = DB_URL + + def get_last_processed_timestamp(self) -> Optional[datetime.datetime]: + """Holt den letzten verarbeiteten Timestamp aus der Analytics-Tabelle""" try: - response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=self.auth) + query = "select max(timestamp) as last_ts from analytics_exchange_daily" + response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH) if response.status_code == 200: - logger.debug(f"Query executed successfully: {query[:50]}...") - return response.json() - else: - logger.error(f"Query failed: {response.text} - Query: {query}") - return None + data = response.json() + if data.get('dataset') and data['dataset'] and len(data['dataset']) > 0 and data['dataset'][0][0]: + ts_value = data['dataset'][0][0] + if isinstance(ts_value, str): + return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00')) + elif isinstance(ts_value, (int, float)): + # QuestDB gibt Timestamps in Mikrosekunden 
zurück + return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc) except Exception as e: - logger.error(f"DB connection error: {e}") - return None - - def initialize_tables(self): - """Create necessary tables for pre-aggregation if they don't exist""" + logger.debug(f"Could not get last processed timestamp: {e}") + return None + + def get_new_trades(self, since: Optional[datetime.datetime] = None) -> List[Dict]: + """Holt neue Trades seit dem letzten Verarbeitungszeitpunkt""" + if since: + since_str = since.strftime('%Y-%m-%d %H:%M:%S') + query = f"select timestamp, exchange, isin, price, quantity from trades where timestamp > '{since_str}' order by timestamp asc" + else: + # Erste Ausführung: nur die letzten 7 Tage + query = f"select timestamp, exchange, isin, price, quantity from trades where timestamp > dateadd('d', -7, now()) order by timestamp asc" - # 1. Daily Stats (Global & Per Exchange) - # We store daily stats broken down by Exchange, Sector, Continent - # Actually, let's keep it simple first: One big table for flexible queries? - # Or multiple small tables? - # For performance, pre-aggregating by (Day, Exchange, Sector) is best. - - # Table: analytics_daily - # timestamp | exchange | sector | continent | sum_volume | count_trades | avg_price - - queries = [ - """ - CREATE TABLE IF NOT EXISTS analytics_daily ( - timestamp TIMESTAMP, - exchange SYMBOL, - sector SYMBOL, - continent SYMBOL, - volume DOUBLE, - trade_count LONG, - avg_price DOUBLE - ) TIMESTAMP(timestamp) PARTITION BY YEAR; - """, - """ - CREATE TABLE IF NOT EXISTS isin_stats_daily ( - timestamp TIMESTAMP, - isin SYMBOL, - volume DOUBLE, - trade_count LONG, - vwap DOUBLE - ) TIMESTAMP(timestamp) PARTITION BY YEAR; - """ - ] - - for q in queries: - self.execute_query(q) - - def run_aggregation(self): - """Run aggregation logic to fill tables""" - logger.info("Starting analytics aggregation...") - - # 1. Aggregate into analytics_daily - # We perform an INSERT INTO ... SELECT - # We need to manage deduplication or delete/replace. QuestDB append only model - # implies we should be careful. - # Simple strategy: Delete stats for "today" (if creating incomplete stats) or - # rely on the fact that this runs once a day after full import. - # But for 'catch-up' we might process ranges. - - # Let's try to aggregate everything that is NOT in analytics_daily. - # Efficient approach: Get max timestamp from analytics_daily, aggregate trades > max_ts - - last_ts = self.get_last_aggregated_ts("analytics_daily") - logger.info(f"Last analytics_daily timestamp: {last_ts}") - - # QuestDB INSERT INTO ... 
SELECT - # Grouping by 1d, exchange, sector, continent (requires join) + try: + response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH) + if response.status_code == 200: + data = response.json() + columns = data.get('columns', []) + dataset = data.get('dataset', []) + + trades = [] + for row in dataset: + trade = {} + for i, col in enumerate(columns): + trade[col['name']] = row[i] + trades.append(trade) + return trades + except Exception as e: + logger.error(f"Error fetching new trades: {e}") + return [] + + def calculate_exchange_daily_aggregations(self, days_back: int = 365) -> List[Dict]: + """Berechnet tägliche Aggregationen je Exchange mit Moving Averages""" + end_date = datetime.datetime.now(datetime.timezone.utc) + start_date = end_date - datetime.timedelta(days=days_back) query = f""" - INSERT INTO analytics_daily - SELECT - timestamp, + select + date_trunc('day', timestamp) as date, exchange, - coalesce(m.sector, 'Unknown'), - coalesce(m.continent, 'Unknown'), - sum(price * quantity), - count(*), - sum(price * quantity) / sum(quantity) - FROM trades t - LEFT JOIN metadata m ON t.isin = m.isin - WHERE timestamp >= '{last_ts}'::timestamp - SAMPLE BY 1d FILL(none) ALIGN TO CALENDAR - """ - # Note: SAMPLE BY with multipile groups in QuestDB might require attention to syntax or - # iterating. QuestDB's SAMPLE BY creates a time series bucket. - # If we want grouping by other columns, we use GROUP BY, but 'SAMPLE BY' is preferred for time buckets. - # SAMPLE BY 1d, exchange, m.sector, m.continent -- not standard SQL. - - # Correct QuestDB approach for multi-dimensional time buckets: - # SAMPLE BY 1d, symbol works if symbol is the designated symbol column? - # No, QuestDB SAMPLE BY groups by time. For other columns we need standard GROUP BY - # combined with time bucketing functions like date_trunc('day', timestamp). 
- - query_daily = f""" - INSERT INTO analytics_daily - SELECT - date_trunc('day', t.timestamp) as timestamp, - t.exchange, - coalesce(m.sector, 'Unknown') as sector, - coalesce(m.continent, 'Unknown') as continent, - sum(t.price * t.quantity) as volume, count(*) as trade_count, - sum(t.price * t.quantity) / sum(t.quantity) as avg_price - FROM trades t - LEFT JOIN metadata m ON t.isin = m.isin - WHERE t.timestamp > '{last_ts}'::timestamp - GROUP BY - date_trunc('day', t.timestamp), - t.exchange, - coalesce(m.sector, 'Unknown'), - coalesce(m.continent, 'Unknown') + sum(price * quantity) as volume + from trades + where timestamp >= '{start_date.strftime('%Y-%m-%d')}' + group by date, exchange + order by date asc, exchange asc """ - start_t = time.time() - res = self.execute_query(query_daily) - if res: - logger.info(f"Updated analytics_daily in {time.time()-start_t:.2f}s") + try: + response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH) + if response.status_code == 200: + data = response.json() + columns = data.get('columns', []) + dataset = data.get('dataset', []) + + results = [] + for row in dataset: + result = {} + for i, col in enumerate(columns): + result[col['name']] = row[i] + results.append(result) + + # Berechne Moving Averages für alle Zeiträume + df = pd.DataFrame(results) + if df.empty: + return [] + + # Pivot für einfachere MA-Berechnung + df['date'] = pd.to_datetime(df['date']) + df = df.sort_values(['date', 'exchange']) + + # Berechne MA für jeden Zeitraum + for period in TIME_PERIODS: + df[f'ma{period}_count'] = df.groupby('exchange')['trade_count'].transform( + lambda x: x.rolling(window=period, min_periods=1).mean() + ) + df[f'ma{period}_volume'] = df.groupby('exchange')['volume'].transform( + lambda x: x.rolling(window=period, min_periods=1).mean() + ) + + # Konvertiere zurück zu Dict-Liste + return df.to_dict('records') + except Exception as e: + logger.error(f"Error calculating exchange daily aggregations: {e}") + return [] + + def calculate_stock_trends(self, days: int = 365) -> List[Dict]: + """Berechnet Trenddaten je ISIN mit Änderungsprozenten""" + end_date = datetime.datetime.now(datetime.timezone.utc) + start_date = end_date - datetime.timedelta(days=days) - - # 2. 
Aggregate ISIN stats - last_isin_ts = self.get_last_aggregated_ts("isin_stats_daily") - logger.info(f"Last isin_stats_daily timestamp: {last_isin_ts}") - - query_isin = f""" - INSERT INTO isin_stats_daily - SELECT - date_trunc('day', timestamp) as timestamp, + # Aktuelle Periode + query_current = f""" + select + date_trunc('day', timestamp) as date, isin, - sum(price * quantity) as volume, count(*) as trade_count, - sum(price * quantity) / sum(quantity) as vwap - FROM trades - WHERE timestamp > '{last_isin_ts}'::timestamp - GROUP BY date_trunc('day', timestamp), isin + sum(price * quantity) as volume + from trades + where timestamp >= '{start_date.strftime('%Y-%m-%d')}' + group by date, isin + order by date asc, isin asc """ - start_t = time.time() - res = self.execute_query(query_isin) - if res: - logger.info(f"Updated isin_stats_daily in {time.time()-start_t:.2f}s") + try: + response = requests.get(f"{self.db_url}/exec", params={'query': query_current}, auth=DB_AUTH) + if response.status_code == 200: + data = response.json() + columns = data.get('columns', []) + dataset = data.get('dataset', []) + + results = [] + for row in dataset: + result = {} + for i, col in enumerate(columns): + result[col['name']] = row[i] + results.append(result) + + if not results: + return [] + + df = pd.DataFrame(results) + df['date'] = pd.to_datetime(df['date']) + + # Aggregiere je ISIN über den gesamten Zeitraum + df_agg = df.groupby('isin').agg({ + 'trade_count': 'sum', + 'volume': 'sum' + }).reset_index() + + # Berechne Änderungen: Vergleich mit vorheriger Periode + # Für jede ISIN: aktueller Zeitraum vs. vorheriger Zeitraum + trends = [] + for isin in df_agg['isin'].unique(): + isin_data = df[df['isin'] == isin].sort_values('date') + + # Teile in zwei Hälften für Vergleich + mid_point = len(isin_data) // 2 + if mid_point > 0: + first_half = isin_data.iloc[:mid_point] + second_half = isin_data.iloc[mid_point:] + + first_count = first_half['trade_count'].sum() + first_volume = first_half['volume'].sum() + second_count = second_half['trade_count'].sum() + second_volume = second_half['volume'].sum() + + count_change = ((second_count - first_count) / first_count * 100) if first_count > 0 else 0 + volume_change = ((second_volume - first_volume) / first_volume * 100) if first_volume > 0 else 0 + else: + count_change = 0 + volume_change = 0 + + total_count = isin_data['trade_count'].sum() + total_volume = isin_data['volume'].sum() + + trends.append({ + 'isin': isin, + 'date': isin_data['date'].max(), + 'trade_count': total_count, + 'volume': total_volume, + 'count_change_pct': count_change, + 'volume_change_pct': volume_change + }) + + return trends + except Exception as e: + logger.error(f"Error calculating stock trends: {e}") + return [] + + def calculate_volume_changes(self, days: int = 365) -> List[Dict]: + """Berechnet Volumen- und Anzahl-Änderungen je Exchange""" + end_date = datetime.datetime.now(datetime.timezone.utc) + start_date = end_date - datetime.timedelta(days=days) + + query = f""" + select + date_trunc('day', timestamp) as date, + exchange, + count(*) as trade_count, + sum(price * quantity) as volume + from trades + where timestamp >= '{start_date.strftime('%Y-%m-%d')}' + group by date, exchange + order by date asc, exchange asc + """ + + try: + response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH) + if response.status_code == 200: + data = response.json() + columns = data.get('columns', []) + dataset = data.get('dataset', []) + + results = [] + for row in dataset: 
+ result = {} + for i, col in enumerate(columns): + result[col['name']] = row[i] + results.append(result) + + if not results: + return [] + + df = pd.DataFrame(results) + df['date'] = pd.to_datetime(df['date']) + df = df.sort_values(['date', 'exchange']) + + # Berechne Änderungen je Exchange + changes = [] + for exchange in df['exchange'].unique(): + exchange_data = df[df['exchange'] == exchange].sort_values('date') + + # Teile in zwei Hälften + mid_point = len(exchange_data) // 2 + if mid_point > 0: + first_half = exchange_data.iloc[:mid_point] + second_half = exchange_data.iloc[mid_point:] + + first_count = first_half['trade_count'].sum() + first_volume = first_half['volume'].sum() + second_count = second_half['trade_count'].sum() + second_volume = second_half['volume'].sum() + + count_change = ((second_count - first_count) / first_count * 100) if first_count > 0 else 0 + volume_change = ((second_volume - first_volume) / first_volume * 100) if first_volume > 0 else 0 + + # Bestimme Trend + if count_change > 5 and volume_change > 5: + trend = "mehr_trades_mehr_volumen" + elif count_change > 5 and volume_change < -5: + trend = "mehr_trades_weniger_volumen" + elif count_change < -5 and volume_change > 5: + trend = "weniger_trades_mehr_volumen" + elif count_change < -5 and volume_change < -5: + trend = "weniger_trades_weniger_volumen" + else: + trend = "stabil" + else: + count_change = 0 + volume_change = 0 + trend = "stabil" + + total_count = exchange_data['trade_count'].sum() + total_volume = exchange_data['volume'].sum() + + changes.append({ + 'date': exchange_data['date'].max(), + 'exchange': exchange, + 'trade_count': total_count, + 'volume': total_volume, + 'count_change_pct': count_change, + 'volume_change_pct': volume_change, + 'trend': trend + }) + + return changes + except Exception as e: + logger.error(f"Error calculating volume changes: {e}") + return [] + + def save_analytics_data(self, table_name: str, data: List[Dict]): + """Speichert aggregierte Daten in QuestDB via ILP""" + if not data: + return + + lines = [] + for row in data: + try: + # Konvertiere Datum zu Timestamp + if 'date' in row: + if isinstance(row['date'], str): + dt = datetime.datetime.fromisoformat(row['date'].replace('Z', '+00:00')) + elif isinstance(row['date'], pd.Timestamp): + dt = row['date'].to_pydatetime() + else: + dt = row['date'] + timestamp_ns = int(dt.timestamp() * 1e9) + else: + timestamp_ns = int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1e9) + + # Baue ILP-Line auf + tags = [] + fields = [] + + # Exchange als Tag + if 'exchange' in row and row['exchange']: + exchange = str(row['exchange']).replace(' ', '\\ ').replace(',', '\\,') + tags.append(f"exchange={exchange}") + + # ISIN als Tag + if 'isin' in row and row['isin']: + isin = str(row['isin']).replace(' ', '\\ ').replace(',', '\\,') + tags.append(f"isin={isin}") + + # Trend als Tag + if 'trend' in row and row['trend']: + trend = str(row['trend']).replace(' ', '\\ ').replace(',', '\\,') + tags.append(f"trend={trend}") + + # Numerische Felder + for key, value in row.items(): + if key in ['date', 'exchange', 'isin', 'trend']: + continue + if value is not None: + if isinstance(value, (int, float)): + fields.append(f"{key}={value}") + elif isinstance(value, str): + # String-Felder in Anführungszeichen + escaped = value.replace('"', '\\"').replace(' ', '\\ ') + fields.append(f'{key}="{escaped}"') + + if tags and fields: + line = f"{table_name},{','.join(tags)} {','.join(fields)} {timestamp_ns}" + lines.append(line) + except Exception 
as e: + logger.error(f"Error formatting row for {table_name}: {e}, row: {row}") + continue + + if not lines: + return + + payload = "\n".join(lines) + "\n" + + try: + response = requests.post( + f"{self.db_url}/write", + data=payload, + params={'precision': 'ns'}, + auth=DB_AUTH + ) + if response.status_code not in [200, 204]: + logger.error(f"Error saving to {table_name}: {response.text}") + else: + logger.info(f"Saved {len(lines)} rows to {table_name}") + except Exception as e: + logger.error(f"Error connecting to QuestDB: {e}") + + def process_all_analytics(self): + """Verarbeitet alle Analytics für alle Zeiträume""" + logger.info("Starting analytics processing...") + + # 1. Exchange Daily Aggregations (für alle Zeiträume) + logger.info("Calculating exchange daily aggregations...") + exchange_data = self.calculate_exchange_daily_aggregations(days_back=365) + if exchange_data: + self.save_analytics_data('analytics_exchange_daily', exchange_data) + + # 2. Stock Trends (für alle Zeiträume) + logger.info("Calculating stock trends...") + for days in TIME_PERIODS: + trends = self.calculate_stock_trends(days=days) + if trends: + # Füge Zeitraum als Tag hinzu + for trend in trends: + trend['period_days'] = days + self.save_analytics_data('analytics_stock_trends', trends) + + # 3. Volume Changes (für alle Zeiträume) + logger.info("Calculating volume changes...") + for days in TIME_PERIODS: + changes = self.calculate_volume_changes(days=days) + if changes: + # Füge Zeitraum als Tag hinzu + for change in changes: + change['period_days'] = days + self.save_analytics_data('analytics_volume_changes', changes) + + logger.info("Analytics processing completed.") + + def run(self): + """Hauptschleife des Workers""" + logger.info("Analytics Worker started.") + + # Initiale Verarbeitung + self.process_all_analytics() + self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc) + + # Polling-Schleife + while True: + try: + # Prüfe auf neue Trades + last_ts = self.get_last_processed_timestamp() + new_trades = self.get_new_trades(since=last_ts) + + if new_trades: + logger.info(f"Found {len(new_trades)} new trades, reprocessing analytics...") + self.process_all_analytics() + self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc) + else: + logger.debug("No new trades found.") + + # Warte 30 Sekunden vor nächster Prüfung + time.sleep(30) + except Exception as e: + logger.error(f"Error in worker loop: {e}") + time.sleep(60) # Längere Pause bei Fehler - def get_last_aggregated_ts(self, table): - res = self.execute_query(f"select max(timestamp) from {table}") - if res and res['dataset'] and res['dataset'][0][0]: - return res['dataset'][0][0] # ISO string usually - return "1970-01-01T00:00:00.000000Z" +def main(): + worker = AnalyticsWorker() + worker.run() if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - worker = AnalyticsWorker(db_host="localhost", auth=("admin", "quest")) - worker.initialize_tables() - worker.run_aggregation() + main() diff --git a/systemd/analytics-worker.service b/systemd/analytics-worker.service new file mode 100644 index 0000000..9f1f4d0 --- /dev/null +++ b/systemd/analytics-worker.service @@ -0,0 +1,18 @@ +[Unit] +Description=Trading Analytics Worker +After=network.target questdb.service + +[Service] +Type=simple +User=melchiorreimers +WorkingDirectory=/Users/melchiorreimers/Documents/trading_daemon +Environment="PYTHONUNBUFFERED=1" +Environment="DB_USER=admin" +Environment="DB_PASSWORD=quest" +Environment="DB_HOST=localhost" 
+ExecStart=/usr/bin/python3 -m src.analytics.worker +Restart=always +RestartSec=10 + +[Install] +WantedBy=multi-user.target
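
Note (not part of the patch): save_analytics_data() in src/analytics/worker.py posts InfluxDB line protocol to QuestDB's /write endpoint with precision=ns, and QuestDB by default auto-creates tables and columns for ILP writes, so the analytics_* tables need no explicit CREATE TABLE statements. Below is a minimal standalone sketch of the line format the worker emits, assuming the row layout produced by calculate_volume_changes(); the concrete values are made up for illustration.

# Standalone sketch (not part of the patch): builds one ILP line the way
# save_analytics_data() does. The row values are hypothetical; table, tag and
# field names follow src/analytics/worker.py.
import datetime

def to_ilp_line(table_name, row):
    dt = row['date']
    timestamp_ns = int(dt.timestamp() * 1e9)           # ILP timestamp in nanoseconds
    tags, fields = [], []
    for key in ('exchange', 'isin', 'trend'):           # string columns become ILP tags
        if row.get(key):
            escaped = str(row[key]).replace(' ', '\\ ').replace(',', '\\,')
            tags.append(f"{key}={escaped}")
    for key, value in row.items():                      # remaining numeric columns become fields
        if key in ('date', 'exchange', 'isin', 'trend') or value is None:
            continue
        fields.append(f"{key}={value}")
    return f"{table_name},{','.join(tags)} {','.join(fields)} {timestamp_ns}"

row = {
    'date': datetime.datetime(2026, 1, 24, tzinfo=datetime.timezone.utc),
    'exchange': 'EIX',
    'trade_count': 1234,
    'volume': 5678900.0,
    'count_change_pct': 12.5,
    'volume_change_pct': -3.2,
    'trend': 'mehr_trades_weniger_volumen',
    'period_days': 7,
}
print(to_ilp_line('analytics_volume_changes', row))
# analytics_volume_changes,exchange=EIX,trend=mehr_trades_weniger_volumen \
#   trade_count=1234,volume=5678900.0,count_change_pct=12.5,volume_change_pct=-3.2,period_days=7 1769212800000000000

Tags (exchange, isin, trend) land as SYMBOL columns; the numeric fields are sent without the ILP integer suffix, so counts such as trade_count and period_days will be stored as DOUBLE rather than LONG in the auto-created tables.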