From b25bab2288410dc7c33d8bd0274d0a08c2ce25fa Mon Sep 17 00:00:00 2001 From: Melchior Reimers Date: Tue, 27 Jan 2026 11:00:55 +0100 Subject: [PATCH] updated dashboard --- cleanup_duplicates.py | 77 ++++++++--- restore_and_fix.py | 125 +++++++++++++++++ .../__pycache__/gettex.cpython-313.pyc | Bin 17884 -> 18788 bytes src/exchanges/gettex.py | 127 +++++++++++++----- 4 files changed, 279 insertions(+), 50 deletions(-) create mode 100644 restore_and_fix.py diff --git a/cleanup_duplicates.py b/cleanup_duplicates.py index 96a763f..3b797e0 100644 --- a/cleanup_duplicates.py +++ b/cleanup_duplicates.py @@ -84,35 +84,76 @@ def main(): # 3. Erstelle bereinigte Tabelle print("\n3. Erstelle bereinigte Tabelle 'trades_clean'...") + print(" HINWEIS: Bei großen Datenmengen kann dies mehrere Minuten dauern...") # Lösche alte clean-Tabelle falls vorhanden execute_query("DROP TABLE IF EXISTS trades_clean") - # Erstelle neue Tabelle mit DISTINCT auf allen relevanten Feldern - # QuestDB: Wir erstellen eine neue Tabelle mit DISTINCT - create_clean_query = """ - CREATE TABLE trades_clean AS ( - SELECT DISTINCT - exchange, - symbol, - isin, - price, - quantity, - timestamp - FROM trades - ) TIMESTAMP(timestamp) PARTITION BY DAY WAL + # QuestDB: SAMPLE BY 1T mit LATEST ON für Deduplizierung + # Das gruppiert nach Timestamp (auf Nanosekunde genau) und behält nur den letzten Eintrag + # Alternative: Wir verwenden GROUP BY mit MIN/MAX + + # Erst die Tabelle erstellen + create_table_query = """ + CREATE TABLE trades_clean ( + exchange SYMBOL, + symbol SYMBOL, + isin SYMBOL, + price DOUBLE, + quantity DOUBLE, + timestamp TIMESTAMP + ) TIMESTAMP(timestamp) PARTITION BY DAY WAL DEDUP UPSERT KEYS(timestamp, exchange, isin, price, quantity) """ - result = execute_query(create_clean_query, timeout=600) + result = execute_query(create_table_query, timeout=60) if result is None: - print("Fehler beim Erstellen der bereinigten Tabelle!") - return + print(" Fehler beim Erstellen der Tabellenstruktur!") + # Fallback: Ohne DEDUP + create_table_query = """ + CREATE TABLE trades_clean ( + exchange SYMBOL, + symbol SYMBOL, + isin SYMBOL, + price DOUBLE, + quantity DOUBLE, + timestamp TIMESTAMP + ) TIMESTAMP(timestamp) PARTITION BY DAY WAL + """ + execute_query(create_table_query, timeout=60) + + # Dann Daten einfügen mit INSERT ... SELECT (ohne LIMIT!) + print(" Kopiere Daten (ohne Duplikate)...") + insert_query = """ + INSERT INTO trades_clean + SELECT exchange, symbol, isin, price, quantity, timestamp + FROM ( + SELECT exchange, symbol, isin, price, quantity, timestamp, + row_number() OVER (PARTITION BY exchange, isin, timestamp, price, quantity ORDER BY timestamp) as rn + FROM trades + ) + WHERE rn = 1 + """ + + result = execute_query(insert_query, timeout=3600) # 1 Stunde Timeout + if result is None: + print(" Fehler bei INSERT - versuche alternative Methode...") + # Fallback: Direkte Kopie ohne Deduplizierung über SQL + # Stattdessen per ILP deduplizieren + insert_simple = "INSERT INTO trades_clean SELECT * FROM trades" + execute_query(insert_simple, timeout=3600) clean_count = get_table_count("trades_clean") - print(f" Bereinigte Tabelle erstellt: {clean_count:,} Trades") + print(f" Bereinigte Tabelle: {clean_count:,} Trades") + + if clean_count == 0: + print(" FEHLER: Keine Daten kopiert!") + return removed = original_count - clean_count - print(f" Entfernte Duplikate: {removed:,} ({removed/original_count*100:.1f}%)") + if removed > 0: + print(f" Entfernte Duplikate: {removed:,} ({removed/original_count*100:.1f}%)") + else: + print(" Keine Duplikate durch SQL entfernt (DEDUP wird bei neuen Inserts aktiv)") # 4. Ersetze alte Tabelle print("\n4. Ersetze alte Tabelle...") diff --git a/restore_and_fix.py b/restore_and_fix.py new file mode 100644 index 0000000..540deab --- /dev/null +++ b/restore_and_fix.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +""" +Script zum Wiederherstellen und korrekten Bereinigen der Trades. +""" + +import requests +import os +import sys + +DB_HOST = os.getenv("QUESTDB_HOST", "localhost") +DB_PORT = os.getenv("QUESTDB_PORT", "9000") +DB_USER = os.getenv("DB_USER", "admin") +DB_PASSWORD = os.getenv("DB_PASSWORD", "quest") + +DB_URL = f"http://{DB_HOST}:{DB_PORT}" +DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None + +def execute_query(query, timeout=300): + """Führt eine QuestDB Query aus.""" + try: + response = requests.get( + f"{DB_URL}/exec", + params={'query': query, 'count': 'true'}, + auth=DB_AUTH, + timeout=timeout + ) + if response.status_code == 200: + return response.json() + else: + print(f"Query failed: {response.text[:500]}") + return None + except Exception as e: + print(f"Error executing query: {e}") + return None + +def get_table_count(table_name): + """Zählt Einträge in einer Tabelle.""" + result = execute_query(f"SELECT count(*) FROM {table_name}") + if result and result.get('dataset'): + return result['dataset'][0][0] + return 0 + +def table_exists(table_name): + """Prüft ob eine Tabelle existiert.""" + result = execute_query(f"SELECT count(*) FROM {table_name} LIMIT 1") + return result is not None + +def main(): + print("=" * 60) + print("QuestDB Daten-Wiederherstellung und Bereinigung") + print("=" * 60) + + # 1. Prüfe aktuellen Stand + current_count = get_table_count("trades") + print(f"\n1. Aktuelle Trades-Tabelle: {current_count:,} Einträge") + + # 2. Prüfe ob Backup existiert + backup_exists = table_exists("trades_backup") + if backup_exists: + backup_count = get_table_count("trades_backup") + print(f" Backup-Tabelle gefunden: {backup_count:,} Einträge") + + if backup_count > current_count: + print("\n2. Backup hat mehr Daten - Wiederherstellung möglich!") + + response = input(" Backup wiederherstellen? (j/n): ") + if response.lower() == 'j': + print(" Lösche aktuelle Tabelle...") + execute_query("DROP TABLE trades") + + print(" Benenne Backup um...") + execute_query("RENAME TABLE trades_backup TO trades") + + new_count = get_table_count("trades") + print(f" Wiederhergestellt: {new_count:,} Trades") + else: + print(" Backup hat weniger/gleich viele Daten - keine Wiederherstellung nötig") + else: + print(" Kein Backup gefunden!") + + # 3. Zeige Statistik pro Exchange + print("\n3. Trades pro Exchange:") + result = execute_query(""" + SELECT exchange, count(*) as cnt + FROM trades + GROUP BY exchange + ORDER BY cnt DESC + """) + if result and result.get('dataset'): + for row in result['dataset']: + print(f" {row[0]}: {row[1]:,}") + + # 4. Aktiviere DEDUP für zukünftige Inserts + print("\n4. Prüfe DEDUP-Status...") + # QuestDB: DEDUP kann nur bei Tabellenerstellung gesetzt werden + # Wir können aber eine neue Tabelle mit DEDUP erstellen + + print("\n5. Empfehlung:") + print(" - Die Deduplizierung sollte im daemon.py erfolgen (bereits implementiert)") + print(" - Der Hash-basierte Check verhindert zukünftige Duplikate") + print(" - Für bestehende Duplikate: Manuelles Cleanup in Batches") + + # 6. Zeige Duplikat-Analyse für eine Exchange + print("\n6. Stichproben-Analyse für Duplikate...") + result = execute_query(""" + SELECT exchange, isin, timestamp, price, quantity, count(*) as cnt + FROM trades + WHERE exchange = 'EIX' + GROUP BY exchange, isin, timestamp, price, quantity + HAVING count(*) > 1 + LIMIT 10 + """, timeout=120) + + if result and result.get('dataset') and len(result['dataset']) > 0: + print(" Gefundene Duplikate (Beispiele):") + for row in result['dataset'][:5]: + print(f" {row[0]} | {row[1]} | {row[2]} | {row[3]} | {row[4]} | {row[5]}x") + else: + print(" Keine Duplikate in EIX gefunden (oder Query timeout)") + + print("\n" + "=" * 60) + print("Fertig!") + +if __name__ == "__main__": + main() diff --git a/src/exchanges/__pycache__/gettex.cpython-313.pyc b/src/exchanges/__pycache__/gettex.cpython-313.pyc index 577fd8f49b30597c79f01d57956e04e2d3aa0cc5..651d9914d5ebacad9c61ad7d94034710c4592b1e 100644 GIT binary patch delta 4599 zcmbVPYj9J?72d0>w=G$+Wcdoqw(tuCwk+efc^WL+5bO|N1Ox-Fuw)}*%diqJCqbPg zq;5$XIolQy(!oFaLkUbm#+?aiGs7cbl4*XJI31i@+L8|aK|7r~g(Ne{OsD5uS+_zpiMpnD3Gu^s_;jKkKL<8xETp{^D7{b5ZhEZj zfJWQ0G{sNw^o~V@6c8WjT0A?spxcqG1rBWC3|~fim)`;xC>XaQhh6YwE`h^4+6l=Z zPMDqJfm6Ger1labW^@;_cutNc)5jS}wUQ-smTvC+a5Y~@`nbiUyClu1A$}!+g9=H< zNkaO_9LKB}gmk5ENN<6@YZ;5J&e7F@n3=xJu5xzXRu!AS+<=#gMqeggoqj9410FTA zIn$?*pBJb;?jcDWc;14K$ol^0^bu`JbeD+*a<_TtMiBEJd>WxzfMHX2n|JG0?~dKh zEq)pa2Ap3RoKflz1S8HqIyC5HuM0)|SBfU~JE4%vWq%hAmp8yZX;IGsX=ce%=yy}=Pk#|n~s$i zTc#Y3uz%*|Zpr}OFRa95yKTyu6qcU1B)6T*OBychzT9wm*`%%GV^imh36@6;U$${1 z+spml-17AUHxU-!C!9H7f|GgZzkZO(>enx0Z#cHbT9xEC%?iLPtDN2<{;H!A==U<4 zdFZa0T;6KMwZ%iApc4T}Nj z8%}duJ%6KK1#~B+_}KWsy=b#?V-1E(Z2*Qp2cu%01{>NNd)8^K5xKPLh52r@O}Y8_ z^sK08$_C?w|Dm7v@uCu0*cO9qu==nPrc@74sbEU|@RS;+Gz+IoIX+N-?_+1ZOsz`4K zU$p}+1vlYSh#<8RJ(NP!h3n9<)Tk~Y;0nR%q#C$FP$P6x{e0RHy3B{bEfPICji@sS*6f`1 z1VfRabA!08VrMWk5OjtHo&CXRG?> z@+~wNij;Q*!~H@1uzS6`$=Txdcs%QyANP2w>dQTiO;xo`o@#epbwf>^r`+r7DzB+^ z*VosUuUMg%xbl0bQ=JrDwCvsJW~BB2+{;A3N1`nN5?3*-Eq%PAbg-f{AaPr14Bg^u zoTQEphr)eBuc;+%@M!NoKS&~}BgY2!4h=}kP$U!v4@N`1K}mb?kUty^MUP3kXlO7P ziTVc*xUy&o-cXAmdr4K_z>q&m(I_Q8JamMjfD(Tw+Dki-Qq#zRflyR3#L(G>B13)h zD@hs}JTTzz4N}Y(s2u_QsVke}KRIvi^dJtOLFfXIbh2QG?vwa{A7l^sqe1lKu_%0cMBb>}>yHE_WgqqT(i5;P0?vRqztO0G=kCXI zU2XDT?45lu_jiYe*YUh}WDh@+S+5hOt-`5Iqnp^~*ez?>O>5a$WXigH-kmp}JPz*r zjzvhAM;lJ98C^4FDIRH=&UGXn8$EjJ_~`Me+$AIJcZB?e|Frh3@w9PDSUR$C+Lo7S z9^H2;JQ|*|m5sE0p(a-0)#^m_Ok+}UW-Xj}*J@9=rmQ7!6P&f39UqCc-^sEjI0zv- zlIAnJf#(Y?F$u437}K8VNIo{@a8KtJjw!~>@uwi92oPe%suG5{YT8ziI0&QUVTHOVhMyZM#P=Y=;NuQ{e1b+;UAZ#vdq)RZN0uzDQ^~5w{#shh9bvk%VwJ3gtgfs@B`X#? zQPvrwPoV~JJBQWFJ)S0yN8+5a=}O#k`XnsL=1MVe(5De%0In>FN6nUv4c!oZ9zfD> z^A8*fdMO>E^Cl?U9{mY29Y;U|q$3EjX|d;*nA?7VLo_0Zeg8z0K} z6_B#$r1JfI*eJLEA>UtT`7inYTmg|jn2`UH@AK?Zm5n`GmEl^L@AIf4*!+-gcbBGU z%=e+5C}W?l%CTp5&kH`6>tKLpPp+zbB^Ls7+0BANfL~AuER+ZSJtzded17ug78c*@y!i;ue!ON=F~+@=e#yb{ z8VrYZbHxDdfiY`X>j?b-7p%LTvUxcJLr208*#e<(ztfdVm%}10M{pzTMW{sRMF=1S z5&97N5%wX35K0i{A|~C>K3ps8e*z~55HMqM8B$?y8BWLwJeXdhvQiJ>@Bjklz$x{n zNJS9l>WhbBSSo(qV0+fNEN??Ziu_3qOS6TCi@mZg{Qnjn+V5X@hzkpk1eYi|>pJa9 zM7(U)}%Ud<%0@NTHa%F|n zTgqR}t_1o$1=JuKa!u{>)+w%4)&PCIRPU|UT(8t1U8iqVswN2XOz<4iN}1LawA=ZK z(uTHT{zkC@=o_Abb}RqBRRuJ)!G$ia)1}zj=@KO7-O$dBv!8A#sF6LE5B~_+p+i8)C4O(DhGyVU*76FZ{7m!~aerZly*_h?hO@dnkZPC&@__Jg7@f(>0rJ2%WNGU8@7!R=j7;j4 z{GIQ7_nhx_&v$;Gzq&v#oTCj-^m+{ikLD+*qK|i8HT+Q~drskxWj}Umdej`#Q`BRS z!|P22oVJ)cQh zK_#alUUCXfTR<78jRo1aEVa@ePUrmvMTIOcY1e}m-Ah_(WXozeL)J(=7z)Y2n4yE_ z3S+A7vaC~qa$GI9;{TX5`0;nlfifzn$SMe`xT0;cn3~hXG(okQ^7m6zOdHg2y7dq> z$bj?i2gR?wG&61P)0E~GL{6JIxX{S4VV;eIk|7qwPkLFAt{3Ub_i0fb;fJFm z(Ik&jw%l?sm}bNA;n9&;0@Usy%2wsxHX#&8?6D9^L}Q1EI`-c(FPqX;O8im&H0(*4 zbIUtXJj^E&e1uKu$qM3rd^9FaPlSe}5jKe+f<%hVZSi+?;RTze@a!rcl)WWtFPqNXt#P6niuh$=3LU}bBESTGz-B*g;an`r1KuJiq{x*yy@ z|@Qi-r+ZlLh`>) zJv5rnDi#Z>@Oj4>T7?Upj&z}Ap~xyYJbzd-!CzT7;S%aNq3eKfu={RTkI)+u`a(k2 zfWU*jCt}+VRN%k+%j+RFDfW@%{V{d zyFB!E`YmR%;cV-I)p4=!Lf^YP-`oA0-IJ=h@@<&|cdZ>~cQrqBIM23zZeKmdJScKr z+I!wGrhZ^{??kH0aw{!=E?{E2q@%xs%rIykTEUM!-DY-jafx6+iUiN0rS zeD-t$_74f) zjSB7~^Nyo`v$&ouDq-`prv?gt={%G6mr)=2WI%6hXIqraO;>{(Cb#N6El%0(@>1Yu zl$sWsa>l44yi?;ZkF%%8+`0wl1(1)(C{cvDtH4zbz}=E9YjD_)DNQgNHh^2jH8iI5?v-Q2t10+Xflj_ zmG*rk8Innnfl`v}ifSqBeqxchAS#b1Pm>wAop>>#NPi>=a~P8&kf#tzCPd{(f)rvB zVf;>IsT?8LjD^Y)`pVQ|Wr5Q`<(f_?NYPbo_q$#A=mNmyaUz~ge91ZCJE^z zWE8KiPthBuF4fO zuk;gI=qGg0Pq?!h1FY0f=w8>*W%!uMT9t!BXeab9wG#ktQahnqYA2*_;5E_>m|p1y z7<_q)+YO)(d)nZ(lRzKPdFkN_p^vDVBNw{z9&b1}b+|-cfraEf|IysZn$>l=ML;uV&`s3q1t93=R$8Ya6SOmSI}5z|K=LAeT&C|f4g&6S_K(I z(1uYGYNJEIiL@F)QwFD+kUs%QS$o-6;1lEc$?~Qunf*1k)r;T|!fq6f>K0 z_Obw`oh&b5b{>2(EDGbQhGM%5k+@JLAsYzc38}*I21oh`VUH3rObEGA@-B@LGK~=u zCxjI63os>|gv(CC9V6s8Aqhf~gpkVc7?!GW9T7++IGTNhq_V6$N#>^r86e~|At^$n zi|BiV%Lif$eG@wyJ?7IifO9QWV-Ank;$UM;!ee`Sq0lr|cP@2l@456tyK_waKUg~5 zkg1zB-q?D*<0>cY4gP;%T?toL0L`(cFYK-d4);6yxAf!uJqP}|Q<+v15VZk_j`_wS z7o+E+W8LFwXb=XkR7|X%9Le}IUcuBpuj|NSX@i8NpAjrw`zn@>)8pmuc`kd#R|%EA zOliiP@&Bbu2n6p34&DtMoD1~Kwe<>phlKt^LR(1KGhh|5r z@kbRcc52cm1A3#DZK`H&nzjJ{iN>dg;ZujFnU>w6*8_j6TGO;cdCRK={x+>?vB+<0 zceE7BXVeOi%oNH&GGk!~Z+EwPn3=k*{*BD+Mm_L%D0gc)bEjMmJhFjVkF2X*=CR|6 z=GOLi$;XwBj{-QcAOdY2dYuT#J&&XtKS((8XBLsm9VrQsPT@3gV!=Qh0P?RycoR3a zY^6WLXIeOWo!mBQUZh}{woLQaRy`%`Vhk(&fo&V)wh4F#!!#oe&~^oHXul>Bw2Ejl z0`ZuaN@kD 1: - print(f"[GETTEX] Sample data row: {lines[1][:200]}") + if not lines: + return [] - # CSV parsen - versuche verschiedene Delimiter - delimiter = ';' if ';' in lines[0] else ',' - reader = csv.DictReader(io.StringIO(csv_text), delimiter=delimiter) + # Extrahiere Datum aus Dateinamen (Format: posttrade.YYYYMMDD.HH.MM.xxx.csv.gz) + date_str = None + parts = filename.split('.') + if len(parts) >= 4: + date_str = parts[1] # YYYYMMDD - row_count = 0 - for row in reader: - row_count += 1 - if row_count == 1: - print(f"[GETTEX] CSV columns: {list(row.keys())}") + # Gettex CSV hat KEINEN Header! + # Format: ISIN,Zeit,Währung,Preis,Menge + # z.B.: DE000BAY0017,09:15:03.638460,EUR,45.775,22 + for line in lines: + if not line.strip(): + continue try: - trade = self._parse_csv_row(row) + trade = self._parse_headerless_csv_line(line, date_str) if trade: trades.append(trade) - except Exception as e: - if row_count <= 3: - print(f"[GETTEX] Error parsing row {row_count}: {e}, row keys: {list(row.keys())}") + except Exception: continue - print(f"[GETTEX] Processed {row_count} rows, found {len(trades)} valid trades") + if trades: + print(f"[GETTEX] Parsed {len(trades)} trades from {filename}") except requests.exceptions.HTTPError as e: if e.response.status_code != 404: @@ -168,6 +167,69 @@ class GettexExchange(BaseExchange): return trades + def _parse_headerless_csv_line(self, line: str, date_str: str = None) -> Optional[Trade]: + """ + Parst eine headerlose CSV-Zeile im gettex Format. + Format: ISIN,Zeit,Währung,Preis,Menge + z.B.: DE000BAY0017,09:15:03.638460,EUR,45.775,22 + """ + try: + parts = line.strip().split(',') + if len(parts) < 5: + return None + + isin = parts[0].strip() + time_str = parts[1].strip() + # currency = parts[2].strip() # nicht benötigt + price_str = parts[3].strip() + qty_str = parts[4].strip() + + # Validierung + if not isin or len(isin) != 12: # ISIN ist immer 12 Zeichen + return None + + price = float(price_str) + quantity = float(qty_str) + + if price <= 0 or quantity <= 0: + return None + + # Timestamp bauen + # date_str ist YYYYMMDD, time_str ist HH:MM:SS.ffffff + if date_str and len(date_str) == 8: + year = date_str[:4] + month = date_str[4:6] + day = date_str[6:8] + date_part = f"{year}-{month}-{day}" + else: + # Fallback: heute + date_part = datetime.now(timezone.utc).strftime('%Y-%m-%d') + + # Zeit parsen (z.B. 09:15:03.638460) + ts_str = f"{date_part}T{time_str}" + + # Mikrosekunden kürzen wenn zu lang + if '.' in ts_str: + base, frac = ts_str.rsplit('.', 1) + if len(frac) > 6: + frac = frac[:6] + ts_str = f"{base}.{frac}" + + timestamp = datetime.fromisoformat(ts_str) + timestamp = timestamp.replace(tzinfo=timezone.utc) + + return Trade( + exchange=self.name, + symbol=isin, + isin=isin, + price=price, + quantity=quantity, + timestamp=timestamp + ) + + except Exception: + return None + def _parse_csv_row(self, row: dict) -> Optional[Trade]: """ Parst eine CSV-Zeile zu einem Trade. @@ -390,29 +452,30 @@ class GettexExchange(BaseExchange): with gzip.GzipFile(fileobj=io.BytesIO(response.content)) as f: csv_text = f.read().decode('utf-8') - # Debug: Zeige erste Zeilen lines = csv_text.strip().split('\n') - if len(lines) <= 1: - # Datei ist leer oder nur Header + if not lines: return [] - # CSV parsen - versuche verschiedene Delimiter - delimiter = ';' if ';' in lines[0] else (',' if ',' in lines[0] else '\t') - reader = csv.DictReader(io.StringIO(csv_text), delimiter=delimiter) + # Extrahiere Datum aus Dateinamen (Format: posttrade.YYYYMMDD.HH.MM.xxx.csv.gz) + date_str = None + parts = filename.split('.') + if len(parts) >= 4: + date_str = parts[1] # YYYYMMDD - row_count = 0 - for row in reader: - row_count += 1 + # Gettex CSV hat KEINEN Header! + # Format: ISIN,Zeit,Währung,Preis,Menge + for line in lines: + if not line.strip(): + continue try: - trade = self._parse_csv_row(row) + trade = self._parse_headerless_csv_line(line, date_str) if trade: trades.append(trade) - except Exception as e: - if row_count <= 2: - print(f"[{self.name}] Error parsing row: {e}, keys: {list(row.keys())[:5]}") + except Exception: continue - print(f"[{self.name}] Parsed {len(trades)} trades from {filename}") + if trades: + print(f"[{self.name}] Parsed {len(trades)} trades from {filename}") except requests.exceptions.HTTPError as e: if e.response.status_code != 404: