#!/usr/bin/env python3 """Notion → Apple Kalender / Exchange Sync (Design v1.1).""" from __future__ import annotations import argparse import calendar as cal import json import logging import subprocess import sys from collections import defaultdict from datetime import date, datetime, timedelta from pathlib import Path from typing import Any import requests from dateutil.parser import isoparse NOTION_API_BASE = "https://api.notion.com/v1" NOTION_VERSION = "2022-06-28" HTTP_TIMEOUT = 30 DEFAULT_PROP_NAMES = { "name": "Name", "size": "T-Shirt Size", "last_edit": "Completed on", "project": "Project", "status": "Status", } STATUS_DONE = "Done" OUTLOOK_TITLE_MAX = 248 # Outlook truncates meeting titles beyond this length TITLE_SEP = ";\n" # separator between task titles within one appointment log = logging.getLogger("notion_sync") # ---------- Config & Logging ---------- def load_config(path: str) -> dict: p = Path(path) if not p.exists(): sys.exit(f"FEHLER: Konfigurationsdatei '{path}' nicht gefunden.") with p.open("r", encoding="utf-8") as f: cfg = json.load(f) required = [ "notion_token", "notion_database_id", "calendar_name", "day_start_hour", "t_shirt_size_map", "protocol_file", "log_file", ] missing = [k for k in required if k not in cfg] if missing: sys.exit(f"FEHLER: In config.json fehlen Pflichtfelder: {', '.join(missing)}") user_pn = cfg.get("notion_property_names") or {} if not isinstance(user_pn, dict): sys.exit("FEHLER: 'notion_property_names' in config.json muss ein Objekt sein.") cfg["notion_property_names"] = {**DEFAULT_PROP_NAMES, **user_pn} projects = cfg.get("projects") if projects is None: projects = {} if not isinstance(projects, dict): sys.exit("FEHLER: 'projects' in config.json muss ein Objekt {Notion-Name: Anzeige-Name} sein.") cfg["projects"] = {str(k): str(v) if v else str(k) for k, v in projects.items()} return cfg def setup_logging(log_file: str) -> None: fmt = "[%(asctime)s] %(levelname)-5s %(message)s" datefmt = "%Y-%m-%d %H:%M:%S" formatter = logging.Formatter(fmt, datefmt) log.setLevel(logging.INFO) log.handlers.clear() fh = logging.FileHandler(log_file, encoding="utf-8") fh.setFormatter(formatter) log.addHandler(fh) sh = logging.StreamHandler(sys.stdout) sh.setFormatter(formatter) log.addHandler(sh) # ---------- Datums-Helfer ---------- def get_month_bounds(month_str: str) -> tuple[date, date]: try: year, month = (int(x) for x in month_str.split("-", 1)) first = date(year, month, 1) last = date(year, month, cal.monthrange(year, month)[1]) return first, last except (ValueError, AttributeError): sys.exit(f"FEHLER: Ungültiger Wert für --monat: '{month_str}'. Format: YYYY-MM.") def previous_month_str(today: date | None = None) -> str: today = today or date.today() first_of_this_month = today.replace(day=1) last_of_prev = first_of_this_month - timedelta(days=1) return f"{last_of_prev.year:04d}-{last_of_prev.month:02d}" # ---------- Notion API ---------- def _notion_headers(token: str) -> dict[str, str]: return { "Authorization": f"Bearer {token}", "Notion-Version": NOTION_VERSION, "Content-Type": "application/json", } def get_database_schema(database_id: str, token: str) -> dict: url = f"{NOTION_API_BASE}/databases/{database_id}" try: resp = requests.get(url, headers=_notion_headers(token), timeout=HTTP_TIMEOUT) except requests.RequestException as e: sys.exit(f"FEHLER: Notion API nicht erreichbar: {e}") if resp.status_code == 401: sys.exit("FEHLER: Ungültiger Notion-Token. Bitte 'notion_token' in config.json prüfen.") if resp.status_code == 404: sys.exit("FEHLER: Notion-Datenbank nicht gefunden. Bitte 'notion_database_id' und Integrationszugriff prüfen.") if not resp.ok: sys.exit(f"FEHLER: Datenbank-Schema konnte nicht abgefragt werden ({resp.status_code}): {resp.text}") return resp.json().get("properties", {}) def _resolve_property_name(schema: dict, requested: str) -> str | None: if requested in schema: return requested target = requested.strip().casefold() for actual in schema.keys(): if actual.strip().casefold() == target: return actual return None def _require_property(schema: dict, requested: str, role: str) -> str: actual = _resolve_property_name(schema, requested) if actual is None: available = ", ".join(repr(k) for k in sorted(schema.keys())) sys.exit( f"FEHLER: Eigenschaft '{requested}' (für '{role}') nicht in der Notion-Datenbank gefunden. " f"Verfügbare Properties: {available}" ) if actual != requested: log.warning("Property '%s' aufgelöst zu '%s' (Whitespace/Groß-Klein-Schreibung).", requested, actual) return actual def _detect_filter_type(schema: dict, prop_name: str, allowed: tuple[str, ...]) -> str: t = schema[prop_name].get("type") if t in allowed: return t sys.exit( f"FEHLER: Eigenschaft '{prop_name}' hat den nicht unterstützten Typ '{t}'. " f"Erwartet: {', '.join(allowed)}." ) def query_notion_database(database_id: str, token: str, start: date, end: date, prop_names: dict) -> list[dict]: schema = get_database_schema(database_id, token) prop_names["last_edit"] = _require_property(schema, prop_names["last_edit"], "last_edit") prop_names["status"] = _require_property(schema, prop_names["status"], "status") prop_names["project"] = _require_property(schema, prop_names["project"], "project") prop_names["size"] = _require_property(schema, prop_names["size"], "size") le_name = prop_names["last_edit"] status_name = prop_names["status"] project_name = prop_names["project"] le_filter_type = _detect_filter_type( schema, le_name, ("date", "last_edited_time", "created_time", "formula"), ) status_filter_type = _detect_filter_type(schema, status_name, ("select", "status")) project_type = schema[project_name].get("type") if project_type != "relation": sys.exit( f"FEHLER: Eigenschaft '{project_name}' hat den Typ '{project_type}', " "erwartet wurde 'relation' (Verknüpfung)." ) log.info( "Notion-Schema erkannt: %s (%s), %s (%s), %s (relation)", le_name, le_filter_type, status_name, status_filter_type, project_name, ) def _date_filter(condition: dict) -> dict: if le_filter_type == "formula": return {"property": le_name, "formula": {"date": condition}} return {"property": le_name, le_filter_type: condition} url = f"{NOTION_API_BASE}/databases/{database_id}/query" headers = _notion_headers(token) body = { "filter": { "and": [ _date_filter({"on_or_after": start.isoformat()}), _date_filter({"on_or_before": end.isoformat()}), {"property": status_name, status_filter_type: {"equals": STATUS_DONE}}, ] }, "page_size": 100, } pages: list[dict] = [] while True: try: resp = requests.post(url, headers=headers, json=body, timeout=HTTP_TIMEOUT) except requests.RequestException as e: sys.exit(f"FEHLER: Notion API nicht erreichbar: {e}") if resp.status_code == 401: sys.exit("FEHLER: Ungültiger Notion-Token. Bitte 'notion_token' in config.json prüfen.") if resp.status_code == 404: sys.exit("FEHLER: Notion-Datenbank nicht gefunden. Bitte 'notion_database_id' und Integrationszugriff prüfen.") if not resp.ok: sys.exit(f"FEHLER: Notion API antwortete mit {resp.status_code}: {resp.text}") data = resp.json() pages.extend(data.get("results", [])) if data.get("has_more"): body["start_cursor"] = data["next_cursor"] else: break return pages def resolve_project_name(page_id: str, token: str, cache: dict[str, str | None]) -> str | None: if page_id in cache: return cache[page_id] url = f"{NOTION_API_BASE}/pages/{page_id}" try: resp = requests.get(url, headers=_notion_headers(token), timeout=HTTP_TIMEOUT) except requests.RequestException as e: log.warning("Projekt-Auflösung fehlgeschlagen für %s: %s", page_id, e) cache[page_id] = None return None if not resp.ok: log.warning("Projekt-Auflösung %s: HTTP %s", page_id, resp.status_code) cache[page_id] = None return None page = resp.json() name = _extract_title(page.get("properties", {})) cache[page_id] = name return name def _extract_title(props: dict) -> str | None: for prop in props.values(): if prop.get("type") == "title": parts = prop.get("title", []) text = "".join(p.get("plain_text", "") for p in parts).strip() return text or None return None # ---------- Normalisierung ---------- def normalize_page(page: dict, token: str, cache: dict[str, str | None], size_map: dict[str, int], prop_names: dict, project_filter: dict[str, str]) -> dict | None: props = page.get("properties", {}) page_id = page.get("id", "?") title = _extract_title(props) if not title: log.warning("Seite %s: Titel fehlt – übersprungen", page_id) return None size_key = prop_names["size"] size_prop = props.get(size_key) or {} size_select = (size_prop.get("select") or {}) if size_prop else {} size = size_select.get("name") if size_select else None if not size: log.warning("Seite \"%s\": %s fehlt – übersprungen", title, size_key) return None if size not in size_map: log.warning("Seite \"%s\": Unbekannte T-Shirt Size '%s' – übersprungen", title, size) return None minutes = int(size_map[size]) le_key = prop_names["last_edit"] le_prop = props.get(le_key) or {} le_type = le_prop.get("type") if le_type == "date": le_start = (le_prop.get("date") or {}).get("start") elif le_type == "last_edited_time": le_start = le_prop.get("last_edited_time") elif le_type == "created_time": le_start = le_prop.get("created_time") elif le_type == "formula": formula = le_prop.get("formula") or {} ftype = formula.get("type") if ftype == "date": le_start = (formula.get("date") or {}).get("start") elif ftype == "string": le_start = formula.get("string") elif ftype == "number": le_start = formula.get("number") else: le_start = None else: le_start = None if not le_start: log.warning("Seite \"%s\": %s fehlt – übersprungen", title, le_key) return None try: last_edit = isoparse(le_start).date() except (ValueError, TypeError): log.warning("Seite \"%s\": %s nicht parsbar ('%s') – übersprungen", title, le_key, le_start) return None project_key = prop_names["project"] rel_prop = props.get(project_key) or {} relations = rel_prop.get("relation") or [] if not relations: log.warning("Seite \"%s\": Relation '%s' fehlt – übersprungen", title, project_key) return None project_id = relations[0].get("id") if not project_id: log.warning("Seite \"%s\": Relation '%s' ohne ID – übersprungen", title, project_key) return None raw_project_name = resolve_project_name(project_id, token, cache) if not raw_project_name: log.warning("Seite \"%s\": Projektname nicht auflösbar – übersprungen", title) return None if project_filter: if raw_project_name not in project_filter: log.info("Seite \"%s\": Projekt '%s' nicht in Whitelist – übersprungen", title, raw_project_name) return None project_name = project_filter[raw_project_name] else: project_name = raw_project_name return { "title": title, "t_shirt_size": size, "minutes": minutes, "last_edit": last_edit, "project_id": project_id, "project_name": project_name, } # ---------- Aggregation & Zeitberechnung ---------- # Each item stored during aggregation is (title, t_shirt_size, minutes). _Item = tuple[str, str, int] def _split_items_by_title(items: list[_Item]) -> list[list[_Item]]: """Split a sorted item list into groups whose joined title fits within OUTLOOK_TITLE_MAX characters. A single item that already exceeds the limit is kept as its own group (nothing we can do about that).""" groups: list[list[_Item]] = [] current: list[_Item] = [] current_len = 0 sep_len = len(TITLE_SEP) for item in items: title_len = len(item[0]) if not current: current.append(item) current_len = title_len else: needed = current_len + sep_len + title_len if needed > OUTLOOK_TITLE_MAX: groups.append(current) current = [item] current_len = title_len else: current.append(item) current_len = needed if current: groups.append(current) return groups def aggregate_events(pages: list[dict]) -> dict[date, list[dict]]: # First pass: collect all items per (date, project). groups: dict[tuple[date, str], dict] = {} for p in pages: key = (p["last_edit"], p["project_name"]) g = groups.get(key) if g is None: g = { "date": p["last_edit"], "project_name": p["project_name"], "items": [], } groups[key] = g g["items"].append((p["title"], p["t_shirt_size"], p["minutes"])) by_day: dict[date, list[dict]] = defaultdict(list) for g in groups.values(): g["items"].sort(key=lambda t: t[0]) item_groups = _split_items_by_title(g["items"]) if len(item_groups) > 1: log.info( "Termin '%s' am %s wird in %d Teile aufgeteilt " "(Outlook-Titel-Limit %d Zeichen).", g["project_name"], g["date"].isoformat(), len(item_groups), OUTLOOK_TITLE_MAX, ) total_parts = len(item_groups) for part_idx, sub_items in enumerate(item_groups, start=1): by_day[g["date"]].append({ "date": g["date"], "project_name": g["project_name"], "total_minutes": sum(m for _, _, m in sub_items), "summary": TITLE_SEP.join(title for title, _, _ in sub_items), "notes": g["project_name"], "part": part_idx, "total_parts": total_parts, }) return by_day def assign_sequential_times(events_by_day: dict[date, list[dict]], day_start_hour: int) -> list[dict]: result: list[dict] = [] for d in sorted(events_by_day.keys()): # Primary sort: project name; secondary: part index so split events # stay consecutive and in the correct order. day_events = sorted(events_by_day[d], key=lambda e: (e["project_name"], e.get("part", 1))) cursor = datetime(d.year, d.month, d.day, day_start_hour, 0) for ev in day_events: ev["start_time"] = cursor ev["end_time"] = cursor + timedelta(minutes=ev["total_minutes"]) cursor = ev["end_time"] result.append(ev) return result # ---------- Protokoll & Zustand ---------- def load_protocol(path: str) -> dict: p = Path(path) if not p.exists(): return {} try: with p.open("r", encoding="utf-8") as f: return json.load(f) except (OSError, json.JSONDecodeError) as e: log.warning("Protokolldatei konnte nicht gelesen werden (%s) – starte mit leerem Protokoll.", e) return {} def save_protocol(path: str, protocol: dict) -> None: try: with Path(path).open("w", encoding="utf-8") as f: json.dump(protocol, f, indent=2, ensure_ascii=False, sort_keys=True) except OSError as e: log.warning("Protokolldatei konnte nicht geschrieben werden: %s", e) def protocol_key(event: dict) -> str: key = f"{event['date'].isoformat()}|{event['project_name']}" part = event.get("part", 1) if part > 1: key += f"|part{part}" return key def determine_state(event: dict, protocol: dict) -> str: key = protocol_key(event) prev = protocol.get(key) if prev is None: return "new" same = ( prev.get("total_minutes") == event["total_minutes"] and prev.get("notes") == event["notes"] and prev.get("summary") == event["summary"] and prev.get("start_time") == event["start_time"].strftime("%H:%M") and prev.get("end_time") == event["end_time"].strftime("%H:%M") ) return "unchanged" if same else "changed" # ---------- AppleScript ---------- def escape_applescript_string(s: str) -> str: s = s.replace("\\", "\\\\") s = s.replace('"', '\\"') s = s.replace("\n", "\\n") return s def build_create_script(event: dict, calendar_name: str) -> str: cal_name = escape_applescript_string(calendar_name) title = escape_applescript_string(event["summary"]) notes = escape_applescript_string(event["notes"]) s, e = event["start_time"], event["end_time"] return f'''tell application "Calendar" tell calendar "{cal_name}" set startDate to current date set year of startDate to {s.year} set month of startDate to {s.month} set day of startDate to {s.day} set hours of startDate to {s.hour} set minutes of startDate to {s.minute} set seconds of startDate to 0 set endDate to current date set year of endDate to {e.year} set month of endDate to {e.month} set day of endDate to {e.day} set hours of endDate to {e.hour} set minutes of endDate to {e.minute} set seconds of endDate to 0 set newEvent to make new event with properties {{summary:"{title}", start date:startDate, end date:endDate, description:"{notes}"}} return uid of newEvent end tell end tell''' def build_update_script(event: dict, uid: str, calendar_name: str) -> str: cal_name = escape_applescript_string(calendar_name) uid_esc = escape_applescript_string(uid) title = escape_applescript_string(event["summary"]) notes = escape_applescript_string(event["notes"]) s, e = event["start_time"], event["end_time"] return f'''tell application "Calendar" tell calendar "{cal_name}" set matchingEvents to (every event whose uid is "{uid_esc}") if (count of matchingEvents) = 0 then return "ERROR: not found" end if set theEvent to item 1 of matchingEvents set startDate to current date set year of startDate to {s.year} set month of startDate to {s.month} set day of startDate to {s.day} set hours of startDate to {s.hour} set minutes of startDate to {s.minute} set seconds of startDate to 0 set endDate to current date set year of endDate to {e.year} set month of endDate to {e.month} set day of endDate to {e.day} set hours of endDate to {e.hour} set minutes of endDate to {e.minute} set seconds of endDate to 0 set summary of theEvent to "{title}" set start date of theEvent to startDate set end date of theEvent to endDate set description of theEvent to "{notes}" return "OK" end tell end tell''' def run_applescript(script: str) -> tuple[bool, str]: try: result = subprocess.run( ["osascript", "-e", script], capture_output=True, text=True, timeout=60, ) except FileNotFoundError: return False, "osascript nicht gefunden (kein macOS?)" except subprocess.TimeoutExpired: return False, "osascript-Timeout" if result.returncode != 0: return False, (result.stderr or result.stdout).strip() return True, result.stdout.strip() def applescript_calendar_missing(stderr: str) -> bool: s = stderr.lower() return "can't get calendar" in s or "kann \"calendar" in s or "-1728" in s # ---------- Ausgabe ---------- ANSI_GREEN = "\033[32m" ANSI_RED = "\033[31m" ANSI_ORANGE = "\033[38;5;208m" ANSI_RESET = "\033[0m" WEEKLY_OVERTIME_THRESHOLD = 2880 # 48 h in Minuten def format_duration(minutes: int) -> str: hours, mins = divmod(int(minutes), 60) return f"{hours}h {mins}min" def duration_color(minutes: int) -> str | None: if minutes > 600: return ANSI_RED if minutes >= 480: return ANSI_GREEN return None def _overloaded_week_dates(events: list[dict]) -> set[date]: """Dates belonging to ISO weeks whose combined minutes exceed 2880 (48 h).""" weekly: dict[tuple[int, int], int] = defaultdict(int) for ev in events: iso = ev["date"].isocalendar() weekly[(iso[0], iso[1])] += ev["total_minutes"] overloaded = {k for k, v in weekly.items() if v > WEEKLY_OVERTIME_THRESHOLD} return {ev["date"] for ev in events if (ev["date"].isocalendar()[0], ev["date"].isocalendar()[1]) in overloaded} def _print_legend() -> None: print("Legende:") print(f" Dauer: {ANSI_GREEN}■{ANSI_RESET} grün ≥ 8 h (≥ 480 min) " f"{ANSI_RED}■{ANSI_RESET} rot > 10 h (> 600 min)") print(f" Datum: {ANSI_ORANGE}■{ANSI_RESET} orange = Wochensumme > 48 h (> 2880 min)") print() _STATE_PRIO: dict[str, int] = {"new": 2, "changed": 1, "unchanged": 0} def print_dry_run_table(events: list[dict], protocol: dict) -> None: overloaded_dates = _overloaded_week_dates(events) # Collapse split parts into one display row per (date, project). # Events are already sorted by (date, project_name, part), so parts of the # same project arrive consecutively — we just accumulate into the last row. merged: list[dict] = [] _seen: dict[tuple[date, str], int] = {} # maps (date, project) → index in merged for ev in events: key = (ev["date"], ev["project_name"]) state = determine_state(ev, protocol) if key not in _seen: _seen[key] = len(merged) merged.append({ "date": ev["date"], "project_name": ev["project_name"], "start_time": ev["start_time"], "end_time": ev["end_time"], "total_minutes": ev["total_minutes"], "state": state, }) else: row = merged[_seen[key]] row["end_time"] = ev["end_time"] # last part's end is the overall end row["total_minutes"] += ev["total_minutes"] if _STATE_PRIO[state] > _STATE_PRIO[row["state"]]: row["state"] = state # promote to worst-case state header = ("Zustand", "Datum", "Projekt", "Start", "Ende", "Dauer") data_rows: list[tuple[tuple[str, ...], int, date]] = [] for row in merged: data_rows.append(( ( row["state"].upper(), row["date"].isoformat(), row["project_name"], row["start_time"].strftime("%H:%M"), row["end_time"].strftime("%H:%M"), format_duration(row["total_minutes"]), ), row["total_minutes"], row["date"], )) all_rows = [header] + [r for r, _, _ in data_rows] widths = [max(len(r[i]) for r in all_rows) for i in range(len(header))] _print_legend() table_width = sum(widths) + 2 * (len(widths) - 1) header_line = " ".join(c.ljust(widths[i]) for i, c in enumerate(header)) week_sep = "·" * table_width print(header_line) print("-" * table_width) date_idx = 1 duration_idx = len(header) - 1 prev_iso_week: tuple[int, int] | None = None for row, minutes, ev_date in data_rows: iso_week = (ev_date.isocalendar()[0], ev_date.isocalendar()[1]) if prev_iso_week is not None and iso_week != prev_iso_week: print(week_sep) prev_iso_week = iso_week cells = [] for i, c in enumerate(row): padded = c.ljust(widths[i]) if i == duration_idx: color = duration_color(minutes) if color: padded = f"{color}{padded}{ANSI_RESET}" elif i == date_idx and ev_date in overloaded_dates: padded = f"{ANSI_ORANGE}{padded}{ANSI_RESET}" cells.append(padded) print(" ".join(cells)) # ---------- Sync-Ausführung ---------- def sync_event(event: dict, state: str, protocol: dict, calendar_name: str, counters: dict[str, int]) -> None: key = protocol_key(event) s_str = event["start_time"].strftime("%H:%M") e_str = event["end_time"].strftime("%H:%M") part_tag = (f" [{event['part']}/{event['total_parts']}]" if event.get("total_parts", 1) > 1 else "") label = f"{event['project_name']:<20}{part_tag} | {event['date'].isoformat()} | {s_str}–{e_str}" if state == "unchanged": log.info("UNVERÄNDERT: %s – übersprungen", label) counters["unchanged"] += 1 return if state == "new": ok, out = run_applescript(build_create_script(event, calendar_name)) if not ok: if applescript_calendar_missing(out): sys.exit(f"FEHLER: Kalender '{calendar_name}' nicht gefunden – bitte calendar_name in config.json prüfen.") log.error("CREATE fehlgeschlagen für %s: %s", label, out) counters["errors"] += 1 return uid = out if not uid: log.error("CREATE lieferte leere UID für %s", label) counters["errors"] += 1 return protocol[key] = { "uid": uid, "start_time": s_str, "end_time": e_str, "total_minutes": event["total_minutes"], "summary": event["summary"], "notes": event["notes"], } log.info("NEU: %s", label) counters["created"] += 1 return # state == "changed" prev = protocol.get(key, {}) uid = prev.get("uid") prev_label = "" if prev.get("start_time") and prev.get("end_time"): prev_label = f" (vorher {prev['start_time']}–{prev['end_time']})" if uid: ok, out = run_applescript(build_update_script(event, uid, calendar_name)) if ok and out == "OK": protocol[key] = { "uid": uid, "start_time": s_str, "end_time": e_str, "total_minutes": event["total_minutes"], "summary": event["summary"], "notes": event["notes"], } log.info("GEÄNDERT: %s%s", label, prev_label) counters["updated"] += 1 return if not ok and applescript_calendar_missing(out): sys.exit(f"FEHLER: Kalender '{calendar_name}' nicht gefunden – bitte calendar_name in config.json prüfen.") log.warning("UPDATE fehlgeschlagen (%s) – lege Termin neu an: %s", out or "unbekannt", label) else: log.warning("Kein UID im Protokoll für %s – lege Termin neu an.", label) ok, out = run_applescript(build_create_script(event, calendar_name)) if not ok: if applescript_calendar_missing(out): sys.exit(f"FEHLER: Kalender '{calendar_name}' nicht gefunden – bitte calendar_name in config.json prüfen.") log.error("Fallback-CREATE fehlgeschlagen für %s: %s", label, out) counters["errors"] += 1 return new_uid = out if not new_uid: log.error("Fallback-CREATE lieferte leere UID für %s", label) counters["errors"] += 1 return protocol[key] = { "uid": new_uid, "start_time": s_str, "end_time": e_str, "total_minutes": event["total_minutes"], "summary": event["summary"], "notes": event["notes"], } log.info("GEÄNDERT (neu):%s%s", label, prev_label) counters["updated"] += 1 # ---------- Hauptablauf ---------- def parse_args(argv: list[str] | None = None) -> argparse.Namespace: parser = argparse.ArgumentParser(description="Notion → Apple Kalender / Exchange Sync") parser.add_argument("--monat", default=None, help="Zielmonat im Format YYYY-MM, z. B. 2026-04. " "Ohne Angabe wird der vergangene Monat verwendet.") parser.add_argument("--dry-run", action="store_true", help="Zeigt Termine an, ohne sie anzulegen oder zu ändern") parser.add_argument("--kalender", help="Name des Zielkalenders (überschreibt config.json)") return parser.parse_args(argv) def main(argv: list[str] | None = None) -> int: args = parse_args(argv) cfg = load_config("config.json") setup_logging(cfg["log_file"]) calendar_name = args.kalender or cfg["calendar_name"] month_str = args.monat or previous_month_str() month_start, month_end = get_month_bounds(month_str) if args.monat is None: log.info("Kein --monat angegeben – verwende vergangenen Monat: %s", month_str) log.info("Starte Sync für Monat: %s", month_str) log.info("Kalender: %s | Tagesstart: %02d:00", calendar_name, cfg["day_start_hour"]) protocol = load_protocol(cfg["protocol_file"]) prop_names = cfg["notion_property_names"] project_filter = cfg["projects"] if project_filter: log.info("Projekt-Whitelist aktiv (%d Einträge): %s", len(project_filter), ", ".join(sorted(project_filter.keys()))) else: log.info("Keine Projekt-Whitelist gesetzt – alle Projekte werden synchronisiert.") raw_pages = query_notion_database( cfg["notion_database_id"], cfg["notion_token"], month_start, month_end, prop_names, ) log.info("%d Notion-Seiten geladen", len(raw_pages)) project_cache: dict[str, str | None] = {} normalized: list[dict] = [] for raw in raw_pages: n = normalize_page( raw, cfg["notion_token"], project_cache, cfg["t_shirt_size_map"], prop_names, project_filter, ) if n is not None: normalized.append(n) by_day = aggregate_events(normalized) events = assign_sequential_times(by_day, int(cfg["day_start_hour"])) if not events: log.info("Keine zu synchronisierenden Termine gefunden.") return 0 if args.dry_run: log.info("--dry-run aktiv: keine AppleScripts werden ausgeführt.") print_dry_run_table(events, protocol) return 0 counters = {"created": 0, "updated": 0, "unchanged": 0, "errors": 0} for ev in events: state = determine_state(ev, protocol) sync_event(ev, state, protocol, calendar_name, counters) save_protocol(cfg["protocol_file"], protocol) log.info( "Sync abgeschlossen: %d erstellt, %d aktualisiert, %d unverändert, %d Fehler", counters["created"], counters["updated"], counters["unchanged"], counters["errors"], ) return 0 if counters["errors"] == 0 else 2 if __name__ == "__main__": sys.exit(main())