From edba1aebb51a7bb4f8a5f4f069fba3ed9540d729 Mon Sep 17 00:00:00 2001 From: Evgeny Khramtsov Date: Tue, 23 Apr 2019 19:18:22 +0300 Subject: [PATCH] Add WebSockets support to mod_mqtt Example configuration: listen: ... - port: 5280 module: ejabberd_http request_handlers: "/mqtt": mod_mqtt modules: ... mod_mqtt: {} --- src/mod_mqtt.erl | 5 ++ src/mod_mqtt_session.erl | 4 +- src/mod_mqtt_ws.erl | 171 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 178 insertions(+), 2 deletions(-) create mode 100644 src/mod_mqtt_ws.erl diff --git a/src/mod_mqtt.erl b/src/mod_mqtt.erl index 9fd1e57ac..566804f36 100644 --- a/src/mod_mqtt.erl +++ b/src/mod_mqtt.erl @@ -27,6 +27,8 @@ terminate/2, code_change/3]). %% ejabberd_listener API -export([start/3, start_link/3, listen_opt_type/1, listen_options/0, accept/1]). +%% ejabberd_http API +-export([socket_handoff/3]). %% Legacy ejabberd_listener API -export([become_controller/2, socket_type/0]). %% API @@ -98,6 +100,9 @@ become_controller(Pid, _) -> accept(Pid) -> mod_mqtt_session:accept(Pid). +socket_handoff(LocalPath, Request, Opts) -> + mod_mqtt_ws:socket_handoff(LocalPath, Request, Opts). + open_session({U, S, R}) -> Mod = gen_mod:ram_db_mod(S, ?MODULE), Mod:open_session({U, S, R}). diff --git a/src/mod_mqtt_session.erl b/src/mod_mqtt_session.erl index d988b513e..bbcf9258a 100644 --- a/src/mod_mqtt_session.erl +++ b/src/mod_mqtt_session.erl @@ -64,8 +64,8 @@ session_expiry_non_zero | unknown_topic_alias. -type state() :: #state{}. --type sockmod() :: gen_tcp | fast_tls. --type socket() :: {sockmod(), inet:socket() | fast_tls:tls_socket()}. +-type sockmod() :: gen_tcp | fast_tls | mod_mqtt_ws. +-type socket() :: {sockmod(), inet:socket() | fast_tls:tls_socket() | mod_mqtt_ws:socket()}. -type peername() :: {inet:ip_address(), inet:port_number()}. -type seconds() :: non_neg_integer(). -type milli_seconds() :: non_neg_integer(). diff --git a/src/mod_mqtt_ws.erl b/src/mod_mqtt_ws.erl new file mode 100644 index 000000000..872553445 --- /dev/null +++ b/src/mod_mqtt_ws.erl @@ -0,0 +1,171 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov +%%% @copyright (C) 2002-2019 ProcessOne, SARL. All Rights Reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%% +%%%------------------------------------------------------------------- +-module(mod_mqtt_ws). +-ifndef(GEN_SERVER). +-define(GEN_SERVER, gen_server). +-endif. +-behaviour(?GEN_SERVER). + +%% API +-export([socket_handoff/3]). +-export([start/1, start_link/1]). +-export([peername/1, setopts/2, send/2, close/1]). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3, format_status/2]). + +-include("xmpp.hrl"). +-include("ejabberd_http.hrl"). +-include("logger.hrl"). + +-define(SEND_TIMEOUT, timer:seconds(15)). + +-record(state, {socket :: socket(), + ws_pid :: pid(), + mqtt_session :: undefined | pid()}). + +-type peername() :: {inet:ip_address(), inet:port_number()}. +-type socket() :: {http_ws, pid(), peername()}. +-export_type([socket/0]). + +%%%=================================================================== +%%% API +%%%=================================================================== +socket_handoff(LocalPath, Request, Opts) -> + ejabberd_websocket:socket_handoff( + LocalPath, Request, Opts, ?MODULE, fun get_human_html_xmlel/0). + +start({#ws{http_opts = Opts}, _} = WS) -> + ?GEN_SERVER:start(?MODULE, [WS], ejabberd_config:fsm_limit_opts(Opts)). + +start_link({#ws{http_opts = Opts}, _} = WS) -> + ?GEN_SERVER:start_link(?MODULE, [WS], ejabberd_config:fsm_limit_opts(Opts)). + +-spec peername(socket()) -> {ok, peername()}. +peername({http_ws, _, IP}) -> + {ok, IP}. + +-spec setopts(socket(), list()) -> ok. +setopts(_WSock, _Opts) -> + ok. + +-spec send(socket(), iodata()) -> ok | {error, timeout | einval}. +send({http_ws, Pid, _}, Data) -> + try ?GEN_SERVER:call(Pid, {send, Data}, ?SEND_TIMEOUT) + catch exit:{timeout, {?GEN_SERVER, _, _}} -> + {error, timeout}; + exit:{_, {?GEN_SERVER, _, _}} -> + {error, einval} + end. + +-spec close(socket()) -> ok. +close({http_ws, Pid, _}) -> + ?GEN_SERVER:cast(Pid, close). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== +init([{#ws{ip = IP, http_opts = ListenOpts}, WsPid}]) -> + Socket = {http_ws, self(), IP}, + case mod_mqtt_session:start(?MODULE, Socket, ListenOpts) of + {ok, Pid} -> + erlang:monitor(process, Pid), + erlang:monitor(process, WsPid), + mod_mqtt_session:accept(Pid), + State = #state{socket = Socket, + ws_pid = WsPid, + mqtt_session = Pid}, + {ok, State}; + {error, Reason} -> + {stop, Reason}; + ignore -> + ignore + end. + +handle_call({send, Data}, _From, #state{ws_pid = WsPid} = State) -> + WsPid ! {send, Data}, + {reply, ok, State}; +handle_call(Request, From, State) -> + ?WARNING_MSG("Got unexpected call from ~p: ~p", [From, Request]), + {noreply, State}. + +handle_cast(close, State) -> + {stop, normal, State#state{mqtt_session = undefined}}; +handle_cast(Request, State) -> + ?WARNING_MSG("Got unexpected cast: ~p", [Request]), + {noreply, State}. + +handle_info(closed, State) -> + {stop, normal, State}; +handle_info({received, Data}, State) -> + State#state.mqtt_session ! {tcp, State#state.socket, Data}, + {noreply, State}; +handle_info({'DOWN', _, process, Pid, _}, State) + when Pid == State#state.mqtt_session orelse Pid == State#state.ws_pid -> + {stop, normal, State}; +handle_info(Info, State) -> + ?WARNING_MSG("Got unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, State) -> + if State#state.mqtt_session /= undefined -> + State#state.mqtt_session ! {tcp_closed, State#state.socket}; + true -> + ok + end. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +format_status(_Opt, Status) -> + Status. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +-spec get_human_html_xmlel() -> xmlel(). +get_human_html_xmlel() -> + Heading = <<"ejabberd mod_mqtt">>, + #xmlel{name = <<"html">>, + attrs = + [{<<"xmlns">>, <<"http://www.w3.org/1999/xhtml">>}], + children = + [#xmlel{name = <<"head">>, attrs = [], + children = + [#xmlel{name = <<"title">>, attrs = [], + children = [{xmlcdata, Heading}]}]}, + #xmlel{name = <<"body">>, attrs = [], + children = + [#xmlel{name = <<"h1">>, attrs = [], + children = [{xmlcdata, Heading}]}, + #xmlel{name = <<"p">>, attrs = [], + children = + [{xmlcdata, <<"An implementation of ">>}, + #xmlel{name = <<"a">>, + attrs = + [{<<"href">>, + <<"http://tools.ietf.org/html/rfc6455">>}], + children = + [{xmlcdata, + <<"WebSocket protocol">>}]}]}, + #xmlel{name = <<"p">>, attrs = [], + children = + [{xmlcdata, + <<"This web page is only informative. To " + "use WebSocket connection you need an MQTT " + "client that supports it.">>}]}]}]}. -- 2.40.0