247 lines
8.7 KiB
Python
247 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Sicherheitsdaten-Import-Skript für Intelectra Webshop
|
|
Dieses Skript importiert Sicherheitsdaten aus einer CSV-Datei in die Datenbank.
|
|
|
|
Verwendung:
|
|
python3 sdbimport.py /pfad/zur/csv_datei.csv
|
|
|
|
CSV-Format:
|
|
article_id;navision_id;warning_ids;Bedienungsanleitung;EU_Datenblatt;Energielabel;Produktdatenblatt;Sicherheitsdatenblatt;Zertifikat;null_feld
|
|
Beispiel: 12345;67890;1;2;3;http://example.com/ba.pdf;http://example.com/eu.pdf;http://example.com/el.pdf;http://example.com/pd.pdf;http://example.com/sdb.pdf;http://example.com/cert.pdf;
|
|
"""
|
|
|
|
import csv
|
|
import pymysql
|
|
import sys
|
|
import os
|
|
import logging
|
|
from configparser import ConfigParser
|
|
|
|
# Logging-Konfiguration
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.StreamHandler(sys.stdout),
|
|
logging.FileHandler('sdbimport.log')
|
|
]
|
|
)
|
|
logger = logging.getLogger('sdbimport')
|
|
|
|
def load_config():
|
|
"""
|
|
Lädt die Konfiguration aus einer INI-Datei oder Umgebungsvariablen.
|
|
Falls keine Konfigurationsdatei gefunden wird, werden Standardwerte verwendet.
|
|
"""
|
|
config = {
|
|
'host': 'localhost',
|
|
'user': 'tbapy',
|
|
'password': '9%%0H32ryj_N9%%0H32ryj',
|
|
'database': 'webshop-sql',
|
|
'autocommit': True
|
|
}
|
|
|
|
# Versuche, Konfiguration aus Datei zu laden
|
|
config_file = os.path.join(os.path.dirname(__file__), 'db_config.ini')
|
|
if os.path.exists(config_file):
|
|
parser = ConfigParser()
|
|
parser.read(config_file)
|
|
if 'database' in parser:
|
|
config.update(parser['database'])
|
|
logger.info("Konfiguration aus Datei geladen")
|
|
|
|
# Umgebungsvariablen haben Vorrang
|
|
if 'DB_HOST' in os.environ:
|
|
config['host'] = os.environ['DB_HOST']
|
|
if 'DB_USER' in os.environ:
|
|
config['user'] = os.environ['DB_USER']
|
|
if 'DB_PASSWORD' in os.environ:
|
|
config['password'] = os.environ['DB_PASSWORD']
|
|
if 'DB_NAME' in os.environ:
|
|
config['database'] = os.environ['DB_NAME']
|
|
|
|
return config
|
|
|
|
def connect_db():
|
|
"""
|
|
Stellt eine Verbindung zur Datenbank her.
|
|
|
|
Returns:
|
|
pymysql.Connection: Die Datenbankverbindung
|
|
"""
|
|
config = load_config()
|
|
try:
|
|
conn = pymysql.connect(
|
|
host=config['host'],
|
|
user=config['user'],
|
|
password=config['password'],
|
|
database=config['database'],
|
|
autocommit=config['autocommit']
|
|
)
|
|
logger.info(f"Verbindung zur Datenbank {config['database']} hergestellt")
|
|
return conn
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Verbinden zur Datenbank: {e}")
|
|
raise
|
|
|
|
def get_item_id(cursor, article_id, navision_id):
|
|
"""
|
|
Ermittelt die item_id basierend auf verschiedenen Kriterien.
|
|
|
|
Args:
|
|
cursor: Datenbankverbindung
|
|
article_id: Die Artikel-ID aus der CSV
|
|
navision_id: Die Navision-ID aus der CSV
|
|
|
|
Returns:
|
|
int: Die gefundene item_id oder None wenn nicht gefunden
|
|
"""
|
|
# Versuche zuerst über navision_id und attribute_7 zu finden
|
|
if navision_id:
|
|
sql = """
|
|
SELECT id
|
|
FROM items
|
|
WHERE navision_id = %s
|
|
AND attribute_7 IS NOT NULL
|
|
LIMIT 1
|
|
"""
|
|
cursor.execute(sql, (navision_id,))
|
|
result = cursor.fetchone()
|
|
if result:
|
|
return result[0]
|
|
|
|
# Wenn nicht gefunden, versuche über items.number = article_id
|
|
sql = """
|
|
SELECT id
|
|
FROM items
|
|
WHERE number = %s
|
|
LIMIT 1
|
|
"""
|
|
cursor.execute(sql, (article_id,))
|
|
result = cursor.fetchone()
|
|
return result[0] if result else None
|
|
|
|
def import_sicherheitsdaten(csv_file):
|
|
"""
|
|
Importiert Sicherheitsdaten aus einer CSV-Datei in die Datenbank.
|
|
|
|
Args:
|
|
csv_file: Pfad zur CSV-Datei
|
|
|
|
Returns:
|
|
int: Anzahl der verarbeiteten Artikel
|
|
"""
|
|
conn = connect_db()
|
|
cursor = conn.cursor()
|
|
processed_items = 0
|
|
skipped_items = 0
|
|
|
|
try:
|
|
# Prüfe, ob die Datei existiert
|
|
if not os.path.exists(csv_file):
|
|
raise FileNotFoundError(f"Die Datei {csv_file} wurde nicht gefunden")
|
|
|
|
# Prüfe, ob die Datei leer ist
|
|
if os.path.getsize(csv_file) == 0:
|
|
raise ValueError(f"Die Datei {csv_file} ist leer")
|
|
|
|
with open(csv_file, newline='', encoding='utf-8') as f:
|
|
reader = csv.DictReader(f, delimiter=';')
|
|
|
|
# Prüfe, ob die Datei die erforderlichen Spalten enthält
|
|
required_fields = ['article_id', 'warning_ids']
|
|
missing_fields = [field for field in required_fields if field not in reader.fieldnames]
|
|
if missing_fields:
|
|
raise ValueError(f"Die CSV-Datei fehlt die erforderlichen Spalten: {', '.join(missing_fields)}")
|
|
|
|
logger.info(f"CSV-Header: {reader.fieldnames}")
|
|
|
|
for row_num, row in enumerate(reader, start=2):
|
|
try:
|
|
article_id = row['article_id']
|
|
navision_id = row.get('navision_id')
|
|
|
|
if not article_id:
|
|
logger.warning(f"Zeile {row_num}: Keine article_id angegeben, überspringe Zeile")
|
|
skipped_items += 1
|
|
continue
|
|
|
|
# Ermittle item_id basierend auf den Kriterien
|
|
item_id = get_item_id(cursor, article_id, navision_id)
|
|
|
|
if not item_id:
|
|
logger.warning(f"Zeile {row_num}: Kein passender Artikel gefunden für article_id={article_id}, navision_id={navision_id}")
|
|
skipped_items += 1
|
|
continue
|
|
|
|
warning_ids = row.get('warning_ids') or ''
|
|
ba = row.get('Bedienungsanleitung') or ''
|
|
eu = row.get('EU_Datenblatt') or ''
|
|
el = row.get('Energielabel') or ''
|
|
pd = row.get('Produktdatenblatt') or ''
|
|
sdb = row.get('Sicherheitsdatenblatt') or ''
|
|
cert = row.get('Zertifikat') or ''
|
|
null_feld = row.get('null_feld') or ''
|
|
|
|
sql = """
|
|
INSERT INTO sicherheitshinweis_item
|
|
(article_id, navision_id, warning_ids, Bedienungsanleitung, EU_Datenblatt, Energielabel,
|
|
Produktdatenblatt, Sicherheitsdatenblatt, Zertifikat, null_feld)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
ON DUPLICATE KEY UPDATE
|
|
navision_id = VALUES(navision_id),
|
|
warning_ids = VALUES(warning_ids),
|
|
Bedienungsanleitung = VALUES(Bedienungsanleitung),
|
|
EU_Datenblatt = VALUES(EU_Datenblatt),
|
|
Energielabel = VALUES(Energielabel),
|
|
Produktdatenblatt = VALUES(Produktdatenblatt),
|
|
Sicherheitsdatenblatt = VALUES(Sicherheitsdatenblatt),
|
|
Zertifikat = VALUES(Zertifikat),
|
|
null_feld = VALUES(null_feld)
|
|
"""
|
|
|
|
cursor.execute(sql, (
|
|
item_id, # Verwende die gefundene item_id statt article_id
|
|
navision_id,
|
|
warning_ids,
|
|
ba, eu, el, pd, sdb, cert, null_feld
|
|
))
|
|
|
|
processed_items += 1
|
|
|
|
# Logge Fortschritt alle 100 Einträge
|
|
if processed_items % 100 == 0:
|
|
logger.info(f"{processed_items} Artikel verarbeitet")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler bei Zeile {row_num} (article_id: {article_id}): {e}")
|
|
skipped_items += 1
|
|
continue
|
|
|
|
logger.info(f"Import abgeschlossen: {processed_items} Artikel verarbeitet, {skipped_items} übersprungen")
|
|
return processed_items
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Sicherheitsdaten-Import: {e}")
|
|
raise
|
|
|
|
finally:
|
|
cursor.close()
|
|
conn.close()
|
|
logger.info("Sicherheitsdaten-Import abgeschlossen")
|
|
|
|
if __name__ == "__main__":
|
|
if len(sys.argv) != 2:
|
|
print("Bitte CSV-Datei als Parameter angeben!")
|
|
print("Verwendung: python3 sdbimport.py /pfad/zur/csv_datei.csv")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
processed_count = import_sicherheitsdaten(sys.argv[1])
|
|
print(f"Insgesamt {processed_count} eindeutige Artikel verarbeitet")
|
|
except Exception as e:
|
|
logger.error(f"Fehler: {e}")
|
|
sys.exit(1)
|