#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Sicherheitsdaten-Import-Skript für Intelectra Webshop Dieses Skript importiert Sicherheitsdaten aus einer CSV-Datei in die Datenbank. Verwendung: python3 sdbimport.py /pfad/zur/csv_datei.csv CSV-Format: article_id;navision_id;warning_ids;Bedienungsanleitung;EU_Datenblatt;Energielabel;Produktdatenblatt;Sicherheitsdatenblatt;Zertifikat;null_feld Beispiel: 12345;67890;1;2;3;http://example.com/ba.pdf;http://example.com/eu.pdf;http://example.com/el.pdf;http://example.com/pd.pdf;http://example.com/sdb.pdf;http://example.com/cert.pdf; """ import csv import pymysql import sys import os import logging from configparser import ConfigParser # Logging-Konfiguration logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler('sdbimport.log') ] ) logger = logging.getLogger('sdbimport') def load_config(): """ Lädt die Konfiguration aus einer INI-Datei oder Umgebungsvariablen. Falls keine Konfigurationsdatei gefunden wird, werden Standardwerte verwendet. """ config = { 'host': 'localhost', 'user': 'tbapy', 'password': '9%%0H32ryj_N9%%0H32ryj', 'database': 'webshop-sql', 'autocommit': True } # Versuche, Konfiguration aus Datei zu laden config_file = os.path.join(os.path.dirname(__file__), 'db_config.ini') if os.path.exists(config_file): parser = ConfigParser() parser.read(config_file) if 'database' in parser: config.update(parser['database']) logger.info("Konfiguration aus Datei geladen") # Umgebungsvariablen haben Vorrang if 'DB_HOST' in os.environ: config['host'] = os.environ['DB_HOST'] if 'DB_USER' in os.environ: config['user'] = os.environ['DB_USER'] if 'DB_PASSWORD' in os.environ: config['password'] = os.environ['DB_PASSWORD'] if 'DB_NAME' in os.environ: config['database'] = os.environ['DB_NAME'] return config def connect_db(): """ Stellt eine Verbindung zur Datenbank her. Returns: pymysql.Connection: Die Datenbankverbindung """ config = load_config() try: conn = pymysql.connect( host=config['host'], user=config['user'], password=config['password'], database=config['database'], autocommit=config['autocommit'] ) logger.info(f"Verbindung zur Datenbank {config['database']} hergestellt") return conn except Exception as e: logger.error(f"Fehler beim Verbinden zur Datenbank: {e}") raise def get_item_id(cursor, article_id, navision_id): """ Ermittelt die item_id basierend auf verschiedenen Kriterien. Args: cursor: Datenbankverbindung article_id: Die Artikel-ID aus der CSV navision_id: Die Navision-ID aus der CSV Returns: int: Die gefundene item_id oder None wenn nicht gefunden """ # Versuche zuerst über navision_id und attribute_7 zu finden if navision_id: sql = """ SELECT id FROM items WHERE navision_id = %s AND attribute_7 IS NOT NULL LIMIT 1 """ cursor.execute(sql, (navision_id,)) result = cursor.fetchone() if result: return result[0] # Wenn nicht gefunden, versuche über items.number = article_id sql = """ SELECT id FROM items WHERE number = %s LIMIT 1 """ cursor.execute(sql, (article_id,)) result = cursor.fetchone() return result[0] if result else None def import_sicherheitsdaten(csv_file): """ Importiert Sicherheitsdaten aus einer CSV-Datei in die Datenbank. Args: csv_file: Pfad zur CSV-Datei Returns: int: Anzahl der verarbeiteten Artikel """ conn = connect_db() cursor = conn.cursor() processed_items = 0 skipped_items = 0 try: # Prüfe, ob die Datei existiert if not os.path.exists(csv_file): raise FileNotFoundError(f"Die Datei {csv_file} wurde nicht gefunden") # Prüfe, ob die Datei leer ist if os.path.getsize(csv_file) == 0: raise ValueError(f"Die Datei {csv_file} ist leer") with open(csv_file, newline='', encoding='utf-8') as f: reader = csv.DictReader(f, delimiter=';') # Prüfe, ob die Datei die erforderlichen Spalten enthält required_fields = ['article_id', 'warning_ids'] missing_fields = [field for field in required_fields if field not in reader.fieldnames] if missing_fields: raise ValueError(f"Die CSV-Datei fehlt die erforderlichen Spalten: {', '.join(missing_fields)}") logger.info(f"CSV-Header: {reader.fieldnames}") for row_num, row in enumerate(reader, start=2): try: article_id = row['article_id'] navision_id = row.get('navision_id') if not article_id: logger.warning(f"Zeile {row_num}: Keine article_id angegeben, überspringe Zeile") skipped_items += 1 continue # Ermittle item_id basierend auf den Kriterien item_id = get_item_id(cursor, article_id, navision_id) if not item_id: logger.warning(f"Zeile {row_num}: Kein passender Artikel gefunden für article_id={article_id}, navision_id={navision_id}") skipped_items += 1 continue warning_ids = row.get('warning_ids') or '' ba = row.get('Bedienungsanleitung') or '' eu = row.get('EU_Datenblatt') or '' el = row.get('Energielabel') or '' pd = row.get('Produktdatenblatt') or '' sdb = row.get('Sicherheitsdatenblatt') or '' cert = row.get('Zertifikat') or '' null_feld = row.get('null_feld') or '' sql = """ INSERT INTO sicherheitshinweis_item (article_id, navision_id, warning_ids, Bedienungsanleitung, EU_Datenblatt, Energielabel, Produktdatenblatt, Sicherheitsdatenblatt, Zertifikat, null_feld) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE navision_id = VALUES(navision_id), warning_ids = VALUES(warning_ids), Bedienungsanleitung = VALUES(Bedienungsanleitung), EU_Datenblatt = VALUES(EU_Datenblatt), Energielabel = VALUES(Energielabel), Produktdatenblatt = VALUES(Produktdatenblatt), Sicherheitsdatenblatt = VALUES(Sicherheitsdatenblatt), Zertifikat = VALUES(Zertifikat), null_feld = VALUES(null_feld) """ cursor.execute(sql, ( item_id, # Verwende die gefundene item_id statt article_id navision_id, warning_ids, ba, eu, el, pd, sdb, cert, null_feld )) processed_items += 1 # Logge Fortschritt alle 100 Einträge if processed_items % 100 == 0: logger.info(f"{processed_items} Artikel verarbeitet") except Exception as e: logger.error(f"Fehler bei Zeile {row_num} (article_id: {article_id}): {e}") skipped_items += 1 continue logger.info(f"Import abgeschlossen: {processed_items} Artikel verarbeitet, {skipped_items} übersprungen") return processed_items except Exception as e: logger.error(f"Fehler beim Sicherheitsdaten-Import: {e}") raise finally: cursor.close() conn.close() logger.info("Sicherheitsdaten-Import abgeschlossen") if __name__ == "__main__": if len(sys.argv) != 2: print("Bitte CSV-Datei als Parameter angeben!") print("Verwendung: python3 sdbimport.py /pfad/zur/csv_datei.csv") sys.exit(1) try: processed_count = import_sicherheitsdaten(sys.argv[1]) print(f"Insgesamt {processed_count} eindeutige Artikel verarbeitet") except Exception as e: logger.error(f"Fehler: {e}") sys.exit(1)