Chat Federation WIP:
Found that net.websocket is not appropriate. This commit is a proof of concept. It has to be cleaned. Still not working, but close!
This commit is contained in:
parent
cd3afe2b26
commit
5a945a0cb7
@ -142,16 +142,22 @@ function discover_websocket_s2s(event)
|
||||
local to_host = event.to_host;
|
||||
module:log("debug", "Searching websocket s2s for remote host %s", to_host);
|
||||
|
||||
local f_s2s = io.open(path.join(server_infos_dir, to_host, 's2s'), "r");
|
||||
-- FIXME: dont to this room. prefix thing. Peertube should create needed files.
|
||||
local to_host_room = to_host;
|
||||
if string.sub(to_host_room, 1, 5) ~= 'room.' then
|
||||
to_host_room = 'room.'..to_host_room;
|
||||
end
|
||||
|
||||
local f_s2s = io.open(path.join(server_infos_dir, to_host_room, 's2s'), "r");
|
||||
if f_s2s ~= nil then
|
||||
io.close(f_s2s);
|
||||
module.log("debug", "Remote host is a known Peertube %s that has s2s activated, we will let legacy s2s module handle the connection", to_host);
|
||||
module.log("debug", "Remote host is a known Peertube %s that has s2s activated, we will let legacy s2s module handle the connection", to_host_room);
|
||||
return;
|
||||
end
|
||||
|
||||
local f_ws_proxy = io.open(path.join(server_infos_dir, to_host, 'ws-s2s'), "r");
|
||||
local f_ws_proxy = io.open(path.join(server_infos_dir, to_host_room, 'ws-s2s'), "r");
|
||||
if f_ws_proxy == nil then
|
||||
module:log("debug", "Remote host %s is not a known remote Peertube, we will let legacy s2s module handle the connection", to_host);
|
||||
module:log("debug", "Remote host %s is not a known remote Peertube, we will let legacy s2s module handle the connection", to_host_room);
|
||||
return;
|
||||
end
|
||||
local content = f_ws_proxy:read("*all");
|
||||
|
@ -59,7 +59,7 @@ local function session_open_stream(session, from, to)
|
||||
if session.stream_attrs then
|
||||
session:stream_attrs(from, to, attr)
|
||||
end
|
||||
session.send(st.stanza("open", attr));
|
||||
session.sends2s(st.stanza("open", attr));
|
||||
end
|
||||
|
||||
local function session_close(session, reason)
|
||||
@ -89,11 +89,11 @@ local function session_close(session, reason)
|
||||
end
|
||||
end
|
||||
log("debug", "Disconnecting s2s websocket server, <stream:error> is: %s", stream_error);
|
||||
session.send(stream_error);
|
||||
session.sends2s(stream_error);
|
||||
end
|
||||
|
||||
session.send(st.stanza("close", { xmlns = xmlns_framing }));
|
||||
function session.send() return false; end
|
||||
session.sends2s(st.stanza("close", { xmlns = xmlns_framing }));
|
||||
function session.sends2s() return false; end
|
||||
|
||||
-- luacheck: ignore 422/reason
|
||||
-- FIXME reason should be handled in common place
|
||||
@ -352,7 +352,7 @@ function handle_request(event)
|
||||
|
||||
module:fire_event("websocket-session", { session = session, request = request });
|
||||
|
||||
log("debug", "Sending WebSocket handshake");
|
||||
log("debug", "Sending S2S WebSocket handshake");
|
||||
|
||||
return "";
|
||||
end
|
||||
@ -439,6 +439,192 @@ end
|
||||
-- end
|
||||
|
||||
|
||||
-- DIRTY HACK BEGINS: copying net.websocket code... because net.websocket tries to handle incomming data, but we don't want to.
|
||||
local t_concat = table.concat;
|
||||
|
||||
local http = require "net.http";
|
||||
local frames = require "net.websocket.frames";
|
||||
local base64 = require "util.encodings".base64;
|
||||
local sha1 = require "util.hashes".sha1;
|
||||
local random_bytes = require "util.random".bytes;
|
||||
local timer = require "util.timer";
|
||||
local log = require "util.logger".init "websocket";
|
||||
|
||||
local close_timeout = 3; -- Seconds to wait after sending close frame until closing connection.
|
||||
|
||||
local websockets = {};
|
||||
|
||||
local websocket_listeners = {};
|
||||
function websocket_listeners.ondisconnect(conn, err)
|
||||
local s = websockets[conn];
|
||||
if not s then return; end
|
||||
websockets[conn] = nil;
|
||||
if s.close_timer then
|
||||
timer.stop(s.close_timer);
|
||||
s.close_timer = nil;
|
||||
end
|
||||
s.readyState = 3;
|
||||
if s.close_code == nil and s.onerror then s:onerror(err); end
|
||||
if s.onclose then s:onclose(s.close_code, s.close_message or err); end
|
||||
end
|
||||
|
||||
function websocket_listeners.ondetach(conn)
|
||||
websockets[conn] = nil;
|
||||
end
|
||||
|
||||
local function fail(s, code, reason)
|
||||
log("warn", "WebSocket connection failed, closing. %d %s", code, reason);
|
||||
s:close(code, reason);
|
||||
s.conn:close();
|
||||
return false
|
||||
end
|
||||
|
||||
local websocket_methods = {};
|
||||
local function close_timeout_cb(now, timerid, s) -- luacheck: ignore 212/now 212/timerid
|
||||
s.close_timer = nil;
|
||||
log("warn", "Close timeout waiting for server to close, closing manually.");
|
||||
s.conn:close();
|
||||
end
|
||||
function websocket_methods:close(code, reason)
|
||||
if self.readyState < 2 then
|
||||
code = code or 1000;
|
||||
log("debug", "closing WebSocket with code %i: %s" , code , reason);
|
||||
self.readyState = 2;
|
||||
local conn = self.conn;
|
||||
conn:write(frames.build_close(code, reason, true));
|
||||
-- Do not close socket straight away, wait for acknowledgement from server.
|
||||
self.close_timer = timer.add_task(close_timeout, close_timeout_cb, self);
|
||||
elseif self.readyState == 2 then
|
||||
log("debug", "tried to close a closing WebSocket, closing the raw socket.");
|
||||
-- Stop timer
|
||||
if self.close_timer then
|
||||
timer.stop(self.close_timer);
|
||||
self.close_timer = nil;
|
||||
end
|
||||
local conn = self.conn;
|
||||
conn:close();
|
||||
else
|
||||
log("debug", "tried to close a closed WebSocket, ignoring.");
|
||||
end
|
||||
end
|
||||
function websocket_methods:send(data, opcode)
|
||||
if self.readyState < 1 then
|
||||
return nil, "WebSocket not open yet, unable to send data.";
|
||||
elseif self.readyState >= 2 then
|
||||
return nil, "WebSocket closed, unable to send data.";
|
||||
end
|
||||
if opcode == "text" or opcode == nil then
|
||||
opcode = 0x1;
|
||||
elseif opcode == "binary" then
|
||||
opcode = 0x2;
|
||||
end
|
||||
local frame = {
|
||||
FIN = true;
|
||||
MASK = true; -- RFC 6455 6.1.5: If the data is being sent by the client, the frame(s) MUST be masked
|
||||
opcode = opcode;
|
||||
data = tostring(data);
|
||||
};
|
||||
log("debug", "WebSocket sending frame: opcode=%0x, %i bytes", frame.opcode, #frame.data);
|
||||
return self.conn:write(frames.build(frame));
|
||||
end
|
||||
|
||||
local websocket_metatable = {
|
||||
__index = websocket_methods;
|
||||
};
|
||||
local function custom_connect(url, ex, listeners)
|
||||
ex = ex or {};
|
||||
|
||||
--[[RFC 6455 4.1.7:
|
||||
The request MUST include a header field with the name
|
||||
|Sec-WebSocket-Key|. The value of this header field MUST be a
|
||||
nonce consisting of a randomly selected 16-byte value that has
|
||||
been base64-encoded (see Section 4 of [RFC4648]). The nonce
|
||||
MUST be selected randomly for each connection.
|
||||
]]
|
||||
local key = base64.encode(random_bytes(16));
|
||||
|
||||
-- Either a single protocol string or an array of protocol strings.
|
||||
local protocol = ex.protocol;
|
||||
if type(protocol) == "string" then
|
||||
protocol = { protocol, [protocol] = true };
|
||||
elseif type(protocol) == "table" and protocol[1] then
|
||||
for _, v in ipairs(protocol) do
|
||||
protocol[v] = true;
|
||||
end
|
||||
else
|
||||
protocol = nil;
|
||||
end
|
||||
|
||||
local headers = {
|
||||
["Upgrade"] = "websocket";
|
||||
["Connection"] = "Upgrade";
|
||||
["Sec-WebSocket-Key"] = key;
|
||||
["Sec-WebSocket-Protocol"] = protocol and t_concat(protocol, ", ");
|
||||
["Sec-WebSocket-Version"] = "13";
|
||||
["Sec-WebSocket-Extensions"] = ex.extensions;
|
||||
}
|
||||
if ex.headers then
|
||||
for k,v in pairs(ex.headers) do
|
||||
headers[k] = v;
|
||||
end
|
||||
end
|
||||
|
||||
local s = setmetatable({
|
||||
readbuffer = "";
|
||||
databuffer = nil;
|
||||
conn = nil;
|
||||
close_code = nil;
|
||||
close_message = nil;
|
||||
close_timer = nil;
|
||||
readyState = 0;
|
||||
protocol = nil;
|
||||
|
||||
url = url;
|
||||
|
||||
onopen = listeners.onopen;
|
||||
onclose = listeners.onclose;
|
||||
onmessage = listeners.onmessage;
|
||||
onerror = listeners.onerror;
|
||||
}, websocket_metatable);
|
||||
|
||||
local http_url = url:gsub("^(ws)", "http");
|
||||
local http_req = http.request(http_url, { -- luacheck: ignore 211/http_req
|
||||
method = "GET";
|
||||
headers = headers;
|
||||
sslctx = ex.sslctx;
|
||||
insecure = ex.insecure;
|
||||
}, function(b, c, r, http_req)
|
||||
if c ~= 101
|
||||
or r.headers["connection"]:lower() ~= "upgrade"
|
||||
or r.headers["upgrade"] ~= "websocket"
|
||||
or r.headers["sec-websocket-accept"] ~= base64.encode(sha1(key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))
|
||||
or (protocol and not protocol[r.headers["sec-websocket-protocol"]])
|
||||
then
|
||||
s.readyState = 3;
|
||||
log("warn", "WebSocket connection to %s failed: %s", url, b);
|
||||
if s.onerror then s:onerror("connecting-failed"); end
|
||||
return;
|
||||
end
|
||||
|
||||
s.protocol = r.headers["sec-websocket-protocol"];
|
||||
|
||||
-- Take possession of socket from http
|
||||
local conn = http_req.conn;
|
||||
http_req.conn = nil;
|
||||
s.conn = conn;
|
||||
websockets[conn] = s;
|
||||
conn:setlistener(websocket_listeners);
|
||||
|
||||
log("debug", "WebSocket connected successfully to %s", url);
|
||||
s.readyState = 1;
|
||||
if s.onopen then s:onopen(); end
|
||||
-- websocket_listeners.onincoming(conn, b);
|
||||
end);
|
||||
|
||||
return s;
|
||||
end
|
||||
-- DIRTY HACK END
|
||||
|
||||
function route_to_new_session(event)
|
||||
local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza;
|
||||
log("debug", "Trying to route to %s", to_host);
|
||||
@ -474,6 +660,7 @@ function route_to_new_session(event)
|
||||
local function onopen(s)
|
||||
log("debug", "Outgoing Websocket S2S: Successfully connected");
|
||||
local conn = s.conn;
|
||||
conn.starttls = false; -- FIXME: it is needed? Prevent mod_tls from believing starttls can be done
|
||||
session.conn = conn;
|
||||
wrap_websocket(session, conn);
|
||||
conn:setlistener(s2s_listener);
|
||||
@ -487,7 +674,7 @@ function route_to_new_session(event)
|
||||
s2s_destroy_session(session, message or code or "unable to connect to websocket");
|
||||
end
|
||||
|
||||
local ws_connection = websocket.connect(ws_properties['url'], ex, {
|
||||
local ws_connection = custom_connect(ws_properties['url'], ex, {
|
||||
onopen = onopen;
|
||||
onclose = onclose;
|
||||
});
|
||||
|
@ -296,6 +296,8 @@ class ProsodyConfigContent {
|
||||
this.global.set('peertubelivechat_instance_url', publicServerUrl)
|
||||
|
||||
this.global.add('modules_enabled', 'websocket_s2s_peertubelivechat')
|
||||
// FIXME: seems to be necessary to add the module on the muc host, so that dialback can trigger route/remote.
|
||||
this.muc.add('modules_enabled', 'websocket_s2s_peertubelivechat')
|
||||
|
||||
this.muc.add('modules_enabled', 'dialback') // This allows s2s connections without certicicates!
|
||||
this.authenticated?.add('modules_enabled', 'dialback') // This allows s2s connections without certicicates!
|
||||
|
Loading…
x
Reference in New Issue
Block a user