import asyncio import json import os import uuid from datetime import datetime, timedelta from aiohttp import web import random import base64 # Constants and Config BASE_URL = "rpi.local:8080" PERSISTENT_FILE = "persistent.json" POOL_FILE = "pool.json" LOG_FILE = "secretsanta_noncompliant_log.txt" TOKENS_FILE = "tokens.json" INVITE_URLS_FILE = "invite_urls.txt" COUPLE_HOURS = 2 # hours to wait before forcing assignment DEFAULT_DOLLAR_LIMIT = 50 CHECK_INTERVAL = ( 12 * 3600 ) # 12 hours in seconds to check if we need to purge our data for the year # Deadlines default to current year CURRENT_YEAR = datetime.utcnow().year SIGNUP_DEADLINE = datetime(CURRENT_YEAR, 11, 15, 0, 0, 0) DRAWING_DEADLINE = datetime(CURRENT_YEAR, 12, 15, 0, 0, 0) # Calculate purge date: 1 day after Christmas of current year CHRISTMAS = datetime(CURRENT_YEAR, 12, 25) PURGE_DATE = CHRISTMAS + timedelta(days=1) # Load data def load_persistent(): if os.path.exists(PERSISTENT_FILE): with open(PERSISTENT_FILE, "r") as f: return json.load(f) return [] def load_pool(): if os.path.exists(POOL_FILE): with open(POOL_FILE, "r") as f: return json.load(f) return [] def load_tokens(): if os.path.exists(TOKENS_FILE): with open(TOKENS_FILE, "r") as f: return json.load(f) return {} persistent = load_persistent() pool = load_pool() # list of ids available to be drawn (not yet assigned_by someone) tokens = load_tokens() # dict of token -> {used: bool, used_by_cookie: str} # Save data def save_data(): with open(PERSISTENT_FILE, "w") as f: json.dump(persistent, f, indent=4, default=str) with open(POOL_FILE, "w") as f: json.dump(pool, f, indent=4) def save_tokens(): with open(TOKENS_FILE, "w") as f: json.dump(tokens, f, indent=4) # Generate invite tokens def generate_tokens(count=500, base_url=BASE_URL): """Generate invite tokens and save URLs to file""" new_tokens = {} urls = [] for _ in range(count): token = base64.urlsafe_b64encode(uuid.uuid4().bytes).decode("utf-8").rstrip("=") new_tokens[token] = {"used": False, "used_by_cookie": None} urls.append(f"{base_url}/invite?token={token}") # Save tokens tokens.update(new_tokens) save_tokens() # Save URLs to text file with open(INVITE_URLS_FILE, "w") as f: f.write("\n".join(urls)) print(f"Generated {count} invite tokens. URLs saved to {INVITE_URLS_FILE}") # Check if we need to generate tokens on startup if not os.path.exists(TOKENS_FILE) or not tokens: generate_tokens() # Purge data if past purge date def purge_data(): now = datetime.utcnow() if now > PURGE_DATE: for file in [PERSISTENT_FILE, POOL_FILE]: if os.path.exists(file): os.remove(file) print("Data purged due to purge date.") # Log non-compliant (non-drawers) if past drawing deadline def log_non_compliant(): now = datetime.utcnow() if now > DRAWING_DEADLINE: non_drawers = [ entry for entry in persistent if entry.get("assigned_receiver") is None ] log_lines = [] for nd in non_drawers: contact = nd.get("fedi") or nd.get("email") or "No contact" log_lines.append(f"Non-drawer: ID {nd['id']}, Contact: {contact}") if nd.get("assigned_by"): sender = next( (e for e in persistent if e["id"] == nd["assigned_by"]), None ) sender_contact = ( sender.get("fedi") or sender.get("email") or "No contact" if sender else "Unknown" ) log_lines.append( f" Affected sender ID {nd['assigned_by']}, Contact: {sender_contact} - Needs reassignment" ) if log_lines: log_msg = "\n".join(log_lines) + "\n" print(log_msg) # To console with open(LOG_FILE, "a") as f: f.write(log_msg) # Background task for periodic checks async def background_checker(): while True: purge_data() log_non_compliant() await asyncio.sleep(CHECK_INTERVAL) # HTML Template for access denied ACCESS_DENIED_HTML = """ Access Denied - Secret Santa

🚫 Access Denied 🚫

{message}

""" # HTML Template with late 90s Christmas theme, inline CSS/JS # Escaped curly braces for str.format() HTML = """ Secret Santa Coordinator - Ho Ho Ho!

🎅 Secret Santa Anonymous Coordinator 🎄

Welcome to our #darkfedi gift exchange! Enter your details to join the pool. We'll assign you someone to send a gift to anonymously.

Signup deadline: {signup_deadline} | Drawing deadline: {drawing_deadline}

{message}






*CLICK* to agree to reach out to the organizer if you cannot send a gift in time.

Check Your Status


""" # Simplified success HTML (removes forms and other elements) SUCCESS_HTML = """ Secret Santa Coordinator - Ho Ho Ho!

🎅 Secret Santa Anonymous Coordinator 🎄

{message} """ # Middleware to check authentication @web.middleware async def auth_middleware(request, handler): # Allow access to invite endpoint and static resources if request.path in ["/invite", "/candycane.png"]: return await handler(request) # Check for auth cookie auth_cookie = request.cookies.get("secretsanta_auth") if not auth_cookie: html = ACCESS_DENIED_HTML.format( message="You need a valid invite link to access this page." ) return web.Response(text=html, content_type="text/html", status=403) # Verify the auth cookie is valid (exists in our tokens and is marked as used) valid = False for token, data in tokens.items(): if data.get("used_by_cookie") == auth_cookie: valid = True break if not valid: html = ACCESS_DENIED_HTML.format(message="Invalid or expired access token.") return web.Response(text=html, content_type="text/html", status=403) return await handler(request) # Helper to get entry by id def get_entry_by_id(entry_id): return next((e for e in persistent if e["id"] == entry_id), None) # Helper to get entry by code def get_entry_by_code(code): return next((e for e in persistent if e.get("code") == code), None) # Routes routes = web.RouteTableDef() @routes.get("/invite") async def invite(request): token = request.query.get("token") if not token: html = ACCESS_DENIED_HTML.format(message="No invite token provided.") return web.Response(text=html, content_type="text/html", status=400) token_data = tokens.get(token) if not token_data: html = ACCESS_DENIED_HTML.format(message="Invalid invite token.") return web.Response(text=html, content_type="text/html", status=400) if token_data["used"]: html = ACCESS_DENIED_HTML.format( message="This invite token has already been used." ) return web.Response(text=html, content_type="text/html", status=400) # Mark token as used and create auth cookie auth_cookie = str(uuid.uuid4()) token_data["used"] = True token_data["used_by_cookie"] = auth_cookie save_tokens() # Redirect to main page with auth cookie resp = web.Response(status=302) resp.headers["Location"] = "/" resp.set_cookie("secretsanta_auth", auth_cookie, max_age=3600 * 24 * 30) # 30 days return resp @routes.get("/") async def index(request): now = datetime.utcnow() message = "" cookie_code = request.cookies.get("secretsanta_code") if cookie_code: entry = get_entry_by_code(cookie_code) if entry: message = "

You've already joined! Use the check form below with your code to see status.

" html = HTML.format( signup_deadline=SIGNUP_DEADLINE.isoformat(), drawing_deadline=DRAWING_DEADLINE.isoformat(), message=message, ) resp = web.Response(text=html, content_type="text/html") return resp @routes.get("/candycane.png") async def serve_candycane(request): # actually ncd icon if os.path.exists("ncd.png"): with open("ncd.png", "rb") as f: data = f.read() return web.Response(body=data, content_type="image/png") return web.Response(status=404) @routes.post("/submit") async def submit(request): now = datetime.utcnow() if now > SIGNUP_DEADLINE: message = "

Signup period is over!

" html = SUCCESS_HTML.format(message=message) return web.Response(text=html, content_type="text/html", status=400) data = await request.post() if not data.get("address"): message = "

Address is required!

" html = SUCCESS_HTML.format(message=message) return web.Response(text=html, content_type="text/html", status=400) if not data.get("fedi") and not data.get("email"): message = "

Either Fedi handle or email is required for contact!

" html = SUCCESS_HTML.format(message=message) return web.Response(text=html, content_type="text/html", status=400) if not data.get("agreement"): message = "

You must agree to the terms!

" html = SUCCESS_HTML.format(message=message) return web.Response(text=html, content_type="text/html", status=400) # Check for existing via cookie cookie_code = request.cookies.get("secretsanta_code") if cookie_code and get_entry_by_code(cookie_code): message = "

You've already submitted! Check your status below.

" html = SUCCESS_HTML.format(message=message) return web.Response(text=html, content_type="text/html", status=400) entry_id = str(uuid.uuid4())[:8] # First 8 chars of UUID code = str(uuid.uuid4())[:8] # Short code too dollar_limit = data.get("dollar_limit") or DEFAULT_DOLLAR_LIMIT try: dollar_limit = float(dollar_limit) except ValueError: dollar_limit = DEFAULT_DOLLAR_LIMIT entry = { "id": entry_id, "name": data.get("name", ""), "fedi": data.get("fedi", ""), "email": data.get("email", ""), "dollar_limit": dollar_limit, "likes_dislikes": data.get("likes_dislikes", ""), "address": data.get("address"), "submission_time": now.isoformat(), "code": code, "assigned_receiver": None, "assigned_by": None, } persistent.append(entry) pool.append(entry_id) save_data() print(f"Created Entry:\n{entry}\n\n") # Attempt immediate assignment if pool >=3 (including this one) assignment_msg = await try_assign(entry) message = f"

Thanks for joining! Your one-time code: {code} (save it!)

{assignment_msg}

If not assigned, check back later with your code.

" html = SUCCESS_HTML.format(message=message) resp = web.Response(text=html, content_type="text/html") resp.set_cookie("secretsanta_code", code, max_age=3600 * 24 * 30) # 30 days return resp async def try_assign(entry): now = datetime.utcnow() if now > DRAWING_DEADLINE: return "

Drawing period is over!

" submission_time = datetime.fromisoformat(entry["submission_time"]) time_in_pool = (now - submission_time).total_seconds() / 3600 if entry["assigned_receiver"] is not None: receiver = get_entry_by_id(entry["assigned_receiver"]) if receiver: return f"

You've already been assigned! Send to: Name: {receiver['name']}, Address: {receiver['address']}, Limit: ${receiver['dollar_limit']}, Likes/Dislikes: {receiver['likes_dislikes']}

" available = [eid for eid in pool if eid != entry["id"]] if not available: return "

Pool is empty right now (or only you left). Check back later!

" if len(pool) < 3 and time_in_pool <= COUPLE_HOURS: return "

Pool is too small (<3 participants). Please check back in a couple hours.

" # Assign random receiver_id = random.choice(available) receiver_entry = get_entry_by_id(receiver_id) receiver_entry["assigned_by"] = entry["id"] entry["assigned_receiver"] = receiver_id pool.remove(receiver_id) save_data() print(f"Assigned id {receiver_id}") return f"

Assigned! Send a gift to: Name: {receiver_entry['name']}, Address: {receiver_entry['address']}, Limit: ${receiver_entry['dollar_limit']}, Likes/Dislikes: {receiver_entry['likes_dislikes']}

" @routes.post("/check") async def check(request): now = datetime.utcnow() data = await request.post() code = data.get("code") if not code: message = "

Code required!

" html = SUCCESS_HTML.format(message=message) return web.Response(text=html, content_type="text/html", status=400) entry = get_entry_by_code(code) if not entry: message = "

Invalid code!

" html = SUCCESS_HTML.format(message=message) return web.Response(text=html, content_type="text/html", status=400) assignment_msg = await try_assign(entry) message = f"

Your code is valid. Status: {assignment_msg}

" html = SUCCESS_HTML.format(message=message) resp = web.Response(text=html, content_type="text/html") resp.set_cookie("secretsanta_code", code, max_age=3600 * 24 * 30) return resp # App setup app = web.Application(middlewares=[auth_middleware]) app.add_routes(routes) async def start_background_tasks(app): app["background_task"] = asyncio.create_task(background_checker()) app.on_startup.append(start_background_tasks) if __name__ == "__main__": web.run_app(app, port=8080)