#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Krempl PostgreSQL Import Script Importiert Krempl-Daten von CSV nach PostgreSQL und aggregiert dabei 61 Millionen Mapping-Zeilen zu 68k Zeilen mit Arrays. Usage: python3 import.py [--config config.ini] """ import psycopg2 import csv import os import sys import logging from collections import defaultdict from datetime import datetime from configparser import ConfigParser from pathlib import Path import argparse # ============================================================================ # Konfiguration laden # ============================================================================ def load_config(config_file='config.ini'): """Lädt Konfiguration aus INI-Datei""" config = ConfigParser() if not os.path.exists(config_file): print(f"❌ Konfigurationsdatei nicht gefunden: {config_file}") print(f"💡 Kopiere config.ini.example zu config.ini und passe die Werte an!") sys.exit(1) config.read(config_file, encoding='utf-8') return config # ============================================================================ # Logging Setup # ============================================================================ def setup_logging(config): """Richtet Logging ein""" log_level = config.get('logging', 'log_level', fallback='INFO') log_file = config.get('logging', 'log_file', fallback='') logging_config = { 'level': getattr(logging, log_level), 'format': '%(asctime)s [%(levelname)s] %(message)s', 'datefmt': '%Y-%m-%d %H:%M:%S' } if log_file: logging_config['filename'] = log_file logging_config['filemode'] = 'a' logging.basicConfig(**logging_config) return logging.getLogger(__name__) # ============================================================================ # PostgreSQL Connection # ============================================================================ def connect_postgres(config): """Stellt Verbindung zu PostgreSQL her""" try: conn = psycopg2.connect( host=config.get('postgresql', 'host'), port=config.getint('postgresql', 'port'), database=config.get('postgresql', 'database'), user=config.get('postgresql', 'user'), password=config.get('postgresql', 'password') ) logging.info(f"✅ Verbunden mit PostgreSQL: {config.get('postgresql', 'database')}") return conn except Exception as e: logging.error(f"❌ PostgreSQL Verbindung fehlgeschlagen: {e}") sys.exit(1) # ============================================================================ # Import Funktionen # ============================================================================ def import_geraete(conn, config, logger): """Importiert Geräte-Tabelle (~1.4 Mio Zeilen)""" logger.info("=" * 70) logger.info("[1/4] IMPORTIERE GERÄTE") logger.info("=" * 70) csv_path = config.get('csv_files', 'geraete') batch_size = config.getint('import_settings', 'batch_size_geraete', fallback=1000) progress_interval = config.getint('import_settings', 'progress_interval', fallback=100000) if not os.path.exists(csv_path): logger.warning(f"⚠️ CSV nicht gefunden: {csv_path}") logger.warning("⚠️ Überspringe Geräte-Import") return file_size_mb = os.path.getsize(csv_path) / (1024**2) logger.info(f"📁 CSV-Datei: {csv_path} ({file_size_mb:.1f} MB)") cursor = conn.cursor() # Tabelle leeren cursor.execute("TRUNCATE TABLE geraete RESTART IDENTITY CASCADE") logger.info("🗑️ Tabelle geleert") # CSV einlesen batch = [] total = 0 errors = 0 with open(csv_path, 'r', encoding='utf-8') as f: reader = csv.DictReader(f, delimiter=';') for row in reader: try: values = ( int(row['id']) if row.get('id') else None, row.get('nr'), row.get('marke'), row.get('typ'), row.get('zusatzNummer'), row.get('modellBezeichnung'), row.get('produktionsstart'), row.get('produktionsende'), row.get('wgtext1'), row.get('wgtext2'), row.get('zusatz'), row.get('bezeichnungoriginal'), row.get('typDE'), row.get('typFR') ) batch.append(values) if len(batch) >= batch_size: cursor.executemany(""" INSERT INTO geraete (id, nr, marke, typ, zusatz_nummer, modell_bezeichnung, produktionsstart, produktionsende, wgtext1, wgtext2, zusatz, bezeichnung_original, typ_de, typ_fr) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (id) DO UPDATE SET nr = EXCLUDED.nr, marke = EXCLUDED.marke, typ = EXCLUDED.typ, updated_at = CURRENT_TIMESTAMP """, batch) conn.commit() total += len(batch) batch.clear() if total % progress_interval == 0: logger.info(f" → {total:,} Geräte importiert...") except Exception as e: errors += 1 logger.debug(f"Fehler bei Zeile {total + len(batch)}: {e}") continue # Letzte Batch if batch: try: cursor.executemany(""" INSERT INTO geraete (id, nr, marke, typ, zusatz_nummer, modell_bezeichnung, produktionsstart, produktionsende, wgtext1, wgtext2, zusatz, bezeichnung_original, typ_de, typ_fr) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (id) DO UPDATE SET nr = EXCLUDED.nr, updated_at = CURRENT_TIMESTAMP """, batch) conn.commit() total += len(batch) except Exception as e: logger.error(f"Fehler bei letzter Batch: {e}") logger.info(f"✅ {total:,} Geräte importiert (Fehler: {errors})") cursor.close() def import_ersatzteile(conn, config, logger): """Importiert Ersatzteil-Metadaten""" logger.info("=" * 70) logger.info("[2/4] IMPORTIERE ERSATZTEILE") logger.info("=" * 70) csv_path = config.get('csv_files', 'artikel') batch_size = config.getint('import_settings', 'batch_size_artikel', fallback=1000) if not os.path.exists(csv_path): logger.warning(f"⚠️ CSV nicht gefunden: {csv_path}") logger.warning("⚠️ Überspringe Ersatzteile-Import") return file_size_mb = os.path.getsize(csv_path) / (1024**2) logger.info(f"📁 CSV-Datei: {csv_path} ({file_size_mb:.1f} MB)") cursor = conn.cursor() cursor.execute("TRUNCATE TABLE ersatzteile RESTART IDENTITY") logger.info("🗑️ Tabelle geleert") batch = [] total = 0 errors = 0 with open(csv_path, 'r', encoding='utf-8') as f: reader = csv.DictReader(f, delimiter=';') for row in reader: try: values = ( int(row['id']) if row.get('id') else None, int(row['navisionid']) if row.get('navisionid') else None, row.get('originalnummer'), row.get('marke'), row.get('ean'), row.get('altenr') ) batch.append(values) if len(batch) >= batch_size: cursor.executemany(""" INSERT INTO ersatzteile (id, navision_id, originalnummer, marke, ean, altenr) VALUES (%s,%s,%s,%s,%s,%s) ON CONFLICT (id) DO NOTHING """, batch) conn.commit() total += len(batch) batch.clear() except Exception as e: errors += 1 logger.debug(f"Fehler bei Zeile {total + len(batch)}: {e}") continue # Letzte Batch if batch: try: cursor.executemany(""" INSERT INTO ersatzteile (id, navision_id, originalnummer, marke, ean, altenr) VALUES (%s,%s,%s,%s,%s,%s) ON CONFLICT (id) DO NOTHING """, batch) conn.commit() total += len(batch) except Exception as e: logger.error(f"Fehler bei letzter Batch: {e}") logger.info(f"✅ {total:,} Ersatzteile importiert (Fehler: {errors})") cursor.close() def import_mapping_aggregated(conn, config, logger): """ HAUPTFUNKTION: Import Mapping mit Aggregation 61 Mio Zeilen → 68k Zeilen mit Arrays """ logger.info("=" * 70) logger.info("[3/4] IMPORTIERE MAPPING (AGGREGIERT)") logger.info("=" * 70) csv_path = config.get('csv_files', 'mapping') batch_size = config.getint('import_settings', 'batch_size_mapping', fallback=500) progress_interval = config.getint('import_settings', 'progress_interval', fallback=1000000) in_memory = config.getboolean('import_settings', 'mapping_in_memory', fallback=True) if not os.path.exists(csv_path): logger.error(f"❌ CSV nicht gefunden: {csv_path}") logger.error("❌ Mapping-Import ist kritisch! Abbruch.") return file_size_gb = os.path.getsize(csv_path) / (1024**3) logger.info(f"📁 CSV-Datei: {csv_path} ({file_size_gb:.2f} GB)") if in_memory: logger.info("💾 Modus: In-Memory Aggregation (schnell, RAM-intensiv)") else: logger.info("💾 Modus: Stream (langsam, RAM-freundlich)") # SCHRITT 1: CSV lesen und aggregieren logger.info("⏳ Lese und aggregiere 61 Millionen Zeilen...") mapping = defaultdict(list) count = 0 with open(csv_path, 'r', encoding='utf-8') as f: reader = csv.DictReader(f, delimiter=';') for row in reader: try: ersatzteil = int(row['ersatzteil']) geraet = int(row['geraet']) mapping[ersatzteil].append(geraet) count += 1 if count % progress_interval == 0: logger.info(f" → {count:,} Zeilen gelesen... ({len(mapping):,} unique Ersatzteile)") except Exception as e: logger.debug(f"Fehler bei Zeile {count}: {e}") continue logger.info(f"✓ {count:,} Zeilen gelesen") logger.info(f"✓ {len(mapping):,} unique Ersatzteile gefunden") # SCHRITT 2: In PostgreSQL schreiben logger.info("💾 Schreibe aggregierte Daten nach PostgreSQL...") cursor = conn.cursor() cursor.execute("TRUNCATE TABLE ersatzteil_mapping RESTART IDENTITY") logger.info("🗑️ Tabelle geleert") batch = [] total = 0 for ersatzteil_id, geraet_list in mapping.items(): batch.append(( ersatzteil_id, geraet_list, # Python List → PostgreSQL Array len(geraet_list) )) if len(batch) >= batch_size: cursor.executemany(""" INSERT INTO ersatzteil_mapping (ersatzteil_id, geraet_ids, geraet_count) VALUES (%s, %s, %s) ON CONFLICT (ersatzteil_id) DO UPDATE SET geraet_ids = EXCLUDED.geraet_ids, geraet_count = EXCLUDED.geraet_count, last_updated = CURRENT_TIMESTAMP """, batch) conn.commit() total += len(batch) batch.clear() logger.info(f" → {total:,}/{len(mapping):,} Ersatzteile geschrieben...") # Letzte Batch if batch: cursor.executemany(""" INSERT INTO ersatzteil_mapping (ersatzteil_id, geraet_ids, geraet_count) VALUES (%s, %s, %s) ON CONFLICT (ersatzteil_id) DO UPDATE SET geraet_ids = EXCLUDED.geraet_ids, geraet_count = EXCLUDED.geraet_count, last_updated = CURRENT_TIMESTAMP """, batch) conn.commit() total += len(batch) logger.info(f"✅ {total:,} Ersatzteile mit Arrays gespeichert") # STATISTIK cursor.execute("SELECT MAX(geraet_count), AVG(geraet_count) FROM ersatzteil_mapping") max_count, avg_count = cursor.fetchone() logger.info(f"📊 Max Geräte pro Teil: {max_count:,}") logger.info(f"📊 Ø Geräte pro Teil: {avg_count:.0f}") cursor.close() def import_passendwie(conn, config, logger): """Importiert PassendWie-Daten (Ein Ersatzteil hat mehrere Vertreiber-Codes)""" logger.info("=" * 70) logger.info("[4/4] IMPORTIERE PASSENDWIE") logger.info("=" * 70) csv_path = config.get('csv_files', 'passendwie') batch_size = config.getint('import_settings', 'batch_size_passendwie', fallback=1000) if not os.path.exists(csv_path): logger.warning(f"⚠️ CSV nicht gefunden: {csv_path}") logger.warning("⚠️ Überspringe PassendWie-Import") return file_size_mb = os.path.getsize(csv_path) / (1024**2) logger.info(f"📁 CSV-Datei: {csv_path} ({file_size_mb:.1f} MB)") cursor = conn.cursor() cursor.execute("TRUNCATE TABLE passendwie RESTART IDENTITY CASCADE") logger.info("🗑️ Tabelle geleert") batch = [] total = 0 errors = 0 duplicates = 0 with open(csv_path, 'r', encoding='utf-8') as f: reader = csv.DictReader(f, delimiter=';') for row in reader: try: values = ( int(row['id']) if row.get('id') else None, int(row['navisionid']) if row.get('navisionid') else None, row.get('vertreiberid'), row.get('vertreiber'), row.get('bestellcode') ) batch.append(values) if len(batch) >= batch_size: try: cursor.executemany(""" INSERT INTO passendwie (id, navision_id, vertreiber_id, vertreiber, bestellcode) VALUES (%s,%s,%s,%s,%s) ON CONFLICT (id, vertreiber_id, bestellcode) DO NOTHING """, batch) conn.commit() total += len(batch) batch.clear() except Exception as e: logger.debug(f"Batch-Fehler: {e}") conn.rollback() # Einzeln einfügen bei Fehler for single_row in batch: try: cursor.execute(""" INSERT INTO passendwie (id, navision_id, vertreiber_id, vertreiber, bestellcode) VALUES (%s,%s,%s,%s,%s) ON CONFLICT (id, vertreiber_id, bestellcode) DO NOTHING """, single_row) conn.commit() total += 1 except Exception: duplicates += 1 batch.clear() except Exception as e: errors += 1 logger.debug(f"Fehler bei Zeile {total + len(batch)}: {e}") continue # Letzte Batch if batch: try: cursor.executemany(""" INSERT INTO passendwie (id, navision_id, vertreiber_id, vertreiber, bestellcode) VALUES (%s,%s,%s,%s,%s) ON CONFLICT (id, vertreiber_id, bestellcode) DO NOTHING """, batch) conn.commit() total += len(batch) except Exception as e: logger.debug(f"Fehler bei letzter Batch: {e}") conn.rollback() for single_row in batch: try: cursor.execute(""" INSERT INTO passendwie (id, navision_id, vertreiber_id, vertreiber, bestellcode) VALUES (%s,%s,%s,%s,%s) ON CONFLICT (id, vertreiber_id, bestellcode) DO NOTHING """, single_row) conn.commit() total += 1 except Exception: duplicates += 1 logger.info(f"✅ {total:,} PassendWie-Einträge importiert (Fehler: {errors}, Duplikate übersprungen: {duplicates})") cursor.close() def analyze_tables(conn, logger): """Aktualisiert Tabellen-Statistiken für Query-Optimizer""" logger.info("=" * 70) logger.info("[FINAL] AKTUALISIERE TABELLEN-STATISTIKEN") logger.info("=" * 70) # Rollback falls vorherige Transaction fehlgeschlagen conn.rollback() cursor = conn.cursor() tables = ['geraete', 'ersatzteil_mapping', 'ersatzteile', 'passendwie'] for table in tables: try: logger.info(f" ANALYZE {table}...") cursor.execute(f"ANALYZE {table}") conn.commit() except Exception as e: logger.warning(f" ⚠️ ANALYZE {table} fehlgeschlagen: {e}") conn.rollback() continue logger.info("✅ Statistiken aktualisiert") cursor.close() # ============================================================================ # Main # ============================================================================ def main(): """Hauptfunktion""" parser = argparse.ArgumentParser(description='Krempl PostgreSQL Import') parser.add_argument('--config', default='config.ini', help='Pfad zur config.ini') args = parser.parse_args() # Config laden config = load_config(args.config) # Logging setup logger = setup_logging(config) # Header print("=" * 70) print("KREMPL POSTGRESQL IMPORT") print("Aggregiert 61 Millionen Zeilen → 68k Zeilen mit Arrays") print("=" * 70) print() start_time = datetime.now() # PostgreSQL Connection conn = connect_postgres(config) try: # Import durchführen import_geraete(conn, config, logger) import_ersatzteile(conn, config, logger) import_mapping_aggregated(conn, config, logger) # WICHTIGSTE FUNKTION! import_passendwie(conn, config, logger) analyze_tables(conn, logger) # Erfolg duration = datetime.now() - start_time print() print("=" * 70) print(f"✅ IMPORT ERFOLGREICH in {duration}") print("=" * 70) except KeyboardInterrupt: logger.warning("\n⚠️ Import abgebrochen durch Benutzer") conn.rollback() sys.exit(1) except Exception as e: logger.error(f"\n❌ FEHLER: {e}") logger.exception("Traceback:") conn.rollback() sys.exit(1) finally: conn.close() logger.info("Datenbankverbindung geschlossen") if __name__ == '__main__': main()