Compare commits

...

5 Commits

Author SHA1 Message Date
Melchior Reimers
1dc79b8b64 Refactor: improved code quality and cleaned up project structure
- daemon.py: removed gc.collect(), robust scheduling (last_run_date instead of a per-minute check),
  introduced an Exchange Registry pattern (STREAMING_EXCHANGES/STANDARD_EXCHANGES)
- deutsche_boerse.py: thread-safe User-Agent rotation on rate limits,
  logging instead of print(), holiday check, parse methods split into smaller pieces
- eix.py: logging instead of print(), specific exception types instead of a bare except
- read.py deleted and replaced by scripts/inspect_gzip.py (streaming-based)
- utility scripts moved to scripts/ (cleanup_duplicates, restore_and_fix, verify_fix)
2026-02-01 08:18:55 +01:00
Melchior Reimers
cf55a0bd06 Fix: Analytics worker ALWAYS recalculates today/yesterday
- New force_recalculate_date() method deletes old data before recalculating
- Today and yesterday are recalculated on every hourly check
- Fixes the issue that new trades were not being picked up in analytics
2026-01-29 22:36:22 +01:00
Melchior Reimers
9cd84e0855 Fix: Streaming processing for EIX to prevent RAM overflow
- EIX now processes one file at a time (instead of all at once)
- Memory is released after each file (gc.collect)
- Day-based caching for duplicate checks, with cache clearing
- Reduces RAM usage from 8 GB+ to under 500 MB
2026-01-29 16:17:11 +01:00
Melchior Reimers
f325941e24 Fix: Analytics worker now calculates all tables per day
2026-01-29 16:12:20 +01:00
Melchior Reimers
a21e036bb4 Add Börsenag exchanges (DUSA, DUSB, DUSC, DUSD, HAMA, HAMB, HANA, HANB)
- New boersenag.py with support for Düsseldorf, Hamburg, and Hannover exchanges
- Proper .gitignore to exclude venv and temp files
2026-01-29 16:08:14 +01:00
22 changed files with 1624 additions and 434 deletions

48
.gitignore vendored Normal file

@@ -0,0 +1,48 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
venv/
ENV/
env/
.venv/
# IDE
.idea/
.vscode/
*.swp
*.swo
# Environment files
.env
.env.local
# OS
.DS_Store
Thumbs.db
# Logs
*.log
# Test data
*.gz
!requirements.txt

248
bin/Activate.ps1 Normal file

@@ -0,0 +1,248 @@
<#
.Synopsis
Activate a Python virtual environment for the current PowerShell session.
.Description
Pushes the python executable for a virtual environment to the front of the
$Env:PATH environment variable and sets the prompt to signify that you are
in a Python virtual environment. Makes use of the command line switches as
well as the `pyvenv.cfg` file values present in the virtual environment.
.Parameter VenvDir
Path to the directory that contains the virtual environment to activate. The
default value for this is the parent of the directory that the Activate.ps1
script is located within.
.Parameter Prompt
The prompt prefix to display when this virtual environment is activated. By
default, this prompt is the name of the virtual environment folder (VenvDir)
surrounded by parentheses and followed by a single space (ie. '(.venv) ').
.Example
Activate.ps1
Activates the Python virtual environment that contains the Activate.ps1 script.
.Example
Activate.ps1 -Verbose
Activates the Python virtual environment that contains the Activate.ps1 script,
and shows extra information about the activation as it executes.
.Example
Activate.ps1 -VenvDir C:\Users\MyUser\Common\.venv
Activates the Python virtual environment located in the specified location.
.Example
Activate.ps1 -Prompt "MyPython"
Activates the Python virtual environment that contains the Activate.ps1 script,
and prefixes the current prompt with the specified string (surrounded in
parentheses) while the virtual environment is active.
.Notes
On Windows, it may be required to enable this Activate.ps1 script by setting the
execution policy for the user. You can do this by issuing the following PowerShell
command:
PS C:\> Set-ExecutionPolicy -ExecutionPolicy RemoteSigned -Scope CurrentUser
For more information on Execution Policies:
https://go.microsoft.com/fwlink/?LinkID=135170
#>
Param(
[Parameter(Mandatory = $false)]
[String]
$VenvDir,
[Parameter(Mandatory = $false)]
[String]
$Prompt
)
<# Function declarations --------------------------------------------------- #>
<#
.Synopsis
Remove all shell session elements added by the Activate script, including the
addition of the virtual environment's Python executable from the beginning of
the PATH variable.
.Parameter NonDestructive
If present, do not remove this function from the global namespace for the
session.
#>
function global:deactivate ([switch]$NonDestructive) {
# Revert to original values
# The prior prompt:
if (Test-Path -Path Function:_OLD_VIRTUAL_PROMPT) {
Copy-Item -Path Function:_OLD_VIRTUAL_PROMPT -Destination Function:prompt
Remove-Item -Path Function:_OLD_VIRTUAL_PROMPT
}
# The prior PYTHONHOME:
if (Test-Path -Path Env:_OLD_VIRTUAL_PYTHONHOME) {
Copy-Item -Path Env:_OLD_VIRTUAL_PYTHONHOME -Destination Env:PYTHONHOME
Remove-Item -Path Env:_OLD_VIRTUAL_PYTHONHOME
}
# The prior PATH:
if (Test-Path -Path Env:_OLD_VIRTUAL_PATH) {
Copy-Item -Path Env:_OLD_VIRTUAL_PATH -Destination Env:PATH
Remove-Item -Path Env:_OLD_VIRTUAL_PATH
}
# Just remove the VIRTUAL_ENV altogether:
if (Test-Path -Path Env:VIRTUAL_ENV) {
Remove-Item -Path env:VIRTUAL_ENV
}
# Just remove VIRTUAL_ENV_PROMPT altogether.
if (Test-Path -Path Env:VIRTUAL_ENV_PROMPT) {
Remove-Item -Path env:VIRTUAL_ENV_PROMPT
}
# Just remove the _PYTHON_VENV_PROMPT_PREFIX altogether:
if (Get-Variable -Name "_PYTHON_VENV_PROMPT_PREFIX" -ErrorAction SilentlyContinue) {
Remove-Variable -Name _PYTHON_VENV_PROMPT_PREFIX -Scope Global -Force
}
# Leave deactivate function in the global namespace if requested:
if (-not $NonDestructive) {
Remove-Item -Path function:deactivate
}
}
<#
.Description
Get-PyVenvConfig parses the values from the pyvenv.cfg file located in the
given folder, and returns them in a map.
For each line in the pyvenv.cfg file, if that line can be parsed into exactly
two strings separated by `=` (with any amount of whitespace surrounding the =)
then it is considered a `key = value` line. The left hand string is the key,
the right hand is the value.
If the value starts with a `'` or a `"` then the first and last character is
stripped from the value before being captured.
.Parameter ConfigDir
Path to the directory that contains the `pyvenv.cfg` file.
#>
function Get-PyVenvConfig(
[String]
$ConfigDir
) {
Write-Verbose "Given ConfigDir=$ConfigDir, obtain values in pyvenv.cfg"
# Ensure the file exists, and issue a warning if it doesn't (but still allow the function to continue).
$pyvenvConfigPath = Join-Path -Resolve -Path $ConfigDir -ChildPath 'pyvenv.cfg' -ErrorAction Continue
# An empty map will be returned if no config file is found.
$pyvenvConfig = @{ }
if ($pyvenvConfigPath) {
Write-Verbose "File exists, parse `key = value` lines"
$pyvenvConfigContent = Get-Content -Path $pyvenvConfigPath
$pyvenvConfigContent | ForEach-Object {
$keyval = $PSItem -split "\s*=\s*", 2
if ($keyval[0] -and $keyval[1]) {
$val = $keyval[1]
# Remove extraneous quotations around a string value.
if ("'""".Contains($val.Substring(0, 1))) {
$val = $val.Substring(1, $val.Length - 2)
}
$pyvenvConfig[$keyval[0]] = $val
Write-Verbose "Adding Key: '$($keyval[0])'='$val'"
}
}
}
return $pyvenvConfig
}
<# Begin Activate script --------------------------------------------------- #>
# Determine the containing directory of this script
$VenvExecPath = Split-Path -Parent $MyInvocation.MyCommand.Definition
$VenvExecDir = Get-Item -Path $VenvExecPath
Write-Verbose "Activation script is located in path: '$VenvExecPath'"
Write-Verbose "VenvExecDir Fullname: '$($VenvExecDir.FullName)"
Write-Verbose "VenvExecDir Name: '$($VenvExecDir.Name)"
# Set values required in priority: CmdLine, ConfigFile, Default
# First, get the location of the virtual environment, it might not be
# VenvExecDir if specified on the command line.
if ($VenvDir) {
Write-Verbose "VenvDir given as parameter, using '$VenvDir' to determine values"
}
else {
Write-Verbose "VenvDir not given as a parameter, using parent directory name as VenvDir."
$VenvDir = $VenvExecDir.Parent.FullName.TrimEnd("\\/")
Write-Verbose "VenvDir=$VenvDir"
}
# Next, read the `pyvenv.cfg` file to determine any required value such
# as `prompt`.
$pyvenvCfg = Get-PyVenvConfig -ConfigDir $VenvDir
# Next, set the prompt from the command line, or the config file, or
# just use the name of the virtual environment folder.
if ($Prompt) {
Write-Verbose "Prompt specified as argument, using '$Prompt'"
}
else {
Write-Verbose "Prompt not specified as argument to script, checking pyvenv.cfg value"
if ($pyvenvCfg -and $pyvenvCfg['prompt']) {
Write-Verbose " Setting based on value in pyvenv.cfg='$($pyvenvCfg['prompt'])'"
$Prompt = $pyvenvCfg['prompt'];
}
else {
Write-Verbose " Setting prompt based on parent's directory's name. (Is the directory name passed to venv module when creating the virtual environment)"
Write-Verbose " Got leaf-name of $VenvDir='$(Split-Path -Path $venvDir -Leaf)'"
$Prompt = Split-Path -Path $venvDir -Leaf
}
}
Write-Verbose "Prompt = '$Prompt'"
Write-Verbose "VenvDir='$VenvDir'"
# Deactivate any currently active virtual environment, but leave the
# deactivate function in place.
deactivate -nondestructive
# Now set the environment variable VIRTUAL_ENV, used by many tools to determine
# that there is an activated venv.
$env:VIRTUAL_ENV = $VenvDir
$env:VIRTUAL_ENV_PROMPT = $Prompt
if (-not $Env:VIRTUAL_ENV_DISABLE_PROMPT) {
Write-Verbose "Setting prompt to '$Prompt'"
# Set the prompt to include the env name
# Make sure _OLD_VIRTUAL_PROMPT is global
function global:_OLD_VIRTUAL_PROMPT { "" }
Copy-Item -Path function:prompt -Destination function:_OLD_VIRTUAL_PROMPT
New-Variable -Name _PYTHON_VENV_PROMPT_PREFIX -Description "Python virtual environment prompt prefix" -Scope Global -Option ReadOnly -Visibility Public -Value $Prompt
function global:prompt {
Write-Host -NoNewline -ForegroundColor Green "($_PYTHON_VENV_PROMPT_PREFIX) "
_OLD_VIRTUAL_PROMPT
}
}
# Clear PYTHONHOME
if (Test-Path -Path Env:PYTHONHOME) {
Copy-Item -Path Env:PYTHONHOME -Destination Env:_OLD_VIRTUAL_PYTHONHOME
Remove-Item -Path Env:PYTHONHOME
}
# Add the venv to the PATH
Copy-Item -Path Env:PATH -Destination Env:_OLD_VIRTUAL_PATH
$Env:PATH = "$VenvExecDir$([System.IO.Path]::PathSeparator)$Env:PATH"

76
bin/activate Normal file

@@ -0,0 +1,76 @@
# This file must be used with "source bin/activate" *from bash*
# You cannot run it directly
deactivate () {
# reset old environment variables
if [ -n "${_OLD_VIRTUAL_PATH:-}" ] ; then
PATH="${_OLD_VIRTUAL_PATH:-}"
export PATH
unset _OLD_VIRTUAL_PATH
fi
if [ -n "${_OLD_VIRTUAL_PYTHONHOME:-}" ] ; then
PYTHONHOME="${_OLD_VIRTUAL_PYTHONHOME:-}"
export PYTHONHOME
unset _OLD_VIRTUAL_PYTHONHOME
fi
# Call hash to forget past locations. Without forgetting
# past locations the $PATH changes we made may not be respected.
# See "man bash" for more details. hash is usually a builtin of your shell
hash -r 2> /dev/null
if [ -n "${_OLD_VIRTUAL_PS1:-}" ] ; then
PS1="${_OLD_VIRTUAL_PS1:-}"
export PS1
unset _OLD_VIRTUAL_PS1
fi
unset VIRTUAL_ENV
unset VIRTUAL_ENV_PROMPT
if [ ! "${1:-}" = "nondestructive" ] ; then
# Self destruct!
unset -f deactivate
fi
}
# unset irrelevant variables
deactivate nondestructive
# on Windows, a path can contain colons and backslashes and has to be converted:
case "$(uname)" in
CYGWIN*|MSYS*|MINGW*)
# transform D:\path\to\venv to /d/path/to/venv on MSYS and MINGW
# and to /cygdrive/d/path/to/venv on Cygwin
VIRTUAL_ENV=$(cygpath /Users/melchiorreimers/Documents/trading_daemon)
export VIRTUAL_ENV
;;
*)
# use the path as-is
export VIRTUAL_ENV=/Users/melchiorreimers/Documents/trading_daemon
;;
esac
_OLD_VIRTUAL_PATH="$PATH"
PATH="$VIRTUAL_ENV/"bin":$PATH"
export PATH
VIRTUAL_ENV_PROMPT=trading_daemon
export VIRTUAL_ENV_PROMPT
# unset PYTHONHOME if set
# this will fail if PYTHONHOME is set to the empty string (which is bad anyway)
# could use `if (set -u; : $PYTHONHOME) ;` in bash
if [ -n "${PYTHONHOME:-}" ] ; then
_OLD_VIRTUAL_PYTHONHOME="${PYTHONHOME:-}"
unset PYTHONHOME
fi
if [ -z "${VIRTUAL_ENV_DISABLE_PROMPT:-}" ] ; then
_OLD_VIRTUAL_PS1="${PS1:-}"
PS1="("trading_daemon") ${PS1:-}"
export PS1
fi
# Call hash to forget past commands. Without forgetting
# past commands the $PATH changes we made may not be respected
hash -r 2> /dev/null

27
bin/activate.csh Normal file

@@ -0,0 +1,27 @@
# This file must be used with "source bin/activate.csh" *from csh*.
# You cannot run it directly.
# Created by Davide Di Blasi <davidedb@gmail.com>.
# Ported to Python 3.3 venv by Andrew Svetlov <andrew.svetlov@gmail.com>
alias deactivate 'test $?_OLD_VIRTUAL_PATH != 0 && setenv PATH "$_OLD_VIRTUAL_PATH" && unset _OLD_VIRTUAL_PATH; rehash; test $?_OLD_VIRTUAL_PROMPT != 0 && set prompt="$_OLD_VIRTUAL_PROMPT" && unset _OLD_VIRTUAL_PROMPT; unsetenv VIRTUAL_ENV; unsetenv VIRTUAL_ENV_PROMPT; test "\!:*" != "nondestructive" && unalias deactivate'
# Unset irrelevant variables.
deactivate nondestructive
setenv VIRTUAL_ENV /Users/melchiorreimers/Documents/trading_daemon
set _OLD_VIRTUAL_PATH="$PATH"
setenv PATH "$VIRTUAL_ENV/"bin":$PATH"
setenv VIRTUAL_ENV_PROMPT trading_daemon
set _OLD_VIRTUAL_PROMPT="$prompt"
if (! "$?VIRTUAL_ENV_DISABLE_PROMPT") then
set prompt = "("trading_daemon") $prompt:q"
endif
alias pydoc python -m pydoc
rehash

69
bin/activate.fish Normal file

@@ -0,0 +1,69 @@
# This file must be used with "source <venv>/bin/activate.fish" *from fish*
# (https://fishshell.com/). You cannot run it directly.
function deactivate -d "Exit virtual environment and return to normal shell environment"
# reset old environment variables
if test -n "$_OLD_VIRTUAL_PATH"
set -gx PATH $_OLD_VIRTUAL_PATH
set -e _OLD_VIRTUAL_PATH
end
if test -n "$_OLD_VIRTUAL_PYTHONHOME"
set -gx PYTHONHOME $_OLD_VIRTUAL_PYTHONHOME
set -e _OLD_VIRTUAL_PYTHONHOME
end
if test -n "$_OLD_FISH_PROMPT_OVERRIDE"
set -e _OLD_FISH_PROMPT_OVERRIDE
# prevents error when using nested fish instances (Issue #93858)
if functions -q _old_fish_prompt
functions -e fish_prompt
functions -c _old_fish_prompt fish_prompt
functions -e _old_fish_prompt
end
end
set -e VIRTUAL_ENV
set -e VIRTUAL_ENV_PROMPT
if test "$argv[1]" != "nondestructive"
# Self-destruct!
functions -e deactivate
end
end
# Unset irrelevant variables.
deactivate nondestructive
set -gx VIRTUAL_ENV /Users/melchiorreimers/Documents/trading_daemon
set -gx _OLD_VIRTUAL_PATH $PATH
set -gx PATH "$VIRTUAL_ENV/"bin $PATH
set -gx VIRTUAL_ENV_PROMPT trading_daemon
# Unset PYTHONHOME if set.
if set -q PYTHONHOME
set -gx _OLD_VIRTUAL_PYTHONHOME $PYTHONHOME
set -e PYTHONHOME
end
if test -z "$VIRTUAL_ENV_DISABLE_PROMPT"
# fish uses a function instead of an env var to generate the prompt.
# Save the current fish_prompt function as the function _old_fish_prompt.
functions -c fish_prompt _old_fish_prompt
# With the original prompt function renamed, we can override with our own.
function fish_prompt
# Save the return status of the last command.
set -l old_status $status
# Output the venv prompt; color taken from the blue of the Python logo.
printf "%s(%s)%s " (set_color 4B8BBE) trading_daemon (set_color normal)
# Restore the return status of the previous command.
echo "exit $old_status" | .
# Output the original/"old" prompt.
_old_fish_prompt
end
set -gx _OLD_FISH_PROMPT_OVERRIDE "$VIRTUAL_ENV"
end

8
bin/pip Executable file

@@ -0,0 +1,8 @@
#!/Users/melchiorreimers/Documents/trading_daemon/bin/python3.13
# -*- coding: utf-8 -*-
import re
import sys
from pip._internal.cli.main import main
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(main())

8
bin/pip3 Executable file

@@ -0,0 +1,8 @@
#!/Users/melchiorreimers/Documents/trading_daemon/bin/python3.13
# -*- coding: utf-8 -*-
import re
import sys
from pip._internal.cli.main import main
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(main())

8
bin/pip3.13 Executable file

@@ -0,0 +1,8 @@
#!/Users/melchiorreimers/Documents/trading_daemon/bin/python3.13
# -*- coding: utf-8 -*-
import re
import sys
from pip._internal.cli.main import main
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(main())

1
bin/python Symbolic link

@@ -0,0 +1 @@
python3.13

1
bin/python3 Symbolic link

@@ -0,0 +1 @@
python3.13

1
bin/python3.13 Symbolic link

@@ -0,0 +1 @@
/opt/homebrew/opt/python@3.13/bin/python3.13

484
daemon.py

@@ -4,6 +4,9 @@ import datetime
import hashlib
import os
import requests
from typing import List, Type
from src.exchanges.base import BaseExchange
from src.exchanges.eix import EIXExchange
from src.exchanges.ls import LSExchange
from src.exchanges.deutsche_boerse import XetraExchange, FrankfurtExchange, QuotrixExchange
@@ -25,230 +28,345 @@ DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
# =============================================================================
# Exchange Registry - Neue Börsen hier hinzufügen
# =============================================================================
# Exchanges die Streaming-Verarbeitung benötigen (große Datenmengen)
STREAMING_EXCHANGES: List[Type[BaseExchange]] = [
EIXExchange,
]
# Standard-Exchanges (normale Batch-Verarbeitung)
STANDARD_EXCHANGES: List[Type[BaseExchange]] = [
# Lang & Schwarz
LSExchange,
# Deutsche Börse
XetraExchange,
FrankfurtExchange,
QuotrixExchange,
# Weitere Börsen
GettexExchange,
StuttgartExchange,
# Börsenag (Düsseldorf, Hamburg, Hannover)
DUSAExchange,
DUSBExchange,
DUSCExchange,
DUSDExchange,
HAMAExchange,
HAMBExchange,
HANAExchange,
HANBExchange,
]
# =============================================================================
# Trades Cache
# =============================================================================
# Cache für existierende Trades pro Tag (wird nach jedem Exchange geleert)
_existing_trades_cache = {}
def get_trade_hash(trade):
"""Erstellt einen eindeutigen Hash für einen Trade."""
key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}"
return hashlib.md5(key.encode()).hexdigest()
def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=1000):
"""Filtert neue Trades in Batches, um RAM zu sparen. Verwendet Batch-Queries statt einzelne Checks."""
def get_existing_trades_for_day(db_url, exchange_name, day):
"""Holt existierende Trades für einen Tag aus der DB (mit Caching)."""
cache_key = f"{exchange_name}_{day.strftime('%Y-%m-%d')}"
if cache_key in _existing_trades_cache:
return _existing_trades_cache[cache_key]
day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
day_end = day + datetime.timedelta(days=1)
day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
query = f"""
SELECT isin, timestamp, price, quantity
FROM trades
WHERE exchange = '{exchange_name}'
AND timestamp >= '{day_start_str}'
AND timestamp < '{day_end_str}'
"""
existing_trades = set()
try:
response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60)
if response.status_code == 200:
data = response.json()
if data.get('dataset'):
for row in data['dataset']:
isin, ts, price, qty = row
if isinstance(ts, str):
ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
else:
ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
key = (isin, ts_dt.isoformat(), float(price), float(qty))
existing_trades.add(key)
except Exception as e:
logger.warning(f"Error fetching existing trades for {day}: {e}")
_existing_trades_cache[cache_key] = existing_trades
return existing_trades
def clear_trades_cache():
"""Leert den Cache für existierende Trades."""
global _existing_trades_cache
_existing_trades_cache = {}
def filter_new_trades_for_day(db_url, exchange_name, trades, day):
"""Filtert neue Trades für einen einzelnen Tag."""
if not trades:
return []
new_trades = []
total_batches = (len(trades) + batch_size - 1) // batch_size
existing = get_existing_trades_for_day(db_url, exchange_name, day)
for batch_idx in range(0, len(trades), batch_size):
batch = trades[batch_idx:batch_idx + batch_size]
batch_num = (batch_idx // batch_size) + 1
if batch_num % 10 == 0 or batch_num == 1:
logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} trades)...")
# Gruppiere Trades nach Tag für effizientere Queries
trades_by_day = {}
for trade in batch:
day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
if day not in trades_by_day:
trades_by_day[day] = []
trades_by_day[day].append(trade)
# Prüfe jeden Tag separat
for day, day_trades in trades_by_day.items():
day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
day_end = day + datetime.timedelta(days=1)
day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
# Hole alle existierenden Trades für diesen Tag
query = f"""
SELECT isin, timestamp, price, quantity
FROM trades
WHERE exchange = '{exchange_name}'
AND timestamp >= '{day_start_str}'
AND timestamp < '{day_end_str}'
"""
try:
response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=30)
if response.status_code == 200:
data = response.json()
existing_trades = set()
if data.get('dataset'):
for row in data['dataset']:
isin, ts, price, qty = row
# Normalisiere Timestamp für Vergleich
if isinstance(ts, str):
ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
else:
ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
# Erstelle Vergleichs-Key (ohne Hash, direkter Vergleich)
key = (isin, ts_dt.isoformat(), float(price), float(qty))
existing_trades.add(key)
# Prüfe welche Trades neu sind
for trade in day_trades:
trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
if trade_key not in existing_trades:
new_trades.append(trade)
else:
# Bei Fehler: alle Trades als neu behandeln (sicherer)
logger.warning(f"Query failed for day {day}, treating all trades as new")
new_trades.extend(day_trades)
except Exception as e:
# Bei Fehler: alle Trades als neu behandeln (sicherer)
logger.warning(f"Error checking trades for day {day}: {e}, treating all trades as new")
new_trades.extend(day_trades)
# Kleine Pause zwischen Batches, um DB nicht zu überlasten
if batch_idx + batch_size < len(trades):
time.sleep(0.05)
new_trades = []
for trade in trades:
trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
if trade_key not in existing:
new_trades.append(trade)
return new_trades
def get_last_trade_timestamp(db_url, exchange_name):
# QuestDB query: get the latest timestamp for a specific exchange
def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=5000):
"""Filtert neue Trades in Batches, gruppiert nach Tag."""
if not trades:
return []
# Gruppiere alle Trades nach Tag
trades_by_day = {}
for trade in trades:
day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
if day not in trades_by_day:
trades_by_day[day] = []
trades_by_day[day].append(trade)
new_trades = []
total_days = len(trades_by_day)
for i, (day, day_trades) in enumerate(sorted(trades_by_day.items()), 1):
if i % 10 == 0 or i == 1:
logger.info(f"Checking day {i}/{total_days}: {day.strftime('%Y-%m-%d')} ({len(day_trades)} trades)...")
new_for_day = filter_new_trades_for_day(db_url, exchange_name, day_trades, day)
new_trades.extend(new_for_day)
# Kleine Pause um DB nicht zu überlasten
if i < total_days:
time.sleep(0.02)
return new_trades
def get_last_trade_timestamp(db_url: str, exchange_name: str) -> datetime.datetime:
"""Holt den Timestamp des letzten Trades für eine Exchange aus QuestDB."""
query = f"trades where exchange = '{exchange_name}' latest by timestamp"
try:
# Using the /exec endpoint to get data
response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH)
if response.status_code == 200:
data = response.json()
if data['dataset']:
# QuestDB returns timestamp in micros since epoch by default in some views, or ISO
# Let's assume the timestamp is in the dataset
# ILP timestamps are stored as designated timestamps.
ts_value = data['dataset'][0][0] # Adjust index based on column order
if data.get('dataset'):
# QuestDB gibt Timestamps in Mikrosekunden oder ISO-Format zurück
ts_value = data['dataset'][0][0]
if isinstance(ts_value, str):
return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
else:
return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
except Exception as e:
logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")
logger.debug(f"Keine existierenden Daten für {exchange_name} oder DB nicht erreichbar: {e}")
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
def process_eix_streaming(db, db_url: str, exchange: BaseExchange, historical: bool = False):
"""Verarbeitet eine Exchange im Streaming-Modus um RAM zu sparen."""
last_ts = get_last_trade_timestamp(db_url, exchange.name)
logger.info(f"Hole Daten von {exchange.name} (Letzter Trade: {last_ts}) - STREAMING...")
# Hole Liste der zu verarbeitenden Dateien
if historical:
files = exchange.get_files_to_process(limit=None, since_date=None)
else:
files = exchange.get_files_to_process(limit=None, since_date=last_ts)
if not files:
logger.info(f"Keine {exchange.name} Dateien zu verarbeiten.")
return
logger.info(f"{len(files)} {exchange.name} Dateien gefunden...")
total_new = 0
total_processed = 0
for i, file_item in enumerate(files, 1):
file_name = file_item.get('fileName', 'unknown').split('/')[-1]
logger.info(f"Verarbeite {exchange.name} Datei {i}/{len(files)}: {file_name}")
trades = exchange.fetch_trades_from_file(file_item)
if not trades:
logger.info(f" Keine Trades in {file_name}")
continue
total_processed += len(trades)
logger.info(f" {len(trades)} Trades geladen, filtere Duplikate...")
new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
if new_trades:
new_trades.sort(key=lambda x: x.timestamp)
db.save_trades(new_trades)
total_new += len(new_trades)
logger.info(f" {len(new_trades)} neue Trades gespeichert (gesamt neu: {total_new})")
else:
logger.info(f" Keine neuen Trades in dieser Datei")
# Referenzen freigeben
del trades
del new_trades
time.sleep(0.1)
logger.info(f"{exchange.name} fertig: {total_new} neue Trades von {total_processed} verarbeitet.")
clear_trades_cache()
def process_standard_exchange(db, db_url: str, exchange: BaseExchange, historical: bool):
"""Verarbeitet einen Standard-Exchange mit Batch-Verarbeitung."""
try:
last_ts = get_last_trade_timestamp(db_url, exchange.name)
logger.info(f"Hole Daten von {exchange.name} (Letzter Trade: {last_ts})...")
trades = exchange.fetch_latest_trades(include_yesterday=historical)
if not trades:
logger.info(f"Keine Trades von {exchange.name} erhalten.")
return
# Deduplizierung
logger.info(f"Filtere {len(trades)} Trades auf Duplikate...")
new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
logger.info(f"Gefunden: {len(trades)} Trades gesamt, {len(new_trades)} sind neu.")
if new_trades:
new_trades.sort(key=lambda x: x.timestamp)
db.save_trades(new_trades)
logger.info(f"{len(new_trades)} neue Trades in QuestDB gespeichert.")
# Referenzen freigeben
del trades
if new_trades:
del new_trades
clear_trades_cache()
except Exception as e:
logger.error(f"Fehler bei Exchange {exchange.name}: {e}")
def run_task(historical=False):
logger.info(f"Starting Trading Data Fetcher task (Historical: {historical})...")
# Initialize exchanges
eix = EIXExchange()
ls = LSExchange()
# Neue Deutsche Börse Exchanges
xetra = XetraExchange()
frankfurt = FrankfurtExchange()
quotrix = QuotrixExchange()
gettex = GettexExchange()
stuttgart = StuttgartExchange()
# Börsenag Exchanges (Düsseldorf, Hamburg, Hannover)
dusa = DUSAExchange()
dusb = DUSBExchange()
dusc = DUSCExchange()
dusd = DUSDExchange()
hama = HAMAExchange()
hamb = HAMBExchange()
hana = HANAExchange()
hanb = HANBExchange()
# Pass last_ts to fetcher to allow smart filtering
# daemon.py runs daily, so we want to fetch everything since DB state
# BUT we need to be careful: eix.py's fetch_latest_trades needs 'since_date' argument
# We can't pass it here directly in the tuple easily because last_ts is calculated inside the loop.
# We will modify the loop below to handle args dynamically
exchanges_to_process = [
(eix, {'limit': None if historical else 5}), # Default limit 5 for safety if no historical
(ls, {'include_yesterday': historical}),
# Deutsche Börse Exchanges
(xetra, {'include_yesterday': historical}),
(frankfurt, {'include_yesterday': historical}),
(quotrix, {'include_yesterday': historical}),
(gettex, {'include_yesterday': historical}),
(stuttgart, {'include_yesterday': historical}),
# Börsenag Exchanges (Düsseldorf, Hamburg, Hannover)
(dusa, {'include_yesterday': historical}),
(dusb, {'include_yesterday': historical}),
(dusc, {'include_yesterday': historical}),
(dusd, {'include_yesterday': historical}),
(hama, {'include_yesterday': historical}),
(hamb, {'include_yesterday': historical}),
(hana, {'include_yesterday': historical}),
(hanb, {'include_yesterday': historical}),
]
"""Haupttask: Holt Trades von allen registrierten Exchanges."""
logger.info(f"Starte Trading Data Fetcher (Historical: {historical})...")
db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
for exchange, args in exchanges_to_process:
try:
db_url = "http://questdb:9000"
last_ts = get_last_trade_timestamp(db_url, exchange.name)
logger.info(f"Fetching data from {exchange.name} (Last trade: {last_ts})...")
# Special handling for EIX to support smart filtering
call_args = args.copy()
if exchange.name == "EIX" and not historical:
call_args['since_date'] = last_ts.replace(tzinfo=datetime.timezone.utc)
# Remove limit if we are filtering by date to ensure we get everything
if 'limit' in call_args:
call_args.pop('limit')
trades = exchange.fetch_latest_trades(**call_args)
if not trades:
logger.info(f"No trades fetched from {exchange.name}.")
continue
# Hash-basierte Deduplizierung - Batch-Verarbeitung um RAM zu sparen
logger.info(f"Filtering {len(trades)} trades for duplicates (batch processing)...")
new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=500)
logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")
if new_trades:
# Sort trades by timestamp before saving (QuestDB likes this)
new_trades.sort(key=lambda x: x.timestamp)
db.save_trades(new_trades)
logger.info(f"Stored {len(new_trades)} new trades in QuestDB.")
except Exception as e:
logger.error(f"Error processing exchange {exchange.name}: {e}")
def main():
logger.info("Trading Daemon started.")
# 1. Startup Check: Ist die DB leer?
db_url = "http://questdb:9000"
is_empty = True
# Streaming-Exchanges verarbeiten (große Datenmengen)
for exchange_class in STREAMING_EXCHANGES:
try:
exchange = exchange_class()
logger.info(f"Verarbeite {exchange.name} im Streaming-Modus...")
process_eix_streaming(db, db_url, exchange, historical=historical)
except Exception as e:
logger.error(f"Fehler bei Streaming-Exchange {exchange_class.__name__}: {e}")
# Standard-Exchanges verarbeiten
for exchange_class in STANDARD_EXCHANGES:
try:
exchange = exchange_class()
process_standard_exchange(db, db_url, exchange, historical)
except Exception as e:
logger.error(f"Fehler bei Exchange {exchange_class.__name__}: {e}")
logger.info("Alle Exchanges verarbeitet.")
def is_database_empty(db_url: str) -> bool:
"""Prüft ob die Datenbank leer ist oder die Tabelle nicht existiert."""
try:
# Prüfe ob bereits Trades in der Tabelle sind
response = requests.get(f"{db_url}/exec", params={'query': 'select count(*) from trades'}, auth=DB_AUTH)
if response.status_code == 200:
data = response.json()
if data['dataset'] and data['dataset'][0][0] > 0:
is_empty = False
if data.get('dataset') and data['dataset'][0][0] > 0:
return False
except Exception:
# Falls Tabelle noch nicht existiert oder DB nicht erreichbar ist
is_empty = True
pass
return True
if is_empty:
logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...")
def calculate_seconds_until_target(target_hour: int, target_minute: int = 0) -> int:
"""Berechnet Sekunden bis zur nächsten Zielzeit."""
now = datetime.datetime.now()
target = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
# Wenn Zielzeit heute schon vorbei ist, nimm morgen
if target <= now:
target += datetime.timedelta(days=1)
return int((target - now).total_seconds())
def main():
logger.info("Trading Daemon gestartet.")
db_url = "http://questdb:9000"
# Startup: Initialer Sync
if is_database_empty(db_url):
logger.info("Datenbank ist leer. Starte initialen historischen Fetch...")
run_task(historical=True)
else:
logger.info("Found existing data in database. Triggering catch-up sync...")
# Run a normal task to fetch any missing data since the last run
logger.info("Existierende Daten gefunden. Starte Catch-up Sync...")
run_task(historical=False)
logger.info("Catch-up sync completed. Waiting for scheduled run at 23:00.")
logger.info("Catch-up Sync abgeschlossen.")
# Scheduling Konfiguration
SCHEDULE_HOUR = 23
SCHEDULE_MINUTE = 0
last_run_date = None
logger.info(f"Warte auf täglichen Run um {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d}...")
while True:
now = datetime.datetime.now()
# Täglich um 23:00 Uhr
if now.hour == 23 and now.minute == 0:
run_task(historical=False)
# Warte 61s, um Mehrfachausführung in derselben Minute zu verhindern
time.sleep(61)
today = now.date()
# Check alle 30 Sekunden
time.sleep(30)
# Prüfe ob wir heute schon gelaufen sind
already_ran_today = (last_run_date == today)
# Prüfe ob wir im Zeitfenster sind (23:00 - 23:59)
in_schedule_window = (now.hour == SCHEDULE_HOUR and now.minute >= SCHEDULE_MINUTE)
if in_schedule_window and not already_ran_today:
logger.info(f"Geplanter Task startet ({now.strftime('%Y-%m-%d %H:%M:%S')})...")
run_task(historical=False)
last_run_date = today
logger.info("Geplanter Task abgeschlossen. Warte auf nächsten Tag...")
# Dynamische Sleep-Zeit: Kurz vor Zielzeit öfter prüfen
seconds_until_target = calculate_seconds_until_target(SCHEDULE_HOUR, SCHEDULE_MINUTE)
if seconds_until_target > 3600:
# Mehr als 1 Stunde: Schlafe 30 Minuten
sleep_time = 1800
elif seconds_until_target > 300:
# 5 Minuten bis 1 Stunde: Schlafe 5 Minuten
sleep_time = 300
else:
# Unter 5 Minuten: Schlafe 30 Sekunden
sleep_time = 30
time.sleep(sleep_time)
if __name__ == "__main__":
main()
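
The "Exchange Registry" introduced in this file is the pattern named in the first commit message: adding a market should only require adding its class to STANDARD_EXCHANGES (or STREAMING_EXCHANGES for large data sets), after which run_task() instantiates and processes it automatically. A minimal sketch of such an addition, assuming the BaseExchange interface used by the exchanges in this changeset; MyNewExchange is hypothetical and not part of the diff:

from src.exchanges.base import BaseExchange

class MyNewExchange(BaseExchange):  # hypothetical example exchange
    @property
    def name(self) -> str:
        return "MYEX"  # exchange name as stored in the trades table

    def fetch_latest_trades(self, include_yesterday: bool = False, since_date=None):
        # a real implementation would download and parse trades here
        return []

# then list MyNewExchange in STANDARD_EXCHANGES in daemon.py; run_task() picks it up automatically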

5
pyvenv.cfg Normal file

@@ -0,0 +1,5 @@
home = /opt/homebrew/opt/python@3.13/bin
include-system-site-packages = false
version = 3.13.2
executable = /opt/homebrew/Cellar/python@3.13/3.13.2/Frameworks/Python.framework/Versions/3.13/bin/python3.13
command = /opt/homebrew/opt/python@3.13/bin/python3.13 -m venv /Users/melchiorreimers/Documents/trading_daemon

79
scripts/inspect_gzip.py Normal file

@@ -0,0 +1,79 @@
#!/usr/bin/env python3
"""
Utility-Script zum Inspizieren von gzip-komprimierten JSON-Dateien.
Verarbeitet Dateien streaming, ohne alles in den RAM zu laden.
Verwendung:
python scripts/inspect_gzip.py <datei.json.gz> [--limit N] [--output datei.json]
"""
import gzip
import json
import argparse
import sys
from pathlib import Path
def inspect_gzip_file(filepath: str, limit: int = None, output_file: str = None):
"""
Liest eine gzip-komprimierte NDJSON-Datei und gibt die Inhalte aus.
Args:
filepath: Pfad zur .json.gz Datei
limit: Maximale Anzahl der auszugebenden Records (None = alle)
output_file: Optional: Ausgabe in Datei statt stdout
"""
path = Path(filepath)
if not path.exists():
print(f"Fehler: Datei '{filepath}' nicht gefunden.", file=sys.stderr)
return 1
count = 0
output = open(output_file, 'w', encoding='utf-8') if output_file else sys.stdout
try:
with gzip.open(filepath, mode='rt', encoding='utf-8') as f:
for line in f:
if not line.strip():
continue
try:
record = json.loads(line)
# Pretty-print einzelner Record
json.dump(record, output, indent=2, ensure_ascii=False)
output.write('\n')
count += 1
if limit and count >= limit:
break
except json.JSONDecodeError as e:
print(f"JSON-Fehler in Zeile {count + 1}: {e}", file=sys.stderr)
continue
print(f"\n--- {count} Records verarbeitet ---", file=sys.stderr)
finally:
if output_file and output != sys.stdout:
output.close()
return 0
def main():
parser = argparse.ArgumentParser(
description='Inspiziert gzip-komprimierte JSON-Dateien (NDJSON-Format)'
)
parser.add_argument('file', help='Pfad zur .json.gz Datei')
parser.add_argument('--limit', '-n', type=int, default=10,
help='Maximale Anzahl der Records (default: 10, 0 = alle)')
parser.add_argument('--output', '-o', type=str,
help='Ausgabe in Datei statt stdout')
args = parser.parse_args()
limit = args.limit if args.limit > 0 else None
return inspect_gzip_file(args.file, limit=limit, output_file=args.output)
if __name__ == '__main__':
sys.exit(main())
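
A brief usage sketch for the helper above (assuming it is imported from the repository root; the file name is illustrative, following the Deutsche Börse naming pattern used elsewhere in this changeset):

from scripts.inspect_gzip import inspect_gzip_file

# pretty-print the first five records of a downloaded NDJSON dump to stdout
inspect_gzip_file("DETR-posttrade-2026-01-26T21_30.json.gz", limit=5)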

View File

@@ -865,6 +865,39 @@ class AnalyticsWorker:
if i % 10 == 0:
time.sleep(1)
def delete_analytics_for_date(self, date: datetime.date):
"""Löscht alle Analytics-Daten für ein bestimmtes Datum, damit sie neu berechnet werden können."""
date_str = date.strftime('%Y-%m-%d')
next_day = date + datetime.timedelta(days=1)
next_day_str = next_day.strftime('%Y-%m-%d')
tables = ['analytics_custom', 'analytics_exchange_daily', 'analytics_daily_summary']
for table in tables:
try:
# QuestDB DELETE syntax
delete_query = f"DELETE FROM {table} WHERE timestamp >= '{date_str}' AND timestamp < '{next_day_str}'"
response = requests.get(
f"{self.questdb_url}/exec",
params={'query': delete_query},
auth=self.auth,
timeout=30
)
if response.status_code == 200:
logger.debug(f"Deleted old analytics from {table} for {date}")
except Exception as e:
logger.debug(f"Could not delete from {table} for {date}: {e}")
def force_recalculate_date(self, date: datetime.date):
"""Erzwingt Neuberechnung der Analytics für ein Datum (löscht alte Daten zuerst)."""
logger.info(f"Force recalculating analytics for {date}...")
# Lösche alte Analytics-Daten für dieses Datum
self.delete_analytics_for_date(date)
# Berechne neu
self.process_date(date)
def run(self):
"""Hauptschleife des Workers"""
logger.info("Analytics Worker started.")
@@ -874,35 +907,26 @@ class AnalyticsWorker:
logger.error("Failed to connect to QuestDB. Exiting.")
return
# Initiale Berechnung fehlender Tage (inkl. gestern und heute)
# Initiale Berechnung fehlender Tage
logger.info("Checking for missing dates...")
self.process_missing_dates()
# Stelle sicher, dass gestern und heute verarbeitet werden
# IMMER heute und gestern neu berechnen (da neue Trades hinzukommen können)
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
logger.info(f"Ensuring yesterday ({yesterday}) and today ({today}) are processed...")
# Prüfe alle drei Tabellen
existing_custom = self.get_existing_dates('analytics_custom')
existing_exchange = self.get_existing_dates('analytics_exchange_daily')
existing_summary = self.get_existing_dates('analytics_daily_summary')
existing_dates = existing_custom | existing_exchange | existing_summary
logger.info(f"Force recalculating yesterday ({yesterday}) and today ({today}) - new trades may have been added...")
if yesterday not in existing_dates:
logger.info(f"Processing yesterday's data: {yesterday}")
self.process_date(yesterday)
# Gestern immer neu berechnen
self.force_recalculate_date(yesterday)
# Heute wird verarbeitet, wenn es bereits Trades gibt
if today not in existing_dates:
# Prüfe ob es heute schon Trades gibt
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
data = self.query_questdb(query)
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
logger.info(f"Found trades for today ({today}), processing...")
self.process_date(today)
else:
logger.info(f"No trades found for today ({today}) yet, will process later")
# Heute nur wenn es Trades gibt
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
data = self.query_questdb(query)
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
self.force_recalculate_date(today)
else:
logger.info(f"No trades found for today ({today}) yet, will process later")
# Hauptschleife: Prüfe regelmäßig auf fehlende Tage
logger.info("Starting main loop - checking for missing dates every hour...")
@@ -917,32 +941,24 @@ class AnalyticsWorker:
self.process_missing_dates()
last_check_hour = current_hour
# Stelle sicher, dass gestern und heute verarbeitet wurden
# IMMER heute und gestern neu berechnen
today = now.date()
yesterday = today - datetime.timedelta(days=1)
# Prüfe alle drei Tabellen
existing_custom = self.get_existing_dates('analytics_custom')
existing_exchange = self.get_existing_dates('analytics_exchange_daily')
existing_summary = self.get_existing_dates('analytics_daily_summary')
existing_dates = existing_custom | existing_exchange | existing_summary
if yesterday not in existing_dates:
logger.info(f"Processing yesterday's data: {yesterday}")
self.process_date(yesterday)
logger.info(f"Hourly recalculation of yesterday ({yesterday}) and today ({today})...")
self.force_recalculate_date(yesterday)
# Prüfe heute, ob es Trades gibt
if today not in existing_dates:
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
data = self.query_questdb(query)
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
logger.info(f"Found trades for today ({today}), processing...")
self.process_date(today)
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
data = self.query_questdb(query)
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
self.force_recalculate_date(today)
# Prüfe ob es Mitternacht ist (00:00) - verarbeite dann gestern
if now.hour == 0 and now.minute == 0:
yesterday = (now - datetime.timedelta(days=1)).date()
logger.info(f"Midnight reached - processing yesterday's data: {yesterday}")
self.process_date(yesterday)
logger.info(f"Midnight reached - force recalculating yesterday's data: {yesterday}")
self.force_recalculate_date(yesterday)
# Warte 61s, um Mehrfachausführung zu verhindern
time.sleep(61)
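
As a worked example of the delete-then-recompute approach above: for date = 2026-01-29, delete_analytics_for_date() builds date_str = '2026-01-29' and next_day_str = '2026-01-30', so the statement sent for the first table is

DELETE FROM analytics_custom WHERE timestamp >= '2026-01-29' AND timestamp < '2026-01-30'

and process_date() then rewrites the same window, which keeps the hourly recalculation of today and yesterday idempotent.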

354
src/exchanges/boersenag.py Normal file

@@ -0,0 +1,354 @@
"""
Börsenag Exchange Fetcher
Unterstützt: DUSA, DUSB, DUSC, DUSD, HAMA, HAMB, HANA, HANB
Datenquelle: https://www.boersenag.de/mifid-ii-delayed-data/
URL-Format: https://cld42.boersenag.de/m13data/data/Mifir13DelayedData_{MIC}_{SEQUENCE}_{TIMESTAMP}.csv
"""
import requests
import time
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from .base import BaseExchange, Trade
import re
# Rate-Limiting Konfiguration
RATE_LIMIT_DELAY = 0.3 # Sekunden zwischen Requests
# Browser User-Agent für Zugriff
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/csv, text/plain, */*',
'Accept-Language': 'de-DE,de;q=0.9,en;q=0.8',
'Referer': 'https://www.boersenag.de/',
}
# Exchange-Konfiguration
BOERSENAG_EXCHANGES = {
'DUSA': {
'name': 'DUSA',
'full_name': 'Börse Düsseldorf Regulierter Markt',
'mic': 'DUSA',
},
'DUSB': {
'name': 'DUSB',
'full_name': 'Börse Düsseldorf Freiverkehr',
'mic': 'DUSB',
},
'DUSC': {
'name': 'DUSC',
'full_name': 'Börse Düsseldorf Quotrix Regulierter Markt',
'mic': 'DUSC',
},
'DUSD': {
'name': 'DUSD',
'full_name': 'Börse Düsseldorf Quotrix Freiverkehr',
'mic': 'DUSD',
},
'HAMA': {
'name': 'HAMA',
'full_name': 'Börse Hamburg Regulierter Markt',
'mic': 'HAMA',
},
'HAMB': {
'name': 'HAMB',
'full_name': 'Börse Hamburg Freiverkehr',
'mic': 'HAMB',
},
'HANA': {
'name': 'HANA',
'full_name': 'Börse Hannover Regulierter Markt',
'mic': 'HANA',
},
'HANB': {
'name': 'HANB',
'full_name': 'Börse Hannover Freiverkehr',
'mic': 'HANB',
},
}
BASE_URL = "https://cld42.boersenag.de/m13data/data"
class BoersenagBase(BaseExchange):
"""
Basisklasse für Börsenag Exchanges (DUSA, DUSB, DUSC, DUSD, HAMA, HAMB, HANA, HANB)
CSV Format (Semikolon-separiert):
MIC; ISIN; displayName; time; price; size; supplement
- time: "28.01.2026 15:48:42" (deutsches Format)
- price: "46,18" (deutsches Dezimalformat)
- size: Menge (kann 0 sein für Kurse ohne Trade)
- supplement: "bez " = bezahlt (echter Trade), "G " = Geld (Bid), "B " = Brief (Ask)
"""
@property
def mic(self) -> str:
"""MIC Code für die Börse"""
raise NotImplementedError
@property
def name(self) -> str:
return self.mic
def _generate_file_urls(self, target_date: datetime.date) -> List[str]:
"""
Generiert mögliche Datei-URLs für ein bestimmtes Datum.
Format: Mifir13DelayedData_{MIC}_{SEQUENCE}_{TIMESTAMP}.csv
Die Dateien werden stündlich mit einem Zeitstempel generiert.
Wir versuchen verschiedene Sequenznummern und Zeitstempel.
"""
urls = []
# Formatiere Datum im URL-Format: YYYYMMDD
date_str = target_date.strftime('%Y%m%d')
# Mögliche Sequenznummern (beobachtet: 000000DF, aber könnte variieren)
sequences = ['000000DF', '00000000', '000000DD', '000000DE']
# Generiere URLs für verschiedene Uhrzeiten (alle 15 Minuten)
for hour in range(0, 24):
for minute in [0, 15, 30, 45]:
timestamp = f"{date_str}{hour:02d}{minute:02d}000000"
for seq in sequences:
url = f"{BASE_URL}/Mifir13DelayedData_{self.mic}_{seq}_{timestamp}.csv"
urls.append(url)
# Versuche auch die einfachste Form mit 0000000000
for seq in sequences:
url = f"{BASE_URL}/Mifir13DelayedData_{self.mic}_{seq}_{date_str}0000000000.csv"
urls.append(url)
return urls
def _parse_german_datetime(self, dt_str: str) -> Optional[datetime]:
"""Parst deutsches Datumsformat: DD.MM.YYYY HH:MM:SS"""
try:
# Format: "28.01.2026 15:48:42"
dt = datetime.strptime(dt_str.strip(), '%d.%m.%Y %H:%M:%S')
# In UTC konvertieren (Deutsche Zeit = MEZ/MESZ, hier vereinfacht als UTC+1)
# Für korrektes Handling würde pytz benötigt
dt = dt.replace(tzinfo=timezone.utc)
return dt
except ValueError:
return None
def _parse_german_number(self, num_str: str) -> Optional[float]:
"""Parst deutsches Zahlenformat: 1.234,56 -> 1234.56"""
try:
# Entferne Tausender-Trennzeichen (Punkt) und ersetze Dezimalkomma
clean = num_str.strip().replace('.', '').replace(',', '.')
return float(clean)
except ValueError:
return None
def _download_and_parse_file(self, url: str) -> List[Trade]:
"""Lädt eine CSV-Datei herunter und parst die Trades"""
trades = []
try:
response = requests.get(url, headers=HEADERS, timeout=30)
if response.status_code == 404:
return []
response.raise_for_status()
content = response.text
if not content.strip():
return []
lines = content.strip().split('\n')
if len(lines) < 2: # Nur Header, keine Daten
return []
# Erste Zeile ist Header
# MIC; ISIN; displayName; time; price; size; supplement
for line in lines[1:]:
if not line.strip():
continue
trade = self._parse_csv_line(line)
if trade:
trades.append(trade)
except requests.exceptions.HTTPError as e:
if e.response.status_code != 404:
print(f"[{self.name}] HTTP error: {e}")
except Exception as e:
print(f"[{self.name}] Error downloading {url}: {e}")
return trades
def _parse_csv_line(self, line: str) -> Optional[Trade]:
"""Parst eine einzelne CSV-Zeile"""
try:
# CSV ist Semikolon-separiert
parts = line.split(';')
if len(parts) < 7:
return None
mic = parts[0].strip()
isin = parts[1].strip()
display_name = parts[2].strip().strip('"')
time_str = parts[3].strip()
price_str = parts[4].strip()
size_str = parts[5].strip()
supplement = parts[6].strip().strip('"').strip()
# Validiere ISIN
if not isin or len(isin) != 12:
return None
# Parse Timestamp
timestamp = self._parse_german_datetime(time_str)
if not timestamp:
return None
# Parse Preis
price = self._parse_german_number(price_str)
if price is None or price <= 0:
return None
# Parse Menge
try:
size = float(size_str)
except ValueError:
size = 0
# Nur echte Trades (size > 0) oder "bez" (bezahlt) aufnehmen
# "G" = Geld (Bid), "B" = Brief (Ask) sind Kurse, keine Trades
is_trade = size > 0 or 'bez' in supplement.lower()
if not is_trade:
return None
# Bei size = 0 aber "bez" nehmen wir an, dass die Menge unbekannt ist (setze auf 1)
if size <= 0:
size = 1
return Trade(
exchange=self.name,
symbol=isin,
isin=isin,
price=price,
quantity=size,
timestamp=timestamp
)
except Exception as e:
return None
def _get_last_trading_day(self, from_date: datetime.date) -> datetime.date:
"""Findet den letzten Handelstag (überspringt Wochenenden)"""
date = from_date
if date.weekday() == 5: # Samstag
date = date - timedelta(days=1)
elif date.weekday() == 6: # Sonntag
date = date - timedelta(days=2)
return date
def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]:
"""Holt alle Trades vom letzten Handelstag"""
all_trades = []
# Bestimme Zieldatum
if since_date:
target_date = since_date.date() if hasattr(since_date, 'date') else since_date
else:
target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()
# Überspringe Wochenenden
original_date = target_date
target_date = self._get_last_trading_day(target_date)
if target_date != original_date:
print(f"[{self.name}] Skipping weekend: {original_date} -> {target_date}")
print(f"[{self.name}] Fetching trades for date: {target_date}")
# Generiere mögliche URLs
urls = self._generate_file_urls(target_date)
successful = 0
total_urls = len(urls)
# Versuche verschiedene URLs
for i, url in enumerate(urls):
trades = self._download_and_parse_file(url)
if trades:
all_trades.extend(trades)
successful += 1
print(f"[{self.name}] Found {len(trades)} trades from: {url.split('/')[-1]}")
# Bei Erfolg müssen wir nicht alle anderen URLs probieren
break
# Rate-Limiting
if i < total_urls - 1:
time.sleep(RATE_LIMIT_DELAY)
# Nach 20 fehlgeschlagenen Versuchen abbrechen
if i > 20 and successful == 0:
break
print(f"[{self.name}] Total trades fetched: {len(all_trades)}")
return all_trades
# Konkrete Exchange-Klassen
class DUSAExchange(BoersenagBase):
"""Börse Düsseldorf Regulierter Markt"""
@property
def mic(self) -> str:
return "DUSA"
class DUSBExchange(BoersenagBase):
"""Börse Düsseldorf Freiverkehr"""
@property
def mic(self) -> str:
return "DUSB"
class DUSCExchange(BoersenagBase):
"""Börse Düsseldorf Quotrix Regulierter Markt"""
@property
def mic(self) -> str:
return "DUSC"
class DUSDExchange(BoersenagBase):
"""Börse Düsseldorf Quotrix Freiverkehr"""
@property
def mic(self) -> str:
return "DUSD"
class HAMAExchange(BoersenagBase):
"""Börse Hamburg Regulierter Markt"""
@property
def mic(self) -> str:
return "HAMA"
class HAMBExchange(BoersenagBase):
"""Börse Hamburg Freiverkehr"""
@property
def mic(self) -> str:
return "HAMB"
class HANAExchange(BoersenagBase):
"""Börse Hannover Regulierter Markt"""
@property
def mic(self) -> str:
return "HANA"
class HANBExchange(BoersenagBase):
"""Börse Hannover Freiverkehr"""
@property
def mic(self) -> str:
return "HANB"

View File

@@ -2,16 +2,20 @@ import requests
import gzip
import json
import io
import re
import time
import logging
import threading
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from .base import BaseExchange, Trade
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
# Rate-Limiting Konfiguration
RATE_LIMIT_DELAY = 0.5 # Sekunden zwischen Requests
RATE_LIMIT_RETRY_DELAY = 5 # Sekunden Wartezeit bei 429
MAX_RETRIES = 3 # Maximale Wiederholungen bei 429
MAX_RETRIES = 5 # Maximale Wiederholungen bei 429
# API URLs für Deutsche Börse
API_URLS = {
@@ -21,17 +25,47 @@ API_URLS = {
}
DOWNLOAD_BASE_URL = "https://mfs.deutsche-boerse.com/api/download"
# Browser User-Agent für Zugriff
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'application/json, application/gzip, */*',
'Referer': 'https://mfs.deutsche-boerse.com/',
}
# Liste von User-Agents für Rotation bei Rate-Limiting
USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:121.0) Gecko/20100101 Firefox/121.0',
'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0',
]
class UserAgentRotator:
"""Thread-safe User-Agent Rotation"""
def __init__(self):
self._index = 0
self._lock = threading.Lock()
def get_headers(self, rotate: bool = False) -> dict:
"""Gibt Headers mit aktuellem User-Agent zurück. Bei rotate=True wird zum nächsten gewechselt."""
with self._lock:
if rotate:
self._index = (self._index + 1) % len(USER_AGENTS)
return {
'User-Agent': USER_AGENTS[self._index],
'Accept': 'application/json, application/gzip, */*',
'Referer': 'https://mfs.deutsche-boerse.com/',
}
# Globale Instanz für User-Agent Rotation
_ua_rotator = UserAgentRotator()
class DeutscheBoerseBase(BaseExchange):
"""Basisklasse für Deutsche Börse Exchanges (Xetra, Frankfurt, Quotrix)"""
# Regex für Dateinamen-Parsing (kompiliert für Performance)
_FILENAME_PATTERN = re.compile(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})')
@property
def base_url(self) -> str:
"""Override in subclasses"""
@@ -46,60 +80,73 @@ class DeutscheBoerseBase(BaseExchange):
"""API URL für die Dateiliste"""
return API_URLS.get(self.name, self.base_url)
def _handle_rate_limit(self, retry: int, context: str) -> None:
"""Zentrale Rate-Limit Behandlung: rotiert User-Agent und wartet."""
_ua_rotator.get_headers(rotate=True)
wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
logger.warning(f"[{self.name}] Rate limited ({context}), rotating User-Agent and waiting {wait_time}s... (retry {retry + 1}/{MAX_RETRIES})")
time.sleep(wait_time)
def _get_file_list(self) -> List[str]:
"""Holt die Dateiliste von der JSON API"""
try:
api_url = self.api_url
print(f"[{self.name}] Fetching file list from: {api_url}")
response = requests.get(api_url, headers=HEADERS, timeout=30)
response.raise_for_status()
data = response.json()
files = data.get('CurrentFiles', [])
print(f"[{self.name}] API returned {len(files)} files")
if files:
print(f"[{self.name}] Sample files: {files[:3]}")
return files
except Exception as e:
print(f"[{self.name}] Error fetching file list from API: {e}")
import traceback
print(f"[{self.name}] Traceback: {traceback.format_exc()}")
return []
api_url = self.api_url
for retry in range(MAX_RETRIES):
try:
headers = _ua_rotator.get_headers(rotate=(retry > 0))
logger.info(f"[{self.name}] Fetching file list from: {api_url}")
response = requests.get(api_url, headers=headers, timeout=30)
if response.status_code == 429:
self._handle_rate_limit(retry, "file list")
continue
response.raise_for_status()
data = response.json()
files = data.get('CurrentFiles', [])
logger.info(f"[{self.name}] API returned {len(files)} files")
if files:
logger.debug(f"[{self.name}] Sample files: {files[:3]}")
return files
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
self._handle_rate_limit(retry, "file list HTTPError")
continue
logger.error(f"[{self.name}] HTTP error fetching file list: {e}")
break
except Exception as e:
logger.exception(f"[{self.name}] Error fetching file list from API: {e}")
break
return []
def _filter_files_for_date(self, files: List[str], target_date: datetime.date) -> List[str]:
"""
Filtert Dateien für ein bestimmtes Datum.
Dateiformat: DETR-posttrade-YYYY-MM-DDTHH_MM.json.gz (mit Unterstrich!)
Dateiformat: DETR-posttrade-YYYY-MM-DDTHH_MM.json.gz
Da Handel bis 22:00 MEZ geht (21:00/20:00 UTC), müssen wir auch
Dateien nach Mitternacht UTC berücksichtigen.
"""
import re
filtered = []
# Für den Vortag: Dateien vom target_date UND vom Folgetag (bis ~02:00 UTC)
target_str = target_date.strftime('%Y-%m-%d')
next_day = target_date + timedelta(days=1)
next_day_str = next_day.strftime('%Y-%m-%d')
for file in files:
# Extract the date from the filename
# Format: DETR-posttrade-2026-01-26T21_30.json.gz
if target_str in file:
filtered.append(file)
elif next_day_str in file:
# Check whether it is an early file from the next day (< 03:00 UTC)
try:
# Find the timestamp in the filename (underscore before the minutes)
match = re.search(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})', file)
if match:
hour = int(match.group(2))
if hour < 3: # early morning hours still count as the previous day
filtered.append(file)
except Exception:
pass
match = self._FILENAME_PATTERN.search(file)
if match:
hour = int(match.group(2))
if hour < 3: # early morning hours still count as the previous day
filtered.append(file)
return filtered
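# Worked example (illustrative): for target_date = 2026-01-26 the filter keeps
#   DETR-posttrade-2026-01-26T21_30.json.gz   (matches the target date)
#   DETR-posttrade-2026-01-27T00_15.json.gz   (next day, hour 0 < 3)
# and drops
#   DETR-posttrade-2026-01-27T08_00.json.gz   (next day, hour 8 >= 3)
# because the last trades of the 22:00 CET close are published after midnight UTC.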
@@ -110,17 +157,14 @@ class DeutscheBoerseBase(BaseExchange):
for retry in range(MAX_RETRIES):
try:
response = requests.get(full_url, headers=HEADERS, timeout=60)
headers = _ua_rotator.get_headers(rotate=(retry > 0))
response = requests.get(full_url, headers=headers, timeout=60)
if response.status_code == 404:
# File not found - normal for old files
return []
if response.status_code == 429:
# Rate limit reached - wait and retry
wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
print(f"[{self.name}] Rate limited, waiting {wait_time}s...")
time.sleep(wait_time)
self._handle_rate_limit(retry, "download")
continue
response.raise_for_status()
@@ -130,13 +174,11 @@ class DeutscheBoerseBase(BaseExchange):
content = f.read().decode('utf-8')
if not content.strip():
# Empty file
return []
# NDJSON format: one JSON line per trade
lines = content.strip().split('\n')
if not lines or (len(lines) == 1 and not lines[0].strip()):
# Empty file
return []
for line in lines:
@@ -147,116 +189,146 @@ class DeutscheBoerseBase(BaseExchange):
trade = self._parse_trade_record(record)
if trade:
trades.append(trade)
except json.JSONDecodeError:
continue
except Exception:
continue
except json.JSONDecodeError as e:
logger.debug(f"[{self.name}] JSON decode error in {filename}: {e}")
except Exception as e:
logger.debug(f"[{self.name}] Error parsing record in {filename}: {e}")
# Success - no further retries needed
# Success
break
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
print(f"[{self.name}] Rate limited, waiting {wait_time}s...")
time.sleep(wait_time)
self._handle_rate_limit(retry, "download HTTPError")
continue
elif e.response.status_code != 404:
print(f"[{self.name}] HTTP error downloading {filename}: {e}")
logger.error(f"[{self.name}] HTTP error downloading {filename}: {e}")
break
except Exception as e:
print(f"[{self.name}] Error downloading/parsing {filename}: {e}")
logger.error(f"[{self.name}] Error downloading/parsing {filename}: {e}")
break
return trades
def _parse_timestamp(self, ts_str: str) -> Optional[datetime]:
"""
Parses a timestamp string into a datetime object.
Supports nanoseconds by truncating them to microseconds.
"""
if not ts_str:
return None
# Replace 'Z' with '+00:00' for ISO compatibility
ts_str = ts_str.replace('Z', '+00:00')
# Truncate nanoseconds to microseconds (Python allows at most 6 decimal places)
if '.' in ts_str:
# Split at '+' or '-' for the timezone
if '+' in ts_str:
time_part, tz_part = ts_str.rsplit('+', 1)
tz_part = '+' + tz_part
elif ts_str.count('-') > 2: # negative timezone offset
time_part, tz_part = ts_str.rsplit('-', 1)
tz_part = '-' + tz_part
else:
time_part, tz_part = ts_str, ''
if '.' in time_part:
base, frac = time_part.split('.')
frac = frac[:6] # truncate to 6 digits
ts_str = f"{base}.{frac}{tz_part}"
return datetime.fromisoformat(ts_str)
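# Example (illustrative): nanosecond precision is truncated to microseconds before
# datetime.fromisoformat() is called:
#   "2026-01-29T14:07:00.419000123Z" -> "2026-01-29T14:07:00.419000+00:00"
#   -> datetime(2026, 1, 29, 14, 7, 0, 419000, tzinfo=timezone.utc)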
def _extract_price(self, record: dict) -> Optional[float]:
"""Extracts the price from the various JSON formats."""
# New format
if 'lastTrade' in record:
return float(record['lastTrade'])
# Old format with nested Pric object
pric = record.get('Pric')
if pric is None:
return None
if isinstance(pric, (int, float)):
return float(pric)
if isinstance(pric, dict):
# Try different paths
if 'Pric' in pric:
inner = pric['Pric']
if isinstance(inner, dict):
amt = inner.get('MntryVal', {}).get('Amt') or inner.get('Amt')
if amt is not None:
return float(amt)
if 'MntryVal' in pric:
amt = pric['MntryVal'].get('Amt')
if amt is not None:
return float(amt)
return None
def _extract_quantity(self, record: dict) -> Optional[float]:
"""Extracts the quantity from the various JSON formats."""
# New format
if 'lastQty' in record:
return float(record['lastQty'])
# Old format
qty = record.get('Qty')
if qty is None:
return None
if isinstance(qty, (int, float)):
return float(qty)
if isinstance(qty, dict):
val = qty.get('Unit') or qty.get('Qty')
if val is not None:
return float(val)
return None
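# Illustrative records (not part of this commit) covering both formats handled by
# _extract_price / _extract_quantity:
#   new format: {"lastTrade": 10.145, "lastQty": 500.0}
#       -> price 10.145, quantity 500.0
#   old format: {"Pric": {"MntryVal": {"Amt": "10.145"}}, "Qty": {"Unit": "500"}}
#       -> price 10.145, quantity 500.0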
def _parse_trade_record(self, record: dict) -> Optional[Trade]:
"""
Parses a single trade record from the JSON.
Current JSON format (NDJSON):
{
"messageId": "posttrade",
"sourceName": "GAT",
"isin": "US00123Q1040",
"lastTradeTime": "2026-01-29T14:07:00.419000000Z",
"lastTrade": 10.145,
"lastQty": 500.0,
"currency": "EUR",
...
}
Supported formats:
- New format: isin, lastTrade, lastQty, lastTradeTime
- Old format: FinInstrmId.Id, Pric, Qty, TrdDt/TrdTm
"""
try:
# Extract ISIN - new format uses lowercase 'isin'
isin = record.get('isin') or record.get('ISIN') or record.get('instrumentId') or record.get('FinInstrmId', {}).get('Id', '')
# Extract ISIN
isin = (
record.get('isin') or
record.get('ISIN') or
record.get('instrumentId') or
record.get('FinInstrmId', {}).get('Id', '')
)
if not isin:
return None
# Extract price - new format: 'lastTrade'
price = None
if 'lastTrade' in record:
price = float(record['lastTrade'])
elif 'Pric' in record:
pric = record['Pric']
if isinstance(pric, dict):
if 'Pric' in pric:
inner = pric['Pric']
if 'MntryVal' in inner:
price = float(inner['MntryVal'].get('Amt', 0))
elif 'Amt' in inner:
price = float(inner['Amt'])
elif 'MntryVal' in pric:
price = float(pric['MntryVal'].get('Amt', 0))
elif isinstance(pric, (int, float)):
price = float(pric)
# Extract price
price = self._extract_price(record)
if price is None or price <= 0:
return None
# Extract quantity - new format: 'lastQty'
quantity = None
if 'lastQty' in record:
quantity = float(record['lastQty'])
elif 'Qty' in record:
qty = record['Qty']
if isinstance(qty, dict):
quantity = float(qty.get('Unit', qty.get('Qty', 0)))
elif isinstance(qty, (int, float)):
quantity = float(qty)
# Extract quantity
quantity = self._extract_quantity(record)
if quantity is None or quantity <= 0:
return None
# Extract timestamp - new format: 'lastTradeTime'
# Extract timestamp
timestamp = None
if 'lastTradeTime' in record:
ts_str = record['lastTradeTime']
# Format: "2026-01-29T14:07:00.419000000Z"
# Python allows at most 6 decimal places, so truncate
if '.' in ts_str:
parts = ts_str.replace('Z', '').split('.')
if len(parts) == 2 and len(parts[1]) > 6:
ts_str = parts[0] + '.' + parts[1][:6] + '+00:00'
else:
ts_str = ts_str.replace('Z', '+00:00')
else:
ts_str = ts_str.replace('Z', '+00:00')
timestamp = datetime.fromisoformat(ts_str)
timestamp = self._parse_timestamp(record['lastTradeTime'])
else:
# Fallback for the old format
trd_dt = record.get('TrdDt', '')
trd_tm = record.get('TrdTm', '00:00:00')
if not trd_dt:
return None
ts_str = f"{trd_dt}T{trd_tm}"
if '.' in ts_str:
parts = ts_str.split('.')
if len(parts[1]) > 6:
ts_str = parts[0] + '.' + parts[1][:6]
timestamp = datetime.fromisoformat(ts_str)
if trd_dt:
timestamp = self._parse_timestamp(f"{trd_dt}T{trd_tm}")
if timestamp is None:
return None
@@ -273,22 +345,41 @@ class DeutscheBoerseBase(BaseExchange):
timestamp=timestamp
)
except Exception as e:
# Debug: show the first failed record
except (ValueError, TypeError, KeyError) as e:
logger.debug(f"[{self.name}] Failed to parse trade record: {e}")
return None
def _get_last_trading_day(self, from_date: datetime.date) -> datetime.date:
"""
Finds the last trading day (skips weekends).
Finds the last trading day (skips weekends and known holidays).
Monday=0, Sunday=6
"""
# German exchange holidays (fixed dates, the same every year)
# Movable holidays (Easter etc.) would have to be calculated per year
fixed_holidays = {
(1, 1), # New Year's Day
(5, 1), # Labour Day
(12, 24), # Christmas Eve
(12, 25), # Christmas Day
(12, 26), # Boxing Day
(12, 31), # New Year's Eve
}
date = from_date
# If Saturday (5), go back to Friday
if date.weekday() == 5:
date = date - timedelta(days=1)
# If Sunday (6), go back to Friday
elif date.weekday() == 6:
date = date - timedelta(days=2)
max_iterations = 10 # safeguard against an infinite loop
for _ in range(max_iterations):
# Skip weekends
if date.weekday() == 5: # Saturday
date = date - timedelta(days=1)
elif date.weekday() == 6: # Sunday
date = date - timedelta(days=2)
# Skip holidays
elif (date.month, date.day) in fixed_holidays:
date = date - timedelta(days=1)
else:
break
return date
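# Worked example (illustrative): a run on Monday 2026-01-05 targets the previous
# day, Sunday 2026-01-04; the loop steps back two days to Friday 2026-01-02 and
# stops there. Had it landed on a fixed holiday such as January 1, it would step
# back one further day per iteration until a regular weekday is reached.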
def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]:
@@ -304,40 +395,36 @@ class DeutscheBoerseBase(BaseExchange):
# Default: previous day
target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()
# Skip weekends
# Skip weekends and holidays
original_date = target_date
target_date = self._get_last_trading_day(target_date)
if target_date != original_date:
print(f"[{self.name}] Skipping weekend: {original_date} -> {target_date}")
logger.info(f"[{self.name}] Adjusted date: {original_date} -> {target_date} (weekend/holiday)")
print(f"[{self.name}] Fetching trades for date: {target_date}")
logger.info(f"[{self.name}] Fetching trades for date: {target_date}")
# Fetch the file list from the API
files = self._get_file_list()
if not files:
print(f"[{self.name}] No files available from API")
logger.warning(f"[{self.name}] No files available from API")
return []
# Filter files for the target date
target_files = self._filter_files_for_date(files, target_date)
print(f"[{self.name}] {len(target_files)} files match target date (of {len(files)} total)")
logger.info(f"[{self.name}] {len(target_files)} files match target date (of {len(files)} total)")
if not target_files:
print(f"[{self.name}] No files for target date found")
logger.warning(f"[{self.name}] No files for target date found")
return []
# Download and parse all matching files (with rate limiting)
# Download and parse all matching files
successful = 0
failed = 0
total_files = len(target_files)
if total_files == 0:
print(f"[{self.name}] No files to download for date {target_date}")
return []
print(f"[{self.name}] Starting download of {total_files} files...")
logger.info(f"[{self.name}] Starting download of {total_files} files...")
for i, file in enumerate(target_files):
trades = self._download_and_parse_file(file)
@@ -353,9 +440,9 @@ class DeutscheBoerseBase(BaseExchange):
# Progress update every 100 files
if (i + 1) % 100 == 0:
print(f"[{self.name}] Progress: {i + 1}/{total_files} files, {successful} successful, {len(all_trades)} trades so far")
logger.info(f"[{self.name}] Progress: {i + 1}/{total_files} files, {successful} successful, {len(all_trades)} trades so far")
print(f"[{self.name}] Downloaded {successful} files ({failed} failed/empty), total {len(all_trades)} trades")
logger.info(f"[{self.name}] Downloaded {successful} files ({failed} failed/empty), total {len(all_trades)} trades")
return all_trades
View File
@@ -1,30 +1,38 @@
import requests
import json
from bs4 import BeautifulSoup
from datetime import datetime
from typing import List
import logging
from datetime import datetime, timezone
from typing import List, Generator, Tuple
from .base import BaseExchange, Trade
import csv
import io
logger = logging.getLogger(__name__)
class EIXExchange(BaseExchange):
"""European Investor Exchange - CSV-based trade data."""
API_BASE_URL = "https://european-investor-exchange.com/api"
@property
def name(self) -> str:
return "EIX"
def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None) -> List[Trade]:
# EIX stores its file list in a separate API endpoint
url = "https://european-investor-exchange.com/api/official-trades"
def get_files_to_process(self, limit: int = 1, since_date: datetime = None) -> List[dict]:
"""Fetches the list of files to process without downloading them."""
url = f"{self.API_BASE_URL}/official-trades"
try:
response = requests.get(url, timeout=15)
response.raise_for_status()
files_list = response.json()
except Exception as e:
print(f"Error fetching EIX file list: {e}")
except requests.exceptions.RequestException as e:
logger.error(f"[{self.name}] Error fetching file list: {e}")
return []
except ValueError as e:
logger.error(f"[{self.name}] Invalid JSON in file list: {e}")
return []
# Filter files based on date in filename if since_date provided
# Format: "kursblatt/2025/Kursblatt.2025-07-14.1752526803105.csv"
# Filter files by date if since_date is given
filtered_files = []
for item in files_list:
file_key = item.get('fileName')
@@ -33,90 +41,118 @@ class EIXExchange(BaseExchange):
if since_date:
try:
# Extract date from filename: Kursblatt.YYYY-MM-DD
parts = file_key.split('/')[-1].split('.')
# parts example: ['Kursblatt', '2025-07-14', '1752526803105', 'csv']
if len(parts) >= 2:
date_str = parts[1]
file_date = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
file_date = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
# Check if file date is newer than since_date (compare dates only)
if file_date.date() > since_date.date():
if file_date.date() >= since_date.date():
filtered_files.append(item)
continue
# If same day, we might need to check it too, but EIX seems to be daily files
if file_date.date() == since_date.date():
filtered_files.append(item)
continue
except Exception:
# If parsing fails, default to including it (safety) or skipping?
# Let's include it if we are not sure
except (ValueError, IndexError) as e:
# Filename has an unexpected format - include it to be safe
logger.debug(f"[{self.name}] Could not extract a date from {file_key}: {e}")
filtered_files.append(item)
else:
filtered_files.append(item)
# Sort files to process oldest to newest if doing a sync, or newest to oldest?
# If we have limit=1 (default), we usually want the newest.
# But if we are syncing history (since_date set), we probably want all of them.
filtered_files.append(item)
# Logic: If since_date is set, we ignore limit (or use it as safety cap) and process ALL new files
if since_date:
files_to_process = filtered_files
# Sort by date? The API list seems chronological.
return filtered_files
else:
# Default behavior: take the last N files (API returns oldest first usually?)
# Let's assume list is chronological.
if limit:
files_to_process = files_list[-limit:]
else:
files_to_process = files_list
return files_list[-limit:]
return files_list
def fetch_trades_from_file(self, file_item: dict) -> List[Trade]:
"""Downloads and parses a single CSV file."""
file_key = file_item.get('fileName')
if not file_key:
return []
csv_url = f"{self.API_BASE_URL}/trade-file-contents?key={file_key}"
try:
response = requests.get(csv_url, timeout=60)
response.raise_for_status()
return self._parse_csv(response.text)
except requests.exceptions.RequestException as e:
logger.error(f"[{self.name}] Error downloading {file_key}: {e}")
except Exception as e:
logger.error(f"[{self.name}] Unexpected error for {file_key}: {e}")
return []
def fetch_trades_streaming(self, limit: int = 1, since_date: datetime = None) -> Generator[Tuple[str, List[Trade]], None, None]:
"""
Generator that yields trades one file at a time.
Yields: (filename, trades) tuples
"""
files = self.get_files_to_process(limit=limit, since_date=since_date)
for item in files:
file_key = item.get('fileName', 'unknown')
trades = self.fetch_trades_from_file(item)
if trades:
yield (file_key, trades)
trades = []
count = 0
for item in files_to_process:
file_key = item.get('fileName')
# Download the CSV
csv_url = f"https://european-investor-exchange.com/api/trade-file-contents?key={file_key}"
try:
csv_response = requests.get(csv_url, timeout=20)
if csv_response.status_code == 200:
trades.extend(self._parse_csv(csv_response.text))
count += 1
# Only enforce limit if since_date is NOT set
if not since_date and limit and count >= limit:
break
except Exception as e:
print(f"Error downloading EIX CSV {file_key}: {e}")
return trades
def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None, **kwargs) -> List[Trade]:
"""
Legacy method kept for compatibility.
WARNING: loads all trades into memory! For large data sets use fetch_trades_streaming().
"""
# For small requests (limit <= 5) use normal processing
if limit and limit <= 5 and not since_date:
all_trades = []
for filename, trades in self.fetch_trades_streaming(limit=limit, since_date=since_date):
all_trades.extend(trades)
return all_trades
# For large requests: log a warning and return an empty list
logger.warning(f"[{self.name}] fetch_latest_trades() called with a large dataset. Use streaming instead.")
return []
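# Illustrative consumer sketch (not part of this commit): how a caller such as the
# daemon can drain the streaming generator so that only one file's trades are held
# in memory at a time. `store_trades` is a hypothetical persistence callback.
def _example_sync_eix(exchange: "EIXExchange", since_date: datetime, store_trades) -> int:
    total = 0
    for filename, trades in exchange.fetch_trades_streaming(since_date=since_date):
        store_trades(trades)  # persist this file's trades before fetching the next one
        total += len(trades)
        logger.info(f"[EIX] {filename}: stored {len(trades)} trades")
    return total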
def _parse_csv(self, csv_text: str) -> List[Trade]:
"""Parses CSV text into Trade objects."""
trades = []
parse_errors = 0
f = io.StringIO(csv_text)
# Header: Trading day & Trading time UTC,Instrument Identifier,Quantity,Unit Price,Price Currency,Venue Identifier,Side
reader = csv.DictReader(f, delimiter=',')
for row in reader:
for row_num, row in enumerate(reader, start=2): # start at 2 because of the header row
try:
price = float(row['Unit Price'])
quantity = float(row['Quantity'])
isin = row['Instrument Identifier']
symbol = isin # Often symbol is unknown, use ISIN
time_str = row['Trading day & Trading time UTC']
# Format: 2026-01-22T06:30:00.617Z
# Python 3.11+ supports ISO with Z, otherwise we strip Z
# Validate price and quantity
if price <= 0 or quantity <= 0:
logger.debug(f"[{self.name}] Row {row_num}: invalid price/quantity: {price}/{quantity}")
parse_errors += 1
continue
ts_str = time_str.replace('Z', '+00:00')
timestamp = datetime.fromisoformat(ts_str)
trades.append(Trade(
exchange=self.name,
symbol=symbol,
symbol=isin,
isin=isin,
price=price,
quantity=quantity,
timestamp=timestamp
))
except Exception:
continue
except KeyError as e:
logger.debug(f"[{self.name}] Row {row_num}: missing field {e}")
parse_errors += 1
except ValueError as e:
logger.debug(f"[{self.name}] Row {row_num}: invalid value: {e}")
parse_errors += 1
except Exception as e:
logger.warning(f"[{self.name}] Row {row_num}: unexpected error: {e}")
parse_errors += 1
if parse_errors > 0:
logger.debug(f"[{self.name}] {parse_errors} rows could not be parsed")
return trades
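# Example CSV row (illustrative, matching the header documented above):
#   2026-01-22T06:30:00.617Z,DE0005557508,150,23.45,EUR,EIX,BUY
# _parse_csv turns it into:
#   Trade(exchange="EIX", symbol="DE0005557508", isin="DE0005557508",
#         price=23.45, quantity=150.0,
#         timestamp=datetime(2026, 1, 22, 6, 30, 0, 617000, tzinfo=timezone.utc))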