updated

Author: Melchior Reimers
Date:   2026-01-25 18:07:57 +01:00
parent c218d47f14
commit 9185ec3ef6


@@ -3,9 +3,8 @@ import logging
 import datetime
 import os
 import requests
-from typing import Dict, List, Optional
+from typing import Dict, List, Tuple, Optional
 import pandas as pd
-import json
 
 logging.basicConfig(
     level=logging.INFO,
@@ -24,8 +23,9 @@ TIME_PERIODS = [7, 30, 42, 69, 180, 365]
 class AnalyticsWorker:
     def __init__(self):
+        self.last_processed_timestamp = None
         self.db_url = DB_URL
 
     def wait_for_questdb(self, max_retries: int = 30, retry_delay: int = 2):
         """Wartet bis QuestDB verfügbar ist"""
         logger.info("Waiting for QuestDB to be available...")
@@ -40,317 +40,281 @@ class AnalyticsWorker:
             time.sleep(retry_delay)
         logger.error("QuestDB did not become available after waiting")
         return False
 
-    def query_questdb(self, query: str, timeout: int = 30) -> Optional[Dict]:
-        """Zentrale QuestDB-Abfrage-Funktion"""
-        try:
-            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=timeout)
-            if response.status_code == 200:
-                return response.json()
-            else:
-                logger.error(f"QuestDB query failed: {response.status_code} - {response.text}")
-                return None
-        except Exception as e:
-            logger.error(f"Error querying QuestDB: {e}")
-            return None
-
-    def get_existing_dates(self, table_name: str) -> set:
-        """Holt alle bereits berechneten Daten aus einer Analytics-Tabelle"""
-        query = f"select distinct date_trunc('day', timestamp) as date from {table_name}"
-        data = self.query_questdb(query)
-        if not data or not data.get('dataset'):
-            return set()
-        dates = set()
-        for row in data['dataset']:
-            if row[0]:
-                if isinstance(row[0], str):
-                    dates.add(datetime.datetime.fromisoformat(row[0].replace('Z', '+00:00')).date())
-                elif isinstance(row[0], (int, float)):
-                    dates.add(datetime.datetime.fromtimestamp(row[0] / 1000000, tz=datetime.timezone.utc).date())
-        return dates
-
-    def get_missing_dates(self) -> List[datetime.date]:
-        """Ermittelt fehlende Tage, die noch berechnet werden müssen"""
-        # Hole das Datum des ersten Trades
-        query = "select min(date_trunc('day', timestamp)) as first_date from trades"
-        data = self.query_questdb(query)
-        if not data or not data.get('dataset') or not data['dataset'][0][0]:
-            logger.info("No trades found in database")
-            return []
-        first_date_value = data['dataset'][0][0]
-        if isinstance(first_date_value, str):
-            first_date = datetime.datetime.fromisoformat(first_date_value.replace('Z', '+00:00')).date()
-        else:
-            first_date = datetime.datetime.fromtimestamp(first_date_value / 1000000, tz=datetime.timezone.utc).date()
-
-        # Hole bereits berechnete Daten
-        existing_dates = self.get_existing_dates('analytics_daily_summary')
-
-        # Generiere alle Tage vom ersten Trade bis gestern
-        yesterday = datetime.date.today() - datetime.timedelta(days=1)
-        all_dates = []
-        current = first_date
-        while current <= yesterday:
-            all_dates.append(current)
-            current += datetime.timedelta(days=1)
-
-        # Finde fehlende Tage
-        missing_dates = [d for d in all_dates if d not in existing_dates]
-        logger.info(f"Found {len(missing_dates)} missing dates to calculate (from {len(all_dates)} total dates)")
-        return sorted(missing_dates)
-
-    def calculate_daily_summary(self, date: datetime.date) -> Optional[Dict]:
-        """Berechnet tägliche Zusammenfassung für einen Tag"""
-        date_str = date.strftime('%Y-%m-%d')
-        query = f"""
-            select
-                count(*) as total_trades,
-                sum(price * quantity) as total_volume,
-                exchange,
-                count(*) as exchange_trades
-            from trades
-            where date_trunc('day', timestamp) = '{date_str}'
-            group by exchange
-        """
-        data = self.query_questdb(query)
-        if not data or not data.get('dataset'):
-            return None
-
-        total_trades = 0
-        total_volume = 0.0
-        exchanges = {}
-        for row in data['dataset']:
-            exchange = row[2]
-            trades = row[3] if row[3] else 0
-            volume = row[1] if row[1] else 0.0
-            total_trades += trades
-            total_volume += volume
-            exchanges[exchange] = {'trades': trades, 'volume': volume}
-
-        return {
-            'date': date,
-            'total_trades': total_trades,
-            'total_volume': total_volume,
-            'exchanges': json.dumps(exchanges)
-        }
-
-    def calculate_exchange_daily(self, date: datetime.date) -> List[Dict]:
-        """Berechnet tägliche Exchange-Statistiken mit Moving Averages"""
-        date_str = date.strftime('%Y-%m-%d')
-
-        # Hole Daten für diesen Tag
-        query = f"""
-            select
-                exchange,
-                count(*) as trade_count,
-                sum(price * quantity) as volume
-            from trades
-            where date_trunc('day', timestamp) = '{date_str}'
-            group by exchange
-        """
-        data = self.query_questdb(query)
-        if not data or not data.get('dataset'):
-            return []
-
-        results = []
-        for row in data['dataset']:
-            exchange = row[0]
-            trade_count = row[1] if row[1] else 0
-            volume = row[2] if row[2] else 0.0
-
-            # Berechne Moving Averages für alle Zeiträume
-            ma_values = {}
-            for period in TIME_PERIODS:
-                # Hole Daten der letzten N Tage inklusive heute
-                end_date = date
-                start_date = end_date - datetime.timedelta(days=period-1)
-
-                ma_query = f"""
-                    select
-                        count(*) as ma_count,
-                        sum(price * quantity) as ma_volume
-                    from trades
-                    where exchange = '{exchange}'
-                    and date_trunc('day', timestamp) >= '{start_date.strftime('%Y-%m-%d')}'
-                    and date_trunc('day', timestamp) <= '{end_date.strftime('%Y-%m-%d')}'
-                """
-                ma_data = self.query_questdb(ma_query)
-                if ma_data and ma_data.get('dataset') and ma_data['dataset'][0]:
-                    ma_values[f'ma{period}_count'] = ma_data['dataset'][0][0] if ma_data['dataset'][0][0] else 0
-                    ma_values[f'ma{period}_volume'] = ma_data['dataset'][0][1] if ma_data['dataset'][0][1] else 0.0
-                else:
-                    ma_values[f'ma{period}_count'] = 0
-                    ma_values[f'ma{period}_volume'] = 0.0
-
-            results.append({
-                'date': date,
-                'exchange': exchange,
-                'trade_count': trade_count,
-                'volume': volume,
-                **ma_values
-            })
-
-        return results
-
-    def calculate_stock_trends(self, date: datetime.date) -> List[Dict]:
-        """Berechnet Stock-Trends für alle Zeiträume für einen Tag"""
-        results = []
-
-        for period in TIME_PERIODS:
-            end_date = date
-            start_date = end_date - datetime.timedelta(days=period-1)
-
-            # Aktuelle Periode
-            query = f"""
-                select
-                    isin,
-                    count(*) as trade_count,
-                    sum(price * quantity) as volume
-                from trades
-                where date_trunc('day', timestamp) >= '{start_date.strftime('%Y-%m-%d')}'
-                and date_trunc('day', timestamp) <= '{end_date.strftime('%Y-%m-%d')}'
-                group by isin
-            """
-            data = self.query_questdb(query)
-            if not data or not data.get('dataset'):
-                continue
-
-            for row in data['dataset']:
-                isin = row[0]
-                current_count = row[1] if row[1] else 0
-                current_volume = row[2] if row[2] else 0.0
-
-                # Vorherige Periode für Vergleich
-                prev_start = start_date - datetime.timedelta(days=period)
-                prev_end = start_date - datetime.timedelta(days=1)
-
-                prev_query = f"""
-                    select
-                        count(*) as trade_count,
-                        sum(price * quantity) as volume
-                    from trades
-                    where isin = '{isin}'
-                    and date_trunc('day', timestamp) >= '{prev_start.strftime('%Y-%m-%d')}'
-                    and date_trunc('day', timestamp) <= '{prev_end.strftime('%Y-%m-%d')}'
-                """
-                prev_data = self.query_questdb(prev_query)
-                prev_count = 0
-                prev_volume = 0.0
-                if prev_data and prev_data.get('dataset') and prev_data['dataset'][0]:
-                    prev_count = prev_data['dataset'][0][0] if prev_data['dataset'][0][0] else 0
-                    prev_volume = prev_data['dataset'][0][1] if prev_data['dataset'][0][1] else 0.0
-
-                # Berechne Änderungen
-                count_change_pct = ((current_count - prev_count) / prev_count * 100) if prev_count > 0 else 0
-                volume_change_pct = ((current_volume - prev_volume) / prev_volume * 100) if prev_volume > 0 else 0
-
-                results.append({
-                    'date': date,
-                    'period_days': period,
-                    'isin': isin,
-                    'trade_count': current_count,
-                    'volume': current_volume,
-                    'count_change_pct': count_change_pct,
-                    'volume_change_pct': volume_change_pct
-                })
-
-        return results
-
-    def calculate_volume_changes(self, date: datetime.date) -> List[Dict]:
-        """Berechnet Volumen-Änderungen für alle Zeiträume für einen Tag"""
-        results = []
-
-        for period in TIME_PERIODS:
-            end_date = date
-            start_date = end_date - datetime.timedelta(days=period-1)
-
-            # Hole alle Exchanges
-            exchanges_query = "select distinct exchange from trades"
-            exchanges_data = self.query_questdb(exchanges_query)
-            if not exchanges_data or not exchanges_data.get('dataset'):
-                continue
-
-            for exchange_row in exchanges_data['dataset']:
-                exchange = exchange_row[0]
-
-                # Aktuelle Periode
-                query = f"""
-                    select
-                        count(*) as trade_count,
-                        sum(price * quantity) as volume
-                    from trades
-                    where exchange = '{exchange}'
-                    and date_trunc('day', timestamp) >= '{start_date.strftime('%Y-%m-%d')}'
-                    and date_trunc('day', timestamp) <= '{end_date.strftime('%Y-%m-%d')}'
-                """
-                data = self.query_questdb(query)
-                if not data or not data.get('dataset') or not data['dataset'][0]:
-                    continue
-
-                current_count = data['dataset'][0][0] if data['dataset'][0][0] else 0
-                current_volume = data['dataset'][0][1] if data['dataset'][0][1] else 0.0
-
-                # Vorherige Periode
-                prev_start = start_date - datetime.timedelta(days=period)
-                prev_end = start_date - datetime.timedelta(days=1)
-
-                prev_query = f"""
-                    select
-                        count(*) as trade_count,
-                        sum(price * quantity) as volume
-                    from trades
-                    where exchange = '{exchange}'
-                    and date_trunc('day', timestamp) >= '{prev_start.strftime('%Y-%m-%d')}'
-                    and date_trunc('day', timestamp) <= '{prev_end.strftime('%Y-%m-%d')}'
-                """
-                prev_data = self.query_questdb(prev_query)
-                prev_count = 0
-                prev_volume = 0.0
-                if prev_data and prev_data.get('dataset') and prev_data['dataset'][0]:
-                    prev_count = prev_data['dataset'][0][0] if prev_data['dataset'][0][0] else 0
-                    prev_volume = prev_data['dataset'][0][1] if prev_data['dataset'][0][1] else 0.0
-
-                # Berechne Änderungen
-                count_change_pct = ((current_count - prev_count) / prev_count * 100) if prev_count > 0 else 0
-                volume_change_pct = ((current_volume - prev_volume) / prev_volume * 100) if prev_volume > 0 else 0
-
-                # Bestimme Trend
-                if count_change_pct > 5 and volume_change_pct > 5:
-                    trend = "mehr_trades_mehr_volumen"
-                elif count_change_pct > 5 and volume_change_pct < -5:
-                    trend = "mehr_trades_weniger_volumen"
-                elif count_change_pct < -5 and volume_change_pct > 5:
-                    trend = "weniger_trades_mehr_volumen"
-                elif count_change_pct < -5 and volume_change_pct < -5:
-                    trend = "weniger_trades_weniger_volumen"
-                else:
-                    trend = "stabil"
-
-                results.append({
-                    'date': date,
-                    'period_days': period,
-                    'exchange': exchange,
-                    'trade_count': current_count,
-                    'volume': current_volume,
-                    'count_change_pct': count_change_pct,
-                    'volume_change_pct': volume_change_pct,
-                    'trend': trend
-                })
-
-        return results
+    def get_last_processed_timestamp(self) -> Optional[datetime.datetime]:
+        """Holt den letzten verarbeiteten Timestamp aus der Analytics-Tabelle"""
+        try:
+            query = "select max(timestamp) as last_ts from analytics_exchange_daily"
+            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
+            if response.status_code == 200:
+                data = response.json()
+                if data.get('dataset') and data['dataset'] and len(data['dataset']) > 0 and data['dataset'][0][0]:
+                    ts_value = data['dataset'][0][0]
+                    if isinstance(ts_value, str):
+                        return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
+                    elif isinstance(ts_value, (int, float)):
+                        # QuestDB gibt Timestamps in Mikrosekunden zurück
+                        return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
+        except Exception as e:
+            logger.debug(f"Could not get last processed timestamp: {e}")
+        return None
+
+    def get_new_trades(self, since: Optional[datetime.datetime] = None) -> List[Dict]:
+        """Holt neue Trades seit dem letzten Verarbeitungszeitpunkt"""
+        if since:
+            since_str = since.strftime('%Y-%m-%d %H:%M:%S')
+            query = f"select timestamp, exchange, isin, price, quantity from trades where timestamp > '{since_str}' order by timestamp asc"
+        else:
+            # Erste Ausführung: nur die letzten 7 Tage
+            query = f"select timestamp, exchange, isin, price, quantity from trades where timestamp > dateadd('d', -7, now()) order by timestamp asc"
+
+        try:
+            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
+            if response.status_code == 200:
+                data = response.json()
+                columns = data.get('columns', [])
+                dataset = data.get('dataset', [])
+
+                trades = []
+                for row in dataset:
+                    trade = {}
+                    for i, col in enumerate(columns):
+                        trade[col['name']] = row[i]
+                    trades.append(trade)
+                return trades
+        except Exception as e:
+            logger.error(f"Error fetching new trades: {e}")
+        return []
+
+    def calculate_exchange_daily_aggregations(self, days_back: int = 365) -> List[Dict]:
+        """Berechnet tägliche Aggregationen je Exchange mit Moving Averages"""
+        end_date = datetime.datetime.now(datetime.timezone.utc)
+        start_date = end_date - datetime.timedelta(days=days_back)
+
+        query = f"""
+            select
+                date_trunc('day', timestamp) as date,
+                exchange,
+                count(*) as trade_count,
+                sum(price * quantity) as volume
+            from trades
+            where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
+            group by date, exchange
+            order by date asc, exchange asc
+        """
+        try:
+            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
+            if response.status_code == 200:
+                data = response.json()
+                columns = data.get('columns', [])
+                dataset = data.get('dataset', [])
+
+                results = []
+                for row in dataset:
+                    result = {}
+                    for i, col in enumerate(columns):
+                        result[col['name']] = row[i]
+                    results.append(result)
+
+                # Berechne Moving Averages für alle Zeiträume
+                df = pd.DataFrame(results)
+                if df.empty:
+                    return []
+
+                # Pivot für einfachere MA-Berechnung
+                df['date'] = pd.to_datetime(df['date'])
+                df = df.sort_values(['date', 'exchange'])
+
+                # Berechne MA für jeden Zeitraum
+                for period in TIME_PERIODS:
+                    df[f'ma{period}_count'] = df.groupby('exchange')['trade_count'].transform(
+                        lambda x: x.rolling(window=period, min_periods=1).mean()
+                    )
+                    df[f'ma{period}_volume'] = df.groupby('exchange')['volume'].transform(
+                        lambda x: x.rolling(window=period, min_periods=1).mean()
+                    )
+
+                # Konvertiere zurück zu Dict-Liste
+                return df.to_dict('records')
+        except Exception as e:
+            logger.error(f"Error calculating exchange daily aggregations: {e}")
+        return []
+
+    def calculate_stock_trends(self, days: int = 365) -> List[Dict]:
+        """Berechnet Trenddaten je ISIN mit Änderungsprozenten"""
+        end_date = datetime.datetime.now(datetime.timezone.utc)
+        start_date = end_date - datetime.timedelta(days=days)
+
+        # Aktuelle Periode
+        query_current = f"""
+            select
+                date_trunc('day', timestamp) as date,
+                isin,
+                count(*) as trade_count,
+                sum(price * quantity) as volume
+            from trades
+            where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
+            group by date, isin
+            order by date asc, isin asc
+        """
+
+        try:
+            response = requests.get(f"{self.db_url}/exec", params={'query': query_current}, auth=DB_AUTH)
+            if response.status_code == 200:
+                data = response.json()
+                columns = data.get('columns', [])
+                dataset = data.get('dataset', [])
+
+                results = []
+                for row in dataset:
+                    result = {}
+                    for i, col in enumerate(columns):
+                        result[col['name']] = row[i]
+                    results.append(result)
+
+                if not results:
+                    return []
+
+                df = pd.DataFrame(results)
+                df['date'] = pd.to_datetime(df['date'])
+
+                # Aggregiere je ISIN über den gesamten Zeitraum
+                df_agg = df.groupby('isin').agg({
+                    'trade_count': 'sum',
+                    'volume': 'sum'
+                }).reset_index()
+
+                # Berechne Änderungen: Vergleich mit vorheriger Periode
+                # Für jede ISIN: aktueller Zeitraum vs. vorheriger Zeitraum
+                trends = []
+                for isin in df_agg['isin'].unique():
+                    isin_data = df[df['isin'] == isin].sort_values('date')
+
+                    # Teile in zwei Hälften für Vergleich
+                    mid_point = len(isin_data) // 2
+                    if mid_point > 0:
+                        first_half = isin_data.iloc[:mid_point]
+                        second_half = isin_data.iloc[mid_point:]
+
+                        first_count = first_half['trade_count'].sum()
+                        first_volume = first_half['volume'].sum()
+                        second_count = second_half['trade_count'].sum()
+                        second_volume = second_half['volume'].sum()
+
+                        count_change = ((second_count - first_count) / first_count * 100) if first_count > 0 else 0
+                        volume_change = ((second_volume - first_volume) / first_volume * 100) if first_volume > 0 else 0
+                    else:
+                        count_change = 0
+                        volume_change = 0
+
+                    total_count = isin_data['trade_count'].sum()
+                    total_volume = isin_data['volume'].sum()
+
+                    trends.append({
+                        'isin': isin,
+                        'date': isin_data['date'].max(),
+                        'trade_count': total_count,
+                        'volume': total_volume,
+                        'count_change_pct': count_change,
+                        'volume_change_pct': volume_change
+                    })
+
+                return trends
+        except Exception as e:
+            logger.error(f"Error calculating stock trends: {e}")
+        return []
+
+    def calculate_volume_changes(self, days: int = 365) -> List[Dict]:
+        """Berechnet Volumen- und Anzahl-Änderungen je Exchange"""
+        end_date = datetime.datetime.now(datetime.timezone.utc)
+        start_date = end_date - datetime.timedelta(days=days)
+
+        query = f"""
+            select
+                date_trunc('day', timestamp) as date,
+                exchange,
+                count(*) as trade_count,
+                sum(price * quantity) as volume
+            from trades
+            where timestamp >= '{start_date.strftime('%Y-%m-%d')}'
+            group by date, exchange
+            order by date asc, exchange asc
+        """
+
+        try:
+            response = requests.get(f"{self.db_url}/exec", params={'query': query}, auth=DB_AUTH)
+            if response.status_code == 200:
+                data = response.json()
+                columns = data.get('columns', [])
+                dataset = data.get('dataset', [])
+
+                results = []
+                for row in dataset:
+                    result = {}
+                    for i, col in enumerate(columns):
+                        result[col['name']] = row[i]
+                    results.append(result)
+
+                if not results:
+                    return []
+
+                df = pd.DataFrame(results)
+                df['date'] = pd.to_datetime(df['date'])
+                df = df.sort_values(['date', 'exchange'])
+
+                # Berechne Änderungen je Exchange
+                changes = []
+                for exchange in df['exchange'].unique():
+                    exchange_data = df[df['exchange'] == exchange].sort_values('date')
+
+                    # Teile in zwei Hälften
+                    mid_point = len(exchange_data) // 2
+                    if mid_point > 0:
+                        first_half = exchange_data.iloc[:mid_point]
+                        second_half = exchange_data.iloc[mid_point:]
+
+                        first_count = first_half['trade_count'].sum()
+                        first_volume = first_half['volume'].sum()
+                        second_count = second_half['trade_count'].sum()
+                        second_volume = second_half['volume'].sum()
+
+                        count_change = ((second_count - first_count) / first_count * 100) if first_count > 0 else 0
+                        volume_change = ((second_volume - first_volume) / first_volume * 100) if first_volume > 0 else 0
+
+                        # Bestimme Trend
+                        if count_change > 5 and volume_change > 5:
+                            trend = "mehr_trades_mehr_volumen"
+                        elif count_change > 5 and volume_change < -5:
+                            trend = "mehr_trades_weniger_volumen"
+                        elif count_change < -5 and volume_change > 5:
+                            trend = "weniger_trades_mehr_volumen"
+                        elif count_change < -5 and volume_change < -5:
+                            trend = "weniger_trades_weniger_volumen"
+                        else:
+                            trend = "stabil"
+                    else:
+                        count_change = 0
+                        volume_change = 0
+                        trend = "stabil"
+
+                    total_count = exchange_data['trade_count'].sum()
+                    total_volume = exchange_data['volume'].sum()
+
+                    changes.append({
+                        'date': exchange_data['date'].max(),
+                        'exchange': exchange,
+                        'trade_count': total_count,
+                        'volume': total_volume,
+                        'count_change_pct': count_change,
+                        'volume_change_pct': volume_change,
+                        'trend': trend
+                    })
+
+                return changes
+        except Exception as e:
+            logger.error(f"Error calculating volume changes: {e}")
+        return []
 
     def save_analytics_data(self, table_name: str, data: List[Dict]):
         """Speichert aggregierte Daten in QuestDB via ILP"""
@@ -362,10 +326,10 @@ class AnalyticsWorker:
             try:
                 # Konvertiere Datum zu Timestamp
                 if 'date' in row:
-                    if isinstance(row['date'], datetime.date):
-                        dt = datetime.datetime.combine(row['date'], datetime.time.min).replace(tzinfo=datetime.timezone.utc)
-                    elif isinstance(row['date'], str):
+                    if isinstance(row['date'], str):
                         dt = datetime.datetime.fromisoformat(row['date'].replace('Z', '+00:00'))
+                    elif isinstance(row['date'], pd.Timestamp):
+                        dt = row['date'].to_pydatetime()
                     else:
                         dt = row['date']
                     timestamp_ns = int(dt.timestamp() * 1e9)
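The `pd.Timestamp` branch added above matters because the reworked calculation methods return `df.to_dict('records')`, whose date values are pandas Timestamps rather than ISO strings. A small sketch of the conversion to the nanosecond epoch used for ILP, with an illustrative date:

```python
# Sketch of the date-to-nanosecond conversion performed above.
import datetime
import pandas as pd

for value in ('2026-01-24T00:00:00Z', pd.Timestamp('2026-01-24', tz='UTC')):
    if isinstance(value, str):
        dt = datetime.datetime.fromisoformat(value.replace('Z', '+00:00'))
    elif isinstance(value, pd.Timestamp):
        dt = value.to_pydatetime()
    else:
        dt = value
    print(int(dt.timestamp() * 1e9))  # 1769212800000000000 in both cases
```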
@@ -386,32 +350,30 @@ class AnalyticsWorker:
                     isin = str(row['isin']).replace(' ', '\\ ').replace(',', '\\,')
                     tags.append(f"isin={isin}")
 
-                # Period als Tag
-                if 'period_days' in row and row['period_days']:
-                    tags.append(f"period_days={row['period_days']}")
-
                 # Trend als Tag
                 if 'trend' in row and row['trend']:
                     trend = str(row['trend']).replace(' ', '\\ ').replace(',', '\\,')
                     tags.append(f"trend={trend}")
 
-                # Numerische Felder
+                # Numerische Felder (period_days muss als Feld gespeichert werden, nicht als Tag)
                 for key, value in row.items():
-                    if key in ['date', 'exchange', 'isin', 'trend', 'period_days', 'exchanges']:
+                    if key in ['date', 'exchange', 'isin', 'trend', 'exchanges']:
                         continue
                     if value is not None:
                         if isinstance(value, (int, float)):
                             fields.append(f"{key}={value}")
                         elif isinstance(value, str):
-                            # String-Felder in Anführungszeichen
                             escaped = value.replace('"', '\\"').replace(' ', '\\ ')
                             fields.append(f'{key}="{escaped}"')
 
-                # Exchanges als JSON-Feld
-                if 'exchanges' in row and row['exchanges']:
-                    fields.append(f'exchanges="{row["exchanges"]}"')
-
-                if tags and fields:
-                    line = f"{table_name},{','.join(tags)} {','.join(fields)} {timestamp_ns}"
+                # Erstelle Line - auch wenn keine Tags vorhanden sind (nur Fields)
+                if fields:
+                    if tags:
+                        line = f"{table_name},{','.join(tags)} {','.join(fields)} {timestamp_ns}"
+                    else:
+                        # Manche Tabellen haben keine Tags, nur Fields
+                        line = f"{table_name} {','.join(fields)} {timestamp_ns}"
                     lines.append(line)
             except Exception as e:
                 logger.error(f"Error formatting row for {table_name}: {e}, row: {row}")
@@ -436,46 +398,37 @@ class AnalyticsWorker:
         except Exception as e:
             logger.error(f"Error connecting to QuestDB: {e}")
 
-    def process_date(self, date: datetime.date):
-        """Verarbeitet alle Analytics für einen bestimmten Tag"""
-        logger.info(f"Processing analytics for {date}")
-
-        # 1. Daily Summary
-        summary = self.calculate_daily_summary(date)
-        if summary:
-            self.save_analytics_data('analytics_daily_summary', [summary])
-
-        # 2. Exchange Daily
-        exchange_data = self.calculate_exchange_daily(date)
-        if exchange_data:
-            self.save_analytics_data('analytics_exchange_daily', exchange_data)
-
-        # 3. Stock Trends
-        stock_trends = self.calculate_stock_trends(date)
-        if stock_trends:
-            self.save_analytics_data('analytics_stock_trends', stock_trends)
-
-        # 4. Volume Changes
-        volume_changes = self.calculate_volume_changes(date)
-        if volume_changes:
-            self.save_analytics_data('analytics_volume_changes', volume_changes)
-
-        logger.info(f"Completed processing for {date}")
-
-    def process_missing_dates(self):
-        """Berechnet alle fehlenden Tage"""
-        missing_dates = self.get_missing_dates()
-        if not missing_dates:
-            logger.info("No missing dates to process")
-            return
-
-        logger.info(f"Processing {len(missing_dates)} missing dates...")
-        for i, date in enumerate(missing_dates, 1):
-            logger.info(f"Processing date {i}/{len(missing_dates)}: {date}")
-            self.process_date(date)
-            # Kleine Pause zwischen den Berechnungen
-            if i % 10 == 0:
-                time.sleep(1)
+    def process_all_analytics(self):
+        """Verarbeitet alle Analytics für alle Zeiträume"""
+        logger.info("Starting analytics processing...")
+
+        # 1. Exchange Daily Aggregations (für alle Zeiträume)
+        logger.info("Calculating exchange daily aggregations...")
+        exchange_data = self.calculate_exchange_daily_aggregations(days_back=365)
+        if exchange_data:
+            self.save_analytics_data('analytics_exchange_daily', exchange_data)
+
+        # 2. Stock Trends (für alle Zeiträume)
+        logger.info("Calculating stock trends...")
+        for days in TIME_PERIODS:
+            trends = self.calculate_stock_trends(days=days)
+            if trends:
+                # Füge Zeitraum als Tag hinzu
+                for trend in trends:
+                    trend['period_days'] = days
+                self.save_analytics_data('analytics_stock_trends', trends)
+
+        # 3. Volume Changes (für alle Zeiträume)
+        logger.info("Calculating volume changes...")
+        for days in TIME_PERIODS:
+            changes = self.calculate_volume_changes(days=days)
+            if changes:
+                # Füge Zeitraum als Tag hinzu
+                for change in changes:
+                    change['period_days'] = days
+                self.save_analytics_data('analytics_volume_changes', changes)
+
+        logger.info("Analytics processing completed.")
 
     def run(self):
         """Hauptschleife des Workers"""
@@ -486,30 +439,32 @@ class AnalyticsWorker:
             logger.error("Failed to connect to QuestDB. Exiting.")
             return
 
-        # Initiale Berechnung fehlender Tage
-        logger.info("Checking for missing dates...")
-        self.process_missing_dates()
+        # Initiale Verarbeitung
+        self.process_all_analytics()
+        self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
 
-        # Hauptschleife: Warte auf Mitternacht
-        logger.info("Waiting for midnight to process yesterday's data...")
+        # Polling-Schleife
         while True:
-            now = datetime.datetime.now()
-
-            # Prüfe ob es Mitternacht ist (00:00)
-            if now.hour == 0 and now.minute == 0:
-                yesterday = (now - datetime.timedelta(days=1)).date()
-                logger.info(f"Processing yesterday's data: {yesterday}")
-                self.process_date(yesterday)
-                # Warte 61s, um Mehrfachausführung zu verhindern
-                time.sleep(61)
-            else:
-                # Prüfe auch auf fehlende Tage (alle 6 Stunden)
-                if now.hour % 6 == 0 and now.minute == 0:
-                    logger.info("Checking for missing dates...")
-                    self.process_missing_dates()
-                    time.sleep(61)
-
-                time.sleep(30)
+            try:
+                # Prüfe auf neue Trades
+                last_ts = self.get_last_processed_timestamp()
+                new_trades = self.get_new_trades(since=last_ts)
+
+                if new_trades:
+                    logger.info(f"Found {len(new_trades)} new trades, reprocessing analytics...")
+                    self.process_all_analytics()
+                    self.last_processed_timestamp = datetime.datetime.now(datetime.timezone.utc)
+                else:
+                    logger.debug("No new trades found.")
+
+                # Warte 30 Sekunden vor nächster Prüfung
+                time.sleep(30)
+            except requests.exceptions.ConnectionError as e:
+                logger.warning(f"Connection error to QuestDB, retrying in 60s: {e}")
+                time.sleep(60)  # Längere Pause bei Verbindungsfehler
+            except Exception as e:
+                logger.error(f"Error in worker loop: {e}", exc_info=True)
+                time.sleep(60)  # Längere Pause bei Fehler
 
 def main():
     worker = AnalyticsWorker()