From cd3afe2b26568475a97d5a8a654a649421e80e21 Mon Sep 17 00:00:00 2001 From: John Livingston Date: Mon, 22 May 2023 11:23:15 +0200 Subject: [PATCH] Chat Federation WIP: Trying to fix some non-working code. Still not working. --- .../mod_s2s_peertubelivechat.lua | 2 +- .../mod_websocket_s2s_peertubelivechat.lua | 168 ++++++++++++------ 2 files changed, 110 insertions(+), 60 deletions(-) diff --git a/prosody-modules/mod_s2s_peertubelivechat/mod_s2s_peertubelivechat.lua b/prosody-modules/mod_s2s_peertubelivechat/mod_s2s_peertubelivechat.lua index 92a889d7..ec0f1878 100644 --- a/prosody-modules/mod_s2s_peertubelivechat/mod_s2s_peertubelivechat.lua +++ b/prosody-modules/mod_s2s_peertubelivechat/mod_s2s_peertubelivechat.lua @@ -140,7 +140,7 @@ local instance_url = assert(module:get_option_string("peertubelivechat_instance_ function discover_websocket_s2s(event) local to_host = event.to_host; - module:log("debug", "Trying to route to %s", to_host); + module:log("debug", "Searching websocket s2s for remote host %s", to_host); local f_s2s = io.open(path.join(server_infos_dir, to_host, 's2s'), "r"); if f_s2s ~= nil then diff --git a/prosody-modules/mod_websocket_s2s_peertubelivechat/mod_websocket_s2s_peertubelivechat.lua b/prosody-modules/mod_websocket_s2s_peertubelivechat/mod_websocket_s2s_peertubelivechat.lua index f1fa1f4b..c520698d 100644 --- a/prosody-modules/mod_websocket_s2s_peertubelivechat/mod_websocket_s2s_peertubelivechat.lua +++ b/prosody-modules/mod_websocket_s2s_peertubelivechat/mod_websocket_s2s_peertubelivechat.lua @@ -47,6 +47,8 @@ local s2s_listener = portmanager.get_service("s2s").listener; --- Session methods local function session_open_stream(session, from, to) + local log = session.log or log; + log("debug", "Opening stream from %s to %to.", from, to); local attr = { xmlns = xmlns_framing, ["xml:lang"] = "en", @@ -203,12 +205,16 @@ local function validate_frame(frame, max_length) end local function wrap_websocket(session, conn) + local log = session.log or log; + local function websocket_close(code, message) + log("debug", "Websocket close, writing a build_close frame"); conn:write(build_close(code, message)); conn:close(); end local function websocket_handle_error(session, code, message) + log("debug", "handling an error"); if code == 1009 then -- stanza size limit exceeded -- we close the session, rather than the connection, -- otherwise a resuming client will simply resend the @@ -220,7 +226,7 @@ local function wrap_websocket(session, conn) end local function handle_frame(frame) - module:log("debug", "Websocket received frame: opcode=%0x, %i bytes", frame.opcode, #frame.data); + log("debug", "Websocket received frame: opcode=%0x, %i bytes", frame.opcode, #frame.data); -- Check frame makes sense local frame_ok, err_status, err_text = validate_frame(frame, stanza_size_limit); @@ -246,8 +252,10 @@ local function wrap_websocket(session, conn) local frameBuffer = dbuffer.new(frame_buffer_limit, frame_fragment_limit); add_filter(session, "bytes/in", function(data) + log("debug", "Calling the bytes/in filter"); + if not frameBuffer:write(data) then - session.log("warn", "websocket frame buffer full - terminating session"); + log("warn", "websocket frame buffer full - terminating session"); session:close({ condition = "resource-constraint", text = "frame buffer exceeded" }); return; end @@ -279,6 +287,8 @@ local function wrap_websocket(session, conn) end); add_filter(session, "stanzas/out", function(stanza) + log("debug", "Calling the stanzas/out filter"); + stanza = st.clone(stanza); local attr = stanza.attr; attr.xmlns = attr.xmlns or xmlns_client; @@ -289,6 +299,7 @@ local function wrap_websocket(session, conn) end, -1000); add_filter(session, "bytes/out", function(data) + log("debug", "Calling the bytes/out filter"); return build_frame({ FIN = true, opcode = 0x01, data = tostring(data)}); end); end @@ -303,7 +314,7 @@ function handle_request(event) return module:fire_event("http-message", { response = event.response; --- - title = "Prosody WebSocket endpoint"; + title = "Prosody S2s WebSocket endpoint"; message = websocket_get_response_text; warning = not (consider_websocket_secure or request.secure) and "This endpoint is not considered secure!" or nil; }) or websocket_get_response_body; @@ -312,7 +323,7 @@ function handle_request(event) local wants_xmpp = contains_token(request.headers.sec_websocket_protocol or "", "xmpp"); if not wants_xmpp then - module:log("debug", "Client didn't want to talk XMPP, list of protocols was %s", request.headers.sec_websocket_protocol or "(empty)"); + log("debug", "Client didn't want to talk XMPP, list of protocols was %s", request.headers.sec_websocket_protocol or "(empty)"); return 501; end @@ -341,7 +352,7 @@ function handle_request(event) module:fire_event("websocket-session", { session = session, request = request }); - session.log("debug", "Sending WebSocket handshake"); + log("debug", "Sending WebSocket handshake"); return ""; end @@ -356,68 +367,91 @@ end -- OUTGOING CONNECTIONS -local pending_ws_connection_methods = {}; -local pending_ws_connection_mt = { - __name = "pending_ws_connection"; - __index = pending_ws_connection_methods; - __tostring = function (p) - return ""; - end; -}; +-- local pending_ws_connection_methods = {}; +-- local pending_ws_connection_mt = { +-- __name = "pending_ws_connection"; +-- __index = pending_ws_connection_methods; +-- __tostring = function (p) +-- return ""; +-- end; +-- }; -function pending_ws_connection_methods:log(level, message, ...) - log(level, "[pending connection %s] "..message, self.id, ...); -end +-- function pending_ws_connection_methods:log(level, message, ...) +-- log(level, "[pending connection %s] "..message, self.id, ...); +-- end --- pending_ws_connections_map[ws_connection] = pending_connection -local pending_ws_connections_map = {}; -local pending_ws_connection_listeners = {}; +-- function pending_ws_connection_methods:onopen() +-- self:log("debug", "Pending WS Connection onopen event."); +-- local conn = self.conn; +-- local session = self.data.session; +-- log("debug", "Successfully connected"); +-- conn:setlistener(s2s_listener); +-- s2s_listener.onconnect(conn); +-- wrap_websocket(session, conn); +-- end -function pending_ws_connection_listeners.onopen(ws_connection) - local p = pending_ws_connections_map[ws_connection]; - if not p then - if ws_connection.conn then - module:log("warn", "Successful connection, but unexpected! Closing."); - ws_connection.conn:close(); - else - module:log("error", "Successful connection, but unexpected, and no conn attribute!"); - end - return; - end - pending_ws_connections_map[ws_connection] = nil; - local conn = ws_connection.conn; - p:log("debug", "Successfully connected"); - conn:setlistener(p.listeners, p.data); - p.listeners.onconnect(conn); - wrap_websocket(session, conn); -end +-- function pending_ws_connection_methods:onclose(code, message) +-- self:log("debug", "Pending WS Connection onclose event."); +-- -- FIXME/TODO: what can i do?? +-- -- if p.listeners.onfail then +-- -- p.listeners.onfail(p.data, p.last_error or "unable to connect to websocket"); +-- -- end +-- end -function pending_ws_connection_listeners.onclose(ws_connection, reason) - local p = pending_ws_connections_map[ws_connection]; - if not p then - module:log("warn", "Failed connection, but unexpected!"); - return; - end - p.last_error = reason or "unknown reason"; - p:log("debug", "Connection attempt failed: %s", p.last_error); - if p.listeners.onfail then - p.listeners.onfail(p.data, p.last_error or p.target_resolver.last_error or "unable to connect to websocket"); - end -end +-- -- pending_ws_connections_map[ws_connection] = pending_connection +-- local pending_ws_connections_map = {}; +-- local pending_ws_connection_listeners = {}; + +-- function pending_ws_connection_listeners.onopen(ws_connection) +-- log("debug", "Pending WS Connection onopen event."); + +-- local p = pending_ws_connections_map[ws_connection]; +-- if not p then +-- if ws_connection.conn then +-- module:log("warn", "Successful connection, but unexpected! Closing."); +-- ws_connection.conn:close(); +-- else +-- module:log("error", "Successful connection, but unexpected, and no conn attribute!"); +-- end +-- return; +-- end +-- pending_ws_connections_map[ws_connection] = nil; +-- local conn = ws_connection.conn; +-- p:log("debug", "Successfully connected"); +-- conn:setlistener(p.listeners, p.data); +-- p.listeners.onconnect(conn); +-- wrap_websocket(session, conn); +-- end + +-- function pending_ws_connection_listeners.onclose(ws_connection, reason) +-- log("debug", "Pending WS Connection onclose event."); + +-- local p = pending_ws_connections_map[ws_connection]; +-- if not p then +-- module:log("warn", "Failed connection, but unexpected!"); +-- return; +-- end +-- p.last_error = reason or "unknown reason"; +-- p:log("debug", "Connection attempt failed: %s", p.last_error); +-- if p.listeners.onfail then +-- p.listeners.onfail(p.data, p.last_error or "unable to connect to websocket"); +-- end +-- end function route_to_new_session(event) local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza; - module:log("debug", "Trying to route to %s", to_host); + log("debug", "Trying to route to %s", to_host); local ws_properties = module:fire_event("discover-websocket-s2s", { to_host = to_host }); if not ws_properties then - module:log("debug", "No websocket s2s capabilities from remote host %s", to_host); + log("debug", "No websocket s2s capabilities from remote host %s", to_host); return; end - module:log("debug", "Found a Websocket endpoint for s2s communications to remote host %s", to_host); + log("debug", "Found a Websocket endpoint for s2s communications to remote host %s", to_host); local session = s2s_new_outgoing(from_host, to_host); + session.version = 1; -- Store in buffer session.bounce_sendq = bounce_sendq; @@ -434,13 +468,29 @@ function route_to_new_session(event) ex["headers"] = ws_properties.extra_headers or {}; ex["protocol"] = "xmpp"; - module:log("debug", "Starting the websocket connection process"); - local p = setmetatable({ - id = new_id(); - listeners = portmanager.get_service("s2s").listener; - data = { session = session }; - }, pending_ws_connection_mt); - local ws_connection = websocket.connect(ws_properties['url'], ex, p); + session.log("debug", "Starting the s2s websocket connection process"); + local log = session.log; + + local function onopen(s) + log("debug", "Outgoing Websocket S2S: Successfully connected"); + local conn = s.conn; + session.conn = conn; + wrap_websocket(session, conn); + conn:setlistener(s2s_listener); + s2s_listener.register_outgoing(conn, session); + s2s_listener.onconnect(conn); + end + + local function onclose(s, code, message) + log("debug", "Pending WS Connection onclose event."); + -- FIXME: is this ok? + s2s_destroy_session(session, message or code or "unable to connect to websocket"); + end + + local ws_connection = websocket.connect(ws_properties['url'], ex, { + onopen = onopen; + onclose = onclose; + }); return true; end