This commit is contained in:
@@ -10,36 +10,50 @@ class DatabaseClient:
|
|||||||
self.url = f"http://{host}:{port}/write"
|
self.url = f"http://{host}:{port}/write"
|
||||||
self.auth = (user, password) if user and password else None
|
self.auth = (user, password) if user and password else None
|
||||||
|
|
||||||
def save_trades(self, trades: List[Trade]):
|
def save_trades(self, trades: List[Trade], batch_size: int = 50000):
|
||||||
if not trades:
|
if not trades:
|
||||||
return
|
return
|
||||||
|
|
||||||
lines = []
|
total_trades = len(trades)
|
||||||
for trade in trades:
|
print(f"Saving {total_trades} trades to QuestDB in batches of {batch_size}...")
|
||||||
# Clean symbols for ILP
|
|
||||||
symbol = trade.symbol.replace(" ", "\\ ").replace(",", "\\,")
|
|
||||||
exchange = trade.exchange
|
|
||||||
|
|
||||||
line = f"trades,exchange={exchange},symbol={symbol},isin={trade.isin} " \
|
|
||||||
f"price={trade.price},quantity={trade.quantity} " \
|
|
||||||
f"{int(trade.timestamp.timestamp() * 1e9)}"
|
|
||||||
lines.append(line)
|
|
||||||
|
|
||||||
payload = "\n".join(lines) + "\n"
|
|
||||||
|
|
||||||
try:
|
for i in range(0, total_trades, batch_size):
|
||||||
response = requests.post(
|
batch = trades[i:i + batch_size]
|
||||||
self.url,
|
lines = []
|
||||||
data=payload,
|
for trade in batch:
|
||||||
params={'precision': 'ns'},
|
# Clean symbols for ILP
|
||||||
auth=self.auth
|
try:
|
||||||
)
|
symbol = trade.symbol.replace(" ", "\\ ").replace(",", "\\,")
|
||||||
if response.status_code not in [204, 200]:
|
exchange = trade.exchange
|
||||||
print(f"Error saving to QuestDB: {response.text}")
|
|
||||||
except Exception as e:
|
line = f"trades,exchange={exchange},symbol={symbol},isin={trade.isin} " \
|
||||||
print(f"Could not connect to QuestDB at {self.url}: {e}")
|
f"price={trade.price},quantity={trade.quantity} " \
|
||||||
# Fallback: print to console or save to file
|
f"{int(trade.timestamp.timestamp() * 1e9)}"
|
||||||
self._fallback_save(trades)
|
lines.append(line)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error formating trade {trade}: {e}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not lines:
|
||||||
|
continue
|
||||||
|
|
||||||
|
payload = "\n".join(lines) + "\n"
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = requests.post(
|
||||||
|
self.url,
|
||||||
|
data=payload,
|
||||||
|
params={'precision': 'ns'},
|
||||||
|
auth=self.auth
|
||||||
|
)
|
||||||
|
if response.status_code not in [204, 200]:
|
||||||
|
print(f"Error saving batch {i//batch_size + 1} to QuestDB: {response.text}")
|
||||||
|
else:
|
||||||
|
print(f"Saved batch {i//batch_size + 1} ({len(batch)} trades)")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Could not connect to QuestDB at {self.url}: {e}")
|
||||||
|
# Fallback: print to console or save to file
|
||||||
|
self._fallback_save(batch)
|
||||||
|
|
||||||
def _fallback_save(self, trades: List[Trade]):
|
def _fallback_save(self, trades: List[Trade]):
|
||||||
# Just log to a file for now if QuestDB is not available
|
# Just log to a file for now if QuestDB is not available
|
||||||
|
|||||||
Reference in New Issue
Block a user