From b9062c5dac6a7d9d41785691d28b2a295f374db7 Mon Sep 17 00:00:00 2001 From: Melchior Reimers Date: Sun, 25 Jan 2026 16:44:43 +0100 Subject: [PATCH] now downloads historical eix data --- daemon.py | 24 ++++++++++++++++--- dashboard/server.py | 5 +++- src/exchanges/eix.py | 55 ++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 76 insertions(+), 8 deletions(-) diff --git a/daemon.py b/daemon.py index ec5b6cc..edf77aa 100644 --- a/daemon.py +++ b/daemon.py @@ -45,8 +45,14 @@ def run_task(historical=False): eix = EIXExchange() ls = LSExchange() + # Pass last_ts to fetcher to allow smart filtering + # daemon.py runs daily, so we want to fetch everything since DB state + # BUT we need to be careful: eix.py's fetch_latest_trades needs 'since_date' argument + # We can't pass it here directly in the tuple easily because last_ts is calculated inside the loop. + + # We will modify the loop below to handle args dynamically exchanges_to_process = [ - (eix, {'limit': None if historical else 1}), + (eix, {'limit': None if historical else 5}), # Default limit 5 for safety if no historical (ls, {'include_yesterday': historical}) ] @@ -58,7 +64,16 @@ def run_task(historical=False): last_ts = get_last_trade_timestamp(db_url, exchange.name) logger.info(f"Fetching data from {exchange.name} (Filtering trades older than {last_ts})...") - trades = exchange.fetch_latest_trades(**args) + + # Special handling for EIX to support smart filtering + call_args = args.copy() + if exchange.name == "EIX" and not historical: + call_args['since_date'] = last_ts.replace(tzinfo=datetime.timezone.utc) + # Remove limit if we are filtering by date to ensure we get everything + if 'limit' in call_args: + call_args.pop('limit') + + trades = exchange.fetch_latest_trades(**call_args) # Deduplizierung: Nur Trades nehmen, die neuer sind als der letzte in der DB new_trades = [ @@ -97,7 +112,10 @@ def main(): logger.info("Database is empty or table doesn't exist. 
Triggering initial historical fetch...") run_task(historical=True) else: - logger.info("Found existing data in database. Waiting for scheduled run at 23:00.") + logger.info("Found existing data in database. Triggering catch-up sync...") + # Run a normal task to fetch any missing data since the last run + run_task(historical=False) + logger.info("Catch-up sync completed. Waiting for scheduled run at 23:00.") while True: now = datetime.datetime.now() diff --git a/dashboard/server.py b/dashboard/server.py index 8c8ccb3..f867dcc 100644 --- a/dashboard/server.py +++ b/dashboard/server.py @@ -118,7 +118,10 @@ async def get_analytics( if sub_group_by and sub_group_by in groups_map: query += f", {groups_map[sub_group_by]} as sub_label" - query += f", {selected_metric} as value from trades" + if metric == 'all': + query += f", count(*) as value_count, sum({t_prefix}price * {t_prefix}quantity) as value_volume from trades" + else: + query += f", {selected_metric} as value from trades" if needs_metadata: query += " t left join metadata m on t.isin = m.isin" diff --git a/src/exchanges/eix.py b/src/exchanges/eix.py index f855be8..959be31 100644 --- a/src/exchanges/eix.py +++ b/src/exchanges/eix.py @@ -12,7 +12,7 @@ class EIXExchange(BaseExchange): def name(self) -> str: return "EIX" - def fetch_latest_trades(self, limit: int = 1) -> List[Trade]: + def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None) -> List[Trade]: # EIX stores its file list in a separate API endpoint url = "https://european-investor-exchange.com/api/official-trades" try: @@ -23,12 +23,58 @@ class EIXExchange(BaseExchange): print(f"Error fetching EIX file list: {e}") return [] - trades = [] - count = 0 + # Filter files based on date in filename if since_date provided + # Format: "kursblatt/2025/Kursblatt.2025-07-14.1752526803105.csv" + filtered_files = [] for item in files_list: file_key = item.get('fileName') if not file_key: continue + + if since_date: + try: + # Extract date from 
filename: Kursblatt.YYYY-MM-DD + parts = file_key.split('/')[-1].split('.') + # parts example: ['Kursblatt', '2025-07-14', '1752526803105', 'csv'] + if len(parts) >= 2: + date_str = parts[1] + file_date = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc) + + # Check if file date is newer than since_date (compare dates only) + if file_date.date() > since_date.date(): + filtered_files.append(item) + continue + # If same day, we might need to check it too, but EIX seems to be daily files + if file_date.date() == since_date.date(): + filtered_files.append(item) + continue + except Exception: + # If parsing fails, default to including it (safety) or skipping? + # Let's include it if we are not sure + filtered_files.append(item) + else: + filtered_files.append(item) + + # Sort files to process oldest to newest if doing a sync, or newest to oldest? + # If we have limit=1 (default), we usually want the newest. + # But if we are syncing history (since_date set), we probably want all of them. + + # Logic: If since_date is set, we ignore limit (or use it as safety cap) and process ALL new files + if since_date: + files_to_process = filtered_files + # Sort by date ? The API list seems chronological. + else: + # Default behavior: take the last N files (API returns oldest first usually?) + # Let's assume list is chronological. 
+ if limit: + files_to_process = files_list[-limit:] + else: + files_to_process = files_list + + trades = [] + count = 0 + for item in files_to_process: + file_key = item.get('fileName') # Download the CSV csv_url = f"https://european-investor-exchange.com/api/trade-file-contents?key={file_key}" @@ -37,7 +83,8 @@ class EIXExchange(BaseExchange): if csv_response.status_code == 200: trades.extend(self._parse_csv(csv_response.text)) count += 1 - if limit and count >= limit: + # Only enforce limit if since_date is NOT set + if not since_date and limit and count >= limit: break except Exception as e: print(f"Error downloading EIX CSV {file_key}: {e}")