shop-old/core/postgres/import.py
Thomas Bartelt afd2cedf92
All checks were successful
Deploy to Dev / deploy (push) Successful in 0s
feat: neue PostgreSQL-Suche (tba-search) aus newmail-vhost übernommen
2026-04-20 02:11:11 +02:00

564 lines
19 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Krempl PostgreSQL Import Script
Importiert Krempl-Daten von CSV nach PostgreSQL und aggregiert dabei
61 Millionen Mapping-Zeilen zu 68k Zeilen mit Arrays.
Usage:
python3 import.py [--config config.ini]
"""
import psycopg2
import csv
import os
import sys
import logging
from collections import defaultdict
from datetime import datetime
from configparser import ConfigParser
from pathlib import Path
import argparse
# ============================================================================
# Konfiguration laden
# ============================================================================
def load_config(config_file='config.ini'):
"""Lädt Konfiguration aus INI-Datei"""
config = ConfigParser()
if not os.path.exists(config_file):
print(f"❌ Konfigurationsdatei nicht gefunden: {config_file}")
print(f"💡 Kopiere config.ini.example zu config.ini und passe die Werte an!")
sys.exit(1)
config.read(config_file, encoding='utf-8')
return config
# ============================================================================
# Logging Setup
# ============================================================================
def setup_logging(config):
"""Richtet Logging ein"""
log_level = config.get('logging', 'log_level', fallback='INFO')
log_file = config.get('logging', 'log_file', fallback='')
logging_config = {
'level': getattr(logging, log_level),
'format': '%(asctime)s [%(levelname)s] %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S'
}
if log_file:
logging_config['filename'] = log_file
logging_config['filemode'] = 'a'
logging.basicConfig(**logging_config)
return logging.getLogger(__name__)
# ============================================================================
# PostgreSQL Connection
# ============================================================================
def connect_postgres(config):
"""Stellt Verbindung zu PostgreSQL her"""
try:
conn = psycopg2.connect(
host=config.get('postgresql', 'host'),
port=config.getint('postgresql', 'port'),
database=config.get('postgresql', 'database'),
user=config.get('postgresql', 'user'),
password=config.get('postgresql', 'password')
)
logging.info(f"✅ Verbunden mit PostgreSQL: {config.get('postgresql', 'database')}")
return conn
except Exception as e:
logging.error(f"❌ PostgreSQL Verbindung fehlgeschlagen: {e}")
sys.exit(1)
# ============================================================================
# Import Funktionen
# ============================================================================
def import_geraete(conn, config, logger):
"""Importiert Geräte-Tabelle (~1.4 Mio Zeilen)"""
logger.info("=" * 70)
logger.info("[1/4] IMPORTIERE GERÄTE")
logger.info("=" * 70)
csv_path = config.get('csv_files', 'geraete')
batch_size = config.getint('import_settings', 'batch_size_geraete', fallback=1000)
progress_interval = config.getint('import_settings', 'progress_interval', fallback=100000)
if not os.path.exists(csv_path):
logger.warning(f"⚠️ CSV nicht gefunden: {csv_path}")
logger.warning("⚠️ Überspringe Geräte-Import")
return
file_size_mb = os.path.getsize(csv_path) / (1024**2)
logger.info(f"📁 CSV-Datei: {csv_path} ({file_size_mb:.1f} MB)")
cursor = conn.cursor()
# Tabelle leeren
cursor.execute("TRUNCATE TABLE geraete RESTART IDENTITY CASCADE")
logger.info("🗑️ Tabelle geleert")
# CSV einlesen
batch = []
total = 0
errors = 0
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f, delimiter=';')
for row in reader:
try:
values = (
int(row['id']) if row.get('id') else None,
row.get('nr'),
row.get('marke'),
row.get('typ'),
row.get('zusatzNummer'),
row.get('modellBezeichnung'),
row.get('produktionsstart'),
row.get('produktionsende'),
row.get('wgtext1'),
row.get('wgtext2'),
row.get('zusatz'),
row.get('bezeichnungoriginal'),
row.get('typDE'),
row.get('typFR')
)
batch.append(values)
if len(batch) >= batch_size:
cursor.executemany("""
INSERT INTO geraete
(id, nr, marke, typ, zusatz_nummer, modell_bezeichnung,
produktionsstart, produktionsende, wgtext1, wgtext2,
zusatz, bezeichnung_original, typ_de, typ_fr)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (id) DO UPDATE SET
nr = EXCLUDED.nr,
marke = EXCLUDED.marke,
typ = EXCLUDED.typ,
updated_at = CURRENT_TIMESTAMP
""", batch)
conn.commit()
total += len(batch)
batch.clear()
if total % progress_interval == 0:
logger.info(f"{total:,} Geräte importiert...")
except Exception as e:
errors += 1
logger.debug(f"Fehler bei Zeile {total + len(batch)}: {e}")
continue
# Letzte Batch
if batch:
try:
cursor.executemany("""
INSERT INTO geraete
(id, nr, marke, typ, zusatz_nummer, modell_bezeichnung,
produktionsstart, produktionsende, wgtext1, wgtext2,
zusatz, bezeichnung_original, typ_de, typ_fr)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (id) DO UPDATE SET
nr = EXCLUDED.nr,
updated_at = CURRENT_TIMESTAMP
""", batch)
conn.commit()
total += len(batch)
except Exception as e:
logger.error(f"Fehler bei letzter Batch: {e}")
logger.info(f"{total:,} Geräte importiert (Fehler: {errors})")
cursor.close()
def import_ersatzteile(conn, config, logger):
"""Importiert Ersatzteil-Metadaten"""
logger.info("=" * 70)
logger.info("[2/4] IMPORTIERE ERSATZTEILE")
logger.info("=" * 70)
csv_path = config.get('csv_files', 'artikel')
batch_size = config.getint('import_settings', 'batch_size_artikel', fallback=1000)
if not os.path.exists(csv_path):
logger.warning(f"⚠️ CSV nicht gefunden: {csv_path}")
logger.warning("⚠️ Überspringe Ersatzteile-Import")
return
file_size_mb = os.path.getsize(csv_path) / (1024**2)
logger.info(f"📁 CSV-Datei: {csv_path} ({file_size_mb:.1f} MB)")
cursor = conn.cursor()
cursor.execute("TRUNCATE TABLE ersatzteile RESTART IDENTITY")
logger.info("🗑️ Tabelle geleert")
batch = []
total = 0
errors = 0
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f, delimiter=';')
for row in reader:
try:
values = (
int(row['id']) if row.get('id') else None,
int(row['navisionid']) if row.get('navisionid') else None,
row.get('originalnummer'),
row.get('marke'),
row.get('ean'),
row.get('altenr')
)
batch.append(values)
if len(batch) >= batch_size:
cursor.executemany("""
INSERT INTO ersatzteile
(id, navision_id, originalnummer, marke, ean, altenr)
VALUES (%s,%s,%s,%s,%s,%s)
ON CONFLICT (id) DO NOTHING
""", batch)
conn.commit()
total += len(batch)
batch.clear()
except Exception as e:
errors += 1
logger.debug(f"Fehler bei Zeile {total + len(batch)}: {e}")
continue
# Letzte Batch
if batch:
try:
cursor.executemany("""
INSERT INTO ersatzteile
(id, navision_id, originalnummer, marke, ean, altenr)
VALUES (%s,%s,%s,%s,%s,%s)
ON CONFLICT (id) DO NOTHING
""", batch)
conn.commit()
total += len(batch)
except Exception as e:
logger.error(f"Fehler bei letzter Batch: {e}")
logger.info(f"{total:,} Ersatzteile importiert (Fehler: {errors})")
cursor.close()
def import_mapping_aggregated(conn, config, logger):
"""
HAUPTFUNKTION: Import Mapping mit Aggregation
61 Mio Zeilen → 68k Zeilen mit Arrays
"""
logger.info("=" * 70)
logger.info("[3/4] IMPORTIERE MAPPING (AGGREGIERT)")
logger.info("=" * 70)
csv_path = config.get('csv_files', 'mapping')
batch_size = config.getint('import_settings', 'batch_size_mapping', fallback=500)
progress_interval = config.getint('import_settings', 'progress_interval', fallback=1000000)
in_memory = config.getboolean('import_settings', 'mapping_in_memory', fallback=True)
if not os.path.exists(csv_path):
logger.error(f"❌ CSV nicht gefunden: {csv_path}")
logger.error("❌ Mapping-Import ist kritisch! Abbruch.")
return
file_size_gb = os.path.getsize(csv_path) / (1024**3)
logger.info(f"📁 CSV-Datei: {csv_path} ({file_size_gb:.2f} GB)")
if in_memory:
logger.info("💾 Modus: In-Memory Aggregation (schnell, RAM-intensiv)")
else:
logger.info("💾 Modus: Stream (langsam, RAM-freundlich)")
# SCHRITT 1: CSV lesen und aggregieren
logger.info("⏳ Lese und aggregiere 61 Millionen Zeilen...")
mapping = defaultdict(list)
count = 0
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f, delimiter=';')
for row in reader:
try:
ersatzteil = int(row['ersatzteil'])
geraet = int(row['geraet'])
mapping[ersatzteil].append(geraet)
count += 1
if count % progress_interval == 0:
logger.info(f"{count:,} Zeilen gelesen... ({len(mapping):,} unique Ersatzteile)")
except Exception as e:
logger.debug(f"Fehler bei Zeile {count}: {e}")
continue
logger.info(f"{count:,} Zeilen gelesen")
logger.info(f"{len(mapping):,} unique Ersatzteile gefunden")
# SCHRITT 2: In PostgreSQL schreiben
logger.info("💾 Schreibe aggregierte Daten nach PostgreSQL...")
cursor = conn.cursor()
cursor.execute("TRUNCATE TABLE ersatzteil_mapping RESTART IDENTITY")
logger.info("🗑️ Tabelle geleert")
batch = []
total = 0
for ersatzteil_id, geraet_list in mapping.items():
batch.append((
ersatzteil_id,
geraet_list, # Python List → PostgreSQL Array
len(geraet_list)
))
if len(batch) >= batch_size:
cursor.executemany("""
INSERT INTO ersatzteil_mapping
(ersatzteil_id, geraet_ids, geraet_count)
VALUES (%s, %s, %s)
ON CONFLICT (ersatzteil_id) DO UPDATE SET
geraet_ids = EXCLUDED.geraet_ids,
geraet_count = EXCLUDED.geraet_count,
last_updated = CURRENT_TIMESTAMP
""", batch)
conn.commit()
total += len(batch)
batch.clear()
logger.info(f"{total:,}/{len(mapping):,} Ersatzteile geschrieben...")
# Letzte Batch
if batch:
cursor.executemany("""
INSERT INTO ersatzteil_mapping
(ersatzteil_id, geraet_ids, geraet_count)
VALUES (%s, %s, %s)
ON CONFLICT (ersatzteil_id) DO UPDATE SET
geraet_ids = EXCLUDED.geraet_ids,
geraet_count = EXCLUDED.geraet_count,
last_updated = CURRENT_TIMESTAMP
""", batch)
conn.commit()
total += len(batch)
logger.info(f"{total:,} Ersatzteile mit Arrays gespeichert")
# STATISTIK
cursor.execute("SELECT MAX(geraet_count), AVG(geraet_count) FROM ersatzteil_mapping")
max_count, avg_count = cursor.fetchone()
logger.info(f"📊 Max Geräte pro Teil: {max_count:,}")
logger.info(f"📊 Ø Geräte pro Teil: {avg_count:.0f}")
cursor.close()
def import_passendwie(conn, config, logger):
"""Importiert PassendWie-Daten (Ein Ersatzteil hat mehrere Vertreiber-Codes)"""
logger.info("=" * 70)
logger.info("[4/4] IMPORTIERE PASSENDWIE")
logger.info("=" * 70)
csv_path = config.get('csv_files', 'passendwie')
batch_size = config.getint('import_settings', 'batch_size_passendwie', fallback=1000)
if not os.path.exists(csv_path):
logger.warning(f"⚠️ CSV nicht gefunden: {csv_path}")
logger.warning("⚠️ Überspringe PassendWie-Import")
return
file_size_mb = os.path.getsize(csv_path) / (1024**2)
logger.info(f"📁 CSV-Datei: {csv_path} ({file_size_mb:.1f} MB)")
cursor = conn.cursor()
cursor.execute("TRUNCATE TABLE passendwie RESTART IDENTITY CASCADE")
logger.info("🗑️ Tabelle geleert")
batch = []
total = 0
errors = 0
duplicates = 0
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f, delimiter=';')
for row in reader:
try:
values = (
int(row['id']) if row.get('id') else None,
int(row['navisionid']) if row.get('navisionid') else None,
row.get('vertreiberid'),
row.get('vertreiber'),
row.get('bestellcode')
)
batch.append(values)
if len(batch) >= batch_size:
try:
cursor.executemany("""
INSERT INTO passendwie
(id, navision_id, vertreiber_id, vertreiber, bestellcode)
VALUES (%s,%s,%s,%s,%s)
ON CONFLICT (id, vertreiber_id, bestellcode) DO NOTHING
""", batch)
conn.commit()
total += len(batch)
batch.clear()
except Exception as e:
logger.debug(f"Batch-Fehler: {e}")
conn.rollback()
# Einzeln einfügen bei Fehler
for single_row in batch:
try:
cursor.execute("""
INSERT INTO passendwie
(id, navision_id, vertreiber_id, vertreiber, bestellcode)
VALUES (%s,%s,%s,%s,%s)
ON CONFLICT (id, vertreiber_id, bestellcode) DO NOTHING
""", single_row)
conn.commit()
total += 1
except Exception:
duplicates += 1
batch.clear()
except Exception as e:
errors += 1
logger.debug(f"Fehler bei Zeile {total + len(batch)}: {e}")
continue
# Letzte Batch
if batch:
try:
cursor.executemany("""
INSERT INTO passendwie
(id, navision_id, vertreiber_id, vertreiber, bestellcode)
VALUES (%s,%s,%s,%s,%s)
ON CONFLICT (id, vertreiber_id, bestellcode) DO NOTHING
""", batch)
conn.commit()
total += len(batch)
except Exception as e:
logger.debug(f"Fehler bei letzter Batch: {e}")
conn.rollback()
for single_row in batch:
try:
cursor.execute("""
INSERT INTO passendwie
(id, navision_id, vertreiber_id, vertreiber, bestellcode)
VALUES (%s,%s,%s,%s,%s)
ON CONFLICT (id, vertreiber_id, bestellcode) DO NOTHING
""", single_row)
conn.commit()
total += 1
except Exception:
duplicates += 1
logger.info(f"{total:,} PassendWie-Einträge importiert (Fehler: {errors}, Duplikate übersprungen: {duplicates})")
cursor.close()
def analyze_tables(conn, logger):
"""Aktualisiert Tabellen-Statistiken für Query-Optimizer"""
logger.info("=" * 70)
logger.info("[FINAL] AKTUALISIERE TABELLEN-STATISTIKEN")
logger.info("=" * 70)
# Rollback falls vorherige Transaction fehlgeschlagen
conn.rollback()
cursor = conn.cursor()
tables = ['geraete', 'ersatzteil_mapping', 'ersatzteile', 'passendwie']
for table in tables:
try:
logger.info(f" ANALYZE {table}...")
cursor.execute(f"ANALYZE {table}")
conn.commit()
except Exception as e:
logger.warning(f" ⚠️ ANALYZE {table} fehlgeschlagen: {e}")
conn.rollback()
continue
logger.info("✅ Statistiken aktualisiert")
cursor.close()
# ============================================================================
# Main
# ============================================================================
def main():
"""Hauptfunktion"""
parser = argparse.ArgumentParser(description='Krempl PostgreSQL Import')
parser.add_argument('--config', default='config.ini', help='Pfad zur config.ini')
args = parser.parse_args()
# Config laden
config = load_config(args.config)
# Logging setup
logger = setup_logging(config)
# Header
print("=" * 70)
print("KREMPL POSTGRESQL IMPORT")
print("Aggregiert 61 Millionen Zeilen → 68k Zeilen mit Arrays")
print("=" * 70)
print()
start_time = datetime.now()
# PostgreSQL Connection
conn = connect_postgres(config)
try:
# Import durchführen
import_geraete(conn, config, logger)
import_ersatzteile(conn, config, logger)
import_mapping_aggregated(conn, config, logger) # WICHTIGSTE FUNKTION!
import_passendwie(conn, config, logger)
analyze_tables(conn, logger)
# Erfolg
duration = datetime.now() - start_time
print()
print("=" * 70)
print(f"✅ IMPORT ERFOLGREICH in {duration}")
print("=" * 70)
except KeyboardInterrupt:
logger.warning("\n⚠️ Import abgebrochen durch Benutzer")
conn.rollback()
sys.exit(1)
except Exception as e:
logger.error(f"\n❌ FEHLER: {e}")
logger.exception("Traceback:")
conn.rollback()
sys.exit(1)
finally:
conn.close()
logger.info("Datenbankverbindung geschlossen")
if __name__ == '__main__':
main()