# Krempl-Daten Migration zu PostgreSQL ## π― Problem Die MySQL-Tabelle `krempl_artikelgeraet_22` enthΓ€lt **61 Millionen Zeilen** fΓΌr N:N-Mapping zwischen Ersatzteilen und GerΓ€ten: - 68.667 unique Ersatzteile - 1.446.180 unique GerΓ€te - Durchschnittlich 888 GerΓ€te pro Ersatzteil - Worst Case: 129.819 GerΓ€te pro Ersatzteil **Konsequenzen:** - Queries ohne LIMIT kΓΆnnen 100.000+ Zeilen laden - JOINs ΓΌber 61 Mio Zeilen β Timeouts (504) - Schlechte MySQL Query-PlΓ€ne trotz korrekter Indexe - 1.3 GB TabellengrΓΆΓe ## β LΓΆsung: PostgreSQL mit Array-Aggregation **Idee:** Statt 61 Mio einzelne Zeilen β **68.667 Zeilen** (eine pro Ersatzteil) mit Arrays ``` VORHER (MySQL): ersatzteil | geraet -----------+-------- 123 | 1 123 | 2 123 | 3 ... (129.819 Zeilen fΓΌr Ersatzteil 123) NACHHER (Postgres): ersatzteil_id | geraet_ids | geraet_count --------------+-------------------------+------------- 123 | [1,2,3,...,129819] | 129819 ``` **Warum PostgreSQL?** - Native Array-Datentypen (`BIGINT[]`) - GIN-Index fΓΌr schnelle Array-Suchen - Full-Text Search integriert - Polyglot Persistence: MySQL fΓΌr Shop, Postgres nur fΓΌr Krempl --- ## π Architektur ``` βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β MYSQL (Shop-DB) β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β β items β β β β - id β β β β - krempl_id (BIGINT) ββββββββ β β β β - navision_id β β β β ββββββββββββββββββββββββββββββΌβββββββββββββββββββββββββ β βββββββββββββββββββββββββββββββββββΌβββββββββββββββββββββββββββ β β PHP Abfrage β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β POSTGRESQL (Krempl-DB) β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β β ersatzteil_mapping β β β β - ersatzteil_id (PK) β β β β - geraet_ids BIGINT[] β GIN Index β β β β - geraet_count INT β β β ββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββββ β β β β β β JOIN β β β β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β β geraete (1.4 Mio Zeilen) β β β β - id (PK) β β β β - nr VARCHAR β Full-Text Index β β β β - marke, typ, modell_bezeichnung β β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ ``` **2 Haupt-Abfragen:** 1. **Artikelseite:** Ersatzteil β GerΓ€teliste (vorwΓ€rts) 2. **GerΓ€tesuche:** GerΓ€te-Suche β Ersatzteile β Shop-Artikel (rΓΌckwΓ€rts) --- ## ποΈ PostgreSQL Schema ### Schritt 1: Datenbank erstellen ```sql -- Als postgres User CREATE DATABASE krempl_data WITH ENCODING 'UTF8' LC_COLLATE='de_DE.UTF-8' LC_CTYPE='de_DE.UTF-8'; CREATE USER krempl_user WITH PASSWORD 'SICHERES_PASSWORT'; GRANT ALL PRIVILEGES ON DATABASE krempl_data TO krempl_user; \c krempl_data GRANT ALL ON SCHEMA public TO krempl_user; ``` ### Schritt 2: Tabellen erstellen ```sql -- 1. GERΓTE (ca. 1.4 Mio Zeilen, 227 MB CSV) CREATE TABLE geraete ( id BIGINT PRIMARY KEY, nr VARCHAR(100), marke VARCHAR(100), typ VARCHAR(200), zusatz_nummer VARCHAR(100), modell_bezeichnung VARCHAR(200), produktionsstart VARCHAR(50), produktionsende VARCHAR(50), wgtext1 VARCHAR(200), wgtext2 VARCHAR(200), zusatz TEXT, bezeichnung_original TEXT, typ_de VARCHAR(200), typ_fr VARCHAR(200), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- Indexe fΓΌr schnelle Suche CREATE INDEX idx_geraete_nr ON geraete(nr); CREATE INDEX idx_geraete_marke ON geraete(marke); CREATE INDEX idx_geraete_nr_trgm ON geraete USING GIN(nr gin_trgm_ops); CREATE INDEX idx_geraete_typ_trgm ON geraete USING GIN(typ gin_trgm_ops); -- Full-Text Index CREATE INDEX idx_geraete_fts ON geraete USING GIN(to_tsvector('german', COALESCE(nr,'') || ' ' || COALESCE(typ,'') || ' ' || COALESCE(marke,'') )); -- 2. ERSATZTEIL-MAPPING (68k Zeilen statt 61 Mio!) CREATE TABLE ersatzteil_mapping ( ersatzteil_id BIGINT PRIMARY KEY, geraet_ids BIGINT[], -- Array statt Millionen Zeilen! geraet_count INT, last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- GIN Index fΓΌr Array-Suche "Welche Teile passen zu GerΓ€t X?" CREATE INDEX idx_mapping_geraet_array ON ersatzteil_mapping USING GIN(geraet_ids); -- 3. ERSATZTEILE (optional - falls Metadaten gewΓΌnscht) CREATE TABLE ersatzteile ( id BIGINT PRIMARY KEY, navision_id BIGINT, originalnummer VARCHAR(100), marke VARCHAR(100), ean VARCHAR(50), altenr VARCHAR(100), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX idx_ersatzteile_navision ON ersatzteile(navision_id); -- 4. PASSENDWIE (optional) CREATE TABLE passendwie ( id BIGINT PRIMARY KEY, navision_id BIGINT, vertreiber_id VARCHAR(100), vertreiber VARCHAR(100), bestellcode VARCHAR(100) ); CREATE INDEX idx_passendwie_navision ON passendwie(navision_id); ``` ### Schritt 3: Extensions aktivieren ```sql -- FΓΌr Trigram-Suche (Γ€hnliche Strings) CREATE EXTENSION IF NOT EXISTS pg_trgm; -- FΓΌr schnellere Array-Operationen CREATE EXTENSION IF NOT EXISTS intarray; ``` --- ## π Import-Script ### Datei: `modules/import/krempl_postgres_import.py` ```python #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Krempl-Daten Import fΓΌr PostgreSQL Aggregiert 61 Mio Zeilen zu 68k Zeilen mit Arrays """ import psycopg2 import csv import os import sys from collections import defaultdict from datetime import datetime from configparser import ConfigParser # Konfiguration PG_CONFIG = { 'host': 'localhost', 'port': 5432, 'database': 'krempl_data', 'user': 'krempl_user', 'password': 'PASSWORT_HIER' } CSV_PATHS = { 'geraete': '/var/www/vhosts/intelectra.de/httpdocs/upload/geraete_Export.csv', 'artikel': '/var/www/vhosts/intelectra.de/httpdocs/upload/artikel_Export.csv', 'mapping': '/var/www/vhosts/intelectra.de/httpdocs/upload/artikelgeraet_Export.csv', 'passendwie': '/var/www/vhosts/intelectra.de/httpdocs/upload/passendwie_Export.csv' } def connect_postgres(): """Verbindung zu PostgreSQL herstellen""" try: conn = psycopg2.connect(**PG_CONFIG) print(f"β Verbunden mit PostgreSQL: {PG_CONFIG['database']}") return conn except Exception as e: print(f"β Postgres-Verbindung fehlgeschlagen: {e}") sys.exit(1) def import_geraete(conn): """Import GerΓ€te-Tabelle (1.4 Mio Zeilen)""" print("\n[1/4] Importiere GerΓ€te...") csv_path = CSV_PATHS['geraete'] if not os.path.exists(csv_path): print(f"β οΈ CSV nicht gefunden: {csv_path}") return cursor = conn.cursor() # Tabelle leeren cursor.execute("TRUNCATE TABLE geraete RESTART IDENTITY CASCADE") # CSV einlesen with open(csv_path, 'r', encoding='utf-8') as f: reader = csv.DictReader(f, delimiter=';') batch = [] total = 0 for row in reader: values = ( int(row['id']) if row['id'] else None, row.get('nr'), row.get('marke'), row.get('typ'), row.get('zusatzNummer'), row.get('modellBezeichnung'), row.get('produktionsstart'), row.get('produktionsende'), row.get('wgtext1'), row.get('wgtext2'), row.get('zusatz'), row.get('bezeichnungoriginal'), row.get('typDE'), row.get('typFR') ) batch.append(values) if len(batch) >= 1000: cursor.executemany(""" INSERT INTO geraete (id, nr, marke, typ, zusatz_nummer, modell_bezeichnung, produktionsstart, produktionsende, wgtext1, wgtext2, zusatz, bezeichnung_original, typ_de, typ_fr) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (id) DO UPDATE SET nr = EXCLUDED.nr, marke = EXCLUDED.marke, typ = EXCLUDED.typ """, batch) conn.commit() total += len(batch) batch.clear() print(f" β {total:,} GerΓ€te importiert...") # Letzte Batch if batch: cursor.executemany(""" INSERT INTO geraete (id, nr, marke, typ, zusatz_nummer, modell_bezeichnung, produktionsstart, produktionsende, wgtext1, wgtext2, zusatz, bezeichnung_original, typ_de, typ_fr) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (id) DO UPDATE SET nr = EXCLUDED.nr """, batch) conn.commit() total += len(batch) print(f"β {total:,} GerΓ€te importiert") cursor.close() def import_ersatzteile(conn): """Import Ersatzteil-Metadaten""" print("\n[2/4] Importiere Ersatzteile...") csv_path = CSV_PATHS['artikel'] if not os.path.exists(csv_path): print(f"β οΈ CSV nicht gefunden: {csv_path}") return cursor = conn.cursor() cursor.execute("TRUNCATE TABLE ersatzteile RESTART IDENTITY") with open(csv_path, 'r', encoding='utf-8') as f: reader = csv.DictReader(f, delimiter=';') batch = [] total = 0 for row in reader: values = ( int(row['id']) if row['id'] else None, int(row['navisionid']) if row.get('navisionid') else None, row.get('originalnummer'), row.get('marke'), row.get('ean'), row.get('altenr') ) batch.append(values) if len(batch) >= 1000: cursor.executemany(""" INSERT INTO ersatzteile (id, navision_id, originalnummer, marke, ean, altenr) VALUES (%s,%s,%s,%s,%s,%s) ON CONFLICT (id) DO NOTHING """, batch) conn.commit() total += len(batch) batch.clear() if batch: cursor.executemany(""" INSERT INTO ersatzteile (id, navision_id, originalnummer, marke, ean, altenr) VALUES (%s,%s,%s,%s,%s,%s) ON CONFLICT (id) DO NOTHING """, batch) conn.commit() total += len(batch) print(f"β {total:,} Ersatzteile importiert") cursor.close() def import_mapping_aggregated(conn): """ HAUPTFUNKTION: Import Mapping mit Aggregation 61 Mio Zeilen β 68k Zeilen mit Arrays """ print("\n[3/4] Importiere Mapping (AGGREGIERT)...") csv_path = CSV_PATHS['mapping'] if not os.path.exists(csv_path): print(f"β CSV nicht gefunden: {csv_path}") return file_size_gb = os.path.getsize(csv_path) / (1024**3) print(f" CSV-GrΓΆΓe: {file_size_gb:.2f} GB") print(" β³ Lese und aggregiere Daten (kann 5-10 Min dauern)...") # Schritt 1: CSV in Memory aggregieren mapping = defaultdict(list) with open(csv_path, 'r', encoding='utf-8') as f: reader = csv.DictReader(f, delimiter=';') count = 0 for row in reader: ersatzteil = int(row['ersatzteil']) geraet = int(row['geraet']) mapping[ersatzteil].append(geraet) count += 1 if count % 1_000_000 == 0: print(f" β {count:,} Zeilen gelesen...") print(f" β {count:,} Zeilen gelesen") print(f" β {len(mapping):,} unique Ersatzteile gefunden") # Schritt 2: In Postgres schreiben (aggregiert!) print(" πΎ Schreibe aggregierte Daten in Postgres...") cursor = conn.cursor() cursor.execute("TRUNCATE TABLE ersatzteil_mapping RESTART IDENTITY") batch = [] total = 0 for ersatzteil_id, geraet_list in mapping.items(): batch.append(( ersatzteil_id, geraet_list, # Als Python-Liste β wird zu Postgres Array! len(geraet_list) )) if len(batch) >= 500: cursor.executemany(""" INSERT INTO ersatzteil_mapping (ersatzteil_id, geraet_ids, geraet_count) VALUES (%s, %s, %s) """, batch) conn.commit() total += len(batch) batch.clear() print(f" β {total:,}/{len(mapping):,} Ersatzteile geschrieben...") # Letzte Batch if batch: cursor.executemany(""" INSERT INTO ersatzteil_mapping (ersatzteil_id, geraet_ids, geraet_count) VALUES (%s, %s, %s) """, batch) conn.commit() total += len(batch) print(f"β {total:,} Ersatzteile mit Arrays gespeichert") # Statistik cursor.execute("SELECT MAX(geraet_count), AVG(geraet_count) FROM ersatzteil_mapping") max_count, avg_count = cursor.fetchone() print(f" π Max GerΓ€te pro Teil: {max_count:,}") print(f" π Γ GerΓ€te pro Teil: {avg_count:.0f}") cursor.close() def import_passendwie(conn): """Import PassendWie-Daten (optional)""" print("\n[4/4] Importiere PassendWie...") csv_path = CSV_PATHS['passendwie'] if not os.path.exists(csv_path): print(f"β οΈ CSV nicht gefunden: {csv_path}") return cursor = conn.cursor() cursor.execute("TRUNCATE TABLE passendwie RESTART IDENTITY") with open(csv_path, 'r', encoding='utf-8') as f: reader = csv.DictReader(f, delimiter=';') batch = [] total = 0 for row in reader: values = ( int(row['id']) if row['id'] else None, int(row['navisionid']) if row.get('navisionid') else None, row.get('vertreiberid'), row.get('vertreiber'), row.get('bestellcode') ) batch.append(values) if len(batch) >= 1000: cursor.executemany(""" INSERT INTO passendwie (id, navision_id, vertreiber_id, vertreiber, bestellcode) VALUES (%s,%s,%s,%s,%s) ON CONFLICT (id) DO NOTHING """, batch) conn.commit() total += len(batch) batch.clear() if batch: cursor.executemany(""" INSERT INTO passendwie (id, navision_id, vertreiber_id, vertreiber, bestellcode) VALUES (%s,%s,%s,%s,%s) ON CONFLICT (id) DO NOTHING """, batch) conn.commit() total += len(batch) print(f"β {total:,} PassendWie-EintrΓ€ge importiert") cursor.close() def analyze_tables(conn): """Tabellen-Statistiken aktualisieren""" print("\n[FINAL] Aktualisiere Tabellen-Statistiken...") cursor = conn.cursor() cursor.execute("ANALYZE geraete") cursor.execute("ANALYZE ersatzteil_mapping") cursor.execute("ANALYZE ersatzteile") cursor.execute("ANALYZE passendwie") print("β ANALYZE abgeschlossen") cursor.close() def main(): """Hauptfunktion""" print("=" * 60) print("KREMPL POSTGRESQL IMPORT") print("Aggregiert 61 Mio Zeilen β 68k Zeilen mit Arrays") print("=" * 60) start_time = datetime.now() conn = connect_postgres() try: import_geraete(conn) import_ersatzteile(conn) import_mapping_aggregated(conn) # WICHTIGSTE FUNKTION! import_passendwie(conn) analyze_tables(conn) duration = datetime.now() - start_time print("\n" + "=" * 60) print(f"β IMPORT ERFOLGREICH in {duration}") print("=" * 60) except Exception as e: print(f"\nβ FEHLER: {e}") conn.rollback() raise finally: conn.close() if __name__ == '__main__': main() ``` **AusfΓΌhren:** ```bash python3 modules/import/krempl_postgres_import.py ``` --- ## π§ PHP-Γnderungen (2 Stellen) ### 1. Postgres-Connection Helper **Neue Datei:** `core/postgres_connection.php` ```php pdo = new PDO( 'pgsql:host=localhost;dbname=krempl_data', 'krempl_user', 'PASSWORT_HIER', [ PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION, PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC, PDO::ATTR_PERSISTENT => true // Connection Pool ] ); } catch (PDOException $e) { error_log("Postgres Krempl Connection failed: " . $e->getMessage()); throw $e; } } public static function getInstance() { if (self::$instance === null) { self::$instance = new self(); } return self::$instance; } public function getConnection() { return $this->pdo; } /** * Gibt GerΓ€te fΓΌr ein Ersatzteil zurΓΌck */ public function getGeraeteByErsatzteil($ersatzteil_id, $limit = 100) { $stmt = $this->pdo->prepare(" SELECT em.geraet_count, g.id, g.nr, g.marke, g.typ, g.modell_bezeichnung, g.produktionsstart, g.produktionsende FROM ersatzteil_mapping em JOIN geraete g ON g.id = ANY(em.geraet_ids) WHERE em.ersatzteil_id = :ersatzteil_id LIMIT :limit "); $stmt->bindValue(':ersatzteil_id', $ersatzteil_id, PDO::PARAM_INT); $stmt->bindValue(':limit', $limit, PDO::PARAM_INT); $stmt->execute(); return [ 'geraete' => $stmt->fetchAll(), 'total_count' => $this->getGeraeteCount($ersatzteil_id) ]; } /** * Gibt Anzahl GerΓ€te fΓΌr ein Ersatzteil zurΓΌck */ public function getGeraeteCount($ersatzteil_id) { $stmt = $this->pdo->prepare(" SELECT geraet_count FROM ersatzteil_mapping WHERE ersatzteil_id = :id "); $stmt->execute([':id' => $ersatzteil_id]); $result = $stmt->fetch(); return $result ? $result['geraet_count'] : 0; } /** * Sucht GerΓ€te nach Nummer/Typ und gibt Ersatzteil-IDs zurΓΌck */ public function searchGeraeteGetErsatzteile($search_term, $limit = 50) { // 1. Finde GerΓ€te $stmt = $this->pdo->prepare(" SELECT id FROM geraete WHERE to_tsvector('german', COALESCE(nr,'') || ' ' || COALESCE(typ,'')) @@ plainto_tsquery('german', :search) LIMIT :limit "); $stmt->bindValue(':search', $search_term, PDO::PARAM_STR); $stmt->bindValue(':limit', $limit, PDO::PARAM_INT); $stmt->execute(); $geraet_ids = $stmt->fetchAll(PDO::FETCH_COLUMN); if (empty($geraet_ids)) { return []; } // 2. Finde Ersatzteile die zu diesen GerΓ€ten passen $geraet_ids_str = implode(',', array_map('intval', $geraet_ids)); $stmt = $this->pdo->query(" SELECT DISTINCT ersatzteil_id FROM ersatzteil_mapping WHERE geraet_ids && ARRAY[$geraet_ids_str]::bigint[] LIMIT 500 "); return $stmt->fetchAll(PDO::FETCH_COLUMN); } } ``` ### 2. Artikeldetailseite anpassen **Datei:** `web/intelectra_shop/php/intelectra_item_additional.inc.php` ```php krempl_id && (!isset($result) || $result->num_rows < 1)) { $sql = "SELECT * FROM krempl_artikelgeraet_22 ka left join krempl_geraete_22 kg on kg.id=ka.geraet WHERE ka.ersatzteil=".$this->db->real_escape_string($obj->krempl_id)." AND not isnull(nr)"; $result = $this->db->query($sql); } */ // NEUE VERSION (PostgreSQL): if ($obj->krempl_id && (!isset($result) || $result->num_rows < 1)) { try { require_once(__DIR__ . '/../../../core/postgres_connection.php'); $pg = PostgresKremplDB::getInstance(); $data = $pg->getGeraeteByErsatzteil($obj->krempl_id, 100); $geraete = $data['geraete']; $total_count = $data['total_count']; // Wenn mehr als 100 GerΓ€te: Warnung anzeigen if ($total_count > 100) { echo '
| GerΓ€tenummer | Marke | Typ | Modell |
|---|---|---|---|
| ' . htmlspecialchars($geraet['nr']) . ' | '; echo '' . htmlspecialchars($geraet['marke']) . ' | '; echo '' . htmlspecialchars($geraet['typ']) . ' | '; echo '' . htmlspecialchars($geraet['modell_bezeichnung']) . ' | '; echo '