shop-old/modules/import/import_script.py
2026-04-20 01:03:43 +02:00

434 lines
15 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Import-Skript für Intelectra Webshop
Dieses Skript wird remote per SSH von der GUI aufgerufen.
Es erwartet eine CSV-Datei als erstes Argument.
Mit --passendwie wird die krempl_passendwie Tabelle aktualisiert.
"""
import csv
import pymysql
import os
import sys
import argparse
from datetime import datetime
from configparser import ConfigParser
CONFIG_PATH = os.path.join(os.path.dirname(__file__), 'db_config.ini')
REQUIRED_COLUMNS = {
'id', 'altenr', 'originalnummer', 'navisionid', 'zolltarifnummer',
'marke', 'ersatzteilvertreiber', 'ean', 'ersatzteilvorwaerts',
'ersatzteilvorwaertsnavisionid'
}
REQUIRED_COLUMNS_PASSENDWIE = {
'id', 'navisionid', 'vertreiberid', 'vertreiber', 'bestellcode'
}
REQUIRED_COLUMNS_GERAETE = {
'id', 'nr', 'marke', 'typ', 'zusatzNummer', 'modellBezeichnung', 'produktionsstart', 'produktionsende',
'wgtext1', 'wgtext2', 'zusatz', 'bezeichnungoriginal', 'typDE', 'typFR'
}
REQUIRED_COLUMNS_MAPPING = {
'ersatzteil', 'geraet'
}
def load_db_config():
parser = ConfigParser()
parser.read(CONFIG_PATH)
config = parser['database']
return {
'host': config.get('host', 'localhost'),
'user': config.get('user'),
'password': config.get('password', ''),
'database': config.get('database', 'webshop-sql'),
'autocommit': True
}
def connect_to_db():
return pymysql.connect(**load_db_config())
def validate_headers(headers):
return REQUIRED_COLUMNS.issubset(set(headers))
def validate_geraete_headers(headers):
return REQUIRED_COLUMNS_GERAETE.issubset(set(headers))
def validate_mapping_headers(headers):
return REQUIRED_COLUMNS_MAPPING.issubset(set(headers))
def run_import(csv_path):
if not os.path.exists(csv_path):
print(f"[ERROR] Datei nicht gefunden: {csv_path}")
return
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f, delimiter=';')
if not validate_headers(reader.fieldnames):
print(f"[ERROR] CSV-Header ungültig: {reader.fieldnames}")
return
conn = connect_to_db()
cursor = conn.cursor()
backup_table(cursor)
cursor.execute("TRUNCATE TABLE krempl_art_22")
insert_sql = """
INSERT INTO krempl_art_22
(id, altenr, originalnummer, navisionid, zolltarifnummer, marke,
ersatzteilvertreiber, ean, ersatzteilvorwaerts, ersatzteilvorwaertsnavisionid)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
total = 0
batch = []
for row in reader:
try:
values = (
int(row['id']) if row['id'] else None,
row['altenr'],
row['originalnummer'],
int(row['navisionid']) if row['navisionid'] else None,
row['zolltarifnummer'],
row['marke'],
row['ersatzteilvertreiber'],
row['ean'],
row['ersatzteilvorwaerts'],
int(row['ersatzteilvorwaertsnavisionid']) if row['ersatzteilvorwaertsnavisionid'] else None
)
batch.append(values)
if len(batch) >= 500:
cursor.executemany(insert_sql, batch)
conn.commit()
total += len(batch)
batch.clear()
except Exception as e:
print(f"[WARN] Zeile übersprungen wegen Fehler: {e}")
continue
if batch:
cursor.executemany(insert_sql, batch)
conn.commit()
total += len(batch)
print(f"[OK] Import abgeschlossen. {total} Datensätze importiert.")
# Tabellen-Statistiken aktualisieren für bessere Query-Performance
print("[INFO] Aktualisiere Tabellen-Statistiken...")
cursor.execute("ANALYZE TABLE krempl_art_22")
print("[OK] Statistiken aktualisiert.")
cursor.close()
conn.close()
def backup_table(cursor):
name = f"krempl_art_22_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
try:
cursor.execute(f"CREATE TABLE {name} AS SELECT * FROM krempl_art_22")
print(f"[INFO] Backup-Tabelle erstellt: {name}")
except Exception as e:
print(f"[WARN] Backup fehlgeschlagen: {e}")
def run_passendwie_import(csv_path):
if not os.path.exists(csv_path):
print(f"[ERROR] Datei nicht gefunden: {csv_path}")
return
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f, delimiter=';')
if not REQUIRED_COLUMNS_PASSENDWIE.issubset(set(reader.fieldnames)):
print(f"[ERROR] CSV-Header ungültig: {reader.fieldnames}")
return
conn = connect_to_db()
cursor = conn.cursor()
backup_passendwie_table(cursor)
insert_sql = """
INSERT INTO krempl_passendwie
(id, navisionid, vertreiberid, vertreiber, bestellcode)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
navisionid = VALUES(navisionid),
vertreiberid = VALUES(vertreiberid),
vertreiber = VALUES(vertreiber),
bestellcode = VALUES(bestellcode)
"""
total = 0
batch = []
for row in reader:
try:
values = (
int(row['id']) if row['id'] else None,
int(row['navisionid']) if row['navisionid'] else None,
row['vertreiberid'],
row['vertreiber'],
row['bestellcode']
)
batch.append(values)
if len(batch) >= 500:
cursor.executemany(insert_sql, batch)
conn.commit()
total += len(batch)
batch.clear()
except Exception as e:
print(f"[WARN] Zeile übersprungen wegen Fehler: {e}")
continue
if batch:
cursor.executemany(insert_sql, batch)
conn.commit()
total += len(batch)
print(f"[OK] PassendWie-Import abgeschlossen. {total} Datensätze importiert.")
# Tabellen-Statistiken aktualisieren für bessere Query-Performance
print("[INFO] Aktualisiere Tabellen-Statistiken...")
cursor.execute("ANALYZE TABLE krempl_passendwie")
print("[OK] Statistiken aktualisiert.")
cursor.close()
conn.close()
def backup_passendwie_table(cursor):
# 1. Alle bisherigen Backups suchen
cursor.execute("SHOW TABLES LIKE 'krempl_passendwie_backup_%'")
backups = [row[0] for row in cursor.fetchall()]
# 2. Nach Datum sortieren (Tabellenname enthält Zeitstempel)
backups.sort()
# 3. Wenn mehr als 2 Backups existieren, die ältesten löschen
while len(backups) >= 3:
to_delete = backups.pop(0)
try:
cursor.execute(f"DROP TABLE `{to_delete}`")
print(f"[INFO] Altes Backup gelöscht: {to_delete}")
except Exception as e:
print(f"[WARN] Konnte Backup nicht löschen: {e}")
# 4. Neues Backup anlegen
name = f"krempl_passendwie_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
try:
cursor.execute(f"CREATE TABLE `{name}` AS SELECT * FROM krempl_passendwie")
print(f"[INFO] Backup-Tabelle erstellt: {name}")
except Exception as e:
print(f"[WARN] Backup fehlgeschlagen: {e}")
def run_geraete_import(csv_path):
if not os.path.exists(csv_path):
print(f"[ERROR] Datei nicht gefunden: {csv_path}")
return
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f, delimiter=';')
if not validate_geraete_headers(reader.fieldnames):
print(f"[ERROR] CSV-Header ungültig: {reader.fieldnames}")
return
conn = connect_to_db()
cursor = conn.cursor()
backup_geraete_table(cursor)
insert_sql = """
INSERT INTO krempl_geraete_22
(id, nr, marke, typ, zusatZNummer, modellBezeichnung, produktionsstart, produktionsende, wgtext1, wgtext2, zusatz, bezeichnungoriginal, typDE, typFR)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
nr = VALUES(nr),
marke = VALUES(marke),
typ = VALUES(typ),
zusatZNummer = VALUES(zusatZNummer),
modellBezeichnung = VALUES(modellBezeichnung),
produktionsstart = VALUES(produktionsstart),
produktionsende = VALUES(produktionsende),
wgtext1 = VALUES(wgtext1),
wgtext2 = VALUES(wgtext2),
zusatz = VALUES(zusatz),
bezeichnungoriginal = VALUES(bezeichnungoriginal),
typDE = VALUES(typDE),
typFR = VALUES(typFR)
"""
total = 0
batch = []
for row in reader:
try:
values = (
int(row['id']) if row['id'] else None,
row['nr'],
row['marke'],
row['typ'],
row['zusatzNummer'],
row['modellBezeichnung'],
row['produktionsstart'],
row['produktionsende'],
row['wgtext1'],
row['wgtext2'],
row['zusatz'],
row['bezeichnungoriginal'],
row['typDE'],
row['typFR']
)
batch.append(values)
if len(batch) >= 500:
cursor.executemany(insert_sql, batch)
conn.commit()
total += len(batch)
batch.clear()
except Exception as e:
print(f"[WARN] Zeile übersprungen wegen Fehler: {e}")
continue
if batch:
cursor.executemany(insert_sql, batch)
conn.commit()
total += len(batch)
print(f"[OK] ZusatzDatenOriginalteil-Import abgeschlossen. {total} Datensätze importiert.")
# Tabellen-Statistiken aktualisieren für bessere Query-Performance
print("[INFO] Aktualisiere Tabellen-Statistiken...")
cursor.execute("ANALYZE TABLE krempl_geraete_22")
print("[OK] Statistiken aktualisiert.")
cursor.close()
conn.close()
def backup_geraete_table(cursor):
# 1. Alle bisherigen Backups suchen
cursor.execute("SHOW TABLES LIKE 'krempl_geraete_22_backup_%'")
backups = [row[0] for row in cursor.fetchall()]
# 2. Nach Datum sortieren (Tabellenname enthält Zeitstempel)
backups.sort()
# 3. Wenn mehr als 2 Backups existieren, die ältesten löschen
while len(backups) >= 3:
to_delete = backups.pop(0)
try:
cursor.execute(f"DROP TABLE `{to_delete}`")
print(f"[INFO] Altes Backup gelöscht: {to_delete}")
except Exception as e:
print(f"[WARN] Konnte Backup nicht löschen: {e}")
# 4. Neues Backup anlegen
name = f"krempl_geraete_22_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
try:
cursor.execute(f"CREATE TABLE `{name}` AS SELECT * FROM krempl_geraete_22")
print(f"[INFO] Backup-Tabelle erstellt: {name}")
except Exception as e:
print(f"[WARN] Backup fehlgeschlagen: {e}")
def run_mapping_import(csv_path):
if not os.path.exists(csv_path):
print(f"[ERROR] Datei nicht gefunden: {csv_path}")
return
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f, delimiter=';')
if not validate_mapping_headers(reader.fieldnames):
print(f"[ERROR] CSV-Header ungültig: {reader.fieldnames}")
return
conn = connect_to_db()
cursor = conn.cursor()
backup_mapping_table(cursor)
insert_sql = """
INSERT INTO krempl_artikelgeraet_22
(ersatzteil, geraet)
VALUES (%s, %s)
ON DUPLICATE KEY UPDATE
geraet = VALUES(geraet)
"""
total = 0
batch = []
for row in reader:
try:
values = (
int(row['ersatzteil']) if row['ersatzteil'] else None,
int(row['geraet']) if row['geraet'] else None
)
batch.append(values)
if len(batch) >= 500:
cursor.executemany(insert_sql, batch)
conn.commit()
total += len(batch)
batch.clear()
except Exception as e:
print(f"[WARN] Zeile übersprungen wegen Fehler: {e}")
continue
if batch:
cursor.executemany(insert_sql, batch)
conn.commit()
total += len(batch)
print(f"[OK] KremplMapping-Import abgeschlossen. {total} Datensätze importiert.")
# Tabellen-Statistiken aktualisieren für bessere Query-Performance
print("[INFO] Aktualisiere Tabellen-Statistiken...")
cursor.execute("ANALYZE TABLE krempl_artikelgeraet_22")
print("[OK] Statistiken aktualisiert.")
cursor.close()
conn.close()
def backup_mapping_table(cursor):
# 1. Alle bisherigen Backups suchen
cursor.execute("SHOW TABLES LIKE 'krempl_artikelgeraet_22_backup_%'")
backups = [row[0] for row in cursor.fetchall()]
# 2. Nach Datum sortieren (Tabellenname enthält Zeitstempel)
backups.sort()
# 3. Wenn mehr als 2 Backups existieren, die ältesten löschen
while len(backups) >= 3:
to_delete = backups.pop(0)
try:
cursor.execute(f"DROP TABLE `{to_delete}`")
print(f"[INFO] Altes Backup gelöscht: {to_delete}")
except Exception as e:
print(f"[WARN] Konnte Backup nicht löschen: {e}")
# 4. Neues Backup anlegen
name = f"krempl_artikelgeraet_22_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
try:
cursor.execute(f"CREATE TABLE `{name}` AS SELECT * FROM krempl_artikelgeraet_22")
print(f"[INFO] Backup-Tabelle erstellt: {name}")
except Exception as e:
print(f"[WARN] Backup fehlgeschlagen: {e}")
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Import-Skript für Intelectra Webshop')
parser.add_argument('csv_file', help='Pfad zur CSV-Datei')
parser.add_argument('--passendwie', action='store_true', help='Import in krempl_passendwie Tabelle')
parser.add_argument('--geraete', action='store_true', help='Import in krempl_geraete_22 Tabelle')
parser.add_argument('--mapping', action='store_true', help='Import in krempl_artikelgeraet_22 Tabelle')
args = parser.parse_args()
print(f"[INFO] Starte Import für Datei: {args.csv_file}")
if args.mapping:
run_mapping_import(args.csv_file)
elif args.geraete:
run_geraete_import(args.csv_file)
elif args.passendwie:
run_passendwie_import(args.csv_file)
else:
run_import(args.csv_file)