#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Krempl PostgreSQL Import Script
|
|
|
|
Importiert Krempl-Daten von CSV nach PostgreSQL und aggregiert dabei
|
|
61 Millionen Mapping-Zeilen zu 68k Zeilen mit Arrays.
|
|
|
|
Usage:
|
|
python3 import.py [--config config.ini]
|
|
"""
|
|
|
|
import psycopg2
|
|
import csv
|
|
import os
|
|
import sys
|
|
import logging
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from configparser import ConfigParser
|
|
from pathlib import Path
|
|
import argparse
|
|
|
|
|
|
# ============================================================================
|
|
# Konfiguration laden
|
|
# ============================================================================
|
|
|
|
def load_config(config_file='config.ini'):
    """Load configuration from an INI file.

    Exits the process with status 1 when the file does not exist — the
    importer cannot do anything without connection settings.
    """
    if not Path(config_file).exists():
        print(f"❌ Konfigurationsdatei nicht gefunden: {config_file}")
        print(f"💡 Kopiere config.ini.example zu config.ini und passe die Werte an!")
        sys.exit(1)

    parser = ConfigParser()
    parser.read(config_file, encoding='utf-8')
    return parser
|
|
|
|
|
|
# ============================================================================
|
|
# Logging Setup
|
|
# ============================================================================
|
|
|
|
def setup_logging(config):
    """Configure root logging from the [logging] section of *config*.

    An empty ``log_file`` means "log to stderr".  Unknown or mis-cased
    level names fall back to INFO instead of raising AttributeError
    (``getattr(logging, 'info')`` / ``'BOGUS'`` would crash before any
    logging exists to report it).  Returns a module-level logger.
    """
    log_level = config.get('logging', 'log_level', fallback='INFO')
    log_file = config.get('logging', 'log_file', fallback='')

    # Normalize ("info", "DEBUG ") and default to INFO on typos.
    level = getattr(logging, str(log_level).strip().upper(), logging.INFO)

    logging_config = {
        'level': level,
        'format': '%(asctime)s [%(levelname)s] %(message)s',
        'datefmt': '%Y-%m-%d %H:%M:%S',
    }

    if log_file:
        logging_config['filename'] = log_file
        logging_config['filemode'] = 'a'  # append across runs

    logging.basicConfig(**logging_config)
    return logging.getLogger(__name__)
|
|
|
|
|
|
# ============================================================================
|
|
# PostgreSQL Connection
|
|
# ============================================================================
|
|
|
|
def connect_postgres(config):
    """Open a PostgreSQL connection from the [postgresql] config section.

    Exits the process with status 1 on any failure (bad credentials,
    missing config keys, unreachable host) — the import cannot proceed
    without a database.
    """
    section = 'postgresql'
    try:
        params = {
            'host': config.get(section, 'host'),
            'port': config.getint(section, 'port'),
            'database': config.get(section, 'database'),
            'user': config.get(section, 'user'),
            'password': config.get(section, 'password'),
        }
        conn = psycopg2.connect(**params)
        logging.info(f"✅ Verbunden mit PostgreSQL: {config.get('postgresql', 'database')}")
        return conn
    except Exception as e:
        logging.error(f"❌ PostgreSQL Verbindung fehlgeschlagen: {e}")
        sys.exit(1)
|
|
|
|
|
|
# ============================================================================
|
|
# Import Funktionen
|
|
# ============================================================================
|
|
|
|
def import_geraete(conn, config, logger):
    """Import the geraete table (~1.4M rows) from a semicolon-delimited CSV.

    The table is truncated first, then rows are inserted in batches; the
    ON CONFLICT clause keeps re-runs idempotent.  Rows that fail to parse
    are counted and skipped.  A failed batch is rolled back and skipped so
    the connection stays usable — the original swallowed the executemany
    error without rollback, which left the transaction aborted and made
    every later commit fail.
    """
    logger.info("=" * 70)
    logger.info("[1/4] IMPORTIERE GERÄTE")
    logger.info("=" * 70)

    csv_path = config.get('csv_files', 'geraete')
    batch_size = config.getint('import_settings', 'batch_size_geraete', fallback=1000)
    progress_interval = config.getint('import_settings', 'progress_interval', fallback=100000)

    if not os.path.exists(csv_path):
        logger.warning(f"⚠️ CSV nicht gefunden: {csv_path}")
        logger.warning("⚠️ Überspringe Geräte-Import")
        return

    file_size_mb = os.path.getsize(csv_path) / (1024**2)
    logger.info(f"📁 CSV-Datei: {csv_path} ({file_size_mb:.1f} MB)")

    # Single INSERT statement used for every flush.  The original kept two
    # hand-maintained copies whose ON CONFLICT clauses had drifted apart
    # (the final batch only updated `nr`).
    insert_sql = """
        INSERT INTO geraete
        (id, nr, marke, typ, zusatz_nummer, modell_bezeichnung,
         produktionsstart, produktionsende, wgtext1, wgtext2,
         zusatz, bezeichnung_original, typ_de, typ_fr)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
        ON CONFLICT (id) DO UPDATE SET
            nr = EXCLUDED.nr,
            marke = EXCLUDED.marke,
            typ = EXCLUDED.typ,
            updated_at = CURRENT_TIMESTAMP
    """

    cursor = conn.cursor()
    cursor.execute("TRUNCATE TABLE geraete RESTART IDENTITY CASCADE")
    logger.info("🗑️ Tabelle geleert")

    batch = []
    total = 0
    errors = 0

    def _flush():
        """Insert the pending batch; return rows written (0 on failure)."""
        nonlocal errors
        try:
            cursor.executemany(insert_sql, batch)
            conn.commit()
            return len(batch)
        except Exception as e:
            logger.error(f"Fehler bei Batch: {e}")
            conn.rollback()  # clear the aborted transaction for the next batch
            errors += len(batch)
            return 0
        finally:
            batch.clear()

    with open(csv_path, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f, delimiter=';')

        for row in reader:
            try:
                batch.append((
                    int(row['id']) if row.get('id') else None,
                    row.get('nr'),
                    row.get('marke'),
                    row.get('typ'),
                    row.get('zusatzNummer'),
                    row.get('modellBezeichnung'),
                    row.get('produktionsstart'),
                    row.get('produktionsende'),
                    row.get('wgtext1'),
                    row.get('wgtext2'),
                    row.get('zusatz'),
                    row.get('bezeichnungoriginal'),
                    row.get('typDE'),
                    row.get('typFR'),
                ))
            except Exception as e:
                errors += 1
                logger.debug(f"Fehler bei Zeile {total + len(batch)}: {e}")
                continue

            if len(batch) >= batch_size:
                total += _flush()
                # NOTE: only fires when progress_interval is a multiple of
                # batch_size, same as the original.
                if total % progress_interval == 0:
                    logger.info(f" → {total:,} Geräte importiert...")

    # Flush whatever is left over after the loop.
    if batch:
        total += _flush()

    logger.info(f"✅ {total:,} Geräte importiert (Fehler: {errors})")
    cursor.close()
|
|
|
|
|
|
def import_ersatzteile(conn, config, logger):
    """Import Ersatzteil metadata from a semicolon-delimited CSV in batches.

    The table is truncated first; ON CONFLICT DO NOTHING keeps re-runs
    idempotent.  A failed batch is rolled back and skipped so later
    batches still commit — the original swallowed the executemany error
    without rollback, leaving the transaction aborted.
    """
    logger.info("=" * 70)
    logger.info("[2/4] IMPORTIERE ERSATZTEILE")
    logger.info("=" * 70)

    csv_path = config.get('csv_files', 'artikel')
    batch_size = config.getint('import_settings', 'batch_size_artikel', fallback=1000)

    if not os.path.exists(csv_path):
        logger.warning(f"⚠️ CSV nicht gefunden: {csv_path}")
        logger.warning("⚠️ Überspringe Ersatzteile-Import")
        return

    file_size_mb = os.path.getsize(csv_path) / (1024**2)
    logger.info(f"📁 CSV-Datei: {csv_path} ({file_size_mb:.1f} MB)")

    # One statement for every flush; the original kept two copies in sync
    # by hand.
    insert_sql = """
        INSERT INTO ersatzteile
        (id, navision_id, originalnummer, marke, ean, altenr)
        VALUES (%s,%s,%s,%s,%s,%s)
        ON CONFLICT (id) DO NOTHING
    """

    cursor = conn.cursor()
    cursor.execute("TRUNCATE TABLE ersatzteile RESTART IDENTITY")
    logger.info("🗑️ Tabelle geleert")

    batch = []
    total = 0
    errors = 0

    def _flush():
        """Insert the pending batch; return rows written (0 on failure)."""
        nonlocal errors
        try:
            cursor.executemany(insert_sql, batch)
            conn.commit()
            return len(batch)
        except Exception as e:
            logger.error(f"Fehler bei Batch: {e}")
            conn.rollback()  # keep the connection usable for the next batch
            errors += len(batch)
            return 0
        finally:
            batch.clear()

    with open(csv_path, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f, delimiter=';')

        for row in reader:
            try:
                batch.append((
                    int(row['id']) if row.get('id') else None,
                    int(row['navisionid']) if row.get('navisionid') else None,
                    row.get('originalnummer'),
                    row.get('marke'),
                    row.get('ean'),
                    row.get('altenr'),
                ))
            except Exception as e:
                errors += 1
                logger.debug(f"Fehler bei Zeile {total + len(batch)}: {e}")
                continue

            if len(batch) >= batch_size:
                total += _flush()

    # Flush the remainder.
    if batch:
        total += _flush()

    logger.info(f"✅ {total:,} Ersatzteile importiert (Fehler: {errors})")
    cursor.close()
|
|
|
|
|
|
def import_mapping_aggregated(conn, config, logger):
    """MAIN STEP: aggregate the ersatzteil→geraet mapping CSV and load it.

    Reads ~61M (ersatzteil, geraet) pairs, groups them in memory into one
    list of geraet ids per ersatzteil, then writes ~68k rows whose
    geraet_ids column is a PostgreSQL array (psycopg2 adapts a Python
    list to an ARRAY).  Guards the final statistics query against an
    empty table, where MAX/AVG return NULL — the original crashed
    formatting None with ':,'.
    """
    logger.info("=" * 70)
    logger.info("[3/4] IMPORTIERE MAPPING (AGGREGIERT)")
    logger.info("=" * 70)

    csv_path = config.get('csv_files', 'mapping')
    batch_size = config.getint('import_settings', 'batch_size_mapping', fallback=500)
    progress_interval = config.getint('import_settings', 'progress_interval', fallback=1000000)
    in_memory = config.getboolean('import_settings', 'mapping_in_memory', fallback=True)

    if not os.path.exists(csv_path):
        logger.error(f"❌ CSV nicht gefunden: {csv_path}")
        logger.error("❌ Mapping-Import ist kritisch! Abbruch.")
        return

    file_size_gb = os.path.getsize(csv_path) / (1024**3)
    logger.info(f"📁 CSV-Datei: {csv_path} ({file_size_gb:.2f} GB)")

    # NOTE(review): the stream mode is only advertised — both settings
    # currently aggregate in memory.  Confirm before relying on the flag.
    if in_memory:
        logger.info("💾 Modus: In-Memory Aggregation (schnell, RAM-intensiv)")
    else:
        logger.info("💾 Modus: Stream (langsam, RAM-freundlich)")

    # STEP 1: read the CSV and aggregate geraet ids per ersatzteil.
    logger.info("⏳ Lese und aggregiere 61 Millionen Zeilen...")

    mapping = defaultdict(list)
    count = 0

    with open(csv_path, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f, delimiter=';')

        for row in reader:
            try:
                mapping[int(row['ersatzteil'])].append(int(row['geraet']))
            except Exception as e:
                logger.debug(f"Fehler bei Zeile {count}: {e}")
                continue
            # Count only successfully parsed rows, like the original.
            count += 1
            if count % progress_interval == 0:
                logger.info(f" → {count:,} Zeilen gelesen... ({len(mapping):,} unique Ersatzteile)")

    logger.info(f"✓ {count:,} Zeilen gelesen")
    logger.info(f"✓ {len(mapping):,} unique Ersatzteile gefunden")

    # STEP 2: write the aggregated rows to PostgreSQL.
    logger.info("💾 Schreibe aggregierte Daten nach PostgreSQL...")

    cursor = conn.cursor()
    cursor.execute("TRUNCATE TABLE ersatzteil_mapping RESTART IDENTITY")
    logger.info("🗑️ Tabelle geleert")

    # One statement for both the in-loop and the final flush (the original
    # carried two identical copies).
    insert_sql = """
        INSERT INTO ersatzteil_mapping
        (ersatzteil_id, geraet_ids, geraet_count)
        VALUES (%s, %s, %s)
        ON CONFLICT (ersatzteil_id) DO UPDATE SET
            geraet_ids = EXCLUDED.geraet_ids,
            geraet_count = EXCLUDED.geraet_count,
            last_updated = CURRENT_TIMESTAMP
    """

    batch = []
    total = 0

    for ersatzteil_id, geraet_list in mapping.items():
        batch.append((
            ersatzteil_id,
            geraet_list,  # Python list → PostgreSQL array
            len(geraet_list),
        ))

        if len(batch) >= batch_size:
            cursor.executemany(insert_sql, batch)
            conn.commit()
            total += len(batch)
            batch.clear()
            logger.info(f" → {total:,}/{len(mapping):,} Ersatzteile geschrieben...")

    # Final partial batch.
    if batch:
        cursor.executemany(insert_sql, batch)
        conn.commit()
        total += len(batch)

    logger.info(f"✅ {total:,} Ersatzteile mit Arrays gespeichert")

    # Statistics — skip formatting when the table is empty (NULL results).
    cursor.execute("SELECT MAX(geraet_count), AVG(geraet_count) FROM ersatzteil_mapping")
    max_count, avg_count = cursor.fetchone()
    if max_count is not None and avg_count is not None:
        logger.info(f"📊 Max Geräte pro Teil: {max_count:,}")
        logger.info(f"📊 Ø Geräte pro Teil: {avg_count:.0f}")

    cursor.close()
|
|
|
|
|
|
def import_passendwie(conn, config, logger):
    """Import PassendWie rows (one Ersatzteil has several vendor codes).

    Batches are inserted with executemany; when a batch fails, it is
    rolled back and retried row by row so one bad row only costs itself.
    Rows that fail even individually are counted as duplicates.  The
    fallback now rolls back after each failed single-row insert — the
    original did not, which left the transaction aborted and made every
    following commit in the fallback loop fail too.  The original also
    duplicated the whole batch/fallback logic for the final batch; it is
    factored into one helper here.
    """
    logger.info("=" * 70)
    logger.info("[4/4] IMPORTIERE PASSENDWIE")
    logger.info("=" * 70)

    csv_path = config.get('csv_files', 'passendwie')
    batch_size = config.getint('import_settings', 'batch_size_passendwie', fallback=1000)

    if not os.path.exists(csv_path):
        logger.warning(f"⚠️ CSV nicht gefunden: {csv_path}")
        logger.warning("⚠️ Überspringe PassendWie-Import")
        return

    file_size_mb = os.path.getsize(csv_path) / (1024**2)
    logger.info(f"📁 CSV-Datei: {csv_path} ({file_size_mb:.1f} MB)")

    insert_sql = """
        INSERT INTO passendwie
        (id, navision_id, vertreiber_id, vertreiber, bestellcode)
        VALUES (%s,%s,%s,%s,%s)
        ON CONFLICT (id, vertreiber_id, bestellcode) DO NOTHING
    """

    cursor = conn.cursor()
    cursor.execute("TRUNCATE TABLE passendwie RESTART IDENTITY CASCADE")
    logger.info("🗑️ Tabelle geleert")

    batch = []
    total = 0
    errors = 0
    duplicates = 0

    def _flush():
        """Write the pending batch; fall back to single-row inserts on error."""
        nonlocal total, duplicates
        try:
            cursor.executemany(insert_sql, batch)
            conn.commit()
            total += len(batch)
        except Exception as e:
            logger.debug(f"Batch-Fehler: {e}")
            conn.rollback()
            for single_row in batch:
                try:
                    cursor.execute(insert_sql, single_row)
                    conn.commit()
                    total += 1
                except Exception:
                    # Roll back so the next single-row insert can commit.
                    conn.rollback()
                    duplicates += 1
        batch.clear()

    with open(csv_path, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f, delimiter=';')

        for row in reader:
            try:
                batch.append((
                    int(row['id']) if row.get('id') else None,
                    int(row['navisionid']) if row.get('navisionid') else None,
                    row.get('vertreiberid'),
                    row.get('vertreiber'),
                    row.get('bestellcode'),
                ))
            except Exception as e:
                errors += 1
                logger.debug(f"Fehler bei Zeile {total + len(batch)}: {e}")
                continue

            if len(batch) >= batch_size:
                _flush()

    # Flush the remainder.
    if batch:
        _flush()

    logger.info(f"✅ {total:,} PassendWie-Einträge importiert (Fehler: {errors}, Duplikate übersprungen: {duplicates})")
    cursor.close()
|
|
|
|
|
|
def analyze_tables(conn, logger):
    """Run ANALYZE on all import tables so the query planner has fresh stats."""
    logger.info("=" * 70)
    logger.info("[FINAL] AKTUALISIERE TABELLEN-STATISTIKEN")
    logger.info("=" * 70)

    # Clear any aborted transaction left over from a previous step.
    conn.rollback()

    cursor = conn.cursor()

    for table in ('geraete', 'ersatzteil_mapping', 'ersatzteile', 'passendwie'):
        try:
            logger.info(f" ANALYZE {table}...")
            cursor.execute(f"ANALYZE {table}")
            conn.commit()
        except Exception as e:
            # Non-fatal: a missing table should not stop the remaining ones.
            logger.warning(f" ⚠️ ANALYZE {table} fehlgeschlagen: {e}")
            conn.rollback()

    logger.info("✅ Statistiken aktualisiert")
    cursor.close()
|
|
|
|
|
|
# ============================================================================
|
|
# Main
|
|
# ============================================================================
|
|
|
|
def main():
    """CLI entry point: load config, connect, run every import step in order."""
    parser = argparse.ArgumentParser(description='Krempl PostgreSQL Import')
    parser.add_argument('--config', default='config.ini', help='Pfad zur config.ini')
    args = parser.parse_args()

    # Configuration and logging must exist before anything else runs.
    config = load_config(args.config)
    logger = setup_logging(config)

    banner = "=" * 70
    print(banner)
    print("KREMPL POSTGRESQL IMPORT")
    print("Aggregiert 61 Millionen Zeilen → 68k Zeilen mit Arrays")
    print(banner)
    print()

    start_time = datetime.now()

    # connect_postgres() exits the process on failure, so conn is always bound.
    conn = connect_postgres(config)

    try:
        for step in (import_geraete, import_ersatzteile,
                     import_mapping_aggregated,  # the critical step
                     import_passendwie):
            step(conn, config, logger)
        analyze_tables(conn, logger)

        duration = datetime.now() - start_time
        print()
        print(banner)
        print(f"✅ IMPORT ERFOLGREICH in {duration}")
        print(banner)

    except KeyboardInterrupt:
        logger.warning("\n⚠️ Import abgebrochen durch Benutzer")
        conn.rollback()
        sys.exit(1)

    except Exception as e:
        logger.error(f"\n❌ FEHLER: {e}")
        logger.exception("Traceback:")
        conn.rollback()
        sys.exit(1)

    finally:
        conn.close()
        logger.info("Datenbankverbindung geschlossen")
|
|
|
|
|
|
# Script entry point: run the full import pipeline.
if __name__ == '__main__':
    main()
|