diff --git a/src/database/questdb_client.py b/src/database/questdb_client.py index 2d3495c..700643b 100644 --- a/src/database/questdb_client.py +++ b/src/database/questdb_client.py @@ -10,36 +10,50 @@ class DatabaseClient: self.url = f"http://{host}:{port}/write" self.auth = (user, password) if user and password else None - def save_trades(self, trades: List[Trade]): + def save_trades(self, trades: List[Trade], batch_size: int = 50000): if not trades: return - lines = [] - for trade in trades: - # Clean symbols for ILP - symbol = trade.symbol.replace(" ", "\\ ").replace(",", "\\,") - exchange = trade.exchange - - line = f"trades,exchange={exchange},symbol={symbol},isin={trade.isin} " \ - f"price={trade.price},quantity={trade.quantity} " \ - f"{int(trade.timestamp.timestamp() * 1e9)}" - lines.append(line) - - payload = "\n".join(lines) + "\n" + total_trades = len(trades) + print(f"Saving {total_trades} trades to QuestDB in batches of {batch_size}...") - try: - response = requests.post( - self.url, - data=payload, - params={'precision': 'ns'}, - auth=self.auth - ) - if response.status_code not in [204, 200]: - print(f"Error saving to QuestDB: {response.text}") - except Exception as e: - print(f"Could not connect to QuestDB at {self.url}: {e}") - # Fallback: print to console or save to file - self._fallback_save(trades) + for i in range(0, total_trades, batch_size): + batch = trades[i:i + batch_size] + lines = [] + for trade in batch: + # Clean symbols for ILP + try: + symbol = trade.symbol.replace(" ", "\\ ").replace(",", "\\,") + exchange = trade.exchange + + line = f"trades,exchange={exchange},symbol={symbol},isin={trade.isin} " \ + f"price={trade.price},quantity={trade.quantity} " \ + f"{int(trade.timestamp.timestamp() * 1e9)}" + lines.append(line) + except Exception as e: + print(f"Error formating trade {trade}: {e}") + continue + + if not lines: + continue + + payload = "\n".join(lines) + "\n" + + try: + response = requests.post( + self.url, + data=payload, + params={'precision': 'ns'}, + auth=self.auth + ) + if response.status_code not in [204, 200]: + print(f"Error saving batch {i//batch_size + 1} to QuestDB: {response.text}") + else: + print(f"Saved batch {i//batch_size + 1} ({len(batch)} trades)") + except Exception as e: + print(f"Could not connect to QuestDB at {self.url}: {e}") + # Fallback: print to console or save to file + self._fallback_save(batch) def _fallback_save(self, trades: List[Trade]): # Just log to a file for now if QuestDB is not available