Chat Federation WIP:

Trying to fix some non-working code. Still not working.
This commit is contained in:
John Livingston 2023-05-22 11:23:15 +02:00
parent ef3417dab0
commit cd3afe2b26
No known key found for this signature in database
GPG Key ID: B17B5640CE66CDBC
2 changed files with 110 additions and 60 deletions

View File

@ -140,7 +140,7 @@ local instance_url = assert(module:get_option_string("peertubelivechat_instance_
function discover_websocket_s2s(event)
local to_host = event.to_host;
module:log("debug", "Trying to route to %s", to_host);
module:log("debug", "Searching websocket s2s for remote host %s", to_host);
local f_s2s = io.open(path.join(server_infos_dir, to_host, 's2s'), "r");
if f_s2s ~= nil then

View File

@ -47,6 +47,8 @@ local s2s_listener = portmanager.get_service("s2s").listener;
--- Session methods
local function session_open_stream(session, from, to)
local log = session.log or log;
log("debug", "Opening stream from %s to %to.", from, to);
local attr = {
xmlns = xmlns_framing,
["xml:lang"] = "en",
@ -203,12 +205,16 @@ local function validate_frame(frame, max_length)
end
local function wrap_websocket(session, conn)
local log = session.log or log;
local function websocket_close(code, message)
log("debug", "Websocket close, writing a build_close frame");
conn:write(build_close(code, message));
conn:close();
end
local function websocket_handle_error(session, code, message)
log("debug", "handling an error");
if code == 1009 then -- stanza size limit exceeded
-- we close the session, rather than the connection,
-- otherwise a resuming client will simply resend the
@ -220,7 +226,7 @@ local function wrap_websocket(session, conn)
end
local function handle_frame(frame)
module:log("debug", "Websocket received frame: opcode=%0x, %i bytes", frame.opcode, #frame.data);
log("debug", "Websocket received frame: opcode=%0x, %i bytes", frame.opcode, #frame.data);
-- Check frame makes sense
local frame_ok, err_status, err_text = validate_frame(frame, stanza_size_limit);
@ -246,8 +252,10 @@ local function wrap_websocket(session, conn)
local frameBuffer = dbuffer.new(frame_buffer_limit, frame_fragment_limit);
add_filter(session, "bytes/in", function(data)
log("debug", "Calling the bytes/in filter");
if not frameBuffer:write(data) then
session.log("warn", "websocket frame buffer full - terminating session");
log("warn", "websocket frame buffer full - terminating session");
session:close({ condition = "resource-constraint", text = "frame buffer exceeded" });
return;
end
@ -279,6 +287,8 @@ local function wrap_websocket(session, conn)
end);
add_filter(session, "stanzas/out", function(stanza)
log("debug", "Calling the stanzas/out filter");
stanza = st.clone(stanza);
local attr = stanza.attr;
attr.xmlns = attr.xmlns or xmlns_client;
@ -289,6 +299,7 @@ local function wrap_websocket(session, conn)
end, -1000);
add_filter(session, "bytes/out", function(data)
log("debug", "Calling the bytes/out filter");
return build_frame({ FIN = true, opcode = 0x01, data = tostring(data)});
end);
end
@ -303,7 +314,7 @@ function handle_request(event)
return module:fire_event("http-message", {
response = event.response;
---
title = "Prosody WebSocket endpoint";
title = "Prosody S2s WebSocket endpoint";
message = websocket_get_response_text;
warning = not (consider_websocket_secure or request.secure) and "This endpoint is not considered secure!" or nil;
}) or websocket_get_response_body;
@ -312,7 +323,7 @@ function handle_request(event)
local wants_xmpp = contains_token(request.headers.sec_websocket_protocol or "", "xmpp");
if not wants_xmpp then
module:log("debug", "Client didn't want to talk XMPP, list of protocols was %s", request.headers.sec_websocket_protocol or "(empty)");
log("debug", "Client didn't want to talk XMPP, list of protocols was %s", request.headers.sec_websocket_protocol or "(empty)");
return 501;
end
@ -341,7 +352,7 @@ function handle_request(event)
module:fire_event("websocket-session", { session = session, request = request });
session.log("debug", "Sending WebSocket handshake");
log("debug", "Sending WebSocket handshake");
return "";
end
@ -356,68 +367,91 @@ end
-- OUTGOING CONNECTIONS
local pending_ws_connection_methods = {};
local pending_ws_connection_mt = {
__name = "pending_ws_connection";
__index = pending_ws_connection_methods;
__tostring = function (p)
return "<pending websocket connection "..p.id.." to "..tostring(p.target_resolver.hostname)..">";
end;
};
-- local pending_ws_connection_methods = {};
-- local pending_ws_connection_mt = {
-- __name = "pending_ws_connection";
-- __index = pending_ws_connection_methods;
-- __tostring = function (p)
-- return "<pending websocket connection "..p.id.." to "..tostring(p.data.session.to_host)..">";
-- end;
-- };
function pending_ws_connection_methods:log(level, message, ...)
log(level, "[pending connection %s] "..message, self.id, ...);
end
-- function pending_ws_connection_methods:log(level, message, ...)
-- log(level, "[pending connection %s] "..message, self.id, ...);
-- end
-- pending_ws_connections_map[ws_connection] = pending_connection
local pending_ws_connections_map = {};
local pending_ws_connection_listeners = {};
-- function pending_ws_connection_methods:onopen()
-- self:log("debug", "Pending WS Connection onopen event.");
-- local conn = self.conn;
-- local session = self.data.session;
-- log("debug", "Successfully connected");
-- conn:setlistener(s2s_listener);
-- s2s_listener.onconnect(conn);
-- wrap_websocket(session, conn);
-- end
function pending_ws_connection_listeners.onopen(ws_connection)
local p = pending_ws_connections_map[ws_connection];
if not p then
if ws_connection.conn then
module:log("warn", "Successful connection, but unexpected! Closing.");
ws_connection.conn:close();
else
module:log("error", "Successful connection, but unexpected, and no conn attribute!");
end
return;
end
pending_ws_connections_map[ws_connection] = nil;
local conn = ws_connection.conn;
p:log("debug", "Successfully connected");
conn:setlistener(p.listeners, p.data);
p.listeners.onconnect(conn);
wrap_websocket(session, conn);
end
-- function pending_ws_connection_methods:onclose(code, message)
-- self:log("debug", "Pending WS Connection onclose event.");
-- -- FIXME/TODO: what can i do??
-- -- if p.listeners.onfail then
-- -- p.listeners.onfail(p.data, p.last_error or "unable to connect to websocket");
-- -- end
-- end
function pending_ws_connection_listeners.onclose(ws_connection, reason)
local p = pending_ws_connections_map[ws_connection];
if not p then
module:log("warn", "Failed connection, but unexpected!");
return;
end
p.last_error = reason or "unknown reason";
p:log("debug", "Connection attempt failed: %s", p.last_error);
if p.listeners.onfail then
p.listeners.onfail(p.data, p.last_error or p.target_resolver.last_error or "unable to connect to websocket");
end
end
-- -- pending_ws_connections_map[ws_connection] = pending_connection
-- local pending_ws_connections_map = {};
-- local pending_ws_connection_listeners = {};
-- function pending_ws_connection_listeners.onopen(ws_connection)
-- log("debug", "Pending WS Connection onopen event.");
-- local p = pending_ws_connections_map[ws_connection];
-- if not p then
-- if ws_connection.conn then
-- module:log("warn", "Successful connection, but unexpected! Closing.");
-- ws_connection.conn:close();
-- else
-- module:log("error", "Successful connection, but unexpected, and no conn attribute!");
-- end
-- return;
-- end
-- pending_ws_connections_map[ws_connection] = nil;
-- local conn = ws_connection.conn;
-- p:log("debug", "Successfully connected");
-- conn:setlistener(p.listeners, p.data);
-- p.listeners.onconnect(conn);
-- wrap_websocket(session, conn);
-- end
-- function pending_ws_connection_listeners.onclose(ws_connection, reason)
-- log("debug", "Pending WS Connection onclose event.");
-- local p = pending_ws_connections_map[ws_connection];
-- if not p then
-- module:log("warn", "Failed connection, but unexpected!");
-- return;
-- end
-- p.last_error = reason or "unknown reason";
-- p:log("debug", "Connection attempt failed: %s", p.last_error);
-- if p.listeners.onfail then
-- p.listeners.onfail(p.data, p.last_error or "unable to connect to websocket");
-- end
-- end
function route_to_new_session(event)
local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza;
module:log("debug", "Trying to route to %s", to_host);
log("debug", "Trying to route to %s", to_host);
local ws_properties = module:fire_event("discover-websocket-s2s", { to_host = to_host });
if not ws_properties then
module:log("debug", "No websocket s2s capabilities from remote host %s", to_host);
log("debug", "No websocket s2s capabilities from remote host %s", to_host);
return;
end
module:log("debug", "Found a Websocket endpoint for s2s communications to remote host %s", to_host);
log("debug", "Found a Websocket endpoint for s2s communications to remote host %s", to_host);
local session = s2s_new_outgoing(from_host, to_host);
session.version = 1;
-- Store in buffer
session.bounce_sendq = bounce_sendq;
@ -434,13 +468,29 @@ function route_to_new_session(event)
ex["headers"] = ws_properties.extra_headers or {};
ex["protocol"] = "xmpp";
module:log("debug", "Starting the websocket connection process");
local p = setmetatable({
id = new_id();
listeners = portmanager.get_service("s2s").listener;
data = { session = session };
}, pending_ws_connection_mt);
local ws_connection = websocket.connect(ws_properties['url'], ex, p);
session.log("debug", "Starting the s2s websocket connection process");
local log = session.log;
local function onopen(s)
log("debug", "Outgoing Websocket S2S: Successfully connected");
local conn = s.conn;
session.conn = conn;
wrap_websocket(session, conn);
conn:setlistener(s2s_listener);
s2s_listener.register_outgoing(conn, session);
s2s_listener.onconnect(conn);
end
local function onclose(s, code, message)
log("debug", "Pending WS Connection onclose event.");
-- FIXME: is this ok?
s2s_destroy_session(session, message or code or "unable to connect to websocket");
end
local ws_connection = websocket.connect(ws_properties['url'], ex, {
onopen = onopen;
onclose = onclose;
});
return true;
end