| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882 |
#!/usr/bin/env python3
"""Notion → Apple Calendar / Exchange sync (design v1.1)."""
from __future__ import annotations

import argparse
import calendar as cal
import json
import logging
import subprocess
import sys
import time
from collections import defaultdict
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Any

import requests
from dateutil.parser import isoparse
NOTION_API_BASE = "https://api.notion.com/v1"
NOTION_VERSION = "2022-06-28"  # sent as the "Notion-Version" request header
HTTP_TIMEOUT = 30  # passed as `timeout=` to every `requests` call in this file

# Logical role -> Notion property name. `load_config` overlays the user's
# "notion_property_names" object on top of these defaults.
DEFAULT_PROP_NAMES = {
    "name": "Name",
    "size": "T-Shirt Size",
    "last_edit": "Completed on",
    "project": "Project",
    "status": "Status",
}

STATUS_DONE = "Done"  # only pages whose status equals this value are queried
OUTLOOK_TITLE_MAX = 248  # Outlook truncates meeting titles beyond this length
TITLE_SEP = ";\n"  # separator between task titles within one appointment

log = logging.getLogger("notion_sync")
# ---------- Config & logging ----------
def load_config(path: str) -> dict:
    """Load and validate the JSON configuration file.

    Normalises two optional sections in place:
    ``notion_property_names`` is merged over ``DEFAULT_PROP_NAMES`` and
    ``projects`` becomes a ``{notion name: display name}`` dict of strings
    (an empty display name falls back to the Notion name).

    Terminates the process via ``sys.exit`` with a message when the file is
    missing, unreadable, not a JSON object, or lacks a required key.
    """
    p = Path(path)
    if not p.exists():
        sys.exit(f"FEHLER: Konfigurationsdatei '{path}' nicht gefunden.")

    # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
    try:
        with p.open("r", encoding="utf-8") as f:
            cfg = json.load(f)
    except (OSError, ValueError) as e:
        sys.exit(f"FEHLER: Konfigurationsdatei '{path}' konnte nicht gelesen werden: {e}")
    if not isinstance(cfg, dict):
        sys.exit(f"FEHLER: Konfigurationsdatei '{path}' muss ein JSON-Objekt enthalten.")

    required = [
        "notion_token", "notion_database_id", "calendar_name",
        "day_start_hour", "t_shirt_size_map", "protocol_file", "log_file",
    ]
    missing = [k for k in required if k not in cfg]
    if missing:
        sys.exit(f"FEHLER: In config.json fehlen Pflichtfelder: {', '.join(missing)}")

    if not isinstance(cfg["t_shirt_size_map"], dict):
        sys.exit("FEHLER: 't_shirt_size_map' in config.json muss ein Objekt sein.")

    user_pn = cfg.get("notion_property_names") or {}
    if not isinstance(user_pn, dict):
        sys.exit("FEHLER: 'notion_property_names' in config.json muss ein Objekt sein.")
    cfg["notion_property_names"] = {**DEFAULT_PROP_NAMES, **user_pn}

    projects = cfg.get("projects")
    if projects is None:
        projects = {}
    if not isinstance(projects, dict):
        sys.exit("FEHLER: 'projects' in config.json muss ein Objekt {Notion-Name: Anzeige-Name} sein.")
    cfg["projects"] = {str(k): str(v) if v else str(k) for k, v in projects.items()}
    return cfg
def setup_logging(log_file: str) -> None:
    """Configure the module logger to write INFO+ to *log_file* and stdout.

    Any handlers already attached to the logger are removed and closed first,
    so calling this more than once neither duplicates output nor leaks the
    previously opened log file.
    """
    formatter = logging.Formatter(
        "[%(asctime)s] %(levelname)-5s %(message)s",
        "%Y-%m-%d %H:%M:%S",
    )
    log.setLevel(logging.INFO)

    for old in list(log.handlers):
        log.removeHandler(old)
        old.close()

    file_handler = logging.FileHandler(log_file, encoding="utf-8")
    file_handler.setFormatter(formatter)
    log.addHandler(file_handler)

    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(formatter)
    log.addHandler(stream_handler)
# ---------- Date helpers ----------
def get_month_bounds(month_str: str) -> tuple[date, date]:
    """Return the first and last day of the month given as ``YYYY-MM``.

    Terminates the process via ``sys.exit`` when *month_str* is not a string
    of that shape or does not name a valid month.
    """
    try:
        year_txt, month_txt = month_str.split("-", 1)
        first = date(int(year_txt), int(month_txt), 1)
    except (ValueError, AttributeError):
        sys.exit(f"FEHLER: Ungültiger Wert für --monat: '{month_str}'. Format: YYYY-MM.")
    last = first.replace(day=cal.monthrange(first.year, first.month)[1])
    return first, last
def previous_month_str(today: date | None = None) -> str:
    """Return the month before *today* (default: the current date) as ``YYYY-MM``."""
    today = today or date.today()
    # Stepping one day back from the 1st always lands in the previous month,
    # including across a year boundary.
    last_of_prev = today.replace(day=1) - timedelta(days=1)
    return f"{last_of_prev.year:04d}-{last_of_prev.month:02d}"
# ---------- Notion API ----------
def _notion_headers(token: str) -> dict[str, str]:
    """Build the HTTP headers sent with every Notion API request."""
    return {
        "Authorization": f"Bearer {token}",
        "Notion-Version": NOTION_VERSION,
        "Content-Type": "application/json",
    }
def get_database_schema(database_id: str, token: str) -> dict:
    """Fetch the database object and return its ``properties`` mapping.

    Terminates the process via ``sys.exit`` on network failure, on a 401/404
    or any other non-OK status, and when the response body is not valid JSON.
    """
    url = f"{NOTION_API_BASE}/databases/{database_id}"
    try:
        resp = requests.get(url, headers=_notion_headers(token), timeout=HTTP_TIMEOUT)
    except requests.RequestException as e:
        sys.exit(f"FEHLER: Notion API nicht erreichbar: {e}")

    if resp.status_code == 401:
        sys.exit("FEHLER: Ungültiger Notion-Token. Bitte 'notion_token' in config.json prüfen.")
    if resp.status_code == 404:
        sys.exit("FEHLER: Notion-Datenbank nicht gefunden. Bitte 'notion_database_id' und Integrationszugriff prüfen.")
    if not resp.ok:
        sys.exit(f"FEHLER: Datenbank-Schema konnte nicht abgefragt werden ({resp.status_code}): {resp.text}")

    try:
        payload = resp.json()
    except ValueError as e:
        sys.exit(f"FEHLER: Notion API lieferte kein gültiges JSON: {e}")
    return payload.get("properties", {})
def _resolve_property_name(schema: dict, requested: str) -> str | None:
    """Return the schema key matching *requested*, or ``None`` if there is none.

    An exact match wins; otherwise the first key that is equal after stripping
    surrounding whitespace and case-folding is returned.
    """
    if requested in schema:
        return requested
    target = requested.strip().casefold()
    for actual in schema:
        if actual.strip().casefold() == target:
            return actual
    return None
def _require_property(schema: dict, requested: str, role: str) -> str:
    """Resolve *requested* against *schema* or terminate the process.

    *role* is the logical role (e.g. ``"status"``) and is only used in the
    error message. When the resolved key differs from *requested*, a warning
    is logged and the resolved key is returned.
    """
    actual = _resolve_property_name(schema, requested)
    if actual is None:
        available = ", ".join(repr(k) for k in sorted(schema))
        sys.exit(
            f"FEHLER: Eigenschaft '{requested}' (für '{role}') nicht in der Notion-Datenbank gefunden. "
            f"Verfügbare Properties: {available}"
        )
    if actual != requested:
        log.warning(
            "Property '%s' aufgelöst zu '%s' (Whitespace/Groß-Klein-Schreibung).",
            requested, actual,
        )
    return actual
def _detect_filter_type(schema: dict, prop_name: str, allowed: tuple[str, ...]) -> str:
    """Return the ``type`` of ``schema[prop_name]`` if it is one of *allowed*.

    Terminates the process via ``sys.exit`` for any other type.
    """
    prop_type = schema[prop_name].get("type")
    if prop_type in allowed:
        return prop_type
    sys.exit(
        f"FEHLER: Eigenschaft '{prop_name}' hat den nicht unterstützten Typ '{prop_type}'. "
        f"Erwartet: {', '.join(allowed)}."
    )
def query_notion_database(database_id: str, token: str, start: date, end: date,
                          prop_names: dict) -> list[dict]:
    """Return all pages that are done and dated within ``[start, end]``.

    The database schema is fetched first. The ``last_edit``, ``status``,
    ``project`` and ``size`` entries of *prop_names* are then replaced **in
    place** by the property names as they are actually spelled in the schema,
    so later code using the same dict sees the resolved names.

    Terminates the process via ``sys.exit`` on schema mismatches, network
    failure or non-OK responses. HTTP 429 responses are retried a few times,
    waiting for the duration given in the ``Retry-After`` header.
    """
    schema = get_database_schema(database_id, token)
    for role in ("last_edit", "status", "project", "size"):
        prop_names[role] = _require_property(schema, prop_names[role], role)

    le_name = prop_names["last_edit"]
    status_name = prop_names["status"]
    project_name = prop_names["project"]

    le_filter_type = _detect_filter_type(
        schema, le_name, ("date", "last_edited_time", "created_time", "formula"),
    )
    status_filter_type = _detect_filter_type(schema, status_name, ("select", "status"))
    project_type = schema[project_name].get("type")
    if project_type != "relation":
        sys.exit(
            f"FEHLER: Eigenschaft '{project_name}' hat den Typ '{project_type}', "
            "erwartet wurde 'relation' (Verknüpfung)."
        )
    log.info(
        "Notion-Schema erkannt: %s (%s), %s (%s), %s (relation)",
        le_name, le_filter_type, status_name, status_filter_type, project_name,
    )

    def _date_filter(condition: dict) -> dict:
        # Formula properties nest the date condition one level deeper.
        if le_filter_type == "formula":
            return {"property": le_name, "formula": {"date": condition}}
        return {"property": le_name, le_filter_type: condition}

    url = f"{NOTION_API_BASE}/databases/{database_id}/query"
    headers = _notion_headers(token)
    body: dict[str, Any] = {
        "filter": {
            "and": [
                _date_filter({"on_or_after": start.isoformat()}),
                _date_filter({"on_or_before": end.isoformat()}),
                {"property": status_name, status_filter_type: {"equals": STATUS_DONE}},
            ]
        },
        "page_size": 100,
    }

    max_rate_limit_retries = 3
    rate_limit_retries = 0
    pages: list[dict] = []
    while True:
        try:
            resp = requests.post(url, headers=headers, json=body, timeout=HTTP_TIMEOUT)
        except requests.RequestException as e:
            sys.exit(f"FEHLER: Notion API nicht erreichbar: {e}")

        if resp.status_code == 429 and rate_limit_retries < max_rate_limit_retries:
            rate_limit_retries += 1
            try:
                delay = float(resp.headers.get("Retry-After", "1"))
            except (TypeError, ValueError):
                delay = 1.0
            # Clamp so a bogus header cannot stall the run indefinitely.
            time.sleep(min(max(delay, 0.0), 60.0))
            continue
        if resp.status_code == 401:
            sys.exit("FEHLER: Ungültiger Notion-Token. Bitte 'notion_token' in config.json prüfen.")
        if resp.status_code == 404:
            sys.exit("FEHLER: Notion-Datenbank nicht gefunden. Bitte 'notion_database_id' und Integrationszugriff prüfen.")
        if not resp.ok:
            sys.exit(f"FEHLER: Notion API antwortete mit {resp.status_code}: {resp.text}")

        data = resp.json()
        pages.extend(data.get("results", []))

        cursor = data.get("next_cursor")
        # Stop when there are no more pages. Also stop if the API claims more
        # pages but gives no cursor, which would otherwise repeat the same
        # request forever.
        if not data.get("has_more") or not cursor:
            break
        body["start_cursor"] = cursor
    return pages
def resolve_project_name(page_id: str, token: str, cache: dict[str, str | None]) -> str | None:
    """Return the title of the Notion page *page_id*, or ``None`` on failure.

    Results, including failures, are stored in *cache* so each page is fetched
    at most once per cache. Failures are logged as warnings and never raise.
    """
    if page_id in cache:
        return cache[page_id]

    url = f"{NOTION_API_BASE}/pages/{page_id}"
    name: str | None = None
    try:
        resp = requests.get(url, headers=_notion_headers(token), timeout=HTTP_TIMEOUT)
    except requests.RequestException as e:
        log.warning("Projekt-Auflösung fehlgeschlagen für %s: %s", page_id, e)
    else:
        if not resp.ok:
            log.warning("Projekt-Auflösung %s: HTTP %s", page_id, resp.status_code)
        else:
            try:
                page = resp.json()
            except ValueError as e:
                log.warning("Projekt-Auflösung fehlgeschlagen für %s: %s", page_id, e)
            else:
                name = _extract_title(page.get("properties", {}))

    cache[page_id] = name
    return name
def _extract_title(props: dict) -> str | None:
    """Return the stripped plain text of the first ``title`` property in *props*.

    Returns ``None`` when there is no title property or its text is empty.
    """
    for prop in props.values():
        if prop.get("type") == "title":
            fragments = prop.get("title", [])
            text = "".join(frag.get("plain_text", "") for frag in fragments).strip()
            return text or None
    return None
# ---------- Normalization ----------
def _read_last_edit_value(prop: dict) -> Any:
    """Extract the raw date value from a date-like Notion property object.

    Handles ``date``, ``last_edited_time``, ``created_time`` and ``formula``
    (with a date, string or number result). Returns ``None`` for any other
    shape.
    """
    kind = prop.get("type")
    if kind == "date":
        return (prop.get("date") or {}).get("start")
    if kind in ("last_edited_time", "created_time"):
        return prop.get(kind)
    if kind == "formula":
        formula = prop.get("formula") or {}
        formula_type = formula.get("type")
        if formula_type == "date":
            return (formula.get("date") or {}).get("start")
        if formula_type in ("string", "number"):
            return formula.get(formula_type)
    return None


def normalize_page(page: dict, token: str, cache: dict[str, str | None],
                   size_map: dict[str, int], prop_names: dict,
                   project_filter: dict[str, str]) -> dict | None:
    """Convert a raw Notion page into the flat record used for aggregation.

    Returns a dict with ``title``, ``t_shirt_size``, ``minutes``,
    ``last_edit`` (a ``date``), ``project_id`` and ``project_name``. Returns
    ``None``, after logging the reason, when the page lacks a title, a known
    size, a parsable date or a resolvable project, or when its project is not
    in a non-empty *project_filter*.

    *project_filter* maps Notion project names to display names; an empty
    mapping disables filtering. Only the first related project is considered.
    """
    props = page.get("properties", {})
    page_id = page.get("id", "?")

    title = _extract_title(props)
    if not title:
        log.warning("Seite %s: Titel fehlt – übersprungen", page_id)
        return None

    # --- size -> minutes ---
    size_key = prop_names["size"]
    size = ((props.get(size_key) or {}).get("select") or {}).get("name")
    if not size:
        log.warning("Seite \"%s\": %s fehlt – übersprungen", title, size_key)
        return None
    if size not in size_map:
        log.warning("Seite \"%s\": Unbekannte T-Shirt Size '%s' – übersprungen", title, size)
        return None
    try:
        minutes = int(size_map[size])
    except (TypeError, ValueError):
        # A malformed config value should cost one page, not the whole run.
        log.warning(
            "Seite \"%s\": Ungültige Minutenangabe für T-Shirt Size '%s' in config.json – übersprungen",
            title, size,
        )
        return None

    # --- completion date ---
    le_key = prop_names["last_edit"]
    le_start = _read_last_edit_value(props.get(le_key) or {})
    if not le_start:
        log.warning("Seite \"%s\": %s fehlt – übersprungen", title, le_key)
        return None
    try:
        # NOTE(review): for timestamps carrying an offset this takes the
        # calendar date as written in the string, not the local date. Confirm
        # that this is intended.
        last_edit = isoparse(le_start).date()
    except (ValueError, TypeError):
        log.warning("Seite \"%s\": %s nicht parsbar ('%s') – übersprungen", title, le_key, le_start)
        return None

    # --- project relation ---
    project_key = prop_names["project"]
    relations = (props.get(project_key) or {}).get("relation") or []
    if not relations:
        log.warning("Seite \"%s\": Relation '%s' fehlt – übersprungen", title, project_key)
        return None
    project_id = relations[0].get("id")
    if not project_id:
        log.warning("Seite \"%s\": Relation '%s' ohne ID – übersprungen", title, project_key)
        return None
    raw_project_name = resolve_project_name(project_id, token, cache)
    if not raw_project_name:
        log.warning("Seite \"%s\": Projektname nicht auflösbar – übersprungen", title)
        return None

    if project_filter:
        if raw_project_name not in project_filter:
            log.info("Seite \"%s\": Projekt '%s' nicht in Whitelist – übersprungen",
                     title, raw_project_name)
            return None
        project_name = project_filter[raw_project_name]
    else:
        project_name = raw_project_name

    return {
        "title": title,
        "t_shirt_size": size,
        "minutes": minutes,
        "last_edit": last_edit,
        "project_id": project_id,
        "project_name": project_name,
    }
# ---------- Aggregation & time calculation ----------
# Each item stored during aggregation is (title, t_shirt_size, minutes).
_Item = tuple[str, str, int]


def _split_items_by_title(items: list[_Item], max_len: int | None = None,
                          sep: str | None = None) -> list[list[_Item]]:
    """Split *items* into consecutive groups whose joined titles fit a limit.

    The joined length of a group is the sum of its title lengths plus one
    separator between neighbours. *max_len* defaults to ``OUTLOOK_TITLE_MAX``
    and *sep* to ``TITLE_SEP``. Input order is preserved. A single item whose
    title alone exceeds the limit is kept as its own group. An empty input
    yields an empty list.
    """
    limit = OUTLOOK_TITLE_MAX if max_len is None else max_len
    sep_len = len(TITLE_SEP if sep is None else sep)

    groups: list[list[_Item]] = []
    current: list[_Item] = []
    current_len = 0
    for item in items:
        title_len = len(item[0])
        if not current:
            current = [item]
            current_len = title_len
            continue
        needed = current_len + sep_len + title_len
        if needed > limit:
            # Adding this title would overflow: close the group, start anew.
            groups.append(current)
            current = [item]
            current_len = title_len
        else:
            current.append(item)
            current_len = needed
    if current:
        groups.append(current)
    return groups
def aggregate_events(pages: list[dict]) -> dict[date, list[dict]]:
    """Group normalized pages into calendar events, keyed by day.

    Pages sharing ``(last_edit, project_name)`` form one event. If the joined
    titles would exceed the Outlook title limit, the event is split into
    several parts. Each event dict carries ``date``, ``project_name``,
    ``total_minutes``, ``summary`` (titles joined with ``TITLE_SEP``),
    ``notes`` (the project name), ``part`` and ``total_parts``.
    """
    # First pass: collect all items per (date, project).
    groups: dict[tuple[date, str], dict] = {}
    for p in pages:
        key = (p["last_edit"], p["project_name"])
        group = groups.setdefault(key, {
            "date": p["last_edit"],
            "project_name": p["project_name"],
            "items": [],
        })
        group["items"].append((p["title"], p["t_shirt_size"], p["minutes"]))

    # Second pass: sort titles, split oversized groups, emit events per day.
    by_day: dict[date, list[dict]] = defaultdict(list)
    for group in groups.values():
        group["items"].sort(key=lambda item: item[0])
        item_groups = _split_items_by_title(group["items"])
        total_parts = len(item_groups)
        if total_parts > 1:
            log.info(
                "Termin '%s' am %s wird in %d Teile aufgeteilt "
                "(Outlook-Titel-Limit %d Zeichen).",
                group["project_name"], group["date"].isoformat(),
                total_parts, OUTLOOK_TITLE_MAX,
            )
        for part_idx, sub_items in enumerate(item_groups, start=1):
            by_day[group["date"]].append({
                "date": group["date"],
                "project_name": group["project_name"],
                "total_minutes": sum(m for _, _, m in sub_items),
                "summary": TITLE_SEP.join(title for title, _, _ in sub_items),
                "notes": group["project_name"],
                "part": part_idx,
                "total_parts": total_parts,
            })
    return by_day
def assign_sequential_times(events_by_day: dict[date, list[dict]],
                            day_start_hour: int) -> list[dict]:
    """Lay out each day's events back to back, starting at *day_start_hour*.

    Days are processed in ascending order. Within a day, events are ordered by
    project name and then part index, so the parts of a split event stay
    consecutive and in order. ``start_time`` and ``end_time`` are written into
    the event dicts themselves; the same dicts are returned as one flat list.
    """
    result: list[dict] = []
    for day in sorted(events_by_day):
        day_events = sorted(
            events_by_day[day],
            key=lambda e: (e["project_name"], e.get("part", 1)),
        )
        cursor = datetime(day.year, day.month, day.day, day_start_hour, 0)
        for ev in day_events:
            ev["start_time"] = cursor
            cursor = cursor + timedelta(minutes=ev["total_minutes"])
            ev["end_time"] = cursor
            result.append(ev)
    return result
# ---------- Protocol & state ----------
def load_protocol(path: str) -> dict:
    """Load the sync protocol (event key -> last synced state) from *path*.

    Returns an empty dict when the file does not exist. Also returns an empty
    dict, after logging a warning, when the file cannot be read, is not valid
    JSON, or does not contain a JSON object.
    """
    p = Path(path)
    if not p.exists():
        return {}
    try:
        with p.open("r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:  # ValueError covers JSON and decoding errors
        log.warning("Protokolldatei konnte nicht gelesen werden (%s) – starte mit leerem Protokoll.", e)
        return {}
    if not isinstance(data, dict):
        # Callers use `.get` on the protocol; anything but an object is unusable.
        log.warning(
            "Protokolldatei konnte nicht gelesen werden (%s) – starte mit leerem Protokoll.",
            "kein JSON-Objekt",
        )
        return {}
    return data
def save_protocol(path: str, protocol: dict) -> None:
    """Write *protocol* to *path* as indented, key-sorted UTF-8 JSON.

    The data is written to a sibling ``<name>.tmp`` file first and then moved
    over the target, so an interrupted write cannot leave a truncated
    protocol behind. I/O errors are logged as warnings and never raised.
    """
    target = Path(path)
    tmp = target.with_name(target.name + ".tmp")
    try:
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(protocol, f, indent=2, ensure_ascii=False, sort_keys=True)
        tmp.replace(target)
    except OSError as e:
        log.warning("Protokolldatei konnte nicht geschrieben werden: %s", e)
def protocol_key(event: dict) -> str:
    """Return the protocol key for *event*: ``<ISO date>|<project name>``.

    Parts after the first of a split event get a ``|part<N>`` suffix. Part 1
    has no suffix, so its key equals that of an unsplit event.
    """
    key = f"{event['date'].isoformat()}|{event['project_name']}"
    part = event.get("part", 1)
    if part > 1:
        key += f"|part{part}"
    return key
def determine_state(event: dict, protocol: dict) -> str:
    """Classify *event* against the protocol as ``new``, ``changed`` or ``unchanged``.

    An event is ``new`` when its key is not in *protocol*. It is ``unchanged``
    when duration, notes, summary and the ``HH:MM`` start and end times all
    equal the stored values. Otherwise it is ``changed``.
    """
    prev = protocol.get(protocol_key(event))
    if prev is None:
        return "new"
    same = (
        prev.get("total_minutes") == event["total_minutes"]
        and prev.get("notes") == event["notes"]
        and prev.get("summary") == event["summary"]
        and prev.get("start_time") == event["start_time"].strftime("%H:%M")
        and prev.get("end_time") == event["end_time"].strftime("%H:%M")
    )
    return "unchanged" if same else "changed"
# ---------- AppleScript ----------
def escape_applescript_string(s: str) -> str:
    """Escape *s* for use inside a double-quoted AppleScript string literal.

    Backslashes are doubled first so the escapes added for quotes and
    newlines are not escaped again.
    """
    s = s.replace("\\", "\\\\")
    s = s.replace('"', '\\"')
    s = s.replace("\n", "\\n")
    return s


def _applescript_date_lines(var: str, moment: datetime) -> list[str]:
    """Return AppleScript statements that set variable *var* to *moment*.

    The value starts from ``current date`` and is overwritten field by field.
    The day is reset to 1 before year and month are assigned. Otherwise, when
    the script runs on the 29th–31st, assigning a shorter month would roll
    the date over into the following month.
    """
    return [
        f"set {var} to current date",
        f"set day of {var} to 1",
        f"set year of {var} to {moment.year}",
        f"set month of {var} to {moment.month}",
        f"set day of {var} to {moment.day}",
        f"set hours of {var} to {moment.hour}",
        f"set minutes of {var} to {moment.minute}",
        f"set seconds of {var} to 0",
    ]


def build_create_script(event: dict, calendar_name: str) -> str:
    """Build an AppleScript that creates *event* in the named calendar.

    The script returns the UID of the new event. *event* must provide
    ``summary``, ``notes``, ``start_time`` and ``end_time``.
    """
    cal_name = escape_applescript_string(calendar_name)
    title = escape_applescript_string(event["summary"])
    notes = escape_applescript_string(event["notes"])
    lines = [
        'tell application "Calendar"',
        f'tell calendar "{cal_name}"',
        *_applescript_date_lines("startDate", event["start_time"]),
        *_applescript_date_lines("endDate", event["end_time"]),
        f'set newEvent to make new event with properties {{summary:"{title}", '
        f'start date:startDate, end date:endDate, description:"{notes}"}}',
        "return uid of newEvent",
        "end tell",
        "end tell",
    ]
    return "\n".join(lines)


def build_update_script(event: dict, uid: str, calendar_name: str) -> str:
    """Build an AppleScript that updates the event with *uid* in the calendar.

    The script returns ``"OK"`` on success and ``"ERROR: not found"`` when no
    event with that UID exists in the calendar.
    """
    cal_name = escape_applescript_string(calendar_name)
    uid_esc = escape_applescript_string(uid)
    title = escape_applescript_string(event["summary"])
    notes = escape_applescript_string(event["notes"])
    lines = [
        'tell application "Calendar"',
        f'tell calendar "{cal_name}"',
        f'set matchingEvents to (every event whose uid is "{uid_esc}")',
        "if (count of matchingEvents) = 0 then",
        'return "ERROR: not found"',
        "end if",
        "set theEvent to item 1 of matchingEvents",
        *_applescript_date_lines("startDate", event["start_time"]),
        *_applescript_date_lines("endDate", event["end_time"]),
        f'set summary of theEvent to "{title}"',
        "set start date of theEvent to startDate",
        "set end date of theEvent to endDate",
        f'set description of theEvent to "{notes}"',
        'return "OK"',
        "end tell",
        "end tell",
    ]
    return "\n".join(lines)
def run_applescript(script: str) -> tuple[bool, str]:
    """Run *script* through ``osascript`` with a 60 second timeout.

    Returns ``(True, stripped stdout)`` on exit code 0. Otherwise returns
    ``(False, message)``, where the message is stderr (or stdout if stderr is
    empty), or a fixed text when ``osascript`` is missing or timed out.
    """
    try:
        result = subprocess.run(
            ["osascript", "-e", script],
            capture_output=True, text=True, timeout=60,
        )
    except FileNotFoundError:
        return False, "osascript nicht gefunden (kein macOS?)"
    except subprocess.TimeoutExpired:
        return False, "osascript-Timeout"
    if result.returncode != 0:
        return False, (result.stderr or result.stdout).strip()
    return True, result.stdout.strip()
def applescript_calendar_missing(stderr: str) -> bool:
    """Return whether an osascript error message suggests a missing calendar.

    Matches, case-insensitively, the English and German "can't get calendar"
    wording, or the error number -1728 anywhere in the text.
    """
    text = stderr.lower()
    markers = ("can't get calendar", "kann \"calendar", "-1728")
    return any(marker in text for marker in markers)
# ---------- Output ----------
# ANSI escape sequences used to colour cells of the dry-run table.
ANSI_GREEN = "\033[32m"
ANSI_RED = "\033[31m"
ANSI_ORANGE = "\033[38;5;208m"
ANSI_RESET = "\033[0m"

WEEKLY_OVERTIME_THRESHOLD = 2880  # 48 h in minutes
def format_duration(minutes: int) -> str:
    """Format a number of minutes as ``"<h>h <m>min"``, e.g. ``"1h 30min"``."""
    hours, mins = divmod(int(minutes), 60)
    return f"{hours}h {mins}min"
def duration_color(minutes: int) -> str | None:
    """Return the ANSI colour for a duration, or ``None`` for no colour.

    More than 600 minutes is red, 480 to 600 minutes inclusive is green, and
    anything shorter is uncoloured.
    """
    if minutes > 600:
        return ANSI_RED
    if minutes >= 480:
        return ANSI_GREEN
    return None
def _overloaded_week_dates(events: list[dict], threshold: int | None = None) -> set[date]:
    """Return the event dates that fall in ISO weeks exceeding *threshold*.

    A week is overloaded when the ``total_minutes`` of all its events add up
    to strictly more than *threshold*, which defaults to
    ``WEEKLY_OVERTIME_THRESHOLD``.
    """
    def iso_week(day: date) -> tuple[int, int]:
        year, week, _ = day.isocalendar()
        return year, week

    weekly: dict[tuple[int, int], int] = defaultdict(int)
    for ev in events:
        weekly[iso_week(ev["date"])] += ev["total_minutes"]
    if not weekly:
        return set()

    limit = WEEKLY_OVERTIME_THRESHOLD if threshold is None else threshold
    overloaded = {week for week, total in weekly.items() if total > limit}
    return {ev["date"] for ev in events if iso_week(ev["date"]) in overloaded}
def _print_legend() -> None:
    """Print the colour legend for the dry-run table, followed by a blank line."""
    print("Legende:")
    print(f" Dauer: {ANSI_GREEN}■{ANSI_RESET} grün ≥ 8 h (≥ 480 min) "
          f"{ANSI_RED}■{ANSI_RESET} rot > 10 h (> 600 min)")
    print(f" Datum: {ANSI_ORANGE}■{ANSI_RESET} orange = Wochensumme > 48 h (> 2880 min)")
    print()
# Higher value wins when the parts of a split event have different states.
_STATE_PRIO: dict[str, int] = {"new": 2, "changed": 1, "unchanged": 0}


def print_dry_run_table(events: list[dict], protocol: dict) -> None:
    """Print a table of what a sync run would do, without side effects.

    The parts of a split event are collapsed into one row per
    ``(date, project)``. The row spans from the first part's start to the
    last part's end, sums the minutes, and shows the highest-priority state
    among its parts. A dotted line separates ISO weeks. Durations and the
    dates of overloaded weeks are coloured as described in the legend.
    """
    overloaded_dates = _overloaded_week_dates(events)

    # Collapse split parts into one display row per (date, project).
    merged: list[dict] = []
    row_index: dict[tuple[date, str], int] = {}  # (date, project) -> index in merged
    for ev in events:
        key = (ev["date"], ev["project_name"])
        state = determine_state(ev, protocol)
        if key not in row_index:
            row_index[key] = len(merged)
            merged.append({
                "date": ev["date"],
                "project_name": ev["project_name"],
                "start_time": ev["start_time"],
                "end_time": ev["end_time"],
                "total_minutes": ev["total_minutes"],
                "state": state,
            })
            continue
        row = merged[row_index[key]]
        row["end_time"] = ev["end_time"]  # last part's end is the overall end
        row["total_minutes"] += ev["total_minutes"]
        if _STATE_PRIO[state] > _STATE_PRIO[row["state"]]:
            row["state"] = state

    header = ("Zustand", "Datum", "Projekt", "Start", "Ende", "Dauer")
    data_rows: list[tuple[tuple[str, ...], int, date]] = [
        (
            (
                row["state"].upper(),
                row["date"].isoformat(),
                row["project_name"],
                row["start_time"].strftime("%H:%M"),
                row["end_time"].strftime("%H:%M"),
                format_duration(row["total_minutes"]),
            ),
            row["total_minutes"],
            row["date"],
        )
        for row in merged
    ]

    all_rows = [header] + [cells for cells, _, _ in data_rows]
    widths = [max(len(r[i]) for r in all_rows) for i in range(len(header))]

    # One separator drives both the joined rows and the rule width, so the
    # rule lines are exactly as wide as the rows.
    col_sep = "  "
    table_width = sum(widths) + len(col_sep) * (len(widths) - 1)

    _print_legend()
    print(col_sep.join(c.ljust(widths[i]) for i, c in enumerate(header)))
    print("-" * table_width)

    week_sep = "·" * table_width
    date_idx = 1
    duration_idx = len(header) - 1
    prev_iso_week: tuple[int, int] | None = None
    for cells, minutes, ev_date in data_rows:
        iso = ev_date.isocalendar()
        iso_week = (iso[0], iso[1])
        if prev_iso_week is not None and iso_week != prev_iso_week:
            print(week_sep)
        prev_iso_week = iso_week

        rendered = []
        for i, cell in enumerate(cells):
            # Pad before colouring so escape codes don't affect alignment.
            padded = cell.ljust(widths[i])
            if i == duration_idx:
                color = duration_color(minutes)
                if color:
                    padded = f"{color}{padded}{ANSI_RESET}"
            elif i == date_idx and ev_date in overloaded_dates:
                padded = f"{ANSI_ORANGE}{padded}{ANSI_RESET}"
            rendered.append(padded)
        print(col_sep.join(rendered))
# ---------- Sync execution ----------
def _protocol_entry(event: dict, uid: str) -> dict:
    """Build the protocol record stored for *event* once it is synced as *uid*."""
    return {
        "uid": uid,
        "start_time": event["start_time"].strftime("%H:%M"),
        "end_time": event["end_time"].strftime("%H:%M"),
        "total_minutes": event["total_minutes"],
        "summary": event["summary"],
        "notes": event["notes"],
    }


def _exit_calendar_missing(calendar_name: str) -> None:
    """Terminate the process because the target calendar does not exist."""
    sys.exit(f"FEHLER: Kalender '{calendar_name}' nicht gefunden – bitte calendar_name in config.json prüfen.")


def _create_calendar_event(event: dict, calendar_name: str, label: str,
                           action: str) -> str | None:
    """Create *event* in the calendar and return its UID, or ``None`` on failure.

    *action* ("CREATE" or "Fallback-CREATE") only prefixes the error log
    messages. A missing calendar terminates the process.
    """
    ok, out = run_applescript(build_create_script(event, calendar_name))
    if not ok:
        if applescript_calendar_missing(out):
            _exit_calendar_missing(calendar_name)
        log.error("%s fehlgeschlagen für %s: %s", action, label, out)
        return None
    if not out:
        log.error("%s lieferte leere UID für %s", action, label)
        return None
    return out


def sync_event(event: dict, state: str, protocol: dict, calendar_name: str,
               counters: dict[str, int]) -> None:
    """Apply one event to the calendar according to *state*.

    ``unchanged`` is skipped, ``new`` is created, and ``changed`` is updated
    via its stored UID. If there is no stored UID, or the update does not
    succeed, the event is created afresh instead. On success the protocol
    entry for the event is replaced. *counters* (``created``, ``updated``,
    ``unchanged``, ``errors``) is incremented in place. A missing calendar
    terminates the process.
    """
    key = protocol_key(event)
    s_str = event["start_time"].strftime("%H:%M")
    e_str = event["end_time"].strftime("%H:%M")
    part_tag = (f" [{event['part']}/{event['total_parts']}]"
                if event.get("total_parts", 1) > 1 else "")
    label = f"{event['project_name']:<20}{part_tag} | {event['date'].isoformat()} | {s_str}–{e_str}"

    if state == "unchanged":
        log.info("UNVERÄNDERT: %s – übersprungen", label)
        counters["unchanged"] += 1
        return

    if state == "new":
        uid = _create_calendar_event(event, calendar_name, label, "CREATE")
        if uid is None:
            counters["errors"] += 1
            return
        protocol[key] = _protocol_entry(event, uid)
        log.info("NEU: %s", label)
        counters["created"] += 1
        return

    # state == "changed"
    prev = protocol.get(key, {})
    uid = prev.get("uid")
    prev_label = ""
    if prev.get("start_time") and prev.get("end_time"):
        prev_label = f" (vorher {prev['start_time']}–{prev['end_time']})"

    if uid:
        ok, out = run_applescript(build_update_script(event, uid, calendar_name))
        if ok and out == "OK":
            protocol[key] = _protocol_entry(event, uid)
            log.info("GEÄNDERT: %s%s", label, prev_label)
            counters["updated"] += 1
            return
        if not ok and applescript_calendar_missing(out):
            _exit_calendar_missing(calendar_name)
        log.warning("UPDATE fehlgeschlagen (%s) – lege Termin neu an: %s", out or "unbekannt", label)
    else:
        log.warning("Kein UID im Protokoll für %s – lege Termin neu an.", label)

    # Fallback: the stored event could not be updated, so create a new one.
    new_uid = _create_calendar_event(event, calendar_name, label, "Fallback-CREATE")
    if new_uid is None:
        counters["errors"] += 1
        return
    protocol[key] = _protocol_entry(event, new_uid)
    log.info("GEÄNDERT (neu):%s%s", label, prev_label)
    counters["updated"] += 1
# ---------- Main flow ----------
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command line (*argv*, or ``sys.argv[1:]`` when ``None``).

    Options: ``--monat YYYY-MM`` (default ``None``), ``--dry-run`` (flag) and
    ``--kalender NAME`` (default ``None``).
    """
    parser = argparse.ArgumentParser(description="Notion → Apple Kalender / Exchange Sync")
    parser.add_argument(
        "--monat",
        default=None,
        help="Zielmonat im Format YYYY-MM, z. B. 2026-04. "
             "Ohne Angabe wird der vergangene Monat verwendet.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Zeigt Termine an, ohne sie anzulegen oder zu ändern",
    )
    parser.add_argument(
        "--kalender",
        help="Name des Zielkalenders (überschreibt config.json)",
    )
    return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
    """Run one sync (or dry run) and return the process exit code.

    Reads ``config.json`` from the current working directory, queries Notion
    for the target month, aggregates the pages into calendar events, and then
    either prints the dry-run table or applies the events to the calendar.

    Returns 0 on success, including when there is nothing to sync, and 2 when
    at least one event failed. Fatal configuration or API problems terminate
    the process via ``sys.exit``.
    """
    args = parse_args(argv)
    cfg = load_config("config.json")
    setup_logging(cfg["log_file"])

    # Validate once up front: an out-of-range hour would otherwise surface as
    # a traceback when the first event time is built.
    try:
        day_start_hour = int(cfg["day_start_hour"])
    except (TypeError, ValueError):
        sys.exit("FEHLER: 'day_start_hour' in config.json muss eine ganze Zahl sein.")
    if not 0 <= day_start_hour <= 23:
        sys.exit("FEHLER: 'day_start_hour' in config.json muss zwischen 0 und 23 liegen.")

    calendar_name = args.kalender or cfg["calendar_name"]
    month_str = args.monat or previous_month_str()
    month_start, month_end = get_month_bounds(month_str)
    if args.monat is None:
        log.info("Kein --monat angegeben – verwende vergangenen Monat: %s", month_str)
    log.info("Starte Sync für Monat: %s", month_str)
    log.info("Kalender: %s | Tagesstart: %02d:00", calendar_name, day_start_hour)

    protocol = load_protocol(cfg["protocol_file"])
    prop_names = cfg["notion_property_names"]
    project_filter = cfg["projects"]
    if project_filter:
        log.info("Projekt-Whitelist aktiv (%d Einträge): %s",
                 len(project_filter), ", ".join(sorted(project_filter)))
    else:
        log.info("Keine Projekt-Whitelist gesetzt – alle Projekte werden synchronisiert.")

    # query_notion_database rewrites prop_names in place with the names as
    # spelled in the schema; normalize_page below depends on that.
    raw_pages = query_notion_database(
        cfg["notion_database_id"], cfg["notion_token"], month_start, month_end, prop_names,
    )
    log.info("%d Notion-Seiten geladen", len(raw_pages))

    project_cache: dict[str, str | None] = {}
    normalized: list[dict] = []
    for raw in raw_pages:
        page = normalize_page(
            raw, cfg["notion_token"], project_cache,
            cfg["t_shirt_size_map"], prop_names, project_filter,
        )
        if page is not None:
            normalized.append(page)

    by_day = aggregate_events(normalized)
    events = assign_sequential_times(by_day, day_start_hour)
    if not events:
        log.info("Keine zu synchronisierenden Termine gefunden.")
        return 0

    if args.dry_run:
        log.info("--dry-run aktiv: keine AppleScripts werden ausgeführt.")
        print_dry_run_table(events, protocol)
        return 0

    counters = {"created": 0, "updated": 0, "unchanged": 0, "errors": 0}
    try:
        for ev in events:
            sync_event(ev, determine_state(ev, protocol), protocol, calendar_name, counters)
    finally:
        # Save even when the loop aborts (sys.exit, Ctrl-C). Otherwise the
        # UIDs of events already created are lost and the next run would
        # create them again.
        save_protocol(cfg["protocol_file"], protocol)

    log.info(
        "Sync abgeschlossen: %d erstellt, %d aktualisiert, %d unverändert, %d Fehler",
        counters["created"], counters["updated"], counters["unchanged"], counters["errors"],
    )
    return 0 if counters["errors"] == 0 else 2
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
|