358 lines
11 KiB
Python
358 lines
11 KiB
Python
"""Sync configurations from the Notion 'Recurring Events' database into the
|
|
'Gemeinsamer Kalender' database by expanding each configuration into its
|
|
individual occurrences.
|
|
|
|
Uses only the Python standard library (urllib, json, datetime).
|
|
"""
|
|
|
|
import json
|
|
import urllib.request
|
|
import urllib.error
|
|
from datetime import date, timedelta, datetime
|
|
from calendar import monthrange
|
|
|
|
NOTION_SECRET = "secret_b7PiPL2FqC9QEikqkAEWOht7LmzPMIJMWTzUPWwbw4H"
|
|
RECURRING_DB_ID = "34010a5f51bd804091dafc91d094bf8b"
|
|
CALENDAR_DB_ID = "34010a5f51bd8093ad67d3d3cd3908a4"
|
|
NOTION_VERSION = "2022-06-28"
|
|
API_BASE = "https://api.notion.com/v1"
|
|
|
|
# If a configuration has no End Date, we cap occurrence generation at this
|
|
# many years from today so the calendar does not balloon.
|
|
DEFAULT_HORIZON_YEARS = 2
|
|
|
|
WEEKDAY_INDEX = {
|
|
"Monday": 0,
|
|
"Tuesday": 1,
|
|
"Wednesday": 2,
|
|
"Thursday": 3,
|
|
"Friday": 4,
|
|
"Saturday": 5,
|
|
"Sunday": 6,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Notion HTTP helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _request(method, path, payload=None):
|
|
url = f"{API_BASE}{path}"
|
|
data = None
|
|
if payload is not None:
|
|
data = json.dumps(payload).encode("utf-8")
|
|
req = urllib.request.Request(url, data=data, method=method)
|
|
req.add_header("Authorization", f"Bearer {NOTION_SECRET}")
|
|
req.add_header("Notion-Version", NOTION_VERSION)
|
|
req.add_header("Content-Type", "application/json")
|
|
try:
|
|
with urllib.request.urlopen(req) as resp:
|
|
return json.loads(resp.read().decode("utf-8"))
|
|
except urllib.error.HTTPError as e:
|
|
body = e.read().decode("utf-8", errors="replace")
|
|
raise RuntimeError(f"Notion API {method} {path} -> {e.code}: {body}") from e
|
|
|
|
|
|
def query_database(database_id, filter_=None):
|
|
results = []
|
|
payload = {}
|
|
if filter_ is not None:
|
|
payload["filter"] = filter_
|
|
while True:
|
|
data = _request("POST", f"/databases/{database_id}/query", payload)
|
|
results.extend(data.get("results", []))
|
|
if not data.get("has_more"):
|
|
break
|
|
payload["start_cursor"] = data["next_cursor"]
|
|
return results
|
|
|
|
|
|
def create_page(properties, icon=None):
|
|
body = {"parent": {"database_id": CALENDAR_DB_ID}, "properties": properties}
|
|
if icon is not None:
|
|
body["icon"] = icon
|
|
return _request("POST", "/pages", body)
|
|
|
|
|
|
def update_page(page_id, properties, icon=None):
|
|
body = {"properties": properties}
|
|
if icon is not None:
|
|
body["icon"] = icon
|
|
return _request("PATCH", f"/pages/{page_id}", body)
|
|
|
|
|
|
def archive_page(page_id):
|
|
return _request("PATCH", f"/pages/{page_id}", {"archived": True})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property extraction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _title_text(prop):
|
|
return "".join(t.get("plain_text", "") for t in prop.get("title", []))
|
|
|
|
|
|
def _rich_text(prop):
|
|
return "".join(t.get("plain_text", "") for t in prop.get("rich_text", []))
|
|
|
|
|
|
def _select_name(prop):
|
|
sel = prop.get("select")
|
|
return sel["name"] if sel else None
|
|
|
|
|
|
def _multi_select_names(prop):
|
|
return [o["name"] for o in prop.get("multi_select", [])]
|
|
|
|
|
|
def _date_start(prop):
|
|
d = prop.get("date")
|
|
if not d or not d.get("start"):
|
|
return None
|
|
return date.fromisoformat(d["start"][:10])
|
|
|
|
|
|
def parse_recurring_event(page):
|
|
props = page["properties"]
|
|
return {
|
|
"page_id": page["id"],
|
|
"icon": page.get("icon"),
|
|
"name": _title_text(props["Event Name"]),
|
|
"description": _rich_text(props["Description"]),
|
|
"location": _rich_text(props["Location"]),
|
|
"start_date": _date_start(props["Start Date"]),
|
|
"end_date": _date_start(props["End Date"]),
|
|
"interval": int(props["Interval"].get("number") or 1),
|
|
"frequency": _select_name(props["Frequency"]),
|
|
"days_of_week": _multi_select_names(props["Days of Week"]),
|
|
"days_of_month": [int(x) for x in _multi_select_names(props["Days of Month"])],
|
|
"betroffen": _multi_select_names(props.get("Betroffen", {"multi_select": []})),
|
|
"bereich": _multi_select_names(props.get("Bereich", {"multi_select": []})),
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Occurrence calculation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _add_months(d, months):
|
|
"""Return a date `months` months after d, clamped to last day of month."""
|
|
total = d.month - 1 + months
|
|
year = d.year + total // 12
|
|
month = total % 12 + 1
|
|
day = min(d.day, monthrange(year, month)[1])
|
|
return date(year, month, day)
|
|
|
|
|
|
def _effective_end(cfg, today):
|
|
horizon = date(today.year + DEFAULT_HORIZON_YEARS, today.month, today.day)
|
|
if cfg["end_date"] is None:
|
|
return horizon
|
|
return min(cfg["end_date"], horizon)
|
|
|
|
|
|
def compute_occurrences(cfg, today=None):
|
|
today = today or date.today()
|
|
if cfg["start_date"] is None or not cfg["frequency"]:
|
|
return []
|
|
start = cfg["start_date"]
|
|
end = _effective_end(cfg, today)
|
|
if end < start:
|
|
return []
|
|
interval = max(1, cfg["interval"])
|
|
freq = cfg["frequency"]
|
|
|
|
if freq == "Daily":
|
|
return _daily(start, end, interval)
|
|
if freq == "Weekly":
|
|
return _weekly(start, end, interval, cfg["days_of_week"])
|
|
if freq == "Monthly":
|
|
return _monthly(start, end, interval, cfg["days_of_month"])
|
|
if freq == "Yearly":
|
|
return _yearly(start, end, interval)
|
|
return []
|
|
|
|
|
|
def _daily(start, end, interval):
|
|
out = []
|
|
d = start
|
|
while d <= end:
|
|
out.append(d)
|
|
d += timedelta(days=interval)
|
|
return out
|
|
|
|
|
|
def _weekly(start, end, interval, days_of_week):
|
|
# Fall back to start date's weekday if nothing is selected.
|
|
if days_of_week:
|
|
target_weekdays = {WEEKDAY_INDEX[name] for name in days_of_week if name in WEEKDAY_INDEX}
|
|
else:
|
|
target_weekdays = {start.weekday()}
|
|
if not target_weekdays:
|
|
return []
|
|
# Anchor at the start of the week containing `start` (Monday).
|
|
week_anchor = start - timedelta(days=start.weekday())
|
|
out = []
|
|
w = week_anchor
|
|
while w <= end:
|
|
for wd in sorted(target_weekdays):
|
|
d = w + timedelta(days=wd)
|
|
if start <= d <= end:
|
|
out.append(d)
|
|
w += timedelta(weeks=interval)
|
|
return sorted(out)
|
|
|
|
|
|
def _monthly(start, end, interval, days_of_month):
|
|
if days_of_month:
|
|
target_days = sorted(set(days_of_month))
|
|
else:
|
|
target_days = [start.day]
|
|
out = []
|
|
cursor = date(start.year, start.month, 1)
|
|
months_step = interval
|
|
# Walk month-by-month in steps of `interval`, starting from start's month.
|
|
m = date(start.year, start.month, 1)
|
|
while m <= end:
|
|
last_day = monthrange(m.year, m.month)[1]
|
|
for d_num in target_days:
|
|
if 1 <= d_num <= last_day:
|
|
d = date(m.year, m.month, d_num)
|
|
if start <= d <= end:
|
|
out.append(d)
|
|
m = _add_months(m, months_step)
|
|
return sorted(out)
|
|
|
|
|
|
def _yearly(start, end, interval):
|
|
out = []
|
|
y = start.year
|
|
while True:
|
|
try:
|
|
d = date(y, start.month, start.day)
|
|
except ValueError:
|
|
# Feb 29 on non-leap year: skip.
|
|
d = None
|
|
if d is not None:
|
|
if d > end:
|
|
break
|
|
if d >= start:
|
|
out.append(d)
|
|
y += interval
|
|
# Safety: if start.month/day can never exist (shouldn't happen), break.
|
|
if y > end.year + interval:
|
|
break
|
|
return out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Calendar upsert
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _build_calendar_properties(cfg, occurrence_date, today):
|
|
props = {
|
|
"Event Name": {"title": [{"text": {"content": cfg["name"]}}]},
|
|
"Date": {"date": {"start": occurrence_date.isoformat()}},
|
|
"Is Recurring": {"checkbox": True},
|
|
"Sync Date": {"date": {"start": today.isoformat()}},
|
|
"Description": {
|
|
"rich_text": [{"text": {"content": cfg["description"]}}] if cfg["description"] else []
|
|
},
|
|
"Location": {
|
|
"rich_text": [{"text": {"content": cfg["location"]}}] if cfg["location"] else []
|
|
},
|
|
"Betroffen": {"multi_select": [{"name": n} for n in cfg["betroffen"]]},
|
|
"Bereich": {"multi_select": [{"name": n} for n in cfg["bereich"]]},
|
|
}
|
|
return props
|
|
|
|
|
|
def _existing_index(calendar_pages):
|
|
"""Map (event_name, date_iso) -> page_id for existing entries."""
|
|
index = {}
|
|
for page in calendar_pages:
|
|
props = page["properties"]
|
|
name = _title_text(props["Event Name"])
|
|
d = _date_start(props["Date"])
|
|
if name and d:
|
|
index[(name, d.isoformat())] = page["id"]
|
|
return index
|
|
|
|
|
|
def sync():
|
|
today = date.today()
|
|
print(f"Sync started at {today.isoformat()}")
|
|
|
|
recurring_pages = query_database(RECURRING_DB_ID)
|
|
print(f"Fetched {len(recurring_pages)} recurring event configuration(s).")
|
|
|
|
calendar_pages = query_database(CALENDAR_DB_ID)
|
|
existing = _existing_index(calendar_pages)
|
|
print(f"Found {len(existing)} existing calendar entrie(s).")
|
|
|
|
created = 0
|
|
updated = 0
|
|
skipped = 0
|
|
desired_keys = set()
|
|
|
|
for page in recurring_pages:
|
|
cfg = parse_recurring_event(page)
|
|
if not cfg["name"]:
|
|
print(" Skipping row without Event Name.")
|
|
skipped += 1
|
|
continue
|
|
occurrences = compute_occurrences(cfg, today=today)
|
|
print(f" '{cfg['name']}' ({cfg['frequency']}, interval {cfg['interval']}): "
|
|
f"{len(occurrences)} occurrence(s) computed.")
|
|
for occ in occurrences:
|
|
key = (cfg["name"], occ.isoformat())
|
|
desired_keys.add(key)
|
|
props = _build_calendar_properties(cfg, occ, today)
|
|
icon = cfg["icon"]
|
|
if key in existing:
|
|
update_page(existing[key], props, icon=icon)
|
|
updated += 1
|
|
else:
|
|
create_page(props, icon=icon)
|
|
created += 1
|
|
|
|
# Purge stale recurring entries: any calendar page marked as recurring
|
|
# that no longer corresponds to a desired (name, date) occurrence.
|
|
archived = _purge_stale(calendar_pages, desired_keys)
|
|
|
|
print(f"Done. Created {created}, updated {updated}, archived {archived}, "
|
|
f"skipped {skipped}.")
|
|
|
|
|
|
def _purge_stale(calendar_pages, desired_keys):
|
|
archived = 0
|
|
for page in calendar_pages:
|
|
# Skip pages that are already archived / in trash.
|
|
if page.get("archived") or page.get("in_trash"):
|
|
continue
|
|
props = page["properties"]
|
|
# Only consider entries flagged as recurring — manual entries are
|
|
# always left untouched.
|
|
is_recurring = props.get("Is Recurring", {}).get("checkbox", False)
|
|
if not is_recurring:
|
|
continue
|
|
name = _title_text(props["Event Name"])
|
|
d = _date_start(props["Date"])
|
|
if not name or not d:
|
|
continue
|
|
key = (name, d.isoformat())
|
|
if key not in desired_keys:
|
|
archive_page(page["id"])
|
|
archived += 1
|
|
print(f" Archived stale occurrence: {name} @ {d.isoformat()}")
|
|
return archived
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sync()
|