diff --git a/main_invite_only.py b/main_invite_only.py index 103eaa1..1889a56 100644 --- a/main_invite_only.py +++ b/main_invite_only.py @@ -8,84 +8,95 @@ import random import base64 # Constants and Config -BASE_URL = 'rpi.local:8080' -PERSISTENT_FILE = 'persistent.json' -POOL_FILE = 'pool.json' -LOG_FILE = 'secretsanta_noncompliant_log.txt' -TOKENS_FILE = 'tokens.json' -INVITE_URLS_FILE = 'invite_urls.txt' +BASE_URL = "rpi.local:8080" +PERSISTENT_FILE = "persistent.json" +POOL_FILE = "pool.json" +LOG_FILE = "secretsanta_noncompliant_log.txt" +TOKENS_FILE = "tokens.json" +INVITE_URLS_FILE = "invite_urls.txt" COUPLE_HOURS = 2 # hours to wait before forcing assignment DEFAULT_DOLLAR_LIMIT = 50 -CHECK_INTERVAL = 12 * 3600 # 12 hours in seconds +CHECK_INTERVAL = ( + 12 * 3600 +) # 12 hours in seconds to check if we need to purge our data for the year # Deadlines default to current year CURRENT_YEAR = datetime.utcnow().year SIGNUP_DEADLINE = datetime(CURRENT_YEAR, 11, 15, 0, 0, 0) DRAWING_DEADLINE = datetime(CURRENT_YEAR, 12, 15, 0, 0, 0) -# Calculate purge date: 7 days before Christmas of current year +# Calculate purge date: 1 day after Christmas of current year CHRISTMAS = datetime(CURRENT_YEAR, 12, 25) -PURGE_DATE = CHRISTMAS - timedelta(days=7) +PURGE_DATE = CHRISTMAS + timedelta(days=1) + # Load data def load_persistent(): if os.path.exists(PERSISTENT_FILE): - with open(PERSISTENT_FILE, 'r') as f: + with open(PERSISTENT_FILE, "r") as f: return json.load(f) return [] + def load_pool(): if os.path.exists(POOL_FILE): - with open(POOL_FILE, 'r') as f: + with open(POOL_FILE, "r") as f: return json.load(f) return [] + def load_tokens(): if os.path.exists(TOKENS_FILE): - with open(TOKENS_FILE, 'r') as f: + with open(TOKENS_FILE, "r") as f: return json.load(f) return {} + persistent = load_persistent() pool = load_pool() # list of ids available to be drawn (not yet assigned_by someone) tokens = load_tokens() # dict of token -> {used: bool, used_by_cookie: str} + # Save data def save_data(): - with open(PERSISTENT_FILE, 'w') as f: + with open(PERSISTENT_FILE, "w") as f: json.dump(persistent, f, indent=4, default=str) - with open(POOL_FILE, 'w') as f: + with open(POOL_FILE, "w") as f: json.dump(pool, f, indent=4) + def save_tokens(): - with open(TOKENS_FILE, 'w') as f: + with open(TOKENS_FILE, "w") as f: json.dump(tokens, f, indent=4) + # Generate invite tokens def generate_tokens(count=500, base_url=BASE_URL): """Generate invite tokens and save URLs to file""" new_tokens = {} urls = [] - + for _ in range(count): - token = base64.urlsafe_b64encode(uuid.uuid4().bytes).decode('utf-8').rstrip('=') - new_tokens[token] = {'used': False, 'used_by_cookie': None} + token = base64.urlsafe_b64encode(uuid.uuid4().bytes).decode("utf-8").rstrip("=") + new_tokens[token] = {"used": False, "used_by_cookie": None} urls.append(f"{base_url}/invite?token={token}") - + # Save tokens tokens.update(new_tokens) save_tokens() - + # Save URLs to text file - with open(INVITE_URLS_FILE, 'w') as f: - f.write('\n'.join(urls)) - + with open(INVITE_URLS_FILE, "w") as f: + f.write("\n".join(urls)) + print(f"Generated {count} invite tokens. URLs saved to {INVITE_URLS_FILE}") + # Check if we need to generate tokens on startup if not os.path.exists(TOKENS_FILE) or not tokens: generate_tokens() + # Purge data if past purge date def purge_data(): now = datetime.utcnow() @@ -95,25 +106,37 @@ def purge_data(): os.remove(file) print("Data purged due to purge date.") + # Log non-compliant (non-drawers) if past drawing deadline def log_non_compliant(): now = datetime.utcnow() if now > DRAWING_DEADLINE: - non_drawers = [entry for entry in persistent if entry.get('assigned_receiver') is None] + non_drawers = [ + entry for entry in persistent if entry.get("assigned_receiver") is None + ] log_lines = [] for nd in non_drawers: - contact = nd.get('fedi') or nd.get('email') or 'No contact' + contact = nd.get("fedi") or nd.get("email") or "No contact" log_lines.append(f"Non-drawer: ID {nd['id']}, Contact: {contact}") - if nd.get('assigned_by'): - sender = next((e for e in persistent if e['id'] == nd['assigned_by']), None) - sender_contact = sender.get('fedi') or sender.get('email') or 'No contact' if sender else 'Unknown' - log_lines.append(f" Affected sender ID {nd['assigned_by']}, Contact: {sender_contact} - Needs reassignment") + if nd.get("assigned_by"): + sender = next( + (e for e in persistent if e["id"] == nd["assigned_by"]), None + ) + sender_contact = ( + sender.get("fedi") or sender.get("email") or "No contact" + if sender + else "Unknown" + ) + log_lines.append( + f" Affected sender ID {nd['assigned_by']}, Contact: {sender_contact} - Needs reassignment" + ) if log_lines: log_msg = "\n".join(log_lines) + "\n" print(log_msg) # To console - with open(LOG_FILE, 'a') as f: + with open(LOG_FILE, "a") as f: f.write(log_msg) + # Background task for periodic checks async def background_checker(): while True: @@ -121,6 +144,7 @@ async def background_checker(): log_non_compliant() await asyncio.sleep(CHECK_INTERVAL) + # HTML Template for access denied ACCESS_DENIED_HTML = """ @@ -378,76 +402,86 @@ SUCCESS_HTML = """ """ + # Middleware to check authentication @web.middleware async def auth_middleware(request, handler): # Allow access to invite endpoint and static resources - if request.path in ['/invite', '/candycane.png']: + if request.path in ["/invite", "/candycane.png"]: return await handler(request) - + # Check for auth cookie - auth_cookie = request.cookies.get('secretsanta_auth') + auth_cookie = request.cookies.get("secretsanta_auth") if not auth_cookie: - html = ACCESS_DENIED_HTML.format(message="You need a valid invite link to access this page.") - return web.Response(text=html, content_type='text/html', status=403) - + html = ACCESS_DENIED_HTML.format( + message="You need a valid invite link to access this page." + ) + return web.Response(text=html, content_type="text/html", status=403) + # Verify the auth cookie is valid (exists in our tokens and is marked as used) valid = False for token, data in tokens.items(): - if data.get('used_by_cookie') == auth_cookie: + if data.get("used_by_cookie") == auth_cookie: valid = True break - + if not valid: html = ACCESS_DENIED_HTML.format(message="Invalid or expired access token.") - return web.Response(text=html, content_type='text/html', status=403) - + return web.Response(text=html, content_type="text/html", status=403) + return await handler(request) + # Helper to get entry by id def get_entry_by_id(entry_id): - return next((e for e in persistent if e['id'] == entry_id), None) + return next((e for e in persistent if e["id"] == entry_id), None) + # Helper to get entry by code def get_entry_by_code(code): - return next((e for e in persistent if e.get('code') == code), None) + return next((e for e in persistent if e.get("code") == code), None) + # Routes routes = web.RouteTableDef() -@routes.get('/invite') + +@routes.get("/invite") async def invite(request): - token = request.query.get('token') + token = request.query.get("token") if not token: html = ACCESS_DENIED_HTML.format(message="No invite token provided.") - return web.Response(text=html, content_type='text/html', status=400) - + return web.Response(text=html, content_type="text/html", status=400) + token_data = tokens.get(token) if not token_data: html = ACCESS_DENIED_HTML.format(message="Invalid invite token.") - return web.Response(text=html, content_type='text/html', status=400) - - if token_data['used']: - html = ACCESS_DENIED_HTML.format(message="This invite token has already been used.") - return web.Response(text=html, content_type='text/html', status=400) - + return web.Response(text=html, content_type="text/html", status=400) + + if token_data["used"]: + html = ACCESS_DENIED_HTML.format( + message="This invite token has already been used." + ) + return web.Response(text=html, content_type="text/html", status=400) + # Mark token as used and create auth cookie auth_cookie = str(uuid.uuid4()) - token_data['used'] = True - token_data['used_by_cookie'] = auth_cookie + token_data["used"] = True + token_data["used_by_cookie"] = auth_cookie save_tokens() - + # Redirect to main page with auth cookie resp = web.Response(status=302) - resp.headers['Location'] = '/' - resp.set_cookie('secretsanta_auth', auth_cookie, max_age=3600*24*30) # 30 days + resp.headers["Location"] = "/" + resp.set_cookie("secretsanta_auth", auth_cookie, max_age=3600 * 24 * 30) # 30 days return resp -@routes.get('/') + +@routes.get("/") async def index(request): now = datetime.utcnow() message = "" - cookie_code = request.cookies.get('secretsanta_code') + cookie_code = request.cookies.get("secretsanta_code") if cookie_code: entry = get_entry_by_code(cookie_code) if entry: @@ -455,97 +489,100 @@ async def index(request): html = HTML.format( signup_deadline=SIGNUP_DEADLINE.isoformat(), drawing_deadline=DRAWING_DEADLINE.isoformat(), - message=message + message=message, ) - resp = web.Response(text=html, content_type='text/html') + resp = web.Response(text=html, content_type="text/html") return resp -@routes.get('/candycane.png') -async def serve_candycane(request): # actually ncd icon - if os.path.exists('ncd.png'): - with open('ncd.png', 'rb') as f: + +@routes.get("/candycane.png") +async def serve_candycane(request): # actually ncd icon + if os.path.exists("ncd.png"): + with open("ncd.png", "rb") as f: data = f.read() - return web.Response(body=data, content_type='image/png') + return web.Response(body=data, content_type="image/png") return web.Response(status=404) -@routes.post('/submit') + +@routes.post("/submit") async def submit(request): now = datetime.utcnow() if now > SIGNUP_DEADLINE: message = "

Signup period is over!

" html = SUCCESS_HTML.format(message=message) - return web.Response(text=html, content_type='text/html', status=400) + return web.Response(text=html, content_type="text/html", status=400) data = await request.post() - if not data.get('address'): + if not data.get("address"): message = "

Address is required!

" html = SUCCESS_HTML.format(message=message) - return web.Response(text=html, content_type='text/html', status=400) - if not data.get('fedi') and not data.get('email'): + return web.Response(text=html, content_type="text/html", status=400) + if not data.get("fedi") and not data.get("email"): message = "

Either Fedi handle or email is required for contact!

" html = SUCCESS_HTML.format(message=message) - return web.Response(text=html, content_type='text/html', status=400) - if not data.get('agreement'): + return web.Response(text=html, content_type="text/html", status=400) + if not data.get("agreement"): message = "

You must agree to the terms!

" html = SUCCESS_HTML.format(message=message) - return web.Response(text=html, content_type='text/html', status=400) + return web.Response(text=html, content_type="text/html", status=400) # Check for existing via cookie - cookie_code = request.cookies.get('secretsanta_code') + cookie_code = request.cookies.get("secretsanta_code") if cookie_code and get_entry_by_code(cookie_code): message = "

You've already submitted! Check your status below.

" html = SUCCESS_HTML.format(message=message) - return web.Response(text=html, content_type='text/html', status=400) + return web.Response(text=html, content_type="text/html", status=400) entry_id = str(uuid.uuid4())[:8] # First 8 chars of UUID code = str(uuid.uuid4())[:8] # Short code too - dollar_limit = data.get('dollar_limit') or DEFAULT_DOLLAR_LIMIT + dollar_limit = data.get("dollar_limit") or DEFAULT_DOLLAR_LIMIT try: dollar_limit = float(dollar_limit) except ValueError: dollar_limit = DEFAULT_DOLLAR_LIMIT entry = { - 'id': entry_id, - 'name': data.get('name', ''), - 'fedi': data.get('fedi', ''), - 'email': data.get('email', ''), - 'dollar_limit': dollar_limit, - 'likes_dislikes': data.get('likes_dislikes', ''), - 'address': data.get('address'), - 'submission_time': now.isoformat(), - 'code': code, - 'assigned_receiver': None, - 'assigned_by': None + "id": entry_id, + "name": data.get("name", ""), + "fedi": data.get("fedi", ""), + "email": data.get("email", ""), + "dollar_limit": dollar_limit, + "likes_dislikes": data.get("likes_dislikes", ""), + "address": data.get("address"), + "submission_time": now.isoformat(), + "code": code, + "assigned_receiver": None, + "assigned_by": None, } persistent.append(entry) pool.append(entry_id) save_data() - print(f'Created Entry:\n{entry}\n\n') + print(f"Created Entry:\n{entry}\n\n") # Attempt immediate assignment if pool >=3 (including this one) assignment_msg = await try_assign(entry) message = f"

Thanks for joining! Your one-time code: {code} (save it!)

{assignment_msg}

If not assigned, check back later with your code.

" html = SUCCESS_HTML.format(message=message) - resp = web.Response(text=html, content_type='text/html') - resp.set_cookie('secretsanta_code', code, max_age=3600*24*30) # 30 days + resp = web.Response(text=html, content_type="text/html") + resp.set_cookie("secretsanta_code", code, max_age=3600 * 24 * 30) # 30 days return resp + async def try_assign(entry): now = datetime.utcnow() if now > DRAWING_DEADLINE: return "

Drawing period is over!

" - submission_time = datetime.fromisoformat(entry['submission_time']) + submission_time = datetime.fromisoformat(entry["submission_time"]) time_in_pool = (now - submission_time).total_seconds() / 3600 - if entry['assigned_receiver'] is not None: - receiver = get_entry_by_id(entry['assigned_receiver']) + if entry["assigned_receiver"] is not None: + receiver = get_entry_by_id(entry["assigned_receiver"]) if receiver: return f"

You've already been assigned! Send to: Name: {receiver['name']}, Address: {receiver['address']}, Limit: ${receiver['dollar_limit']}, Likes/Dislikes: {receiver['likes_dislikes']}

" - available = [eid for eid in pool if eid != entry['id']] + available = [eid for eid in pool if eid != entry["id"]] if not available: return "

Pool is empty right now (or only you left). Check back later!

" @@ -555,47 +592,50 @@ async def try_assign(entry): # Assign random receiver_id = random.choice(available) receiver_entry = get_entry_by_id(receiver_id) - receiver_entry['assigned_by'] = entry['id'] - entry['assigned_receiver'] = receiver_id + receiver_entry["assigned_by"] = entry["id"] + entry["assigned_receiver"] = receiver_id pool.remove(receiver_id) save_data() - print(f'Assigned id {receiver_id}') + print(f"Assigned id {receiver_id}") return f"

Assigned! Send a gift to: Name: {receiver_entry['name']}, Address: {receiver_entry['address']}, Limit: ${receiver_entry['dollar_limit']}, Likes/Dislikes: {receiver_entry['likes_dislikes']}

" -@routes.post('/check') + +@routes.post("/check") async def check(request): now = datetime.utcnow() data = await request.post() - code = data.get('code') + code = data.get("code") if not code: message = "

Code required!

" html = SUCCESS_HTML.format(message=message) - return web.Response(text=html, content_type='text/html', status=400) + return web.Response(text=html, content_type="text/html", status=400) entry = get_entry_by_code(code) if not entry: message = "

Invalid code!

" html = SUCCESS_HTML.format(message=message) - return web.Response(text=html, content_type='text/html', status=400) + return web.Response(text=html, content_type="text/html", status=400) assignment_msg = await try_assign(entry) message = f"

Your code is valid. Status: {assignment_msg}

" html = SUCCESS_HTML.format(message=message) - resp = web.Response(text=html, content_type='text/html') - resp.set_cookie('secretsanta_code', code, max_age=3600*24*30) + resp = web.Response(text=html, content_type="text/html") + resp.set_cookie("secretsanta_code", code, max_age=3600 * 24 * 30) return resp + # App setup app = web.Application(middlewares=[auth_middleware]) app.add_routes(routes) async def start_background_tasks(app): - app['background_task'] = asyncio.create_task(background_checker()) + app["background_task"] = asyncio.create_task(background_checker()) + app.on_startup.append(start_background_tasks) -if __name__ == '__main__': +if __name__ == "__main__": web.run_app(app, port=8080)