#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Preisupdate-Skript für Intelectra Webshop Dieses Skript importiert Preisdaten aus einer CSV-Datei in die Datenbank. Es erstellt automatisch Backups der bestehenden Preisdaten vor dem Update. Verwendung: python3 preisupdate.py /pfad/zur/csv_datei.csv CSV-Format: Artikelnummer;VK1;VK3;VK100;VK_Endkunde Beispiel: 12345;10,50;9,50;8,50;11,50 """ import csv import pymysql import datetime import os import sys import logging from configparser import ConfigParser # Logging-Konfiguration logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler('preisupdate.log') ] ) logger = logging.getLogger('preisupdate') def load_config(): """ Lädt die Konfiguration aus einer INI-Datei oder Umgebungsvariablen. Falls keine Konfigurationsdatei gefunden wird, werden Standardwerte verwendet. """ config = { 'host': 'localhost', 'user': 'tbapy', 'password': '9%%0H32ryj_N9%%0H32ryj', 'database': 'webshop-sql', 'autocommit': True } # Versuche, Konfiguration aus Datei zu laden config_file = os.path.join(os.path.dirname(__file__), 'db_config.ini') if os.path.exists(config_file): parser = ConfigParser() parser.read(config_file) if 'database' in parser: config.update(parser['database']) logger.info("Konfiguration aus Datei geladen") # Umgebungsvariablen haben Vorrang if 'DB_HOST' in os.environ: config['host'] = os.environ['DB_HOST'] if 'DB_USER' in os.environ: config['user'] = os.environ['DB_USER'] if 'DB_PASSWORD' in os.environ: config['password'] = os.environ['DB_PASSWORD'] if 'DB_NAME' in os.environ: config['database'] = os.environ['DB_NAME'] return config def connect_db(): """ Stellt eine Verbindung zur Datenbank her. Returns: pymysql.Connection: Die Datenbankverbindung """ config = load_config() try: conn = pymysql.connect( host=config['host'], user=config['user'], password=config['password'], database=config['database'], autocommit=config['autocommit'] ) logger.info(f"Verbindung zur Datenbank {config['database']} hergestellt") return conn except Exception as e: logger.error(f"Fehler beim Verbinden zur Datenbank: {e}") raise def cleanup_old_backups(cursor, keep_count=3): """ Löscht alte Backup-Tabellen, behält aber die neuesten 'keep_count' Backups. Args: cursor: Der Datenbank-Cursor keep_count: Anzahl der zu behaltenden Backups """ try: # Finde alle Backup-Tabellen cursor.execute(""" SELECT table_name FROM information_schema.tables WHERE table_name LIKE 'item_prices_backup_%' ORDER BY table_name DESC """) backups = cursor.fetchall() # Wenn mehr als keep_count Backups existieren, lösche die ältesten if len(backups) > keep_count: for backup in backups[keep_count:]: logger.info(f"Lösche altes Backup: {backup[0]}") cursor.execute(f"DROP TABLE {backup[0]}") logger.info(f"Backup-Bereinigung abgeschlossen. {len(backups)-keep_count} alte Backups entfernt.") except Exception as e: logger.error(f"Fehler bei der Backup-Bereinigung: {e}") raise def backup_and_clear_table(cursor): """ Erstellt ein Backup der aktuellen Preisdaten und leert die Update-Tabelle. Args: cursor: Der Datenbank-Cursor """ try: # Erstelle ein Backup mit aktuellem Datum backup_table_name = f"item_prices_backup_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}" logger.info(f"Backup-Tabelle: {backup_table_name}") cursor.execute(f"CREATE TABLE {backup_table_name} AS SELECT * FROM item_prices;") logger.info("Backup der Echtpreise erstellt.") # Bereinige alte Backups cleanup_old_backups(cursor) # Truncate der item_preise_update-Tabelle cursor.execute("TRUNCATE TABLE item_preise_update;") logger.info("item_preise_update geleert.") except Exception as e: logger.error(f"Fehler beim Backup und Leeren der Tabelle: {e}") raise def get_item_id(cursor, article_number): """ Ermittelt die ID eines Artikels anhand seiner Artikelnummer. Args: cursor: Der Datenbank-Cursor article_number: Die Artikelnummer Returns: int: Die Artikel-ID oder None, wenn nicht gefunden """ try: cursor.execute("SELECT id FROM items WHERE number = %s", (article_number,)) result = cursor.fetchone() return result[0] if result else None except Exception as e: logger.error(f"Fehler beim Abrufen der Artikel-ID für {article_number}: {e}") return None def update_item_prices(cursor): """ Aktualisiert die Haupttabelle item_prices mit den Daten aus item_preise_update. Args: cursor: Der Datenbank-Cursor Returns: int: Anzahl der aktualisierten Datensätze """ try: logger.info("Aktualisiere item_prices aus item_preise_update...") cursor.execute(""" UPDATE item_prices ip JOIN item_preise_update ipu ON ip.item_id = ipu.item_id AND ip.customergroup_id = ipu.customergroup_id SET ip.quantity_1 = ipu.quantity_1, ip.price_1 = ipu.price_1, ip.quantity_2 = ipu.quantity_2, ip.price_2 = ipu.price_2, ip.quantity_3 = ipu.quantity_3, ip.price_3 = ipu.price_3; """) # Hole die Anzahl der aktualisierten Zeilen rows_affected = cursor.rowcount logger.info(f"Aktualisierte Datensätze: {rows_affected}") logger.info("Update von item_prices abgeschlossen.") return rows_affected except Exception as e: logger.error(f"Fehler beim Aktualisieren der Preise: {e}") raise def import_csv_to_db(csv_file): """ Importiert Preisdaten aus einer CSV-Datei in die Datenbank. Args: csv_file: Pfad zur CSV-Datei Returns: tuple: (Anzahl der verarbeiteten Artikel, Anzahl der aktualisierten Datensätze) """ conn = connect_db() cursor = conn.cursor() updated_count = 0 processed_items = set() # Für die Zählung eindeutiger Artikel try: # Backup & Truncate durchführen backup_and_clear_table(cursor) # Prüfe, ob die Datei existiert if not os.path.exists(csv_file): raise FileNotFoundError(f"Die Datei {csv_file} wurde nicht gefunden") # Prüfe, ob die Datei leer ist if os.path.getsize(csv_file) == 0: raise ValueError(f"Die Datei {csv_file} ist leer") with open(csv_file, "r", encoding="utf-8") as file: reader = csv.reader(file, delimiter=';') # Prüfe, ob die Datei eine Kopfzeile hat try: header = next(reader) if len(header) < 5: raise ValueError(f"Ungültiges CSV-Format: Erwartet mindestens 5 Spalten, gefunden {len(header)}") logger.info(f"CSV-Header: {header}") except StopIteration: raise ValueError(f"Die Datei {csv_file} enthält keine Daten") for row_num, row in enumerate(reader, start=2): # Start bei 2, da Zeile 1 der Header ist # Prüfe, ob die Zeile genügend Spalten hat if len(row) < 5: logger.warning(f"Zeile {row_num}: Ungültiges Format, überspringe Zeile") continue article_number, vk1, vk3, vk100, vk_endkunde = row[:5] # Überspringe leere Zeilen if not article_number.strip(): continue item_id = get_item_id(cursor, article_number) # Ersetze Komma durch Punkt für float-Umwandlung try: vk1 = float(vk1.replace(",", ".")) vk3 = float(vk3.replace(",", ".")) vk100 = float(vk100.replace(",", ".")) vk_endkunde = float(vk_endkunde.replace(",", ".")) except ValueError as e: logger.warning(f"Zeile {row_num}: Ungültiger Preiswert, überspringe Zeile: {e}") continue if item_id: processed_items.add(item_id) # Merke uns den Artikel # Sicherstellen, dass beide Einträge für customergroup_id 101 und 1 vorhanden sind cursor.execute(""" INSERT INTO item_preise_update (item_id, customergroup_id, quantity_1, price_1) VALUES (%s, 101, 1, %s) ON DUPLICATE KEY UPDATE price_1 = VALUES(price_1) """, (item_id, vk1)) cursor.execute(""" INSERT INTO item_preise_update (item_id, customergroup_id, quantity_1, price_1) VALUES (%s, 1, 1, %s) ON DUPLICATE KEY UPDATE price_1 = VALUES(price_1) """, (item_id, vk_endkunde)) # VK3 nur wenn > 0 für customergroup_id 101 if vk3 > 0: cursor.execute(""" UPDATE item_preise_update SET quantity_2 = 3, price_2 = %s WHERE item_id = %s AND customergroup_id = 101 """, (vk3, item_id)) # VK100 nur wenn > 0 für customergroup_id 101 if vk100 > 0: cursor.execute(""" UPDATE item_preise_update SET quantity_3 = 100, price_3 = %s WHERE item_id = %s AND customergroup_id = 101 """, (vk100, item_id)) else: logger.warning(f"Artikelnummer {article_number} nicht gefunden!") # Update der Haupttabelle item_prices updated_count = update_item_prices(cursor) logger.info(f"Insgesamt {len(processed_items)} eindeutige Artikel verarbeitet") return len(processed_items), updated_count except Exception as e: logger.error(f"Fehler beim Import: {e}") raise finally: cursor.close() conn.close() logger.info("CSV-Import und Datenabgleich abgeschlossen!") if __name__ == "__main__": if len(sys.argv) != 2: print("Bitte CSV-Datei als Parameter angeben!") print("Verwendung: python3 preisupdate.py /pfad/zur/csv_datei.csv") sys.exit(1) try: processed_count, updated_count = import_csv_to_db(sys.argv[1]) print(f"Insgesamt {processed_count} eindeutige Artikel verarbeitet") print(f"Insgesamt {updated_count} Datensätze aktualisiert") except Exception as e: logger.error(f"Fehler: {e}") sys.exit(1)