Removing some old PoC code.
This commit is contained in:
parent
b38d9a6426
commit
4f9534dc11
@ -1,143 +1,11 @@
|
||||
module:set_global();
|
||||
|
||||
-- module:depends("s2s");
|
||||
|
||||
local path = require "util.paths";
|
||||
local json = require "util.json";
|
||||
-- local st = require "util.stanza";
|
||||
-- local websocket = require "net.websocket";
|
||||
-- local server = require "net.server".addclient;
|
||||
-- local add_filter = require "util.filters".add_filter;
|
||||
-- local s2s_new_outgoing = require "core.s2smanager".new_outgoing;
|
||||
-- local s2s_destroy_session = require "core.s2smanager".destroy_session;
|
||||
-- local bounce_sendq = module:depends "s2s".route_to_new_session.bounce_sendq;
|
||||
-- local portmanager = require "core.portmanager";
|
||||
-- local initialize_filters = require "util.filters".initialize;
|
||||
|
||||
local server_infos_dir = assert(module:get_option_string("peertubelivechat_server_infos_path", nil), "'peertubelivechat_server_infos_path' is a required option");
|
||||
local instance_url = assert(module:get_option_string("peertubelivechat_instance_url", nil), "'peertubelivechat_instance_url' is a required option");
|
||||
|
||||
-- local stanza_size_limit = module:get_option_number("s2s_stanza_size_limit", 1024 * 512);
|
||||
-- local frame_buffer_limit = module:get_option_number("websocket_frame_buffer_limit", 2 * stanza_size_limit);
|
||||
-- local frame_fragment_limit = module:get_option_number("websocket_frame_fragment_limit", 8);
|
||||
|
||||
-- local sessions = module:shared("sessions");
|
||||
|
||||
-- -- The proxy_listener handles connection while still connecting to the remote websocket server,
|
||||
-- -- then it hands them over to the normal listener (in mod_s2s)
|
||||
-- local proxy_listener = { default_port = nil, default_mode = "*a", default_interface = "*" };
|
||||
|
||||
-- function proxy_listener.onconnect(conn, ws)
|
||||
-- local session = sessions[conn];
|
||||
|
||||
-- -- Now the real s2s listener can take over the connection.
|
||||
-- local listener = portmanager.get_service("s2s").listener;
|
||||
|
||||
-- local log = session.log;
|
||||
|
||||
-- local function websocket_close(code, message)
|
||||
-- conn:write(build_close(code, message));
|
||||
-- conn:close();
|
||||
-- end
|
||||
-- local function websocket_handle_error(session, code, message)
|
||||
-- if code == 1009 then -- stanza size limit exceeded
|
||||
-- -- we close the session, rather than the connection,
|
||||
-- -- otherwise a resuming client will simply resend the
|
||||
-- -- offending stanza
|
||||
-- session:close({ condition = "policy-violation", text = "stanza too large" });
|
||||
-- else
|
||||
-- websocket_close(code, message);
|
||||
-- end
|
||||
-- end
|
||||
|
||||
-- initialize_filters(session);
|
||||
-- local frameBuffer = dbuffer.new(frame_buffer_limit, frame_fragment_limit);
|
||||
-- add_filter(session, "bytes/in", function(data)
|
||||
-- if not frameBuffer:write(data) then
|
||||
-- session.log("warn", "websocket frame buffer full - terminating session");
|
||||
-- session:close({ condition = "resource-constraint", text = "frame buffer exceeded" });
|
||||
-- return;
|
||||
-- end
|
||||
|
||||
-- local cache = {};
|
||||
-- local frame, length, partial = parse_frame(frameBuffer);
|
||||
|
||||
-- while frame do
|
||||
-- frameBuffer:discard(length);
|
||||
-- local result, err_status, err_text = handle_frame(frame);
|
||||
-- if not result then
|
||||
-- websocket_handle_error(session, err_status, err_text);
|
||||
-- break;
|
||||
-- end
|
||||
-- cache[#cache+1] = filter_open_close(result);
|
||||
-- frame, length, partial = parse_frame(frameBuffer);
|
||||
-- end
|
||||
|
||||
-- if partial then
|
||||
-- -- The header of the next frame is already in the buffer, run
|
||||
-- -- some early validation here
|
||||
-- local frame_ok, err_status, err_text = validate_frame(partial, stanza_size_limit);
|
||||
-- if not frame_ok then
|
||||
-- websocket_handle_error(session, err_status, err_text);
|
||||
-- end
|
||||
-- end
|
||||
|
||||
-- return t_concat(cache, "");
|
||||
-- end);
|
||||
|
||||
-- add_filter(session, "stanzas/out", function(stanza)
|
||||
-- stanza = st.clone(stanza);
|
||||
-- local attr = stanza.attr;
|
||||
-- attr.xmlns = attr.xmlns or xmlns_client;
|
||||
-- if stanza.name:find("^stream:") then
|
||||
-- attr["xmlns:stream"] = attr["xmlns:stream"] or xmlns_streams;
|
||||
-- end
|
||||
-- return stanza;
|
||||
-- end, -1000);
|
||||
|
||||
-- add_filter(session, "bytes/out", function(data)
|
||||
-- return build_frame({ FIN = true, opcode = 0x01, data = tostring(data)});
|
||||
-- end);
|
||||
-- local filter = session.filters;
|
||||
|
||||
-- session.version = 1;
|
||||
|
||||
-- session.sends2s = function (t)
|
||||
-- log("debug", "sending (s2s over proxy): %s", (t.top_tag and t:top_tag()) or t:match("^[^>]*>?"));
|
||||
-- if t.name then
|
||||
-- t = filter("stanzas/out", t);
|
||||
-- end
|
||||
-- if t then
|
||||
-- t = filter("bytes/out", tostring(t));
|
||||
-- if t then
|
||||
-- return conn:write(tostring(t));
|
||||
-- end
|
||||
-- end
|
||||
-- end
|
||||
|
||||
-- session.open_stream = function ()
|
||||
-- session.sends2s(st.stanza("stream:stream", {
|
||||
-- xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback',
|
||||
-- ["xmlns:stream"]='http://etherx.jabber.org/streams',
|
||||
-- from=session.from_host, to=session.to_host, version='1.0', ["xml:lang"]='en'}):top_tag());
|
||||
-- end
|
||||
|
||||
-- conn.setlistener(conn, listener);
|
||||
|
||||
-- listener.register_outgoing(conn, session);
|
||||
|
||||
-- listener.onconnect(conn);
|
||||
-- end
|
||||
|
||||
-- function proxy_listener.register_outgoing(conn, session)
|
||||
-- session.direction = "outgoing";
|
||||
-- sessions[conn] = session;
|
||||
-- end
|
||||
|
||||
-- function proxy_listener.ondisconnect(conn, err)
|
||||
-- sessions[conn] = nil;
|
||||
-- end
|
||||
|
||||
function discover_websocket_s2s(event)
|
||||
local to_host = event.to_host;
|
||||
module:log("debug", "Searching websocket s2s for remote host %s", to_host);
|
||||
@ -180,68 +48,6 @@ function discover_websocket_s2s(event)
|
||||
};
|
||||
properties["url"] = remote_ws_proxy_conf["url"];
|
||||
return properties;
|
||||
|
||||
|
||||
|
||||
-- local host_session = s2s_new_outgoing(from_host, to_host);
|
||||
|
||||
-- -- Store in buffer
|
||||
-- host_session.bounce_sendq = bounce_sendq;
|
||||
-- host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} };
|
||||
-- host_session.log("debug", "stanza [%s] queued until connection complete", tostring(stanza.name));
|
||||
|
||||
-- local ex = {};
|
||||
-- ex.headers = {
|
||||
-- ["peertube-livechat-ws-s2s-instance-url"] = instance_url;
|
||||
-- ["sec_websocket_protocol"] = 'xmpp';
|
||||
-- }
|
||||
|
||||
-- local ws_listeners = {};
|
||||
-- ws_listeners.onopen = function ()
|
||||
-- local conn = self.conn;
|
||||
-- module:log("debug", "Websocket s2s connection is open, attaching it to the session.");
|
||||
-- host_session.conn = conn;
|
||||
-- end
|
||||
-- ws_listeners.onclose = function (code, message)
|
||||
-- module:log("debug", "Closing websocket connection for host %s with code '%s' and message '%s'", to_host, json.encode(code), json.encode(message));
|
||||
-- s2s_destroy_session(host_session, 'websocket-proxy-connection-closed');
|
||||
-- end
|
||||
-- ws_listeners.onerror = function (code)
|
||||
-- module:log("debug", "Error on websocket connection for host %s: '%s'", to_host, json.encode(code));
|
||||
-- s2s_destroy_session(host_session, 'websocket-proxy-connection-error');
|
||||
-- end
|
||||
-- ws_listeners.onmessage = function (data, data_type)
|
||||
-- module:log("debug", "Receiving %s data for host %s", tostring(data_type), to_host);
|
||||
-- -- TODO ...
|
||||
-- end
|
||||
|
||||
-- module:log("debug", "Starting the websocket connection process");
|
||||
-- local ws_connection = websocket.connect(remote_ws_proxy_conf['url'], ex, ws_listeners);
|
||||
|
||||
-- -- local conn = addclient(to_host, nil, proxy_listener, "*a");
|
||||
-- -- proxy_listener.register_outgoing(conn, host_session);
|
||||
-- -- host_session.conn = conn;
|
||||
|
||||
-- return true;
|
||||
|
||||
-- local inject = injected and injected[to_host];
|
||||
-- if not inject then return end
|
||||
-- module:log("debug", "opening a new outgoing connection for this stanza");
|
||||
-- local host_session = new_outgoing(from_host, to_host);
|
||||
|
||||
-- -- Store in buffer
|
||||
-- host_session.bounce_sendq = bounce_sendq;
|
||||
-- host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} };
|
||||
-- host_session.log("debug", "stanza [%s] queued until connection complete", tostring(stanza.name));
|
||||
|
||||
-- local host, port = inject[1] or inject, tonumber(inject[2]) or 5269;
|
||||
|
||||
-- local conn = addclient(host, port, proxy_listener, "*a");
|
||||
|
||||
-- proxy_listener.register_outgoing(conn, host_session);
|
||||
|
||||
-- host_session.conn = conn;
|
||||
-- return true;
|
||||
end
|
||||
|
||||
function module.add_host(module)
|
||||
|
Loading…
x
Reference in New Issue
Block a user