116 lines
4.1 KiB
Python
116 lines
4.1 KiB
Python
|
|
import requests
|
||
|
|
import time
|
||
|
|
import logging
|
||
|
|
import os
|
||
|
|
import datetime
|
||
|
|
|
||
|
|
# Process-wide logging: INFO and above, each record prefixed with a
# timestamp, the logger name and the level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Named logger used by every function in this daemon.
logger = logging.getLogger("MetadataDaemon")
|
||
|
|
|
||
|
|
# QuestDB connection settings; each one can be overridden via the environment.
# NOTE(review): the "admin"/"quest" fallbacks look like stock default
# credentials - confirm they are only relied on in local development.
DB_HOST = os.getenv("DB_HOST", "questdb")
DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")

# HTTP basic-auth tuple passed to requests. Auth is disabled (None) when
# either the user or the password is empty, e.g. DB_USER="" in the env.
DB_AUTH = None if not (DB_USER and DB_PASSWORD) else (DB_USER, DB_PASSWORD)
|
||
|
|
|
||
|
|
def get_unique_isins():
    """Return every distinct, non-empty ISIN present in the ``trades`` table.

    Runs ``select distinct isin from trades`` against QuestDB's HTTP
    ``/exec`` endpoint and returns the first column of each result row.

    Returns:
        list of ISIN values. An empty list is returned when the request
        fails, the server answers with a non-200 status, or the response
        cannot be parsed; each of those cases is logged.
    """
    query = "select distinct isin from trades"
    try:
        # Bounded timeout (same value fetch_metadata uses for its external
        # calls) so a stalled database cannot hang the daemon loop forever.
        response = requests.get(
            f"http://{DB_HOST}:9000/exec",
            params={'query': query},
            auth=DB_AUTH,
            timeout=10,
        )
        if response.status_code != 200:
            logger.error(
                f"Error fetching unique ISINs: HTTP {response.status_code}: {response.text}"
            )
            return []
        data = response.json()
        # `or []` also covers a JSON null dataset; skip empty rows and
        # NULL/empty ISIN values.
        return [row[0] for row in (data.get('dataset') or []) if row and row[0]]
    except Exception as e:
        logger.error(f"Error fetching unique ISINs: {e}")
    return []
|
||
|
|
|
||
|
|
def get_processed_isins():
    """Return every distinct, non-empty ISIN already stored in ``metadata``.

    Runs ``select distinct isin from metadata`` against QuestDB's HTTP
    ``/exec`` endpoint.

    Returns:
        list of ISIN values; empty when the query does not succeed.

    NOTE(review): the caller treats any ISIN missing from this list as new.
    An empty list caused by a transient network/database failure (rather
    than a not-yet-created table) therefore makes the daemon re-fetch and
    re-write metadata for every ISIN. Such failures are logged as warnings
    so this is at least visible.
    """
    query = "select distinct isin from metadata"
    try:
        response = requests.get(
            f"http://{DB_HOST}:9000/exec",
            params={'query': query},
            auth=DB_AUTH,
            timeout=10,
        )
        if response.status_code != 200:
            # Presumably the metadata table does not exist yet (first run):
            # the query is answered with an error status, not an exception.
            logger.info(
                f"Metadata query returned HTTP {response.status_code}; "
                "treating all ISINs as unprocessed."
            )
            return []
        data = response.json()
        return [row[0] for row in (data.get('dataset') or []) if row and row[0]]
    except Exception as e:
        # Connection/parse failure - unlike a missing table this is unexpected.
        logger.warning(f"Error fetching processed ISINs: {e}")
    return []
|
||
|
|
|
||
|
|
def fetch_metadata(isin):
    """Collect descriptive metadata for one ISIN from public web APIs.

    1. GLEIF ``lei-records`` API filtered by ISIN: the issuer's legal name
       and the country of its legal address (first matching record only).
    2. RestCountries ``alpha`` endpoint for that country: the first listed
       continent.

    Every lookup is best-effort. Exceptions and non-200 responses are logged,
    and the affected fields keep the placeholder ``'Unknown'``.

    Args:
        isin: ISIN to look up.

    Returns:
        dict with keys ``isin``, ``name``, ``country``, ``continent`` and
        ``sector``. ``sector`` is never looked up and is always ``'Unknown'``.
    """
    logger.info(f"Fetching metadata for ISIN: {isin}")
    metadata = {
        'isin': isin,
        'name': 'Unknown',
        'country': 'Unknown',
        'continent': 'Unknown',
        'sector': 'Unknown',
    }

    # 1. GLEIF API for Name and Country
    try:
        # Let requests build and encode the query string rather than
        # splicing the raw ISIN (read from the trades table) into the URL.
        res = requests.get(
            "https://api.gleif.org/api/v1/lei-records",
            params={'filter[isin]': isin},
            timeout=10,
        )
        if res.status_code == 200:
            records = res.json().get('data') or []
            if records:
                attr = records[0].get('attributes') or {}
                entity = attr.get('entity') or {}
                # `or` (rather than a .get default) also replaces JSON nulls,
                # so None is never stored as the name or country.
                metadata['name'] = (entity.get('legalName') or {}).get('name') or 'Unknown'
                metadata['country'] = (entity.get('legalAddress') or {}).get('country') or 'Unknown'
        else:
            logger.warning(f"GLEIF returned HTTP {res.status_code} for {isin}")
    except Exception as e:
        logger.error(f"GLEIF error for {isin}: {e}")

    # 2. Continent mapping from Country Code
    if metadata['country'] != 'Unknown':
        try:
            country_url = f"https://restcountries.com/v3.1/alpha/{metadata['country']}"
            res = requests.get(country_url, timeout=10)
            if res.status_code == 200:
                data = res.json()
                # Only a non-empty list of country objects is accepted.
                if data and isinstance(data, list):
                    continents = data[0].get('continents') or []
                    if continents:
                        metadata['continent'] = continents[0]
            else:
                logger.warning(
                    f"RestCountries returned HTTP {res.status_code} for {metadata['country']}"
                )
        except Exception as e:
            logger.error(f"RestCountries error for {metadata['country']}: {e}")

    return metadata
|
||
|
|
|
||
|
|
def _escape_ilp_string(value):
    """Escape *value* for use inside a double-quoted ILP string field."""
    text = str(value)
    # Inside a quoted string field only backslash and double quote are
    # special (backslash first so added escapes are not doubled). A raw
    # newline would end the ILP line, so it is flattened to a space.
    text = text.replace('\\', '\\\\').replace('"', '\\"')
    return text.replace('\r', ' ').replace('\n', ' ')


def _escape_ilp_tag(value):
    """Escape *value* for use as an unquoted ILP tag value."""
    text = str(value).replace('\r', ' ').replace('\n', ' ')
    # Commas, equals signs and spaces delimit the tag set, so escape them.
    return text.replace(',', '\\,').replace('=', '\\=').replace(' ', '\\ ')


def save_metadata(metadata):
    """Write one metadata record to the QuestDB ``metadata`` table.

    Serialises *metadata* as one InfluxDB Line Protocol (ILP) row and POSTs
    it to QuestDB's HTTP ``/write`` endpoint:

        metadata,isin=<tag> name="..",country="..",continent="..",sector=".."

    No timestamp is sent; presumably the server assigns the row's time.

    Args:
        metadata: mapping with ``isin``, ``name``, ``country``,
            ``continent`` and ``sector`` keys (the shape fetch_metadata
            returns).

    Errors are logged, not raised. This covers responses with a status
    other than 200/204, and connection failures.
    """
    isin = _escape_ilp_tag(metadata['isin'])
    fields = ",".join(
        f'{key}="{_escape_ilp_string(metadata[key])}"'
        for key in ('name', 'country', 'continent', 'sector')
    )
    line = f'metadata,isin={isin} {fields}'

    try:
        # Send explicit UTF-8 bytes: a str body would be encoded as Latin-1
        # by http.client, failing or garbling non-Latin-1 company names.
        response = requests.post(
            f"http://{DB_HOST}:9000/write",
            data=(line + "\n").encode("utf-8"),
            auth=DB_AUTH,
            timeout=10,
        )
        if response.status_code not in (200, 204):
            logger.error(f"Error saving metadata: {response.text}")
    except Exception as e:
        logger.error(f"Connection error to QuestDB: {e}")
|
||
|
|
|
||
|
|
def main():
    """Run the metadata daemon's polling loop forever.

    Each cycle does the following:
    1. Read all ISINs seen in ``trades`` and all ISINs already present in
       ``metadata``.
    2. Fetch and store metadata for every ISIN not yet processed, sleeping
       one second between ISINs to throttle the external API calls.
    3. Sleep for an hour before the next cycle.
    """
    logger.info("Metadata Daemon started.")
    while True:
        unique_isins = get_unique_isins()
        # Set membership makes the diff O(n + m) instead of O(n * m);
        # iterating unique_isins keeps its original order.
        processed_isins = set(get_processed_isins())
        new_isins = [i for i in unique_isins if i not in processed_isins]

        if new_isins:
            logger.info(f"Found {len(new_isins)} new ISINs to process.")
            for isin in new_isins:
                try:
                    data = fetch_metadata(isin)
                    save_metadata(data)
                except Exception:
                    # Keep the daemon alive. If nothing was written for this
                    # ISIN, it is presumably picked up again next cycle.
                    logger.exception(f"Unexpected error processing ISIN {isin}")
                time.sleep(1)  # Rate limiting
        else:
            logger.info("No new ISINs found.")

        time.sleep(3600)  # Run once per hour
|
||
|
|
|
||
|
|
# Start the daemon loop only when this file is executed directly, not on import.
if __name__ == "__main__":
    main()
|