shop-old/modules/import/preisupdate.py
2026-04-20 01:03:43 +02:00

319 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Preisupdate-Skript für Intelectra Webshop
Dieses Skript importiert Preisdaten aus einer CSV-Datei in die Datenbank.
Es erstellt automatisch Backups der bestehenden Preisdaten vor dem Update.
Verwendung:
python3 preisupdate.py /pfad/zur/csv_datei.csv
CSV-Format:
Artikelnummer;VK1;VK3;VK100;VK_Endkunde
Beispiel: 12345;10,50;9,50;8,50;11,50
"""
import csv
import pymysql
import datetime
import os
import sys
import logging
from configparser import ConfigParser
# Logging-Konfiguration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('preisupdate.log')
]
)
logger = logging.getLogger('preisupdate')
def load_config():
"""
Lädt die Konfiguration aus einer INI-Datei oder Umgebungsvariablen.
Falls keine Konfigurationsdatei gefunden wird, werden Standardwerte verwendet.
"""
config = {
'host': 'localhost',
'user': 'tbapy',
'password': '9%%0H32ryj_N9%%0H32ryj',
'database': 'webshop-sql',
'autocommit': True
}
# Versuche, Konfiguration aus Datei zu laden
config_file = os.path.join(os.path.dirname(__file__), 'db_config.ini')
if os.path.exists(config_file):
parser = ConfigParser()
parser.read(config_file)
if 'database' in parser:
config.update(parser['database'])
logger.info("Konfiguration aus Datei geladen")
# Umgebungsvariablen haben Vorrang
if 'DB_HOST' in os.environ:
config['host'] = os.environ['DB_HOST']
if 'DB_USER' in os.environ:
config['user'] = os.environ['DB_USER']
if 'DB_PASSWORD' in os.environ:
config['password'] = os.environ['DB_PASSWORD']
if 'DB_NAME' in os.environ:
config['database'] = os.environ['DB_NAME']
return config
def connect_db():
"""
Stellt eine Verbindung zur Datenbank her.
Returns:
pymysql.Connection: Die Datenbankverbindung
"""
config = load_config()
try:
conn = pymysql.connect(
host=config['host'],
user=config['user'],
password=config['password'],
database=config['database'],
autocommit=config['autocommit']
)
logger.info(f"Verbindung zur Datenbank {config['database']} hergestellt")
return conn
except Exception as e:
logger.error(f"Fehler beim Verbinden zur Datenbank: {e}")
raise
def cleanup_old_backups(cursor, keep_count=3):
"""
Löscht alte Backup-Tabellen, behält aber die neuesten 'keep_count' Backups.
Args:
cursor: Der Datenbank-Cursor
keep_count: Anzahl der zu behaltenden Backups
"""
try:
# Finde alle Backup-Tabellen
cursor.execute("""
SELECT table_name
FROM information_schema.tables
WHERE table_name LIKE 'item_prices_backup_%'
ORDER BY table_name DESC
""")
backups = cursor.fetchall()
# Wenn mehr als keep_count Backups existieren, lösche die ältesten
if len(backups) > keep_count:
for backup in backups[keep_count:]:
logger.info(f"Lösche altes Backup: {backup[0]}")
cursor.execute(f"DROP TABLE {backup[0]}")
logger.info(f"Backup-Bereinigung abgeschlossen. {len(backups)-keep_count} alte Backups entfernt.")
except Exception as e:
logger.error(f"Fehler bei der Backup-Bereinigung: {e}")
raise
def backup_and_clear_table(cursor):
"""
Erstellt ein Backup der aktuellen Preisdaten und leert die Update-Tabelle.
Args:
cursor: Der Datenbank-Cursor
"""
try:
# Erstelle ein Backup mit aktuellem Datum
backup_table_name = f"item_prices_backup_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}"
logger.info(f"Backup-Tabelle: {backup_table_name}")
cursor.execute(f"CREATE TABLE {backup_table_name} AS SELECT * FROM item_prices;")
logger.info("Backup der Echtpreise erstellt.")
# Bereinige alte Backups
cleanup_old_backups(cursor)
# Truncate der item_preise_update-Tabelle
cursor.execute("TRUNCATE TABLE item_preise_update;")
logger.info("item_preise_update geleert.")
except Exception as e:
logger.error(f"Fehler beim Backup und Leeren der Tabelle: {e}")
raise
def get_item_id(cursor, article_number):
"""
Ermittelt die ID eines Artikels anhand seiner Artikelnummer.
Args:
cursor: Der Datenbank-Cursor
article_number: Die Artikelnummer
Returns:
int: Die Artikel-ID oder None, wenn nicht gefunden
"""
try:
cursor.execute("SELECT id FROM items WHERE number = %s", (article_number,))
result = cursor.fetchone()
return result[0] if result else None
except Exception as e:
logger.error(f"Fehler beim Abrufen der Artikel-ID für {article_number}: {e}")
return None
def update_item_prices(cursor):
"""
Aktualisiert die Haupttabelle item_prices mit den Daten aus item_preise_update.
Args:
cursor: Der Datenbank-Cursor
Returns:
int: Anzahl der aktualisierten Datensätze
"""
try:
logger.info("Aktualisiere item_prices aus item_preise_update...")
cursor.execute("""
UPDATE item_prices ip
JOIN item_preise_update ipu
ON ip.item_id = ipu.item_id
AND ip.customergroup_id = ipu.customergroup_id
SET
ip.quantity_1 = ipu.quantity_1,
ip.price_1 = ipu.price_1,
ip.quantity_2 = ipu.quantity_2,
ip.price_2 = ipu.price_2,
ip.quantity_3 = ipu.quantity_3,
ip.price_3 = ipu.price_3;
""")
# Hole die Anzahl der aktualisierten Zeilen
rows_affected = cursor.rowcount
logger.info(f"Aktualisierte Datensätze: {rows_affected}")
logger.info("Update von item_prices abgeschlossen.")
return rows_affected
except Exception as e:
logger.error(f"Fehler beim Aktualisieren der Preise: {e}")
raise
def import_csv_to_db(csv_file):
"""
Importiert Preisdaten aus einer CSV-Datei in die Datenbank.
Args:
csv_file: Pfad zur CSV-Datei
Returns:
tuple: (Anzahl der verarbeiteten Artikel, Anzahl der aktualisierten Datensätze)
"""
conn = connect_db()
cursor = conn.cursor()
updated_count = 0
processed_items = set() # Für die Zählung eindeutiger Artikel
try:
# Backup & Truncate durchführen
backup_and_clear_table(cursor)
# Prüfe, ob die Datei existiert
if not os.path.exists(csv_file):
raise FileNotFoundError(f"Die Datei {csv_file} wurde nicht gefunden")
# Prüfe, ob die Datei leer ist
if os.path.getsize(csv_file) == 0:
raise ValueError(f"Die Datei {csv_file} ist leer")
with open(csv_file, "r", encoding="utf-8") as file:
reader = csv.reader(file, delimiter=';')
# Prüfe, ob die Datei eine Kopfzeile hat
try:
header = next(reader)
if len(header) < 5:
raise ValueError(f"Ungültiges CSV-Format: Erwartet mindestens 5 Spalten, gefunden {len(header)}")
logger.info(f"CSV-Header: {header}")
except StopIteration:
raise ValueError(f"Die Datei {csv_file} enthält keine Daten")
for row_num, row in enumerate(reader, start=2): # Start bei 2, da Zeile 1 der Header ist
# Prüfe, ob die Zeile genügend Spalten hat
if len(row) < 5:
logger.warning(f"Zeile {row_num}: Ungültiges Format, überspringe Zeile")
continue
article_number, vk1, vk3, vk100, vk_endkunde = row[:5]
# Überspringe leere Zeilen
if not article_number.strip():
continue
item_id = get_item_id(cursor, article_number)
# Ersetze Komma durch Punkt für float-Umwandlung
try:
vk1 = float(vk1.replace(",", "."))
vk3 = float(vk3.replace(",", "."))
vk100 = float(vk100.replace(",", "."))
vk_endkunde = float(vk_endkunde.replace(",", "."))
except ValueError as e:
logger.warning(f"Zeile {row_num}: Ungültiger Preiswert, überspringe Zeile: {e}")
continue
if item_id:
processed_items.add(item_id) # Merke uns den Artikel
# Sicherstellen, dass beide Einträge für customergroup_id 101 und 1 vorhanden sind
cursor.execute("""
INSERT INTO item_preise_update (item_id, customergroup_id, quantity_1, price_1)
VALUES (%s, 101, 1, %s)
ON DUPLICATE KEY UPDATE price_1 = VALUES(price_1)
""", (item_id, vk1))
cursor.execute("""
INSERT INTO item_preise_update (item_id, customergroup_id, quantity_1, price_1)
VALUES (%s, 1, 1, %s)
ON DUPLICATE KEY UPDATE price_1 = VALUES(price_1)
""", (item_id, vk_endkunde))
# VK3 nur wenn > 0 für customergroup_id 101
if vk3 > 0:
cursor.execute("""
UPDATE item_preise_update SET quantity_2 = 3, price_2 = %s
WHERE item_id = %s AND customergroup_id = 101
""", (vk3, item_id))
# VK100 nur wenn > 0 für customergroup_id 101
if vk100 > 0:
cursor.execute("""
UPDATE item_preise_update SET quantity_3 = 100, price_3 = %s
WHERE item_id = %s AND customergroup_id = 101
""", (vk100, item_id))
else:
logger.warning(f"Artikelnummer {article_number} nicht gefunden!")
# Update der Haupttabelle item_prices
updated_count = update_item_prices(cursor)
logger.info(f"Insgesamt {len(processed_items)} eindeutige Artikel verarbeitet")
return len(processed_items), updated_count
except Exception as e:
logger.error(f"Fehler beim Import: {e}")
raise
finally:
cursor.close()
conn.close()
logger.info("CSV-Import und Datenabgleich abgeschlossen!")
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Bitte CSV-Datei als Parameter angeben!")
print("Verwendung: python3 preisupdate.py /pfad/zur/csv_datei.csv")
sys.exit(1)
try:
processed_count, updated_count = import_csv_to_db(sys.argv[1])
print(f"Insgesamt {processed_count} eindeutige Artikel verarbeitet")
print(f"Insgesamt {updated_count} Datensätze aktualisiert")
except Exception as e:
logger.error(f"Fehler: {e}")
sys.exit(1)