9a2da60b7d
Note: websocket s2s is not working yet, still WIP. New Features * Chat Federation: * You can now connect to a remote chat with your local account. * This remote connection is done using a custom implementation of [XEP-0468: WebSocket S2S](https://xmpp.org/extensions/xep-0468.html), using a specific discovery method (so that it will work without any DNS configuration). Minor changes and fixes * Possibility to debug Prosody in development environments. * Using process.spawn instead of process.exec to launch Prosody (safer and more efficient). * Prosody AppImage: fix path mapping: we only map the necessary /etc/ subdirectories, so that the AppImage can access /etc/resolv.conf, /etc/hosts, ... * Prosody AppImage: hidden debug mode to disable lua-unbound, which seems broken in some docker dev environments.
248 lines
9.0 KiB
Lua
248 lines
9.0 KiB
Lua
-- Declare this module as global through Prosody's module API.
-- NOTE(review): presumably this means the module is loaded once for the whole
-- server rather than once per virtual host; the end of this file attaches it
-- to hosts through module.add_host -- confirm against the Prosody documentation.
module:set_global();
|
|
|
|
-- module:depends("s2s");
|
|
|
|
local path = require "util.paths";
|
|
local json = require "util.json";
|
|
-- local st = require "util.stanza";
|
|
-- local websocket = require "net.websocket";
|
|
-- local server = require "net.server".addclient;
|
|
-- local add_filter = require "util.filters".add_filter;
|
|
-- local s2s_new_outgoing = require "core.s2smanager".new_outgoing;
|
|
-- local s2s_destroy_session = require "core.s2smanager".destroy_session;
|
|
-- local bounce_sendq = module:depends "s2s".route_to_new_session.bounce_sendq;
|
|
-- local portmanager = require "core.portmanager";
|
|
-- local initialize_filters = require "util.filters".initialize;
|
|
|
|
-- Mandatory options: loading the module fails with an explicit message when
-- one of them is not set.

-- Base directory under which discover_websocket_s2s looks for
-- "<host>/s2s" and "<host>/ws-proxy" files.
local server_infos_dir = module:get_option_string("peertubelivechat_server_infos_path", nil);
assert(server_infos_dir, "'peertubelivechat_server_infos_path' is a required option");

-- Value sent in the "peertube-livechat-ws-proxy-instance-url" header of the
-- connection properties returned by discover_websocket_s2s.
local instance_url = module:get_option_string("peertubelivechat_instance_url", nil);
assert(instance_url, "'peertubelivechat_instance_url' is a required option");
|
|
|
|
-- local stanza_size_limit = module:get_option_number("s2s_stanza_size_limit", 1024 * 512);
|
|
-- local frame_buffer_limit = module:get_option_number("websocket_frame_buffer_limit", 2 * stanza_size_limit);
|
|
-- local frame_fragment_limit = module:get_option_number("websocket_frame_fragment_limit", 8);
|
|
|
|
-- local sessions = module:shared("sessions");
|
|
|
|
-- -- The proxy_listener handles connection while still connecting to the remote websocket server,
|
|
-- -- then it hands them over to the normal listener (in mod_s2s)
|
|
-- local proxy_listener = { default_port = nil, default_mode = "*a", default_interface = "*" };
|
|
|
|
-- function proxy_listener.onconnect(conn, ws)
|
|
-- local session = sessions[conn];
|
|
|
|
-- -- Now the real s2s listener can take over the connection.
|
|
-- local listener = portmanager.get_service("s2s").listener;
|
|
|
|
-- local log = session.log;
|
|
|
|
-- local function websocket_close(code, message)
|
|
-- conn:write(build_close(code, message));
|
|
-- conn:close();
|
|
-- end
|
|
-- local function websocket_handle_error(session, code, message)
|
|
-- if code == 1009 then -- stanza size limit exceeded
|
|
-- -- we close the session, rather than the connection,
|
|
-- -- otherwise a resuming client will simply resend the
|
|
-- -- offending stanza
|
|
-- session:close({ condition = "policy-violation", text = "stanza too large" });
|
|
-- else
|
|
-- websocket_close(code, message);
|
|
-- end
|
|
-- end
|
|
|
|
-- initialize_filters(session);
|
|
-- local frameBuffer = dbuffer.new(frame_buffer_limit, frame_fragment_limit);
|
|
-- add_filter(session, "bytes/in", function(data)
|
|
-- if not frameBuffer:write(data) then
|
|
-- session.log("warn", "websocket frame buffer full - terminating session");
|
|
-- session:close({ condition = "resource-constraint", text = "frame buffer exceeded" });
|
|
-- return;
|
|
-- end
|
|
|
|
-- local cache = {};
|
|
-- local frame, length, partial = parse_frame(frameBuffer);
|
|
|
|
-- while frame do
|
|
-- frameBuffer:discard(length);
|
|
-- local result, err_status, err_text = handle_frame(frame);
|
|
-- if not result then
|
|
-- websocket_handle_error(session, err_status, err_text);
|
|
-- break;
|
|
-- end
|
|
-- cache[#cache+1] = filter_open_close(result);
|
|
-- frame, length, partial = parse_frame(frameBuffer);
|
|
-- end
|
|
|
|
-- if partial then
|
|
-- -- The header of the next frame is already in the buffer, run
|
|
-- -- some early validation here
|
|
-- local frame_ok, err_status, err_text = validate_frame(partial, stanza_size_limit);
|
|
-- if not frame_ok then
|
|
-- websocket_handle_error(session, err_status, err_text);
|
|
-- end
|
|
-- end
|
|
|
|
-- return t_concat(cache, "");
|
|
-- end);
|
|
|
|
-- add_filter(session, "stanzas/out", function(stanza)
|
|
-- stanza = st.clone(stanza);
|
|
-- local attr = stanza.attr;
|
|
-- attr.xmlns = attr.xmlns or xmlns_client;
|
|
-- if stanza.name:find("^stream:") then
|
|
-- attr["xmlns:stream"] = attr["xmlns:stream"] or xmlns_streams;
|
|
-- end
|
|
-- return stanza;
|
|
-- end, -1000);
|
|
|
|
-- add_filter(session, "bytes/out", function(data)
|
|
-- return build_frame({ FIN = true, opcode = 0x01, data = tostring(data)});
|
|
-- end);
|
|
-- local filter = session.filters;
|
|
|
|
-- session.version = 1;
|
|
|
|
-- session.sends2s = function (t)
|
|
-- log("debug", "sending (s2s over proxy): %s", (t.top_tag and t:top_tag()) or t:match("^[^>]*>?"));
|
|
-- if t.name then
|
|
-- t = filter("stanzas/out", t);
|
|
-- end
|
|
-- if t then
|
|
-- t = filter("bytes/out", tostring(t));
|
|
-- if t then
|
|
-- return conn:write(tostring(t));
|
|
-- end
|
|
-- end
|
|
-- end
|
|
|
|
-- session.open_stream = function ()
|
|
-- session.sends2s(st.stanza("stream:stream", {
|
|
-- xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback',
|
|
-- ["xmlns:stream"]='http://etherx.jabber.org/streams',
|
|
-- from=session.from_host, to=session.to_host, version='1.0', ["xml:lang"]='en'}):top_tag());
|
|
-- end
|
|
|
|
-- conn.setlistener(conn, listener);
|
|
|
|
-- listener.register_outgoing(conn, session);
|
|
|
|
-- listener.onconnect(conn);
|
|
-- end
|
|
|
|
-- function proxy_listener.register_outgoing(conn, session)
|
|
-- session.direction = "outgoing";
|
|
-- sessions[conn] = session;
|
|
-- end
|
|
|
|
-- function proxy_listener.ondisconnect(conn, err)
|
|
-- sessions[conn] = nil;
|
|
-- end
|
|
|
|
--- Handler for the "discover-websocket-s2s" event.
-- Inspects the files stored under `server_infos_dir/<to_host>/` to decide
-- whether the remote host should be reached through a Websocket proxy:
--   * an `s2s` file exists: nothing is returned, the legacy s2s module is
--     left in charge of the connection;
--   * no `ws-proxy` file exists: the host is not a known remote Peertube,
--     nothing is returned;
--   * the `ws-proxy` file holds a JSON object with a `url` field: the
--     connection properties are returned.
-- Unreadable, empty or invalid `ws-proxy` files are logged as errors and
-- nothing is returned.
-- @param event table; only `event.to_host` (the remote host name) is read.
-- @return a table `{ url = <string>, extra_headers = { ... } }`, or nothing.
function discover_websocket_s2s(event)
  local to_host = event.to_host;
  module:log("debug", "Trying to route to %s", to_host);

  -- Only the existence of the `s2s` file matters, its content is never read.
  local f_s2s = io.open(path.join(server_infos_dir, to_host, 's2s'), "r");
  if f_s2s ~= nil then
    f_s2s:close();
    -- Must be a method call (`:`): with `module.log(...)` the level string
    -- would be passed as `self` and the call would raise at runtime.
    module:log("debug", "Remote host is a known Peertube %s that has s2s activated, we will let legacy s2s module handle the connection", to_host);
    return;
  end

  local f_ws_proxy = io.open(path.join(server_infos_dir, to_host, 'ws-proxy'), "r");
  if f_ws_proxy == nil then
    module:log("debug", "Remote host %s is not a known remote Peertube, we will let legacy s2s module handle the connection", to_host);
    return;
  end
  local content, read_err = f_ws_proxy:read("*a");
  f_ws_proxy:close();
  if content == nil then
    -- read() yields nil on I/O errors; do not hand nil over to the JSON decoder.
    module:log("error", "Unable to read ws-proxy configuration for remote host %s: %s", to_host, tostring(read_err));
    return;
  end

  local remote_ws_proxy_conf = json.decode(content);
  -- Anything that is not a table (nil, number, boolean, ...) cannot carry a
  -- `url` field, and indexing some of those values would raise an error.
  if type(remote_ws_proxy_conf) ~= "table" then
    module:log("error", "Remote host %s has empty ws-proxy configuration", to_host);
    return;
  end
  if not remote_ws_proxy_conf.url then
    module:log("error", "Remote host %s has missing Websocket url in ws-proxy configuration", to_host);
    return;
  end

  module:log("debug", "Found a Websocket endpoint to proxify s2s communications to remote host %s", to_host);
  return {
    extra_headers = {
      ["peertube-livechat-ws-proxy-instance-url"] = instance_url;
    };
    url = remote_ws_proxy_conf.url;
  };

  -- Commented-out work-in-progress code, kept as-is for reference.

  -- local host_session = s2s_new_outgoing(from_host, to_host);

  -- -- Store in buffer
  -- host_session.bounce_sendq = bounce_sendq;
  -- host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} };
  -- host_session.log("debug", "stanza [%s] queued until connection complete", tostring(stanza.name));

  -- local ex = {};
  -- ex.headers = {
  --   ["peertube-livechat-ws-proxy-instance-url"] = instance_url;
  --   ["sec_websocket_protocol"] = 'xmpp';
  -- }

  -- local ws_listeners = {};
  -- ws_listeners.onopen = function ()
  --   local conn = self.conn;
  --   module:log("debug", "Websocket s2s connection is open, attaching it to the session.");
  --   host_session.conn = conn;
  -- end
  -- ws_listeners.onclose = function (code, message)
  --   module:log("debug", "Closing websocket connection for host %s with code '%s' and message '%s'", to_host, json.encode(code), json.encode(message));
  --   s2s_destroy_session(host_session, 'websocket-proxy-connection-closed');
  -- end
  -- ws_listeners.onerror = function (code)
  --   module:log("debug", "Error on websocket connection for host %s: '%s'", to_host, json.encode(code));
  --   s2s_destroy_session(host_session, 'websocket-proxy-connection-error');
  -- end
  -- ws_listeners.onmessage = function (data, data_type)
  --   module:log("debug", "Receiving %s data for host %s", tostring(data_type), to_host);
  --   -- TODO ...
  -- end

  -- module:log("debug", "Starting the websocket connection process");
  -- local ws_connection = websocket.connect(remote_ws_proxy_conf['url'], ex, ws_listeners);

  -- -- local conn = addclient(to_host, nil, proxy_listener, "*a");
  -- -- proxy_listener.register_outgoing(conn, host_session);
  -- -- host_session.conn = conn;

  -- return true;

  -- local inject = injected and injected[to_host];
  -- if not inject then return end
  -- module:log("debug", "opening a new outgoing connection for this stanza");
  -- local host_session = new_outgoing(from_host, to_host);

  -- -- Store in buffer
  -- host_session.bounce_sendq = bounce_sendq;
  -- host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} };
  -- host_session.log("debug", "stanza [%s] queued until connection complete", tostring(stanza.name));

  -- local host, port = inject[1] or inject, tonumber(inject[2]) or 5269;

  -- local conn = addclient(host, port, proxy_listener, "*a");

  -- proxy_listener.register_outgoing(conn, host_session);

  -- host_session.conn = conn;
  -- return true;
end
|
|
|
|
-- Invoked with a module object: registers discover_websocket_s2s as handler
-- of the "discover-websocket-s2s" event on it, passing -9 as third argument
-- to hook() (the hook priority in Prosody's module API).
module.add_host = function (host_module)
  host_module:hook("discover-websocket-s2s", discover_websocket_s2s, -9);
end
|
|
|
|
-- When this module is listed among the modules of the "*" host, attach the
-- discovery hook right away.
do
  local modulemanager = require "core.modulemanager";
  local star_modules = modulemanager.get_modules_for_host("*");
  if star_modules:contains(module.name) then
    module:add_host();
  end
end
|