Chat Federation WIP: code cleaning.

This commit is contained in:
John Livingston
2023-05-23 17:46:47 +02:00
parent 8fe48a068f
commit 627b45ffc7
4 changed files with 90 additions and 132 deletions

View File

@ -0,0 +1,12 @@
# mod_websocket_s2s_peertubelivechat
This module is part of peertube-plugin-livechat, and is under the same LICENSE.
This module implements [XEP-0468: WebSocket S2S](https://xmpp.org/extensions/xep-0468.html).
It is not 100% usable in a non-Peertube Prosody. Here is what is missing:
* should be merged with mod_websocket (by adding an option to enable Websocket S2S)
* should listen on the same HTTP endpoint than mob_websocket
* missing remote Websocket S2S discovering (discovery is done by a `discover-websocket-s2s` hook, with a custom implementation in mod_s2s_peertubelivechat)
* should use net.websocket instead of duplicating code (not possible with the current Prosody code, as net.websocket.connect expect to remain the connection listener)

View File

@ -5,10 +5,12 @@
module:set_global();
local add_task = require "util.timer".add_task;
local timer = require "util.timer";
local add_task = timer.add_task;
local add_filter = require "util.filters".add_filter;
local sha1 = require "util.hashes".sha1;
local base64 = require "util.encodings".base64.encode;
local base64_encode = require "util.encodings".base64.encode;
local random_bytes = require "util.random".bytes;
local st = require "util.stanza";
local parse_xml = require "util.xml".parse;
local contains_token = require "util.http".contains_token;
@ -17,9 +19,9 @@ local s2s_new_outgoing = require "core.s2smanager".new_outgoing;
local s2s_destroy_session = require "core.s2smanager".destroy_session;
local log = module._log;
local dbuffer = require "util.dbuffer";
local new_id = require "util.id".short;
local http = require "net.http";
local async = require "util.async";
local websocket = require "net.websocket";
local websocket_frames = require"net.websocket.frames";
local parse_frame = websocket_frames.parse;
local build_frame = websocket_frames.build;
@ -42,7 +44,6 @@ local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
module:depends("s2s")
local bounce_sendq = module:depends "s2s".route_to_new_session.bounce_sendq;
local sessions = module:shared("s2s/sessions");
local s2s_listener = portmanager.get_service("s2s").listener;
@ -209,7 +210,7 @@ end
local function wrap_websocket(session, conn)
local log = session.log or log;
log("Debug", "Calling wrap_websocket");
-- log("Debug", "Calling wrap_websocket");
local function websocket_close(code, message)
log("debug", "Websocket close, writing a build_close frame");
@ -218,7 +219,7 @@ local function wrap_websocket(session, conn)
end
local function websocket_handle_error(session, code, message)
log("debug", "handling an error");
log("debug", "handling an error on a websocket s2s");
if code == 1009 then -- stanza size limit exceeded
-- we close the session, rather than the connection,
-- otherwise a resuming client will simply resend the
@ -256,7 +257,7 @@ local function wrap_websocket(session, conn)
local frameBuffer = dbuffer.new(frame_buffer_limit, frame_fragment_limit);
add_filter(session, "bytes/in", function(data)
log("debug", "Calling the bytes/in filter");
-- log("debug", "Calling the bytes/in filter");
if not frameBuffer:write(data) then
log("warn", "websocket frame buffer full - terminating session");
@ -291,7 +292,7 @@ local function wrap_websocket(session, conn)
end);
add_filter(session, "stanzas/out", function(stanza)
log("debug", "Calling the stanzas/out filter");
-- log("debug", "Calling the stanzas/out filter");
stanza = st.clone(stanza);
local attr = stanza.attr;
@ -303,7 +304,7 @@ local function wrap_websocket(session, conn)
end, -1000);
add_filter(session, "bytes/out", function(data)
log("debug", "Calling the bytes/out filter");
-- log("debug", "Calling the bytes/out filter");
return build_frame({ FIN = true, opcode = 0x01, data = tostring(data)});
end);
end
@ -351,7 +352,7 @@ function handle_request(event)
response.status_code = 101;
response.headers.upgrade = "websocket";
response.headers.connection = "Upgrade";
response.headers.sec_webSocket_accept = base64(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
response.headers.sec_webSocket_accept = base64_encode(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
response.headers.sec_webSocket_protocol = "xmpp";
module:fire_event("websocket-session", { session = session, request = request });
@ -370,99 +371,15 @@ end
-- OUTGOING CONNECTIONS
local pending_websocket_close_timeout = 3; -- Seconds to wait after sending close frame until closing connection.
-- local pending_ws_connection_methods = {};
-- local pending_ws_connection_mt = {
-- __name = "pending_ws_connection";
-- __index = pending_ws_connection_methods;
-- __tostring = function (p)
-- return "<pending websocket connection "..p.id.." to "..tostring(p.data.session.to_host)..">";
-- end;
-- };
local pending_websockets = {};
-- function pending_ws_connection_methods:log(level, message, ...)
-- log(level, "[pending connection %s] "..message, self.id, ...);
-- end
-- function pending_ws_connection_methods:onopen()
-- self:log("debug", "Pending WS Connection onopen event.");
-- local conn = self.conn;
-- local session = self.data.session;
-- log("debug", "Successfully connected");
-- conn:setlistener(s2s_listener);
-- s2s_listener.onconnect(conn);
-- wrap_websocket(session, conn);
-- end
-- function pending_ws_connection_methods:onclose(code, message)
-- self:log("debug", "Pending WS Connection onclose event.");
-- -- FIXME/TODO: what can i do??
-- -- if p.listeners.onfail then
-- -- p.listeners.onfail(p.data, p.last_error or "unable to connect to websocket");
-- -- end
-- end
-- -- pending_ws_connections_map[ws_connection] = pending_connection
-- local pending_ws_connections_map = {};
-- local pending_ws_connection_listeners = {};
-- function pending_ws_connection_listeners.onopen(ws_connection)
-- log("debug", "Pending WS Connection onopen event.");
-- local p = pending_ws_connections_map[ws_connection];
-- if not p then
-- if ws_connection.conn then
-- module:log("warn", "Successful connection, but unexpected! Closing.");
-- ws_connection.conn:close();
-- else
-- module:log("error", "Successful connection, but unexpected, and no conn attribute!");
-- end
-- return;
-- end
-- pending_ws_connections_map[ws_connection] = nil;
-- local conn = ws_connection.conn;
-- p:log("debug", "Successfully connected");
-- conn:setlistener(p.listeners, p.data);
-- p.listeners.onconnect(conn);
-- wrap_websocket(session, conn);
-- end
-- function pending_ws_connection_listeners.onclose(ws_connection, reason)
-- log("debug", "Pending WS Connection onclose event.");
-- local p = pending_ws_connections_map[ws_connection];
-- if not p then
-- module:log("warn", "Failed connection, but unexpected!");
-- return;
-- end
-- p.last_error = reason or "unknown reason";
-- p:log("debug", "Connection attempt failed: %s", p.last_error);
-- if p.listeners.onfail then
-- p.listeners.onfail(p.data, p.last_error or "unable to connect to websocket");
-- end
-- end
-- DIRTY HACK BEGINS: copying net.websocket code... because net.websocket tries to handle incomming data, but we don't want to.
local t_concat = table.concat;
local http = require "net.http";
local frames = require "net.websocket.frames";
local base64 = require "util.encodings".base64;
local sha1 = require "util.hashes".sha1;
local random_bytes = require "util.random".bytes;
local timer = require "util.timer";
local log = require "util.logger".init "websocket";
local close_timeout = 3; -- Seconds to wait after sending close frame until closing connection.
local websockets = {};
local websocket_listeners = {};
function websocket_listeners.ondisconnect(conn, err)
local s = websockets[conn];
local pending_websocket_listeners = {};
function pending_websocket_listeners.ondisconnect(conn, err)
local s = pending_websockets[conn];
if not s then return; end
websockets[conn] = nil;
pending_websockets[conn] = nil;
if s.close_timer then
timer.stop(s.close_timer);
s.close_timer = nil;
@ -471,33 +388,25 @@ function websocket_listeners.ondisconnect(conn, err)
if s.close_code == nil and s.onerror then s:onerror(err); end
if s.onclose then s:onclose(s.close_code, s.close_message or err); end
end
function websocket_listeners.ondetach(conn)
websockets[conn] = nil;
function pending_websocket_listeners.ondetach(conn)
pending_websockets[conn] = nil;
end
local function fail(s, code, reason)
log("warn", "WebSocket connection failed, closing. %d %s", code, reason);
s:close(code, reason);
s.conn:close();
return false
end
local websocket_methods = {};
local pending_websocket_methods = {};
local function close_timeout_cb(now, timerid, s) -- luacheck: ignore 212/now 212/timerid
s.close_timer = nil;
log("warn", "Close timeout waiting for server to close, closing manually.");
s.conn:close();
end
function websocket_methods:close(code, reason)
function pending_websocket_methods:close(code, reason)
if self.readyState < 2 then
code = code or 1000;
log("debug", "closing WebSocket with code %i: %s" , code , reason);
self.readyState = 2;
local conn = self.conn;
conn:write(frames.build_close(code, reason, true));
conn:write(websocket_frames.build_close(code, reason, true));
-- Do not close socket straight away, wait for acknowledgement from server.
self.close_timer = timer.add_task(close_timeout, close_timeout_cb, self);
self.close_timer = add_task(pending_websocket_close_timeout, close_timeout_cb, self);
elseif self.readyState == 2 then
log("debug", "tried to close a closing WebSocket, closing the raw socket.");
-- Stop timer
@ -511,7 +420,7 @@ function websocket_methods:close(code, reason)
log("debug", "tried to close a closed WebSocket, ignoring.");
end
end
function websocket_methods:send(data, opcode)
function pending_websocket_methods:send(data, opcode)
if self.readyState < 1 then
return nil, "WebSocket not open yet, unable to send data.";
elseif self.readyState >= 2 then
@ -528,13 +437,14 @@ function websocket_methods:send(data, opcode)
opcode = opcode;
data = tostring(data);
};
log("debug", "WebSocket sending frame: opcode=%0x, %i bytes", frame.opcode, #frame.data);
return self.conn:write(frames.build(frame));
log("debug", "Pending S2S WebSocket sending frame: opcode=%0x, %i bytes", frame.opcode, #frame.data);
return self.conn:write(websocket_frames.build(frame));
end
local websocket_metatable = {
__index = websocket_methods;
local pending_websocket_metatable = {
__index = pending_websocket_methods;
};
local function custom_connect(url, ex, listeners)
ex = ex or {};
@ -545,7 +455,7 @@ local function custom_connect(url, ex, listeners)
been base64-encoded (see Section 4 of [RFC4648]). The nonce
MUST be selected randomly for each connection.
]]
local key = base64.encode(random_bytes(16));
local key = base64_encode(random_bytes(16));
-- Either a single protocol string or an array of protocol strings.
local protocol = ex.protocol;
@ -589,7 +499,7 @@ local function custom_connect(url, ex, listeners)
onclose = listeners.onclose;
onmessage = listeners.onmessage;
onerror = listeners.onerror;
}, websocket_metatable);
}, pending_websocket_metatable);
local http_url = url:gsub("^(ws)", "http");
local http_req = http.request(http_url, { -- luacheck: ignore 211/http_req
@ -601,7 +511,7 @@ local function custom_connect(url, ex, listeners)
if c ~= 101
or r.headers["connection"]:lower() ~= "upgrade"
or r.headers["upgrade"] ~= "websocket"
or r.headers["sec-websocket-accept"] ~= base64.encode(sha1(key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))
or r.headers["sec-websocket-accept"] ~= base64_encode(sha1(key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))
or (protocol and not protocol[r.headers["sec-websocket-protocol"]])
then
s.readyState = 3;
@ -616,22 +526,20 @@ local function custom_connect(url, ex, listeners)
local conn = http_req.conn;
http_req.conn = nil;
s.conn = conn;
websockets[conn] = s;
conn:setlistener(websocket_listeners);
pending_websockets[conn] = s;
conn:setlistener(pending_websocket_listeners);
log("debug", "WebSocket connected successfully to %s", url);
s.readyState = 1;
if s.onopen then s:onopen(); end
-- websocket_listeners.onincoming(conn, b);
if s.onopen then s:onopen(); end -- this will detach pending_websocket_listeners
end);
return s;
end
-- DIRTY HACK END
function route_to_new_session(event)
local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza;
log("debug", "Trying to route to %s", to_host);
log("debug", "Trying to route to %s using Websocket S2S", to_host);
local ws_properties = module:fire_event("discover-websocket-s2s", { to_host = to_host });
if not ws_properties then
@ -659,15 +567,17 @@ function route_to_new_session(event)
ex["headers"] = ws_properties.extra_headers or {};
ex["protocol"] = "xmpp";
session.log("debug", "Starting the s2s websocket connection process");
-- now we start using the session logger
local log = session.log;
log("debug", "Starting the s2s websocket connection process");
local function onopen(s)
log("debug", "Outgoing Websocket S2S: Successfully connected");
local conn = s.conn;
conn.starttls = false; -- FIXME: it is needed? Prevent mod_tls from believing starttls can be done
conn.starttls = false; -- Prevent mod_tls from believing starttls can be done. FIXME: is this really needed?
session.conn = conn;
wrap_websocket(session, conn);
-- Switching to the s2s listener.
conn:setlistener(s2s_listener);
s2s_listener.register_outgoing(conn, session);
s2s_listener.onconnect(conn);
@ -679,6 +589,12 @@ function route_to_new_session(event)
s2s_destroy_session(session, message or code or "unable to connect to websocket");
end
-- Note: we should use net.websocket.connect to connect to the remote server.
-- But at time i'm writing this comment (Prosody 0.12.3), this function calls
-- `websocket_listeners.onincoming(conn, b);` just after the `onopen` function
-- is called. But here, we switch the connection listener to use the
-- s2s_listener as soon as the connection is open. So it can't work.
-- That's why I use net.http, and handle the Websocket handshake by hand.
local ws_connection = custom_connect(ws_properties['url'], ex, {
onopen = onopen;
onclose = onclose;
@ -687,6 +603,30 @@ function route_to_new_session(event)
return true;
end
module:hook("server-stopping", function(event)
-- Stop opening new connections
log("debug", "Unhooking route/remote to stop accepting new outgoing connections");
module:unhook("route/remote", route_to_new_session);
log("debug", "Closing pending Websocket S2S connections");
local wait, done = async.waiter(1, true);
-- Close pending websockets
local reason = event.reason;
for conn, pending_websocket in pairs(pending_websockets) do
log("debug", "Found a pending connection, closing it.");
pending_websocket.close(nil, reason);
end
-- Wait for them to close properly if they haven't already
if next(pending_websockets) ~= nil then
module:log("info", "Waiting for pending websocket sessions to close");
add_task(stream_close_timeout + 1, function () done() end);
wait();
end
end, -100);
function module.add_host(module)
module:hook("s2s-read-timeout", keepalive, -0.9);