41bab34d04
Warning summary boxes in the import wizard had white text on a light warning background, making them unreadable. Switch to a light-yellow background with dark text instead. Also adds convert_xlsx.py for converting the LvS member Excel spreadsheet into CSV files for ZIP bundle import, and documents the mandatory version bump rule in CLAUDE.md. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
419 lines
13 KiB
Python
419 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Convert 'Mitglieder LvS 25.xlsx' into CSV files suitable for ZIP bundle import
|
|
into the Mitgliederverwaltung Nextcloud plugin.
|
|
|
|
Usage:
|
|
python3 convert_xlsx.py "/path/to/Mitglieder LvS 25.xlsx" [output_dir]
|
|
|
|
Output (in output_dir, default ./import_bundle):
|
|
Stufen.csv, Familien.csv, Mitglieder.csv, Adressen.csv,
|
|
Telefonnummern.csv, E-Mails.csv, import_bundle.zip
|
|
"""
|
|
|
|
import csv
|
|
import os
|
|
import re
|
|
import sys
|
|
import zipfile
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
|
|
import openpyxl
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config
|
|
# ---------------------------------------------------------------------------
|
|
# One-letter level code (as looked up via ``stufe_code``) -> level name
# written to the CSV files.
STUFEN_MAP = dict(
    W="Wölflinge",
    P="Pfadfinder",
    R="Rover",
)

# Attributes of each level, keyed by level name; these become the columns
# of Stufen.csv.
STUFEN_META = {
    "Wölflinge": dict(sort_order=1, age_min=7, age_max=10, color="#ff9800"),
    "Pfadfinder": dict(sort_order=2, age_min=11, age_max=16, color="#4caf50"),
    "Rover": dict(sort_order=3, age_min=16, age_max=25, color="#2196f3"),
}

# Field separator used for every CSV file written by this script.
CSV_DELIMITER = ";"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
def fmt_date(val):
    """Normalise a spreadsheet date cell to a ``YYYY-MM-DD`` string.

    ``None`` and blank values yield ``""``. ``datetime`` instances are
    formatted directly. Any other value is stringified, stripped and tried
    against a fixed list of date layouts; text that matches none of them is
    returned stripped but otherwise unchanged.
    """
    if isinstance(val, datetime):
        return val.strftime("%Y-%m-%d")

    text = "" if val is None else str(val).strip()
    if not text:
        return ""

    known_layouts = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%d.%m.%Y", "%d/%m/%Y")
    for layout in known_layouts:
        try:
            parsed = datetime.strptime(text, layout)
        except ValueError:
            # Not this layout; try the next one.
            continue
        return parsed.strftime("%Y-%m-%d")

    # Unrecognised format: pass the text through rather than dropping it.
    return text
|
|
|
|
|
|
def parse_address(raw):
    """Split a free-text address into ``(strasse, plz, ort)``.

    Layouts are tried in this order:

    1. ``"Street Nr, 12345 City"`` -> street, postal code, city
    2. ``"Street Nr, 12345"``      -> street, postal code, ``""``
    3. ``"Street Nr, City"``       -> street, ``""``, city
    4. anything else               -> the whole text as street

    A falsy value or the literal text ``"None"`` (any case) yields three
    empty strings. All returned parts are whitespace-trimmed.
    """
    if not raw or str(raw).strip().lower() == "none":
        return "", "", ""
    text = str(raw).strip()

    # "Street, PLZ City" or "Street, PLZ". The city part is optional so that
    # a bare five-digit postal code is not mistaken for the city below.
    m = re.match(r"^(.+?),\s*(\d{5})(?:\s+(.+))?$", text)
    if m:
        street, plz, city = m.groups(default="")
        return street.strip(), plz, city.strip()

    # "Street, City" (no postal code).
    m = re.match(r"^(.+?),\s*(.+)$", text)
    if m:
        return m.group(1).strip(), "", m.group(2).strip()

    # No comma at all: treat everything as the street.
    return text, "", ""
|
|
|
|
|
|
def clean(val):
    """Return *val* as a whitespace-trimmed string; ``None`` becomes ``""``."""
    return "" if val is None else str(val).strip()
|
|
|
|
|
|
def write_csv(path, headers, rows):
    """Write *headers* followed by *rows* to *path* as a CSV file.

    The file is encoded as UTF-8 with a byte-order mark and uses
    ``CSV_DELIMITER`` as the field separator. Afterwards a one-line summary
    (file name and number of data rows) is printed.
    """
    with open(path, "w", newline="", encoding="utf-8-sig") as handle:
        writer = csv.writer(handle, delimiter=CSV_DELIMITER)
        writer.writerow(headers)
        for record in rows:
            writer.writerow(record)
    file_name = os.path.basename(path)
    print(f" {file_name}: {len(rows)} rows")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Readers
|
|
# ---------------------------------------------------------------------------
|
|
def read_mitglieder_stamm(ws):
    """Collect member records from the 'Mitglieder Stamm' worksheet.

    Rows are read from the third sheet row onwards; rows whose second column
    (the last name) is empty are skipped.

    Returns:
        A list of dicts with the keys ``nachname``, ``vorname``,
        ``geburtsdatum`` (normalised via ``fmt_date``), ``stufe_code``,
        ``adresse_raw``, ``email``, ``telefon``, ``aktiv`` (``True`` when the
        cell contains "x", case-insensitively) and ``beitrag``.
    """

    def to_member(cells):
        # Column positions are 0-based indices into the row tuple.
        return {
            "nachname": clean(cells[1]),
            "vorname": clean(cells[2]),
            "geburtsdatum": fmt_date(cells[3]),
            "stufe_code": clean(cells[5]),  # W / P / R
            "adresse_raw": clean(cells[6]),
            "email": clean(cells[7]),
            "telefon": clean(cells[8]),
            "aktiv": clean(cells[9]).lower() == "x",
            "beitrag": clean(cells[10]),
        }

    data_rows = ws.iter_rows(min_row=3, max_row=ws.max_row, values_only=True)
    return [to_member(cells) for cells in data_rows if clean(cells[1])]
|
|
|
|
|
|
def read_kontaktdaten(ws):
    """Extract health notes and parent contact data from a 'Kontaktdaten …' sheet.

    The header row is the first row among sheet rows 1-9 whose second column
    reads "Name" (case-insensitive). If no such row exists, an empty dict is
    returned. Every following row with a non-empty last name produces one
    entry.

    Returns:
        A dict keyed by ``(nachname, vorname)`` whose values are dicts with
        the keys ``allergien``, ``notizen``, ``parent_nachname``,
        ``parent_vorname`` and ``parent_phone``.
    """

    def is_name_header(value):
        return bool(value) and str(value).strip().lower() == "name"

    def blank_if_nein(text):
        # An explicit "Nein" answer carries no information worth importing.
        return "" if text.upper() == "NEIN" else text

    header_row = next(
        (
            r
            for r in range(1, min(10, ws.max_row + 1))
            if is_name_header(ws.cell(r, 2).value)
        ),
        None,
    )
    if header_row is None:
        return {}

    info = {}
    for cells in ws.iter_rows(min_row=header_row + 1, max_row=ws.max_row, values_only=True):
        nachname = clean(cells[1])
        vorname = clean(cells[2])
        if not nachname:
            continue

        # 0-based indices: 4 = swimmer, 5 = illnesses, 6 = allergies,
        # 7 = medication.
        allergien = blank_if_nein(clean(cells[6]))
        krankheiten = blank_if_nein(clean(cells[5]))
        medis = blank_if_nein(clean(cells[7]))
        schwimmer = clean(cells[4])

        notes = []
        if schwimmer:
            if schwimmer.upper() == "JA":
                notes.append("Schwimmer: Ja")
            else:
                notes.append(f"Schwimmer: {schwimmer}")
        if krankheiten:
            notes.append(f"Krankheiten: {krankheiten}")
        if medis:
            notes.append(f"Medikamente: {medis}")

        # Parent contact sits further right in the sheet (0-based indices
        # 11 = last name, 12 = first name, 13 = mobile); those columns may be
        # absent, so each access is guarded by the row width.
        width = len(cells)
        info[(nachname, vorname)] = {
            "allergien": allergien,
            "notizen": "; ".join(notes),
            "parent_nachname": clean(cells[11]) if width > 11 else "",
            "parent_vorname": clean(cells[12]) if width > 12 else "",
            "parent_phone": clean(cells[13]) if width > 13 else "",
        }
    return info
|
|
|
|
|
|
def read_kontoliste(ws):
    """Collect bank account records from the 'Kontoliste' worksheet.

    Rows are read from the second sheet row onwards; rows whose second column
    (the account holder) is empty are skipped.

    Returns:
        A list of dicts with the keys ``kontoinhaber``, ``betrag`` and
        ``iban``, taken from the second, third and fourth column.
    """
    data_rows = ws.iter_rows(min_row=2, max_row=ws.max_row, values_only=True)
    return [
        {
            "kontoinhaber": clean(cells[1]),
            "betrag": clean(cells[2]),
            "iban": clean(cells[3]),
        }
        for cells in data_rows
        if clean(cells[1])
    ]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Family grouping
|
|
# ---------------------------------------------------------------------------
|
|
def build_families(members, accounts):
    """Group members into families and attach bank details from the Kontoliste.

    Members sharing the same last name form one family named
    ``"Familie <Nachname>"``. The address is NOT part of the grouping key, so
    unrelated members with the same last name end up in the same family.

    Each family is matched to an account by last name, case-insensitively.
    The account holder is expected to be written as "Nachname Vorname"
    (commas are treated as spaces). Multi-word last names such as "von Berg"
    are matched against the leading words of the holder. When several
    accounts match, the one listed last wins.

    Args:
        members: iterable of dicts with at least ``nachname`` and ``vorname``.
        accounts: iterable of dicts with ``kontoinhaber`` and ``iban``.

    Returns:
        A tuple ``(families, member_family)`` where ``families`` is a list of
        ``{"name", "kontoinhaber", "iban"}`` dicts sorted by last name, and
        ``member_family`` maps ``(nachname, vorname)`` to the family name.
        Families without a matching account get empty bank fields.
    """
    by_surname = defaultdict(list)
    for member in members:
        by_surname[member["nachname"]].append(member)

    # Index accounts by the first word of the holder (fast path for
    # single-word last names) and keep the full token lists for the
    # multi-word fallback.
    acct_by_first_token = {}
    holders = []
    for account in accounts:
        tokens = [t.lower() for t in account["kontoinhaber"].replace(",", " ").split()]
        if not tokens:
            continue
        acct_by_first_token[tokens[0]] = account
        holders.append((tokens, account))

    def find_account(nachname):
        hit = acct_by_first_token.get(nachname.lower())
        if hit is not None:
            return hit
        wanted = nachname.replace(",", " ").lower().split()
        if len(wanted) < 2:
            # Single-word names are fully covered by the index above.
            return {}
        # Scan in reverse so that, as with the index, the last listed
        # matching account wins.
        for tokens, account in reversed(holders):
            if tokens[:len(wanted)] == wanted:
                return account
        return {}

    families = []
    member_family = {}  # (nachname, vorname) -> family name
    for nachname in sorted(by_surname):
        family_name = f"Familie {nachname}"
        acct = find_account(nachname)
        families.append({
            "name": family_name,
            "kontoinhaber": acct.get("kontoinhaber", ""),
            "iban": acct.get("iban", ""),
        })
        for member in by_surname[nachname]:
            member_family[(member["nachname"], member["vorname"])] = family_name

    return families, member_family
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
def _full_name(vorname, nachname):
    """Return the "Vorname Nachname" label that references a member in the CSVs."""
    return f"{vorname} {nachname}"


def _stufen_rows():
    """Build the Stufen.csv rows, ordered by each level's ``sort_order``."""
    names = sorted(STUFEN_MAP.values(), key=lambda name: STUFEN_META[name]["sort_order"])
    rows = []
    for name in names:
        meta = STUFEN_META[name]
        rows.append([
            name, meta["sort_order"], meta["age_min"], meta["age_max"], meta["color"]
        ])
    return rows


def _mitglieder_rows(members, kontakt_info, member_family):
    """Build the Mitglieder.csv rows, merging health notes and family names."""
    rows = []
    for m in members:
        key = (m["nachname"], m["vorname"])
        ki = kontakt_info.get(key, {})
        rows.append([
            m["vorname"],
            m["nachname"],
            m["geburtsdatum"],
            "",  # Geschlecht: not in source
            "mitglied",  # Rolle
            STUFEN_MAP.get(m["stufe_code"], ""),
            "2025-01-01",  # Eintritt: not in source, fixed placeholder date
            "",  # Austritt
            "aktiv" if m["aktiv"] else "inaktiv",
            ki.get("allergien", ""),
            ki.get("notizen", ""),
            "",  # Zusaetzliche Notizen
            "",  # KV-Typ
            "",  # KV-Name
            member_family.get(key, ""),
            m["beitrag"],  # Eingefrorener Beitragssatz
        ])
    return rows


def _adressen_rows(members):
    """Build the Adressen.csv rows; members without a parsable street are skipped."""
    rows = []
    for m in members:
        strasse, plz, ort = parse_address(m["adresse_raw"])
        if not strasse:
            continue
        rows.append([
            _full_name(m["vorname"], m["nachname"]),
            "Hauptadresse", strasse, plz, ort, "Deutschland", "Ja",
        ])
    return rows


def _telefon_rows(members, kontakt_info):
    """Build the Telefonnummern.csv rows: members' own numbers, then parents' numbers."""
    rows = [
        [_full_name(m["vorname"], m["nachname"]), "Mobil", m["telefon"]]
        for m in members
        if m["telefon"]
    ]

    seen = set()  # (member name, number) pairs already emitted
    for (nachname, vorname), ki in kontakt_info.items():
        raw = ki.get("parent_phone", "")
        if not raw:
            continue
        name = _full_name(vorname, nachname)
        # A cell may hold "number oder number"; emit one row per number.
        numbers = [part.strip() for part in raw.split("oder")]
        for position, number in enumerate(numbers, start=1):
            if not number or (name, number) in seen:
                continue
            seen.add((name, number))
            label = f"Eltern {position}" if len(numbers) > 1 else "Eltern"
            rows.append([name, label, number])
    return rows


def _email_rows(members):
    """Build the E-Mails.csv rows for all members that have an e-mail address."""
    return [
        [_full_name(m["vorname"], m["nachname"]), "Privat", m["email"]]
        for m in members
        if m["email"]
    ]


def _write_zip_bundle(out_dir, csv_files):
    """Pack the existing *csv_files* from *out_dir* into import_bundle.zip.

    Returns ``(zip_path, added)`` where ``added`` is the number of files that
    were actually written into the archive.
    """
    zip_path = os.path.join(out_dir, "import_bundle.zip")
    added = 0
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for name in csv_files:
            fpath = os.path.join(out_dir, name)
            if os.path.exists(fpath):
                zf.write(fpath, name)
                added += 1
    return zip_path, added


def main():
    """Command-line entry point: convert the workbook into CSV files plus a ZIP bundle.

    ``sys.argv[1]`` is the path of the .xlsx file. ``sys.argv[2]`` optionally
    names the output directory; without it, an ``import_bundle`` directory
    next to the input file is used. When no argument is given, the module
    docstring is printed and the process exits with status 1.

    The workbook must contain the sheets 'Mitglieder Stamm' and 'Kontoliste';
    every sheet whose name starts with 'Kontaktdaten' is read as well.
    """
    if len(sys.argv) < 2:
        print(__doc__)
        sys.exit(1)

    xlsx_path = sys.argv[1]
    if len(sys.argv) > 2:
        out_dir = sys.argv[2]
    else:
        out_dir = os.path.join(os.path.dirname(xlsx_path), "import_bundle")
    os.makedirs(out_dir, exist_ok=True)

    print(f"Reading {xlsx_path} ...")
    wb = openpyxl.load_workbook(xlsx_path, data_only=True)

    # --- Read source sheets ---
    members = read_mitglieder_stamm(wb["Mitglieder Stamm"])
    print(f" {len(members)} members from 'Mitglieder Stamm'")

    kontakt_info = {}
    for sheet_name in wb.sheetnames:
        if sheet_name.startswith("Kontaktdaten"):
            ki = read_kontaktdaten(wb[sheet_name])
            # Entries from later sheets replace earlier ones with the same key.
            kontakt_info.update(ki)
            print(f" {len(ki)} entries from '{sheet_name}'")

    accounts = read_kontoliste(wb["Kontoliste"])
    print(f" {len(accounts)} accounts from 'Kontoliste'")

    families, member_family = build_families(members, accounts)
    print(f" {len(families)} families grouped")

    print(f"\nWriting CSVs to {out_dir} ...")

    # --- Stufen.csv ---
    write_csv(
        os.path.join(out_dir, "Stufen.csv"),
        ["Name", "Sortierung", "Mindestalter", "Hoechstalter", "Farbe"],
        _stufen_rows(),
    )

    # --- Familien.csv ---
    write_csv(
        os.path.join(out_dir, "Familien.csv"),
        ["Name", "Kontoinhaber", "IBAN"],
        [[f["name"], f["kontoinhaber"], f["iban"]] for f in families],
    )

    # --- Mitglieder.csv ---
    write_csv(
        os.path.join(out_dir, "Mitglieder.csv"),
        [
            "Vorname", "Nachname", "Geburtsdatum", "Geschlecht", "Rolle",
            "Stufenname", "Eintritt", "Austritt", "Status", "Allergien",
            "Notizen", "Zusaetzliche Notizen", "KV-Typ", "KV-Name",
            "Familienname", "Eingefrorener Beitragssatz",
        ],
        _mitglieder_rows(members, kontakt_info, member_family),
    )

    # --- Adressen.csv ---
    write_csv(
        os.path.join(out_dir, "Adressen.csv"),
        ["Mitgliedername", "Label", "Strasse", "PLZ", "Ort", "Land", "Primaer"],
        _adressen_rows(members),
    )

    # --- Telefonnummern.csv ---
    write_csv(
        os.path.join(out_dir, "Telefonnummern.csv"),
        ["Mitgliedername", "Label", "Nummer"],
        _telefon_rows(members, kontakt_info),
    )

    # --- E-Mails.csv ---
    write_csv(
        os.path.join(out_dir, "E-Mails.csv"),
        ["Mitgliedername", "Label", "E-Mail-Adresse"],
        _email_rows(members),
    )

    # --- Create ZIP bundle ---
    csv_files = [
        "Stufen.csv", "Familien.csv", "Mitglieder.csv",
        "Adressen.csv", "Telefonnummern.csv", "E-Mails.csv",
    ]
    zip_path, added = _write_zip_bundle(out_dir, csv_files)
    # Report what really went into the archive, not the length of the wish list.
    print(f"\n import_bundle.zip created ({added} files)")

    print("\nDone! Import via: Mitgliederverwaltung → Import → ZIP-Bundle hochladen")
    print(f" {zip_path}")


if __name__ == "__main__":
    main()
|