Compare commits
5 Commits
a07319d957
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1dc79b8b64 | ||
|
|
cf55a0bd06 | ||
|
|
9cd84e0855 | ||
|
|
f325941e24 | ||
|
|
a21e036bb4 |
48
.gitignore
vendored
Normal file
48
.gitignore
vendored
Normal file
@@ -0,0 +1,48 @@
|
||||
# Python
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
*.so
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
|
||||
# Virtual environments
|
||||
venv/
|
||||
ENV/
|
||||
env/
|
||||
.venv/
|
||||
|
||||
# IDE
|
||||
.idea/
|
||||
.vscode/
|
||||
*.swp
|
||||
*.swo
|
||||
|
||||
# Environment files
|
||||
.env
|
||||
.env.local
|
||||
|
||||
# OS
|
||||
.DS_Store
|
||||
Thumbs.db
|
||||
|
||||
# Logs
|
||||
*.log
|
||||
|
||||
# Test data
|
||||
*.gz
|
||||
!requirements.txt
|
||||
248
bin/Activate.ps1
Normal file
248
bin/Activate.ps1
Normal file
@@ -0,0 +1,248 @@
|
||||
<#
|
||||
.Synopsis
|
||||
Activate a Python virtual environment for the current PowerShell session.
|
||||
|
||||
.Description
|
||||
Pushes the python executable for a virtual environment to the front of the
|
||||
$Env:PATH environment variable and sets the prompt to signify that you are
|
||||
in a Python virtual environment. Makes use of the command line switches as
|
||||
well as the `pyvenv.cfg` file values present in the virtual environment.
|
||||
|
||||
.Parameter VenvDir
|
||||
Path to the directory that contains the virtual environment to activate. The
|
||||
default value for this is the parent of the directory that the Activate.ps1
|
||||
script is located within.
|
||||
|
||||
.Parameter Prompt
|
||||
The prompt prefix to display when this virtual environment is activated. By
|
||||
default, this prompt is the name of the virtual environment folder (VenvDir)
|
||||
surrounded by parentheses and followed by a single space (ie. '(.venv) ').
|
||||
|
||||
.Example
|
||||
Activate.ps1
|
||||
Activates the Python virtual environment that contains the Activate.ps1 script.
|
||||
|
||||
.Example
|
||||
Activate.ps1 -Verbose
|
||||
Activates the Python virtual environment that contains the Activate.ps1 script,
|
||||
and shows extra information about the activation as it executes.
|
||||
|
||||
.Example
|
||||
Activate.ps1 -VenvDir C:\Users\MyUser\Common\.venv
|
||||
Activates the Python virtual environment located in the specified location.
|
||||
|
||||
.Example
|
||||
Activate.ps1 -Prompt "MyPython"
|
||||
Activates the Python virtual environment that contains the Activate.ps1 script,
|
||||
and prefixes the current prompt with the specified string (surrounded in
|
||||
parentheses) while the virtual environment is active.
|
||||
|
||||
.Notes
|
||||
On Windows, it may be required to enable this Activate.ps1 script by setting the
|
||||
execution policy for the user. You can do this by issuing the following PowerShell
|
||||
command:
|
||||
|
||||
PS C:\> Set-ExecutionPolicy -ExecutionPolicy RemoteSigned -Scope CurrentUser
|
||||
|
||||
For more information on Execution Policies:
|
||||
https://go.microsoft.com/fwlink/?LinkID=135170
|
||||
|
||||
#>
|
||||
Param(
|
||||
[Parameter(Mandatory = $false)]
|
||||
[String]
|
||||
$VenvDir,
|
||||
[Parameter(Mandatory = $false)]
|
||||
[String]
|
||||
$Prompt
|
||||
)
|
||||
|
||||
<# Function declarations --------------------------------------------------- #>
|
||||
|
||||
<#
|
||||
.Synopsis
|
||||
Remove all shell session elements added by the Activate script, including the
|
||||
addition of the virtual environment's Python executable from the beginning of
|
||||
the PATH variable.
|
||||
|
||||
.Parameter NonDestructive
|
||||
If present, do not remove this function from the global namespace for the
|
||||
session.
|
||||
|
||||
#>
|
||||
function global:deactivate ([switch]$NonDestructive) {
|
||||
# Revert to original values
|
||||
|
||||
# The prior prompt:
|
||||
if (Test-Path -Path Function:_OLD_VIRTUAL_PROMPT) {
|
||||
Copy-Item -Path Function:_OLD_VIRTUAL_PROMPT -Destination Function:prompt
|
||||
Remove-Item -Path Function:_OLD_VIRTUAL_PROMPT
|
||||
}
|
||||
|
||||
# The prior PYTHONHOME:
|
||||
if (Test-Path -Path Env:_OLD_VIRTUAL_PYTHONHOME) {
|
||||
Copy-Item -Path Env:_OLD_VIRTUAL_PYTHONHOME -Destination Env:PYTHONHOME
|
||||
Remove-Item -Path Env:_OLD_VIRTUAL_PYTHONHOME
|
||||
}
|
||||
|
||||
# The prior PATH:
|
||||
if (Test-Path -Path Env:_OLD_VIRTUAL_PATH) {
|
||||
Copy-Item -Path Env:_OLD_VIRTUAL_PATH -Destination Env:PATH
|
||||
Remove-Item -Path Env:_OLD_VIRTUAL_PATH
|
||||
}
|
||||
|
||||
# Just remove the VIRTUAL_ENV altogether:
|
||||
if (Test-Path -Path Env:VIRTUAL_ENV) {
|
||||
Remove-Item -Path env:VIRTUAL_ENV
|
||||
}
|
||||
|
||||
# Just remove VIRTUAL_ENV_PROMPT altogether.
|
||||
if (Test-Path -Path Env:VIRTUAL_ENV_PROMPT) {
|
||||
Remove-Item -Path env:VIRTUAL_ENV_PROMPT
|
||||
}
|
||||
|
||||
# Just remove the _PYTHON_VENV_PROMPT_PREFIX altogether:
|
||||
if (Get-Variable -Name "_PYTHON_VENV_PROMPT_PREFIX" -ErrorAction SilentlyContinue) {
|
||||
Remove-Variable -Name _PYTHON_VENV_PROMPT_PREFIX -Scope Global -Force
|
||||
}
|
||||
|
||||
# Leave deactivate function in the global namespace if requested:
|
||||
if (-not $NonDestructive) {
|
||||
Remove-Item -Path function:deactivate
|
||||
}
|
||||
}
|
||||
|
||||
<#
|
||||
.Description
|
||||
Get-PyVenvConfig parses the values from the pyvenv.cfg file located in the
|
||||
given folder, and returns them in a map.
|
||||
|
||||
For each line in the pyvenv.cfg file, if that line can be parsed into exactly
|
||||
two strings separated by `=` (with any amount of whitespace surrounding the =)
|
||||
then it is considered a `key = value` line. The left hand string is the key,
|
||||
the right hand is the value.
|
||||
|
||||
If the value starts with a `'` or a `"` then the first and last character is
|
||||
stripped from the value before being captured.
|
||||
|
||||
.Parameter ConfigDir
|
||||
Path to the directory that contains the `pyvenv.cfg` file.
|
||||
#>
|
||||
function Get-PyVenvConfig(
|
||||
[String]
|
||||
$ConfigDir
|
||||
) {
|
||||
Write-Verbose "Given ConfigDir=$ConfigDir, obtain values in pyvenv.cfg"
|
||||
|
||||
# Ensure the file exists, and issue a warning if it doesn't (but still allow the function to continue).
|
||||
$pyvenvConfigPath = Join-Path -Resolve -Path $ConfigDir -ChildPath 'pyvenv.cfg' -ErrorAction Continue
|
||||
|
||||
# An empty map will be returned if no config file is found.
|
||||
$pyvenvConfig = @{ }
|
||||
|
||||
if ($pyvenvConfigPath) {
|
||||
|
||||
Write-Verbose "File exists, parse `key = value` lines"
|
||||
$pyvenvConfigContent = Get-Content -Path $pyvenvConfigPath
|
||||
|
||||
$pyvenvConfigContent | ForEach-Object {
|
||||
$keyval = $PSItem -split "\s*=\s*", 2
|
||||
if ($keyval[0] -and $keyval[1]) {
|
||||
$val = $keyval[1]
|
||||
|
||||
# Remove extraneous quotations around a string value.
|
||||
if ("'""".Contains($val.Substring(0, 1))) {
|
||||
$val = $val.Substring(1, $val.Length - 2)
|
||||
}
|
||||
|
||||
$pyvenvConfig[$keyval[0]] = $val
|
||||
Write-Verbose "Adding Key: '$($keyval[0])'='$val'"
|
||||
}
|
||||
}
|
||||
}
|
||||
return $pyvenvConfig
|
||||
}
|
||||
|
||||
|
||||
<# Begin Activate script --------------------------------------------------- #>
|
||||
|
||||
# Determine the containing directory of this script
|
||||
$VenvExecPath = Split-Path -Parent $MyInvocation.MyCommand.Definition
|
||||
$VenvExecDir = Get-Item -Path $VenvExecPath
|
||||
|
||||
Write-Verbose "Activation script is located in path: '$VenvExecPath'"
|
||||
Write-Verbose "VenvExecDir Fullname: '$($VenvExecDir.FullName)"
|
||||
Write-Verbose "VenvExecDir Name: '$($VenvExecDir.Name)"
|
||||
|
||||
# Set values required in priority: CmdLine, ConfigFile, Default
|
||||
# First, get the location of the virtual environment, it might not be
|
||||
# VenvExecDir if specified on the command line.
|
||||
if ($VenvDir) {
|
||||
Write-Verbose "VenvDir given as parameter, using '$VenvDir' to determine values"
|
||||
}
|
||||
else {
|
||||
Write-Verbose "VenvDir not given as a parameter, using parent directory name as VenvDir."
|
||||
$VenvDir = $VenvExecDir.Parent.FullName.TrimEnd("\\/")
|
||||
Write-Verbose "VenvDir=$VenvDir"
|
||||
}
|
||||
|
||||
# Next, read the `pyvenv.cfg` file to determine any required value such
|
||||
# as `prompt`.
|
||||
$pyvenvCfg = Get-PyVenvConfig -ConfigDir $VenvDir
|
||||
|
||||
# Next, set the prompt from the command line, or the config file, or
|
||||
# just use the name of the virtual environment folder.
|
||||
if ($Prompt) {
|
||||
Write-Verbose "Prompt specified as argument, using '$Prompt'"
|
||||
}
|
||||
else {
|
||||
Write-Verbose "Prompt not specified as argument to script, checking pyvenv.cfg value"
|
||||
if ($pyvenvCfg -and $pyvenvCfg['prompt']) {
|
||||
Write-Verbose " Setting based on value in pyvenv.cfg='$($pyvenvCfg['prompt'])'"
|
||||
$Prompt = $pyvenvCfg['prompt'];
|
||||
}
|
||||
else {
|
||||
Write-Verbose " Setting prompt based on parent's directory's name. (Is the directory name passed to venv module when creating the virtual environment)"
|
||||
Write-Verbose " Got leaf-name of $VenvDir='$(Split-Path -Path $venvDir -Leaf)'"
|
||||
$Prompt = Split-Path -Path $venvDir -Leaf
|
||||
}
|
||||
}
|
||||
|
||||
Write-Verbose "Prompt = '$Prompt'"
|
||||
Write-Verbose "VenvDir='$VenvDir'"
|
||||
|
||||
# Deactivate any currently active virtual environment, but leave the
|
||||
# deactivate function in place.
|
||||
deactivate -nondestructive
|
||||
|
||||
# Now set the environment variable VIRTUAL_ENV, used by many tools to determine
|
||||
# that there is an activated venv.
|
||||
$env:VIRTUAL_ENV = $VenvDir
|
||||
|
||||
$env:VIRTUAL_ENV_PROMPT = $Prompt
|
||||
|
||||
if (-not $Env:VIRTUAL_ENV_DISABLE_PROMPT) {
|
||||
|
||||
Write-Verbose "Setting prompt to '$Prompt'"
|
||||
|
||||
# Set the prompt to include the env name
|
||||
# Make sure _OLD_VIRTUAL_PROMPT is global
|
||||
function global:_OLD_VIRTUAL_PROMPT { "" }
|
||||
Copy-Item -Path function:prompt -Destination function:_OLD_VIRTUAL_PROMPT
|
||||
New-Variable -Name _PYTHON_VENV_PROMPT_PREFIX -Description "Python virtual environment prompt prefix" -Scope Global -Option ReadOnly -Visibility Public -Value $Prompt
|
||||
|
||||
function global:prompt {
|
||||
Write-Host -NoNewline -ForegroundColor Green "($_PYTHON_VENV_PROMPT_PREFIX) "
|
||||
_OLD_VIRTUAL_PROMPT
|
||||
}
|
||||
}
|
||||
|
||||
# Clear PYTHONHOME
|
||||
if (Test-Path -Path Env:PYTHONHOME) {
|
||||
Copy-Item -Path Env:PYTHONHOME -Destination Env:_OLD_VIRTUAL_PYTHONHOME
|
||||
Remove-Item -Path Env:PYTHONHOME
|
||||
}
|
||||
|
||||
# Add the venv to the PATH
|
||||
Copy-Item -Path Env:PATH -Destination Env:_OLD_VIRTUAL_PATH
|
||||
$Env:PATH = "$VenvExecDir$([System.IO.Path]::PathSeparator)$Env:PATH"
|
||||
76
bin/activate
Normal file
76
bin/activate
Normal file
@@ -0,0 +1,76 @@
|
||||
# This file must be used with "source bin/activate" *from bash*
|
||||
# You cannot run it directly
|
||||
|
||||
deactivate () {
|
||||
# reset old environment variables
|
||||
if [ -n "${_OLD_VIRTUAL_PATH:-}" ] ; then
|
||||
PATH="${_OLD_VIRTUAL_PATH:-}"
|
||||
export PATH
|
||||
unset _OLD_VIRTUAL_PATH
|
||||
fi
|
||||
if [ -n "${_OLD_VIRTUAL_PYTHONHOME:-}" ] ; then
|
||||
PYTHONHOME="${_OLD_VIRTUAL_PYTHONHOME:-}"
|
||||
export PYTHONHOME
|
||||
unset _OLD_VIRTUAL_PYTHONHOME
|
||||
fi
|
||||
|
||||
# Call hash to forget past locations. Without forgetting
|
||||
# past locations the $PATH changes we made may not be respected.
|
||||
# See "man bash" for more details. hash is usually a builtin of your shell
|
||||
hash -r 2> /dev/null
|
||||
|
||||
if [ -n "${_OLD_VIRTUAL_PS1:-}" ] ; then
|
||||
PS1="${_OLD_VIRTUAL_PS1:-}"
|
||||
export PS1
|
||||
unset _OLD_VIRTUAL_PS1
|
||||
fi
|
||||
|
||||
unset VIRTUAL_ENV
|
||||
unset VIRTUAL_ENV_PROMPT
|
||||
if [ ! "${1:-}" = "nondestructive" ] ; then
|
||||
# Self destruct!
|
||||
unset -f deactivate
|
||||
fi
|
||||
}
|
||||
|
||||
# unset irrelevant variables
|
||||
deactivate nondestructive
|
||||
|
||||
# on Windows, a path can contain colons and backslashes and has to be converted:
|
||||
case "$(uname)" in
|
||||
CYGWIN*|MSYS*|MINGW*)
|
||||
# transform D:\path\to\venv to /d/path/to/venv on MSYS and MINGW
|
||||
# and to /cygdrive/d/path/to/venv on Cygwin
|
||||
VIRTUAL_ENV=$(cygpath /Users/melchiorreimers/Documents/trading_daemon)
|
||||
export VIRTUAL_ENV
|
||||
;;
|
||||
*)
|
||||
# use the path as-is
|
||||
export VIRTUAL_ENV=/Users/melchiorreimers/Documents/trading_daemon
|
||||
;;
|
||||
esac
|
||||
|
||||
_OLD_VIRTUAL_PATH="$PATH"
|
||||
PATH="$VIRTUAL_ENV/"bin":$PATH"
|
||||
export PATH
|
||||
|
||||
VIRTUAL_ENV_PROMPT=trading_daemon
|
||||
export VIRTUAL_ENV_PROMPT
|
||||
|
||||
# unset PYTHONHOME if set
|
||||
# this will fail if PYTHONHOME is set to the empty string (which is bad anyway)
|
||||
# could use `if (set -u; : $PYTHONHOME) ;` in bash
|
||||
if [ -n "${PYTHONHOME:-}" ] ; then
|
||||
_OLD_VIRTUAL_PYTHONHOME="${PYTHONHOME:-}"
|
||||
unset PYTHONHOME
|
||||
fi
|
||||
|
||||
if [ -z "${VIRTUAL_ENV_DISABLE_PROMPT:-}" ] ; then
|
||||
_OLD_VIRTUAL_PS1="${PS1:-}"
|
||||
PS1="("trading_daemon") ${PS1:-}"
|
||||
export PS1
|
||||
fi
|
||||
|
||||
# Call hash to forget past commands. Without forgetting
|
||||
# past commands the $PATH changes we made may not be respected
|
||||
hash -r 2> /dev/null
|
||||
27
bin/activate.csh
Normal file
27
bin/activate.csh
Normal file
@@ -0,0 +1,27 @@
|
||||
# This file must be used with "source bin/activate.csh" *from csh*.
|
||||
# You cannot run it directly.
|
||||
|
||||
# Created by Davide Di Blasi <davidedb@gmail.com>.
|
||||
# Ported to Python 3.3 venv by Andrew Svetlov <andrew.svetlov@gmail.com>
|
||||
|
||||
alias deactivate 'test $?_OLD_VIRTUAL_PATH != 0 && setenv PATH "$_OLD_VIRTUAL_PATH" && unset _OLD_VIRTUAL_PATH; rehash; test $?_OLD_VIRTUAL_PROMPT != 0 && set prompt="$_OLD_VIRTUAL_PROMPT" && unset _OLD_VIRTUAL_PROMPT; unsetenv VIRTUAL_ENV; unsetenv VIRTUAL_ENV_PROMPT; test "\!:*" != "nondestructive" && unalias deactivate'
|
||||
|
||||
# Unset irrelevant variables.
|
||||
deactivate nondestructive
|
||||
|
||||
setenv VIRTUAL_ENV /Users/melchiorreimers/Documents/trading_daemon
|
||||
|
||||
set _OLD_VIRTUAL_PATH="$PATH"
|
||||
setenv PATH "$VIRTUAL_ENV/"bin":$PATH"
|
||||
setenv VIRTUAL_ENV_PROMPT trading_daemon
|
||||
|
||||
|
||||
set _OLD_VIRTUAL_PROMPT="$prompt"
|
||||
|
||||
if (! "$?VIRTUAL_ENV_DISABLE_PROMPT") then
|
||||
set prompt = "("trading_daemon") $prompt:q"
|
||||
endif
|
||||
|
||||
alias pydoc python -m pydoc
|
||||
|
||||
rehash
|
||||
69
bin/activate.fish
Normal file
69
bin/activate.fish
Normal file
@@ -0,0 +1,69 @@
|
||||
# This file must be used with "source <venv>/bin/activate.fish" *from fish*
|
||||
# (https://fishshell.com/). You cannot run it directly.
|
||||
|
||||
function deactivate -d "Exit virtual environment and return to normal shell environment"
|
||||
# reset old environment variables
|
||||
if test -n "$_OLD_VIRTUAL_PATH"
|
||||
set -gx PATH $_OLD_VIRTUAL_PATH
|
||||
set -e _OLD_VIRTUAL_PATH
|
||||
end
|
||||
if test -n "$_OLD_VIRTUAL_PYTHONHOME"
|
||||
set -gx PYTHONHOME $_OLD_VIRTUAL_PYTHONHOME
|
||||
set -e _OLD_VIRTUAL_PYTHONHOME
|
||||
end
|
||||
|
||||
if test -n "$_OLD_FISH_PROMPT_OVERRIDE"
|
||||
set -e _OLD_FISH_PROMPT_OVERRIDE
|
||||
# prevents error when using nested fish instances (Issue #93858)
|
||||
if functions -q _old_fish_prompt
|
||||
functions -e fish_prompt
|
||||
functions -c _old_fish_prompt fish_prompt
|
||||
functions -e _old_fish_prompt
|
||||
end
|
||||
end
|
||||
|
||||
set -e VIRTUAL_ENV
|
||||
set -e VIRTUAL_ENV_PROMPT
|
||||
if test "$argv[1]" != "nondestructive"
|
||||
# Self-destruct!
|
||||
functions -e deactivate
|
||||
end
|
||||
end
|
||||
|
||||
# Unset irrelevant variables.
|
||||
deactivate nondestructive
|
||||
|
||||
set -gx VIRTUAL_ENV /Users/melchiorreimers/Documents/trading_daemon
|
||||
|
||||
set -gx _OLD_VIRTUAL_PATH $PATH
|
||||
set -gx PATH "$VIRTUAL_ENV/"bin $PATH
|
||||
set -gx VIRTUAL_ENV_PROMPT trading_daemon
|
||||
|
||||
# Unset PYTHONHOME if set.
|
||||
if set -q PYTHONHOME
|
||||
set -gx _OLD_VIRTUAL_PYTHONHOME $PYTHONHOME
|
||||
set -e PYTHONHOME
|
||||
end
|
||||
|
||||
if test -z "$VIRTUAL_ENV_DISABLE_PROMPT"
|
||||
# fish uses a function instead of an env var to generate the prompt.
|
||||
|
||||
# Save the current fish_prompt function as the function _old_fish_prompt.
|
||||
functions -c fish_prompt _old_fish_prompt
|
||||
|
||||
# With the original prompt function renamed, we can override with our own.
|
||||
function fish_prompt
|
||||
# Save the return status of the last command.
|
||||
set -l old_status $status
|
||||
|
||||
# Output the venv prompt; color taken from the blue of the Python logo.
|
||||
printf "%s(%s)%s " (set_color 4B8BBE) trading_daemon (set_color normal)
|
||||
|
||||
# Restore the return status of the previous command.
|
||||
echo "exit $old_status" | .
|
||||
# Output the original/"old" prompt.
|
||||
_old_fish_prompt
|
||||
end
|
||||
|
||||
set -gx _OLD_FISH_PROMPT_OVERRIDE "$VIRTUAL_ENV"
|
||||
end
|
||||
8
bin/pip
Executable file
8
bin/pip
Executable file
@@ -0,0 +1,8 @@
|
||||
#!/Users/melchiorreimers/Documents/trading_daemon/bin/python3.13
|
||||
# -*- coding: utf-8 -*-
|
||||
import re
|
||||
import sys
|
||||
from pip._internal.cli.main import main
|
||||
if __name__ == '__main__':
|
||||
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
|
||||
sys.exit(main())
|
||||
8
bin/pip3
Executable file
8
bin/pip3
Executable file
@@ -0,0 +1,8 @@
|
||||
#!/Users/melchiorreimers/Documents/trading_daemon/bin/python3.13
|
||||
# -*- coding: utf-8 -*-
|
||||
import re
|
||||
import sys
|
||||
from pip._internal.cli.main import main
|
||||
if __name__ == '__main__':
|
||||
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
|
||||
sys.exit(main())
|
||||
8
bin/pip3.13
Executable file
8
bin/pip3.13
Executable file
@@ -0,0 +1,8 @@
|
||||
#!/Users/melchiorreimers/Documents/trading_daemon/bin/python3.13
|
||||
# -*- coding: utf-8 -*-
|
||||
import re
|
||||
import sys
|
||||
from pip._internal.cli.main import main
|
||||
if __name__ == '__main__':
|
||||
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
|
||||
sys.exit(main())
|
||||
1
bin/python
Symbolic link
1
bin/python
Symbolic link
@@ -0,0 +1 @@
|
||||
python3.13
|
||||
1
bin/python3
Symbolic link
1
bin/python3
Symbolic link
@@ -0,0 +1 @@
|
||||
python3.13
|
||||
1
bin/python3.13
Symbolic link
1
bin/python3.13
Symbolic link
@@ -0,0 +1 @@
|
||||
/opt/homebrew/opt/python@3.13/bin/python3.13
|
||||
484
daemon.py
484
daemon.py
@@ -4,6 +4,9 @@ import datetime
|
||||
import hashlib
|
||||
import os
|
||||
import requests
|
||||
from typing import List, Type
|
||||
|
||||
from src.exchanges.base import BaseExchange
|
||||
from src.exchanges.eix import EIXExchange
|
||||
from src.exchanges.ls import LSExchange
|
||||
from src.exchanges.deutsche_boerse import XetraExchange, FrankfurtExchange, QuotrixExchange
|
||||
@@ -25,230 +28,345 @@ DB_USER = os.getenv("DB_USER", "admin")
|
||||
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
|
||||
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Exchange Registry - Neue Börsen hier hinzufügen
|
||||
# =============================================================================
|
||||
|
||||
# Exchanges die Streaming-Verarbeitung benötigen (große Datenmengen)
|
||||
STREAMING_EXCHANGES: List[Type[BaseExchange]] = [
|
||||
EIXExchange,
|
||||
]
|
||||
|
||||
# Standard-Exchanges (normale Batch-Verarbeitung)
|
||||
STANDARD_EXCHANGES: List[Type[BaseExchange]] = [
|
||||
# Lang & Schwarz
|
||||
LSExchange,
|
||||
# Deutsche Börse
|
||||
XetraExchange,
|
||||
FrankfurtExchange,
|
||||
QuotrixExchange,
|
||||
# Weitere Börsen
|
||||
GettexExchange,
|
||||
StuttgartExchange,
|
||||
# Börsenag (Düsseldorf, Hamburg, Hannover)
|
||||
DUSAExchange,
|
||||
DUSBExchange,
|
||||
DUSCExchange,
|
||||
DUSDExchange,
|
||||
HAMAExchange,
|
||||
HAMBExchange,
|
||||
HANAExchange,
|
||||
HANBExchange,
|
||||
]
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Trades Cache
|
||||
# =============================================================================
|
||||
|
||||
# Cache für existierende Trades pro Tag (wird nach jedem Exchange geleert)
|
||||
_existing_trades_cache = {}
|
||||
|
||||
def get_trade_hash(trade):
|
||||
"""Erstellt einen eindeutigen Hash für einen Trade."""
|
||||
key = f"{trade.exchange}|{trade.isin}|{trade.timestamp.isoformat()}|{trade.price}|{trade.quantity}"
|
||||
return hashlib.md5(key.encode()).hexdigest()
|
||||
|
||||
def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=1000):
|
||||
"""Filtert neue Trades in Batches, um RAM zu sparen. Verwendet Batch-Queries statt einzelne Checks."""
|
||||
def get_existing_trades_for_day(db_url, exchange_name, day):
|
||||
"""Holt existierende Trades für einen Tag aus der DB (mit Caching)."""
|
||||
cache_key = f"{exchange_name}_{day.strftime('%Y-%m-%d')}"
|
||||
|
||||
if cache_key in _existing_trades_cache:
|
||||
return _existing_trades_cache[cache_key]
|
||||
|
||||
day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
|
||||
day_end = day + datetime.timedelta(days=1)
|
||||
day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
|
||||
|
||||
query = f"""
|
||||
SELECT isin, timestamp, price, quantity
|
||||
FROM trades
|
||||
WHERE exchange = '{exchange_name}'
|
||||
AND timestamp >= '{day_start_str}'
|
||||
AND timestamp < '{day_end_str}'
|
||||
"""
|
||||
|
||||
existing_trades = set()
|
||||
try:
|
||||
response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=60)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
if data.get('dataset'):
|
||||
for row in data['dataset']:
|
||||
isin, ts, price, qty = row
|
||||
if isinstance(ts, str):
|
||||
ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
|
||||
else:
|
||||
ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
|
||||
key = (isin, ts_dt.isoformat(), float(price), float(qty))
|
||||
existing_trades.add(key)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error fetching existing trades for {day}: {e}")
|
||||
|
||||
_existing_trades_cache[cache_key] = existing_trades
|
||||
return existing_trades
|
||||
|
||||
def clear_trades_cache():
|
||||
"""Leert den Cache für existierende Trades."""
|
||||
global _existing_trades_cache
|
||||
_existing_trades_cache = {}
|
||||
|
||||
def filter_new_trades_for_day(db_url, exchange_name, trades, day):
|
||||
"""Filtert neue Trades für einen einzelnen Tag."""
|
||||
if not trades:
|
||||
return []
|
||||
|
||||
new_trades = []
|
||||
total_batches = (len(trades) + batch_size - 1) // batch_size
|
||||
existing = get_existing_trades_for_day(db_url, exchange_name, day)
|
||||
|
||||
for batch_idx in range(0, len(trades), batch_size):
|
||||
batch = trades[batch_idx:batch_idx + batch_size]
|
||||
batch_num = (batch_idx // batch_size) + 1
|
||||
|
||||
if batch_num % 10 == 0 or batch_num == 1:
|
||||
logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} trades)...")
|
||||
|
||||
# Gruppiere Trades nach Tag für effizientere Queries
|
||||
trades_by_day = {}
|
||||
for trade in batch:
|
||||
day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
if day not in trades_by_day:
|
||||
trades_by_day[day] = []
|
||||
trades_by_day[day].append(trade)
|
||||
|
||||
# Prüfe jeden Tag separat
|
||||
for day, day_trades in trades_by_day.items():
|
||||
day_start_str = day.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
|
||||
day_end = day + datetime.timedelta(days=1)
|
||||
day_end_str = day_end.strftime('%Y-%m-%dT%H:%M:%S.000000Z')
|
||||
|
||||
# Hole alle existierenden Trades für diesen Tag
|
||||
query = f"""
|
||||
SELECT isin, timestamp, price, quantity
|
||||
FROM trades
|
||||
WHERE exchange = '{exchange_name}'
|
||||
AND timestamp >= '{day_start_str}'
|
||||
AND timestamp < '{day_end_str}'
|
||||
"""
|
||||
|
||||
try:
|
||||
response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH, timeout=30)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
existing_trades = set()
|
||||
if data.get('dataset'):
|
||||
for row in data['dataset']:
|
||||
isin, ts, price, qty = row
|
||||
# Normalisiere Timestamp für Vergleich
|
||||
if isinstance(ts, str):
|
||||
ts_dt = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
|
||||
else:
|
||||
ts_dt = datetime.datetime.fromtimestamp(ts / 1000000, tz=datetime.timezone.utc)
|
||||
# Erstelle Vergleichs-Key (ohne Hash, direkter Vergleich)
|
||||
key = (isin, ts_dt.isoformat(), float(price), float(qty))
|
||||
existing_trades.add(key)
|
||||
|
||||
# Prüfe welche Trades neu sind
|
||||
for trade in day_trades:
|
||||
trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
|
||||
if trade_key not in existing_trades:
|
||||
new_trades.append(trade)
|
||||
else:
|
||||
# Bei Fehler: alle Trades als neu behandeln (sicherer)
|
||||
logger.warning(f"Query failed for day {day}, treating all trades as new")
|
||||
new_trades.extend(day_trades)
|
||||
except Exception as e:
|
||||
# Bei Fehler: alle Trades als neu behandeln (sicherer)
|
||||
logger.warning(f"Error checking trades for day {day}: {e}, treating all trades as new")
|
||||
new_trades.extend(day_trades)
|
||||
|
||||
# Kleine Pause zwischen Batches, um DB nicht zu überlasten
|
||||
if batch_idx + batch_size < len(trades):
|
||||
time.sleep(0.05)
|
||||
new_trades = []
|
||||
for trade in trades:
|
||||
trade_key = (trade.isin, trade.timestamp.isoformat(), float(trade.price), float(trade.quantity))
|
||||
if trade_key not in existing:
|
||||
new_trades.append(trade)
|
||||
|
||||
return new_trades
|
||||
|
||||
def get_last_trade_timestamp(db_url, exchange_name):
|
||||
# QuestDB query: get the latest timestamp for a specific exchange
|
||||
def filter_new_trades_batch(db_url, exchange_name, trades, batch_size=5000):
|
||||
"""Filtert neue Trades in Batches, gruppiert nach Tag."""
|
||||
if not trades:
|
||||
return []
|
||||
|
||||
# Gruppiere alle Trades nach Tag
|
||||
trades_by_day = {}
|
||||
for trade in trades:
|
||||
day = trade.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
if day not in trades_by_day:
|
||||
trades_by_day[day] = []
|
||||
trades_by_day[day].append(trade)
|
||||
|
||||
new_trades = []
|
||||
total_days = len(trades_by_day)
|
||||
|
||||
for i, (day, day_trades) in enumerate(sorted(trades_by_day.items()), 1):
|
||||
if i % 10 == 0 or i == 1:
|
||||
logger.info(f"Checking day {i}/{total_days}: {day.strftime('%Y-%m-%d')} ({len(day_trades)} trades)...")
|
||||
|
||||
new_for_day = filter_new_trades_for_day(db_url, exchange_name, day_trades, day)
|
||||
new_trades.extend(new_for_day)
|
||||
|
||||
# Kleine Pause um DB nicht zu überlasten
|
||||
if i < total_days:
|
||||
time.sleep(0.02)
|
||||
|
||||
return new_trades
|
||||
|
||||
def get_last_trade_timestamp(db_url: str, exchange_name: str) -> datetime.datetime:
|
||||
"""Holt den Timestamp des letzten Trades für eine Exchange aus QuestDB."""
|
||||
query = f"trades where exchange = '{exchange_name}' latest by timestamp"
|
||||
try:
|
||||
# Using the /exec endpoint to get data
|
||||
response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
if data['dataset']:
|
||||
# QuestDB returns timestamp in micros since epoch by default in some views, or ISO
|
||||
# Let's assume the timestamp is in the dataset
|
||||
# ILP timestamps are stored as designated timestamps.
|
||||
ts_value = data['dataset'][0][0] # Adjust index based on column order
|
||||
if data.get('dataset'):
|
||||
# QuestDB gibt Timestamps in Mikrosekunden oder ISO-Format zurück
|
||||
ts_value = data['dataset'][0][0]
|
||||
if isinstance(ts_value, str):
|
||||
return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
|
||||
else:
|
||||
return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
|
||||
except Exception as e:
|
||||
logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")
|
||||
logger.debug(f"Keine existierenden Daten für {exchange_name} oder DB nicht erreichbar: {e}")
|
||||
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
|
||||
|
||||
def process_eix_streaming(db, db_url: str, exchange: BaseExchange, historical: bool = False):
|
||||
"""Verarbeitet eine Exchange im Streaming-Modus um RAM zu sparen."""
|
||||
last_ts = get_last_trade_timestamp(db_url, exchange.name)
|
||||
logger.info(f"Hole Daten von {exchange.name} (Letzter Trade: {last_ts}) - STREAMING...")
|
||||
|
||||
# Hole Liste der zu verarbeitenden Dateien
|
||||
if historical:
|
||||
files = exchange.get_files_to_process(limit=None, since_date=None)
|
||||
else:
|
||||
files = exchange.get_files_to_process(limit=None, since_date=last_ts)
|
||||
|
||||
if not files:
|
||||
logger.info(f"Keine {exchange.name} Dateien zu verarbeiten.")
|
||||
return
|
||||
|
||||
logger.info(f"{len(files)} {exchange.name} Dateien gefunden...")
|
||||
|
||||
total_new = 0
|
||||
total_processed = 0
|
||||
|
||||
for i, file_item in enumerate(files, 1):
|
||||
file_name = file_item.get('fileName', 'unknown').split('/')[-1]
|
||||
logger.info(f"Verarbeite {exchange.name} Datei {i}/{len(files)}: {file_name}")
|
||||
|
||||
trades = exchange.fetch_trades_from_file(file_item)
|
||||
|
||||
if not trades:
|
||||
logger.info(f" Keine Trades in {file_name}")
|
||||
continue
|
||||
|
||||
total_processed += len(trades)
|
||||
logger.info(f" {len(trades)} Trades geladen, filtere Duplikate...")
|
||||
|
||||
new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
|
||||
|
||||
if new_trades:
|
||||
new_trades.sort(key=lambda x: x.timestamp)
|
||||
db.save_trades(new_trades)
|
||||
total_new += len(new_trades)
|
||||
logger.info(f" {len(new_trades)} neue Trades gespeichert (gesamt neu: {total_new})")
|
||||
else:
|
||||
logger.info(f" Keine neuen Trades in dieser Datei")
|
||||
|
||||
# Referenzen freigeben
|
||||
del trades
|
||||
del new_trades
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
logger.info(f"{exchange.name} fertig: {total_new} neue Trades von {total_processed} verarbeitet.")
|
||||
clear_trades_cache()
|
||||
|
||||
def process_standard_exchange(db, db_url: str, exchange: BaseExchange, historical: bool):
|
||||
"""Verarbeitet einen Standard-Exchange mit Batch-Verarbeitung."""
|
||||
try:
|
||||
last_ts = get_last_trade_timestamp(db_url, exchange.name)
|
||||
logger.info(f"Hole Daten von {exchange.name} (Letzter Trade: {last_ts})...")
|
||||
|
||||
trades = exchange.fetch_latest_trades(include_yesterday=historical)
|
||||
|
||||
if not trades:
|
||||
logger.info(f"Keine Trades von {exchange.name} erhalten.")
|
||||
return
|
||||
|
||||
# Deduplizierung
|
||||
logger.info(f"Filtere {len(trades)} Trades auf Duplikate...")
|
||||
new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=5000)
|
||||
|
||||
logger.info(f"Gefunden: {len(trades)} Trades gesamt, {len(new_trades)} sind neu.")
|
||||
|
||||
if new_trades:
|
||||
new_trades.sort(key=lambda x: x.timestamp)
|
||||
db.save_trades(new_trades)
|
||||
logger.info(f"{len(new_trades)} neue Trades in QuestDB gespeichert.")
|
||||
|
||||
# Referenzen freigeben
|
||||
del trades
|
||||
if new_trades:
|
||||
del new_trades
|
||||
clear_trades_cache()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler bei Exchange {exchange.name}: {e}")
|
||||
|
||||
|
||||
def run_task(historical=False):
|
||||
logger.info(f"Starting Trading Data Fetcher task (Historical: {historical})...")
|
||||
|
||||
# Initialize exchanges
|
||||
eix = EIXExchange()
|
||||
ls = LSExchange()
|
||||
|
||||
# Neue Deutsche Börse Exchanges
|
||||
xetra = XetraExchange()
|
||||
frankfurt = FrankfurtExchange()
|
||||
quotrix = QuotrixExchange()
|
||||
gettex = GettexExchange()
|
||||
stuttgart = StuttgartExchange()
|
||||
|
||||
# Börsenag Exchanges (Düsseldorf, Hamburg, Hannover)
|
||||
dusa = DUSAExchange()
|
||||
dusb = DUSBExchange()
|
||||
dusc = DUSCExchange()
|
||||
dusd = DUSDExchange()
|
||||
hama = HAMAExchange()
|
||||
hamb = HAMBExchange()
|
||||
hana = HANAExchange()
|
||||
hanb = HANBExchange()
|
||||
|
||||
# Pass last_ts to fetcher to allow smart filtering
|
||||
# daemon.py runs daily, so we want to fetch everything since DB state
|
||||
# BUT we need to be careful: eix.py's fetch_latest_trades needs 'since_date' argument
|
||||
# We can't pass it here directly in the tuple easily because last_ts is calculated inside the loop.
|
||||
|
||||
# We will modify the loop below to handle args dynamically
|
||||
exchanges_to_process = [
|
||||
(eix, {'limit': None if historical else 5}), # Default limit 5 for safety if no historical
|
||||
(ls, {'include_yesterday': historical}),
|
||||
# Deutsche Börse Exchanges
|
||||
(xetra, {'include_yesterday': historical}),
|
||||
(frankfurt, {'include_yesterday': historical}),
|
||||
(quotrix, {'include_yesterday': historical}),
|
||||
(gettex, {'include_yesterday': historical}),
|
||||
(stuttgart, {'include_yesterday': historical}),
|
||||
# Börsenag Exchanges (Düsseldorf, Hamburg, Hannover)
|
||||
(dusa, {'include_yesterday': historical}),
|
||||
(dusb, {'include_yesterday': historical}),
|
||||
(dusc, {'include_yesterday': historical}),
|
||||
(dusd, {'include_yesterday': historical}),
|
||||
(hama, {'include_yesterday': historical}),
|
||||
(hamb, {'include_yesterday': historical}),
|
||||
(hana, {'include_yesterday': historical}),
|
||||
(hanb, {'include_yesterday': historical}),
|
||||
]
|
||||
"""Haupttask: Holt Trades von allen registrierten Exchanges."""
|
||||
logger.info(f"Starte Trading Data Fetcher (Historical: {historical})...")
|
||||
|
||||
db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
|
||||
|
||||
for exchange, args in exchanges_to_process:
|
||||
try:
|
||||
db_url = "http://questdb:9000"
|
||||
last_ts = get_last_trade_timestamp(db_url, exchange.name)
|
||||
|
||||
logger.info(f"Fetching data from {exchange.name} (Last trade: {last_ts})...")
|
||||
|
||||
# Special handling for EIX to support smart filtering
|
||||
call_args = args.copy()
|
||||
if exchange.name == "EIX" and not historical:
|
||||
call_args['since_date'] = last_ts.replace(tzinfo=datetime.timezone.utc)
|
||||
# Remove limit if we are filtering by date to ensure we get everything
|
||||
if 'limit' in call_args:
|
||||
call_args.pop('limit')
|
||||
|
||||
trades = exchange.fetch_latest_trades(**call_args)
|
||||
|
||||
if not trades:
|
||||
logger.info(f"No trades fetched from {exchange.name}.")
|
||||
continue
|
||||
|
||||
# Hash-basierte Deduplizierung - Batch-Verarbeitung um RAM zu sparen
|
||||
logger.info(f"Filtering {len(trades)} trades for duplicates (batch processing)...")
|
||||
new_trades = filter_new_trades_batch(db_url, exchange.name, trades, batch_size=500)
|
||||
|
||||
logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")
|
||||
|
||||
if new_trades:
|
||||
# Sort trades by timestamp before saving (QuestDB likes this)
|
||||
new_trades.sort(key=lambda x: x.timestamp)
|
||||
db.save_trades(new_trades)
|
||||
logger.info(f"Stored {len(new_trades)} new trades in QuestDB.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing exchange {exchange.name}: {e}")
|
||||
|
||||
def main():
|
||||
logger.info("Trading Daemon started.")
|
||||
|
||||
# 1. Startup Check: Ist die DB leer?
|
||||
db_url = "http://questdb:9000"
|
||||
is_empty = True
|
||||
|
||||
# Streaming-Exchanges verarbeiten (große Datenmengen)
|
||||
for exchange_class in STREAMING_EXCHANGES:
|
||||
try:
|
||||
exchange = exchange_class()
|
||||
logger.info(f"Verarbeite {exchange.name} im Streaming-Modus...")
|
||||
process_eix_streaming(db, db_url, exchange, historical=historical)
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler bei Streaming-Exchange {exchange_class.__name__}: {e}")
|
||||
|
||||
# Standard-Exchanges verarbeiten
|
||||
for exchange_class in STANDARD_EXCHANGES:
|
||||
try:
|
||||
exchange = exchange_class()
|
||||
process_standard_exchange(db, db_url, exchange, historical)
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler bei Exchange {exchange_class.__name__}: {e}")
|
||||
|
||||
logger.info("Alle Exchanges verarbeitet.")
|
||||
|
||||
def is_database_empty(db_url: str) -> bool:
|
||||
"""Prüft ob die Datenbank leer ist oder die Tabelle nicht existiert."""
|
||||
try:
|
||||
# Prüfe ob bereits Trades in der Tabelle sind
|
||||
response = requests.get(f"{db_url}/exec", params={'query': 'select count(*) from trades'}, auth=DB_AUTH)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
if data['dataset'] and data['dataset'][0][0] > 0:
|
||||
is_empty = False
|
||||
if data.get('dataset') and data['dataset'][0][0] > 0:
|
||||
return False
|
||||
except Exception:
|
||||
# Falls Tabelle noch nicht existiert oder DB nicht erreichbar ist
|
||||
is_empty = True
|
||||
pass
|
||||
return True
|
||||
|
||||
if is_empty:
|
||||
logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...")
|
||||
|
||||
def calculate_seconds_until_target(target_hour: int, target_minute: int = 0) -> int:
|
||||
"""Berechnet Sekunden bis zur nächsten Zielzeit."""
|
||||
now = datetime.datetime.now()
|
||||
target = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
|
||||
|
||||
# Wenn Zielzeit heute schon vorbei ist, nimm morgen
|
||||
if target <= now:
|
||||
target += datetime.timedelta(days=1)
|
||||
|
||||
return int((target - now).total_seconds())
|
||||
|
||||
|
||||
def main():
|
||||
logger.info("Trading Daemon gestartet.")
|
||||
|
||||
db_url = "http://questdb:9000"
|
||||
|
||||
# Startup: Initialer Sync
|
||||
if is_database_empty(db_url):
|
||||
logger.info("Datenbank ist leer. Starte initialen historischen Fetch...")
|
||||
run_task(historical=True)
|
||||
else:
|
||||
logger.info("Found existing data in database. Triggering catch-up sync...")
|
||||
# Run a normal task to fetch any missing data since the last run
|
||||
logger.info("Existierende Daten gefunden. Starte Catch-up Sync...")
|
||||
run_task(historical=False)
|
||||
logger.info("Catch-up sync completed. Waiting for scheduled run at 23:00.")
|
||||
logger.info("Catch-up Sync abgeschlossen.")
|
||||
|
||||
# Scheduling Konfiguration
|
||||
SCHEDULE_HOUR = 23
|
||||
SCHEDULE_MINUTE = 0
|
||||
last_run_date = None
|
||||
|
||||
logger.info(f"Warte auf täglichen Run um {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d}...")
|
||||
|
||||
while True:
|
||||
now = datetime.datetime.now()
|
||||
# Täglich um 23:00 Uhr
|
||||
if now.hour == 23 and now.minute == 0:
|
||||
run_task(historical=False)
|
||||
# Warte 61s, um Mehrfachausführung in derselben Minute zu verhindern
|
||||
time.sleep(61)
|
||||
today = now.date()
|
||||
|
||||
# Check alle 30 Sekunden
|
||||
time.sleep(30)
|
||||
# Prüfe ob wir heute schon gelaufen sind
|
||||
already_ran_today = (last_run_date == today)
|
||||
|
||||
# Prüfe ob wir im Zeitfenster sind (23:00 - 23:59)
|
||||
in_schedule_window = (now.hour == SCHEDULE_HOUR and now.minute >= SCHEDULE_MINUTE)
|
||||
|
||||
if in_schedule_window and not already_ran_today:
|
||||
logger.info(f"Geplanter Task startet ({now.strftime('%Y-%m-%d %H:%M:%S')})...")
|
||||
run_task(historical=False)
|
||||
last_run_date = today
|
||||
logger.info("Geplanter Task abgeschlossen. Warte auf nächsten Tag...")
|
||||
|
||||
# Dynamische Sleep-Zeit: Kurz vor Zielzeit öfter prüfen
|
||||
seconds_until_target = calculate_seconds_until_target(SCHEDULE_HOUR, SCHEDULE_MINUTE)
|
||||
|
||||
if seconds_until_target > 3600:
|
||||
# Mehr als 1 Stunde: Schlafe 30 Minuten
|
||||
sleep_time = 1800
|
||||
elif seconds_until_target > 300:
|
||||
# 5 Minuten bis 1 Stunde: Schlafe 5 Minuten
|
||||
sleep_time = 300
|
||||
else:
|
||||
# Unter 5 Minuten: Schlafe 30 Sekunden
|
||||
sleep_time = 30
|
||||
|
||||
time.sleep(sleep_time)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
5
pyvenv.cfg
Normal file
5
pyvenv.cfg
Normal file
@@ -0,0 +1,5 @@
|
||||
home = /opt/homebrew/opt/python@3.13/bin
|
||||
include-system-site-packages = false
|
||||
version = 3.13.2
|
||||
executable = /opt/homebrew/Cellar/python@3.13/3.13.2/Frameworks/Python.framework/Versions/3.13/bin/python3.13
|
||||
command = /opt/homebrew/opt/python@3.13/bin/python3.13 -m venv /Users/melchiorreimers/Documents/trading_daemon
|
||||
79
scripts/inspect_gzip.py
Normal file
79
scripts/inspect_gzip.py
Normal file
@@ -0,0 +1,79 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Utility-Script zum Inspizieren von gzip-komprimierten JSON-Dateien.
|
||||
Verarbeitet Dateien streaming, ohne alles in den RAM zu laden.
|
||||
|
||||
Verwendung:
|
||||
python scripts/inspect_gzip.py <datei.json.gz> [--limit N] [--output datei.json]
|
||||
"""
|
||||
import gzip
|
||||
import json
|
||||
import argparse
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def inspect_gzip_file(filepath: str, limit: int = None, output_file: str = None):
|
||||
"""
|
||||
Liest eine gzip-komprimierte NDJSON-Datei und gibt die Inhalte aus.
|
||||
|
||||
Args:
|
||||
filepath: Pfad zur .json.gz Datei
|
||||
limit: Maximale Anzahl der auszugebenden Records (None = alle)
|
||||
output_file: Optional: Ausgabe in Datei statt stdout
|
||||
"""
|
||||
path = Path(filepath)
|
||||
if not path.exists():
|
||||
print(f"Fehler: Datei '{filepath}' nicht gefunden.", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
count = 0
|
||||
output = open(output_file, 'w', encoding='utf-8') if output_file else sys.stdout
|
||||
|
||||
try:
|
||||
with gzip.open(filepath, mode='rt', encoding='utf-8') as f:
|
||||
for line in f:
|
||||
if not line.strip():
|
||||
continue
|
||||
|
||||
try:
|
||||
record = json.loads(line)
|
||||
# Pretty-print einzelner Record
|
||||
json.dump(record, output, indent=2, ensure_ascii=False)
|
||||
output.write('\n')
|
||||
count += 1
|
||||
|
||||
if limit and count >= limit:
|
||||
break
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"JSON-Fehler in Zeile {count + 1}: {e}", file=sys.stderr)
|
||||
continue
|
||||
|
||||
print(f"\n--- {count} Records verarbeitet ---", file=sys.stderr)
|
||||
|
||||
finally:
|
||||
if output_file and output != sys.stdout:
|
||||
output.close()
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Inspiziert gzip-komprimierte JSON-Dateien (NDJSON-Format)'
|
||||
)
|
||||
parser.add_argument('file', help='Pfad zur .json.gz Datei')
|
||||
parser.add_argument('--limit', '-n', type=int, default=10,
|
||||
help='Maximale Anzahl der Records (default: 10, 0 = alle)')
|
||||
parser.add_argument('--output', '-o', type=str,
|
||||
help='Ausgabe in Datei statt stdout')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
limit = args.limit if args.limit > 0 else None
|
||||
return inspect_gzip_file(args.file, limit=limit, output_file=args.output)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
||||
@@ -865,6 +865,39 @@ class AnalyticsWorker:
|
||||
if i % 10 == 0:
|
||||
time.sleep(1)
|
||||
|
||||
def delete_analytics_for_date(self, date: datetime.date):
|
||||
"""Löscht alle Analytics-Daten für ein bestimmtes Datum, damit sie neu berechnet werden können."""
|
||||
date_str = date.strftime('%Y-%m-%d')
|
||||
next_day = date + datetime.timedelta(days=1)
|
||||
next_day_str = next_day.strftime('%Y-%m-%d')
|
||||
|
||||
tables = ['analytics_custom', 'analytics_exchange_daily', 'analytics_daily_summary']
|
||||
|
||||
for table in tables:
|
||||
try:
|
||||
# QuestDB DELETE syntax
|
||||
delete_query = f"DELETE FROM {table} WHERE timestamp >= '{date_str}' AND timestamp < '{next_day_str}'"
|
||||
response = requests.get(
|
||||
f"{self.questdb_url}/exec",
|
||||
params={'query': delete_query},
|
||||
auth=self.auth,
|
||||
timeout=30
|
||||
)
|
||||
if response.status_code == 200:
|
||||
logger.debug(f"Deleted old analytics from {table} for {date}")
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not delete from {table} for {date}: {e}")
|
||||
|
||||
def force_recalculate_date(self, date: datetime.date):
|
||||
"""Erzwingt Neuberechnung der Analytics für ein Datum (löscht alte Daten zuerst)."""
|
||||
logger.info(f"Force recalculating analytics for {date}...")
|
||||
|
||||
# Lösche alte Analytics-Daten für dieses Datum
|
||||
self.delete_analytics_for_date(date)
|
||||
|
||||
# Berechne neu
|
||||
self.process_date(date)
|
||||
|
||||
def run(self):
|
||||
"""Hauptschleife des Workers"""
|
||||
logger.info("Analytics Worker started.")
|
||||
@@ -874,35 +907,26 @@ class AnalyticsWorker:
|
||||
logger.error("Failed to connect to QuestDB. Exiting.")
|
||||
return
|
||||
|
||||
# Initiale Berechnung fehlender Tage (inkl. gestern und heute)
|
||||
# Initiale Berechnung fehlender Tage
|
||||
logger.info("Checking for missing dates...")
|
||||
self.process_missing_dates()
|
||||
|
||||
# Stelle sicher, dass gestern und heute verarbeitet werden
|
||||
# IMMER heute und gestern neu berechnen (da neue Trades hinzukommen können)
|
||||
today = datetime.date.today()
|
||||
yesterday = today - datetime.timedelta(days=1)
|
||||
|
||||
logger.info(f"Ensuring yesterday ({yesterday}) and today ({today}) are processed...")
|
||||
# Prüfe alle drei Tabellen
|
||||
existing_custom = self.get_existing_dates('analytics_custom')
|
||||
existing_exchange = self.get_existing_dates('analytics_exchange_daily')
|
||||
existing_summary = self.get_existing_dates('analytics_daily_summary')
|
||||
existing_dates = existing_custom | existing_exchange | existing_summary
|
||||
logger.info(f"Force recalculating yesterday ({yesterday}) and today ({today}) - new trades may have been added...")
|
||||
|
||||
if yesterday not in existing_dates:
|
||||
logger.info(f"Processing yesterday's data: {yesterday}")
|
||||
self.process_date(yesterday)
|
||||
# Gestern immer neu berechnen
|
||||
self.force_recalculate_date(yesterday)
|
||||
|
||||
# Heute wird verarbeitet, wenn es bereits Trades gibt
|
||||
if today not in existing_dates:
|
||||
# Prüfe ob es heute schon Trades gibt
|
||||
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
|
||||
data = self.query_questdb(query)
|
||||
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
|
||||
logger.info(f"Found trades for today ({today}), processing...")
|
||||
self.process_date(today)
|
||||
else:
|
||||
logger.info(f"No trades found for today ({today}) yet, will process later")
|
||||
# Heute nur wenn es Trades gibt
|
||||
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
|
||||
data = self.query_questdb(query)
|
||||
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
|
||||
self.force_recalculate_date(today)
|
||||
else:
|
||||
logger.info(f"No trades found for today ({today}) yet, will process later")
|
||||
|
||||
# Hauptschleife: Prüfe regelmäßig auf fehlende Tage
|
||||
logger.info("Starting main loop - checking for missing dates every hour...")
|
||||
@@ -917,32 +941,24 @@ class AnalyticsWorker:
|
||||
self.process_missing_dates()
|
||||
last_check_hour = current_hour
|
||||
|
||||
# Stelle sicher, dass gestern und heute verarbeitet wurden
|
||||
# IMMER heute und gestern neu berechnen
|
||||
today = now.date()
|
||||
yesterday = today - datetime.timedelta(days=1)
|
||||
# Prüfe alle drei Tabellen
|
||||
existing_custom = self.get_existing_dates('analytics_custom')
|
||||
existing_exchange = self.get_existing_dates('analytics_exchange_daily')
|
||||
existing_summary = self.get_existing_dates('analytics_daily_summary')
|
||||
existing_dates = existing_custom | existing_exchange | existing_summary
|
||||
|
||||
if yesterday not in existing_dates:
|
||||
logger.info(f"Processing yesterday's data: {yesterday}")
|
||||
self.process_date(yesterday)
|
||||
logger.info(f"Hourly recalculation of yesterday ({yesterday}) and today ({today})...")
|
||||
self.force_recalculate_date(yesterday)
|
||||
|
||||
# Prüfe heute, ob es Trades gibt
|
||||
if today not in existing_dates:
|
||||
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
|
||||
data = self.query_questdb(query)
|
||||
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
|
||||
logger.info(f"Found trades for today ({today}), processing...")
|
||||
self.process_date(today)
|
||||
query = f"select count(*) from trades where date_trunc('day', timestamp) = '{today}'"
|
||||
data = self.query_questdb(query)
|
||||
if data and data.get('dataset') and data['dataset'][0][0] and data['dataset'][0][0] > 0:
|
||||
self.force_recalculate_date(today)
|
||||
|
||||
# Prüfe ob es Mitternacht ist (00:00) - verarbeite dann gestern
|
||||
if now.hour == 0 and now.minute == 0:
|
||||
yesterday = (now - datetime.timedelta(days=1)).date()
|
||||
logger.info(f"Midnight reached - processing yesterday's data: {yesterday}")
|
||||
self.process_date(yesterday)
|
||||
logger.info(f"Midnight reached - force recalculating yesterday's data: {yesterday}")
|
||||
self.force_recalculate_date(yesterday)
|
||||
# Warte 61s, um Mehrfachausführung zu verhindern
|
||||
time.sleep(61)
|
||||
|
||||
|
||||
354
src/exchanges/boersenag.py
Normal file
354
src/exchanges/boersenag.py
Normal file
@@ -0,0 +1,354 @@
|
||||
"""
|
||||
Börsenag Exchange Fetcher
|
||||
Unterstützt: DUSA, DUSB, DUSC, DUSD, HAMA, HAMB, HANA, HANB
|
||||
|
||||
Datenquelle: https://www.boersenag.de/mifid-ii-delayed-data/
|
||||
URL-Format: https://cld42.boersenag.de/m13data/data/Mifir13DelayedData_{MIC}_{SEQUENCE}_{TIMESTAMP}.csv
|
||||
"""
|
||||
|
||||
import requests
|
||||
import time
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import List, Optional
|
||||
from .base import BaseExchange, Trade
|
||||
import re
|
||||
|
||||
# Rate-Limiting Konfiguration
|
||||
RATE_LIMIT_DELAY = 0.3 # Sekunden zwischen Requests
|
||||
|
||||
# Browser User-Agent für Zugriff
|
||||
HEADERS = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
||||
'Accept': 'text/csv, text/plain, */*',
|
||||
'Accept-Language': 'de-DE,de;q=0.9,en;q=0.8',
|
||||
'Referer': 'https://www.boersenag.de/',
|
||||
}
|
||||
|
||||
# Exchange-Konfiguration
|
||||
BOERSENAG_EXCHANGES = {
|
||||
'DUSA': {
|
||||
'name': 'DUSA',
|
||||
'full_name': 'Börse Düsseldorf Regulierter Markt',
|
||||
'mic': 'DUSA',
|
||||
},
|
||||
'DUSB': {
|
||||
'name': 'DUSB',
|
||||
'full_name': 'Börse Düsseldorf Freiverkehr',
|
||||
'mic': 'DUSB',
|
||||
},
|
||||
'DUSC': {
|
||||
'name': 'DUSC',
|
||||
'full_name': 'Börse Düsseldorf Quotrix Regulierter Markt',
|
||||
'mic': 'DUSC',
|
||||
},
|
||||
'DUSD': {
|
||||
'name': 'DUSD',
|
||||
'full_name': 'Börse Düsseldorf Quotrix Freiverkehr',
|
||||
'mic': 'DUSD',
|
||||
},
|
||||
'HAMA': {
|
||||
'name': 'HAMA',
|
||||
'full_name': 'Börse Hamburg Regulierter Markt',
|
||||
'mic': 'HAMA',
|
||||
},
|
||||
'HAMB': {
|
||||
'name': 'HAMB',
|
||||
'full_name': 'Börse Hamburg Freiverkehr',
|
||||
'mic': 'HAMB',
|
||||
},
|
||||
'HANA': {
|
||||
'name': 'HANA',
|
||||
'full_name': 'Börse Hannover Regulierter Markt',
|
||||
'mic': 'HANA',
|
||||
},
|
||||
'HANB': {
|
||||
'name': 'HANB',
|
||||
'full_name': 'Börse Hannover Freiverkehr',
|
||||
'mic': 'HANB',
|
||||
},
|
||||
}
|
||||
|
||||
BASE_URL = "https://cld42.boersenag.de/m13data/data"
|
||||
|
||||
|
||||
class BoersenagBase(BaseExchange):
|
||||
"""
|
||||
Basisklasse für Börsenag Exchanges (DUSA, DUSB, DUSC, DUSD, HAMA, HAMB, HANA, HANB)
|
||||
|
||||
CSV Format (Semikolon-separiert):
|
||||
MIC; ISIN; displayName; time; price; size; supplement
|
||||
- time: "28.01.2026 15:48:42" (deutsches Format)
|
||||
- price: "46,18" (deutsches Dezimalformat)
|
||||
- size: Menge (kann 0 sein für Kurse ohne Trade)
|
||||
- supplement: "bez " = bezahlt (echter Trade), "G " = Geld (Bid), "B " = Brief (Ask)
|
||||
"""
|
||||
|
||||
@property
|
||||
def mic(self) -> str:
|
||||
"""MIC Code für die Börse"""
|
||||
raise NotImplementedError
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self.mic
|
||||
|
||||
def _generate_file_urls(self, target_date: datetime.date) -> List[str]:
|
||||
"""
|
||||
Generiert mögliche Datei-URLs für ein bestimmtes Datum.
|
||||
Format: Mifir13DelayedData_{MIC}_{SEQUENCE}_{TIMESTAMP}.csv
|
||||
|
||||
Die Dateien werden stündlich mit einem Zeitstempel generiert.
|
||||
Wir versuchen verschiedene Sequenznummern und Zeitstempel.
|
||||
"""
|
||||
urls = []
|
||||
|
||||
# Formatiere Datum im URL-Format: YYYYMMDD
|
||||
date_str = target_date.strftime('%Y%m%d')
|
||||
|
||||
# Mögliche Sequenznummern (beobachtet: 000000DF, aber könnte variieren)
|
||||
sequences = ['000000DF', '00000000', '000000DD', '000000DE']
|
||||
|
||||
# Generiere URLs für verschiedene Uhrzeiten (alle 15 Minuten)
|
||||
for hour in range(0, 24):
|
||||
for minute in [0, 15, 30, 45]:
|
||||
timestamp = f"{date_str}{hour:02d}{minute:02d}000000"
|
||||
for seq in sequences:
|
||||
url = f"{BASE_URL}/Mifir13DelayedData_{self.mic}_{seq}_{timestamp}.csv"
|
||||
urls.append(url)
|
||||
|
||||
# Versuche auch die einfachste Form mit 0000000000
|
||||
for seq in sequences:
|
||||
url = f"{BASE_URL}/Mifir13DelayedData_{self.mic}_{seq}_{date_str}0000000000.csv"
|
||||
urls.append(url)
|
||||
|
||||
return urls
|
||||
|
||||
def _parse_german_datetime(self, dt_str: str) -> Optional[datetime]:
|
||||
"""Parst deutsches Datumsformat: DD.MM.YYYY HH:MM:SS"""
|
||||
try:
|
||||
# Format: "28.01.2026 15:48:42"
|
||||
dt = datetime.strptime(dt_str.strip(), '%d.%m.%Y %H:%M:%S')
|
||||
# In UTC konvertieren (Deutsche Zeit = MEZ/MESZ, hier vereinfacht als UTC+1)
|
||||
# Für korrektes Handling würde pytz benötigt
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
return dt
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def _parse_german_number(self, num_str: str) -> Optional[float]:
|
||||
"""Parst deutsches Zahlenformat: 1.234,56 -> 1234.56"""
|
||||
try:
|
||||
# Entferne Tausender-Trennzeichen (Punkt) und ersetze Dezimalkomma
|
||||
clean = num_str.strip().replace('.', '').replace(',', '.')
|
||||
return float(clean)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def _download_and_parse_file(self, url: str) -> List[Trade]:
|
||||
"""Lädt eine CSV-Datei herunter und parst die Trades"""
|
||||
trades = []
|
||||
|
||||
try:
|
||||
response = requests.get(url, headers=HEADERS, timeout=30)
|
||||
|
||||
if response.status_code == 404:
|
||||
return []
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
content = response.text
|
||||
if not content.strip():
|
||||
return []
|
||||
|
||||
lines = content.strip().split('\n')
|
||||
if len(lines) < 2: # Nur Header, keine Daten
|
||||
return []
|
||||
|
||||
# Erste Zeile ist Header
|
||||
# MIC; ISIN; displayName; time; price; size; supplement
|
||||
for line in lines[1:]:
|
||||
if not line.strip():
|
||||
continue
|
||||
|
||||
trade = self._parse_csv_line(line)
|
||||
if trade:
|
||||
trades.append(trade)
|
||||
|
||||
except requests.exceptions.HTTPError as e:
|
||||
if e.response.status_code != 404:
|
||||
print(f"[{self.name}] HTTP error: {e}")
|
||||
except Exception as e:
|
||||
print(f"[{self.name}] Error downloading {url}: {e}")
|
||||
|
||||
return trades
|
||||
|
||||
def _parse_csv_line(self, line: str) -> Optional[Trade]:
|
||||
"""Parst eine einzelne CSV-Zeile"""
|
||||
try:
|
||||
# CSV ist Semikolon-separiert
|
||||
parts = line.split(';')
|
||||
if len(parts) < 7:
|
||||
return None
|
||||
|
||||
mic = parts[0].strip()
|
||||
isin = parts[1].strip()
|
||||
display_name = parts[2].strip().strip('"')
|
||||
time_str = parts[3].strip()
|
||||
price_str = parts[4].strip()
|
||||
size_str = parts[5].strip()
|
||||
supplement = parts[6].strip().strip('"').strip()
|
||||
|
||||
# Validiere ISIN
|
||||
if not isin or len(isin) != 12:
|
||||
return None
|
||||
|
||||
# Parse Timestamp
|
||||
timestamp = self._parse_german_datetime(time_str)
|
||||
if not timestamp:
|
||||
return None
|
||||
|
||||
# Parse Preis
|
||||
price = self._parse_german_number(price_str)
|
||||
if price is None or price <= 0:
|
||||
return None
|
||||
|
||||
# Parse Menge
|
||||
try:
|
||||
size = float(size_str)
|
||||
except ValueError:
|
||||
size = 0
|
||||
|
||||
# Nur echte Trades (size > 0) oder "bez" (bezahlt) aufnehmen
|
||||
# "G" = Geld (Bid), "B" = Brief (Ask) sind Kurse, keine Trades
|
||||
is_trade = size > 0 or 'bez' in supplement.lower()
|
||||
|
||||
if not is_trade:
|
||||
return None
|
||||
|
||||
# Bei size = 0 aber "bez" nehmen wir an, dass die Menge unbekannt ist (setze auf 1)
|
||||
if size <= 0:
|
||||
size = 1
|
||||
|
||||
return Trade(
|
||||
exchange=self.name,
|
||||
symbol=isin,
|
||||
isin=isin,
|
||||
price=price,
|
||||
quantity=size,
|
||||
timestamp=timestamp
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
return None
|
||||
|
||||
def _get_last_trading_day(self, from_date: datetime.date) -> datetime.date:
|
||||
"""Findet den letzten Handelstag (überspringt Wochenenden)"""
|
||||
date = from_date
|
||||
if date.weekday() == 5: # Samstag
|
||||
date = date - timedelta(days=1)
|
||||
elif date.weekday() == 6: # Sonntag
|
||||
date = date - timedelta(days=2)
|
||||
return date
|
||||
|
||||
def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]:
|
||||
"""Holt alle Trades vom letzten Handelstag"""
|
||||
all_trades = []
|
||||
|
||||
# Bestimme Zieldatum
|
||||
if since_date:
|
||||
target_date = since_date.date() if hasattr(since_date, 'date') else since_date
|
||||
else:
|
||||
target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()
|
||||
|
||||
# Überspringe Wochenenden
|
||||
original_date = target_date
|
||||
target_date = self._get_last_trading_day(target_date)
|
||||
|
||||
if target_date != original_date:
|
||||
print(f"[{self.name}] Skipping weekend: {original_date} -> {target_date}")
|
||||
|
||||
print(f"[{self.name}] Fetching trades for date: {target_date}")
|
||||
|
||||
# Generiere mögliche URLs
|
||||
urls = self._generate_file_urls(target_date)
|
||||
|
||||
successful = 0
|
||||
total_urls = len(urls)
|
||||
|
||||
# Versuche verschiedene URLs
|
||||
for i, url in enumerate(urls):
|
||||
trades = self._download_and_parse_file(url)
|
||||
if trades:
|
||||
all_trades.extend(trades)
|
||||
successful += 1
|
||||
print(f"[{self.name}] Found {len(trades)} trades from: {url.split('/')[-1]}")
|
||||
# Bei Erfolg müssen wir nicht alle anderen URLs probieren
|
||||
break
|
||||
|
||||
# Rate-Limiting
|
||||
if i < total_urls - 1:
|
||||
time.sleep(RATE_LIMIT_DELAY)
|
||||
|
||||
# Nach 20 fehlgeschlagenen Versuchen abbrechen
|
||||
if i > 20 and successful == 0:
|
||||
break
|
||||
|
||||
print(f"[{self.name}] Total trades fetched: {len(all_trades)}")
|
||||
return all_trades
|
||||
|
||||
|
||||
# Konkrete Exchange-Klassen
|
||||
class DUSAExchange(BoersenagBase):
|
||||
"""Börse Düsseldorf Regulierter Markt"""
|
||||
@property
|
||||
def mic(self) -> str:
|
||||
return "DUSA"
|
||||
|
||||
|
||||
class DUSBExchange(BoersenagBase):
|
||||
"""Börse Düsseldorf Freiverkehr"""
|
||||
@property
|
||||
def mic(self) -> str:
|
||||
return "DUSB"
|
||||
|
||||
|
||||
class DUSCExchange(BoersenagBase):
|
||||
"""Börse Düsseldorf Quotrix Regulierter Markt"""
|
||||
@property
|
||||
def mic(self) -> str:
|
||||
return "DUSC"
|
||||
|
||||
|
||||
class DUSDExchange(BoersenagBase):
|
||||
"""Börse Düsseldorf Quotrix Freiverkehr"""
|
||||
@property
|
||||
def mic(self) -> str:
|
||||
return "DUSD"
|
||||
|
||||
|
||||
class HAMAExchange(BoersenagBase):
|
||||
"""Börse Hamburg Regulierter Markt"""
|
||||
@property
|
||||
def mic(self) -> str:
|
||||
return "HAMA"
|
||||
|
||||
|
||||
class HAMBExchange(BoersenagBase):
|
||||
"""Börse Hamburg Freiverkehr"""
|
||||
@property
|
||||
def mic(self) -> str:
|
||||
return "HAMB"
|
||||
|
||||
|
||||
class HANAExchange(BoersenagBase):
|
||||
"""Börse Hannover Regulierter Markt"""
|
||||
@property
|
||||
def mic(self) -> str:
|
||||
return "HANA"
|
||||
|
||||
|
||||
class HANBExchange(BoersenagBase):
|
||||
"""Börse Hannover Freiverkehr"""
|
||||
@property
|
||||
def mic(self) -> str:
|
||||
return "HANB"
|
||||
@@ -2,16 +2,20 @@ import requests
|
||||
import gzip
|
||||
import json
|
||||
import io
|
||||
import re
|
||||
import time
|
||||
import logging
|
||||
import threading
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import List, Optional
|
||||
from .base import BaseExchange, Trade
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Rate-Limiting Konfiguration
|
||||
RATE_LIMIT_DELAY = 0.5 # Sekunden zwischen Requests
|
||||
RATE_LIMIT_RETRY_DELAY = 5 # Sekunden Wartezeit bei 429
|
||||
MAX_RETRIES = 3 # Maximale Wiederholungen bei 429
|
||||
MAX_RETRIES = 5 # Maximale Wiederholungen bei 429
|
||||
|
||||
# API URLs für Deutsche Börse
|
||||
API_URLS = {
|
||||
@@ -21,17 +25,47 @@ API_URLS = {
|
||||
}
|
||||
DOWNLOAD_BASE_URL = "https://mfs.deutsche-boerse.com/api/download"
|
||||
|
||||
# Browser User-Agent für Zugriff
|
||||
HEADERS = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
||||
'Accept': 'application/json, application/gzip, */*',
|
||||
'Referer': 'https://mfs.deutsche-boerse.com/',
|
||||
}
|
||||
# Liste von User-Agents für Rotation bei Rate-Limiting
|
||||
USER_AGENTS = [
|
||||
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
||||
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
|
||||
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0',
|
||||
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
|
||||
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
||||
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0',
|
||||
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:121.0) Gecko/20100101 Firefox/121.0',
|
||||
'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0',
|
||||
]
|
||||
|
||||
|
||||
class UserAgentRotator:
|
||||
"""Thread-safe User-Agent Rotation"""
|
||||
|
||||
def __init__(self):
|
||||
self._index = 0
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def get_headers(self, rotate: bool = False) -> dict:
|
||||
"""Gibt Headers mit aktuellem User-Agent zurück. Bei rotate=True wird zum nächsten gewechselt."""
|
||||
with self._lock:
|
||||
if rotate:
|
||||
self._index = (self._index + 1) % len(USER_AGENTS)
|
||||
return {
|
||||
'User-Agent': USER_AGENTS[self._index],
|
||||
'Accept': 'application/json, application/gzip, */*',
|
||||
'Referer': 'https://mfs.deutsche-boerse.com/',
|
||||
}
|
||||
|
||||
# Globale Instanz für User-Agent Rotation
|
||||
_ua_rotator = UserAgentRotator()
|
||||
|
||||
|
||||
class DeutscheBoerseBase(BaseExchange):
|
||||
"""Basisklasse für Deutsche Börse Exchanges (Xetra, Frankfurt, Quotrix)"""
|
||||
|
||||
# Regex für Dateinamen-Parsing (kompiliert für Performance)
|
||||
_FILENAME_PATTERN = re.compile(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})')
|
||||
|
||||
@property
|
||||
def base_url(self) -> str:
|
||||
"""Override in subclasses"""
|
||||
@@ -46,60 +80,73 @@ class DeutscheBoerseBase(BaseExchange):
|
||||
"""API URL für die Dateiliste"""
|
||||
return API_URLS.get(self.name, self.base_url)
|
||||
|
||||
def _handle_rate_limit(self, retry: int, context: str) -> None:
|
||||
"""Zentrale Rate-Limit Behandlung: rotiert User-Agent und wartet."""
|
||||
_ua_rotator.get_headers(rotate=True)
|
||||
wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
|
||||
logger.warning(f"[{self.name}] Rate limited ({context}), rotating User-Agent and waiting {wait_time}s... (retry {retry + 1}/{MAX_RETRIES})")
|
||||
time.sleep(wait_time)
|
||||
|
||||
def _get_file_list(self) -> List[str]:
|
||||
"""Holt die Dateiliste von der JSON API"""
|
||||
try:
|
||||
api_url = self.api_url
|
||||
print(f"[{self.name}] Fetching file list from: {api_url}")
|
||||
response = requests.get(api_url, headers=HEADERS, timeout=30)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json()
|
||||
files = data.get('CurrentFiles', [])
|
||||
|
||||
print(f"[{self.name}] API returned {len(files)} files")
|
||||
if files:
|
||||
print(f"[{self.name}] Sample files: {files[:3]}")
|
||||
return files
|
||||
|
||||
except Exception as e:
|
||||
print(f"[{self.name}] Error fetching file list from API: {e}")
|
||||
import traceback
|
||||
print(f"[{self.name}] Traceback: {traceback.format_exc()}")
|
||||
return []
|
||||
api_url = self.api_url
|
||||
|
||||
for retry in range(MAX_RETRIES):
|
||||
try:
|
||||
headers = _ua_rotator.get_headers(rotate=(retry > 0))
|
||||
logger.info(f"[{self.name}] Fetching file list from: {api_url}")
|
||||
response = requests.get(api_url, headers=headers, timeout=30)
|
||||
|
||||
if response.status_code == 429:
|
||||
self._handle_rate_limit(retry, "file list")
|
||||
continue
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json()
|
||||
files = data.get('CurrentFiles', [])
|
||||
|
||||
logger.info(f"[{self.name}] API returned {len(files)} files")
|
||||
if files:
|
||||
logger.debug(f"[{self.name}] Sample files: {files[:3]}")
|
||||
return files
|
||||
|
||||
except requests.exceptions.HTTPError as e:
|
||||
if e.response.status_code == 429:
|
||||
self._handle_rate_limit(retry, "file list HTTPError")
|
||||
continue
|
||||
logger.error(f"[{self.name}] HTTP error fetching file list: {e}")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.exception(f"[{self.name}] Error fetching file list from API: {e}")
|
||||
break
|
||||
|
||||
return []
|
||||
|
||||
def _filter_files_for_date(self, files: List[str], target_date: datetime.date) -> List[str]:
|
||||
"""
|
||||
Filtert Dateien für ein bestimmtes Datum.
|
||||
Dateiformat: DETR-posttrade-YYYY-MM-DDTHH_MM.json.gz (mit Unterstrich!)
|
||||
Dateiformat: DETR-posttrade-YYYY-MM-DDTHH_MM.json.gz
|
||||
|
||||
Da Handel bis 22:00 MEZ geht (21:00/20:00 UTC), müssen wir auch
|
||||
Dateien nach Mitternacht UTC berücksichtigen.
|
||||
"""
|
||||
import re
|
||||
filtered = []
|
||||
|
||||
# Für den Vortag: Dateien vom target_date UND vom Folgetag (bis ~02:00 UTC)
|
||||
target_str = target_date.strftime('%Y-%m-%d')
|
||||
next_day = target_date + timedelta(days=1)
|
||||
next_day_str = next_day.strftime('%Y-%m-%d')
|
||||
|
||||
for file in files:
|
||||
# Extrahiere Datum aus Dateiname
|
||||
# Format: DETR-posttrade-2026-01-26T21_30.json.gz
|
||||
if target_str in file:
|
||||
filtered.append(file)
|
||||
elif next_day_str in file:
|
||||
# Prüfe ob es eine frühe Datei vom nächsten Tag ist (< 03:00 UTC)
|
||||
try:
|
||||
# Finde Timestamp im Dateinamen mit Unterstrich für Minuten
|
||||
match = re.search(r'posttrade-(\d{4}-\d{2}-\d{2})T(\d{2})_(\d{2})', file)
|
||||
if match:
|
||||
hour = int(match.group(2))
|
||||
if hour < 3: # Frühe Morgenstunden gehören noch zum Vortag
|
||||
filtered.append(file)
|
||||
except Exception:
|
||||
pass
|
||||
match = self._FILENAME_PATTERN.search(file)
|
||||
if match:
|
||||
hour = int(match.group(2))
|
||||
if hour < 3: # Frühe Morgenstunden gehören noch zum Vortag
|
||||
filtered.append(file)
|
||||
|
||||
return filtered
|
||||
|
||||
@@ -110,17 +157,14 @@ class DeutscheBoerseBase(BaseExchange):
|
||||
|
||||
for retry in range(MAX_RETRIES):
|
||||
try:
|
||||
response = requests.get(full_url, headers=HEADERS, timeout=60)
|
||||
headers = _ua_rotator.get_headers(rotate=(retry > 0))
|
||||
response = requests.get(full_url, headers=headers, timeout=60)
|
||||
|
||||
if response.status_code == 404:
|
||||
# Datei nicht gefunden - normal für alte Dateien
|
||||
return []
|
||||
|
||||
if response.status_code == 429:
|
||||
# Rate-Limit erreicht - warten und erneut versuchen
|
||||
wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
|
||||
print(f"[{self.name}] Rate limited, waiting {wait_time}s...")
|
||||
time.sleep(wait_time)
|
||||
self._handle_rate_limit(retry, "download")
|
||||
continue
|
||||
|
||||
response.raise_for_status()
|
||||
@@ -130,13 +174,11 @@ class DeutscheBoerseBase(BaseExchange):
|
||||
content = f.read().decode('utf-8')
|
||||
|
||||
if not content.strip():
|
||||
# Leere Datei
|
||||
return []
|
||||
|
||||
# NDJSON Format: Eine JSON-Zeile pro Trade
|
||||
lines = content.strip().split('\n')
|
||||
if not lines or (len(lines) == 1 and not lines[0].strip()):
|
||||
# Leere Datei
|
||||
return []
|
||||
|
||||
for line in lines:
|
||||
@@ -147,116 +189,146 @@ class DeutscheBoerseBase(BaseExchange):
|
||||
trade = self._parse_trade_record(record)
|
||||
if trade:
|
||||
trades.append(trade)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
except Exception:
|
||||
continue
|
||||
except json.JSONDecodeError as e:
|
||||
logger.debug(f"[{self.name}] JSON decode error in {filename}: {e}")
|
||||
except Exception as e:
|
||||
logger.debug(f"[{self.name}] Error parsing record in {filename}: {e}")
|
||||
|
||||
# Erfolg - keine weitere Retry nötig
|
||||
# Erfolg
|
||||
break
|
||||
|
||||
except requests.exceptions.HTTPError as e:
|
||||
if e.response.status_code == 429:
|
||||
wait_time = RATE_LIMIT_RETRY_DELAY * (retry + 1)
|
||||
print(f"[{self.name}] Rate limited, waiting {wait_time}s...")
|
||||
time.sleep(wait_time)
|
||||
self._handle_rate_limit(retry, "download HTTPError")
|
||||
continue
|
||||
elif e.response.status_code != 404:
|
||||
print(f"[{self.name}] HTTP error downloading {filename}: {e}")
|
||||
logger.error(f"[{self.name}] HTTP error downloading {filename}: {e}")
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"[{self.name}] Error downloading/parsing {filename}: {e}")
|
||||
logger.error(f"[{self.name}] Error downloading/parsing {filename}: {e}")
|
||||
break
|
||||
|
||||
return trades
|
||||
|
||||
def _parse_timestamp(self, ts_str: str) -> Optional[datetime]:
|
||||
"""
|
||||
Parst einen Timestamp-String in ein datetime-Objekt.
|
||||
Unterstützt Nanosekunden durch Kürzung auf Mikrosekunden.
|
||||
"""
|
||||
if not ts_str:
|
||||
return None
|
||||
|
||||
# Ersetze 'Z' durch '+00:00' für ISO-Kompatibilität
|
||||
ts_str = ts_str.replace('Z', '+00:00')
|
||||
|
||||
# Kürze Nanosekunden auf Mikrosekunden (Python max 6 Dezimalstellen)
|
||||
if '.' in ts_str:
|
||||
# Split bei '+' oder '-' für Timezone
|
||||
if '+' in ts_str:
|
||||
time_part, tz_part = ts_str.rsplit('+', 1)
|
||||
tz_part = '+' + tz_part
|
||||
elif ts_str.count('-') > 2: # Negative Timezone
|
||||
time_part, tz_part = ts_str.rsplit('-', 1)
|
||||
tz_part = '-' + tz_part
|
||||
else:
|
||||
time_part, tz_part = ts_str, ''
|
||||
|
||||
if '.' in time_part:
|
||||
base, frac = time_part.split('.')
|
||||
frac = frac[:6] # Kürze auf 6 Stellen
|
||||
ts_str = f"{base}.{frac}{tz_part}"
|
||||
|
||||
return datetime.fromisoformat(ts_str)
|
||||
|
||||
def _extract_price(self, record: dict) -> Optional[float]:
|
||||
"""Extrahiert den Preis aus verschiedenen JSON-Formaten."""
|
||||
# Neues Format
|
||||
if 'lastTrade' in record:
|
||||
return float(record['lastTrade'])
|
||||
|
||||
# Altes Format mit verschachteltem Pric-Objekt
|
||||
pric = record.get('Pric')
|
||||
if pric is None:
|
||||
return None
|
||||
|
||||
if isinstance(pric, (int, float)):
|
||||
return float(pric)
|
||||
|
||||
if isinstance(pric, dict):
|
||||
# Versuche verschiedene Pfade
|
||||
if 'Pric' in pric:
|
||||
inner = pric['Pric']
|
||||
if isinstance(inner, dict):
|
||||
amt = inner.get('MntryVal', {}).get('Amt') or inner.get('Amt')
|
||||
if amt is not None:
|
||||
return float(amt)
|
||||
if 'MntryVal' in pric:
|
||||
amt = pric['MntryVal'].get('Amt')
|
||||
if amt is not None:
|
||||
return float(amt)
|
||||
|
||||
return None
|
||||
|
||||
def _extract_quantity(self, record: dict) -> Optional[float]:
|
||||
"""Extrahiert die Menge aus verschiedenen JSON-Formaten."""
|
||||
# Neues Format
|
||||
if 'lastQty' in record:
|
||||
return float(record['lastQty'])
|
||||
|
||||
# Altes Format
|
||||
qty = record.get('Qty')
|
||||
if qty is None:
|
||||
return None
|
||||
|
||||
if isinstance(qty, (int, float)):
|
||||
return float(qty)
|
||||
|
||||
if isinstance(qty, dict):
|
||||
val = qty.get('Unit') or qty.get('Qty')
|
||||
if val is not None:
|
||||
return float(val)
|
||||
|
||||
return None
|
||||
|
||||
def _parse_trade_record(self, record: dict) -> Optional[Trade]:
|
||||
"""
|
||||
Parst einen einzelnen Trade-Record aus dem JSON.
|
||||
|
||||
Aktuelles JSON-Format (NDJSON):
|
||||
{
|
||||
"messageId": "posttrade",
|
||||
"sourceName": "GAT",
|
||||
"isin": "US00123Q1040",
|
||||
"lastTradeTime": "2026-01-29T14:07:00.419000000Z",
|
||||
"lastTrade": 10.145,
|
||||
"lastQty": 500.0,
|
||||
"currency": "EUR",
|
||||
...
|
||||
}
|
||||
Unterstützte Formate:
|
||||
- Neues Format: isin, lastTrade, lastQty, lastTradeTime
|
||||
- Altes Format: FinInstrmId.Id, Pric, Qty, TrdDt/TrdTm
|
||||
"""
|
||||
try:
|
||||
# ISIN extrahieren - neues Format verwendet 'isin' lowercase
|
||||
isin = record.get('isin') or record.get('ISIN') or record.get('instrumentId') or record.get('FinInstrmId', {}).get('Id', '')
|
||||
# ISIN extrahieren
|
||||
isin = (
|
||||
record.get('isin') or
|
||||
record.get('ISIN') or
|
||||
record.get('instrumentId') or
|
||||
record.get('FinInstrmId', {}).get('Id', '')
|
||||
)
|
||||
if not isin:
|
||||
return None
|
||||
|
||||
# Preis extrahieren - neues Format: 'lastTrade'
|
||||
price = None
|
||||
if 'lastTrade' in record:
|
||||
price = float(record['lastTrade'])
|
||||
elif 'Pric' in record:
|
||||
pric = record['Pric']
|
||||
if isinstance(pric, dict):
|
||||
if 'Pric' in pric:
|
||||
inner = pric['Pric']
|
||||
if 'MntryVal' in inner:
|
||||
price = float(inner['MntryVal'].get('Amt', 0))
|
||||
elif 'Amt' in inner:
|
||||
price = float(inner['Amt'])
|
||||
elif 'MntryVal' in pric:
|
||||
price = float(pric['MntryVal'].get('Amt', 0))
|
||||
elif isinstance(pric, (int, float)):
|
||||
price = float(pric)
|
||||
|
||||
# Preis extrahieren
|
||||
price = self._extract_price(record)
|
||||
if price is None or price <= 0:
|
||||
return None
|
||||
|
||||
# Menge extrahieren - neues Format: 'lastQty'
|
||||
quantity = None
|
||||
if 'lastQty' in record:
|
||||
quantity = float(record['lastQty'])
|
||||
elif 'Qty' in record:
|
||||
qty = record['Qty']
|
||||
if isinstance(qty, dict):
|
||||
quantity = float(qty.get('Unit', qty.get('Qty', 0)))
|
||||
elif isinstance(qty, (int, float)):
|
||||
quantity = float(qty)
|
||||
|
||||
# Menge extrahieren
|
||||
quantity = self._extract_quantity(record)
|
||||
if quantity is None or quantity <= 0:
|
||||
return None
|
||||
|
||||
# Timestamp extrahieren - neues Format: 'lastTradeTime'
|
||||
# Timestamp extrahieren
|
||||
timestamp = None
|
||||
if 'lastTradeTime' in record:
|
||||
ts_str = record['lastTradeTime']
|
||||
# Format: "2026-01-29T14:07:00.419000000Z"
|
||||
# Python kann max 6 Dezimalstellen, also kürzen
|
||||
if '.' in ts_str:
|
||||
parts = ts_str.replace('Z', '').split('.')
|
||||
if len(parts) == 2 and len(parts[1]) > 6:
|
||||
ts_str = parts[0] + '.' + parts[1][:6] + '+00:00'
|
||||
else:
|
||||
ts_str = ts_str.replace('Z', '+00:00')
|
||||
else:
|
||||
ts_str = ts_str.replace('Z', '+00:00')
|
||||
timestamp = datetime.fromisoformat(ts_str)
|
||||
timestamp = self._parse_timestamp(record['lastTradeTime'])
|
||||
else:
|
||||
# Fallback für altes Format
|
||||
trd_dt = record.get('TrdDt', '')
|
||||
trd_tm = record.get('TrdTm', '00:00:00')
|
||||
|
||||
if not trd_dt:
|
||||
return None
|
||||
|
||||
ts_str = f"{trd_dt}T{trd_tm}"
|
||||
if '.' in ts_str:
|
||||
parts = ts_str.split('.')
|
||||
if len(parts[1]) > 6:
|
||||
ts_str = parts[0] + '.' + parts[1][:6]
|
||||
|
||||
timestamp = datetime.fromisoformat(ts_str)
|
||||
if trd_dt:
|
||||
timestamp = self._parse_timestamp(f"{trd_dt}T{trd_tm}")
|
||||
|
||||
if timestamp is None:
|
||||
return None
|
||||
@@ -273,22 +345,41 @@ class DeutscheBoerseBase(BaseExchange):
|
||||
timestamp=timestamp
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
# Debug: Zeige ersten fehlgeschlagenen Record
|
||||
except (ValueError, TypeError, KeyError) as e:
|
||||
logger.debug(f"[{self.name}] Failed to parse trade record: {e}")
|
||||
return None
|
||||
|
||||
def _get_last_trading_day(self, from_date: datetime.date) -> datetime.date:
|
||||
"""
|
||||
Findet den letzten Handelstag (überspringt Wochenenden).
|
||||
Findet den letzten Handelstag (überspringt Wochenenden und bekannte Feiertage).
|
||||
Montag=0, Sonntag=6
|
||||
"""
|
||||
# Deutsche Börsen-Feiertage (fixe Daten, jedes Jahr gleich)
|
||||
# Bewegliche Feiertage (Ostern etc.) müssten jährlich berechnet werden
|
||||
fixed_holidays = {
|
||||
(1, 1), # Neujahr
|
||||
(5, 1), # Tag der Arbeit
|
||||
(12, 24), # Heiligabend
|
||||
(12, 25), # 1. Weihnachtstag
|
||||
(12, 26), # 2. Weihnachtstag
|
||||
(12, 31), # Silvester
|
||||
}
|
||||
|
||||
date = from_date
|
||||
# Wenn Samstag (5), gehe zurück zu Freitag
|
||||
if date.weekday() == 5:
|
||||
date = date - timedelta(days=1)
|
||||
# Wenn Sonntag (6), gehe zurück zu Freitag
|
||||
elif date.weekday() == 6:
|
||||
date = date - timedelta(days=2)
|
||||
max_iterations = 10 # Sicherheit gegen Endlosschleife
|
||||
|
||||
for _ in range(max_iterations):
|
||||
# Wochenende überspringen
|
||||
if date.weekday() == 5: # Samstag
|
||||
date = date - timedelta(days=1)
|
||||
elif date.weekday() == 6: # Sonntag
|
||||
date = date - timedelta(days=2)
|
||||
# Feiertag überspringen
|
||||
elif (date.month, date.day) in fixed_holidays:
|
||||
date = date - timedelta(days=1)
|
||||
else:
|
||||
break
|
||||
|
||||
return date
|
||||
|
||||
def fetch_latest_trades(self, include_yesterday: bool = True, since_date: datetime = None) -> List[Trade]:
|
||||
@@ -304,40 +395,36 @@ class DeutscheBoerseBase(BaseExchange):
|
||||
# Standard: Vortag
|
||||
target_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()
|
||||
|
||||
# Überspringe Wochenenden
|
||||
# Überspringe Wochenenden und Feiertage
|
||||
original_date = target_date
|
||||
target_date = self._get_last_trading_day(target_date)
|
||||
|
||||
if target_date != original_date:
|
||||
print(f"[{self.name}] Skipping weekend: {original_date} -> {target_date}")
|
||||
logger.info(f"[{self.name}] Adjusted date: {original_date} -> {target_date} (weekend/holiday)")
|
||||
|
||||
print(f"[{self.name}] Fetching trades for date: {target_date}")
|
||||
logger.info(f"[{self.name}] Fetching trades for date: {target_date}")
|
||||
|
||||
# Hole Dateiliste von der API
|
||||
files = self._get_file_list()
|
||||
|
||||
if not files:
|
||||
print(f"[{self.name}] No files available from API")
|
||||
logger.warning(f"[{self.name}] No files available from API")
|
||||
return []
|
||||
|
||||
# Dateien für Zieldatum filtern
|
||||
target_files = self._filter_files_for_date(files, target_date)
|
||||
print(f"[{self.name}] {len(target_files)} files match target date (of {len(files)} total)")
|
||||
logger.info(f"[{self.name}] {len(target_files)} files match target date (of {len(files)} total)")
|
||||
|
||||
if not target_files:
|
||||
print(f"[{self.name}] No files for target date found")
|
||||
logger.warning(f"[{self.name}] No files for target date found")
|
||||
return []
|
||||
|
||||
# Alle passenden Dateien herunterladen und parsen (mit Rate-Limiting)
|
||||
# Alle passenden Dateien herunterladen und parsen
|
||||
successful = 0
|
||||
failed = 0
|
||||
total_files = len(target_files)
|
||||
|
||||
if total_files == 0:
|
||||
print(f"[{self.name}] No files to download for date {target_date}")
|
||||
return []
|
||||
|
||||
print(f"[{self.name}] Starting download of {total_files} files...")
|
||||
logger.info(f"[{self.name}] Starting download of {total_files} files...")
|
||||
|
||||
for i, file in enumerate(target_files):
|
||||
trades = self._download_and_parse_file(file)
|
||||
@@ -353,9 +440,9 @@ class DeutscheBoerseBase(BaseExchange):
|
||||
|
||||
# Fortschritt alle 100 Dateien
|
||||
if (i + 1) % 100 == 0:
|
||||
print(f"[{self.name}] Progress: {i + 1}/{total_files} files, {successful} successful, {len(all_trades)} trades so far")
|
||||
logger.info(f"[{self.name}] Progress: {i + 1}/{total_files} files, {successful} successful, {len(all_trades)} trades so far")
|
||||
|
||||
print(f"[{self.name}] Downloaded {successful} files ({failed} failed/empty), total {len(all_trades)} trades")
|
||||
logger.info(f"[{self.name}] Downloaded {successful} files ({failed} failed/empty), total {len(all_trades)} trades")
|
||||
return all_trades
|
||||
|
||||
|
||||
|
||||
@@ -1,30 +1,38 @@
|
||||
import requests
|
||||
import json
|
||||
from bs4 import BeautifulSoup
|
||||
from datetime import datetime
|
||||
from typing import List
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from typing import List, Generator, Tuple
|
||||
from .base import BaseExchange, Trade
|
||||
import csv
|
||||
import io
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EIXExchange(BaseExchange):
|
||||
"""European Investor Exchange - CSV-basierte Trade-Daten."""
|
||||
|
||||
API_BASE_URL = "https://european-investor-exchange.com/api"
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "EIX"
|
||||
|
||||
def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None) -> List[Trade]:
|
||||
# EIX stores its file list in a separate API endpoint
|
||||
url = "https://european-investor-exchange.com/api/official-trades"
|
||||
|
||||
def get_files_to_process(self, limit: int = 1, since_date: datetime = None) -> List[dict]:
|
||||
"""Holt die Liste der zu verarbeitenden Dateien ohne sie herunterzuladen."""
|
||||
url = f"{self.API_BASE_URL}/official-trades"
|
||||
try:
|
||||
response = requests.get(url, timeout=15)
|
||||
response.raise_for_status()
|
||||
files_list = response.json()
|
||||
except Exception as e:
|
||||
print(f"Error fetching EIX file list: {e}")
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"[{self.name}] Fehler beim Abrufen der Dateiliste: {e}")
|
||||
return []
|
||||
except ValueError as e:
|
||||
logger.error(f"[{self.name}] Ungültiges JSON in Dateiliste: {e}")
|
||||
return []
|
||||
|
||||
# Filter files based on date in filename if since_date provided
|
||||
# Format: "kursblatt/2025/Kursblatt.2025-07-14.1752526803105.csv"
|
||||
# Filtere Dateien nach Datum wenn since_date angegeben
|
||||
filtered_files = []
|
||||
for item in files_list:
|
||||
file_key = item.get('fileName')
|
||||
@@ -33,90 +41,118 @@ class EIXExchange(BaseExchange):
|
||||
|
||||
if since_date:
|
||||
try:
|
||||
# Extract date from filename: Kursblatt.YYYY-MM-DD
|
||||
parts = file_key.split('/')[-1].split('.')
|
||||
# parts example: ['Kursblatt', '2025-07-14', '1752526803105', 'csv']
|
||||
if len(parts) >= 2:
|
||||
date_str = parts[1]
|
||||
file_date = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
|
||||
file_date = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
|
||||
|
||||
# Check if file date is newer than since_date (compare dates only)
|
||||
if file_date.date() > since_date.date():
|
||||
if file_date.date() >= since_date.date():
|
||||
filtered_files.append(item)
|
||||
continue
|
||||
# If same day, we might need to check it too, but EIX seems to be daily files
|
||||
if file_date.date() == since_date.date():
|
||||
filtered_files.append(item)
|
||||
continue
|
||||
except Exception:
|
||||
# If parsing fails, default to including it (safety) or skipping?
|
||||
# Let's include it if we are not sure
|
||||
except (ValueError, IndexError) as e:
|
||||
# Dateiname hat unerwartetes Format - zur Sicherheit einschließen
|
||||
logger.debug(f"[{self.name}] Konnte Datum nicht aus {file_key} extrahieren: {e}")
|
||||
filtered_files.append(item)
|
||||
else:
|
||||
filtered_files.append(item)
|
||||
|
||||
# Sort files to process oldest to newest if doing a sync, or newest to oldest?
|
||||
# If we have limit=1 (default), we usually want the newest.
|
||||
# But if we are syncing history (since_date set), we probably want all of them.
|
||||
filtered_files.append(item)
|
||||
|
||||
# Logic: If since_date is set, we ignore limit (or use it as safety cap) and process ALL new files
|
||||
if since_date:
|
||||
files_to_process = filtered_files
|
||||
# Sort by date ? The API list seems chronological.
|
||||
return filtered_files
|
||||
else:
|
||||
# Default behavior: take the last N files (API returns oldest first usually?)
|
||||
# Let's assume list is chronological.
|
||||
if limit:
|
||||
files_to_process = files_list[-limit:]
|
||||
else:
|
||||
files_to_process = files_list
|
||||
return files_list[-limit:]
|
||||
return files_list
|
||||
|
||||
def fetch_trades_from_file(self, file_item: dict) -> List[Trade]:
|
||||
"""Lädt und parst eine einzelne CSV-Datei."""
|
||||
file_key = file_item.get('fileName')
|
||||
if not file_key:
|
||||
return []
|
||||
|
||||
csv_url = f"{self.API_BASE_URL}/trade-file-contents?key={file_key}"
|
||||
try:
|
||||
response = requests.get(csv_url, timeout=60)
|
||||
response.raise_for_status()
|
||||
return self._parse_csv(response.text)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"[{self.name}] Fehler beim Download von {file_key}: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.name}] Unerwarteter Fehler bei {file_key}: {e}")
|
||||
|
||||
return []
|
||||
|
||||
def fetch_trades_streaming(self, limit: int = 1, since_date: datetime = None) -> Generator[Tuple[str, List[Trade]], None, None]:
|
||||
"""
|
||||
Generator der Trades dateiweise zurückgibt.
|
||||
Yields: (filename, trades) Tupel
|
||||
"""
|
||||
files = self.get_files_to_process(limit=limit, since_date=since_date)
|
||||
|
||||
for item in files:
|
||||
file_key = item.get('fileName', 'unknown')
|
||||
trades = self.fetch_trades_from_file(item)
|
||||
if trades:
|
||||
yield (file_key, trades)
|
||||
|
||||
trades = []
|
||||
count = 0
|
||||
for item in files_to_process:
|
||||
file_key = item.get('fileName')
|
||||
|
||||
# Download the CSV
|
||||
csv_url = f"https://european-investor-exchange.com/api/trade-file-contents?key={file_key}"
|
||||
try:
|
||||
csv_response = requests.get(csv_url, timeout=20)
|
||||
if csv_response.status_code == 200:
|
||||
trades.extend(self._parse_csv(csv_response.text))
|
||||
count += 1
|
||||
# Only enforce limit if since_date is NOT set
|
||||
if not since_date and limit and count >= limit:
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"Error downloading EIX CSV {file_key}: {e}")
|
||||
|
||||
return trades
|
||||
def fetch_latest_trades(self, limit: int = 1, since_date: datetime = None, **kwargs) -> List[Trade]:
|
||||
"""
|
||||
Legacy-Methode für Kompatibilität.
|
||||
WARNUNG: Lädt alle Trades in den Speicher! Für große Datenmengen fetch_trades_streaming() verwenden.
|
||||
"""
|
||||
# Für kleine Requests (limit <= 5) normale Verarbeitung
|
||||
if limit and limit <= 5 and not since_date:
|
||||
all_trades = []
|
||||
for filename, trades in self.fetch_trades_streaming(limit=limit, since_date=since_date):
|
||||
all_trades.extend(trades)
|
||||
return all_trades
|
||||
|
||||
# Für große Requests: Warnung ausgeben und leere Liste zurückgeben
|
||||
logger.warning(f"[{self.name}] fetch_latest_trades() mit großem Dataset aufgerufen. Verwende Streaming.")
|
||||
return []
|
||||
|
||||
def _parse_csv(self, csv_text: str) -> List[Trade]:
|
||||
"""Parst CSV-Text zu Trade-Objekten."""
|
||||
trades = []
|
||||
parse_errors = 0
|
||||
|
||||
f = io.StringIO(csv_text)
|
||||
# Header: Trading day & Trading time UTC,Instrument Identifier,Quantity,Unit Price,Price Currency,Venue Identifier,Side
|
||||
reader = csv.DictReader(f, delimiter=',')
|
||||
for row in reader:
|
||||
|
||||
for row_num, row in enumerate(reader, start=2): # Start bei 2 wegen Header
|
||||
try:
|
||||
price = float(row['Unit Price'])
|
||||
quantity = float(row['Quantity'])
|
||||
isin = row['Instrument Identifier']
|
||||
symbol = isin # Often symbol is unknown, use ISIN
|
||||
time_str = row['Trading day & Trading time UTC']
|
||||
|
||||
# Format: 2026-01-22T06:30:00.617Z
|
||||
# Python 3.11+ supports ISO with Z, otherwise we strip Z
|
||||
# Preis und Menge validieren
|
||||
if price <= 0 or quantity <= 0:
|
||||
logger.debug(f"[{self.name}] Zeile {row_num}: Ungültiger Preis/Menge: {price}/{quantity}")
|
||||
parse_errors += 1
|
||||
continue
|
||||
|
||||
ts_str = time_str.replace('Z', '+00:00')
|
||||
timestamp = datetime.fromisoformat(ts_str)
|
||||
|
||||
trades.append(Trade(
|
||||
exchange=self.name,
|
||||
symbol=symbol,
|
||||
symbol=isin,
|
||||
isin=isin,
|
||||
price=price,
|
||||
quantity=quantity,
|
||||
timestamp=timestamp
|
||||
))
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
except KeyError as e:
|
||||
logger.debug(f"[{self.name}] Zeile {row_num}: Fehlendes Feld {e}")
|
||||
parse_errors += 1
|
||||
except ValueError as e:
|
||||
logger.debug(f"[{self.name}] Zeile {row_num}: Ungültiger Wert: {e}")
|
||||
parse_errors += 1
|
||||
except Exception as e:
|
||||
logger.warning(f"[{self.name}] Zeile {row_num}: Unerwarteter Fehler: {e}")
|
||||
parse_errors += 1
|
||||
|
||||
if parse_errors > 0:
|
||||
logger.debug(f"[{self.name}] {parse_errors} Zeilen konnten nicht geparst werden")
|
||||
|
||||
return trades
|
||||
|
||||
Reference in New Issue
Block a user