Files
trading-daemon/src/metadata/fetcher.py
Melchior Reimers 9161300cfc
All checks were successful
Deployment / deploy-docker (push) Successful in 15s
Fix empty name field error in metadata saving
2026-01-25 15:07:17 +01:00

171 lines
6.2 KiB
Python

import requests
import time
import logging
import os
import datetime
# Process-wide logging setup for the daemon.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("MetadataDaemon")

# QuestDB connection settings; each can be overridden via the environment.
# The fallback credentials are hard-coded defaults here.
DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")

# Send HTTP basic auth only when both user and password are non-empty
# (e.g. DB_USER="" in the environment disables auth).
if DB_USER and DB_PASSWORD:
    DB_AUTH = (DB_USER, DB_PASSWORD)
else:
    DB_AUTH = None

DB_HOST = os.getenv("DB_HOST", "questdb")
def get_unique_isins():
    """Return every distinct, non-empty ISIN in the QuestDB ``trades`` table.

    Queries QuestDB's HTTP ``/exec`` endpoint. Any failure (connection error,
    timeout, non-200 status, unexpected JSON) is logged and an empty list is
    returned, so the daemon loop simply retries on its next cycle.

    Returns:
        list: first column of each result row, skipping falsy values, in the
        order QuestDB returned them.
    """
    query = "select distinct isin from trades"
    try:
        # Without a timeout a hung QuestDB connection would block the daemon forever.
        response = requests.get(
            f"http://{DB_HOST}:9000/exec",
            params={'query': query},
            auth=DB_AUTH,
            timeout=30,
        )
        if response.status_code != 200:
            logger.error(
                f"Error fetching unique ISINs: HTTP {response.status_code}: {response.text}"
            )
            return []
        data = response.json()
        # `or []` also covers an explicit JSON null for "dataset".
        return [row[0] for row in (data.get('dataset') or []) if row[0]]
    except Exception as e:
        logger.error(f"Error fetching unique ISINs: {e}")
        return []
def get_processed_isins():
    """Return the distinct, non-empty ISINs already stored in the ``metadata`` table.

    Queries QuestDB's HTTP ``/exec`` endpoint. Any failure is logged and an
    empty list is returned (best-effort, same contract as before).

    Returns:
        list: first column of each result row, skipping falsy values.
    """
    query = "select distinct isin from metadata"
    try:
        response = requests.get(
            f"http://{DB_HOST}:9000/exec",
            params={'query': query},
            auth=DB_AUTH,
            timeout=30,
        )
        if response.status_code != 200:
            # A missing table is reported as a non-200 reply rather than an
            # exception. Presumably this is expected before the first metadata
            # row has been written; log it so real errors (e.g. auth) stay visible.
            logger.warning(
                f"Could not read processed ISINs: HTTP {response.status_code}: {response.text}"
            )
            return []
        data = response.json()
        return [row[0] for row in (data.get('dataset') or []) if row[0]]
    except Exception as e:
        # Network/JSON problems were previously swallowed silently.
        logger.warning(f"Error fetching processed ISINs: {e}")
        return []
from bs4 import BeautifulSoup
import yfinance as yf
def fetch_ticker_from_openfigi(isin):
    """Map an ISIN to a ticker symbol via the OpenFIGI v3 mapping API.

    Returns the ``ticker`` value of the first match that carries that key.
    Returns None on a non-200 status, when there is no match, or on any error.
    Errors are logged, never raised.
    """
    request_body = [{'idType': 'ID_ISIN', 'idValue': isin}]
    try:
        reply = requests.post(
            'https://api.openfigi.com/v3/mapping',
            json=request_body,
            headers={'Content-Type': 'application/json'},
            timeout=10,
        )
        if reply.status_code != 200:
            return None
        jobs = reply.json()
        # One job was submitted, so only the first job's result is relevant.
        if not (jobs and len(jobs) > 0 and 'data' in jobs[0]):
            return None
        return next(
            (match['ticker'] for match in jobs[0]['data'] if 'ticker' in match),
            None,
        )
    except Exception as e:
        logger.error(f"OpenFIGI error for {isin}: {e}")
    return None
def fetch_sector_from_yfinance(ticker):
    """Look up the sector yfinance reports for ``ticker``.

    Returns ``info['sector']`` when the info dict contains that key, otherwise
    None. Any yfinance error is logged and yields None.
    """
    try:
        info = yf.Ticker(ticker).info
        if not info or 'sector' not in info:
            return None
        return info['sector']
    except Exception as e:
        logger.error(f"yfinance error for {ticker}: {e}")
        return None
def _fetch_gleif_name_and_country(isin, headers):
    """Return (legal_name, country_code) for ``isin`` from GLEIF; missing values are None."""
    try:
        gleif_url = f"https://api.gleif.org/api/v1/lei-records?filter[isin]={isin}"
        res = requests.get(gleif_url, headers=headers, timeout=10)
        if res.status_code == 200:
            records = res.json().get('data') or []
            if records:
                # `or {}` guards against JSON nulls (if the API sends any),
                # which dict.get defaults would not catch.
                entity = (records[0].get('attributes') or {}).get('entity') or {}
                name = (entity.get('legalName') or {}).get('name')
                country = (entity.get('legalAddress') or {}).get('country')
                return name, country
    except Exception as e:
        logger.error(f"GLEIF error for {isin}: {e}")
    return None, None


def _fetch_continent(country_code, headers):
    """Return the first continent REST Countries lists for ``country_code``, or None."""
    try:
        country_url = f"https://restcountries.com/v3.1/alpha/{country_code}"
        res = requests.get(country_url, headers=headers, timeout=10)
        if res.status_code == 200:
            data = res.json()
            if data and isinstance(data, list):
                continents = data[0].get('continents') or []
                if continents:
                    return continents[0]
    except Exception as e:
        logger.error(f"RestCountries error for {country_code}: {e}")
    return None


def fetch_metadata(isin):
    """Collect descriptive metadata for one ISIN from public web APIs.

    Lookups, in order:
      1. GLEIF LEI records: legal name and legal-address country code.
      2. OpenFIGI (ISIN to ticker), then yfinance (ticker to sector).
      3. REST Countries (country code to continent), only if a country was found.

    Every lookup is best-effort. Failures are logged and the affected field
    keeps the placeholder 'Unknown'.

    Args:
        isin: the ISIN to look up.

    Returns:
        dict with keys 'isin', 'name', 'country', 'continent' and 'sector'.
        Every value other than 'isin' is a non-empty value ('Unknown' when
        nothing was found).
    """
    logger.info(f"Fetching metadata for ISIN: {isin}")
    metadata = {
        'isin': isin,
        'name': 'Unknown',
        'country': 'Unknown',
        'continent': 'Unknown',
        'sector': 'Unknown',
    }
    # Common headers to avoid blocks
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    }

    # 1. GLEIF API for name and country. Normalize None/"" to the placeholder
    # so that a null country never reaches the continent lookup.
    name, country = _fetch_gleif_name_and_country(isin, headers)
    metadata['name'] = name or 'Unknown'
    metadata['country'] = country or 'Unknown'

    # 2. Sector from OpenFIGI + yfinance
    try:
        ticker = fetch_ticker_from_openfigi(isin)
        if ticker:
            logger.info(f"Found ticker {ticker} for ISIN {isin}")
            sector = fetch_sector_from_yfinance(ticker)
            if sector:
                metadata['sector'] = sector
                logger.info(f"Found sector {sector} for {isin}")
    except Exception as e:
        logger.error(f"Sector fetching error for {isin}: {e}")

    # 3. Continent mapping from the country code (skipped when unknown).
    if metadata['country'] != 'Unknown':
        continent = _fetch_continent(metadata['country'], headers)
        if continent:
            metadata['continent'] = continent
    return metadata
def _escape_ilp_string_field(value):
    """Escape ``value`` for use inside a double-quoted ILP string field."""
    # Inside a quoted string field only backslash and double quote need
    # escaping. Backslash must be escaped first.
    text = str(value).replace('\\', '\\\\').replace('"', '\\"')
    # A raw line break would terminate the ILP line early.
    return text.replace('\r', ' ').replace('\n', ' ')


def _escape_ilp_tag_value(value):
    """Escape ``value`` for use as an unquoted ILP tag value."""
    return str(value).replace(',', '\\,').replace('=', '\\=').replace(' ', '\\ ')


def save_metadata(metadata):
    """Write one metadata row to QuestDB using InfluxDB Line Protocol over HTTP.

    Builds a single line of the form
        metadata,isin=<ISIN> name="...",country="...",continent="...",sector="..."
    and POSTs it to QuestDB's ``/write`` endpoint. No timestamp is appended,
    so the server assigns one on ingestion.

    Args:
        metadata: dict with an 'isin' key (required; KeyError otherwise) and
            optional 'name', 'country', 'continent' and 'sector' keys. Missing
            or empty values are written as 'Unknown'.

    Errors from QuestDB or the connection are logged, not raised.
    """
    # QuestDB Influx Line Protocol
    # table,tag1=val1 field1="str",field2=num timestamp
    # Ensure all fields have valid non-empty values, then escape them.
    name = _escape_ilp_string_field(metadata.get('name') or 'Unknown')
    country = _escape_ilp_string_field(metadata.get('country') or 'Unknown')
    continent = _escape_ilp_string_field(metadata.get('continent') or 'Unknown')
    sector = _escape_ilp_string_field(metadata.get('sector') or 'Unknown')
    isin = _escape_ilp_tag_value(metadata['isin'])
    line = f'metadata,isin={isin} name="{name}",country="{country}",continent="{continent}",sector="{sector}"'
    try:
        # "\n" (a real newline) terminates the ILP line. The previous "\\n"
        # appended a literal backslash-n and produced a malformed line.
        response = requests.post(
            f"http://{DB_HOST}:9000/write",
            data=line + "\n",
            auth=DB_AUTH,
            timeout=10,
        )
        if response.status_code not in [200, 204]:
            logger.error(f"Error saving metadata: {response.text}")
    except Exception as e:
        logger.error(f"Connection error to QuestDB: {e}")
def main():
    """Run the metadata daemon forever.

    Each cycle does the following, then sleeps for an hour:
      1. Read the distinct ISINs from ``trades``.
      2. Skip those already present in ``metadata``.
      3. Fetch and save metadata for each remaining ISIN, sleeping one second
         between ISINs to rate-limit the external lookups.
    """
    logger.info("Metadata Daemon started.")
    while True:
        unique_isins = get_unique_isins()
        # A set makes each membership test O(1); a list made the filter O(n * m).
        processed_isins = set(get_processed_isins())
        new_isins = [i for i in unique_isins if i not in processed_isins]
        if new_isins:
            logger.info(f"Found {len(new_isins)} new ISINs to process.")
            for isin in new_isins:
                data = fetch_metadata(isin)
                save_metadata(data)
                time.sleep(1)  # Rate limiting
        else:
            logger.info("No new ISINs found.")
        time.sleep(3600)  # Run once per hour


if __name__ == "__main__":
    main()