Compare commits
5 Commits
a07319d957...main

| Author | SHA1 | Date |
|---|---|---|
|  | 1dc79b8b64 |  |
|  | cf55a0bd06 |  |
|  | 9cd84e0855 |  |
|  | f325941e24 |  |
|  | a21e036bb4 |  |
.gitignore (vendored, Normal file, 48 lines)
@@ -0,0 +1,48 @@
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# Virtual environments
+venv/
+ENV/
+env/
+.venv/
+
+# IDE
+.idea/
+.vscode/
+*.swp
+*.swo
+
+# Environment files
+.env
+.env.local
+
+# OS
+.DS_Store
+Thumbs.db
+
+# Logs
+*.log
+
+# Test data
+*.gz
+!requirements.txt
bin/Activate.ps1 (Normal file, 248 lines)
@@ -0,0 +1,248 @@
+<#
+.Synopsis
+Activate a Python virtual environment for the current PowerShell session.
+
+.Description
+Pushes the python executable for a virtual environment to the front of the
+$Env:PATH environment variable and sets the prompt to signify that you are
+in a Python virtual environment. Makes use of the command line switches as
+well as the `pyvenv.cfg` file values present in the virtual environment.
+
+.Parameter VenvDir
+Path to the directory that contains the virtual environment to activate. The
+default value for this is the parent of the directory that the Activate.ps1
+script is located within.
+
+.Parameter Prompt
+The prompt prefix to display when this virtual environment is activated. By
+default, this prompt is the name of the virtual environment folder (VenvDir)
+surrounded by parentheses and followed by a single space (ie. '(.venv) ').
+
+.Example
+Activate.ps1
+Activates the Python virtual environment that contains the Activate.ps1 script.
+
+.Example
+Activate.ps1 -Verbose
+Activates the Python virtual environment that contains the Activate.ps1 script,
+and shows extra information about the activation as it executes.
+
+.Example
+Activate.ps1 -VenvDir C:\Users\MyUser\Common\.venv
+Activates the Python virtual environment located in the specified location.
+
+.Example
+Activate.ps1 -Prompt "MyPython"
+Activates the Python virtual environment that contains the Activate.ps1 script,
+and prefixes the current prompt with the specified string (surrounded in
+parentheses) while the virtual environment is active.
+
+.Notes
+On Windows, it may be required to enable this Activate.ps1 script by setting the
+execution policy for the user. You can do this by issuing the following PowerShell
+command:
+
+PS C:\> Set-ExecutionPolicy -ExecutionPolicy RemoteSigned -Scope CurrentUser
+
+For more information on Execution Policies:
+https://go.microsoft.com/fwlink/?LinkID=135170
+
+#>
+Param(
+    [Parameter(Mandatory = $false)]
+    [String]
+    $VenvDir,
+    [Parameter(Mandatory = $false)]
+    [String]
+    $Prompt
+)
+
+<# Function declarations --------------------------------------------------- #>
+
+<#
+.Synopsis
+Remove all shell session elements added by the Activate script, including the
+addition of the virtual environment's Python executable from the beginning of
+the PATH variable.
+
+.Parameter NonDestructive
+If present, do not remove this function from the global namespace for the
+session.
+
+#>
+function global:deactivate ([switch]$NonDestructive) {
+    # Revert to original values
+
+    # The prior prompt:
+    if (Test-Path -Path Function:_OLD_VIRTUAL_PROMPT) {
+        Copy-Item -Path Function:_OLD_VIRTUAL_PROMPT -Destination Function:prompt
+        Remove-Item -Path Function:_OLD_VIRTUAL_PROMPT
+    }
+
+    # The prior PYTHONHOME:
+    if (Test-Path -Path Env:_OLD_VIRTUAL_PYTHONHOME) {
+        Copy-Item -Path Env:_OLD_VIRTUAL_PYTHONHOME -Destination Env:PYTHONHOME
+        Remove-Item -Path Env:_OLD_VIRTUAL_PYTHONHOME
+    }
+
+    # The prior PATH:
+    if (Test-Path -Path Env:_OLD_VIRTUAL_PATH) {
+        Copy-Item -Path Env:_OLD_VIRTUAL_PATH -Destination Env:PATH
+        Remove-Item -Path Env:_OLD_VIRTUAL_PATH
+    }
+
+    # Just remove the VIRTUAL_ENV altogether:
+    if (Test-Path -Path Env:VIRTUAL_ENV) {
+        Remove-Item -Path env:VIRTUAL_ENV
+    }
+
+    # Just remove VIRTUAL_ENV_PROMPT altogether.
+    if (Test-Path -Path Env:VIRTUAL_ENV_PROMPT) {
+        Remove-Item -Path env:VIRTUAL_ENV_PROMPT
+    }
+
+    # Just remove the _PYTHON_VENV_PROMPT_PREFIX altogether:
+    if (Get-Variable -Name "_PYTHON_VENV_PROMPT_PREFIX" -ErrorAction SilentlyContinue) {
+        Remove-Variable -Name _PYTHON_VENV_PROMPT_PREFIX -Scope Global -Force
+    }
+
+    # Leave deactivate function in the global namespace if requested:
+    if (-not $NonDestructive) {
+        Remove-Item -Path function:deactivate
+    }
+}
+
+<#
+.Description
+Get-PyVenvConfig parses the values from the pyvenv.cfg file located in the
+given folder, and returns them in a map.
+
+For each line in the pyvenv.cfg file, if that line can be parsed into exactly
+two strings separated by `=` (with any amount of whitespace surrounding the =)
+then it is considered a `key = value` line. The left hand string is the key,
+the right hand is the value.
+
+If the value starts with a `'` or a `"` then the first and last character is
+stripped from the value before being captured.
+
+.Parameter ConfigDir
+Path to the directory that contains the `pyvenv.cfg` file.
+#>
+function Get-PyVenvConfig(
+    [String]
+    $ConfigDir
+) {
+    Write-Verbose "Given ConfigDir=$ConfigDir, obtain values in pyvenv.cfg"
+
+    # Ensure the file exists, and issue a warning if it doesn't (but still allow the function to continue).
+    $pyvenvConfigPath = Join-Path -Resolve -Path $ConfigDir -ChildPath 'pyvenv.cfg' -ErrorAction Continue
+
+    # An empty map will be returned if no config file is found.
+    $pyvenvConfig = @{ }
+
+    if ($pyvenvConfigPath) {
+
+        Write-Verbose "File exists, parse `key = value` lines"
+        $pyvenvConfigContent = Get-Content -Path $pyvenvConfigPath
+
+        $pyvenvConfigContent | ForEach-Object {
+            $keyval = $PSItem -split "\s*=\s*", 2
+            if ($keyval[0] -and $keyval[1]) {
+                $val = $keyval[1]
+
+                # Remove extraneous quotations around a string value.
+                if ("'""".Contains($val.Substring(0, 1))) {
+                    $val = $val.Substring(1, $val.Length - 2)
+                }
+
+                $pyvenvConfig[$keyval[0]] = $val
+                Write-Verbose "Adding Key: '$($keyval[0])'='$val'"
+            }
+        }
+    }
+    return $pyvenvConfig
+}
+
+
+<# Begin Activate script --------------------------------------------------- #>
+
+# Determine the containing directory of this script
+$VenvExecPath = Split-Path -Parent $MyInvocation.MyCommand.Definition
+$VenvExecDir = Get-Item -Path $VenvExecPath
+
+Write-Verbose "Activation script is located in path: '$VenvExecPath'"
+Write-Verbose "VenvExecDir Fullname: '$($VenvExecDir.FullName)"
+Write-Verbose "VenvExecDir Name: '$($VenvExecDir.Name)"
+
+# Set values required in priority: CmdLine, ConfigFile, Default
+# First, get the location of the virtual environment, it might not be
+# VenvExecDir if specified on the command line.
+if ($VenvDir) {
+    Write-Verbose "VenvDir given as parameter, using '$VenvDir' to determine values"
+}
+else {
+    Write-Verbose "VenvDir not given as a parameter, using parent directory name as VenvDir."
+    $VenvDir = $VenvExecDir.Parent.FullName.TrimEnd("\\/")
+    Write-Verbose "VenvDir=$VenvDir"
+}
+
+# Next, read the `pyvenv.cfg` file to determine any required value such
+# as `prompt`.
+$pyvenvCfg = Get-PyVenvConfig -ConfigDir $VenvDir
+
+# Next, set the prompt from the command line, or the config file, or
+# just use the name of the virtual environment folder.
+if ($Prompt) {
+    Write-Verbose "Prompt specified as argument, using '$Prompt'"
+}
+else {
+    Write-Verbose "Prompt not specified as argument to script, checking pyvenv.cfg value"
+    if ($pyvenvCfg -and $pyvenvCfg['prompt']) {
+        Write-Verbose "  Setting based on value in pyvenv.cfg='$($pyvenvCfg['prompt'])'"
+        $Prompt = $pyvenvCfg['prompt'];
+    }
+    else {
+        Write-Verbose "  Setting prompt based on parent's directory's name. (Is the directory name passed to venv module when creating the virtual environment)"
+        Write-Verbose "  Got leaf-name of $VenvDir='$(Split-Path -Path $venvDir -Leaf)'"
+        $Prompt = Split-Path -Path $venvDir -Leaf
+    }
+}
+
+Write-Verbose "Prompt = '$Prompt'"
+Write-Verbose "VenvDir='$VenvDir'"
+
+# Deactivate any currently active virtual environment, but leave the
+# deactivate function in place.
+deactivate -nondestructive
+
+# Now set the environment variable VIRTUAL_ENV, used by many tools to determine
+# that there is an activated venv.
+$env:VIRTUAL_ENV = $VenvDir
+
+$env:VIRTUAL_ENV_PROMPT = $Prompt
+
+if (-not $Env:VIRTUAL_ENV_DISABLE_PROMPT) {
+
+    Write-Verbose "Setting prompt to '$Prompt'"
+
+    # Set the prompt to include the env name
+    # Make sure _OLD_VIRTUAL_PROMPT is global
+    function global:_OLD_VIRTUAL_PROMPT { "" }
+    Copy-Item -Path function:prompt -Destination function:_OLD_VIRTUAL_PROMPT
+    New-Variable -Name _PYTHON_VENV_PROMPT_PREFIX -Description "Python virtual environment prompt prefix" -Scope Global -Option ReadOnly -Visibility Public -Value $Prompt
+
+    function global:prompt {
+        Write-Host -NoNewline -ForegroundColor Green "($_PYTHON_VENV_PROMPT_PREFIX) "
+        _OLD_VIRTUAL_PROMPT
+    }
+}
+
+# Clear PYTHONHOME
+if (Test-Path -Path Env:PYTHONHOME) {
+    Copy-Item -Path Env:PYTHONHOME -Destination Env:_OLD_VIRTUAL_PYTHONHOME
+    Remove-Item -Path Env:PYTHONHOME
+}
+
+# Add the venv to the PATH
+Copy-Item -Path Env:PATH -Destination Env:_OLD_VIRTUAL_PATH
+$Env:PATH = "$VenvExecDir$([System.IO.Path]::PathSeparator)$Env:PATH"
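For reference, Get-PyVenvConfig above reads `pyvenv.cfg` as `key = value` pairs, splitting on the first `=` and stripping one pair of surrounding quotes. A rough Python equivalent, for illustration only (`read_pyvenv_cfg` is a hypothetical helper, not part of this changeset):

```python
# Minimal sketch of the parsing Get-PyVenvConfig performs, in Python.
# read_pyvenv_cfg is a hypothetical helper, not part of this commit.
from pathlib import Path

def read_pyvenv_cfg(venv_dir: str) -> dict:
    config = {}
    cfg_path = Path(venv_dir) / "pyvenv.cfg"
    if not cfg_path.exists():
        return config  # empty map when no config file is found
    for line in cfg_path.read_text(encoding="utf-8").splitlines():
        # split into exactly two strings on the first '=', trimming whitespace
        key, sep, value = line.partition("=")
        if not sep:
            continue
        key, value = key.strip(), value.strip()
        # strip one pair of surrounding quotes, as the PowerShell script does
        if value[:1] in ("'", '"'):
            value = value[1:-1]
        if key:
            config[key] = value
    return config

print(read_pyvenv_cfg("/Users/melchiorreimers/Documents/trading_daemon").get("version"))
```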
bin/activate (Normal file, 76 lines)
@@ -0,0 +1,76 @@
+# This file must be used with "source bin/activate" *from bash*
+# You cannot run it directly
+
+deactivate () {
+    # reset old environment variables
+    if [ -n "${_OLD_VIRTUAL_PATH:-}" ] ; then
+        PATH="${_OLD_VIRTUAL_PATH:-}"
+        export PATH
+        unset _OLD_VIRTUAL_PATH
+    fi
+    if [ -n "${_OLD_VIRTUAL_PYTHONHOME:-}" ] ; then
+        PYTHONHOME="${_OLD_VIRTUAL_PYTHONHOME:-}"
+        export PYTHONHOME
+        unset _OLD_VIRTUAL_PYTHONHOME
+    fi
+
+    # Call hash to forget past locations. Without forgetting
+    # past locations the $PATH changes we made may not be respected.
+    # See "man bash" for more details. hash is usually a builtin of your shell
+    hash -r 2> /dev/null
+
+    if [ -n "${_OLD_VIRTUAL_PS1:-}" ] ; then
+        PS1="${_OLD_VIRTUAL_PS1:-}"
+        export PS1
+        unset _OLD_VIRTUAL_PS1
+    fi
+
+    unset VIRTUAL_ENV
+    unset VIRTUAL_ENV_PROMPT
+    if [ ! "${1:-}" = "nondestructive" ] ; then
+    # Self destruct!
+        unset -f deactivate
+    fi
+}
+
+# unset irrelevant variables
+deactivate nondestructive
+
+# on Windows, a path can contain colons and backslashes and has to be converted:
+case "$(uname)" in
+    CYGWIN*|MSYS*|MINGW*)
+        # transform D:\path\to\venv to /d/path/to/venv on MSYS and MINGW
+        # and to /cygdrive/d/path/to/venv on Cygwin
+        VIRTUAL_ENV=$(cygpath /Users/melchiorreimers/Documents/trading_daemon)
+        export VIRTUAL_ENV
+        ;;
+    *)
+        # use the path as-is
+        export VIRTUAL_ENV=/Users/melchiorreimers/Documents/trading_daemon
+        ;;
+esac
+
+_OLD_VIRTUAL_PATH="$PATH"
+PATH="$VIRTUAL_ENV/"bin":$PATH"
+export PATH
+
+VIRTUAL_ENV_PROMPT=trading_daemon
+export VIRTUAL_ENV_PROMPT
+
+# unset PYTHONHOME if set
+# this will fail if PYTHONHOME is set to the empty string (which is bad anyway)
+# could use `if (set -u; : $PYTHONHOME) ;` in bash
+if [ -n "${PYTHONHOME:-}" ] ; then
+    _OLD_VIRTUAL_PYTHONHOME="${PYTHONHOME:-}"
+    unset PYTHONHOME
+fi
+
+if [ -z "${VIRTUAL_ENV_DISABLE_PROMPT:-}" ] ; then
+    _OLD_VIRTUAL_PS1="${PS1:-}"
+    PS1="("trading_daemon") ${PS1:-}"
+    export PS1
+fi
+
+# Call hash to forget past commands. Without forgetting
+# past commands the $PATH changes we made may not be respected
+hash -r 2> /dev/null
bin/activate.csh (Normal file, 27 lines)
@@ -0,0 +1,27 @@
+# This file must be used with "source bin/activate.csh" *from csh*.
+# You cannot run it directly.
+
+# Created by Davide Di Blasi <davidedb@gmail.com>.
+# Ported to Python 3.3 venv by Andrew Svetlov <andrew.svetlov@gmail.com>
+
+alias deactivate 'test $?_OLD_VIRTUAL_PATH != 0 && setenv PATH "$_OLD_VIRTUAL_PATH" && unset _OLD_VIRTUAL_PATH; rehash; test $?_OLD_VIRTUAL_PROMPT != 0 && set prompt="$_OLD_VIRTUAL_PROMPT" && unset _OLD_VIRTUAL_PROMPT; unsetenv VIRTUAL_ENV; unsetenv VIRTUAL_ENV_PROMPT; test "\!:*" != "nondestructive" && unalias deactivate'
+
+# Unset irrelevant variables.
+deactivate nondestructive
+
+setenv VIRTUAL_ENV /Users/melchiorreimers/Documents/trading_daemon
+
+set _OLD_VIRTUAL_PATH="$PATH"
+setenv PATH "$VIRTUAL_ENV/"bin":$PATH"
+setenv VIRTUAL_ENV_PROMPT trading_daemon
+
+
+set _OLD_VIRTUAL_PROMPT="$prompt"
+
+if (! "$?VIRTUAL_ENV_DISABLE_PROMPT") then
+    set prompt = "("trading_daemon") $prompt:q"
+endif
+
+alias pydoc python -m pydoc
+
+rehash
bin/activate.fish (Normal file, 69 lines)
@@ -0,0 +1,69 @@
+# This file must be used with "source <venv>/bin/activate.fish" *from fish*
+# (https://fishshell.com/). You cannot run it directly.
+
+function deactivate -d "Exit virtual environment and return to normal shell environment"
+    # reset old environment variables
+    if test -n "$_OLD_VIRTUAL_PATH"
+        set -gx PATH $_OLD_VIRTUAL_PATH
+        set -e _OLD_VIRTUAL_PATH
+    end
+    if test -n "$_OLD_VIRTUAL_PYTHONHOME"
+        set -gx PYTHONHOME $_OLD_VIRTUAL_PYTHONHOME
+        set -e _OLD_VIRTUAL_PYTHONHOME
+    end
+
+    if test -n "$_OLD_FISH_PROMPT_OVERRIDE"
+        set -e _OLD_FISH_PROMPT_OVERRIDE
+        # prevents error when using nested fish instances (Issue #93858)
+        if functions -q _old_fish_prompt
+            functions -e fish_prompt
+            functions -c _old_fish_prompt fish_prompt
+            functions -e _old_fish_prompt
+        end
+    end
+
+    set -e VIRTUAL_ENV
+    set -e VIRTUAL_ENV_PROMPT
+    if test "$argv[1]" != "nondestructive"
+        # Self-destruct!
+        functions -e deactivate
+    end
+end
+
+# Unset irrelevant variables.
+deactivate nondestructive
+
+set -gx VIRTUAL_ENV /Users/melchiorreimers/Documents/trading_daemon
+
+set -gx _OLD_VIRTUAL_PATH $PATH
+set -gx PATH "$VIRTUAL_ENV/"bin $PATH
+set -gx VIRTUAL_ENV_PROMPT trading_daemon
+
+# Unset PYTHONHOME if set.
+if set -q PYTHONHOME
+    set -gx _OLD_VIRTUAL_PYTHONHOME $PYTHONHOME
+    set -e PYTHONHOME
+end
+
+if test -z "$VIRTUAL_ENV_DISABLE_PROMPT"
+    # fish uses a function instead of an env var to generate the prompt.
+
+    # Save the current fish_prompt function as the function _old_fish_prompt.
+    functions -c fish_prompt _old_fish_prompt
+
+    # With the original prompt function renamed, we can override with our own.
+    function fish_prompt
+        # Save the return status of the last command.
+        set -l old_status $status
+
+        # Output the venv prompt; color taken from the blue of the Python logo.
+        printf "%s(%s)%s " (set_color 4B8BBE) trading_daemon (set_color normal)
+
+        # Restore the return status of the previous command.
+        echo "exit $old_status" | .
+        # Output the original/"old" prompt.
+        _old_fish_prompt
+    end
+
+    set -gx _OLD_FISH_PROMPT_OVERRIDE "$VIRTUAL_ENV"
+end
bin/pip (Executable file, 8 lines)
@@ -0,0 +1,8 @@
+#!/Users/melchiorreimers/Documents/trading_daemon/bin/python3.13
+# -*- coding: utf-8 -*-
+import re
+import sys
+from pip._internal.cli.main import main
+if __name__ == '__main__':
+    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
+    sys.exit(main())
bin/pip3 (Executable file, 8 lines)
@@ -0,0 +1,8 @@
+#!/Users/melchiorreimers/Documents/trading_daemon/bin/python3.13
+# -*- coding: utf-8 -*-
+import re
+import sys
+from pip._internal.cli.main import main
+if __name__ == '__main__':
+    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
+    sys.exit(main())
bin/pip3.13 (Executable file, 8 lines)
@@ -0,0 +1,8 @@
+#!/Users/melchiorreimers/Documents/trading_daemon/bin/python3.13
+# -*- coding: utf-8 -*-
+import re
+import sys
+from pip._internal.cli.main import main
+if __name__ == '__main__':
+    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
+    sys.exit(main())
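All three pip wrappers are identical: before calling pip's entry point they normalize `sys.argv[0]` with a regex that strips a Windows-style `-script.pyw` or `.exe` suffix. A worked example of that substitution:

```python
# What the wrapper's re.sub does to argv[0]: it strips a trailing
# "-script.pyw" or ".exe" suffix so pip sees a clean program name.
import re

for argv0 in ("/venv/bin/pip", "pip-script.pyw", "pip3.exe"):
    print(re.sub(r'(-script\.pyw|\.exe)?$', '', argv0))
# -> /venv/bin/pip
# -> pip
# -> pip3
```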
bin/python (Symbolic link, 1 line)
@@ -0,0 +1 @@
+python3.13

bin/python3 (Symbolic link, 1 line)
@@ -0,0 +1 @@
+python3.13

bin/python3.13 (Symbolic link, 1 line)
@@ -0,0 +1 @@
+/opt/homebrew/opt/python@3.13/bin/python3.13
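`bin/python` and `bin/python3` both point at `bin/python3.13`, which in turn points at the Homebrew interpreter. A small sketch to verify the chain (paths are the ones recorded in this diff; `Path.readlink` needs Python 3.9+, and the fully resolved target depends on the machine):

```python
# Sketch: inspect the venv's interpreter symlink chain.
from pathlib import Path

bin_dir = Path("/Users/melchiorreimers/Documents/trading_daemon/bin")
print((bin_dir / "python").readlink())      # python3.13
print((bin_dir / "python3").readlink())     # python3.13
print((bin_dir / "python3.13").readlink())  # /opt/homebrew/opt/python@3.13/bin/python3.13
print((bin_dir / "python").resolve())       # fully resolved interpreter path
```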
daemon.py (484 lines)
@@ -4,6 +4,9 @@ import datetime
 import hashlib
 import os
 import requests
+from typing import List, Type
+
+from src.exchanges.base import BaseExchange
 from src.exchanges.eix import EIXExchange
 from src.exchanges.ls import LSExchange
 from src.exchanges.deutsche_boerse import XetraExchange, FrankfurtExchange, QuotrixExchange
@@ -25,230 +28,345 @@ DB_USER = os.getenv("DB_USER", "admin")
 DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
 DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
+
+
+# =============================================================================
+# Exchange Registry - Neue Börsen hier hinzufügen
+# =============================================================================
+
+# Exchanges die Streaming-Verarbeitung benötigen (große Datenmengen)
+STREAMING_EXCHANGES: List[Type[BaseExchange]] = [
+    EIXExchange,
+]
+
+# Standard-Exchanges (normale Batch-Verarbeitung)
+STANDARD_EXCHANGES: List[Type[BaseExchange]] = [
+    # Lang & Schwarz
+    LSExchange,
+    # Deutsche Börse
+    XetraExchange,
+    FrankfurtExchange,
+    QuotrixExchange,
+    # Weitere Börsen
+    GettexExchange,
+    StuttgartExchange,
+    # Börsenag (Düsseldorf, Hamburg, Hannover)
+    DUSAExchange,
+    DUSBExchange,
+    DUSCExchange,
+    DUSDExchange,
+    HAMAExchange,
+    HAMBExchange,
+    HANAExchange,
+    HANBExchange,
+]
+
+
+# =============================================================================
+# Trades Cache
+# =============================================================================
+
+# Cache für existierende Trades pro Tag (wird nach jedem Exchange geleert)
+_existing_trades_cache = {}
+
 def get_trade_hash(trade):
     """Erstellt einen eindeutigen Hash für einen Trade."""
     key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}"
     return hashlib.md5(key.encode()).hexdigest()
 
-def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=1000):
-    """Filtert neue Trades in Batches, um RAM zu sparen. Verwendet Batch-Queries statt einzelne Checks."""
+def get_existing_trades_for_day(db_url, exchange_name, day):
+    """Holt existierende Trades für einen Tag aus der DB (mit Caching)."""
+    cache_key = f"{exchange_name}_{day.strftime('%Y-%m-%d')}"
+
+    if cache_key in _existing_trades_cache:
+        return _existing_trades_cache[cache_key]
+
+    day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
+    day_end = day + datetime.timedelta(days=1)
+    day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
+
+    query = f"""
+        SELECT isin, timestamp, price, quantity
+        FROM trades
+        WHERE exchange = '{exchange_name}'
+        AND timestamp >= '{day_start_str}'
+        AND timestamp < '{day_end_str}'
+    """
+
+    existing_trades = set()
+    try:
+        response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60)
+        if response.status_code == 200:
+            data = response.json()
+            if data.get('dataset'):
+                for row in data['dataset']:
+                    isin, ts, price, qty = row
+                    if isinstance(ts, str):
+                        ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
+                    else:
+                        ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
+                    key = (isin, ts_dt.isoformat(), float(price), float(qty))
+                    existing_trades.add(key)
+    except Exception as e:
+        logger.warning(f"Error fetching existing trades for {day}: {e}")
+
+    _existing_trades_cache[cache_key] = existing_trades
+    return existing_trades
+
+
+def clear_trades_cache():
+    """Leert den Cache für existierende Trades."""
+    global _existing_trades_cache
+    _existing_trades_cache = {}
+
+
+def filter_new_trades_for_day(db_url, exchange_name, trades, day):
+    """Filtert neue Trades für einen einzelnen Tag."""
     if not trades:
         return []
 
-    new_trades = []
-    total_batches = (len(trades) + batch_size - 1) // batch_size
-
-    for batch_idx in range(0, len(trades), batch_size):
-        batch = trades[batch_idx:batch_idx + batch_size]
-        batch_num = (batch_idx // batch_size) + 1
-
-        if batch_num % 10 == 0 or batch_num == 1:
-            logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} trades)...")
-
-        # Gruppiere Trades nach Tag für effizientere Queries
-        trades_by_day = {}
-        for trade in batch:
-            day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
-            if day not in trades_by_day:
-                trades_by_day[day] = []
-            trades_by_day[day].append(trade)
-
-        # Prüfe jeden Tag separat
-        for day, day_trades in trades_by_day.items():
-            day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
-            day_end = day + datetime.timedelta(days=1)
-            day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
-
-            # Hole alle existierenden Trades für diesen Tag
-            query = f"""
-                SELECT isin, timestamp, price, quantity
-                FROM trades
-                WHERE exchange = '{exchange_name}'
-                AND timestamp >= '{day_start_str}'
-                AND timestamp < '{day_end_str}'
-            """
-
-            try:
-                response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=30)
-                if response.status_code == 200:
-                    data = response.json()
-                    existing_trades = set()
-                    if data.get('dataset'):
-                        for row in data['dataset']:
-                            isin, ts, price, qty = row
-                            # Normalisiere Timestamp für Vergleich
-                            if isinstance(ts, str):
-                                ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
-                            else:
-                                ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
-                            # Erstelle Vergleichs-Key (ohne Hash, direkter Vergleich)
-                            key = (isin, ts_dt.isoformat(), float(price), float(qty))
-                            existing_trades.add(key)
-
-                    # Prüfe welche Trades neu sind
-                    for trade in day_trades:
-                        trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
-                        if trade_key not in existing_trades:
-                            new_trades.append(trade)
-                else:
-                    # Bei Fehler: alle Trades als neu behandeln (sicherer)
-                    logger.warning(f"Query failed for day {day}, treating all trades as new")
-                    new_trades.extend(day_trades)
-            except Exception as e:
-                # Bei Fehler: alle Trades als neu behandeln (sicherer)
-                logger.warning(f"Error checking trades for day {day}: {e}, treating all trades as new")
-                new_trades.extend(day_trades)
-
-        # Kleine Pause zwischen Batches, um DB nicht zu überlasten
-        if batch_idx + batch_size < len(trades):
-            time.sleep(0.05)
-
+    existing = get_existing_trades_for_day(db_url, exchange_name, day)
+
+    new_trades = []
+    for trade in trades:
+        trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
+        if trade_key not in existing:
+            new_trades.append(trade)
+
     return new_trades
 
-def get_last_trade_timestamp(db_url, exchange_name):
-    # QuestDB query: get the latest timestamp for a specific exchange
+def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=5000):
+    """Filtert neue Trades in Batches, gruppiert nach Tag."""
+    if not trades:
+        return []
+
+    # Gruppiere alle Trades nach Tag
+    trades_by_day = {}
+    for trade in trades:
+        day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
+        if day not in trades_by_day:
+            trades_by_day[day] = []
+        trades_by_day[day].append(trade)
+
+    new_trades = []
+    total_days = len(trades_by_day)
+
+    for i, (day, day_trades) in enumerate(sorted(trades_by_day.items()), 1):
+        if i % 10 == 0 or i == 1:
+            logger.info(f"Checking day {i}/{total_days}: {day.strftime('%Y-%m-%d')} ({len(day_trades)} trades)...")
+
+        new_for_day = filter_new_trades_for_day(db_url, exchange_name, day_trades, day)
+        new_trades.extend(new_for_day)
+
+        # Kleine Pause um DB nicht zu überlasten
+        if i < total_days:
+            time.sleep(0.02)
+
+    return new_trades
+
+
+def get_last_trade_timestamp(db_url: str, exchange_name: str) -> datetime.datetime:
+    """Holt den Timestamp des letzten Trades für eine Exchange aus QuestDB."""
     query = f"trades where exchange = '{exchange_name}' latest by timestamp"
     try:
-        # Using the /exec endpoint to get data
         response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH)
         if response.status_code == 200:
             data = response.json()
-            if data['dataset']:
-                # QuestDB returns timestamp in micros since epoch by default in some views, or ISO
-                # Let's assume the timestamp is in the dataset
-                # ILP timestamps are stored as designated timestamps.
-                ts_value = data['dataset'][0][0]  # Adjust index based on column order
+            if data.get('dataset'):
+                # QuestDB gibt Timestamps in Mikrosekunden oder ISO-Format zurück
+                ts_value = data['dataset'][0][0]
                 if isinstance(ts_value, str):
                     return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
                 else:
                     return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
     except Exception as e:
-        logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")
+        logger.debug(f"Keine existierenden Daten für {exchange_name} oder DB nicht erreichbar: {e}")
     return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
+
+
+def process_eix_streaming(db, db_url: str, exchange: BaseExchange, historical: bool = False):
+    """Verarbeitet eine Exchange im Streaming-Modus um RAM zu sparen."""
+    last_ts = get_last_trade_timestamp(db_url, exchange.name)
+    logger.info(f"Hole Daten von {exchange.name} (Letzter Trade: {last_ts}) - STREAMING...")
+
+    # Hole Liste der zu verarbeitenden Dateien
+    if historical:
+        files = exchange.get_files_to_process(limit=None, since_date=None)
+    else:
+        files = exchange.get_files_to_process(limit=None, since_date=last_ts)
+
+    if not files:
+        logger.info(f"Keine {exchange.name} Dateien zu verarbeiten.")
+        return
+
+    logger.info(f"{len(files)} {exchange.name} Dateien gefunden...")
+
+    total_new = 0
+    total_processed = 0
+
+    for i, file_item in enumerate(files, 1):
+        file_name = file_item.get('fileName', 'unknown').split('/')[-1]
+        logger.info(f"Verarbeite {exchange.name} Datei {i}/{len(files)}: {file_name}")
+
+        trades = exchange.fetch_trades_from_file(file_item)
+
+        if not trades:
+            logger.info(f"  Keine Trades in {file_name}")
+            continue
+
+        total_processed += len(trades)
+        logger.info(f"  {len(trades)} Trades geladen, filtere Duplikate...")
+
+        new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
+
+        if new_trades:
+            new_trades.sort(key=lambda x: x.timestamp)
+            db.save_trades(new_trades)
+            total_new += len(new_trades)
+            logger.info(f"  {len(new_trades)} neue Trades gespeichert (gesamt neu: {total_new})")
+        else:
+            logger.info(f"  Keine neuen Trades in dieser Datei")
+
+        # Referenzen freigeben
+        del trades
+        del new_trades
+
+        time.sleep(0.1)
+
+    logger.info(f"{exchange.name} fertig: {total_new} neue Trades von {total_processed} verarbeitet.")
+    clear_trades_cache()
+
+
+def process_standard_exchange(db, db_url: str, exchange: BaseExchange, historical: bool):
+    """Verarbeitet einen Standard-Exchange mit Batch-Verarbeitung."""
+    try:
+        last_ts = get_last_trade_timestamp(db_url, exchange.name)
+        logger.info(f"Hole Daten von {exchange.name} (Letzter Trade: {last_ts})...")
+
+        trades = exchange.fetch_latest_trades(include_yesterday=historical)
+
+        if not trades:
+            logger.info(f"Keine Trades von {exchange.name} erhalten.")
+            return
+
+        # Deduplizierung
+        logger.info(f"Filtere {len(trades)} Trades auf Duplikate...")
+        new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
+
+        logger.info(f"Gefunden: {len(trades)} Trades gesamt, {len(new_trades)} sind neu.")
+
+        if new_trades:
+            new_trades.sort(key=lambda x: x.timestamp)
+            db.save_trades(new_trades)
+            logger.info(f"{len(new_trades)} neue Trades in QuestDB gespeichert.")
+
+        # Referenzen freigeben
+        del trades
+        if new_trades:
+            del new_trades
+        clear_trades_cache()
+
+    except Exception as e:
+        logger.error(f"Fehler bei Exchange {exchange.name}: {e}")
+
 
 def run_task(historical=False):
-    logger.info(f"Starting Trading Data Fetcher task (Historical: {historical})...")
-    # Initialize exchanges
-    eix = EIXExchange()
-    ls = LSExchange()
-
-    # Neue Deutsche Börse Exchanges
-    xetra = XetraExchange()
-    frankfurt = FrankfurtExchange()
-    quotrix = QuotrixExchange()
-    gettex = GettexExchange()
-    stuttgart = StuttgartExchange()
-
-    # Börsenag Exchanges (Düsseldorf, Hamburg, Hannover)
-    dusa = DUSAExchange()
-    dusb = DUSBExchange()
-    dusc = DUSCExchange()
-    dusd = DUSDExchange()
-    hama = HAMAExchange()
-    hamb = HAMBExchange()
-    hana = HANAExchange()
-    hanb = HANBExchange()
-
-    # Pass last_ts to fetcher to allow smart filtering
-    # daemon.py runs daily, so we want to fetch everything since DB state
-    # BUT we need to be careful: eix.py's fetch_latest_trades needs 'since_date' argument
-    # We can't pass it here directly in the tuple easily because last_ts is calculated inside the loop.
-
-    # We will modify the loop below to handle args dynamically
-    exchanges_to_process = [
-        (eix, {'limit': None if historical else 5}), # Default limit 5 for safety if no historical
-        (ls, {'include_yesterday': historical}),
-        # Deutsche Börse Exchanges
-        (xetra, {'include_yesterday': historical}),
-        (frankfurt, {'include_yesterday': historical}),
-        (quotrix, {'include_yesterday': historical}),
-        (gettex, {'include_yesterday': historical}),
-        (stuttgart, {'include_yesterday': historical}),
-        # Börsenag Exchanges (Düsseldorf, Hamburg, Hannover)
-        (dusa, {'include_yesterday': historical}),
-        (dusb, {'include_yesterday': historical}),
-        (dusc, {'include_yesterday': historical}),
-        (dusd, {'include_yesterday': historical}),
-        (hama, {'include_yesterday': historical}),
-        (hamb, {'include_yesterday': historical}),
-        (hana, {'include_yesterday': historical}),
-        (hanb, {'include_yesterday': historical}),
-    ]
-
+    """Haupttask: Holt Trades von allen registrierten Exchanges."""
+    logger.info(f"Starte Trading Data Fetcher (Historical: {historical})...")
+
     db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
-
-    for exchange, args in exchanges_to_process:
-        try:
-            db_url = "http://questdb:9000"
-            last_ts = get_last_trade_timestamp(db_url, exchange.name)
-
-            logger.info(f"Fetching data from {exchange.name} (Last trade: {last_ts})...")
-
-            # Special handling for EIX to support smart filtering
-            call_args = args.copy()
-            if exchange.name == "EIX" and not historical:
-                call_args['since_date'] = last_ts.replace(tzinfo=datetime.timezone.utc)
-                # Remove limit if we are filtering by date to ensure we get everything
-                if 'limit' in call_args:
-                    call_args.pop('limit')
-
-            trades = exchange.fetch_latest_trades(**call_args)
-
-            if not trades:
-                logger.info(f"No trades fetched from {exchange.name}.")
-                continue
-
-            # Hash-basierte Deduplizierung - Batch-Verarbeitung um RAM zu sparen
-            logger.info(f"Filtering {len(trades)} trades for duplicates (batch processing)...")
-            new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=500)
-
-            logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")
-
-            if new_trades:
-                # Sort trades by timestamp before saving (QuestDB likes this)
-                new_trades.sort(key=lambda x: x.timestamp)
-                db.save_trades(new_trades)
-                logger.info(f"Stored {len(new_trades)} new trades in QuestDB.")
-        except Exception as e:
-            logger.error(f"Error processing exchange {exchange.name}: {e}")
-
-def main():
-    logger.info("Trading Daemon started.")
-
-    # 1. Startup Check: Ist die DB leer?
     db_url = "http://questdb:9000"
-    is_empty = True
+
+    # Streaming-Exchanges verarbeiten (große Datenmengen)
+    for exchange_class in STREAMING_EXCHANGES:
+        try:
+            exchange = exchange_class()
+            logger.info(f"Verarbeite {exchange.name} im Streaming-Modus...")
+            process_eix_streaming(db, db_url, exchange, historical=historical)
+        except Exception as e:
+            logger.error(f"Fehler bei Streaming-Exchange {exchange_class.__name__}: {e}")
+
+    # Standard-Exchanges verarbeiten
+    for exchange_class in STANDARD_EXCHANGES:
+        try:
+            exchange = exchange_class()
+            process_standard_exchange(db, db_url, exchange, historical)
+        except Exception as e:
+            logger.error(f"Fehler bei Exchange {exchange_class.__name__}: {e}")
+
+    logger.info("Alle Exchanges verarbeitet.")
+
+
+def is_database_empty(db_url: str) -> bool:
+    """Prüft ob die Datenbank leer ist oder die Tabelle nicht existiert."""
     try:
-        # Prüfe ob bereits Trades in der Tabelle sind
         response = requests.get(f"{db_url}/exec", params={'query': 'select count(*) from trades'}, auth=DB_AUTH)
         if response.status_code == 200:
             data = response.json()
-            if data['dataset'] and data['dataset'][0][0] > 0:
-                is_empty = False
+            if data.get('dataset') and data['dataset'][0][0] > 0:
+                return False
     except Exception:
-        # Falls Tabelle noch nicht existiert oder DB nicht erreichbar ist
-        is_empty = True
+        pass
+    return True
 
-    if is_empty:
-        logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...")
+
+def calculate_seconds_until_target(target_hour: int, target_minute: int = 0) -> int:
+    """Berechnet Sekunden bis zur nächsten Zielzeit."""
+    now = datetime.datetime.now()
+    target = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
+
+    # Wenn Zielzeit heute schon vorbei ist, nimm morgen
+    if target <= now:
+        target += datetime.timedelta(days=1)
+
+    return int((target - now).total_seconds())
+
+
+def main():
+    logger.info("Trading Daemon gestartet.")
+
+    db_url = "http://questdb:9000"
+
+    # Startup: Initialer Sync
+    if is_database_empty(db_url):
+        logger.info("Datenbank ist leer. Starte initialen historischen Fetch...")
         run_task(historical=True)
     else:
-        logger.info("Found existing data in database. Triggering catch-up sync...")
-        # Run a normal task to fetch any missing data since the last run
+        logger.info("Existierende Daten gefunden. Starte Catch-up Sync...")
         run_task(historical=False)
-    logger.info("Catch-up sync completed. Waiting for scheduled run at 23:00.")
+        logger.info("Catch-up Sync abgeschlossen.")
+
+    # Scheduling Konfiguration
+    SCHEDULE_HOUR = 23
+    SCHEDULE_MINUTE = 0
+    last_run_date = None
+
+    logger.info(f"Warte auf täglichen Run um {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d}...")
 
     while True:
         now = datetime.datetime.now()
-        # Täglich um 23:00 Uhr
-        if now.hour == 23 and now.minute == 0:
-            run_task(historical=False)
-            # Warte 61s, um Mehrfachausführung in derselben Minute zu verhindern
-            time.sleep(61)
-
-        # Check alle 30 Sekunden
-        time.sleep(30)
+        today = now.date()
+
+        # Prüfe ob wir heute schon gelaufen sind
+        already_ran_today = (last_run_date == today)
+
+        # Prüfe ob wir im Zeitfenster sind (23:00 - 23:59)
+        in_schedule_window = (now.hour == SCHEDULE_HOUR and now.minute >= SCHEDULE_MINUTE)
+
+        if in_schedule_window and not already_ran_today:
+            logger.info(f"Geplanter Task startet ({now.strftime('%Y-%m-%d %H:%M:%S')})...")
+            run_task(historical=False)
+            last_run_date = today
+            logger.info("Geplanter Task abgeschlossen. Warte auf nächsten Tag...")
+
+        # Dynamische Sleep-Zeit: Kurz vor Zielzeit öfter prüfen
+        seconds_until_target = calculate_seconds_until_target(SCHEDULE_HOUR, SCHEDULE_MINUTE)
+
+        if seconds_until_target > 3600:
+            # Mehr als 1 Stunde: Schlafe 30 Minuten
+            sleep_time = 1800
+        elif seconds_until_target > 300:
+            # 5 Minuten bis 1 Stunde: Schlafe 5 Minuten
+            sleep_time = 300
+        else:
+            # Unter 5 Minuten: Schlafe 30 Sekunden
+            sleep_time = 30
+
+        time.sleep(sleep_time)
 
 
 if __name__ == "__main__":
     main()
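The refactor above replaces per-batch duplicate checks with one cached existence set per day: trades are bucketed by day, each day's existing rows are fetched once and kept as `(isin, iso_timestamp, price, quantity)` tuples, and every candidate trade then becomes a constant-time set lookup. A minimal self-contained sketch of that technique (toy data stands in for the QuestDB query; this is not the daemon's actual code):

```python
# Sketch of the day-keyed dedup used by get_existing_trades_for_day /
# filter_new_trades_for_day: one tuple-set per day, tested in memory.
import datetime
from collections import namedtuple

Trade = namedtuple("Trade", "isin timestamp price quantity")

existing_by_day = {}  # cache, cleared per exchange like _existing_trades_cache

def key_of(t):
    return (t.isin, t.timestamp.isoformat(), float(t.price), float(t.quantity))

def filter_new(trades, fetch_existing):
    new = []
    for t in trades:
        day = t.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
        if day not in existing_by_day:
            # one fetch per day instead of one query per trade or per batch
            existing_by_day[day] = {key_of(e) for e in fetch_existing(day)}
        if key_of(t) not in existing_by_day[day]:
            new.append(t)
    return new

ts = datetime.datetime(2026, 1, 28, 15, 48, 42, tzinfo=datetime.timezone.utc)
seen = [Trade("DE0007164600", ts, 46.18, 100)]
incoming = seen + [Trade("DE0007164600", ts, 46.20, 50)]
print(len(filter_new(incoming, lambda day: seen)))  # -> 1 (only the new price)
```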
pyvenv.cfg (Normal file, 5 lines)
@@ -0,0 +1,5 @@
+home = /opt/homebrew/opt/python@3.13/bin
+include-system-site-packages = false
+version = 3.13.2
+executable = /opt/homebrew/Cellar/python@3.13/3.13.2/Frameworks/Python.framework/Versions/3.13/bin/python3.13
+command = /opt/homebrew/opt/python@3.13/bin/python3.13 -m venv /Users/melchiorreimers/Documents/trading_daemon
scripts/inspect_gzip.py (Normal file, 79 lines)
@@ -0,0 +1,79 @@
+#!/usr/bin/env python3
+"""
+Utility-Script zum Inspizieren von gzip-komprimierten JSON-Dateien.
+Verarbeitet Dateien streaming, ohne alles in den RAM zu laden.
+
+Verwendung:
+    python scripts/inspect_gzip.py <datei.json.gz> [--limit N] [--output datei.json]
+"""
+import gzip
+import json
+import argparse
+import sys
+from pathlib import Path
+
+
+def inspect_gzip_file(filepath: str, limit: int = None, output_file: str = None):
+    """
+    Liest eine gzip-komprimierte NDJSON-Datei und gibt die Inhalte aus.
+
+    Args:
+        filepath: Pfad zur .json.gz Datei
+        limit: Maximale Anzahl der auszugebenden Records (None = alle)
+        output_file: Optional: Ausgabe in Datei statt stdout
+    """
+    path = Path(filepath)
+    if not path.exists():
+        print(f"Fehler: Datei '{filepath}' nicht gefunden.", file=sys.stderr)
+        return 1
+
+    count = 0
+    output = open(output_file, 'w', encoding='utf-8') if output_file else sys.stdout
+
+    try:
+        with gzip.open(filepath, mode='rt', encoding='utf-8') as f:
+            for line in f:
+                if not line.strip():
+                    continue
+
+                try:
+                    record = json.loads(line)
+                    # Pretty-print einzelner Record
+                    json.dump(record, output, indent=2, ensure_ascii=False)
+                    output.write('\n')
+                    count += 1
+
+                    if limit and count >= limit:
+                        break
+
+                except json.JSONDecodeError as e:
+                    print(f"JSON-Fehler in Zeile {count + 1}: {e}", file=sys.stderr)
+                    continue
+
+        print(f"\n--- {count} Records verarbeitet ---", file=sys.stderr)
+
+    finally:
+        if output_file and output != sys.stdout:
+            output.close()
+
+    return 0
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description='Inspiziert gzip-komprimierte JSON-Dateien (NDJSON-Format)'
+    )
+    parser.add_argument('file', help='Pfad zur .json.gz Datei')
+    parser.add_argument('--limit', '-n', type=int, default=10,
+                        help='Maximale Anzahl der Records (default: 10, 0 = alle)')
+    parser.add_argument('--output', '-o', type=str,
+                        help='Ausgabe in Datei statt stdout')
+
+    args = parser.parse_args()
+
+    limit = args.limit if args.limit > 0 else None
+    return inspect_gzip_file(args.file, limit=limit, output_file=args.output)
+
+
+if __name__ == '__main__':
+    sys.exit(main())
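Because `inspect_gzip_file` streams the NDJSON line by line, it can also be driven from Python instead of the CLI. A usage sketch (the `.gz` path is hypothetical, and the import assumes the repo root is on `sys.path`):

```python
# Usage sketch for inspect_gzip.py; "downloads/eix_trades.json.gz" is a
# hypothetical path, any gzip-compressed NDJSON file works.
from scripts.inspect_gzip import inspect_gzip_file

# Pretty-print the first five records to stdout; returns 0 on success.
rc = inspect_gzip_file("downloads/eix_trades.json.gz", limit=5)

# Or dump the first 100 records into a plain JSON text file for diffing.
inspect_gzip_file("downloads/eix_trades.json.gz", limit=100,
                  output_file="sample.json")
```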
@@ -865,6 +865,39 @@ class AnalyticsWorker:
|
|||||||
if i % 10 == 0:
|
if i % 10 == 0:
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
def delete_analytics_for_date(self, date: datetime.date):
|
||||||
|
"""Löscht alle Analytics-Daten für ein bestimmtes Datum, damit sie neu berechnet werden können."""
|
||||||
|
date_str = date.strftime('%Y-%m-%d')
|
||||||
|
next_day = date + datetime.timedelta(days=1)
|
||||||
|
next_day_str = next_day.strftime('%Y-%m-%d')
|
||||||
|
|
||||||
|
tables = ['analytics_custom', 'analytics_exchange_daily', 'analytics_daily_summary']
|
||||||
|
|
||||||
|
for table in tables:
|
||||||
|
try:
|
||||||
|
# QuestDB DELETE syntax
|
||||||
|
delete_query = f"DELETE FROM {table} WHERE timestamp >= '{date_str}' AND timestamp < '{next_day_str}'"
|
||||||
|
response = requests.get(
|
||||||
|
f"{self.questdb_url}/exec",
|
||||||
|
params={'query': delete_query},
|
||||||
|
auth=self.auth,
|
||||||
|
timeout=30
|
||||||
|
)
|
||||||
|
if response.status_code == 200:
|
||||||
|
logger.debug(f"Deleted old analytics from {table} for {date}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Could not delete from {table} for {date}: {e}")
|
||||||
|
|
||||||
|
def force_recalculate_date(self, date: datetime.date):
|
||||||
|
"""Erzwingt Neuberechnung der Analytics für ein Datum (löscht alte Daten zuerst)."""
|
||||||
|
logger.info(f"Force recalculating analytics for {date}...")
|
||||||
|
|
||||||
|
# Lösche alte Analytics-Daten für dieses Datum
|
||||||
|
self.delete_analytics_for_date(date)
|
||||||
|
|
||||||
|
# Berechne neu
|
||||||
|
self.process_date(date)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Hauptschleife des Workers"""
|
"""Hauptschleife des Workers"""
|
||||||
logger.info("Analytics Worker started.")
|
logger.info("Analytics Worker started.")
|
||||||
@@ -874,35 +907,26 @@ class AnalyticsWorker:
|
|||||||
logger.error("Failed to connect to QuestDB. Exiting.")
|
logger.error("Failed to connect to QuestDB. Exiting.")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Initiale Berechnung fehlender Tage (inkl. gestern und heute)
|
# Initiale Berechnung fehlender Tage
|
||||||
logger.info("Checking for missing dates...")
|
logger.info("Checking for missing dates...")
|
||||||
self.process_missing_dates()
|
self.process_missing_dates()
|
||||||
|
|
||||||
# Stelle sicher, dass gestern und heute verarbeitet werden
|
# IMMER heute und gestern neu berechnen (da neue Trades hinzukommen können)
|
||||||
today = datetime.date.today()
|
today = datetime.date.today()
|
||||||
yesterday = today - datetime.timedelta(days=1)
|
yesterday = today - datetime.timedelta(days=1)
|
||||||
|
|
||||||
logger.info(f"Ensuring yesterday ({yesterday}) and today ({today}) are processed...")
|
logger.info(f"Force recalculating yesterday ({yesterday}) and today ({today}) - new trades may have been added...")
|
||||||
# Prüfe alle drei Tabellen
|
|
||||||
existing_custom = self.get_existing_dates('analytics_custom')
|
|
||||||
existing_exchange = self.get_existing_dates('analytics_exchange_daily')
|
|
||||||
existing_summary = self.get_existing_dates('analytics_daily_summary')
|
|
||||||
existing_dates = existing_custom | existing_exchange | existing_summary
|
|
||||||
|
|
||||||
if yesterday not in existing_dates:
|
# Gestern immer neu berechnen
|
||||||
logger.info(f"Processing yesterday's data: {yesterday}")
|
self.force_recalculate_date(yesterday)
|
||||||
self.process_date(yesterday)
|
|
||||||
|
|
||||||
# Heute wird verarbeitet, wenn es bereits Trades gibt
|
# Heute nur wenn es Trades gibt
|
||||||
if today not in existing_dates:
|
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
|
||||||
# Prüfe ob es heute schon Trades gibt
|
data = self.query_questdb(query)
|
||||||
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
|
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
|
||||||
data = self.query_questdb(query)
|
self.force_recalculate_date(today)
|
||||||
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
|
else:
|
||||||
logger.info(f"Found trades for today ({today}), processing...")
|
logger.info(f"No trades found for today ({today}) yet, will process later")
|
||||||
self.process_date(today)
|
|
||||||
else:
|
|
||||||
logger.info(f"No trades found for today ({today}) yet, will process later")
|
|
||||||
|
|
||||||
# Hauptschleife: Prüfe regelmäßig auf fehlende Tage
|
# Hauptschleife: Prüfe regelmäßig auf fehlende Tage
|
||||||
logger.info("Starting main loop - checking for missing dates every hour...")
|
logger.info("Starting main loop - checking for missing dates every hour...")
|
||||||
@@ -917,32 +941,24 @@ class AnalyticsWorker:
                 self.process_missing_dates()
                 last_check_hour = current_hour

-            # Make sure yesterday and today have been processed
+            # ALWAYS recalculate today and yesterday
             today = now.date()
             yesterday = today - datetime.timedelta(days=1)
-            # Check all three tables
-            existing_custom = self.get_existing_dates('analytics_custom')
-            existing_exchange = self.get_existing_dates('analytics_exchange_daily')
-            existing_summary = self.get_existing_dates('analytics_daily_summary')
-            existing_dates = existing_custom | existing_exchange | existing_summary
-
-            if yesterday not in existing_dates:
-                logger.info(f"Processing yesterday's data: {yesterday}")
-                self.process_date(yesterday)
+            logger.info(f"Hourly recalculation of yesterday ({yesterday}) and today ({today})...")
+            self.force_recalculate_date(yesterday)

             # Check whether there are trades for today
-            if today not in existing_dates:
-                query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
-                data = self.query_questdb(query)
-                if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
-                    logger.info(f"Found trades for today ({today}), processing...")
-                    self.process_date(today)
+            query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
+            data = self.query_questdb(query)
+            if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
+                self.force_recalculate_date(today)

             # Check whether it's midnight (00:00) - then process yesterday
             if now.hour == 0 and now.minute == 0:
                 yesterday = (now - datetime.timedelta(days=1)).date()
-                logger.info(f"Midnight reached - processing yesterday's data: {yesterday}")
-                self.process_date(yesterday)
+                logger.info(f"Midnight reached - force recalculating yesterday's data: {yesterday}")
+                self.force_recalculate_date(yesterday)
                 # Wait 61s to prevent multiple executions
                 time.sleep(61)
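Note: `query_questdb` itself is outside this hunk. A minimal sketch of such a helper against QuestDB's HTTP /exec endpoint (the host and timeout here are assumptions) that returns the JSON shape the worker checks:

from typing import Optional
import requests

def query_questdb(query: str, host: str = "http://localhost:9000") -> Optional[dict]:
    """Run SQL against QuestDB's HTTP /exec endpoint and return the JSON body."""
    try:
        resp = requests.get(f"{host}/exec", params={"query": query}, timeout=30)
        resp.raise_for_status()
        # QuestDB answers {"columns": [...], "dataset": [[...], ...]}, hence
        # the data['dataset'][0][0] check in the worker above.
        return resp.json()
    except requests.RequestException:
        return None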
354 src/exchanges/boersenag.py Normal file
@@ -0,0 +1,354 @@
"""
Börsenag Exchange Fetcher
Supports: DUSA, DUSB, DUSC, DUSD, HAMA, HAMB, HANA, HANB

Data source: https://www.boersenag.de/mifid-ii-delayed-data/
URL format: https://cld42.boersenag.de/m13data/data/Mifir13DelayedData_{MIC}_{SEQUENCE}_{TIMESTAMP}.csv
"""

import requests
import time
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from .base import BaseExchange, Trade
import re

# Rate-limiting configuration
RATE_LIMIT_DELAY = 0.3  # seconds between requests

# Browser User-Agent for access
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Accept': 'text/csv, text/plain, */*',
    'Accept-Language': 'de-DE,de;q=0.9,en;q=0.8',
    'Referer': 'https://www.boersenag.de/',
}

# Exchange configuration
BOERSENAG_EXCHANGES = {
    'DUSA': {
        'name': 'DUSA',
        'full_name': 'Börse Düsseldorf Regulierter Markt',
        'mic': 'DUSA',
    },
    'DUSB': {
        'name': 'DUSB',
        'full_name': 'Börse Düsseldorf Freiverkehr',
        'mic': 'DUSB',
    },
    'DUSC': {
        'name': 'DUSC',
        'full_name': 'Börse Düsseldorf Quotrix Regulierter Markt',
        'mic': 'DUSC',
    },
    'DUSD': {
        'name': 'DUSD',
        'full_name': 'Börse Düsseldorf Quotrix Freiverkehr',
        'mic': 'DUSD',
    },
    'HAMA': {
        'name': 'HAMA',
        'full_name': 'Börse Hamburg Regulierter Markt',
        'mic': 'HAMA',
    },
    'HAMB': {
        'name': 'HAMB',
        'full_name': 'Börse Hamburg Freiverkehr',
        'mic': 'HAMB',
    },
    'HANA': {
        'name': 'HANA',
        'full_name': 'Börse Hannover Regulierter Markt',
        'mic': 'HANA',
    },
    'HANB': {
        'name': 'HANB',
        'full_name': 'Börse Hannover Freiverkehr',
        'mic': 'HANB',
    },
}

BASE_URL = "https://cld42.boersenag.de/m13data/data"


class BoersenagBase(BaseExchange):
    """
    Base class for Börsenag exchanges (DUSA, DUSB, DUSC, DUSD, HAMA, HAMB, HANA, HANB)

    CSV format (semicolon-separated):
    MIC; ISIN; displayName; time; price; size; supplement
    - time: "28.01.2026 15:48:42" (German format)
    - price: "46,18" (German decimal format)
    - size: quantity (can be 0 for quotes without a trade)
    - supplement: "bez " = bezahlt (a real trade), "G " = Geld (bid), "B " = Brief (ask)
    """

    @property
    def mic(self) -> str:
        """MIC code of the exchange"""
        raise NotImplementedError

    @property
    def name(self) -> str:
        return self.mic

    def _generate_file_urls(self, target_date: datetime.date) -> List[str]:
        """
        Generates candidate file URLs for a given date.
        Format: Mifir13DelayedData_{MIC}_{SEQUENCE}_{TIMESTAMP}.csv

        The files are generated hourly with a timestamp.
        We try various sequence numbers and timestamps.
        """
        urls = []

        # Format the date in URL format: YYYYMMDD
        date_str = target_date.strftime('%Y%m%d')

        # Possible sequence numbers (observed: 000000DF, but may vary)
        sequences = ['000000DF', '00000000', '000000DD', '000000DE']

        # Generate URLs for various times of day (every 15 minutes)
        for hour in range(0, 24):
            for minute in [0, 15, 30, 45]:
                timestamp = f"{date_str}{hour:02d}{minute:02d}000000"
                for seq in sequences:
                    url = f"{BASE_URL}/Mifir13DelayedData_{self.mic}_{seq}_{timestamp}.csv"
                    urls.append(url)

        # Also try the simplest form with 0000000000
        for seq in sequences:
            url = f"{BASE_URL}/Mifir13DelayedData_{self.mic}_{seq}_{date_str}0000000000.csv"
            urls.append(url)

        return urls

    def _parse_german_datetime(self, dt_str: str) -> Optional[datetime]:
        """Parses the German date format: DD.MM.YYYY HH:MM:SS"""
        try:
            # Format: "28.01.2026 15:48:42"
            dt = datetime.strptime(dt_str.strip(), '%d.%m.%Y %H:%M:%S')
            # Convert to UTC (German time = CET/CEST, simplified here as UTC+1)
            # Correct handling would require pytz
            dt = dt.replace(tzinfo=timezone.utc)
            return dt
        except ValueError:
            return None

    def _parse_german_number(self, num_str: str) -> Optional[float]:
        """Parses the German number format: 1.234,56 -> 1234.56"""
        try:
            # Remove the thousands separator (dot) and replace the decimal comma
            clean = num_str.strip().replace('.', '').replace(',', '.')
            return float(clean)
        except ValueError:
            return None

    def _download_and_parse_file(self, url: str) -> List[Trade]:
        """Downloads a CSV file and parses the trades"""
        trades = []

        try:
            response = requests.get(url, headers=HEADERS, timeout=30)

            if response.status_code == 404:
                return []

            response.raise_for_status()

            content = response.text
            if not content.strip():
                return []

            lines = content.strip().split('\n')
            if len(lines) < 2:  # header only, no data
                return []

            # First line is the header
            # MIC; ISIN; displayName; time; price; size; supplement
            for line in lines[1:]:
                if not line.strip():
                    continue

                trade = self._parse_csv_line(line)
                if trade:
                    trades.append(trade)

        except requests.exceptions.HTTPError as e:
            if e.response.status_code != 404:
                print(f"[{self.name}] HTTP error: {e}")
        except Exception as e:
            print(f"[{self.name}] Error downloading {url}: {e}")

        return trades

    def _parse_csv_line(self, line: str) -> Optional[Trade]:
        """Parses a single CSV line"""
        try:
            # CSV is semicolon-separated
            parts = line.split(';')
            if len(parts) < 7:
                return None

            mic = parts[0].strip()
            isin = parts[1].strip()
            display_name = parts[2].strip().strip('"')
            time_str = parts[3].strip()
            price_str = parts[4].strip()
            size_str = parts[5].strip()
            supplement = parts[6].strip().strip('"').strip()

            # Validate ISIN
            if not isin or len(isin) != 12:
                return None

            # Parse timestamp
            timestamp = self._parse_german_datetime(time_str)
            if not timestamp:
                return None

            # Parse price
            price = self._parse_german_number(price_str)
            if price is None or price <= 0:
                return None

            # Parse quantity
            try:
                size = float(size_str)
            except ValueError:
                size = 0

            # Only keep real trades (size > 0) or "bez" (bezahlt = paid)
            # "G" = Geld (bid) and "B" = Brief (ask) are quotes, not trades
            is_trade = size > 0 or 'bez' in supplement.lower()

            if not is_trade:
                return None

            # With size = 0 but "bez" we assume the quantity is unknown (set to 1)
            if size <= 0:
                size = 1

            return Trade(
                exchange=self.name,
                symbol=isin,
                isin=isin,
                price=price,
                quantity=size,
                timestamp=timestamp
            )

        except Exception:
            return None

    def _get_last_trading_day(self, from_date: datetime.date) -> datetime.date:
        """Finds the last trading day (skips weekends)"""
        date = from_date
        if date.weekday() == 5:  # Saturday
            date = date - timedelta(days=1)
        elif date.weekday() == 6:  # Sunday
            date = date - timedelta(days=2)
        return date

    def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]:
        """Fetches all trades from the last trading day"""
        all_trades = []

        # Determine the target date
        if since_date:
            target_date = since_date.date() if hasattr(since_date, 'date') else since_date
        else:
            target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()

        # Skip weekends
        original_date = target_date
        target_date = self._get_last_trading_day(target_date)

        if target_date != original_date:
            print(f"[{self.name}] Skipping weekend: {original_date} -> {target_date}")

        print(f"[{self.name}] Fetching trades for date: {target_date}")

        # Generate candidate URLs
        urls = self._generate_file_urls(target_date)

        successful = 0
        total_urls = len(urls)

        # Try the different URLs
        for i, url in enumerate(urls):
            trades = self._download_and_parse_file(url)
            if trades:
                all_trades.extend(trades)
                successful += 1
                print(f"[{self.name}] Found {len(trades)} trades from: {url.split('/')[-1]}")
                # On success we do not need to try all remaining URLs
                break

            # Rate limiting
            if i < total_urls - 1:
                time.sleep(RATE_LIMIT_DELAY)

            # Abort after 20 failed attempts
            if i > 20 and successful == 0:
                break

        print(f"[{self.name}] Total trades fetched: {len(all_trades)}")
        return all_trades


# Concrete exchange classes
class DUSAExchange(BoersenagBase):
    """Börse Düsseldorf Regulierter Markt"""
    @property
    def mic(self) -> str:
        return "DUSA"


class DUSBExchange(BoersenagBase):
    """Börse Düsseldorf Freiverkehr"""
    @property
    def mic(self) -> str:
        return "DUSB"


class DUSCExchange(BoersenagBase):
    """Börse Düsseldorf Quotrix Regulierter Markt"""
    @property
    def mic(self) -> str:
        return "DUSC"


class DUSDExchange(BoersenagBase):
    """Börse Düsseldorf Quotrix Freiverkehr"""
    @property
    def mic(self) -> str:
        return "DUSD"


class HAMAExchange(BoersenagBase):
    """Börse Hamburg Regulierter Markt"""
    @property
    def mic(self) -> str:
        return "HAMA"


class HAMBExchange(BoersenagBase):
    """Börse Hamburg Freiverkehr"""
    @property
    def mic(self) -> str:
        return "HAMB"


class HANAExchange(BoersenagBase):
    """Börse Hannover Regulierter Markt"""
    @property
    def mic(self) -> str:
        return "HANA"


class HANBExchange(BoersenagBase):
    """Börse Hannover Freiverkehr"""
    @property
    def mic(self) -> str:
        return "HANB"
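The new fetcher leans entirely on the two German-format helpers; a self-contained sanity check of those conversions, with an illustrative CSV line in the documented layout (the instrument values are examples, not from the feed):

from datetime import datetime, timezone

# Sample line in the documented format: MIC; ISIN; displayName; time; price; size; supplement
line = 'HAMA;DE0007100000;"Mercedes-Benz Group AG";28.01.2026 15:48:42;46,18;100;"bez "'
parts = [p.strip() for p in line.split(';')]

# German datetime "28.01.2026 15:48:42" -> aware datetime (simplified to UTC, as in the fetcher)
ts = datetime.strptime(parts[3], '%d.%m.%Y %H:%M:%S').replace(tzinfo=timezone.utc)

# German decimal "46,18" (or "1.234,56") -> float
price = float(parts[4].replace('.', '').replace(',', '.'))

print(ts.isoformat(), price)  # 2026-01-28T15:48:42+00:00 46.18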
@@ -2,16 +2,20 @@ import requests
 import gzip
 import json
 import io
+import re
 import time
+import logging
+import threading
 from datetime import datetime, timedelta, timezone
 from typing import List, Optional
 from .base import BaseExchange, Trade
-from bs4 import BeautifulSoup
+
+logger = logging.getLogger(__name__)

 # Rate-limiting configuration
 RATE_LIMIT_DELAY = 0.5  # seconds between requests
 RATE_LIMIT_RETRY_DELAY = 5  # seconds to wait on a 429
-MAX_RETRIES = 3  # maximum number of retries on a 429
+MAX_RETRIES = 5  # maximum number of retries on a 429

 # API URLs for Deutsche Börse
 API_URLS = {
@@ -21,17 +25,47 @@ API_URLS = {
 }
 DOWNLOAD_BASE_URL = "https://mfs.deutsche-boerse.com/api/download"

-# Browser User-Agent for access
-HEADERS = {
-    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
-    'Accept': 'application/json, application/gzip, */*',
-    'Referer': 'https://mfs.deutsche-boerse.com/',
-}
+# List of User-Agents to rotate through when rate-limited
+USER_AGENTS = [
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0',
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
+    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0',
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:121.0) Gecko/20100101 Firefox/121.0',
+    'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0',
+]
+
+
+class UserAgentRotator:
+    """Thread-safe User-Agent rotation"""
+
+    def __init__(self):
+        self._index = 0
+        self._lock = threading.Lock()
+
+    def get_headers(self, rotate: bool = False) -> dict:
+        """Returns headers with the current User-Agent. With rotate=True, switches to the next one."""
+        with self._lock:
+            if rotate:
+                self._index = (self._index + 1) % len(USER_AGENTS)
+            return {
+                'User-Agent': USER_AGENTS[self._index],
+                'Accept': 'application/json, application/gzip, */*',
+                'Referer': 'https://mfs.deutsche-boerse.com/',
+            }
+
+
+# Global instance for User-Agent rotation
+_ua_rotator = UserAgentRotator()


 class DeutscheBoerseBase(BaseExchange):
     """Base class for Deutsche Börse exchanges (Xetra, Frankfurt, Quotrix)"""
+
+    # Regex for filename parsing (compiled for performance)
+    _FILENAME_PATTERN = re.compile(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})')

     @property
     def base_url(self) -> str:
         """Override in subclasses"""
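A standalone illustration of the rotator's contract (trimmed agent list, same structure as above): each get_headers(rotate=True) advances the shared index under the lock, so concurrent fetchers never tear the state:

import threading

AGENTS = ['UA-1', 'UA-2', 'UA-3']  # stand-ins for the real browser strings

class UserAgentRotator:
    def __init__(self):
        self._index = 0
        self._lock = threading.Lock()

    def get_headers(self, rotate: bool = False) -> dict:
        with self._lock:
            if rotate:
                self._index = (self._index + 1) % len(AGENTS)
            return {'User-Agent': AGENTS[self._index]}

rot = UserAgentRotator()
print(rot.get_headers()['User-Agent'])             # UA-1
print(rot.get_headers(rotate=True)['User-Agent'])  # UA-2 (rotated after a 429)
print(rot.get_headers()['User-Agent'])             # UA-2 (sticky until the next rotation)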
@@ -46,60 +80,73 @@ class DeutscheBoerseBase(BaseExchange):
         """API URL for the file list"""
         return API_URLS.get(self.name, self.base_url)

+    def _handle_rate_limit(self, retry: int, context: str) -> None:
+        """Central rate-limit handling: rotates the User-Agent and waits."""
+        _ua_rotator.get_headers(rotate=True)
+        wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
+        logger.warning(f"[{self.name}] Rate limited ({context}), rotating User-Agent and waiting {wait_time}s... (retry {retry + 1}/{MAX_RETRIES})")
+        time.sleep(wait_time)
+
     def _get_file_list(self) -> List[str]:
         """Fetches the file list from the JSON API"""
-        try:
-            api_url = self.api_url
-            print(f"[{self.name}] Fetching file list from: {api_url}")
-            response = requests.get(api_url, headers=HEADERS, timeout=30)
-            response.raise_for_status()
-
-            data = response.json()
-            files = data.get('CurrentFiles', [])
-
-            print(f"[{self.name}] API returned {len(files)} files")
-            if files:
-                print(f"[{self.name}] Sample files: {files[:3]}")
-            return files
-
-        except Exception as e:
-            print(f"[{self.name}] Error fetching file list from API: {e}")
-            import traceback
-            print(f"[{self.name}] Traceback: {traceback.format_exc()}")
-            return []
+        api_url = self.api_url
+
+        for retry in range(MAX_RETRIES):
+            try:
+                headers = _ua_rotator.get_headers(rotate=(retry > 0))
+                logger.info(f"[{self.name}] Fetching file list from: {api_url}")
+                response = requests.get(api_url, headers=headers, timeout=30)
+
+                if response.status_code == 429:
+                    self._handle_rate_limit(retry, "file list")
+                    continue
+
+                response.raise_for_status()
+
+                data = response.json()
+                files = data.get('CurrentFiles', [])
+
+                logger.info(f"[{self.name}] API returned {len(files)} files")
+                if files:
+                    logger.debug(f"[{self.name}] Sample files: {files[:3]}")
+                return files
+
+            except requests.exceptions.HTTPError as e:
+                if e.response.status_code == 429:
+                    self._handle_rate_limit(retry, "file list HTTPError")
+                    continue
+                logger.error(f"[{self.name}] HTTP error fetching file list: {e}")
+                break
+            except Exception as e:
+                logger.exception(f"[{self.name}] Error fetching file list from API: {e}")
+                break
+
+        return []

     def _filter_files_for_date(self, files: List[str], target_date: datetime.date) -> List[str]:
         """
         Filters files for a given date.
-        File format: DETR-posttrade-YYYY-MM-DDTHH_MM.json.gz (with an underscore!)
+        File format: DETR-posttrade-YYYY-MM-DDTHH_MM.json.gz

         Since trading runs until 22:00 CET (21:00/20:00 UTC), we also have to
         consider files from after midnight UTC.
         """
-        import re
         filtered = []

-        # For the previous day: files from target_date AND from the following day (until ~02:00 UTC)
         target_str = target_date.strftime('%Y-%m-%d')
         next_day = target_date + timedelta(days=1)
         next_day_str = next_day.strftime('%Y-%m-%d')

         for file in files:
-            # Extract the date from the filename
-            # Format: DETR-posttrade-2026-01-26T21_30.json.gz
             if target_str in file:
                 filtered.append(file)
             elif next_day_str in file:
                 # Check whether it is an early file from the next day (< 03:00 UTC)
-                try:
-                    # Find the timestamp in the filename, with an underscore for the minutes
-                    match = re.search(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})', file)
-                    if match:
-                        hour = int(match.group(2))
-                        if hour < 3:  # early morning hours still belong to the previous day
-                            filtered.append(file)
-                except Exception:
-                    pass
+                match = self._FILENAME_PATTERN.search(file)
+                if match:
+                    hour = int(match.group(2))
+                    if hour < 3:  # early morning hours still belong to the previous day
+                        filtered.append(file)

         return filtered
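The date filter's rule - keep files from the target day, plus next-day files stamped before 03:00 UTC - can be exercised standalone with synthetic filenames:

import re

PATTERN = re.compile(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})')

files = [
    'DETR-posttrade-2026-01-26T21_30.json.gz',  # target day -> keep
    'DETR-posttrade-2026-01-27T01_15.json.gz',  # next day, 01:15 UTC -> keep
    'DETR-posttrade-2026-01-27T09_00.json.gz',  # next day, 09:00 UTC -> drop
]

target, next_day = '2026-01-26', '2026-01-27'
kept = []
for f in files:
    if target in f:
        kept.append(f)
    elif next_day in f:
        m = PATTERN.search(f)
        if m and int(m.group(2)) < 3:  # early-morning files still belong to the previous day
            kept.append(f)

print(kept)  # the first two filenames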
@@ -110,17 +157,14 @@ class DeutscheBoerseBase(BaseExchange):

         for retry in range(MAX_RETRIES):
             try:
-                response = requests.get(full_url, headers=HEADERS, timeout=60)
+                headers = _ua_rotator.get_headers(rotate=(retry > 0))
+                response = requests.get(full_url, headers=headers, timeout=60)

                 if response.status_code == 404:
-                    # File not found - normal for old files
                     return []

                 if response.status_code == 429:
-                    # Rate limit reached - wait and try again
-                    wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
-                    print(f"[{self.name}] Rate limited, waiting {wait_time}s...")
-                    time.sleep(wait_time)
+                    self._handle_rate_limit(retry, "download")
                     continue

                 response.raise_for_status()
@@ -130,13 +174,11 @@ class DeutscheBoerseBase(BaseExchange):
                     content = f.read().decode('utf-8')

                 if not content.strip():
-                    # empty file
                     return []

                 # NDJSON format: one JSON line per trade
                 lines = content.strip().split('\n')
                 if not lines or (len(lines) == 1 and not lines[0].strip()):
-                    # empty file
                     return []

                 for line in lines:
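For context, the unchanged code just above this hunk gunzips the response body before these emptiness checks; a minimal standalone version of that decode-and-split step (the record contents are illustrative):

import gzip
import io
import json

def decode_ndjson_gz(payload: bytes) -> list:
    """Decompress a .json.gz body and parse one JSON object per line (NDJSON)."""
    with gzip.GzipFile(fileobj=io.BytesIO(payload)) as f:
        content = f.read().decode('utf-8')

    records = []
    for line in content.strip().split('\n'):
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            continue  # skip malformed lines, as the downloader does
    return records

# Example: two NDJSON records round-tripped through gzip
raw = b'{"isin": "US00123Q1040", "lastTrade": 10.145}\n{"isin": "DE0007100000", "lastTrade": 46.18}\n'
print(len(decode_ndjson_gz(gzip.compress(raw))))  # 2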
@@ -147,116 +189,146 @@ class DeutscheBoerseBase(BaseExchange):
                         trade = self._parse_trade_record(record)
                         if trade:
                             trades.append(trade)
-                    except json.JSONDecodeError:
-                        continue
-                    except Exception:
-                        continue
+                    except json.JSONDecodeError as e:
+                        logger.debug(f"[{self.name}] JSON decode error in {filename}: {e}")
+                    except Exception as e:
+                        logger.debug(f"[{self.name}] Error parsing record in {filename}: {e}")

-                # Success - no further retry needed
+                # Success
                 break

             except requests.exceptions.HTTPError as e:
                 if e.response.status_code == 429:
-                    wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
-                    print(f"[{self.name}] Rate limited, waiting {wait_time}s...")
-                    time.sleep(wait_time)
+                    self._handle_rate_limit(retry, "download HTTPError")
                     continue
                 elif e.response.status_code != 404:
-                    print(f"[{self.name}] HTTP error downloading {filename}: {e}")
+                    logger.error(f"[{self.name}] HTTP error downloading {filename}: {e}")
                 break
             except Exception as e:
-                print(f"[{self.name}] Error downloading/parsing {filename}: {e}")
+                logger.error(f"[{self.name}] Error downloading/parsing {filename}: {e}")
                 break

         return trades

+    def _parse_timestamp(self, ts_str: str) -> Optional[datetime]:
+        """
+        Parses a timestamp string into a datetime object.
+        Supports nanoseconds by truncating to microseconds.
+        """
+        if not ts_str:
+            return None
+
+        # Replace 'Z' with '+00:00' for ISO compatibility
+        ts_str = ts_str.replace('Z', '+00:00')
+
+        # Truncate nanoseconds to microseconds (Python allows at most 6 decimal places)
+        if '.' in ts_str:
+            # Split at '+' or '-' for the timezone
+            if '+' in ts_str:
+                time_part, tz_part = ts_str.rsplit('+', 1)
+                tz_part = '+' + tz_part
+            elif ts_str.count('-') > 2:  # negative timezone
+                time_part, tz_part = ts_str.rsplit('-', 1)
+                tz_part = '-' + tz_part
+            else:
+                time_part, tz_part = ts_str, ''
+
+            if '.' in time_part:
+                base, frac = time_part.split('.')
+                frac = frac[:6]  # truncate to 6 digits
+                ts_str = f"{base}.{frac}{tz_part}"
+
+        return datetime.fromisoformat(ts_str)
+
+    def _extract_price(self, record: dict) -> Optional[float]:
+        """Extracts the price from the various JSON formats."""
+        # New format
+        if 'lastTrade' in record:
+            return float(record['lastTrade'])
+
+        # Old format with a nested Pric object
+        pric = record.get('Pric')
+        if pric is None:
+            return None
+
+        if isinstance(pric, (int, float)):
+            return float(pric)
+
+        if isinstance(pric, dict):
+            # Try various paths
+            if 'Pric' in pric:
+                inner = pric['Pric']
+                if isinstance(inner, dict):
+                    amt = inner.get('MntryVal', {}).get('Amt') or inner.get('Amt')
+                    if amt is not None:
+                        return float(amt)
+            if 'MntryVal' in pric:
+                amt = pric['MntryVal'].get('Amt')
+                if amt is not None:
+                    return float(amt)
+
+        return None
+
+    def _extract_quantity(self, record: dict) -> Optional[float]:
+        """Extracts the quantity from the various JSON formats."""
+        # New format
+        if 'lastQty' in record:
+            return float(record['lastQty'])
+
+        # Old format
+        qty = record.get('Qty')
+        if qty is None:
+            return None
+
+        if isinstance(qty, (int, float)):
+            return float(qty)
+
+        if isinstance(qty, dict):
+            val = qty.get('Unit') or qty.get('Qty')
+            if val is not None:
+                return float(val)
+
+        return None
+
     def _parse_trade_record(self, record: dict) -> Optional[Trade]:
         """
         Parses a single trade record from the JSON.

-        Current JSON format (NDJSON):
-        {
-            "messageId": "posttrade",
-            "sourceName": "GAT",
-            "isin": "US00123Q1040",
-            "lastTradeTime": "2026-01-29T14:07:00.419000000Z",
-            "lastTrade": 10.145,
-            "lastQty": 500.0,
-            "currency": "EUR",
-            ...
-        }
+        Supported formats:
+        - New format: isin, lastTrade, lastQty, lastTradeTime
+        - Old format: FinInstrmId.Id, Pric, Qty, TrdDt/TrdTm
         """
         try:
-            # Extract ISIN - the new format uses lowercase 'isin'
-            isin = record.get('isin') or record.get('ISIN') or record.get('instrumentId') or record.get('FinInstrmId', {}).get('Id', '')
+            # Extract ISIN
+            isin = (
+                record.get('isin') or
+                record.get('ISIN') or
+                record.get('instrumentId') or
+                record.get('FinInstrmId', {}).get('Id', '')
+            )
             if not isin:
                 return None

-            # Extract price - new format: 'lastTrade'
-            price = None
-            if 'lastTrade' in record:
-                price = float(record['lastTrade'])
-            elif 'Pric' in record:
-                pric = record['Pric']
-                if isinstance(pric, dict):
-                    if 'Pric' in pric:
-                        inner = pric['Pric']
-                        if 'MntryVal' in inner:
-                            price = float(inner['MntryVal'].get('Amt', 0))
-                        elif 'Amt' in inner:
-                            price = float(inner['Amt'])
-                    elif 'MntryVal' in pric:
-                        price = float(pric['MntryVal'].get('Amt', 0))
-                elif isinstance(pric, (int, float)):
-                    price = float(pric)
+            # Extract price
+            price = self._extract_price(record)

             if price is None or price <= 0:
                 return None

-            # Extract quantity - new format: 'lastQty'
-            quantity = None
-            if 'lastQty' in record:
-                quantity = float(record['lastQty'])
-            elif 'Qty' in record:
-                qty = record['Qty']
-                if isinstance(qty, dict):
-                    quantity = float(qty.get('Unit', qty.get('Qty', 0)))
-                elif isinstance(qty, (int, float)):
-                    quantity = float(qty)
+            # Extract quantity
+            quantity = self._extract_quantity(record)

             if quantity is None or quantity <= 0:
                 return None

-            # Extract timestamp - new format: 'lastTradeTime'
+            # Extract timestamp
             timestamp = None
             if 'lastTradeTime' in record:
-                ts_str = record['lastTradeTime']
-                # Format: "2026-01-29T14:07:00.419000000Z"
-                # Python supports at most 6 decimal places, so truncate
-                if '.' in ts_str:
-                    parts = ts_str.replace('Z', '').split('.')
-                    if len(parts) == 2 and len(parts[1]) > 6:
-                        ts_str = parts[0] + '.' + parts[1][:6] + '+00:00'
-                    else:
-                        ts_str = ts_str.replace('Z', '+00:00')
-                else:
-                    ts_str = ts_str.replace('Z', '+00:00')
-                timestamp = datetime.fromisoformat(ts_str)
+                timestamp = self._parse_timestamp(record['lastTradeTime'])
             else:
                 # Fallback for the old format
                 trd_dt = record.get('TrdDt', '')
                 trd_tm = record.get('TrdTm', '00:00:00')
-                if not trd_dt:
-                    return None
-
-                ts_str = f"{trd_dt}T{trd_tm}"
-                if '.' in ts_str:
-                    parts = ts_str.split('.')
-                    if len(parts[1]) > 6:
-                        ts_str = parts[0] + '.' + parts[1][:6]
-
-                timestamp = datetime.fromisoformat(ts_str)
+                if trd_dt:
+                    timestamp = self._parse_timestamp(f"{trd_dt}T{trd_tm}")

             if timestamp is None:
                 return None
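The effect of the new _parse_timestamp is easiest to see on the feed's nanosecond timestamps, which datetime.fromisoformat rejects on older Python versions; a simplified standalone sketch (UTC-only, positive offsets):

from datetime import datetime

def truncate_to_micros(ts_str: str) -> datetime:
    """Nanosecond ISO timestamps -> fromisoformat-compatible (max 6 fractional digits)."""
    ts_str = ts_str.replace('Z', '+00:00')
    if '.' in ts_str:
        time_part, _, tz_part = ts_str.partition('+')
        base, _, frac = time_part.partition('.')
        ts_str = f"{base}.{frac[:6]}+{tz_part}" if tz_part else f"{base}.{frac[:6]}"
    return datetime.fromisoformat(ts_str)

# The feed delivers nine fractional digits:
print(truncate_to_micros("2026-01-29T14:07:00.419000000Z"))
# 2026-01-29 14:07:00.419000+00:00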
@@ -273,22 +345,41 @@ class DeutscheBoerseBase(BaseExchange):
                 timestamp=timestamp
             )

-        except Exception as e:
-            # Debug: show the first failed record
+        except (ValueError, TypeError, KeyError) as e:
+            logger.debug(f"[{self.name}] Failed to parse trade record: {e}")
             return None

     def _get_last_trading_day(self, from_date: datetime.date) -> datetime.date:
         """
-        Finds the last trading day (skips weekends).
+        Finds the last trading day (skips weekends and known holidays).
         Monday=0, Sunday=6
         """
+        # German exchange holidays (fixed dates, the same every year)
+        # Movable holidays (Easter etc.) would have to be computed per year
+        fixed_holidays = {
+            (1, 1),    # New Year's Day
+            (5, 1),    # Labour Day
+            (12, 24),  # Christmas Eve
+            (12, 25),  # Christmas Day
+            (12, 26),  # Boxing Day
+            (12, 31),  # New Year's Eve
+        }
+
         date = from_date
-        # If Saturday (5), go back to Friday
-        if date.weekday() == 5:
-            date = date - timedelta(days=1)
-        # If Sunday (6), go back to Friday
-        elif date.weekday() == 6:
-            date = date - timedelta(days=2)
+        max_iterations = 10  # guard against an endless loop
+
+        for _ in range(max_iterations):
+            # Skip weekends
+            if date.weekday() == 5:  # Saturday
+                date = date - timedelta(days=1)
+            elif date.weekday() == 6:  # Sunday
+                date = date - timedelta(days=2)
+            # Skip holidays
+            elif (date.month, date.day) in fixed_holidays:
+                date = date - timedelta(days=1)
+            else:
+                break

         return date

     def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]:
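The switch from two if-branches to a bounded loop matters when skips cascade - a holiday can land on the day the weekend rule falls back to, and vice versa. A standalone trace with the same holiday set:

from datetime import date, timedelta

FIXED_HOLIDAYS = {(1, 1), (5, 1), (12, 24), (12, 25), (12, 26), (12, 31)}

def last_trading_day(d: date) -> date:
    for _ in range(10):  # bounded, like the method above
        if d.weekday() == 5:                      # Saturday -> Friday
            d -= timedelta(days=1)
        elif d.weekday() == 6:                    # Sunday -> Friday
            d -= timedelta(days=2)
        elif (d.month, d.day) in FIXED_HOLIDAYS:  # fixed holiday -> previous day
            d -= timedelta(days=1)
        else:
            break
    return d

# Jan 1, 2027 is a Friday and a holiday; Dec 31, 2026 is also a holiday,
# so the skip has to run twice before landing on Wed, Dec 30.
print(last_trading_day(date(2027, 1, 1)))  # 2026-12-30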
@@ -304,40 +395,36 @@ class DeutscheBoerseBase(BaseExchange):
             # Default: previous day
             target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()

-        # Skip weekends
+        # Skip weekends and holidays
         original_date = target_date
         target_date = self._get_last_trading_day(target_date)

         if target_date != original_date:
-            print(f"[{self.name}] Skipping weekend: {original_date} -> {target_date}")
+            logger.info(f"[{self.name}] Adjusted date: {original_date} -> {target_date} (weekend/holiday)")

-        print(f"[{self.name}] Fetching trades for date: {target_date}")
+        logger.info(f"[{self.name}] Fetching trades for date: {target_date}")

         # Fetch the file list from the API
         files = self._get_file_list()

         if not files:
-            print(f"[{self.name}] No files available from API")
+            logger.warning(f"[{self.name}] No files available from API")
             return []

         # Filter the files for the target date
         target_files = self._filter_files_for_date(files, target_date)
-        print(f"[{self.name}] {len(target_files)} files match target date (of {len(files)} total)")
+        logger.info(f"[{self.name}] {len(target_files)} files match target date (of {len(files)} total)")

         if not target_files:
-            print(f"[{self.name}] No files for target date found")
+            logger.warning(f"[{self.name}] No files for target date found")
             return []

-        # Download and parse all matching files (with rate limiting)
+        # Download and parse all matching files
         successful = 0
         failed = 0
         total_files = len(target_files)

-        if total_files == 0:
-            print(f"[{self.name}] No files to download for date {target_date}")
-            return []
-
-        print(f"[{self.name}] Starting download of {total_files} files...")
+        logger.info(f"[{self.name}] Starting download of {total_files} files...")

         for i, file in enumerate(target_files):
             trades = self._download_and_parse_file(file)
@@ -353,9 +440,9 @@ class DeutscheBoerseBase(BaseExchange):

             # Progress every 100 files
             if (i + 1) % 100 == 0:
-                print(f"[{self.name}] Progress: {i + 1}/{total_files} files, {successful} successful, {len(all_trades)} trades so far")
+                logger.info(f"[{self.name}] Progress: {i + 1}/{total_files} files, {successful} successful, {len(all_trades)} trades so far")

-        print(f"[{self.name}] Downloaded {successful} files ({failed} failed/empty), total {len(all_trades)} trades")
+        logger.info(f"[{self.name}] Downloaded {successful} files ({failed} failed/empty), total {len(all_trades)} trades")
         return all_trades

@@ -1,30 +1,38 @@
 import requests
-import json
-from bs4 import BeautifulSoup
-from datetime import datetime
-from typing import List
+import logging
+from datetime import datetime, timezone
+from typing import List, Generator, Tuple
 from .base import BaseExchange, Trade
 import csv
 import io

+logger = logging.getLogger(__name__)
+
+
 class EIXExchange(BaseExchange):
+    """European Investor Exchange - CSV-based trade data."""
+
+    API_BASE_URL = "https://european-investor-exchange.com/api"
+
     @property
     def name(self) -> str:
         return "EIX"

-    def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None) -> List[Trade]:
-        # EIX stores its file list in a separate API endpoint
-        url = "https://european-investor-exchange.com/api/official-trades"
+    def get_files_to_process(self, limit: int = 1, since_date: datetime = None) -> List[dict]:
+        """Fetches the list of files to process without downloading them."""
+        url = f"{self.API_BASE_URL}/official-trades"
         try:
             response = requests.get(url, timeout=15)
             response.raise_for_status()
             files_list = response.json()
-        except Exception as e:
-            print(f"Error fetching EIX file list: {e}")
+        except requests.exceptions.RequestException as e:
+            logger.error(f"[{self.name}] Error fetching the file list: {e}")
+            return []
+        except ValueError as e:
+            logger.error(f"[{self.name}] Invalid JSON in the file list: {e}")
             return []

-        # Filter files based on date in filename if since_date provided
-        # Format: "kursblatt/2025/Kursblatt.2025-07-14.1752526803105.csv"
+        # Filter the files by date if since_date is given
         filtered_files = []
         for item in files_list:
             file_key = item.get('fileName')
@@ -33,90 +41,118 @@ class EIXExchange(BaseExchange):

         if since_date:
             try:
-                # Extract date from filename: Kursblatt.YYYY-MM-DD
                 parts = file_key.split('/')[-1].split('.')
-                # parts example: ['Kursblatt', '2025-07-14', '1752526803105', 'csv']
                 if len(parts) >= 2:
                     date_str = parts[1]
-                    file_date = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
+                    file_date = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)

-                    # Check if file date is newer than since_date (compare dates only)
-                    if file_date.date() > since_date.date():
-                        filtered_files.append(item)
-                        continue
-                    # If same day, we might need to check it too, but EIX seems to be daily files
-                    if file_date.date() == since_date.date():
-                        filtered_files.append(item)
-                        continue
-            except Exception:
-                # If parsing fails, default to including it (safety) or skipping?
-                # Let's include it if we are not sure
+                    if file_date.date() >= since_date.date():
+                        filtered_files.append(item)
+            except (ValueError, IndexError) as e:
+                # Filename has an unexpected format - include it to be safe
+                logger.debug(f"[{self.name}] Could not extract a date from {file_key}: {e}")
                 filtered_files.append(item)
         else:
             filtered_files.append(item)

-        # Sort files to process oldest to newest if doing a sync, or newest to oldest?
-        # If we have limit=1 (default), we usually want the newest.
-        # But if we are syncing history (since_date set), we probably want all of them.
-
-        # Logic: If since_date is set, we ignore limit (or use it as safety cap) and process ALL new files
         if since_date:
-            files_to_process = filtered_files
-            # Sort by date ? The API list seems chronological.
+            return filtered_files
         else:
-            # Default behavior: take the last N files (API returns oldest first usually?)
-            # Let's assume list is chronological.
             if limit:
-                files_to_process = files_list[-limit:]
-            else:
-                files_to_process = files_list
+                return files_list[-limit:]
+            return files_list

+    def fetch_trades_from_file(self, file_item: dict) -> List[Trade]:
+        """Downloads and parses a single CSV file."""
+        file_key = file_item.get('fileName')
+        if not file_key:
+            return []
+
+        csv_url = f"{self.API_BASE_URL}/trade-file-contents?key={file_key}"
+        try:
+            response = requests.get(csv_url, timeout=60)
+            response.raise_for_status()
+            return self._parse_csv(response.text)
+        except requests.exceptions.RequestException as e:
+            logger.error(f"[{self.name}] Error downloading {file_key}: {e}")
+        except Exception as e:
+            logger.error(f"[{self.name}] Unexpected error for {file_key}: {e}")
+
+        return []
+
+    def fetch_trades_streaming(self, limit: int = 1, since_date: datetime = None) -> Generator[Tuple[str, List[Trade]], None, None]:
+        """
+        Generator that returns trades file by file.
+        Yields: (filename, trades) tuples
+        """
+        files = self.get_files_to_process(limit=limit, since_date=since_date)
+
+        for item in files:
+            file_key = item.get('fileName', 'unknown')
+            trades = self.fetch_trades_from_file(item)
+            if trades:
+                yield (file_key, trades)

-        trades = []
-        count = 0
-        for item in files_to_process:
-            file_key = item.get('fileName')
-
-            # Download the CSV
-            csv_url = f"https://european-investor-exchange.com/api/trade-file-contents?key={file_key}"
-            try:
-                csv_response = requests.get(csv_url, timeout=20)
-                if csv_response.status_code == 200:
-                    trades.extend(self._parse_csv(csv_response.text))
-                    count += 1
-                    # Only enforce limit if since_date is NOT set
-                    if not since_date and limit and count >= limit:
-                        break
-            except Exception as e:
-                print(f"Error downloading EIX CSV {file_key}: {e}")
-
-        return trades
+    def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None, **kwargs) -> List[Trade]:
+        """
+        Legacy method for compatibility.
+        WARNING: loads all trades into memory! For large data volumes use fetch_trades_streaming().
+        """
+        # For small requests (limit <= 5), normal processing
+        if limit and limit <= 5 and not since_date:
+            all_trades = []
+            for filename, trades in self.fetch_trades_streaming(limit=limit, since_date=since_date):
+                all_trades.extend(trades)
+            return all_trades
+
+        # For large requests: log a warning and return an empty list
+        logger.warning(f"[{self.name}] fetch_latest_trades() called with a large dataset. Use streaming.")
+        return []

     def _parse_csv(self, csv_text: str) -> List[Trade]:
+        """Parses CSV text into Trade objects."""
         trades = []
+        parse_errors = 0

         f = io.StringIO(csv_text)
-        # Header: Trading day & Trading time UTC,Instrument Identifier,Quantity,Unit Price,Price Currency,Venue Identifier,Side
         reader = csv.DictReader(f, delimiter=',')
-        for row in reader:
+
+        for row_num, row in enumerate(reader, start=2):  # start at 2 because of the header
             try:
                 price = float(row['Unit Price'])
                 quantity = float(row['Quantity'])
                 isin = row['Instrument Identifier']
-                symbol = isin  # Often symbol is unknown, use ISIN
                 time_str = row['Trading day & Trading time UTC']

-                # Format: 2026-01-22T06:30:00.617Z
-                # Python 3.11+ supports ISO with Z, otherwise we strip Z
+                # Validate price and quantity
+                if price <= 0 or quantity <= 0:
+                    logger.debug(f"[{self.name}] Row {row_num}: invalid price/quantity: {price}/{quantity}")
+                    parse_errors += 1
+                    continue
+
                 ts_str = time_str.replace('Z', '+00:00')
                 timestamp = datetime.fromisoformat(ts_str)

                 trades.append(Trade(
                     exchange=self.name,
-                    symbol=symbol,
+                    symbol=isin,
                     isin=isin,
                     price=price,
                     quantity=quantity,
                     timestamp=timestamp
                 ))
-            except Exception:
-                continue
+            except KeyError as e:
+                logger.debug(f"[{self.name}] Row {row_num}: missing field {e}")
+                parse_errors += 1
+            except ValueError as e:
+                logger.debug(f"[{self.name}] Row {row_num}: invalid value: {e}")
+                parse_errors += 1
+            except Exception as e:
+                logger.warning(f"[{self.name}] Row {row_num}: unexpected error: {e}")
+                parse_errors += 1

+        if parse_errors > 0:
+            logger.debug(f"[{self.name}] {parse_errors} rows could not be parsed")

         return trades
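A hypothetical caller for the new streaming API (the import path and store_trades are illustrative, not part of this diff):

# Hypothetical caller; the module path and the writer are assumptions.
from exchanges.eix import EIXExchange

eix = EIXExchange()

def store_trades(trades):
    print(f"storing {len(trades)} trades")  # stand-in for the real DB writer

# Memory stays bounded: each CSV file is parsed, handed over, and released
# before the next one is downloaded.
for filename, trades in eix.fetch_trades_streaming(limit=3):
    store_trades(trades)
    print(f"{filename}: {len(trades)} trades")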