From 627b45ffc79a502bccca9cbdd95fce217a6c4aa1 Mon Sep 17 00:00:00 2001 From: John Livingston Date: Tue, 23 May 2023 17:46:47 +0200 Subject: [PATCH] Chat Federation WIP: code cleaning. --- CHANGELOG.md | 1 + .../mod_s2s_peertubelivechat/README.md | 7 +- .../README.md | 12 ++ .../mod_websocket_s2s_peertubelivechat.lua | 202 ++++++------------ 4 files changed, 90 insertions(+), 132 deletions(-) create mode 100644 prosody-modules/mod_websocket_s2s_peertubelivechat/README.md diff --git a/CHANGELOG.md b/CHANGELOG.md index ffd72458..25ef589e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ TODO: mod_s2s_peertubelivechat: dont allow to connect to remote server that are TODO: when sanitizing remote chat endpoint, check that the domain is the same as the video domain (or is room.videodomain.tld). TODO: get remote server chat informations if missing (for now, it can be missing if there is no known remote video from that server). TODO: outgoing s2s connection have a session.secure=true hardcoded. Should not. +TODO: only compatible with Prosody 0.12.x. So it should be documented for people using «system Prosody». And i should fix the ARM AppImage. ### Minor changes and fixes diff --git a/prosody-modules/mod_s2s_peertubelivechat/README.md b/prosody-modules/mod_s2s_peertubelivechat/README.md index bda8723c..f8e7b807 100644 --- a/prosody-modules/mod_s2s_peertubelivechat/README.md +++ b/prosody-modules/mod_s2s_peertubelivechat/README.md @@ -2,4 +2,9 @@ This module is part of peertube-plugin-livechat, and is under the same LICENSE. -This module proxify s2s connections through Peertube if needed. +This module provides some custom Websocket S2S tweaking. + +For example, it ensures you can connect to a remote XMPP server that is not a Peertube instance +(unless you enabled S2S in the plugin). + +It also provides specific Websocket S2S discovery methods. diff --git a/prosody-modules/mod_websocket_s2s_peertubelivechat/README.md b/prosody-modules/mod_websocket_s2s_peertubelivechat/README.md new file mode 100644 index 00000000..aaa5e859 --- /dev/null +++ b/prosody-modules/mod_websocket_s2s_peertubelivechat/README.md @@ -0,0 +1,12 @@ +# mod_websocket_s2s_peertubelivechat + +This module is part of peertube-plugin-livechat, and is under the same LICENSE. + +This module implements [XEP-0468: WebSocket S2S](https://xmpp.org/extensions/xep-0468.html). + +It is not 100% usable in a non-Peertube Prosody. Here is what is missing: + +* should be merged with mod_websocket (by adding an option to enable Websocket S2S) +* should listen on the same HTTP endpoint than mob_websocket +* missing remote Websocket S2S discovering (discovery is done by a `discover-websocket-s2s` hook, with a custom implementation in mod_s2s_peertubelivechat) +* should use net.websocket instead of duplicating code (not possible with the current Prosody code, as net.websocket.connect expect to remain the connection listener) diff --git a/prosody-modules/mod_websocket_s2s_peertubelivechat/mod_websocket_s2s_peertubelivechat.lua b/prosody-modules/mod_websocket_s2s_peertubelivechat/mod_websocket_s2s_peertubelivechat.lua index 26be093d..ef8cb22d 100644 --- a/prosody-modules/mod_websocket_s2s_peertubelivechat/mod_websocket_s2s_peertubelivechat.lua +++ b/prosody-modules/mod_websocket_s2s_peertubelivechat/mod_websocket_s2s_peertubelivechat.lua @@ -5,10 +5,12 @@ module:set_global(); -local add_task = require "util.timer".add_task; +local timer = require "util.timer"; +local add_task = timer.add_task; local add_filter = require "util.filters".add_filter; local sha1 = require "util.hashes".sha1; -local base64 = require "util.encodings".base64.encode; +local base64_encode = require "util.encodings".base64.encode; +local random_bytes = require "util.random".bytes; local st = require "util.stanza"; local parse_xml = require "util.xml".parse; local contains_token = require "util.http".contains_token; @@ -17,9 +19,9 @@ local s2s_new_outgoing = require "core.s2smanager".new_outgoing; local s2s_destroy_session = require "core.s2smanager".destroy_session; local log = module._log; local dbuffer = require "util.dbuffer"; -local new_id = require "util.id".short; +local http = require "net.http"; +local async = require "util.async"; -local websocket = require "net.websocket"; local websocket_frames = require"net.websocket.frames"; local parse_frame = websocket_frames.parse; local build_frame = websocket_frames.build; @@ -42,7 +44,6 @@ local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; module:depends("s2s") local bounce_sendq = module:depends "s2s".route_to_new_session.bounce_sendq; - local sessions = module:shared("s2s/sessions"); local s2s_listener = portmanager.get_service("s2s").listener; @@ -209,7 +210,7 @@ end local function wrap_websocket(session, conn) local log = session.log or log; - log("Debug", "Calling wrap_websocket"); + -- log("Debug", "Calling wrap_websocket"); local function websocket_close(code, message) log("debug", "Websocket close, writing a build_close frame"); @@ -218,7 +219,7 @@ local function wrap_websocket(session, conn) end local function websocket_handle_error(session, code, message) - log("debug", "handling an error"); + log("debug", "handling an error on a websocket s2s"); if code == 1009 then -- stanza size limit exceeded -- we close the session, rather than the connection, -- otherwise a resuming client will simply resend the @@ -256,7 +257,7 @@ local function wrap_websocket(session, conn) local frameBuffer = dbuffer.new(frame_buffer_limit, frame_fragment_limit); add_filter(session, "bytes/in", function(data) - log("debug", "Calling the bytes/in filter"); + -- log("debug", "Calling the bytes/in filter"); if not frameBuffer:write(data) then log("warn", "websocket frame buffer full - terminating session"); @@ -291,7 +292,7 @@ local function wrap_websocket(session, conn) end); add_filter(session, "stanzas/out", function(stanza) - log("debug", "Calling the stanzas/out filter"); + -- log("debug", "Calling the stanzas/out filter"); stanza = st.clone(stanza); local attr = stanza.attr; @@ -303,7 +304,7 @@ local function wrap_websocket(session, conn) end, -1000); add_filter(session, "bytes/out", function(data) - log("debug", "Calling the bytes/out filter"); + -- log("debug", "Calling the bytes/out filter"); return build_frame({ FIN = true, opcode = 0x01, data = tostring(data)}); end); end @@ -351,7 +352,7 @@ function handle_request(event) response.status_code = 101; response.headers.upgrade = "websocket"; response.headers.connection = "Upgrade"; - response.headers.sec_webSocket_accept = base64(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); + response.headers.sec_webSocket_accept = base64_encode(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); response.headers.sec_webSocket_protocol = "xmpp"; module:fire_event("websocket-session", { session = session, request = request }); @@ -370,99 +371,15 @@ end -- OUTGOING CONNECTIONS +local pending_websocket_close_timeout = 3; -- Seconds to wait after sending close frame until closing connection. --- local pending_ws_connection_methods = {}; --- local pending_ws_connection_mt = { --- __name = "pending_ws_connection"; --- __index = pending_ws_connection_methods; --- __tostring = function (p) --- return ""; --- end; --- }; +local pending_websockets = {}; --- function pending_ws_connection_methods:log(level, message, ...) --- log(level, "[pending connection %s] "..message, self.id, ...); --- end - --- function pending_ws_connection_methods:onopen() --- self:log("debug", "Pending WS Connection onopen event."); --- local conn = self.conn; --- local session = self.data.session; --- log("debug", "Successfully connected"); --- conn:setlistener(s2s_listener); --- s2s_listener.onconnect(conn); --- wrap_websocket(session, conn); --- end - --- function pending_ws_connection_methods:onclose(code, message) --- self:log("debug", "Pending WS Connection onclose event."); --- -- FIXME/TODO: what can i do?? --- -- if p.listeners.onfail then --- -- p.listeners.onfail(p.data, p.last_error or "unable to connect to websocket"); --- -- end --- end - --- -- pending_ws_connections_map[ws_connection] = pending_connection --- local pending_ws_connections_map = {}; --- local pending_ws_connection_listeners = {}; - --- function pending_ws_connection_listeners.onopen(ws_connection) --- log("debug", "Pending WS Connection onopen event."); - --- local p = pending_ws_connections_map[ws_connection]; --- if not p then --- if ws_connection.conn then --- module:log("warn", "Successful connection, but unexpected! Closing."); --- ws_connection.conn:close(); --- else --- module:log("error", "Successful connection, but unexpected, and no conn attribute!"); --- end --- return; --- end --- pending_ws_connections_map[ws_connection] = nil; --- local conn = ws_connection.conn; --- p:log("debug", "Successfully connected"); --- conn:setlistener(p.listeners, p.data); --- p.listeners.onconnect(conn); --- wrap_websocket(session, conn); --- end - --- function pending_ws_connection_listeners.onclose(ws_connection, reason) --- log("debug", "Pending WS Connection onclose event."); - --- local p = pending_ws_connections_map[ws_connection]; --- if not p then --- module:log("warn", "Failed connection, but unexpected!"); --- return; --- end --- p.last_error = reason or "unknown reason"; --- p:log("debug", "Connection attempt failed: %s", p.last_error); --- if p.listeners.onfail then --- p.listeners.onfail(p.data, p.last_error or "unable to connect to websocket"); --- end --- end - - --- DIRTY HACK BEGINS: copying net.websocket code... because net.websocket tries to handle incomming data, but we don't want to. -local t_concat = table.concat; - -local http = require "net.http"; -local frames = require "net.websocket.frames"; -local base64 = require "util.encodings".base64; -local sha1 = require "util.hashes".sha1; -local random_bytes = require "util.random".bytes; -local timer = require "util.timer"; -local log = require "util.logger".init "websocket"; - -local close_timeout = 3; -- Seconds to wait after sending close frame until closing connection. - -local websockets = {}; - -local websocket_listeners = {}; -function websocket_listeners.ondisconnect(conn, err) - local s = websockets[conn]; +local pending_websocket_listeners = {}; +function pending_websocket_listeners.ondisconnect(conn, err) + local s = pending_websockets[conn]; if not s then return; end - websockets[conn] = nil; + pending_websockets[conn] = nil; if s.close_timer then timer.stop(s.close_timer); s.close_timer = nil; @@ -471,33 +388,25 @@ function websocket_listeners.ondisconnect(conn, err) if s.close_code == nil and s.onerror then s:onerror(err); end if s.onclose then s:onclose(s.close_code, s.close_message or err); end end - -function websocket_listeners.ondetach(conn) - websockets[conn] = nil; +function pending_websocket_listeners.ondetach(conn) + pending_websockets[conn] = nil; end -local function fail(s, code, reason) - log("warn", "WebSocket connection failed, closing. %d %s", code, reason); - s:close(code, reason); - s.conn:close(); - return false -end - -local websocket_methods = {}; +local pending_websocket_methods = {}; local function close_timeout_cb(now, timerid, s) -- luacheck: ignore 212/now 212/timerid s.close_timer = nil; log("warn", "Close timeout waiting for server to close, closing manually."); s.conn:close(); end -function websocket_methods:close(code, reason) +function pending_websocket_methods:close(code, reason) if self.readyState < 2 then code = code or 1000; log("debug", "closing WebSocket with code %i: %s" , code , reason); self.readyState = 2; local conn = self.conn; - conn:write(frames.build_close(code, reason, true)); + conn:write(websocket_frames.build_close(code, reason, true)); -- Do not close socket straight away, wait for acknowledgement from server. - self.close_timer = timer.add_task(close_timeout, close_timeout_cb, self); + self.close_timer = add_task(pending_websocket_close_timeout, close_timeout_cb, self); elseif self.readyState == 2 then log("debug", "tried to close a closing WebSocket, closing the raw socket."); -- Stop timer @@ -511,7 +420,7 @@ function websocket_methods:close(code, reason) log("debug", "tried to close a closed WebSocket, ignoring."); end end -function websocket_methods:send(data, opcode) +function pending_websocket_methods:send(data, opcode) if self.readyState < 1 then return nil, "WebSocket not open yet, unable to send data."; elseif self.readyState >= 2 then @@ -528,13 +437,14 @@ function websocket_methods:send(data, opcode) opcode = opcode; data = tostring(data); }; - log("debug", "WebSocket sending frame: opcode=%0x, %i bytes", frame.opcode, #frame.data); - return self.conn:write(frames.build(frame)); + log("debug", "Pending S2S WebSocket sending frame: opcode=%0x, %i bytes", frame.opcode, #frame.data); + return self.conn:write(websocket_frames.build(frame)); end -local websocket_metatable = { - __index = websocket_methods; +local pending_websocket_metatable = { + __index = pending_websocket_methods; }; + local function custom_connect(url, ex, listeners) ex = ex or {}; @@ -545,7 +455,7 @@ local function custom_connect(url, ex, listeners) been base64-encoded (see Section 4 of [RFC4648]). The nonce MUST be selected randomly for each connection. ]] - local key = base64.encode(random_bytes(16)); + local key = base64_encode(random_bytes(16)); -- Either a single protocol string or an array of protocol strings. local protocol = ex.protocol; @@ -589,7 +499,7 @@ local function custom_connect(url, ex, listeners) onclose = listeners.onclose; onmessage = listeners.onmessage; onerror = listeners.onerror; - }, websocket_metatable); + }, pending_websocket_metatable); local http_url = url:gsub("^(ws)", "http"); local http_req = http.request(http_url, { -- luacheck: ignore 211/http_req @@ -601,7 +511,7 @@ local function custom_connect(url, ex, listeners) if c ~= 101 or r.headers["connection"]:lower() ~= "upgrade" or r.headers["upgrade"] ~= "websocket" - or r.headers["sec-websocket-accept"] ~= base64.encode(sha1(key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")) + or r.headers["sec-websocket-accept"] ~= base64_encode(sha1(key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")) or (protocol and not protocol[r.headers["sec-websocket-protocol"]]) then s.readyState = 3; @@ -616,22 +526,20 @@ local function custom_connect(url, ex, listeners) local conn = http_req.conn; http_req.conn = nil; s.conn = conn; - websockets[conn] = s; - conn:setlistener(websocket_listeners); + pending_websockets[conn] = s; + conn:setlistener(pending_websocket_listeners); log("debug", "WebSocket connected successfully to %s", url); s.readyState = 1; - if s.onopen then s:onopen(); end - -- websocket_listeners.onincoming(conn, b); + if s.onopen then s:onopen(); end -- this will detach pending_websocket_listeners end); return s; end --- DIRTY HACK END function route_to_new_session(event) local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza; - log("debug", "Trying to route to %s", to_host); + log("debug", "Trying to route to %s using Websocket S2S", to_host); local ws_properties = module:fire_event("discover-websocket-s2s", { to_host = to_host }); if not ws_properties then @@ -659,15 +567,17 @@ function route_to_new_session(event) ex["headers"] = ws_properties.extra_headers or {}; ex["protocol"] = "xmpp"; - session.log("debug", "Starting the s2s websocket connection process"); + -- now we start using the session logger local log = session.log; + log("debug", "Starting the s2s websocket connection process"); local function onopen(s) log("debug", "Outgoing Websocket S2S: Successfully connected"); local conn = s.conn; - conn.starttls = false; -- FIXME: it is needed? Prevent mod_tls from believing starttls can be done + conn.starttls = false; -- Prevent mod_tls from believing starttls can be done. FIXME: is this really needed? session.conn = conn; wrap_websocket(session, conn); + -- Switching to the s2s listener. conn:setlistener(s2s_listener); s2s_listener.register_outgoing(conn, session); s2s_listener.onconnect(conn); @@ -679,6 +589,12 @@ function route_to_new_session(event) s2s_destroy_session(session, message or code or "unable to connect to websocket"); end + -- Note: we should use net.websocket.connect to connect to the remote server. + -- But at time i'm writing this comment (Prosody 0.12.3), this function calls + -- `websocket_listeners.onincoming(conn, b);` just after the `onopen` function + -- is called. But here, we switch the connection listener to use the + -- s2s_listener as soon as the connection is open. So it can't work. + -- That's why I use net.http, and handle the Websocket handshake by hand. local ws_connection = custom_connect(ws_properties['url'], ex, { onopen = onopen; onclose = onclose; @@ -687,6 +603,30 @@ function route_to_new_session(event) return true; end + +module:hook("server-stopping", function(event) + -- Stop opening new connections + log("debug", "Unhooking route/remote to stop accepting new outgoing connections"); + module:unhook("route/remote", route_to_new_session); + + log("debug", "Closing pending Websocket S2S connections"); + local wait, done = async.waiter(1, true); + + -- Close pending websockets + local reason = event.reason; + for conn, pending_websocket in pairs(pending_websockets) do + log("debug", "Found a pending connection, closing it."); + pending_websocket.close(nil, reason); + end + + -- Wait for them to close properly if they haven't already + if next(pending_websockets) ~= nil then + module:log("info", "Waiting for pending websocket sessions to close"); + add_task(stream_close_timeout + 1, function () done() end); + wait(); + end +end, -100); + function module.add_host(module) module:hook("s2s-read-timeout", keepalive, -0.9);