now downloads historical eix data
All checks were successful
Deployment / deploy-docker (push) Successful in 14s

This commit is contained in:
Melchior Reimers
2026-01-25 16:44:43 +01:00
parent b4b9e6e48b
commit b9062c5dac
3 changed files with 76 additions and 8 deletions

daemon.py

@@ -45,8 +45,14 @@ def run_task(historical=False):
     eix = EIXExchange()
     ls = LSExchange()

+    # Pass last_ts to the fetcher to allow smart filtering. daemon.py runs
+    # daily, so we want to fetch everything since the DB state. eix.py's
+    # fetch_latest_trades needs a 'since_date' argument for that, but we cannot
+    # pass it in this tuple because last_ts is calculated inside the loop, so
+    # the loop below builds the call arguments dynamically.
     exchanges_to_process = [
-        (eix, {'limit': None if historical else 1}),
+        (eix, {'limit': None if historical else 5}),  # default limit of 5 files for safety on non-historical runs
         (ls, {'include_yesterday': historical})
     ]
@@ -58,7 +64,16 @@ def run_task(historical=False):
         last_ts = get_last_trade_timestamp(db_url, exchange.name)
         logger.info(f"Fetching data from {exchange.name} (Filtering trades older than {last_ts})...")
-        trades = exchange.fetch_latest_trades(**args)
+        # Special handling for EIX to support smart filtering
+        call_args = args.copy()
+        if exchange.name == "EIX" and not historical:
+            call_args['since_date'] = last_ts.replace(tzinfo=datetime.timezone.utc)
+            # Remove the limit when filtering by date to ensure we get everything
+            call_args.pop('limit', None)
+        trades = exchange.fetch_latest_trades(**call_args)

         # Deduplication: only keep trades that are newer than the last one in the DB
         new_trades = [
@@ -97,7 +112,10 @@ def main():
         logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...")
         run_task(historical=True)
     else:
-        logger.info("Found existing data in database. Waiting for scheduled run at 23:00.")
+        logger.info("Found existing data in database. Triggering catch-up sync...")
+        # Run a normal task to fetch any missing data since the last run
+        run_task(historical=False)
+        logger.info("Catch-up sync completed. Waiting for scheduled run at 23:00.")

     while True:
         now = datetime.datetime.now()

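For context, get_last_trade_timestamp is referenced in this diff but defined elsewhere in the repository. A minimal sketch of what such a helper can look like, assuming SQLAlchemy, a trades table with exchange and timestamp columns, and an epoch fallback (all assumptions, not code from this commit):

import datetime
from sqlalchemy import create_engine, text

def get_last_trade_timestamp(db_url: str, exchange_name: str) -> datetime.datetime:
    # Hypothetical sketch: table and column names are assumptions
    engine = create_engine(db_url)
    with engine.connect() as conn:
        last_ts = conn.execute(
            text("SELECT max(timestamp) FROM trades WHERE exchange = :ex"),
            {"ex": exchange_name},
        ).scalar()
    # Fall back to the epoch so callers can always compare against the result
    return last_ts or datetime.datetime(1970, 1, 1)

Returning a naive datetime fits the caller above, which attaches UTC via last_ts.replace(tzinfo=datetime.timezone.utc).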
View File

@@ -118,6 +118,9 @@ async def get_analytics(
         if sub_group_by and sub_group_by in groups_map:
             query += f", {groups_map[sub_group_by]} as sub_label"

-        query += f", {selected_metric} as value from trades"
+        if metric == 'all':
+            query += f", count(*) as value_count, sum({t_prefix}price * {t_prefix}quantity) as value_volume from trades"
+        else:
+            query += f", {selected_metric} as value from trades"

         if needs_metadata:
             query += " t left join metadata m on t.isin = m.isin"
eix.py

@@ -12,7 +12,7 @@ class EIXExchange(BaseExchange):
     def name(self) -> str:
         return "EIX"

-    def fetch_latest_trades(self, limit: int = 1) -> List[Trade]:
+    def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None) -> List[Trade]:
         # EIX stores its file list in a separate API endpoint
         url = "https://european-investor-exchange.com/api/official-trades"
         try:
@@ -23,13 +23,59 @@ class EIXExchange(BaseExchange):
             print(f"Error fetching EIX file list: {e}")
             return []

-        trades = []
-        count = 0
+        # Filter the files on the date embedded in the filename if since_date is
+        # provided. Format: "kursblatt/2025/Kursblatt.2025-07-14.1752526803105.csv"
+        filtered_files = []
+        for item in files_list:
+            file_key = item.get('fileName')
+            if not file_key:
+                continue
+            if since_date:
+                try:
+                    # Extract the date from the filename: Kursblatt.YYYY-MM-DD
+                    parts = file_key.split('/')[-1].split('.')
+                    # parts example: ['Kursblatt', '2025-07-14', '1752526803105', 'csv']
+                    if len(parts) >= 2:
+                        date_str = parts[1]
+                        file_date = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
+                        # Keep files dated on or after since_date: EIX publishes one
+                        # file per day, so the same-day file may still hold new trades.
+                        if file_date.date() >= since_date.date():
+                            filtered_files.append(item)
+                        continue
+                except Exception:
+                    # If the date cannot be parsed, include the file rather than
+                    # risk missing trades.
+                    filtered_files.append(item)
+            else:
+                filtered_files.append(item)
+
+        # If since_date is set we are syncing history, so process ALL new files
+        # and ignore the limit. Otherwise take the newest N files: the API list
+        # appears to be chronological (oldest first), so they sit at the end.
+        if since_date:
+            files_to_process = filtered_files
+        elif limit:
+            files_to_process = files_list[-limit:]
+        else:
+            files_to_process = files_list
+
+        trades = []
+        count = 0
+        for item in files_to_process:
             file_key = item.get('fileName')
             # Download the CSV
             csv_url = f"https://european-investor-exchange.com/api/trade-file-contents?key={file_key}"
             try:
@@ -37,7 +83,8 @@ class EIXExchange(BaseExchange):
                 if csv_response.status_code == 200:
                     trades.extend(self._parse_csv(csv_response.text))
                     count += 1
-                    if limit and count >= limit:
+                    # Only enforce the limit when since_date is NOT set
+                    if not since_date and limit and count >= limit:
                         break
             except Exception as e:
                 print(f"Error downloading EIX CSV {file_key}: {e}")
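A usage sketch of the extended signature; the concrete timestamp here is illustrative (in daemon.py it comes from get_last_trade_timestamp):

from datetime import datetime, timezone

eix = EIXExchange()

# Catch-up sync: fetch every Kursblatt file dated on or after last_ts;
# the limit is ignored while since_date is set.
last_ts = datetime(2025, 7, 14, tzinfo=timezone.utc)
new_trades = eix.fetch_latest_trades(since_date=last_ts)

# Default behavior without since_date: only the newest five files.
latest = eix.fetch_latest_trades(limit=5)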