"""Trading Dashboard API.

FastAPI application that serves the static dashboard and exposes JSON
endpoints which build SQL text and forward it to QuestDB's HTTP ``/exec``
endpoint.
"""

import os
from typing import Optional

import pandas as pd  # noqa: F401  (kept: may be relied on elsewhere)
import requests
from fastapi import Depends, FastAPI, HTTPException  # noqa: F401
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles

app = FastAPI(title="Trading Dashboard API")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve static files
app.mount("/static", StaticFiles(directory="dashboard/public"), name="static")


@app.get("/")
async def read_index():
    """Serve the dashboard's entry page."""
    return FileResponse('dashboard/public/index.html')


# NOTE(review): the fallback credentials below are hard-coded defaults; make
# sure real deployments override them through the environment.
DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
DB_HOST = os.getenv("DB_HOST", "questdb")
# Seconds to wait for QuestDB before giving up; without a timeout a stalled
# database would hang the request indefinitely.
DB_TIMEOUT = float(os.getenv("DB_TIMEOUT", "30"))

# Groupings that need per-ISIN rows and therefore cannot be answered from the
# pre-aggregated analytics_daily table.
_ISIN_LEVEL_GROUPS = ("isin", "name")
# Groupings whose expressions read columns of the metadata table.
_METADATA_GROUPS = (
    "name",
    "continent",
    "sector",
    "exchange_continent",
    "exchange_sector",
)

# Metric / group expressions for analytics_daily. Per the original author's
# notes that table has: timestamp, exchange, sector, continent, volume,
# trade_count, avg_price, with day-aligned timestamps.
_ANALYTICS_METRICS = {
    "volume": "sum(volume)",
    "count": "sum(trade_count)",
    # Not mathematically perfect but close for display
    "avg_price": "avg(avg_price)",
}
_ANALYTICS_GROUPS = {
    "day": "timestamp",
    "month": "date_trunc('month', timestamp)",
    "exchange": "exchange",
    "continent": "continent",
    "sector": "sector",
    "exchange_continent": "concat(exchange, ' - ', continent)",
    "exchange_sector": "concat(exchange, ' - ', sector)",
}


def _sql_literal(value: str) -> str:
    """Return *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled (the standard SQL escape) so that
    caller-supplied text cannot terminate the literal and inject SQL.
    """
    # NOTE(review): relies on QuestDB honouring '' as an escaped quote and not
    # treating backslash as an escape character - confirm for the deployed
    # version.
    return "'" + str(value).replace("'", "''") + "'"


def _sql_in_list(csv_values: str) -> str:
    """Turn a comma-separated string into a comma-joined list of literals."""
    return ",".join(_sql_literal(item.strip()) for item in csv_values.split(","))


def _send_query(query: str) -> requests.Response:
    """Send *query* to QuestDB's ``/exec`` endpoint (blocking)."""
    return requests.get(
        f"http://{DB_HOST}:9000/exec",
        params={'query': query},
        auth=DB_AUTH,
        timeout=DB_TIMEOUT,
    )


async def _run_query(query: str):
    """Execute *query* and return the decoded JSON body.

    The blocking HTTP call runs in a worker thread so the event loop stays
    free for other requests.

    Raises:
        HTTPException: with QuestDB's own status code when it answers with a
            non-200 response, or with status 500 when the request itself (or
            decoding its body) fails.
    """
    try:
        response = await run_in_threadpool(_send_query, query)
        if response.status_code == 200:
            return response.json()
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    # Raised outside the try block so the upstream status code is not
    # rewritten to 500 by the generic handler above.
    throw_http_error(response)


def _build_trades_query(
    metric: str,
    group_by: str,
    sub_group_by: Optional[str],
    date_from: Optional[str],
    date_to: Optional[str],
    isins: Optional[str],
    continents: Optional[str],
    needs_metadata: bool,
) -> str:
    """Build the aggregation query against the raw ``trades`` table."""
    # Use prefixes only if joining
    t_prefix = "t." if needs_metadata else ""
    m_prefix = "m." if needs_metadata else ""

    metrics_map = {
        "volume": f"sum({t_prefix}price * {t_prefix}quantity)",
        "count": "count(*)",
        "avg_price": f"avg({t_prefix}price)",
    }
    groups_map = {
        "day": f"date_trunc('day', {t_prefix}timestamp)",
        "month": f"date_trunc('month', {t_prefix}timestamp)",
        "exchange": f"{t_prefix}exchange",
        "isin": f"{t_prefix}isin",
        "name": f"coalesce({m_prefix}name, {t_prefix}isin)" if needs_metadata else "isin",
        "continent": f"coalesce({m_prefix}continent, 'Unknown')" if needs_metadata else "'Unknown'",
        "sector": f"coalesce({m_prefix}sector, 'Unknown')" if needs_metadata else "'Unknown'",
        "exchange_continent": (
            f"concat({t_prefix}exchange, ' - ', coalesce({m_prefix}continent, 'Unknown'))"
            if needs_metadata else "'Unknown'"
        ),
        "exchange_sector": (
            f"concat({t_prefix}exchange, ' - ', coalesce({m_prefix}sector, 'Unknown'))"
            if needs_metadata else "'Unknown'"
        ),
    }

    # Unknown metric / group names fall back to volume per day; an unknown
    # sub-group is ignored.
    selected_metric = metrics_map.get(metric, metrics_map["volume"])
    selected_group = groups_map.get(group_by, groups_map["day"])
    sub_group = groups_map.get(sub_group_by) if sub_group_by else None

    query = f"select {selected_group} as label"
    if sub_group:
        query += f", {sub_group} as sub_label"

    if metric == 'all':
        query += (
            f", count(*) as value_count, "
            f"sum({t_prefix}price * {t_prefix}quantity) as value_volume from trades"
        )
    else:
        query += f", {selected_metric} as value from trades"

    if needs_metadata:
        query += " t left join metadata m on t.isin = m.isin"

    query += " where 1=1"
    if date_from:
        query += f" and {t_prefix}timestamp >= {_sql_literal(date_from)}"
    if date_to:
        query += f" and {t_prefix}timestamp <= {_sql_literal(date_to)}"
    if isins:
        query += f" and {t_prefix}isin in ({_sql_in_list(isins)})"
    if continents and needs_metadata:
        query += f" and {m_prefix}continent in ({_sql_in_list(continents)})"

    query += f" group by {selected_group}"
    if sub_group:
        query += f", {sub_group}"
    query += " order by label asc"
    return query


def _build_analytics_query(
    metric: str,
    group_by: str,
    sub_group_by: Optional[str],
    date_from: Optional[str],
    date_to: Optional[str],
    continents: Optional[str],
) -> str:
    """Build the aggregation query against the ``analytics_daily`` table."""
    if metric == 'all':
        metric_expr = "sum(trade_count) as value_count, sum(volume) as value_volume"
    else:
        metric_expr = f"{_ANALYTICS_METRICS.get(metric, 'sum(volume)')} as value"

    group_expr = _ANALYTICS_GROUPS.get(group_by, "timestamp")
    # Resolved once and reused for both SELECT and GROUP BY so an unsupported
    # sub-group is ignored consistently instead of raising KeyError.
    sub_expr = _ANALYTICS_GROUPS.get(sub_group_by) if sub_group_by else None

    query = f"select {group_expr} as label"
    if sub_expr:
        query += f", {sub_expr} as sub_label"
    query += f", {metric_expr} from analytics_daily where 1=1"

    if date_from:
        query += f" and timestamp >= {_sql_literal(date_from)}"
    if date_to:
        query += f" and timestamp <= {_sql_literal(date_to)}"
    if continents:
        query += f" and continent in ({_sql_in_list(continents)})"

    query += f" group by {group_expr}"
    if sub_expr:
        query += f", {sub_expr}"
    query += " order by label asc"
    return query


@app.get("/api/trades")
async def get_trades(isin: Optional[str] = None, days: int = 7):
    """Return trades from the last *days* days, optionally for one ISIN.

    Rows are ordered by ascending timestamp. The response is QuestDB's JSON
    payload, passed through unchanged.
    """
    # Negate in Python: interpolating "-{days}" would render "--N" for a
    # negative value, which SQL reads as the start of a comment.
    query = f"select * from trades where timestamp > dateadd('d', {-days}, now())"
    if isin:
        query += f" and isin = {_sql_literal(isin)}"
    query += " order by timestamp asc"
    return await _run_query(query)


@app.get("/api/metadata")
async def get_metadata():
    """Return every row of the ``metadata`` table."""
    return await _run_query("select * from metadata")


@app.get("/api/summary")
async def get_summary():
    """Return trade count and total volume per continent."""
    # Coalesce null values to 'Unknown' and group properly
    query = """
    select
        coalesce(m.continent, 'Unknown') as continent,
        count(*) as trade_count,
        sum(t.price * t.quantity) as total_volume
    from trades t
    left join metadata m on t.isin = m.isin
    group by continent
    """
    return await _run_query(query)


@app.get("/api/analytics")
async def get_analytics(
    metric: str = "volume",
    group_by: str = "day",
    sub_group_by: Optional[str] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    isins: Optional[str] = None,
    continents: Optional[str] = None,
):
    """Return an aggregated metric grouped by one or two dimensions.

    Args:
        metric: ``volume``, ``count``, ``avg_price`` or ``all``; any other
            value is treated as ``volume``.
        group_by: primary grouping key; unknown values fall back to ``day``.
        sub_group_by: optional secondary grouping key; unknown values are
            ignored.
        date_from: inclusive lower bound on the trade timestamp.
        date_to: inclusive upper bound on the trade timestamp.
        isins: comma-separated ISINs to restrict the result to.
        continents: comma-separated continents to restrict the result to.

    The pre-aggregated ``analytics_daily`` table is used when no ISIN-level
    detail is requested. If that query is rejected by QuestDB, the same
    request is retried against the raw ``trades`` table.

    Raises:
        HTTPException: with QuestDB's status code when the query is rejected,
            or 500 when the request to QuestDB fails.
    """
    # Determine if we need to join metadata
    needs_metadata = (
        group_by in _METADATA_GROUPS
        or sub_group_by in _METADATA_GROUPS
        or continents is not None
    )

    # analytics_daily has no per-ISIN rows, so any ISIN filter or ISIN/name
    # grouping (primary or secondary) has to go to the raw trades table.
    use_analytics_table = (
        not isins
        and group_by not in _ISIN_LEVEL_GROUPS
        and sub_group_by not in _ISIN_LEVEL_GROUPS
    )

    if use_analytics_table:
        query = _build_analytics_query(
            metric, group_by, sub_group_by, date_from, date_to, continents
        )
    else:
        query = _build_trades_query(
            metric, group_by, sub_group_by, date_from, date_to,
            isins, continents, needs_metadata,
        )

    print(f"Executing Query: {query}")

    try:
        response = await run_in_threadpool(_send_query, query)
        if response.status_code == 200:
            return response.json()

        print(f"DEBUG: Query Failed: {response.text}")

        if use_analytics_table:
            print("DEBUG: Analytics query failed, falling back to RAW trades query...")
            raw_query = _build_trades_query(
                metric, group_by, sub_group_by, date_from, date_to,
                isins, continents, needs_metadata,
            )
            print(f"Executing Fallback Query: {raw_query}")
            fb_response = await run_in_threadpool(_send_query, raw_query)
            if fb_response.status_code == 200:
                return fb_response.json()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Report the first failure; raised outside the try block so QuestDB's
    # status code reaches the client instead of being rewritten to 500.
    throw_http_error(response)


@app.get("/api/metadata/search")
async def search_metadata(q: str):
    """Return up to 10 metadata rows whose ISIN or name contains *q*."""
    # Case-insensitive search for ISIN or Name
    pattern = _sql_literal(f"%{q}%")
    query = (
        f"select isin, name from metadata "
        f"where isin ilike {pattern} or name ilike {pattern} limit 10"
    )
    return await _run_query(query)


def throw_http_error(res):
    """Raise an HTTPException carrying QuestDB's status code and body text."""
    raise HTTPException(status_code=res.status_code, detail=f"QuestDB error: {res.text}")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)