"""Invite-only Secret Santa coordinator served with aiohttp.

Participants arrive through single-use invite links, submit their contact
and shipping details, and are assigned a random receiver drawn from a pool
of participants nobody is sending to yet.  State is kept in JSON files next
to the script and mirrored in module-level containers.
"""
import asyncio
import base64
import json
import math
import os
import random
import secrets
import uuid
from datetime import datetime, timedelta, timezone
from html import escape

from aiohttp import web

# Constants and Config
BASE_URL = 'rpi.local:8080'
PERSISTENT_FILE = 'persistent.json'
POOL_FILE = 'pool.json'
LOG_FILE = 'secretsanta_noncompliant_log.txt'
TOKENS_FILE = 'tokens.json'
INVITE_URLS_FILE = 'invite_urls.txt'
COUPLE_HOURS = 2  # hours to wait before forcing assignment
DEFAULT_DOLLAR_LIMIT = 50
CHECK_INTERVAL = 12 * 3600  # 12 hours in seconds


def utcnow():
    """Return the current UTC time as a naive ``datetime``.

    ``datetime.utcnow()`` is deprecated since Python 3.12.  The deadlines
    below and the stored ``submission_time`` values are naive, so the
    timezone is dropped again to keep comparisons and the ISO strings
    written to disk unchanged.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


# Deadlines default to current year
CURRENT_YEAR = utcnow().year
SIGNUP_DEADLINE = datetime(CURRENT_YEAR, 11, 15, 0, 0, 0)
DRAWING_DEADLINE = datetime(CURRENT_YEAR, 12, 15, 0, 0, 0)

# Calculate purge date: 7 days before Christmas of current year
CHRISTMAS = datetime(CURRENT_YEAR, 12, 25)
PURGE_DATE = CHRISTMAS - timedelta(days=7)


# Load data
def _load_json(path, default):
    """Return the parsed JSON content of *path*, or *default* if it is absent."""
    if os.path.exists(path):
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    return default


def load_persistent():
    """Load the list of participant entries (empty list if no file yet)."""
    return _load_json(PERSISTENT_FILE, [])


def load_pool():
    """Load the list of entry ids still available to be drawn."""
    return _load_json(POOL_FILE, [])


def load_tokens():
    """Load the invite-token table (empty dict if no file yet)."""
    return _load_json(TOKENS_FILE, {})


persistent = load_persistent()
pool = load_pool()  # list of ids available to be drawn (not yet assigned_by someone)
tokens = load_tokens()  # dict of token -> {used: bool, used_by_cookie: str}


# Save data
def save_data():
    """Write the participant entries and the draw pool back to disk."""
    with open(PERSISTENT_FILE, 'w', encoding='utf-8') as f:
        json.dump(persistent, f, indent=4, default=str)
    with open(POOL_FILE, 'w', encoding='utf-8') as f:
        json.dump(pool, f, indent=4)


def save_tokens():
    """Write the invite-token table back to disk."""
    with open(TOKENS_FILE, 'w', encoding='utf-8') as f:
        json.dump(tokens, f, indent=4)


# Generate invite tokens
def generate_tokens(count=500, base_url=BASE_URL):
    """Generate invite tokens and save URLs to file.

    Args:
        count: number of new tokens to create.
        base_url: prefix placed in front of ``/invite?token=...``.

    The new tokens are merged into the module-level ``tokens`` table and
    persisted; the invite URL file is overwritten with the new URLs only.
    """
    new_tokens = {}
    urls = []
    for _ in range(count):
        # Invite tokens are credentials, so draw them from ``secrets``.
        # 16 random bytes yield the same 22-character URL-safe format the
        # previous base64-encoded UUIDs had.
        token = secrets.token_urlsafe(16)
        new_tokens[token] = {'used': False, 'used_by_cookie': None}
        urls.append(f"{base_url}/invite?token={token}")

    # Save tokens
    tokens.update(new_tokens)
    save_tokens()

    # Save URLs to text file
    with open(INVITE_URLS_FILE, 'w', encoding='utf-8') as f:
        f.write('\n'.join(urls))

    print(f"Generated {count} invite tokens. URLs saved to {INVITE_URLS_FILE}")


# Check if we need to generate tokens on startup
if not os.path.exists(TOKENS_FILE) or not tokens:
    generate_tokens()


# Purge data if past purge date
def purge_data():
    """Delete participant data once ``PURGE_DATE`` has passed.

    Both the on-disk files and the in-memory copies are dropped; leaving the
    in-memory lists populated would keep serving (and could re-save) data
    that was supposed to be gone.  Invite tokens are not touched.
    """
    if utcnow() <= PURGE_DATE:
        return

    purged = bool(persistent or pool)
    for path in (PERSISTENT_FILE, POOL_FILE):
        if os.path.exists(path):
            os.remove(path)
            purged = True
    persistent.clear()
    pool.clear()

    # Only report when something was actually removed, so the periodic
    # checker does not repeat the message every cycle.
    if purged:
        print("Data purged due to purge date.")


# Log non-compliant (non-drawers) if past drawing deadline
def log_non_compliant():
    """Report participants without an assigned receiver after the deadline.

    For each such entry a line is printed and appended to ``LOG_FILE``; if
    the entry has an ``assigned_by`` id, the contact of that entry is listed
    as well so the organizer can arrange a reassignment.
    """
    if utcnow() <= DRAWING_DEADLINE:
        return

    log_lines = []
    for nd in persistent:
        if nd.get('assigned_receiver') is not None:
            continue
        contact = nd.get('fedi') or nd.get('email') or 'No contact'
        log_lines.append(f"Non-drawer: ID {nd['id']}, Contact: {contact}")
        if nd.get('assigned_by'):
            sender = get_entry_by_id(nd['assigned_by'])
            if sender:
                sender_contact = (
                    sender.get('fedi') or sender.get('email') or 'No contact'
                )
            else:
                sender_contact = 'Unknown'
            log_lines.append(
                f" Affected sender ID {nd['assigned_by']}, "
                f"Contact: {sender_contact} - Needs reassignment"
            )

    if log_lines:
        log_msg = "\n".join(log_lines) + "\n"
        print(log_msg)  # To console
        with open(LOG_FILE, 'a', encoding='utf-8') as f:
            f.write(log_msg)


# Background task for periodic checks
async def background_checker():
    """Run the deadline checks forever, once every ``CHECK_INTERVAL`` seconds."""
    while True:
        # Log before purging: the purge clears the in-memory entries the
        # non-compliance report is built from.
        log_non_compliant()
        purge_data()
        await asyncio.sleep(CHECK_INTERVAL)


# NOTE(review): only the text content of the three templates below is visible
# in this copy of the file; the markup, CSS and JS the comments refer to are
# not present. Confirm against the deployed version before shipping.

# HTML Template for access denied
ACCESS_DENIED_HTML = """ Access Denied - Secret Santa

🚫 Access Denied 🚫

{message}

"""

# HTML Template with late 90s Christmas theme, inline CSS/JS
# Escaped curly braces for str.format()
HTML = """ Secret Santa Coordinator - Ho Ho Ho!

🎅 Secret Santa Anonymous Coordinator 🎄

Welcome to our #darkfedi gift exchange! Enter your details to join the pool. We'll assign you someone to send a gift to anonymously.

Signup deadline: {signup_deadline} | Drawing deadline: {drawing_deadline}

{message}






*CLICK* to agree to reach out to the organizer if you cannot send a gift in time.

Check Your Status


"""

# Simplified success HTML (removes forms and other elements)
SUCCESS_HTML = """ Secret Santa Coordinator - Ho Ho Ho!

🎅 Secret Santa Anonymous Coordinator 🎄

{message} """


def _html_response(template, message, status=200):
    """Render *template* with *message* and wrap it in a text/html response."""
    return web.Response(
        text=template.format(message=message),
        content_type='text/html',
        status=status,
    )


# Middleware to check authentication
@web.middleware
async def auth_middleware(request, handler):
    """Reject requests that do not carry an auth cookie issued by ``/invite``.

    ``/invite`` and ``/candycane.png`` are always let through.  Any other
    path needs a ``secretsanta_auth`` cookie whose value is recorded as
    ``used_by_cookie`` on one of the known tokens; otherwise a 403 page is
    returned.
    """
    # Allow access to invite endpoint and static resources
    if request.path in ('/invite', '/candycane.png'):
        return await handler(request)

    # Check for auth cookie
    auth_cookie = request.cookies.get('secretsanta_auth')
    if not auth_cookie:
        return _html_response(
            ACCESS_DENIED_HTML,
            "You need a valid invite link to access this page.",
            status=403,
        )

    # Verify the auth cookie is valid (exists in our tokens and is marked as used)
    known = any(
        data.get('used_by_cookie') == auth_cookie for data in tokens.values()
    )
    if not known:
        return _html_response(
            ACCESS_DENIED_HTML, "Invalid or expired access token.", status=403
        )

    return await handler(request)


# Helper to get entry by id
def get_entry_by_id(entry_id):
    """Return the participant entry with this id, or ``None``."""
    return next((e for e in persistent if e['id'] == entry_id), None)


# Helper to get entry by code
def get_entry_by_code(code):
    """Return the participant entry with this one-time code, or ``None``."""
    return next((e for e in persistent if e.get('code') == code), None)


# Routes
routes = web.RouteTableDef()


@routes.get('/invite')
async def invite(request):
    """Redeem a single-use invite token and hand out the auth cookie.

    Responds with a 400 page when the token is missing, unknown or already
    used; otherwise marks the token as used, persists that, and redirects to
    ``/`` with a 30-day ``secretsanta_auth`` cookie.
    """
    token = request.query.get('token')
    if not token:
        return _html_response(
            ACCESS_DENIED_HTML, "No invite token provided.", status=400
        )

    token_data = tokens.get(token)
    if not token_data:
        return _html_response(
            ACCESS_DENIED_HTML, "Invalid invite token.", status=400
        )

    if token_data.get('used'):
        return _html_response(
            ACCESS_DENIED_HTML,
            "This invite token has already been used.",
            status=400,
        )

    # Mark token as used and create auth cookie.  The cookie value is the
    # only credential checked by the middleware, so it comes from ``secrets``.
    auth_cookie = secrets.token_urlsafe(32)
    token_data['used'] = True
    token_data['used_by_cookie'] = auth_cookie
    save_tokens()

    # Redirect to main page with auth cookie
    resp = web.Response(status=302)
    resp.headers['Location'] = '/'
    resp.set_cookie('secretsanta_auth', auth_cookie, max_age=3600*24*30)  # 30 days
    return resp


@routes.get('/')
async def index(request):
    """Serve the signup page, with a notice if this browser already joined."""
    message = ""
    cookie_code = request.cookies.get('secretsanta_code')
    if cookie_code and get_entry_by_code(cookie_code):
        message = (
            "\n\nYou've already joined! Use the check form below with your "
            "code to see status.\n\n"
        )

    page = HTML.format(
        signup_deadline=SIGNUP_DEADLINE.isoformat(),
        drawing_deadline=DRAWING_DEADLINE.isoformat(),
        message=message,
    )
    return web.Response(text=page, content_type='text/html')


@routes.get('/candycane.png')
async def serve_candycane(request):
    """Serve ``ncd.png`` from the working directory, or 404 if it is missing."""
    # actually ncd icon
    if os.path.exists('ncd.png'):
        with open('ncd.png', 'rb') as f:
            data = f.read()
        return web.Response(body=data, content_type='image/png')
    return web.Response(status=404)


def _parse_dollar_limit(raw):
    """Convert the submitted gift limit to a float, falling back to the default.

    The default is used when the field is empty, is not a number (including
    non-string form values such as uploads), or is NaN, infinite or negative.
    """
    try:
        value = float(raw or DEFAULT_DOLLAR_LIMIT)
    except (TypeError, ValueError):
        return float(DEFAULT_DOLLAR_LIMIT)
    if not math.isfinite(value) or value < 0:
        return float(DEFAULT_DOLLAR_LIMIT)
    return value


@routes.post('/submit')
async def submit(request):
    """Register a new participant and try to assign a receiver right away.

    Returns a 400 page when signup is closed, a required field is missing,
    or the browser's ``secretsanta_code`` cookie already maps to an entry.
    On success the entry is stored, added to the pool, and the response sets
    a 30-day ``secretsanta_code`` cookie holding the one-time code.
    """
    now = utcnow()
    if now > SIGNUP_DEADLINE:
        return _html_response(
            SUCCESS_HTML, "\n\nSignup period is over!\n\n", status=400
        )

    data = await request.post()

    if not data.get('address'):
        return _html_response(
            SUCCESS_HTML, "\n\nAddress is required!\n\n", status=400
        )

    if not data.get('fedi') and not data.get('email'):
        return _html_response(
            SUCCESS_HTML,
            "\n\nEither Fedi handle or email is required for contact!\n\n",
            status=400,
        )

    if not data.get('agreement'):
        return _html_response(
            SUCCESS_HTML, "\n\nYou must agree to the terms!\n\n", status=400
        )

    # Check for existing via cookie
    cookie_code = request.cookies.get('secretsanta_code')
    if cookie_code and get_entry_by_code(cookie_code):
        return _html_response(
            SUCCESS_HTML,
            "\n\nYou've already submitted! Check your status below.\n\n",
            status=400,
        )

    entry_id = str(uuid.uuid4())[:8]  # First 8 chars of UUID
    # The code unlocks the receiver's address, so it comes from ``secrets``;
    # 4 random bytes keep the previous 8-hex-character format.
    code = secrets.token_hex(4)

    entry = {
        'id': entry_id,
        'name': data.get('name', ''),
        'fedi': data.get('fedi', ''),
        'email': data.get('email', ''),
        'dollar_limit': _parse_dollar_limit(data.get('dollar_limit')),
        'likes_dislikes': data.get('likes_dislikes', ''),
        'address': data.get('address'),
        'submission_time': now.isoformat(),
        'code': code,
        'assigned_receiver': None,
        'assigned_by': None
    }
    persistent.append(entry)
    pool.append(entry_id)
    save_data()
    print(f'Created Entry:\n{entry}\n\n')

    # Attempt immediate assignment if pool >=3 (including this one)
    assignment_msg = await try_assign(entry)

    message = (
        f"\n\nThanks for joining! Your one-time code: {code} (save it!)"
        f"\n\n{assignment_msg}"
        "\n\nIf not assigned, check back later with your code.\n\n"
    )
    resp = _html_response(SUCCESS_HTML, message)
    resp.set_cookie('secretsanta_code', code, max_age=3600*24*30)  # 30 days
    return resp


def _gift_details(receiver):
    """Format a receiver's details for embedding in an HTML page.

    Every field is participant-supplied (or read back from the JSON file), so
    each one is HTML-escaped before it is placed in a text/html response.
    """
    return (
        f"Name: {escape(str(receiver['name']))}, "
        f"Address: {escape(str(receiver['address']))}, "
        f"Limit: ${escape(str(receiver['dollar_limit']))}, "
        f"Likes/Dislikes: {escape(str(receiver['likes_dislikes']))}"
    )


async def try_assign(entry):
    """Return a status message for *entry*, drawing a receiver if possible.

    Args:
        entry: a participant dict from ``persistent``; it is mutated in place
            when a receiver is assigned.

    Returns:
        A message fragment describing the existing assignment, the new
        assignment, or why no assignment could be made yet.
    """
    # An existing assignment is reported first, so a participant can still
    # look up who to send to after the drawing deadline has passed.
    if entry['assigned_receiver'] is not None:
        receiver = get_entry_by_id(entry['assigned_receiver'])
        if receiver:
            return (
                "\n\nYou've already been assigned! Send to: "
                f"{_gift_details(receiver)}\n\n"
            )

    now = utcnow()
    if now > DRAWING_DEADLINE:
        return "\n\nDrawing period is over!\n\n"

    submission_time = datetime.fromisoformat(entry['submission_time'])
    time_in_pool = (now - submission_time).total_seconds() / 3600

    # Resolve pool ids to entries up front; an id with no matching entry
    # (e.g. a hand-edited or out-of-sync pool file) is skipped, not drawn.
    candidates = [
        candidate
        for candidate in (
            get_entry_by_id(eid) for eid in pool if eid != entry['id']
        )
        if candidate is not None
    ]
    if not candidates:
        return (
            "\n\nPool is empty right now (or only you left). "
            "Check back later!\n\n"
        )

    # A small pool is only drawn from once the entry has waited long enough.
    if len(pool) < 3 and time_in_pool <= COUPLE_HOURS:
        return (
            "\n\nPool is too small (<3 participants). "
            "Please check back in a couple hours.\n\n"
        )

    # Assign random
    receiver_entry = random.choice(candidates)
    receiver_id = receiver_entry['id']
    receiver_entry['assigned_by'] = entry['id']
    entry['assigned_receiver'] = receiver_id
    pool.remove(receiver_id)
    save_data()
    print(f'Assigned id {receiver_id}')
    return f"\n\nAssigned! Send a gift to: {_gift_details(receiver_entry)}\n\n"


@routes.post('/check')
async def check(request):
    """Look up an entry by its one-time code and report its assignment status.

    Returns a 400 page when the code is missing or unknown.  A valid code
    triggers an assignment attempt and refreshes the ``secretsanta_code``
    cookie for 30 days.
    """
    data = await request.post()
    code = data.get('code')
    if not code:
        return _html_response(SUCCESS_HTML, "\n\nCode required!\n\n", status=400)

    entry = get_entry_by_code(code)
    if not entry:
        return _html_response(SUCCESS_HTML, "\n\nInvalid code!\n\n", status=400)

    assignment_msg = await try_assign(entry)
    message = f"\n\nYour code is valid. Status: {assignment_msg}\n\n"
    resp = _html_response(SUCCESS_HTML, message)
    resp.set_cookie('secretsanta_code', code, max_age=3600*24*30)
    return resp


# App setup
app = web.Application(middlewares=[auth_middleware])
app.add_routes(routes)


async def start_background_tasks(app):
    """Start the periodic deadline checker when the application starts."""
    app['background_task'] = asyncio.create_task(background_checker())


app.on_startup.append(start_background_tasks)

if __name__ == '__main__':
    web.run_app(app, port=8080)