From 5cc8e807df6994fa6b0e860bbcfe0af8fa7fe19f Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Sun, 11 Dec 2016 15:03:37 +0300 Subject: [PATCH] Initial version of new XMPP stream behaviour (for review) --- src/cyrsasl.erl | 10 +- src/ejabberd_c2s.erl | 3257 +++++++------------------------------ src/ejabberd_sm.erl | 77 +- src/ejabberd_socket.erl | 29 +- src/mod_admin_extra.erl | 26 +- src/mod_blocking.erl | 33 +- src/mod_caps.erl | 150 +- src/mod_client_state.erl | 13 +- src/mod_last.erl | 29 +- src/mod_legacy_auth.erl | 159 ++ src/mod_offline.erl | 68 +- src/mod_privacy.erl | 46 +- src/mod_pubsub.erl | 104 +- src/mod_register.erl | 41 +- src/mod_roster.erl | 78 +- src/mod_shared_roster.erl | 7 +- src/xmpp_stream_in.erl | 698 ++++++++ 17 files changed, 1842 insertions(+), 2983 deletions(-) create mode 100644 src/mod_legacy_auth.erl create mode 100644 src/xmpp_stream_in.erl diff --git a/src/cyrsasl.erl b/src/cyrsasl.erl index 4b0f5a26b..e23196475 100644 --- a/src/cyrsasl.erl +++ b/src/cyrsasl.erl @@ -31,7 +31,7 @@ -export([start/0, register_mechanism/3, listmech/1, server_new/7, server_start/3, server_step/2, - opt_type/1]). + get_mech/1, opt_type/1]). -include("ejabberd.hrl"). -include("logger.hrl"). @@ -53,6 +53,7 @@ -type(password_type() :: plain | digest | scram). -type(props() :: [{username, binary()} | {authzid, binary()} | + {mechanism, binary()} | {auth_module, atom()}]). -type(sasl_mechanism() :: #sasl_mechanism{}). @@ -65,9 +66,11 @@ get_password, check_password, check_password_digest, + mech_name = <<"">>, mech_mod, mech_state }). +-type sasl_state() :: #sasl_state{}. -callback mech_new(binary(), fun(), fun(), fun()) -> any(). -callback mech_step(any(), binary()) -> {ok, props()} | @@ -150,6 +153,7 @@ server_start(State, Mech, ClientIn) -> State#sasl_state.check_password, State#sasl_state.check_password_digest), server_step(State#sasl_state{mech_mod = Module, + mech_name = Mech, mech_state = MechState}, ClientIn); _ -> {error, 'no-mechanism'} @@ -181,6 +185,10 @@ server_step(State, ClientIn) -> {error, Error} end. +-spec get_mech(sasl_state()) -> binary(). +get_mech(#sasl_state{mech_name = Mech}) -> + Mech. + %% Remove the anonymous mechanism from the list if not enabled for the given %% host %% diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 6d84d8d93..1568d5db6 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -1,8 +1,5 @@ -%%%---------------------------------------------------------------------- -%%% File : ejabberd_c2s.erl -%%% Author : Alexey Shchepin -%%% Purpose : Serve C2S connection -%%% Created : 16 Nov 2002 by Alexey Shchepin +%%%------------------------------------------------------------------- +%%% Created : 8 Dec 2016 by Evgeny Khramtsov %%% %%% %%% ejabberd, Copyright (C) 2002-2016 ProcessOne @@ -21,1998 +18,534 @@ %%% with this program; if not, write to the Free Software Foundation, Inc., %%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. %%% -%%%---------------------------------------------------------------------- - +%%%------------------------------------------------------------------- -module(ejabberd_c2s). - --behaviour(ejabberd_config). - --author('alexey@process-one.net'). - --protocol({xep, 78, '2.5'}). --protocol({xep, 138, '2.0'}). --protocol({xep, 198, '1.3'}). --protocol({xep, 356, '7.1'}). - --update_info({update, 0}). - --define(GEN_FSM, p1_fsm). - --behaviour(?GEN_FSM). - -%% External exports --export([start/2, - stop/1, - start_link/2, - close/1, - send_text/2, - send_element/2, - socket_type/0, - get_presence/1, - get_last_presence/1, - get_aux_field/2, - set_aux_field/3, - del_aux_field/2, - get_subscription/2, - get_queued_stanzas/1, - get_csi_state/1, - set_csi_state/2, - get_resume_timeout/1, - set_resume_timeout/2, - send_filtered/5, - broadcast/4, - get_subscribed/1, - transform_listen_option/2]). - --export([init/1, wait_for_stream/2, wait_for_auth/2, - wait_for_feature_request/2, wait_for_bind/2, - wait_for_sasl_response/2, - wait_for_resume/2, session_established/2, - handle_event/3, handle_sync_event/4, code_change/4, - handle_info/3, terminate/3, print_state/1, opt_type/1]). +-behaviour(xmpp_stream_in). + +-protocol({rfc, 6121}). + +%% ejabberd_socket callbacks +-export([start/2, socket_type/0]). +%% xmpp_stream_in callbacks +-export([init/1, handle_call/3, handle_cast/2, + handle_info/2, terminate/2, code_change/3]). +-export([tls_options/1, tls_required/1, sasl_mechanisms/1, init_sasl/1, bind/2, + unauthenticated_stream_features/1, authenticated_stream_features/1, + handle_stream_start/1, handle_stream_end/1, handle_stream_close/1, + handle_unauthenticated_packet/2, handle_authenticated_packet/2, + handle_auth_success/4, handle_auth_failure/4, + handle_unbinded_packet/2]). +%% API +-export([get_presence/1, get_subscription/2, get_subscribed/1, + send/2, close/1]). -include("ejabberd.hrl"). --include("logger.hrl"). - -include("xmpp.hrl"). -%%-include("legacy.hrl"). - --include("mod_privacy.hrl"). +-include("logger.hrl"). -define(SETS, gb_sets). --define(DICT, dict). - -%% pres_a contains all the presence available send (either through roster mechanism or directed). -%% Directed presence unavailable remove user from pres_a. --record(state, {socket, - sockmod, - socket_monitor, - xml_socket, - streamid, - sasl_state, - access, - shaper, - zlib = false, - tls = false, - tls_required = false, - tls_enabled = false, - tls_options = [], - authenticated = false, - jid, - user = <<"">>, server = <<"">>, resource = <<"">>, - sid, - pres_t = ?SETS:new(), - pres_f = ?SETS:new(), - pres_a = ?SETS:new(), - pres_last, - pres_timestamp, - privacy_list = #userlist{}, - conn = unknown, - auth_module = unknown, - ip, - aux_fields = [], - csi_state = active, - mgmt_state, - mgmt_xmlns, - mgmt_queue, - mgmt_max_queue, - mgmt_pending_since, - mgmt_timeout, - mgmt_max_timeout, - mgmt_ack_timeout, - mgmt_ack_timer, - mgmt_resend, - mgmt_stanzas_in = 0, - mgmt_stanzas_out = 0, - mgmt_stanzas_req = 0, - ask_offline = true, - lang = <<"">>}). - --type state_name() :: wait_for_stream | wait_for_auth | - wait_for_feature_request | wait_for_bind | - wait_for_sasl_response | wait_for_resume | - session_established. --type state() :: #state{}. --type fsm_stop() :: {stop, normal, state()}. --type fsm_next() :: {next_state, state_name(), state(), non_neg_integer()}. --type fsm_reply() :: {reply, any(), state_name(), state(), non_neg_integer()}. --type fsm_transition() :: fsm_stop() | fsm_next(). --export_type([state/0]). - -%-define(DBGFSM, true). +%%-define(DBGFSM, true). -ifdef(DBGFSM). - -define(FSMOPTS, [{debug, [trace]}]). - -else. - -define(FSMOPTS, []). - -endif. -%% This is the timeout to apply between event when starting a new -%% session: --define(C2S_OPEN_TIMEOUT, 60000). - --define(C2S_HIBERNATE_TIMEOUT, ejabberd_config:get_option(c2s_hibernate, fun(X) when is_integer(X); X == hibernate-> X end, 90000)). - --define(STREAM_HEADER, - <<"">>). - --define(STREAM_TRAILER, <<"">>). +-type state() :: map(). +-type next_state() :: {noreply, state()} | {stop, term(), state()}. +-export_type([state/0, next_state/0]). -%% XEP-0198: - --define(IS_STREAM_MGMT_PACKET(Pkt), - is_record(Pkt, sm_enable) or - is_record(Pkt, sm_resume) or - is_record(Pkt, sm_a) or - is_record(Pkt, sm_r)). - -%%%---------------------------------------------------------------------- -%%% API -%%%---------------------------------------------------------------------- +%%%=================================================================== +%%% ejabberd_socket API +%%%=================================================================== start(SockData, Opts) -> - ?GEN_FSM:start(ejabberd_c2s, - [SockData, Opts], - fsm_limit_opts(Opts) ++ ?FSMOPTS). - -start_link(SockData, Opts) -> - (?GEN_FSM):start_link(ejabberd_c2s, - [SockData, Opts], - fsm_limit_opts(Opts) ++ ?FSMOPTS). - -socket_type() -> xml_stream. + xmpp_stream_in:start(?MODULE, [SockData, Opts], + fsm_limit_opts(Opts) ++ ?FSMOPTS). -%% Return Username, Resource and presence information -get_presence(FsmRef) -> - (?GEN_FSM):sync_send_all_state_event(FsmRef, - {get_presence}, 1000). -get_last_presence(FsmRef) -> - (?GEN_FSM):sync_send_all_state_event(FsmRef, - {get_last_presence}, 1000). +socket_type() -> + xml_stream. --spec get_aux_field(any(), state()) -> {ok, any()} | error. -get_aux_field(Key, #state{aux_fields = Opts}) -> - case lists:keyfind(Key, 1, Opts) of - {_, Val} -> {ok, Val}; - false -> error - end. - --spec set_aux_field(any(), any(), state()) -> state(). -set_aux_field(Key, Val, - #state{aux_fields = Opts} = State) -> - Opts1 = lists:keydelete(Key, 1, Opts), - State#state{aux_fields = [{Key, Val} | Opts1]}. - --spec del_aux_field(any(), state()) -> state(). -del_aux_field(Key, #state{aux_fields = Opts} = State) -> - Opts1 = lists:keydelete(Key, 1, Opts), - State#state{aux_fields = Opts1}. +-spec get_presence(pid()) -> presence(). +get_presence(Ref) -> + xmpp_stream_in:call(Ref, get_presence, 1000). -spec get_subscription(jid() | ljid(), state()) -> both | from | to | none. -get_subscription(From = #jid{}, StateData) -> - get_subscription(jid:tolower(From), StateData); -get_subscription(LFrom, StateData) -> - LBFrom = setelement(3, LFrom, <<"">>), - F = (?SETS):is_element(LFrom, StateData#state.pres_f) - orelse - (?SETS):is_element(LBFrom, StateData#state.pres_f), - T = (?SETS):is_element(LFrom, StateData#state.pres_t) - orelse - (?SETS):is_element(LBFrom, StateData#state.pres_t), +get_subscription(#jid{} = From, State) -> + get_subscription(jid:tolower(From), State); +get_subscription(LFrom, #{pres_f := PresF, pres_t := PresT}) -> + LBFrom = jid:remove_resource(LFrom), + F = ?SETS:is_element(LFrom, PresF) orelse ?SETS:is_element(LBFrom, PresF), + T = ?SETS:is_element(LFrom, PresT) orelse ?SETS:is_element(LBFrom, PresT), if F and T -> both; F -> from; T -> to; true -> none end. -get_queued_stanzas(#state{mgmt_queue = Queue} = StateData) -> - lists:map(fun({_N, Time, El}) -> - add_resent_delay_info(StateData, El, Time) - end, queue:to_list(Queue)). - -get_csi_state(#state{csi_state = CsiState}) -> - CsiState. - -set_csi_state(#state{} = StateData, CsiState) -> - StateData#state{csi_state = CsiState}; -set_csi_state(FsmRef, CsiState) -> - FsmRef ! {set_csi_state, CsiState}. - -get_resume_timeout(#state{mgmt_timeout = Timeout}) -> - Timeout. - -set_resume_timeout(#state{} = StateData, Timeout) -> - StateData#state{mgmt_timeout = Timeout}; -set_resume_timeout(FsmRef, Timeout) -> - FsmRef ! {set_resume_timeout, Timeout}. - --spec send_filtered(pid(), binary(), jid(), jid(), stanza()) -> any(). -send_filtered(FsmRef, Feature, From, To, Packet) -> - FsmRef ! {send_filtered, Feature, From, To, Packet}. - --spec broadcast(pid(), any(), jid(), stanza()) -> any(). -broadcast(FsmRef, Type, From, Packet) -> - FsmRef ! {broadcast, Type, From, Packet}. - --spec stop(pid()) -> any(). -stop(FsmRef) -> (?GEN_FSM):send_event(FsmRef, stop). - --spec close(pid()) -> any(). -%% What is the difference between stop and close??? -close(FsmRef) -> (?GEN_FSM):send_event(FsmRef, closed). - -%%%---------------------------------------------------------------------- -%%% Callback functions from gen_fsm -%%%---------------------------------------------------------------------- - -init([{SockMod, Socket}, Opts]) -> - Access = gen_mod:get_opt(access, Opts, - fun acl:access_rules_validator/1, all), - Shaper = gen_mod:get_opt(shaper, Opts, - fun acl:shaper_rules_validator/1, none), - XMLSocket = case lists:keysearch(xml_socket, 1, Opts) of - {value, {_, XS}} -> XS; - _ -> false - end, - Zlib = proplists:get_bool(zlib, Opts), - StartTLS = proplists:get_bool(starttls, Opts), - StartTLSRequired = proplists:get_bool(starttls_required, Opts), - TLSEnabled = proplists:get_bool(tls, Opts), - TLS = StartTLS orelse - StartTLSRequired orelse TLSEnabled, - TLSOpts1 = lists:filter(fun ({certfile, _}) -> true; - ({ciphers, _}) -> true; - ({dhfile, _}) -> true; - (_) -> false - end, - Opts), - TLSOpts2 = case lists:keysearch(protocol_options, 1, Opts) of - {value, {_, O}} -> - [_|ProtocolOptions] = lists:foldl( - fun(X, Acc) -> X ++ Acc end, [], - [["|" | binary_to_list(Opt)] || Opt <- O, is_binary(Opt)] - ), - [{protocol_options, iolist_to_binary(ProtocolOptions)} | TLSOpts1]; - _ -> TLSOpts1 - end, - TLSOpts3 = case proplists:get_bool(tls_compression, Opts) of - false -> [compression_none | TLSOpts2]; - true -> TLSOpts2 - end, - TLSOpts = [verify_none | TLSOpts3], - StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true), - StreamMgmtState = if StreamMgmtEnabled -> inactive; - true -> disabled - end, - MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of - Limit when is_integer(Limit), Limit > 0 -> Limit; - infinity -> infinity; - _ -> 1000 - end, - ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of - RTimeo when is_integer(RTimeo), RTimeo >= 0 -> RTimeo; - _ -> 300 - end, - MaxResumeTimeout = case proplists:get_value(max_resume_timeout, Opts) of - Max when is_integer(Max), Max >= ResumeTimeout -> Max; - _ -> ResumeTimeout - end, - AckTimeout = case proplists:get_value(ack_timeout, Opts) of - ATimeo when is_integer(ATimeo), ATimeo > 0 -> ATimeo * 1000; - infinity -> undefined; - _ -> 60000 - end, - ResendOnTimeout = case proplists:get_value(resend_on_timeout, Opts) of - Resend when is_boolean(Resend) -> Resend; - if_offline -> if_offline; - _ -> false - end, - IP = peerip(SockMod, Socket), - Socket1 = if TLSEnabled andalso - SockMod /= ejabberd_frontend_socket -> - SockMod:starttls(Socket, TLSOpts); - true -> Socket - end, - SocketMonitor = SockMod:monitor(Socket1), - StateData = #state{socket = Socket1, sockmod = SockMod, - socket_monitor = SocketMonitor, - xml_socket = XMLSocket, zlib = Zlib, tls = TLS, - tls_required = StartTLSRequired, - tls_enabled = TLSEnabled, tls_options = TLSOpts, - sid = ejabberd_sm:make_sid(), streamid = new_id(), - access = Access, shaper = Shaper, ip = IP, - mgmt_state = StreamMgmtState, - mgmt_max_queue = MaxAckQueue, - mgmt_timeout = ResumeTimeout, - mgmt_max_timeout = MaxResumeTimeout, - mgmt_ack_timeout = AckTimeout, - mgmt_resend = ResendOnTimeout}, - {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}. - -spec get_subscribed(pid()) -> [ljid()]. -%% Return list of all available resources of contacts, -get_subscribed(FsmRef) -> - (?GEN_FSM):sync_send_all_state_event(FsmRef, - get_subscribed, 1000). +%% Return list of all available resources of contacts +get_subscribed(Ref) -> + xmpp_stream_in:call(Ref, get_subscribed, 1000). + +-spec close(pid()) -> ok. +close(Ref) -> + xmpp_stream_in:cast(Ref, closed). + +-spec send(state(), xmpp_element()) -> next_state(). +send(State, Pkt) -> + xmpp_stream_in:send(State, Pkt). + +%%%=================================================================== +%%% xmpp_stream_in callbacks +%%%=================================================================== +tls_options(#{server := Server, tls_options := TLSOpts}) -> + LServer = jid:nameprep(Server), + case ejabberd_config:get_option({domain_certfile, LServer}, + fun iolist_to_binary/1) of + undefined -> + TLSOpts; + CertFile -> + lists:keystore(certfile, 1, TLSOpts, {certfile, CertFile}) + end. -wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) -> - try xmpp:decode(#xmlel{name = Name, attrs = Attrs}) of - #stream_start{xmlns = NS_CLIENT, stream_xmlns = NS_STREAM, - version = Version, lang = Lang} - when NS_CLIENT /= ?NS_CLIENT; NS_STREAM /= ?NS_STREAM -> - send_header(StateData, ?MYNAME, Version, Lang), - send_element(StateData, xmpp:serr_invalid_namespace()), - {stop, normal, StateData}; - #stream_start{lang = Lang, version = Version} when byte_size(Lang) > 35 -> - %% As stated in BCP47, 4.4.1: - %% Protocols or specifications that specify limited buffer sizes for - %% language tags MUST allow for language tags of at least 35 characters. - %% Do not store long language tag to avoid possible DoS/flood attacks - send_header(StateData, ?MYNAME, Version, ?MYLANG), - Txt = <<"Too long value of 'xml:lang' attribute">>, - send_element(StateData, - xmpp:serr_policy_violation(Txt, ?MYLANG)), - {stop, normal, StateData}; - #stream_start{to = undefined, lang = Lang, version = Version} -> - Txt = <<"Missing 'to' attribute">>, - send_header(StateData, ?MYNAME, Version, Lang), - send_element(StateData, - xmpp:serr_improper_addressing(Txt, Lang)), - {stop, normal, StateData}; - #stream_start{to = #jid{lserver = To}, lang = Lang, - version = Version} -> - Server = case StateData#state.server of - <<"">> -> To; - S -> S - end, - StreamVersion = case Version of - {1,0} -> {1,0}; - _ -> undefined - end, - IsBlacklistedIP = is_ip_blacklisted(StateData#state.ip, Lang), - case lists:member(Server, ?MYHOSTS) of - true when IsBlacklistedIP == false -> - change_shaper(StateData, jid:make(<<"">>, Server, <<"">>)), - case StreamVersion of - {1,0} -> - send_header(StateData, Server, {1,0}, ?MYLANG), - case StateData#state.authenticated of - false -> - TLS = StateData#state.tls, - TLSEnabled = StateData#state.tls_enabled, - TLSRequired = StateData#state.tls_required, - SASLState = cyrsasl:server_new( - <<"jabber">>, Server, <<"">>, [], - fun (U) -> - ejabberd_auth:get_password_with_authmodule( - U, Server) - end, - fun(U, AuthzId, P) -> - ejabberd_auth:check_password_with_authmodule( - U, AuthzId, Server, P) - end, - fun(U, AuthzId, P, D, DG) -> - ejabberd_auth:check_password_with_authmodule( - U, AuthzId, Server, P, D, DG) - end), - Mechs = - case TLSEnabled or not TLSRequired of - true -> - [#sasl_mechanisms{list = cyrsasl:listmech(Server)}]; - false -> - [] - end, - SockMod = - (StateData#state.sockmod):get_sockmod(StateData#state.socket), - Zlib = StateData#state.zlib, - CompressFeature = case Zlib andalso - ((SockMod == gen_tcp) orelse (SockMod == fast_tls)) of - true -> - [#compression{methods = [<<"zlib">>]}]; - _ -> - [] - end, - TLSFeature = - case (TLS == true) andalso - (TLSEnabled == false) andalso - (SockMod == gen_tcp) of - true -> - [#starttls{required = TLSRequired}]; - false -> - [] - end, - StreamFeatures1 = TLSFeature ++ CompressFeature ++ Mechs, - StreamFeatures = ejabberd_hooks:run_fold(c2s_stream_features, - Server, StreamFeatures1, [Server]), - send_element(StateData, - #stream_features{sub_els = StreamFeatures}), - fsm_next_state(wait_for_feature_request, - StateData#state{server = Server, - sasl_state = SASLState, - lang = Lang}); - _ -> - case StateData#state.resource of - <<"">> -> - RosterVersioningFeature = - ejabberd_hooks:run_fold(roster_get_versioning_feature, - Server, [], - [Server]), - StreamManagementFeature = - case stream_mgmt_enabled(StateData) of - true -> - [#feature_sm{xmlns = ?NS_STREAM_MGMT_2}, - #feature_sm{xmlns = ?NS_STREAM_MGMT_3}]; - false -> - [] - end, - SockMod = - (StateData#state.sockmod):get_sockmod( - StateData#state.socket), - Zlib = StateData#state.zlib, - CompressFeature = - case Zlib andalso - ((SockMod == gen_tcp) orelse (SockMod == fast_tls)) of - true -> - [#compression{methods = [<<"zlib">>]}]; - _ -> - [] - end, - StreamFeatures1 = - [#bind{}, #xmpp_session{optional = true}] - ++ - RosterVersioningFeature ++ - StreamManagementFeature ++ - CompressFeature ++ - ejabberd_hooks:run_fold(c2s_post_auth_features, - Server, [], [Server]), - StreamFeatures = ejabberd_hooks:run_fold(c2s_stream_features, - Server, StreamFeatures1, [Server]), - send_element(StateData, - #stream_features{sub_els = StreamFeatures}), - fsm_next_state(wait_for_bind, - StateData#state{server = Server, lang = Lang}); - _ -> - send_element(StateData, #stream_features{}), - fsm_next_state(session_established, - StateData#state{server = Server, lang = Lang}) - end - end; - _ -> - send_header(StateData, Server, StreamVersion, ?MYLANG), - if not StateData#state.tls_enabled and - StateData#state.tls_required -> - send_element( - StateData, - xmpp:serr_policy_violation( - <<"Use of STARTTLS required">>, Lang)), - {stop, normal, StateData}; - true -> - fsm_next_state(wait_for_auth, - StateData#state{server = Server, - lang = Lang}) - end - end; - true -> - IP = StateData#state.ip, - {true, LogReason, ReasonT} = IsBlacklistedIP, - ?INFO_MSG("Connection attempt from blacklisted IP ~s: ~s", - [jlib:ip_to_list(IP), LogReason]), - send_header(StateData, Server, StreamVersion, ?MYLANG), - send_element(StateData, xmpp:serr_policy_violation(ReasonT, Lang)), - {stop, normal, StateData}; - _ -> - send_header(StateData, ?MYNAME, StreamVersion, ?MYLANG), - send_element(StateData, xmpp:serr_host_unknown()), - {stop, normal, StateData} - end; - _ -> - send_header(StateData, ?MYNAME, {1,0}, ?MYLANG), - send_element(StateData, xmpp:serr_invalid_xml()), - {stop, normal, StateData} - catch _:{xmpp_codec, Why} -> - Txt = xmpp:format_error(Why), - send_header(StateData, ?MYNAME, {1,0}, ?MYLANG), - send_element(StateData, xmpp:serr_invalid_xml(Txt, ?MYLANG)), - {stop, normal, StateData} - end; -wait_for_stream(timeout, StateData) -> - {stop, normal, StateData}; -wait_for_stream({xmlstreamelement, _}, StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_stream({xmlstreamend, _}, StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_stream({xmlstreamerror, _}, StateData) -> - send_header(StateData, ?MYNAME, {1,0}, <<"">>), - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_stream(closed, StateData) -> - {stop, normal, StateData}; -wait_for_stream(stop, StateData) -> - {stop, normal, StateData}. +tls_required(#{tls_required := TLSRequired}) -> + TLSRequired. -wait_for_auth({xmlstreamelement, #xmlel{} = El}, StateData) -> - decode_element(El, wait_for_auth, StateData); -wait_for_auth(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) -> - fsm_next_state(wait_for_auth, dispatch_stream_mgmt(Pkt, StateData)); -wait_for_auth(#iq{type = get, sub_els = [#legacy_auth{}]} = IQ, StateData) -> - Auth = #legacy_auth{username = <<>>, password = <<>>, resource = <<>>}, - Res = case ejabberd_auth:plain_password_required(StateData#state.server) of - false -> - xmpp:make_iq_result(IQ, Auth#legacy_auth{digest = <<>>}); - true -> - xmpp:make_iq_result(IQ, Auth) - end, - send_element(StateData, Res), - fsm_next_state(wait_for_auth, StateData); -wait_for_auth(#iq{type = set, sub_els = [#legacy_auth{resource = <<"">>}]} = IQ, - StateData) -> - Lang = StateData#state.lang, - Txt = <<"No resource provided">>, - Err = xmpp:make_error(IQ, xmpp:err_not_acceptable(Txt, Lang)), - send_element(StateData, Err), - fsm_next_state(wait_for_auth, StateData); -wait_for_auth(#iq{type = set, sub_els = [#legacy_auth{username = U, - password = P0, - digest = D0, - resource = R}]} = IQ, - StateData) when is_binary(U), is_binary(R) -> - JID = jid:make(U, StateData#state.server, R), - case (JID /= error) andalso - acl:access_matches(StateData#state.access, - #{usr => jid:split(JID), ip => StateData#state.ip}, - StateData#state.server) == allow of - true -> - DGen = fun (PW) -> - p1_sha:sha(<<(StateData#state.streamid)/binary, PW/binary>>) - end, - P = if is_binary(P0) -> P0; true -> <<>> end, - D = if is_binary(D0) -> D0; true -> <<>> end, - case ejabberd_auth:check_password_with_authmodule( - U, U, StateData#state.server, P, D, DGen) of - {true, AuthModule} -> - ?INFO_MSG("(~w) Accepted legacy authentication for ~s by ~p from ~s", - [StateData#state.socket, - jid:to_string(JID), AuthModule, - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [true, U, StateData#state.server, - StateData#state.ip]), - Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, AuthModule}], - Res = xmpp:make_iq_result(IQ), - send_element(StateData, Res), - ejabberd_sm:open_session(StateData#state.sid, U, - StateData#state.server, R, - Info), - change_shaper(StateData, JID), - {Fs, Ts} = ejabberd_hooks:run_fold( - roster_get_subscription_lists, - StateData#state.server, - {[], []}, - [U, StateData#state.server]), - LJID = jid:tolower(jid:remove_resource(JID)), - Fs1 = [LJID | Fs], - Ts1 = [LJID | Ts], - PrivList = ejabberd_hooks:run_fold(privacy_get_user_list, - StateData#state.server, - #userlist{}, - [U, StateData#state.server]), - NewStateData = StateData#state{ - user = U, - resource = R, - jid = JID, - conn = Conn, - auth_module = AuthModule, - pres_f = (?SETS):from_list(Fs1), - pres_t = (?SETS):from_list(Ts1), - privacy_list = PrivList}, - fsm_next_state(session_established, NewStateData); - _ -> - ?INFO_MSG("(~w) Failed legacy authentication for ~s from ~s", - [StateData#state.socket, - jid:to_string(JID), - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [false, U, StateData#state.server, - StateData#state.ip]), - Lang = StateData#state.lang, - Txt = <<"Legacy authentication failed">>, - Err = xmpp:make_error(IQ, xmpp:err_not_authorized(Txt, Lang)), - send_element(StateData, Err), - fsm_next_state(wait_for_auth, StateData) - end; - false when JID == error -> - ?INFO_MSG("(~w) Forbidden legacy authentication " - "for username '~s' with resource '~s'", - [StateData#state.socket, U, R]), - Err = xmpp:make_error(IQ, xmpp:err_jid_malformed()), - send_element(StateData, Err), - fsm_next_state(wait_for_auth, StateData); - false -> - ?INFO_MSG("(~w) Forbidden legacy authentication for ~s from ~s", - [StateData#state.socket, - jid:to_string(JID), - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [false, U, StateData#state.server, - StateData#state.ip]), - Lang = StateData#state.lang, - Txt = <<"Legacy authentication forbidden">>, - Err = xmpp:make_error(IQ, xmpp:err_not_allowed(Txt, Lang)), - send_element(StateData, Err), - fsm_next_state(wait_for_auth, StateData) - end; -wait_for_auth(timeout, StateData) -> - {stop, normal, StateData}; -wait_for_auth({xmlstreamend, _Name}, StateData) -> - {stop, normal, StateData}; -wait_for_auth({xmlstreamerror, _}, StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_auth(closed, StateData) -> - {stop, normal, StateData}; -wait_for_auth(stop, StateData) -> - {stop, normal, StateData}; -wait_for_auth(Pkt, StateData) -> - process_unauthenticated_stanza(StateData, Pkt), - fsm_next_state(wait_for_auth, StateData). +unauthenticated_stream_features(#{server := Server}) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(c2s_pre_auth_features, LServer, [], [LServer]). -wait_for_feature_request({xmlstreamelement, El}, StateData) -> - decode_element(El, wait_for_feature_request, StateData); -wait_for_feature_request(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) -> - fsm_next_state(wait_for_feature_request, - dispatch_stream_mgmt(Pkt, StateData)); -wait_for_feature_request(#sasl_auth{mechanism = Mech, - text = ClientIn}, - #state{tls_enabled = TLSEnabled, - tls_required = TLSRequired} = StateData) - when TLSEnabled or not TLSRequired -> - case cyrsasl:server_start(StateData#state.sasl_state, Mech, ClientIn) of - {ok, Props} -> - (StateData#state.sockmod):reset_stream(StateData#state.socket), - U = identity(Props), - AuthModule = proplists:get_value(auth_module, Props, undefined), - ?INFO_MSG("(~w) Accepted authentication for ~s by ~p from ~s", - [StateData#state.socket, U, AuthModule, - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [true, U, StateData#state.server, - StateData#state.ip]), - send_element(StateData, #sasl_success{}), - fsm_next_state(wait_for_stream, - StateData#state{streamid = new_id(), - authenticated = true, - auth_module = AuthModule, - sasl_state = undefined, - user = U}); - {continue, ServerOut, NewSASLState} -> - send_element(StateData, #sasl_challenge{text = ServerOut}), - fsm_next_state(wait_for_sasl_response, - StateData#state{sasl_state = NewSASLState}); - {error, Error, Username} -> - ?INFO_MSG("(~w) Failed authentication for ~s@~s from ~s", - [StateData#state.socket, - Username, StateData#state.server, - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [false, Username, StateData#state.server, - StateData#state.ip]), - send_element(StateData, #sasl_failure{reason = Error}), - fsm_next_state(wait_for_feature_request, StateData); - {error, Error} -> - send_element(StateData, #sasl_failure{reason = Error}), - fsm_next_state(wait_for_feature_request, StateData) - end; -wait_for_feature_request(#starttls{}, - #state{tls = true, tls_enabled = false} = StateData) -> - case (StateData#state.sockmod):get_sockmod(StateData#state.socket) of - gen_tcp -> - TLSOpts = case ejabberd_config:get_option( - {domain_certfile, StateData#state.server}, - fun iolist_to_binary/1) of - undefined -> - StateData#state.tls_options; - CertFile -> - lists:keystore(certfile, 1, - StateData#state.tls_options, - {certfile, CertFile}) - end, - Socket = StateData#state.socket, - BProceed = fxml:element_to_binary(xmpp:encode(#starttls_proceed{})), - TLSSocket = (StateData#state.sockmod):starttls(Socket, TLSOpts, BProceed), - fsm_next_state(wait_for_stream, - StateData#state{socket = TLSSocket, - streamid = new_id(), - tls_enabled = true}); - _ -> - Lang = StateData#state.lang, - Txt = <<"Unsupported TLS transport">>, - send_element(StateData, xmpp:serr_policy_violation(Txt, Lang)), - {stop, normal, StateData} - end; -wait_for_feature_request(#compress{} = Comp, StateData) -> - Zlib = StateData#state.zlib, - SockMod = (StateData#state.sockmod):get_sockmod(StateData#state.socket), - if Zlib == true, (SockMod == gen_tcp) or (SockMod == fast_tls) -> - process_compression_request(Comp, wait_for_feature_request, StateData); - true -> - send_element(StateData, #compress_failure{reason = 'setup-failed'}), - fsm_next_state(wait_for_feature_request, StateData) - end; -wait_for_feature_request(timeout, StateData) -> - {stop, normal, StateData}; -wait_for_feature_request({xmlstreamend, _Name}, - StateData) -> - {stop, normal, StateData}; -wait_for_feature_request({xmlstreamerror, _}, - StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_feature_request(closed, StateData) -> - {stop, normal, StateData}; -wait_for_feature_request(stop, StateData) -> - {stop, normal, StateData}; -wait_for_feature_request(_Pkt, - #state{tls_required = TLSRequired, - tls_enabled = TLSEnabled} = StateData) - when TLSRequired and not TLSEnabled -> - Lang = StateData#state.lang, - Txt = <<"Use of STARTTLS required">>, - send_element(StateData, xmpp:serr_policy_violation(Txt, Lang)), - {stop, normal, StateData}; -wait_for_feature_request(Pkt, StateData) -> - process_unauthenticated_stanza(StateData, Pkt), - fsm_next_state(wait_for_feature_request, StateData). +authenticated_stream_features(#{server := Server}) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(c2s_post_auth_features, LServer, [], [LServer]). -wait_for_sasl_response({xmlstreamelement, El}, StateData) -> - decode_element(El, wait_for_sasl_response, StateData); -wait_for_sasl_response(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) -> - fsm_next_state(wait_for_sasl_response, - dispatch_stream_mgmt(Pkt, StateData)); -wait_for_sasl_response(#sasl_response{text = ClientIn}, StateData) -> - case cyrsasl:server_step(StateData#state.sasl_state, ClientIn) of - {ok, Props} -> - catch (StateData#state.sockmod):reset_stream(StateData#state.socket), - U = identity(Props), - AuthModule = proplists:get_value(auth_module, Props, <<>>), - ?INFO_MSG("(~w) Accepted authentication for ~s by ~p from ~s", - [StateData#state.socket, U, AuthModule, - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [true, U, StateData#state.server, - StateData#state.ip]), - send_element(StateData, #sasl_success{}), - fsm_next_state(wait_for_stream, - StateData#state{streamid = new_id(), - authenticated = true, - auth_module = AuthModule, - sasl_state = undefined, - user = U}); - {ok, Props, ServerOut} -> - (StateData#state.sockmod):reset_stream(StateData#state.socket), - U = identity(Props), - AuthModule = proplists:get_value(auth_module, Props, undefined), - ?INFO_MSG("(~w) Accepted authentication for ~s by ~p from ~s", - [StateData#state.socket, U, AuthModule, - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [true, U, StateData#state.server, - StateData#state.ip]), - send_element(StateData, #sasl_success{text = ServerOut}), - fsm_next_state(wait_for_stream, - StateData#state{streamid = new_id(), - authenticated = true, - auth_module = AuthModule, - sasl_state = undefined, - user = U}); - {continue, ServerOut, NewSASLState} -> - send_element(StateData, #sasl_challenge{text = ServerOut}), - fsm_next_state(wait_for_sasl_response, - StateData#state{sasl_state = NewSASLState}); - {error, Error, Username} -> - ?INFO_MSG("(~w) Failed authentication for ~s@~s from ~s", - [StateData#state.socket, - Username, StateData#state.server, - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [false, Username, StateData#state.server, - StateData#state.ip]), - send_element(StateData, #sasl_failure{reason = Error}), - fsm_next_state(wait_for_feature_request, StateData); - {error, Error} -> - send_element(StateData, #sasl_failure{reason = Error}), - fsm_next_state(wait_for_feature_request, StateData) - end; -wait_for_sasl_response(timeout, StateData) -> - {stop, normal, StateData}; -wait_for_sasl_response({xmlstreamend, _Name}, - StateData) -> - {stop, normal, StateData}; -wait_for_sasl_response({xmlstreamerror, _}, - StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_sasl_response(closed, StateData) -> - {stop, normal, StateData}; -wait_for_sasl_response(stop, StateData) -> - {stop, normal, StateData}; -wait_for_sasl_response(Pkt, StateData) -> - process_unauthenticated_stanza(StateData, Pkt), - fsm_next_state(wait_for_feature_request, StateData). +sasl_mechanisms(#{server := Server}) -> + cyrsasl:listmech(jid:nameprep(Server)). --spec resource_conflict_action(binary(), binary(), binary()) -> - {accept_resource, binary()} | closenew. -resource_conflict_action(U, S, R) -> - OptionRaw = case ejabberd_sm:is_existing_resource(U, S, R) of - true -> - ejabberd_config:get_option( - {resource_conflict, S}, - fun(setresource) -> setresource; - (closeold) -> closeold; - (closenew) -> closenew; - (acceptnew) -> acceptnew - end); - false -> - acceptnew - end, - Option = case OptionRaw of - setresource -> setresource; - closeold -> - acceptnew; %% ejabberd_sm will close old session - closenew -> closenew; - acceptnew -> acceptnew; - _ -> acceptnew %% default ejabberd behavior - end, - case Option of - acceptnew -> {accept_resource, R}; - closenew -> closenew; - setresource -> - Rnew = new_uniq_id(), - {accept_resource, Rnew} - end. +init_sasl(#{server := Server}) -> + LServer = jid:nameprep(Server), + cyrsasl:server_new( + <<"jabber">>, LServer, <<"">>, [], + fun(U) -> + ejabberd_auth:get_password_with_authmodule(U, LServer) + end, + fun(U, AuthzId, P) -> + ejabberd_auth:check_password_with_authmodule(U, AuthzId, LServer, P) + end, + fun(U, AuthzId, P, D, DG) -> + ejabberd_auth:check_password_with_authmodule(U, AuthzId, LServer, P, D, DG) + end). --spec decode_element(xmlel(), state_name(), state()) -> fsm_transition(). -decode_element(#xmlel{} = El, StateName, StateData) -> - try case xmpp:decode(El, ?NS_CLIENT, [ignore_els]) of - #iq{sub_els = [_], type = T} = Pkt when T == set; T == get -> - NewPkt = xmpp:decode_els( - Pkt, ?NS_CLIENT, - fun(SubEl) when StateName == session_established -> - case xmpp:get_ns(SubEl) of - ?NS_PRIVACY -> true; - ?NS_BLOCKING -> true; - _ -> false - end; - (SubEl) -> - xmpp:is_known_tag(SubEl) - end), - ?MODULE:StateName(NewPkt, StateData); - Pkt -> - ?MODULE:StateName(Pkt, StateData) - end - catch error:{xmpp_codec, Why} -> - NS = xmpp:get_ns(El), - fsm_next_state( - StateName, - case xmpp:is_stanza(El) of - true -> - Lang = xmpp:get_lang(El), - Txt = xmpp:format_error(Why), - send_error(StateData, El, xmpp:err_bad_request(Txt, Lang)); - false when NS == ?NS_STREAM_MGMT_2; NS == ?NS_STREAM_MGMT_3 -> - Err = #sm_failed{reason = 'bad-request', xmlns = NS}, - send_element(StateData, Err), - StateData; - false -> - StateData - end) +bind(<<"">>, State) -> + bind(new_uniq_id(), State); +bind(R, #{user := U, server := S} = State) -> + case resource_conflict_action(U, S, R) of + closenew -> + {error, xmpp:err_conflict(), State}; + {accept_resource, Resource} -> + open_session(State, Resource) end. -wait_for_bind({xmlstreamelement, El}, StateData) -> - decode_element(El, wait_for_bind, StateData); -wait_for_bind(#sm_resume{} = Pkt, StateData) -> - case handle_resume(StateData, Pkt) of - {ok, ResumedState} -> - fsm_next_state(session_established, ResumedState); - error -> - fsm_next_state(wait_for_bind, StateData) - end; -wait_for_bind(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) -> - fsm_next_state(wait_for_bind, dispatch_stream_mgmt(Pkt, StateData)); -wait_for_bind(#iq{type = set, - sub_els = [#bind{resource = R0}]} = IQ, StateData) -> - U = StateData#state.user, - R = case R0 of - <<>> -> new_uniq_id(); - _ -> R0 - end, - case resource_conflict_action(U, StateData#state.server, R) of - closenew -> - Err = xmpp:make_error(IQ, xmpp:err_conflict()), - send_element(StateData, Err), - fsm_next_state(wait_for_bind, StateData); - {accept_resource, R2} -> - JID = jid:make(U, StateData#state.server, R2), - StateData2 = StateData#state{resource = R2, jid = JID}, - case open_session(StateData2) of - {ok, StateData3} -> - Res = xmpp:make_iq_result(IQ, #bind{jid = JID}), - try - send_element(StateData3, Res) - catch - exit:normal -> close(self()) - end, - fsm_next_state_pack(session_established,StateData3); - {error, Error} -> - Err = xmpp:make_error(IQ, Error), - send_element(StateData, Err), - fsm_next_state(wait_for_bind, StateData) +handle_stream_start(#{server := Server, ip := IP, lang := Lang} = State) -> + LServer = jid:nameprep(Server), + case lists:member(LServer, ?MYHOSTS) of + false -> + xmpp_stream_in:send(State, xmpp:serr_host_unknown()); + true -> + case check_bl_c2s(IP, Lang) of + false -> + change_shaper(State), + {noreply, State}; + {true, LogReason, ReasonT} -> + ?INFO_MSG("Connection attempt from blacklisted IP ~s: ~s", + [jlib:ip_to_list(IP), LogReason]), + Err = xmpp:serr_policy_violation(ReasonT, Lang), + xmpp_stream_in:send(State, Err) end - end; -wait_for_bind(#compress{} = Comp, StateData) -> - Zlib = StateData#state.zlib, - SockMod = (StateData#state.sockmod):get_sockmod(StateData#state.socket), - if Zlib == true, (SockMod == gen_tcp) or (SockMod == fast_tls) -> - process_compression_request(Comp, wait_for_bind, StateData); - true -> - send_element(StateData, #compress_failure{reason = 'setup-failed'}), - fsm_next_state(wait_for_bind, StateData) - end; -wait_for_bind(timeout, StateData) -> - {stop, normal, StateData}; -wait_for_bind({xmlstreamend, _Name}, StateData) -> - {stop, normal, StateData}; -wait_for_bind({xmlstreamerror, _}, StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_bind(closed, StateData) -> - {stop, normal, StateData}; -wait_for_bind(stop, StateData) -> - {stop, normal, StateData}; -wait_for_bind(Pkt, StateData) -> - fsm_next_state( - wait_for_bind, - case xmpp:is_stanza(Pkt) of - true -> - send_error(StateData, Pkt, xmpp:err_not_acceptable()); - false -> - StateData - end). - --spec open_session(state()) -> {ok, state()} | {error, stanza_error()}. -open_session(StateData) -> - U = StateData#state.user, - R = StateData#state.resource, - JID = StateData#state.jid, - Lang = StateData#state.lang, - IP = StateData#state.ip, - case acl:access_matches(StateData#state.access, - #{usr => jid:split(JID), ip => IP}, - StateData#state.server) of - allow -> - ?INFO_MSG("(~w) Opened session for ~s", - [StateData#state.socket, jid:to_string(JID)]), - change_shaper(StateData, JID), - {Fs, Ts} = ejabberd_hooks:run_fold( - roster_get_subscription_lists, - StateData#state.server, - {[], []}, - [U, StateData#state.server]), - LJID = jid:tolower(jid:remove_resource(JID)), - Fs1 = [LJID | Fs], - Ts1 = [LJID | Ts], - PrivList = - ejabberd_hooks:run_fold( - privacy_get_user_list, - StateData#state.server, - #userlist{}, - [U, StateData#state.server]), - Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, StateData#state.auth_module}], - ejabberd_sm:open_session( - StateData#state.sid, U, StateData#state.server, R, Info), - UpdatedStateData = - StateData#state{ - conn = Conn, - pres_f = ?SETS:from_list(Fs1), - pres_t = ?SETS:from_list(Ts1), - privacy_list = PrivList}, - {ok, UpdatedStateData}; - _ -> - ejabberd_hooks:run(forbidden_session_hook, - StateData#state.server, [JID]), - ?INFO_MSG("(~w) Forbidden session for ~s", - [StateData#state.socket, jid:to_string(JID)]), - Txt = <<"Denied by ACL">>, - {error, xmpp:err_not_allowed(Txt, Lang)} end. -session_established({xmlstreamelement, El}, StateData) -> - decode_element(El, session_established, StateData); -session_established(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) -> - fsm_next_state(session_established, dispatch_stream_mgmt(Pkt, StateData)); -session_established(#csi{type = active}, StateData) -> - NewStateData = csi_flush_queue(StateData), - fsm_next_state(session_established, NewStateData#state{csi_state = active}); -session_established(#csi{type = inactive}, StateData) -> - fsm_next_state(session_established, StateData#state{csi_state = inactive}); -%% We hibernate the process to reduce memory consumption after a -%% configurable activity timeout -session_established(timeout, StateData) -> - Options = [], - proc_lib:hibernate(?GEN_FSM, enter_loop, - [?MODULE, Options, session_established, StateData]), - fsm_next_state(session_established, StateData); -session_established({xmlstreamend, _Name}, StateData) -> - {stop, normal, StateData}; -session_established({xmlstreamerror, - <<"XML stanza is too big">> = E}, - StateData) -> - send_element(StateData, - xmpp:serr_policy_violation(E, StateData#state.lang)), - {stop, normal, StateData}; -session_established({xmlstreamerror, _}, StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -session_established(closed, #state{mgmt_state = active} = StateData) -> - catch (StateData#state.sockmod):close(StateData#state.socket), - fsm_next_state(wait_for_resume, StateData); -session_established(closed, StateData) -> - {stop, normal, StateData}; -session_established(stop, StateData) -> - {stop, normal, StateData}; -session_established(Pkt, StateData) when ?is_stanza(Pkt) -> - FromJID = StateData#state.jid, - case check_from(Pkt, FromJID) of - 'invalid-from' -> - send_element(StateData, xmpp:serr_invalid_from()), - {stop, normal, StateData}; - _ -> - NewStateData = update_num_stanzas_in(StateData, Pkt), - session_established2(Pkt, NewStateData) - end; -session_established(_Pkt, StateData) -> - fsm_next_state(session_established, StateData). +handle_stream_end(State) -> + {stop, normal, State}. + +handle_stream_close(State) -> + {stop, normal, State}. + +handle_auth_success(User, Mech, AuthModule, + #{socket := Socket, ip := IP, server := Server} = State) -> + LServer = jid:nameprep(Server), + ?INFO_MSG("(~w) Accepted ~s authentication for ~s@~s by ~p from ~s", + [Socket, Mech, User, LServer, AuthModule, + ejabberd_config:may_hide_data(jlib:ip_to_list(IP))]), + State1 = State#{auth_module => AuthModule}, + ejabberd_hooks:run_fold(c2s_auth_result, LServer, + {noreply, State1}, [true, User]). + +handle_auth_failure(User, Mech, Reason, + #{socket := Socket, ip := IP, server := Server} = State) -> + LServer = jid:nameprep(Server), + ?INFO_MSG("(~w) Failed ~s authentication ~sfrom ~s: ~s", + [Socket, Mech, + if User /= <<"">> -> ["for ", User, "@", LServer, " "]; + true -> "" + end, + ejabberd_config:may_hide_data(jlib:ip_to_list(IP)), Reason]), + ejabberd_hooks:run_fold(c2s_auth_result, LServer, + {noreply, State}, [false, User]). + +handle_unbinded_packet(Pkt, #{server := Server} = State) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(c2s_unbinded_packet, LServer, + {noreply, State}, [Pkt]). + +handle_unauthenticated_packet(Pkt, #{server := Server} = State) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(c2s_unauthenticated_packet, + LServer, {noreply, State}, [Pkt]). + +handle_authenticated_packet(Pkt, #{server := Server} = State) when not ?is_stanza(Pkt) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(c2s_authenticated_packet, + LServer, {noreply, State}, [Pkt]); +handle_authenticated_packet(Pkt, #{server := Server} = State) -> + LServer = jid:nameprep(Server), + case ejabberd_hooks:run_fold(c2s_authenticated_packet, + LServer, {noreply, State}, [Pkt]) of + {noreply, State1} -> + Pkt1 = ejabberd_hooks:run_fold(user_send_packet, LServer, Pkt, [State1]), + Res = case Pkt1 of + #presence{to = #jid{lresource = <<"">>}} -> + process_self_presence(State1, Pkt1); + #presence{} -> + process_presence_out(State1, Pkt1); + _ -> + check_privacy_then_route(State1, Pkt1) + end, + ejabberd_hooks:run(c2s_loop_debug, [{xmlstreamelement, Pkt}]), + Res; + Err -> + ejabberd_hooks:run(c2s_loop_debug, [{xmlstreamelement, Pkt}]), + Err + end. --spec session_established2(xmpp_element(), state()) -> fsm_next(). -%% Process packets sent by user (coming from user on c2s XMPP connection) -session_established2(Pkt, StateData) -> - User = StateData#state.user, - Server = StateData#state.server, - FromJID = StateData#state.jid, - ToJID = case xmpp:get_to(Pkt) of - undefined -> jid:make(User, Server, <<"">>); - J -> J - end, - Lang = case xmpp:get_lang(Pkt) of - <<"">> -> StateData#state.lang; - L -> L +init([State, Opts]) -> + Access = gen_mod:get_opt(access, Opts, fun acl:access_rules_validator/1, all), + Shaper = gen_mod:get_opt(shaper, Opts, fun acl:shaper_rules_validator/1, none), + TLSOpts = lists:filter( + fun({certfile, _}) -> true; + ({ciphers, _}) -> true; + ({dhfile, _}) -> true; + (_) -> false + end, Opts), + TLSRequired = proplists:get_bool(starttls_required, Opts), + TLSVerify = proplists:get_bool(tls_verify, Opts), + State1 = State#{tls_options => TLSOpts, + tls_required => TLSRequired, + tls_verify => TLSVerify, + pres_a => ?SETS:new(), + pres_f => ?SETS:new(), + pres_t => ?SETS:new(), + sid => ejabberd_sm:make_sid(), + lang => ?MYLANG, + server => ?MYNAME, + access => Access, + shaper => Shaper}, + ejabberd_hooks:run_fold(c2s_init, {ok, State1}, []). + +handle_call(get_presence, _From, + #{user := U, server := S, resource := R} = State) -> + Pres = case maps:get(pres_last, State, undefined) of + undefined -> + From = jid:make(U, S, R), + To = jid:remove_resource(From), + #presence{from = From, to = To, type = unavailable}; + P -> + P end, - NewPkt = xmpp:set_lang(Pkt, Lang), - NewState = - case NewPkt of - #presence{} -> - Presence0 = ejabberd_hooks:run_fold( - c2s_update_presence, Server, NewPkt, - [User, Server]), - Presence = ejabberd_hooks:run_fold( - user_send_packet, Server, Presence0, - [StateData, FromJID, ToJID]), - case ToJID of - #jid{user = User, server = Server, resource = <<"">>} -> - ?DEBUG("presence_update(~p,~n\t~p,~n\t~p)", - [FromJID, Presence, StateData]), - presence_update(FromJID, Presence, - StateData); - _ -> - presence_track(FromJID, ToJID, Presence, - StateData) - end; - #iq{type = T, sub_els = [El]} when T == set; T == get -> - NS = xmpp:get_ns(El), - if NS == ?NS_BLOCKING; NS == ?NS_PRIVACY -> - IQ = xmpp:set_from_to(Pkt, FromJID, ToJID), - process_privacy_iq(IQ, StateData); - NS == ?NS_SESSION -> - Res = xmpp:make_iq_result(Pkt), - send_stanza(StateData, Res); - true -> - NewPkt0 = ejabberd_hooks:run_fold( - user_send_packet, Server, NewPkt, - [StateData, FromJID, ToJID]), - check_privacy_route(FromJID, StateData, FromJID, - ToJID, NewPkt0) - end; - _ -> - NewPkt0 = ejabberd_hooks:run_fold( - user_send_packet, Server, NewPkt, - [StateData, FromJID, ToJID]), - check_privacy_route(FromJID, StateData, FromJID, - ToJID, NewPkt0) - end, - ejabberd_hooks:run(c2s_loop_debug, - [{xmlstreamelement, Pkt}]), - fsm_next_state(session_established, NewState). - -wait_for_resume({xmlstreamelement, _El} = Event, StateData) -> - Result = session_established(Event, StateData), - fsm_next_state(wait_for_resume, element(3, Result)); -wait_for_resume(timeout, StateData) -> - ?DEBUG("Timed out waiting for resumption of stream for ~s", - [jid:to_string(StateData#state.jid)]), - {stop, normal, StateData#state{mgmt_state = timeout}}; -wait_for_resume(Event, StateData) -> - ?DEBUG("Ignoring event while waiting for resumption: ~p", [Event]), - fsm_next_state(wait_for_resume, StateData). - -handle_event(_Event, StateName, StateData) -> - fsm_next_state(StateName, StateData). - -handle_sync_event({get_presence}, _From, StateName, - StateData) -> - User = StateData#state.user, - PresLast = StateData#state.pres_last, - Show = get_showtag(PresLast), - Status = get_statustag(PresLast), - Resource = StateData#state.resource, - Reply = {User, Resource, Show, Status}, - fsm_reply(Reply, StateName, StateData); -handle_sync_event({get_last_presence}, _From, StateName, - StateData) -> - User = StateData#state.user, - Server = StateData#state.server, - PresLast = StateData#state.pres_last, - Resource = StateData#state.resource, - Reply = {User, Server, Resource, PresLast}, - fsm_reply(Reply, StateName, StateData); - -handle_sync_event(get_subscribed, _From, StateName, - StateData) -> - Subscribed = (?SETS):to_list(StateData#state.pres_f), - {reply, Subscribed, StateName, StateData}; -handle_sync_event({resume_session, Time}, _From, _StateName, - StateData) when element(1, StateData#state.sid) == Time -> - %% The old session should be closed before the new one is opened, so we do - %% this here instead of leaving it to the terminate callback - ejabberd_sm:close_session(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource), - {stop, normal, {resume, StateData}, StateData#state{mgmt_state = resumed}}; -handle_sync_event({resume_session, _Time}, _From, StateName, - StateData) -> - {reply, {error, <<"Previous session not found">>}, StateName, StateData}; -handle_sync_event(_Event, _From, StateName, - StateData) -> - Reply = ok, fsm_reply(Reply, StateName, StateData). - -code_change(_OldVsn, StateName, StateData, _Extra) -> - {ok, StateName, StateData}. - -handle_info({send_text, Text}, StateName, StateData) -> - send_text(StateData, Text), - ejabberd_hooks:run(c2s_loop_debug, [Text]), - fsm_next_state(StateName, StateData); -handle_info(replaced, StateName, StateData) -> - Lang = StateData#state.lang, - Pkt = xmpp:serr_conflict(<<"Replaced by new connection">>, Lang), - handle_info({kick, replaced, Pkt}, StateName, StateData); -handle_info(kick, StateName, StateData) -> - Lang = StateData#state.lang, - Pkt = xmpp:serr_policy_violation(<<"has been kicked">>, Lang), - handle_info({kick, kicked_by_admin, Pkt}, StateName, StateData); -handle_info({kick, Reason, Pkt}, _StateName, StateData) -> - send_element(StateData, Pkt), - {stop, normal, - StateData#state{authenticated = Reason}}; -handle_info({route, _From, _To, {broadcast, Data}}, - StateName, StateData) -> - ?DEBUG("broadcast~n~p~n", [Data]), - case Data of - {item, IJID, ISubscription} -> - fsm_next_state(StateName, - roster_change(IJID, ISubscription, StateData)); - {exit, Reason} -> - Lang = StateData#state.lang, - send_element(StateData, xmpp:serr_conflict(Reason, Lang)), - {stop, normal, StateData}; - {privacy_list, PrivList, PrivListName} -> - case ejabberd_hooks:run_fold(privacy_updated_list, - StateData#state.server, - false, - [StateData#state.privacy_list, - PrivList]) of - false -> - fsm_next_state(StateName, StateData); - NewPL -> - PrivPushIQ = - #iq{type = set, - from = jid:remove_resource(StateData#state.jid), - to = StateData#state.jid, - id = <<"push", (randoms:get_string())/binary>>, - sub_els = [#privacy_query{ - lists = [#privacy_list{ - name = PrivListName}]}]}, - NewState = send_stanza(StateData, PrivPushIQ), - fsm_next_state(StateName, - NewState#state{privacy_list = NewPL}) - end; - {blocking, What} -> - NewState = route_blocking(What, StateData), - fsm_next_state(StateName, NewState); - _ -> - fsm_next_state(StateName, StateData) - end; -%% Process Packets that are to be send to the user -handle_info({route, From, To, Packet}, StateName, StateData) when ?is_stanza(Packet) -> - {Pass, NewState} = - case Packet of - #presence{type = T} -> - State = ejabberd_hooks:run_fold(c2s_presence_in, - StateData#state.server, - StateData, - [{From, To, Packet}]), - case T of - probe -> - LFrom = jid:tolower(From), - LBFrom = jid:remove_resource(LFrom), - NewStateData = - case (?SETS):is_element(LFrom, State#state.pres_a) - orelse (?SETS):is_element(LBFrom, State#state.pres_a) of - true -> State; - false -> - case (?SETS):is_element(LFrom, State#state.pres_f) of - true -> - A = (?SETS):add_element(LFrom, State#state.pres_a), - State#state{pres_a = A}; - false -> - case (?SETS):is_element(LBFrom, State#state.pres_f) of - true -> - A = (?SETS):add_element(LBFrom, State#state.pres_a), - State#state{pres_a = A}; - false -> - State - end - end - end, - process_presence_probe(From, To, NewStateData), - {false, NewStateData}; - error -> - NewA = ?SETS:del_element(jid:tolower(From), State#state.pres_a), - {true, State#state{pres_a = NewA}}; - subscribe -> - SRes = is_privacy_allow(State, From, To, Packet, in), - {SRes, State}; - subscribed -> - SRes = is_privacy_allow(State, From, To, Packet, in), - {SRes, State}; - unsubscribe -> - SRes = is_privacy_allow(State, From, To, Packet, in), - {SRes, State}; - unsubscribed -> - SRes = is_privacy_allow(State, From, To, Packet, in), - {SRes, State}; - _ -> - case privacy_check_packet(State, From, To, Packet, in) of - allow -> - LFrom = jid:tolower(From), - LBFrom = jid:remove_resource(LFrom), - case (?SETS):is_element(LFrom, State#state.pres_a) - orelse (?SETS):is_element(LBFrom, State#state.pres_a) of - true -> - {true, State}; - false -> - case (?SETS):is_element(LFrom, State#state.pres_f) of - true -> - A = (?SETS):add_element(LFrom, State#state.pres_a), - {true, State#state{pres_a = A}}; - false -> - case (?SETS):is_element(LBFrom, - State#state.pres_f) of - true -> - A = (?SETS):add_element( - LBFrom, - State#state.pres_a), - {true, State#state{pres_a = A}}; - false -> - {true, State} - end - end - end; - deny -> {false, State} - end - end; - #iq{type = T} -> - case xmpp:has_subtag(Packet, #last{}) of - true when T == get; T == set -> - LFrom = jid:tolower(From), - LBFrom = jid:remove_resource(LFrom), - HasFromSub = ((?SETS):is_element(LFrom, StateData#state.pres_f) - orelse (?SETS):is_element(LBFrom, StateData#state.pres_f)) - andalso is_privacy_allow(StateData, To, From, #presence{}, out), - case HasFromSub of - true -> - case privacy_check_packet( - StateData, From, To, Packet, in) of - allow -> - {true, StateData}; - deny -> - ejabberd_router:route_error( - To, From, Packet, - xmpp:err_service_unavailable()), - {false, StateData} - end; - _ -> - ejabberd_router:route_error( - To, From, Packet, xmpp:err_forbidden()), - {false, StateData} - end; - _ -> - case privacy_check_packet(StateData, From, To, Packet, in) of - allow -> - {true, StateData}; - deny -> - ejabberd_router:route_error( - To, From, Packet, xmpp:err_service_unavailable()), - {false, StateData} - end - end; - #message{type = T} -> - case privacy_check_packet(StateData, From, To, Packet, in) of - allow -> - {true, StateData}; - deny -> - case T of - groupchat -> ok; - headline -> ok; - _ -> - case xmpp:has_subtag(Packet, #muc_user{}) of - true -> - ok; - false -> - ejabberd_router:route_error( - To, From, Packet, xmpp:err_service_unavailable()) - end - end, - {false, StateData} - end - end, + {reply, Pres, State}; +handle_call(get_subscribed, _From, #{pres_f := PresF} = State) -> + Subscribed = ?SETS:to_list(PresF), + {reply, Subscribed, State}; +handle_call(Request, From, #{server := Server} = State) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold( + c2s_handle_call, LServer, {noreply, State}, [Request, From]). + +handle_cast(closed, State) -> + handle_stream_close(State); +handle_cast(Msg, #{server := Server} = State) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(c2s_handle_cast, LServer, {noreply, State}, [Msg]). + +handle_info({route, From, To, Packet0}, #{server := Server} = State) -> + Packet = xmpp:set_from_to(Packet0, From, To), + LServer = jid:nameprep(Server), + {Pass, NewState} = case Packet of + #presence{} -> + process_presence_in(State, Packet); + #message{} -> + process_message_in(State, Packet); + #iq{} -> + process_iq_in(State, Packet) + end, if Pass -> - FixedPacket0 = xmpp:set_from_to(Packet, From, To), - FixedPacket = ejabberd_hooks:run_fold( - user_receive_packet, - NewState#state.server, - FixedPacket0, - [NewState, NewState#state.jid, From, To]), - SentStateData = send_packet(NewState, FixedPacket), + LServer = jid:nameprep(Server), + Packet1 = ejabberd_hooks:run_fold( + user_receive_packet, LServer, Packet, [NewState]), ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - fsm_next_state(StateName, SentStateData); - true -> - ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - fsm_next_state(StateName, NewState) - end; -handle_info({'DOWN', Monitor, _Type, _Object, _Info}, - _StateName, StateData) - when Monitor == StateData#state.socket_monitor -> - if StateData#state.mgmt_state == active; - StateData#state.mgmt_state == pending -> - fsm_next_state(wait_for_resume, StateData); + xmpp_stream_in:send(NewState, Packet1); true -> - {stop, normal, StateData} + ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), + {noreply, NewState} end; -handle_info(system_shutdown, StateName, StateData) -> - case StateName of - wait_for_stream -> - send_header(StateData, ?MYNAME, {1,0}, <<"en">>), - send_element(StateData, xmpp:serr_system_shutdown()), - ok; - _ -> - send_element(StateData, xmpp:serr_system_shutdown()), - ok - end, - {stop, normal, StateData}; -handle_info({route_xmlstreamelement, El}, _StateName, StateData) -> - {next_state, NStateName, NStateData, _Timeout} = - session_established({xmlstreamelement, El}, StateData), - fsm_next_state(NStateName, NStateData); -handle_info({force_update_presence, LUser, LServer}, StateName, - #state{jid = #jid{luser = LUser, lserver = LServer}} = StateData) -> - NewStateData = case StateData#state.pres_last of - #presence{} -> - Presence = - ejabberd_hooks:run_fold(c2s_update_presence, - LServer, - StateData#state.pres_last, - [LUser, LServer]), - StateData2 = StateData#state{pres_last = Presence}, - presence_update(StateData2#state.jid, Presence, - StateData2), - StateData2; - undefined -> StateData - end, - fsm_next_state(StateName, NewStateData); -handle_info({send_filtered, Feature, From, To, Packet}, StateName, StateData) -> - Drop = ejabberd_hooks:run_fold(c2s_filter_packet, StateData#state.server, - true, [StateData#state.server, StateData, - Feature, To, Packet]), - NewStateData = if Drop -> - ?DEBUG("Dropping packet from ~p to ~p", - [jid:to_string(From), - jid:to_string(To)]), - StateData; - true -> - FinalPacket = xmpp:set_from_to(Packet, From, To), - case StateData#state.jid of - To -> - case privacy_check_packet(StateData, From, To, - FinalPacket, in) of - deny -> - StateData; - allow -> - send_stanza(StateData, FinalPacket) - end; - _ -> - ejabberd_router:route(From, To, FinalPacket), - StateData - end - end, - fsm_next_state(StateName, NewStateData); -handle_info({broadcast, Type, From, Packet}, StateName, StateData) -> - Recipients = ejabberd_hooks:run_fold( - c2s_broadcast_recipients, StateData#state.server, - [], - [StateData#state.server, StateData, Type, From, Packet]), - lists:foreach( - fun(USR) -> - ejabberd_router:route( - From, jid:make(USR), Packet) - end, lists:usort(Recipients)), - fsm_next_state(StateName, StateData); -handle_info({set_csi_state, CsiState}, StateName, StateData) -> - fsm_next_state(StateName, StateData#state{csi_state = CsiState}); -handle_info({set_resume_timeout, Timeout}, StateName, StateData) -> - fsm_next_state(StateName, StateData#state{mgmt_timeout = Timeout}); -handle_info(dont_ask_offline, StateName, StateData) -> - fsm_next_state(StateName, StateData#state{ask_offline = false}); -handle_info(close, StateName, StateData) -> - ?DEBUG("Timeout waiting for stream management acknowledgement of ~s", - [jid:to_string(StateData#state.jid)]), - close(self()), - fsm_next_state(StateName, StateData#state{mgmt_ack_timer = undefined}); -handle_info({_Ref, {resume, OldStateData}}, StateName, StateData) -> - %% This happens if the resume_session/1 request timed out; the new session - %% now receives the late response. - ?DEBUG("Received old session state for ~s after failed resumption", - [jid:to_string(OldStateData#state.jid)]), - handle_unacked_stanzas(OldStateData#state{mgmt_resend = false}), - fsm_next_state(StateName, StateData); -handle_info(Info, StateName, StateData) -> - ?ERROR_MSG("Unexpected info: ~p", [Info]), - fsm_next_state(StateName, StateData). +handle_info(system_shutdown, State) -> + xmpp_stream_in:send(State, xmpp:serr_system_shutdown()); +handle_info(Info, #{server := Server} = State) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(c2s_handle_info, LServer, {noreply, State}, [Info]). --spec print_state(state()) -> state(). -print_state(State = #state{pres_t = T, pres_f = F, pres_a = A}) -> - State#state{pres_t = {pres_t, (?SETS):size(T)}, - pres_f = {pres_f, (?SETS):size(F)}, - pres_a = {pres_a, (?SETS):size(A)}}. - -terminate(_Reason, StateName, StateData) -> - case StateData#state.mgmt_state of - resumed -> - ?INFO_MSG("Closing former stream of resumed session for ~s", - [jid:to_string(StateData#state.jid)]); - _ -> - if StateName == session_established; - StateName == wait_for_resume -> - case StateData#state.authenticated of - replaced -> - ?INFO_MSG("(~w) Replaced session for ~s", - [StateData#state.socket, - jid:to_string(StateData#state.jid)]), - From = StateData#state.jid, - Lang = StateData#state.lang, - Status = <<"Replaced by new connection">>, - Packet = #presence{ - type = unavailable, - status = xmpp:mk_text(Status, Lang)}, - ejabberd_sm:close_session_unset_presence(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, - Status), - presence_broadcast(StateData, From, - StateData#state.pres_a, Packet); - _ -> - ?INFO_MSG("(~w) Close session for ~s", - [StateData#state.socket, - jid:to_string(StateData#state.jid)]), - EmptySet = (?SETS):new(), - case StateData of - #state{pres_last = undefined, pres_a = EmptySet} -> - ejabberd_sm:close_session(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource); - _ -> - From = StateData#state.jid, - Packet = #presence{type = unavailable}, - ejabberd_sm:close_session_unset_presence(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, - <<"">>), - presence_broadcast(StateData, From, - StateData#state.pres_a, Packet) - end, - case StateData#state.mgmt_state of - timeout -> - Info = [{num_stanzas_in, - StateData#state.mgmt_stanzas_in}], - ejabberd_sm:set_offline_info(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, - Info); - _ -> - ok - end - end, - handle_unacked_stanzas(StateData), - bounce_messages(); - true -> - ok - end - end, - catch send_trailer(StateData), - (StateData#state.sockmod):close(StateData#state.socket), +terminate(_Reason, _State) -> ok. -%%%---------------------------------------------------------------------- -%%% Internal functions -%%%---------------------------------------------------------------------- --spec change_shaper(state(), jid()) -> ok. -change_shaper(StateData, JID) -> - Shaper = acl:access_matches(StateData#state.shaper, - #{usr => jid:split(JID), ip => StateData#state.ip}, - StateData#state.server), - (StateData#state.sockmod):change_shaper(StateData#state.socket, - Shaper). - --spec send_text(state(), iodata()) -> ok | {error, any()}. -send_text(StateData, Text) when StateData#state.mgmt_state == pending -> - ?DEBUG("Cannot send text while waiting for resumption: ~p", [Text]); -send_text(StateData, Text) when StateData#state.xml_socket -> - ?DEBUG("Send Text on stream = ~p", [Text]), - (StateData#state.sockmod):send_xml(StateData#state.socket, - {xmlstreamraw, Text}); -send_text(StateData, Text) when StateData#state.mgmt_state == active -> - ?DEBUG("Send XML on stream = ~p", [Text]), - case catch (StateData#state.sockmod):send(StateData#state.socket, Text) of - {'EXIT', _} -> - (StateData#state.sockmod):close(StateData#state.socket), - {error, closed}; - _ -> - ok - end; -send_text(StateData, Text) -> - ?DEBUG("Send XML on stream = ~p", [Text]), - (StateData#state.sockmod):send(StateData#state.socket, Text). - --spec send_element(state(), xmlel() | xmpp_element()) -> ok | {error, any()}. -send_element(StateData, El) when StateData#state.mgmt_state == pending -> - ?DEBUG("Cannot send element while waiting for resumption: ~p", [El]); -send_element(StateData, #xmlel{} = El) when StateData#state.xml_socket -> - ?DEBUG("Send XML on stream = ~p", [fxml:element_to_binary(El)]), - (StateData#state.sockmod):send_xml(StateData#state.socket, - {xmlstreamelement, El}); -send_element(StateData, #xmlel{} = El) -> - send_text(StateData, fxml:element_to_binary(El)); -send_element(StateData, Pkt) -> - send_element(StateData, xmpp:encode(Pkt, ?NS_CLIENT)). - --spec send_error(state(), xmlel() | stanza(), stanza_error()) -> state(). -send_error(StateData, Stanza, Error) -> - Type = xmpp:get_type(Stanza), - if Type == error; Type == result; - Type == <<"error">>; Type == <<"result">> -> - StateData; - true -> - send_stanza(StateData, xmpp:make_error(Stanza, Error)) - end. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. --spec send_stanza(state(), xmpp_element()) -> state(). -send_stanza(StateData, Stanza) when StateData#state.csi_state == inactive -> - csi_filter_stanza(StateData, Stanza); -send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending -> - mgmt_queue_add(StateData, Stanza); -send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active -> - NewStateData = mgmt_queue_add(StateData, Stanza), - mgmt_send_stanza(NewStateData, Stanza); -send_stanza(StateData, Stanza) -> - send_element(StateData, Stanza), - StateData. +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +-spec check_bl_c2s({inet:ip_address(), non_neg_integer()}, binary()) + -> false | {true, binary(), binary()}. +check_bl_c2s({IP, _Port}, Lang) -> + ejabberd_hooks:run_fold(check_bl_c2s, false, [IP, Lang]). --spec send_packet(state(), xmpp_element()) -> state(). -send_packet(StateData, Packet) -> - case xmpp:is_stanza(Packet) of - true -> - send_stanza(StateData, Packet); - false -> - send_element(StateData, Packet), - StateData +-spec open_session(state(), binary()) -> {ok, state()} | {error, stanza_error(), state()}. +open_session(#{user := U, server := S, sid := SID, + socket := Socket, ip := IP, auth_module := AuthMod, + access := Access, lang := Lang} = State, R) -> + JID = jid:make(U, S, R), + LServer = JID#jid.lserver, + case acl:access_matches(Access, + #{usr => jid:split(JID), ip => IP}, + LServer) of + allow -> + ?INFO_MSG("(~w) Opened session for ~s", + [Socket, jid:to_string(JID)]), + change_shaper(State), + Conn = get_conn_type(State), + Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthMod}], + ejabberd_sm:open_session(SID, U, LServer, R, Info), + State1 = State#{conn => Conn, resource => R, jid => JID}, + State2 = ejabberd_hooks:run_fold( + c2s_session_opened, LServer, State1, []), + {ok, State2}; + deny -> + ejabberd_hooks:run(forbidden_session_hook, LServer, [JID]), + ?INFO_MSG("(~w) Forbidden session for ~s", + [Socket, jid:to_string(JID)]), + Txt = <<"Denied by ACL">>, + {error, xmpp:err_not_allowed(Txt, Lang), State} end. --spec send_header(state(), binary(), binary(), binary()) -> ok | {error, any()}. -send_header(StateData, Server, Version, Lang) -> - Header = #xmlel{name = Name, attrs = Attrs} = - xmpp:encode(#stream_start{version = Version, - lang = Lang, - xmlns = ?NS_CLIENT, - stream_xmlns = ?NS_STREAM, - id = StateData#state.streamid, - from = jid:make(Server)}), - if StateData#state.xml_socket -> - (StateData#state.sockmod):send_xml(StateData#state.socket, - {xmlstreamstart, Name, Attrs}); - true -> - send_text(StateData, fxml:element_to_header(Header)) +-spec process_iq_in(state(), iq()) -> {boolean(), state()}. +process_iq_in(State, #iq{} = IQ) -> + case privacy_check_packet(State, IQ, in) of + allow -> + {true, State}; + deny -> + route_error(IQ, xmpp:err_service_unavailable()), + {false, State} end. --spec send_trailer(state()) -> ok | {error, any()}. -send_trailer(StateData) - when StateData#state.mgmt_state == pending -> - ?DEBUG("Cannot send stream trailer while waiting for resumption", []); -send_trailer(StateData) - when StateData#state.xml_socket -> - (StateData#state.sockmod):send_xml(StateData#state.socket, - {xmlstreamend, <<"stream:stream">>}); -send_trailer(StateData) -> - send_text(StateData, ?STREAM_TRAILER). - --spec new_id() -> binary(). -new_id() -> randoms:get_string(). - --spec new_uniq_id() -> binary(). -new_uniq_id() -> - iolist_to_binary([randoms:get_string(), - integer_to_binary(p1_time_compat:unique_integer([positive]))]). - --spec get_conn_type(state()) -> c2s | c2s_tls | c2s_compressed | websocket | - c2s_compressed_tls | http_bind. -get_conn_type(StateData) -> - case (StateData#state.sockmod):get_transport(StateData#state.socket) of - tcp -> c2s; - tls -> c2s_tls; - tcp_zlib -> c2s_compressed; - tls_zlib -> c2s_compressed_tls; - http_bind -> http_bind; - websocket -> websocket +-spec process_message_in(state(), message()) -> {boolean(), state()}. +process_message_in(State, #message{type = T} = Msg) -> + case privacy_check_packet(State, Msg, in) of + allow -> + {true, State}; + deny when T == groupchat; T == headline -> + ok; + deny -> + case xmpp:has_subtag(Msg, #muc_user{}) of + true -> + ok; + false -> + route_error(Msg, xmpp:err_service_unavailable()) + end, + {false, State} end. --spec process_presence_probe(jid(), jid(), state()) -> ok. -process_presence_probe(From, To, StateData) -> - LFrom = jid:tolower(From), - LBFrom = setelement(3, LFrom, <<"">>), - case StateData#state.pres_last of - undefined -> - ok; +-spec process_presence_in(state(), presence()) -> {boolean(), state()}. +process_presence_in(#{server := Server, pres_a := PresA} = State0, + #presence{from = From, to = To, type = T} = Pres) -> + LServer = jid:nameprep(Server), + State = ejabberd_hooks:run_fold(c2s_presence_in, LServer, State0, [Pres]), + case T of + probe -> + NewState = do_some_magic(State, From), + route_probe_reply(From, To, NewState), + {false, NewState}; + error -> + A = ?SETS:del_element(jid:tolower(From), PresA), + {true, State#{pres_a => A}}; _ -> - Cond = ((?SETS):is_element(LFrom, StateData#state.pres_f) - orelse - ((LFrom /= LBFrom) andalso - (?SETS):is_element(LBFrom, StateData#state.pres_f))), - if Cond -> - %% To is the one sending the presence (the probe target) - Packet = xmpp_util:add_delay_info( - StateData#state.pres_last, To, - StateData#state.pres_timestamp), - case privacy_check_packet(StateData, To, From, Packet, out) of - deny -> - ok; - allow -> - Pid=element(2, StateData#state.sid), - ejabberd_hooks:run(presence_probe_hook, StateData#state.server, [From, To, Pid]), - %% Don't route a presence probe to oneself - case From == To of - false -> - ejabberd_router:route(To, From, Packet); - true -> - ok - end - end; - true -> - ok + case privacy_check_packet(State, Pres, in) of + allow when T == error -> + {true, State}; + allow -> + NewState = do_some_magic(State, From), + {true, NewState}; + deny -> + {false, State} end end. -%% User updates his presence (non-directed presence packet) --spec presence_update(jid(), presence(), state()) -> state(). -presence_update(From, Packet, StateData) -> - #presence{type = Type} = Packet, - case Type of - unavailable -> - Status = xmpp:get_text(Packet#presence.status), - Info = [{ip, StateData#state.ip}, - {conn, StateData#state.conn}, - {auth_module, StateData#state.auth_module}], - ejabberd_sm:unset_presence(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, Status, Info), - presence_broadcast(StateData, From, - StateData#state.pres_a, Packet), - StateData#state{pres_last = undefined, - pres_timestamp = undefined, pres_a = (?SETS):new()}; - error -> StateData; - probe -> StateData; - subscribe -> StateData; - subscribed -> StateData; - unsubscribe -> StateData; - unsubscribed -> StateData; - _ -> - OldPriority = case StateData#state.pres_last of - undefined -> 0; - OldPresence -> get_priority_from_presence(OldPresence) - end, - NewPriority = get_priority_from_presence(Packet), - update_priority(NewPriority, Packet, StateData), - FromUnavail = (StateData#state.pres_last == undefined), - ?DEBUG("from unavail = ~p~n", [FromUnavail]), - NewStateData = StateData#state{pres_last = Packet, - pres_timestamp = p1_time_compat:timestamp()}, - NewState = if FromUnavail -> - ejabberd_hooks:run(user_available_hook, - NewStateData#state.server, - [NewStateData#state.jid]), - ResentStateData = if NewPriority >= 0 -> - resend_offline_messages(NewStateData), - resend_subscription_requests(NewStateData); - true -> NewStateData - end, - presence_broadcast_first(From, ResentStateData, - Packet); +-spec route_probe_reply(jid(), jid(), state()) -> ok. +route_probe_reply(From, To, #{server := Server, pres_f := PresF, + pres_last := LastPres, + pres_timestamp := TS} = State) -> + LFrom = jid:tolower(From), + LBFrom = jid:remove_resource(LFrom), + case ?SETS:is_element(LFrom, PresF) + orelse ?SETS:is_element(LBFrom, PresF) of + true -> + %% To is my JID + Packet = xmpp_util:add_delay_info(LastPres, To, TS), + case privacy_check_packet(State, Packet, out) of + deny -> + ok; + allow -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run(presence_probe_hook, + LServer, + [From, To, self()]), + %% Don't route a presence probe to oneself + case From == To of + false -> + route(xmpp:set_from_to(Packet, To, From)); true -> - presence_broadcast_to_trusted(NewStateData, From, - NewStateData#state.pres_f, - NewStateData#state.pres_a, - Packet), - if OldPriority < 0, NewPriority >= 0 -> - resend_offline_messages(NewStateData); - true -> ok - end, - NewStateData - end, - NewState - end. + ok + end + end; + false -> + ok + end; +route_probe_reply(_, _, _) -> + ok. -%% User sends a directed presence packet --spec presence_track(jid(), jid(), presence(), state()) -> state(). -presence_track(From, To, Packet, StateData) -> - #presence{type = Type} = Packet, +-spec process_presence_out(state(), presence()) -> next_state(). +process_presence_out(#{user := User, server := Server, + lang := Lang, pres_a := PresA} = State, + #presence{from = From, to = To, type = Type} = Pres) -> + LServer = jid:nameprep(Server), LTo = jid:tolower(To), - User = StateData#state.user, - Server = StateData#state.server, - Lang = StateData#state.lang, - case privacy_check_packet(StateData, From, To, Packet, out) of + case privacy_check_packet(State, Pres, out) of deny -> ErrText = <<"Your active privacy list has denied " "the routing of this stanza.">>, Err = xmpp:err_not_acceptable(ErrText, Lang), - send_error(StateData, xmpp:set_from_to(Packet, From, To), Err); + xmpp_stream_in:send_error(State, Pres, Err); allow when Type == subscribe; Type == subscribed; Type == unsubscribe; Type == unsubscribed -> - Access = gen_mod:get_module_opt(Server, mod_roster, access, + Access = gen_mod:get_module_opt(LServer, mod_roster, access, fun(A) when is_atom(A) -> A end, all), MyBareJID = jid:make(User, Server, <<"">>), - case acl:match_rule(Server, Access, MyBareJID) of + case acl:match_rule(LServer, Access, MyBareJID) of deny -> ErrText = <<"Denied by ACL">>, Err = xmpp:err_forbidden(ErrText, Lang), - send_error(StateData, xmpp:set_from_to(Packet, From, To), Err); + xmpp_stream_in:send_error(State, Pres, Err); allow -> ejabberd_hooks:run(roster_out_subscription, - Server, + LServer, [User, Server, To, Type]), - ejabberd_router:route(jid:remove_resource(From), To, Packet), - StateData + BareFrom = jid:remove_resource(From), + route(xmpp:set_from_to(Pres, BareFrom, To)), + {noreply, State} end; allow when Type == error; Type == probe -> - ejabberd_router:route(From, To, Packet), - StateData; + route(Pres), + {noreply, State}; allow -> - ejabberd_router:route(From, To, Packet), + route(Pres), A = case Type of - available -> - ?SETS:add_element(LTo, StateData#state.pres_a); - unavailable -> - ?SETS:del_element(LTo, StateData#state.pres_a) + available -> ?SETS:add_element(LTo, PresA); + unavailable -> ?SETS:del_element(LTo, PresA) end, - StateData#state{pres_a = A} + {noreply, State#{pres_a => A}} end. --spec check_privacy_route(jid(), state(), jid(), jid(), stanza()) -> state(). -check_privacy_route(From, StateData, FromRoute, To, - Packet) -> - case privacy_check_packet(StateData, From, To, Packet, - out) of +-spec process_self_presence(state(), presence()) -> {noreply, state()}. +process_self_presence(#{ip := IP, conn := Conn, + auth_module := AuthMod, sid := SID, + user := U, server := S, resource := R} = State, + #presence{type = unavailable} = Pres) -> + Status = xmpp:get_text(Pres#presence.status), + Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthMod}], + ejabberd_sm:unset_presence(SID, U, S, R, Status, Info), + State1 = broadcast_presence_unavailable(State, Pres), + State2 = maps:remove(pres_last, maps:remove(pres_timestamp, State1)), + {noreply, State2}; +process_self_presence(#{server := Server} = State, + #presence{type = available} = Pres) -> + LServer = jid:nameprep(Server), + PreviousPres = maps:get(pres_last, State, undefined), + update_priority(State, Pres), + State1 = ejabberd_hooks:run_fold(user_available_hook, LServer, State, [Pres]), + State2 = State1#{pres_last => Pres, + pres_timestamp => p1_time_compat:timestamp()}, + FromUnavailable = PreviousPres == undefined, + State3 = broadcast_presence_available(State2, Pres, FromUnavailable), + {noreply, State3}; +process_self_presence(State, _Pres) -> + {noreply, State}. + +-spec update_priority(state(), presence()) -> ok. +update_priority(#{ip := IP, conn := Conn, auth_module := AuthMod, + sid := SID, user := U, server := S, resource := R}, + Pres) -> + Priority = get_priority_from_presence(Pres), + Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthMod}], + ejabberd_sm:set_presence(SID, U, S, R, Priority, Pres, Info). + +-spec broadcast_presence_unavailable(state(), presence()) -> state(). +broadcast_presence_unavailable(#{pres_a := PresA} = State, Pres) -> + JIDs = filter_blocked(State, Pres, PresA), + route_multiple(State, JIDs, Pres), + State#{pres_a => ?SETS:new()}. + +-spec broadcast_presence_available(state(), presence(), boolean()) -> state(). +broadcast_presence_available(#{pres_a := PresA, pres_f := PresF, + pres_t := PresT} = State, + Pres, _FromUnavailable = true) -> + Probe = #presence{type = probe}, + TJIDs = filter_blocked(State, Probe, PresT), + FJIDs = filter_blocked(State, Pres, PresF), + route_multiple(State, TJIDs, Probe), + route_multiple(State, FJIDs, Pres), + State#{pres_a => ?SETS:union(PresA, PresF)}; +broadcast_presence_available(#{pres_a := PresA, pres_f := PresF} = State, + Pres, _FromUnavailable = false) -> + JIDs = filter_blocked(State, Pres, ?SETS:intersection(PresA, PresF)), + route_multiple(State, JIDs, Pres), + State. + +-spec check_privacy_then_route(state(), stanza()) -> next_state(). +check_privacy_then_route(#{lang := Lang} = State, Pkt) -> + case privacy_check_packet(State, Pkt, out) of deny -> - Lang = StateData#state.lang, ErrText = <<"Your active privacy list has denied " "the routing of this stanza.">>, Err = xmpp:err_not_acceptable(ErrText, Lang), - send_error(StateData, xmpp:set_from_to(Packet, From, To), Err); + xmpp_stream_in:send_error(State, Pkt, Err); allow -> - ejabberd_router:route(FromRoute, To, Packet), - StateData - end. - -%% Check if privacy rules allow this delivery --spec privacy_check_packet(state(), jid(), jid(), stanza(), in | out) -> allow | deny. -privacy_check_packet(StateData, From, To, Packet, - Dir) -> - ejabberd_hooks:run_fold(privacy_check_packet, - StateData#state.server, allow, - [StateData#state.user, StateData#state.server, - StateData#state.privacy_list, {From, To, Packet}, - Dir]). - --spec is_privacy_allow(state(), jid(), jid(), stanza(), in | out) -> boolean(). -is_privacy_allow(StateData, From, To, Packet, Dir) -> - allow == - privacy_check_packet(StateData, From, To, Packet, Dir). - -%% Send presence when disconnecting --spec presence_broadcast(state(), jid(), ?SETS:set(), presence()) -> ok. -presence_broadcast(StateData, From, JIDSet, Packet) -> - JIDs = ?SETS:to_list(JIDSet), - JIDs2 = format_and_check_privacy(From, StateData, Packet, JIDs, out), - Server = StateData#state.server, - send_multiple(From, Server, JIDs2, Packet). - --spec presence_broadcast_to_trusted( - state(), jid(), ?SETS:set(), ?SETS:set(), presence()) -> ok. -%% Send presence when updating presence -presence_broadcast_to_trusted(StateData, From, Trusted, JIDSet, Packet) -> - JIDs = ?SETS:to_list(?SETS:intersection(Trusted, JIDSet)), - JIDs2 = format_and_check_privacy(From, StateData, Packet, JIDs, out), - Server = StateData#state.server, - send_multiple(From, Server, JIDs2, Packet). - -%% Send presence when connecting --spec presence_broadcast_first(jid(), state(), presence()) -> state(). -presence_broadcast_first(From, StateData, Packet) -> - JIDsProbe = - ?SETS:fold( - fun(JID, L) -> [JID | L] end, - [], - StateData#state.pres_t), - PacketProbe = #presence{type = probe}, - JIDs2Probe = format_and_check_privacy(From, StateData, PacketProbe, JIDsProbe, out), - Server = StateData#state.server, - send_multiple(From, Server, JIDs2Probe, PacketProbe), - {As, JIDs} = - ?SETS:fold( - fun(JID, {A, JID_list}) -> - {?SETS:add_element(JID, A), JID_list++[JID]} - end, - {StateData#state.pres_a, []}, - StateData#state.pres_f), - JIDs2 = format_and_check_privacy(From, StateData, Packet, JIDs, out), - send_multiple(From, Server, JIDs2, Packet), - StateData#state{pres_a = As}. - --spec format_and_check_privacy( - jid(), state(), stanza(), [ljid()], in | out) -> [jid()]. -format_and_check_privacy(From, StateData, Packet, JIDs, Dir) -> - FJIDs = [jid:make(JID) || JID <- JIDs], - lists:filter( - fun(FJID) -> - case ejabberd_hooks:run_fold( - privacy_check_packet, StateData#state.server, - allow, - [StateData#state.user, - StateData#state.server, - StateData#state.privacy_list, - {From, FJID, Packet}, - Dir]) of - deny -> false; - allow -> true - end - end, - FJIDs). - --spec send_multiple(jid(), binary(), [jid()], stanza()) -> ok. -send_multiple(From, Server, JIDs, Packet) -> - ejabberd_router_multicast:route_multicast(From, Server, JIDs, Packet). - --spec roster_change(jid(), both | from | none | remove | to, state()) -> state(). -roster_change(IJID, ISubscription, StateData) -> - LIJID = jid:tolower(IJID), - IsFrom = (ISubscription == both) or (ISubscription == from), - IsTo = (ISubscription == both) or (ISubscription == to), - OldIsFrom = (?SETS):is_element(LIJID, StateData#state.pres_f), - FSet = if - IsFrom -> (?SETS):add_element(LIJID, StateData#state.pres_f); - true -> ?SETS:del_element(LIJID, StateData#state.pres_f) - end, - TSet = if - IsTo -> (?SETS):add_element(LIJID, StateData#state.pres_t); - true -> ?SETS:del_element(LIJID, StateData#state.pres_t) - end, - case StateData#state.pres_last of - undefined -> - StateData#state{pres_f = FSet, pres_t = TSet}; - P -> - ?DEBUG("roster changed for ~p~n", - [StateData#state.user]), - From = StateData#state.jid, - To = jid:make(IJID), - Cond1 = IsFrom andalso not OldIsFrom, - Cond2 = not IsFrom andalso OldIsFrom andalso - ((?SETS):is_element(LIJID, StateData#state.pres_a)), - if Cond1 -> - ?DEBUG("C1: ~p~n", [LIJID]), - case privacy_check_packet(StateData, From, To, P, out) - of - deny -> ok; - allow -> ejabberd_router:route(From, To, P) - end, - A = (?SETS):add_element(LIJID, StateData#state.pres_a), - StateData#state{pres_a = A, pres_f = FSet, - pres_t = TSet}; - Cond2 -> - ?DEBUG("C2: ~p~n", [LIJID]), - PU = #presence{type = unavailable}, - case privacy_check_packet(StateData, From, To, PU, out) - of - deny -> ok; - allow -> ejabberd_router:route(From, To, PU) - end, - A = ?SETS:del_element(LIJID, StateData#state.pres_a), - StateData#state{pres_a = A, pres_f = FSet, - pres_t = TSet}; - true -> StateData#state{pres_f = FSet, pres_t = TSet} - end + route(Pkt), + {noreply, State} end. --spec update_priority(integer(), presence(), state()) -> ok. -update_priority(Priority, Packet, StateData) -> - Info = [{ip, StateData#state.ip}, {conn, StateData#state.conn}, - {auth_module, StateData#state.auth_module}], - ejabberd_sm:set_presence(StateData#state.sid, - StateData#state.user, StateData#state.server, - StateData#state.resource, Priority, Packet, Info). +-spec privacy_check_packet(state(), stanza(), in | out) -> allow | deny. +privacy_check_packet(#{server := Server} = State, Pkt, Dir) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(privacy_check_packet, LServer, allow, [State, Pkt, Dir]). -spec get_priority_from_presence(presence()) -> integer(). get_priority_from_presence(#presence{priority = Prio}) -> @@ -2021,817 +554,129 @@ get_priority_from_presence(#presence{priority = Prio}) -> _ -> Prio end. --spec process_privacy_iq(iq(), state()) -> state(). -process_privacy_iq(#iq{from = From, to = To, - type = Type, lang = Lang} = IQ, StateData) -> - Txt = <<"No module is handling this query">>, - {Res, NewStateData} = - case Type of - get -> - R = ejabberd_hooks:run_fold( - privacy_iq_get, - StateData#state.server, - {error, xmpp:err_feature_not_implemented(Txt, Lang)}, - [IQ, StateData#state.privacy_list]), - {R, StateData}; - set -> - case ejabberd_hooks:run_fold( - privacy_iq_set, - StateData#state.server, - {error, xmpp:err_feature_not_implemented(Txt, Lang)}, - [IQ, StateData#state.privacy_list]) - of - {result, R, NewPrivList} -> - {{result, R}, - StateData#state{privacy_list = - NewPrivList}}; - R -> {R, StateData} - end - end, - IQRes = case Res of - {result, Result} -> - xmpp:make_iq_result(IQ, Result); - {error, Error} -> - xmpp:make_error(IQ, Error) - end, - ejabberd_router:route(To, From, IQRes), - NewStateData. +-spec filter_blocked(state(), presence(), ?SETS:set()) -> [jid()]. +filter_blocked(#{user := U, server := S, resource := R} = State, + Pres, LJIDSet) -> + From = jid:make(U, S, R), + ?SETS:fold( + fun(LJID, Acc) -> + To = jid:make(LJID), + Pkt = xmpp:set_from_to(Pres, From, To), + case privacy_check_packet(State, Pkt, out) of + allow -> [To|Acc]; + deny -> Acc + end + end, [], LJIDSet). + +-spec route(stanza()) -> ok. +route(Pkt) -> + From = xmpp:get_from(Pkt), + To = xmpp:get_to(Pkt), + ejabberd_router:route(From, To, Pkt). + +-spec route_error(stanza(), stanza_error()) -> ok. +route_error(Pkt, Err) -> + From = xmpp:get_from(Pkt), + To = xmpp:get_to(Pkt), + ejabberd_router:route_error(To, From, Pkt, Err). + +-spec route_multiple(state(), [jid()], stanza()) -> ok. +route_multiple(#{server := Server}, JIDs, Pkt) -> + LServer = jid:nameprep(Server), + From = xmpp:get_from(Pkt), + ejabberd_router_multicast:route_multicast(From, LServer, JIDs, Pkt). --spec resend_offline_messages(state()) -> ok. -resend_offline_messages(#state{ask_offline = true} = StateData) -> - case ejabberd_hooks:run_fold(resend_offline_messages_hook, - StateData#state.server, [], - [StateData#state.user, StateData#state.server]) - of - Rs -> %%when is_list(Rs) -> - lists:foreach(fun ({route, From, To, Packet}) -> - Pass = case privacy_check_packet(StateData, - From, To, - Packet, in) - of - allow -> true; - deny -> false - end, - if Pass -> - ejabberd_router:route(From, To, Packet); - true -> ok - end - end, - Rs) - end; -resend_offline_messages(_StateData) -> - ok. - --spec resend_subscription_requests(state()) -> state(). -resend_subscription_requests(#state{user = User, - server = Server} = StateData) -> - PendingSubscriptions = - ejabberd_hooks:run_fold(resend_subscription_requests_hook, - Server, [], [User, Server]), - lists:foldl(fun (XMLPacket, AccStateData) -> - send_packet(AccStateData, XMLPacket) +-spec resource_conflict_action(binary(), binary(), binary()) -> + {accept_resource, binary()} | closenew. +resource_conflict_action(U, S, R) -> + OptionRaw = case ejabberd_sm:is_existing_resource(U, S, R) of + true -> + ejabberd_config:get_option( + {resource_conflict, S}, + fun(setresource) -> setresource; + (closeold) -> closeold; + (closenew) -> closenew; + (acceptnew) -> acceptnew + end); + false -> + acceptnew end, - StateData, - PendingSubscriptions). - --spec get_showtag(undefined | presence()) -> binary(). -get_showtag(undefined) -> <<"unavailable">>; -get_showtag(#presence{show = undefined}) -> <<"available">>; -get_showtag(#presence{show = Show}) -> atom_to_binary(Show, utf8). - --spec get_statustag(undefined | presence()) -> binary(). -get_statustag(#presence{status = Status}) -> xmpp:get_text(Status); -get_statustag(undefined) -> <<"">>. - --spec process_unauthenticated_stanza(state(), iq()) -> ok | {error, any()}. -process_unauthenticated_stanza(StateData, #iq{type = T, lang = L} = IQ) - when T == set; T == get -> - Lang = if L == undefined; L == <<"">> -> StateData#state.lang; - true -> L - end, - NewIQ = IQ#iq{lang = Lang}, - Res = ejabberd_hooks:run_fold(c2s_unauthenticated_iq, - StateData#state.server, empty, - [StateData#state.server, NewIQ, - StateData#state.ip]), - case Res of - empty -> - Txt = <<"Authentication required">>, - Err0 = xmpp:make_error(IQ, xmpp:err_service_unavailable(Txt, Lang)), - Err1 = Err0#iq{from = jid:make(<<>>, StateData#state.server, <<>>), - to = undefined}, - send_element(StateData, Err1); - _ -> - send_element(StateData, Res) - end; -process_unauthenticated_stanza(_StateData, _) -> - %% Drop any stanza, which isn't IQ stanza - ok. - --spec peerip(ejabberd_socket:sockmod(), - ejabberd_socket:socket()) -> - {inet:ip_address(), non_neg_integer()} | undefined. -peerip(SockMod, Socket) -> - IP = case SockMod of - gen_tcp -> inet:peername(Socket); - _ -> SockMod:peername(Socket) - end, - case IP of - {ok, IPOK} -> IPOK; - _ -> undefined - end. - -%% fsm_next_state_pack: Pack the StateData structure to improve -%% sharing. --spec fsm_next_state_pack(state_name(), state()) -> fsm_transition(). -fsm_next_state_pack(StateName, StateData) -> - fsm_next_state_gc(StateName, pack(StateData)). - --spec fsm_next_state_gc(state_name(), state()) -> fsm_transition(). -%% fsm_next_state_gc: Garbage collect the process heap to make use of -%% the newly packed StateData structure. -fsm_next_state_gc(StateName, PackedStateData) -> - erlang:garbage_collect(), - fsm_next_state(StateName, PackedStateData). - -%% fsm_next_state: Generate the next_state FSM tuple with different -%% timeout, depending on the future state --spec fsm_next_state(state_name(), state()) -> fsm_transition(). -fsm_next_state(session_established, #state{mgmt_max_queue = exceeded} = - StateData) -> - ?WARNING_MSG("ACK queue too long, terminating session for ~s", - [jid:to_string(StateData#state.jid)]), - Err = xmpp:serr_policy_violation(<<"Too many unacked stanzas">>, - StateData#state.lang), - send_element(StateData, Err), - {stop, normal, StateData#state{mgmt_resend = false}}; -fsm_next_state(session_established, #state{mgmt_state = pending} = StateData) -> - fsm_next_state(wait_for_resume, StateData); -fsm_next_state(session_established, StateData) -> - {next_state, session_established, StateData, - ?C2S_HIBERNATE_TIMEOUT}; -fsm_next_state(wait_for_resume, #state{mgmt_timeout = 0} = StateData) -> - {stop, normal, StateData}; -fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined, - sid = SID, jid = JID, ip = IP, - conn = Conn, auth_module = AuthModule, - server = Host} = StateData) -> - case StateData of - #state{mgmt_ack_timer = undefined} -> - ok; - #state{mgmt_ack_timer = Timer} -> - erlang:cancel_timer(Timer) - end, - ?INFO_MSG("Waiting for resumption of stream for ~s", - [jid:to_string(JID)]), - Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}], - NewStateData = ejabberd_hooks:run_fold(c2s_session_pending, Host, StateData, - [SID, JID, Info]), - {next_state, wait_for_resume, - NewStateData#state{mgmt_state = pending, - mgmt_pending_since = os:timestamp()}, - NewStateData#state.mgmt_timeout}; -fsm_next_state(wait_for_resume, StateData) -> - Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since), - Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1), - {next_state, wait_for_resume, StateData, Timeout}; -fsm_next_state(StateName, StateData) -> - {next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}. - -%% fsm_reply: Generate the reply FSM tuple with different timeout, -%% depending on the future state --spec fsm_reply(_, state_name(), state()) -> fsm_reply(). -fsm_reply(Reply, session_established, StateData) -> - {reply, Reply, session_established, StateData, - ?C2S_HIBERNATE_TIMEOUT}; -fsm_reply(Reply, wait_for_resume, StateData) -> - Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since), - Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1), - {reply, Reply, wait_for_resume, StateData, Timeout}; -fsm_reply(Reply, StateName, StateData) -> - {reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}. - -%% Used by c2s blacklist plugins --spec is_ip_blacklisted(undefined | {inet:ip_address(), non_neg_integer()}, - binary()) -> false | {true, binary(), binary()}. -is_ip_blacklisted(undefined, _Lang) -> false; -is_ip_blacklisted({IP, _Port}, Lang) -> - ejabberd_hooks:run_fold(check_bl_c2s, false, [IP, Lang]). - -%% Check from attributes -%% returns invalid-from|NewElement --spec check_from(stanza(), jid()) -> 'invalid-from' | stanza(). -check_from(Pkt, FromJID) -> - JID = xmpp:get_from(Pkt), - case JID of - undefined -> - Pkt; - #jid{} -> - if - (JID#jid.luser == FromJID#jid.luser) and - (JID#jid.lserver == FromJID#jid.lserver) and - (JID#jid.lresource == FromJID#jid.lresource) -> - Pkt; - (JID#jid.luser == FromJID#jid.luser) and - (JID#jid.lserver == FromJID#jid.lserver) and - (JID#jid.lresource == <<"">>) -> - Pkt; - true -> - 'invalid-from' - end - end. - -fsm_limit_opts(Opts) -> - case lists:keysearch(max_fsm_queue, 1, Opts) of - {value, {_, N}} when is_integer(N) -> [{max_queue, N}]; - _ -> - case ejabberd_config:get_option( - max_fsm_queue, - fun(I) when is_integer(I), I > 0 -> I end) of - undefined -> []; - N -> [{max_queue, N}] - end + Option = case OptionRaw of + setresource -> setresource; + closeold -> + acceptnew; %% ejabberd_sm will close old session + closenew -> closenew; + acceptnew -> acceptnew; + _ -> acceptnew %% default ejabberd behavior + end, + case Option of + acceptnew -> {accept_resource, R}; + closenew -> closenew; + setresource -> + Rnew = new_uniq_id(), + {accept_resource, Rnew} end. --spec bounce_messages() -> ok. -bounce_messages() -> - receive - {route, From, To, El} -> - ejabberd_router:route(From, To, El), bounce_messages() - after 0 -> ok - end. +-spec new_uniq_id() -> binary(). +new_uniq_id() -> + iolist_to_binary( + [randoms:get_string(), + integer_to_binary(p1_time_compat:unique_integer([positive]))]). --spec process_compression_request(compress(), state_name(), state()) -> fsm_next(). -process_compression_request(#compress{methods = []}, StateName, StateData) -> - send_element(StateData, #compress_failure{reason = 'setup-failed'}), - fsm_next_state(StateName, StateData); -process_compression_request(#compress{methods = Ms}, StateName, StateData) -> - case lists:member(<<"zlib">>, Ms) of - true -> - Socket = StateData#state.socket, - BCompressed = fxml:element_to_binary(xmpp:encode(#compressed{})), - ZlibSocket = (StateData#state.sockmod):compress(Socket, BCompressed), - fsm_next_state(wait_for_stream, - StateData#state{socket = ZlibSocket, - streamid = new_id()}); - false -> - send_element(StateData, - #compress_failure{reason = 'unsupported-method'}), - fsm_next_state(StateName, StateData) +-spec get_conn_type(state()) -> c2s | c2s_tls | c2s_compressed | websocket | + c2s_compressed_tls | http_bind. +get_conn_type(State) -> + case xmpp_stream_in:get_transport(State) of + tcp -> c2s; + tls -> c2s_tls; + tcp_zlib -> c2s_compressed; + tls_zlib -> c2s_compressed_tls; + http_bind -> http_bind; + websocket -> websocket end. -%%%---------------------------------------------------------------------- -%%% XEP-0191 -%%%---------------------------------------------------------------------- - --spec route_blocking( - {block, [jid()]} | {unblock, [jid()]} | unblock_all, state()) -> state(). -route_blocking(What, StateData) -> - SubEl = case What of - {block, JIDs} -> - #block{items = JIDs}; - {unblock, JIDs} -> - #unblock{items = JIDs}; - unblock_all -> - #unblock{} - end, - PrivPushIQ = #iq{type = set, id = <<"push">>, sub_els = [SubEl], - from = jid:remove_resource(StateData#state.jid), - to = StateData#state.jid}, - %% No need to replace active privacy list here, - %% blocking pushes are always accompanied by - %% Privacy List pushes - send_stanza(StateData, PrivPushIQ). - -%%%---------------------------------------------------------------------- -%%% XEP-0198 -%%%---------------------------------------------------------------------- --spec stream_mgmt_enabled(state()) -> boolean(). -stream_mgmt_enabled(#state{mgmt_state = disabled}) -> - false; -stream_mgmt_enabled(_StateData) -> - true. - --spec dispatch_stream_mgmt(xmpp_element(), state()) -> state(). -dispatch_stream_mgmt(El, #state{mgmt_state = MgmtState} = StateData) - when MgmtState == active; - MgmtState == pending -> - perform_stream_mgmt(El, StateData); -dispatch_stream_mgmt(El, StateData) -> - negotiate_stream_mgmt(El, StateData). - --spec negotiate_stream_mgmt(xmpp_element(), state()) -> state(). -negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) -> - %% XEP-0198 says: "For client-to-server connections, the client MUST NOT - %% attempt to enable stream management until after it has completed Resource - %% Binding unless it is resuming a previous session". However, it also - %% says: "Stream management errors SHOULD be considered recoverable", so we - %% won't bail out. - send_element(StateData, #sm_failed{reason = 'unexpected-request', - xmlns = ?NS_STREAM_MGMT_3}), - StateData; -negotiate_stream_mgmt(Pkt, StateData) -> - Xmlns = xmpp:get_ns(Pkt), - case stream_mgmt_enabled(StateData) of +-spec change_shaper(state()) -> ok. +change_shaper(#{shaper := ShaperName, ip := IP, + user := U, server := S, resource := R} = State) -> + #jid{lserver = LServer} = JID = jid:make(U, S, R), + Shaper = acl:access_matches(ShaperName, + #{usr => jid:split(JID), ip => IP}, + LServer), + xmpp_stream_in:change_shaper(State, Shaper). + +-spec do_some_magic(state(), jid()) -> state(). +do_some_magic(#{pres_a := PresA, pres_f := PresF} = State, From) -> + LFrom = jid:tolower(From), + LBFrom = jid:remove_resource(LFrom), + case (?SETS):is_element(LFrom, PresA) orelse + (?SETS):is_element(LBFrom, PresA) of true -> - case Pkt of - #sm_enable{} -> - handle_enable(StateData#state{mgmt_xmlns = Xmlns}, Pkt); - _ -> - Res = if is_record(Pkt, sm_a); - is_record(Pkt, sm_r); - is_record(Pkt, sm_resume) -> - #sm_failed{reason = 'unexpected-request', - xmlns = Xmlns}; - true -> - #sm_failed{reason = 'bad-request', - xmlns = Xmlns} - end, - send_element(StateData, Res), - StateData - end; + State; false -> - send_element(StateData, - #sm_failed{reason = 'service-unavailable', - xmlns = Xmlns}), - StateData - end. - --spec perform_stream_mgmt(xmpp_element(), state()) -> state(). -perform_stream_mgmt(Pkt, StateData) -> - case xmpp:get_ns(Pkt) of - Xmlns when Xmlns == StateData#state.mgmt_xmlns -> - case Pkt of - #sm_r{} -> - handle_r(StateData); - #sm_a{} -> - handle_a(StateData, Pkt); - _ -> - Res = if is_record(Pkt, sm_enable); - is_record(Pkt, sm_resume) -> - #sm_failed{reason = 'unexpected-request', - xmlns = Xmlns}; - true -> - #sm_failed{reason = 'bad-request', - xmlns = Xmlns} - end, - send_element(StateData, Res), - StateData - end; - _ -> - send_element(StateData, - #sm_failed{reason = 'unsupported-version', - xmlns = StateData#state.mgmt_xmlns}) - end. - --spec handle_enable(state(), sm_enable()) -> state(). -handle_enable(#state{mgmt_timeout = DefaultTimeout, - mgmt_max_timeout = MaxTimeout} = StateData, - #sm_enable{resume = Resume, max = Max}) -> - Timeout = if Resume == false -> - 0; - Max /= undefined, Max > 0, Max =< MaxTimeout -> - Max; - true -> - DefaultTimeout - end, - Res = if Timeout > 0 -> - ?INFO_MSG("Stream management with resumption enabled for ~s", - [jid:to_string(StateData#state.jid)]), - #sm_enabled{xmlns = StateData#state.mgmt_xmlns, - id = make_resume_id(StateData), - resume = true, - max = Timeout}; - true -> - ?INFO_MSG("Stream management without resumption enabled for ~s", - [jid:to_string(StateData#state.jid)]), - #sm_enabled{xmlns = StateData#state.mgmt_xmlns} - end, - send_element(StateData, Res), - StateData#state{mgmt_state = active, - mgmt_queue = queue:new(), - mgmt_timeout = Timeout * 1000}. - --spec handle_r(state()) -> state(). -handle_r(StateData) -> - Res = #sm_a{xmlns = StateData#state.mgmt_xmlns, - h = StateData#state.mgmt_stanzas_in}, - send_element(StateData, Res), - StateData. - --spec handle_a(state(), sm_a()) -> state(). -handle_a(StateData, #sm_a{h = H}) -> - NewStateData = check_h_attribute(StateData, H), - maybe_renew_ack_request(NewStateData). - --spec handle_resume(state(), sm_resume()) -> {ok, state()} | error. -handle_resume(StateData, #sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) -> - R = case stream_mgmt_enabled(StateData) of - true -> - case inherit_session_state(StateData, PrevID) of - {ok, InheritedState, Info} -> - {ok, InheritedState, Info, H}; - {error, Err, InH} -> - {error, #sm_failed{reason = 'item-not-found', - h = InH, xmlns = Xmlns}, Err}; - {error, Err} -> - {error, #sm_failed{reason = 'item-not-found', - xmlns = Xmlns}, Err} - end; - false -> - {error, #sm_failed{reason = 'service-unavailable', - xmlns = Xmlns}, - <<"XEP-0198 disabled">>} - end, - case R of - {ok, ResumedState, ResumedInfo, NumHandled} -> - NewState = check_h_attribute(ResumedState, NumHandled), - AttrXmlns = NewState#state.mgmt_xmlns, - AttrId = make_resume_id(NewState), - AttrH = NewState#state.mgmt_stanzas_in, - send_element(NewState, #sm_resumed{xmlns = AttrXmlns, - h = AttrH, - previd = AttrId}), - SendFun = fun(_F, _T, El, Time) -> - NewEl = add_resent_delay_info(NewState, El, Time), - send_element(NewState, NewEl) - end, - handle_unacked_stanzas(NewState, SendFun), - send_element(NewState, #sm_r{xmlns = AttrXmlns}), - NewState1 = csi_flush_queue(NewState), - NewState2 = ejabberd_hooks:run_fold(c2s_session_resumed, - StateData#state.server, - NewState1, - [NewState1#state.sid, - NewState1#state.jid, - ResumedInfo]), - ?INFO_MSG("Resumed session for ~s", - [jid:to_string(NewState2#state.jid)]), - {ok, NewState2}; - {error, El, Msg} -> - send_element(StateData, El), - ?INFO_MSG("Cannot resume session for ~s@~s: ~s", - [StateData#state.user, StateData#state.server, Msg]), - error - end. - --spec check_h_attribute(state(), non_neg_integer()) -> state(). -check_h_attribute(#state{mgmt_stanzas_out = NumStanzasOut} = StateData, H) - when H > NumStanzasOut -> - ?DEBUG("~s acknowledged ~B stanzas, but only ~B were sent", - [jid:to_string(StateData#state.jid), H, NumStanzasOut]), - mgmt_queue_drop(StateData#state{mgmt_stanzas_out = H}, NumStanzasOut); -check_h_attribute(#state{mgmt_stanzas_out = NumStanzasOut} = StateData, H) -> - ?DEBUG("~s acknowledged ~B of ~B stanzas", - [jid:to_string(StateData#state.jid), H, NumStanzasOut]), - mgmt_queue_drop(StateData, H). - --spec update_num_stanzas_in(state(), xmpp_element()) -> state(). -update_num_stanzas_in(#state{mgmt_state = MgmtState} = StateData, El) - when MgmtState == active; - MgmtState == pending -> - NewNum = case {xmpp:is_stanza(El), StateData#state.mgmt_stanzas_in} of - {true, 4294967295} -> - 0; - {true, Num} -> - Num + 1; - {false, Num} -> - Num - end, - StateData#state{mgmt_stanzas_in = NewNum}; -update_num_stanzas_in(StateData, _El) -> - StateData. - -mgmt_send_stanza(StateData, Stanza) -> - case send_element(StateData, Stanza) of - ok -> - maybe_request_ack(StateData); - _ -> - StateData#state{mgmt_state = pending} - end. - -maybe_request_ack(#state{mgmt_ack_timer = undefined} = StateData) -> - request_ack(StateData); -maybe_request_ack(StateData) -> - StateData. - -request_ack(#state{mgmt_xmlns = Xmlns, - mgmt_ack_timeout = AckTimeout} = StateData) -> - AckReq = #sm_r{xmlns = Xmlns}, - case {send_element(StateData, AckReq), AckTimeout} of - {ok, undefined} -> - ok; - {ok, Timeout} -> - Timer = erlang:send_after(Timeout, self(), close), - StateData#state{mgmt_ack_timer = Timer, - mgmt_stanzas_req = StateData#state.mgmt_stanzas_out}; - _ -> - StateData#state{mgmt_state = pending} - end. - -maybe_renew_ack_request(#state{mgmt_ack_timer = undefined} = StateData) -> - StateData; -maybe_renew_ack_request(#state{mgmt_ack_timer = Timer, - mgmt_queue = Queue, - mgmt_stanzas_out = NumStanzasOut, - mgmt_stanzas_req = NumStanzasReq} = StateData) -> - erlang:cancel_timer(Timer), - case NumStanzasReq < NumStanzasOut andalso not queue:is_empty(Queue) of - true -> - request_ack(StateData#state{mgmt_ack_timer = undefined}); - false -> - StateData#state{mgmt_ack_timer = undefined} - end. - --spec mgmt_queue_add(state(), xmpp_element()) -> state(). -mgmt_queue_add(StateData, El) -> - NewNum = case StateData#state.mgmt_stanzas_out of - 4294967295 -> - 0; - Num -> - Num + 1 - end, - NewQueue = queue:in({NewNum, p1_time_compat:timestamp(), El}, StateData#state.mgmt_queue), - NewState = StateData#state{mgmt_queue = NewQueue, - mgmt_stanzas_out = NewNum}, - check_queue_length(NewState). - --spec mgmt_queue_drop(state(), non_neg_integer()) -> state(). -mgmt_queue_drop(StateData, NumHandled) -> - NewQueue = jlib:queue_drop_while(fun({N, _T, _E}) -> N =< NumHandled end, - StateData#state.mgmt_queue), - StateData#state{mgmt_queue = NewQueue}. - --spec check_queue_length(state()) -> state(). -check_queue_length(#state{mgmt_max_queue = Limit} = StateData) - when Limit == infinity; - Limit == exceeded -> - StateData; -check_queue_length(#state{mgmt_queue = Queue, - mgmt_max_queue = Limit} = StateData) -> - case queue:len(Queue) > Limit of - true -> - StateData#state{mgmt_max_queue = exceeded}; - false -> - StateData - end. - --spec handle_unacked_stanzas(state(), fun((_, _, _, _) -> _)) -> ok. -handle_unacked_stanzas(#state{mgmt_state = MgmtState} = StateData, F) - when MgmtState == active; - MgmtState == pending; - MgmtState == timeout -> - Queue = StateData#state.mgmt_queue, - case queue:len(Queue) of - 0 -> - ok; - N -> - ?DEBUG("~B stanza(s) were not acknowledged by ~s", - [N, jid:to_string(StateData#state.jid)]), - lists:foreach( - fun({_, Time, Pkt}) -> - From = xmpp:get_from(Pkt), - To = xmpp:get_to(Pkt), - case {From, To} of - {#jid{}, #jid{}} -> - F(From, To, Pkt, Time); - {_, _} -> - ?DEBUG("Dropping stanza due to invalid JID(s)", []) - end - end, queue:to_list(Queue)) - end; -handle_unacked_stanzas(_StateData, _F) -> - ok. - --spec handle_unacked_stanzas(state()) -> ok. -handle_unacked_stanzas(#state{mgmt_state = MgmtState} = StateData) - when MgmtState == active; - MgmtState == pending; - MgmtState == timeout -> - ResendOnTimeout = - case StateData#state.mgmt_resend of - Resend when is_boolean(Resend) -> - Resend; - if_offline -> - Resource = StateData#state.resource, - case ejabberd_sm:get_user_resources(StateData#state.user, - StateData#state.server) of - [Resource] -> % Same resource opened new session - true; - [] -> - true; - _ -> - false - end - end, - Lang = StateData#state.lang, - ReRoute = case ResendOnTimeout of + case (?SETS):is_element(LFrom, PresF) of true -> - fun(From, To, El, Time) -> - NewEl = add_resent_delay_info(StateData, El, Time), - ejabberd_router:route(From, To, NewEl) - end; + A = (?SETS):add_element(LFrom, PresA), + State#{pres_a => A}; false -> - fun(From, To, El, _Time) -> - Txt = <<"User session terminated">>, - ejabberd_router:route_error( - To, From, El, xmpp:err_service_unavailable(Txt, Lang)) + case (?SETS):is_element(LBFrom, PresF) of + true -> + A = (?SETS):add_element(LBFrom, PresA), + State#{pres_a => A}; + false -> + State end - end, - F = fun(From, _To, #presence{}, _Time) -> - ?DEBUG("Dropping presence stanza from ~s", - [jid:to_string(From)]); - (From, To, #iq{} = El, _Time) -> - Txt = <<"User session terminated">>, - ejabberd_router:route_error( - To, From, El, xmpp:err_service_unavailable(Txt, Lang)); - (From, _To, #message{meta = #{carbon_copy := true}}, _Time) -> - %% XEP-0280 says: "When a receiving server attempts to deliver a - %% forked message, and that message bounces with an error for - %% any reason, the receiving server MUST NOT forward that error - %% back to the original sender." Resending such a stanza could - %% easily lead to unexpected results as well. - ?DEBUG("Dropping forwarded message stanza from ~s", - [jid:to_string(From)]); - (From, To, El, Time) -> - case ejabberd_hooks:run_fold(message_is_archived, - StateData#state.server, false, - [StateData, From, - StateData#state.jid, El]) of - true -> - ?DEBUG("Dropping archived message stanza from ~p", - [jid:to_string(xmpp:get_from(El))]); - false -> - ReRoute(From, To, El, Time) - end - end, - handle_unacked_stanzas(StateData, F); -handle_unacked_stanzas(_StateData) -> - ok. - --spec inherit_session_state(state(), binary()) -> {ok, state()} | - {error, binary()} | - {error, binary(), non_neg_integer()}. -inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> - case jlib:base64_to_term(ResumeID) of - {term, {R, Time}} -> - case ejabberd_sm:get_session_pid(U, S, R) of - none -> - case ejabberd_sm:get_offline_info(Time, U, S, R) of - none -> - {error, <<"Previous session PID not found">>}; - Info -> - case proplists:get_value(num_stanzas_in, Info) of - undefined -> - {error, <<"Previous session timed out">>}; - H -> - {error, <<"Previous session timed out">>, H} - end - end; - OldPID -> - OldSID = {Time, OldPID}, - case catch resume_session(OldSID) of - {resume, OldStateData} -> - NewSID = {Time, self()}, % Old time, new PID - Priority = case OldStateData#state.pres_last of - undefined -> - 0; - Presence -> - get_priority_from_presence(Presence) - end, - Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, StateData#state.auth_module}], - ejabberd_sm:open_session(NewSID, U, S, R, - Priority, Info), - {ok, StateData#state{conn = Conn, - sid = NewSID, - jid = OldStateData#state.jid, - resource = OldStateData#state.resource, - pres_t = OldStateData#state.pres_t, - pres_f = OldStateData#state.pres_f, - pres_a = OldStateData#state.pres_a, - pres_last = OldStateData#state.pres_last, - pres_timestamp = OldStateData#state.pres_timestamp, - privacy_list = OldStateData#state.privacy_list, - aux_fields = OldStateData#state.aux_fields, - mgmt_xmlns = OldStateData#state.mgmt_xmlns, - mgmt_queue = OldStateData#state.mgmt_queue, - mgmt_timeout = OldStateData#state.mgmt_timeout, - mgmt_stanzas_in = OldStateData#state.mgmt_stanzas_in, - mgmt_stanzas_out = OldStateData#state.mgmt_stanzas_out, - mgmt_state = active, - csi_state = active}, Info}; - {error, Msg} -> - {error, Msg}; - _ -> - {error, <<"Cannot grab session state">>} - end - end; - _ -> - {error, <<"Invalid 'previd' value">>} - end. - --spec resume_session({integer(), pid()}) -> any(). -resume_session({Time, PID}) -> - (?GEN_FSM):sync_send_all_state_event(PID, {resume_session, Time}, 15000). - --spec make_resume_id(state()) -> binary(). -make_resume_id(StateData) -> - {Time, _} = StateData#state.sid, - jlib:term_to_base64({StateData#state.resource, Time}). - --spec add_resent_delay_info(state(), stanza(), erlang:timestamp()) -> stanza(). -add_resent_delay_info(_State, #iq{} = El, _Time) -> - El; -add_resent_delay_info(#state{server = From}, El, Time) -> - xmpp_util:add_delay_info(El, jid:make(From), Time, <<"Resent">>). - -%%%---------------------------------------------------------------------- -%%% XEP-0352 -%%%---------------------------------------------------------------------- --spec csi_filter_stanza(state(), stanza()) -> state(). -csi_filter_stanza(#state{csi_state = CsiState, jid = JID, server = Server} = - StateData, Stanza) -> - {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_filter_stanza, Server, - {StateData, [Stanza]}, - [Server, JID, Stanza]), - StateData2 = lists:foldl(fun(CurStanza, AccState) -> - send_stanza(AccState, CurStanza) - end, StateData1#state{csi_state = active}, - Stanzas), - StateData2#state{csi_state = CsiState}. - --spec csi_flush_queue(state()) -> state(). -csi_flush_queue(#state{csi_state = CsiState, jid = JID, server = Server} = - StateData) -> - {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_flush_queue, Server, - {StateData, []}, - [Server, JID]), - StateData2 = lists:foldl(fun(CurStanza, AccState) -> - send_stanza(AccState, CurStanza) - end, StateData1#state{csi_state = active}, - Stanzas), - StateData2#state{csi_state = CsiState}. - -%%%---------------------------------------------------------------------- -%%% JID Set memory footprint reduction code -%%%---------------------------------------------------------------------- - -%% Try to reduce the heap footprint of the four presence sets -%% by ensuring that we re-use strings and Jids wherever possible. --spec pack(state()) -> state(). -pack(S = #state{pres_a = A, pres_f = F, - pres_t = T}) -> - {NewA, Pack2} = pack_jid_set(A, gb_trees:empty()), - {NewF, Pack3} = pack_jid_set(F, Pack2), - {NewT, _Pack4} = pack_jid_set(T, Pack3), - S#state{pres_a = NewA, pres_f = NewF, - pres_t = NewT}. - -pack_jid_set(Set, Pack) -> - Jids = (?SETS):to_list(Set), - {PackedJids, NewPack} = pack_jids(Jids, Pack, []), - {(?SETS):from_list(PackedJids), NewPack}. - -pack_jids([], Pack, Acc) -> {Acc, Pack}; -pack_jids([{U, S, R} = Jid | Jids], Pack, Acc) -> - case gb_trees:lookup(Jid, Pack) of - {value, PackedJid} -> - pack_jids(Jids, Pack, [PackedJid | Acc]); - none -> - {NewU, Pack1} = pack_string(U, Pack), - {NewS, Pack2} = pack_string(S, Pack1), - {NewR, Pack3} = pack_string(R, Pack2), - NewJid = {NewU, NewS, NewR}, - NewPack = gb_trees:insert(NewJid, NewJid, Pack3), - pack_jids(Jids, NewPack, [NewJid | Acc]) - end. - -pack_string(String, Pack) -> - case gb_trees:lookup(String, Pack) of - {value, PackedString} -> {PackedString, Pack}; - none -> {String, gb_trees:insert(String, String, Pack)} + end end. -transform_listen_option(Opt, Opts) -> - [Opt|Opts]. - --spec identity([{atom(), binary()}]) -> binary(). -identity(Props) -> - case proplists:get_value(authzid, Props, <<>>) of - <<>> -> proplists:get_value(username, Props, <<>>); - AuthzId -> AuthzId +-spec fsm_limit_opts([proplists:property()]) -> [proplists:property()]. +fsm_limit_opts(Opts) -> + case lists:keysearch(max_fsm_queue, 1, Opts) of + {value, {_, N}} when is_integer(N) -> [{max_queue, N}]; + _ -> + case ejabberd_config:get_option( + max_fsm_queue, + fun(I) when is_integer(I), I > 0 -> I end) of + undefined -> []; + N -> [{max_queue, N}] + end end. - -opt_type(domain_certfile) -> fun iolist_to_binary/1; -opt_type(max_fsm_queue) -> - fun (I) when is_integer(I), I > 0 -> I end; -opt_type(resource_conflict) -> - fun (setresource) -> setresource; - (closeold) -> closeold; - (closenew) -> closenew; - (acceptnew) -> acceptnew - end; -opt_type(_) -> - [domain_certfile, max_fsm_queue, resource_conflict]. diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index 56dc3092e..11b829a94 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -34,6 +34,7 @@ %% API -export([start/0, start_link/0, + route/2, route/3, process_iq/3, open_session/5, @@ -69,6 +70,7 @@ get_all_pids/0, is_existing_resource/3, get_commands_spec/0, + c2s_handle_info/2, make_sid/0 ]). @@ -98,15 +100,6 @@ %% default value for the maximum number of user connections -define(MAX_USER_SESSIONS, infinity). --type broadcast() :: {broadcast, broadcast_data()}. - --type broadcast_data() :: - {rebind, pid(), binary()} | %% ejabberd_c2s - {item, ljid(), mod_roster:subscription()} | %% mod_roster/mod_shared_roster - {exit, binary()} | %% mod_roster/mod_shared_roster - {privacy_list, mod_privacy:userlist(), binary()} | %% mod_privacy - {blocking, unblock_all | {block | unblock, [ljid()]}}. %% mod_blocking - %%==================================================================== %% API %%==================================================================== @@ -120,7 +113,18 @@ start() -> start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). --spec route(jid(), jid(), stanza() | broadcast()) -> ok. +-spec route(jid(), term()) -> ok. +%% @doc route arbitrary term to c2s process(es) +route(To, Term) -> + case catch do_route(To, Term) of + {'EXIT', Reason} -> + ?ERROR_MSG("route ~p to ~p failed: ~p", + [Term, To, Reason]); + _ -> + ok + end. + +-spec route(jid(), jid(), stanza()) -> ok. route(From, To, Packet) -> case catch do_route(From, To, Packet) of @@ -180,9 +184,7 @@ bounce_offline_message(From, To, Packet) -> -spec disconnect_removed_user(binary(), binary()) -> ok. disconnect_removed_user(User, Server) -> - ejabberd_sm:route(jid:make(<<"">>, <<"">>, <<"">>), - jid:make(User, Server, <<"">>), - {broadcast, {exit, <<"User removed">>}}). + route(jid:make(User, Server, <<"">>), {exit, <<"User removed">>}). get_user_resources(User, Server) -> LUser = jid:nodeprep(User), @@ -356,6 +358,21 @@ register_iq_handler(Host, XMLNS, Module, Fun, Opts) -> unregister_iq_handler(Host, XMLNS) -> ejabberd_sm ! {unregister_iq_handler, Host, XMLNS}. +%% Why the hell do we have so many similar kicks? +c2s_handle_info({noreply, #{lang := Lang} = State}, replaced) -> + State1 = State#{replaced => true}, + Err = xmpp:serr_conflict(<<"Replaced by new connection">>, Lang), + ejabberd_c2s:send(State1, Err); +c2s_handle_info({noreply, #{lang := Lang} = State}, kick) -> + Err = xmpp:serr_policy_violation(<<"has been kicked">>, Lang), + c2s_handle_info({noreply, State}, {kick, kicked_by_admin, Err}); +c2s_handle_info({noreply, State}, {kick, _Reason, Err}) -> + ejabberd_c2s:send(State, Err); +c2s_handle_info({noreply, #{lang := Lang} = State}, {exit, Reason}) -> + Err = xmpp:serr_conflict(Reason, Lang), + ejabberd_c2s:send(State, Err); +c2s_handle_info(Acc, _) -> + Acc. %%==================================================================== %% gen_server callbacks @@ -366,6 +383,8 @@ init([]) -> ets:new(sm_iqtable, [named_table]), lists:foreach( fun(Host) -> + ejabberd_hooks:add(c2s_handle_info, Host, + ejabberd_sm, c2s_handle_info, 50), ejabberd_hooks:add(roster_in_subscription, Host, ejabberd_sm, check_in_subscription, 20), ejabberd_hooks:add(offline_message_hook, Host, @@ -411,6 +430,17 @@ handle_info({unregister_iq_handler, Host, XMLNS}, handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, _State) -> + lists:foreach( + fun(Host) -> + ejabberd_hooks:delete(c2s_handle_info, Host, + ejabberd_sm, c2s_handle_info, 50), + ejabberd_hooks:delete(roster_in_subscription, Host, + ejabberd_sm, check_in_subscription, 20), + ejabberd_hooks:delete(offline_message_hook, Host, + ejabberd_sm, bounce_offline_message, 100), + ejabberd_hooks:delete(remove_user, Host, + ejabberd_sm, disconnect_removed_user, 100) + end, ?MYHOSTS), ejabberd_commands:unregister_commands(get_commands_spec()), ok. @@ -444,26 +474,27 @@ is_online(#session{info = Info}) -> not proplists:get_bool(offline, Info). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec do_route(jid(), jid(), stanza() | broadcast()) -> any(). -do_route(From, #jid{lresource = <<"">>} = To, {broadcast, _} = Packet) -> - ?DEBUG("processing broadcast to bare JID: ~p", [Packet]), +-spec do_route(jid(), term()) -> any(). +do_route(#jid{lresource = <<"">>} = To, Term) -> lists:foreach( fun(R) -> - do_route(From, jid:replace_resource(To, R), Packet) + do_route(jid:replace_resource(To, R), Term) end, get_user_resources(To#jid.user, To#jid.server)); -do_route(From, To, {broadcast, _} = Packet) -> - ?DEBUG("processing broadcast to full JID: ~p", [Packet]), +do_route(To, Term) -> + ?DEBUG("broadcasting ~p to ~s", [Term, jid:to_string(To)]), {U, S, R} = jid:tolower(To), Mod = get_sm_backend(S), case online(Mod:get_sessions(U, S, R)) of [] -> - ?DEBUG("dropping broadcast to unavailable resourse: ~p", [Packet]); + ?DEBUG("dropping broadcast to unavailable resourse: ~p", [Term]); Ss -> Session = lists:max(Ss), Pid = element(2, Session#session.sid), - ?DEBUG("sending to process ~p: ~p", [Pid, Packet]), - Pid ! {route, From, To, Packet} - end; + ?DEBUG("sending to process ~p: ~p", [Pid, Term]), + Pid ! Term + end. + +-spec do_route(jid(), jid(), stanza()) -> any(). do_route(From, To, #presence{type = T, status = Status} = Packet) when T == subscribe; T == subscribed; T == unsubscribe; T == unsubscribed -> ?DEBUG("processing subscription:~n~s", [xmpp:pp(Packet)]), diff --git a/src/ejabberd_socket.erl b/src/ejabberd_socket.erl index e26fc8652..3f01dae85 100644 --- a/src/ejabberd_socket.erl +++ b/src/ejabberd_socket.erl @@ -49,6 +49,7 @@ sockname/1, peername/1]). -include("ejabberd.hrl"). +-include("xmpp.hrl"). -include("logger.hrl"). -type sockmod() :: ejabberd_http_bind | @@ -150,15 +151,25 @@ connect(Addr, Port, Opts, Timeout, Owner) -> end. starttls(SocketData, TLSOpts) -> - {ok, TLSSocket} = fast_tls:tcp_to_tls(SocketData#socket_state.socket, TLSOpts), - ejabberd_receiver:starttls(SocketData#socket_state.receiver, TLSSocket), - SocketData#socket_state{socket = TLSSocket, sockmod = fast_tls}. + case fast_tls:tcp_to_tls(SocketData#socket_state.socket, TLSOpts) of + {ok, TLSSocket} -> + ejabberd_receiver:starttls(SocketData#socket_state.receiver, TLSSocket), + {ok, SocketData#socket_state{socket = TLSSocket, sockmod = fast_tls}}; + Err -> + ?ERROR_MSG("starttls failed: ~p", [Err]), + Err + end. starttls(SocketData, TLSOpts, Data) -> - {ok, TLSSocket} = fast_tls:tcp_to_tls(SocketData#socket_state.socket, TLSOpts), - ejabberd_receiver:starttls(SocketData#socket_state.receiver, TLSSocket), - send(SocketData, Data), - SocketData#socket_state{socket = TLSSocket, sockmod = fast_tls}. + case fast_tls:tcp_to_tls(SocketData#socket_state.socket, TLSOpts) of + {ok, TLSSocket} -> + ejabberd_receiver:starttls(SocketData#socket_state.receiver, TLSSocket), + send(SocketData, Data), + {ok, SocketData#socket_state{socket = TLSSocket, sockmod = fast_tls}}; + Err -> + ?ERROR_MSG("starttls failed: ~p", [Err]), + Err + end. compress(SocketData) -> compress(SocketData, undefined). @@ -184,10 +195,10 @@ send(SocketData, Data) -> ok -> ok; {error, timeout} -> ?INFO_MSG("Timeout on ~p:send",[SocketData#socket_state.sockmod]), - exit(normal); + {error, timeout}; Error -> ?DEBUG("Error in ~p:send: ~p",[SocketData#socket_state.sockmod, Error]), - exit(normal) + Error end. %% Can only be called when in c2s StateData#state.xml_socket is true diff --git a/src/mod_admin_extra.erl b/src/mod_admin_extra.erl index 4b117d50a..39feb9e26 100644 --- a/src/mod_admin_extra.erl +++ b/src/mod_admin_extra.erl @@ -918,9 +918,8 @@ kick_session(User, Server, Resource, ReasonText) -> ok. kick_this_session(User, Server, Resource, Reason) -> - ejabberd_sm:route(jid:make(<<"">>, <<"">>, <<"">>), - jid:make(User, Server, Resource), - {broadcast, {exit, Reason}}). + ejabberd_sm:route(jid:make(User, Server, Resource), + {exit, Reason}). status_num(Host, Status) -> length(get_status_list(Host, Status)). @@ -948,7 +947,7 @@ get_status_list(Host, Status_required) -> end, Sessions3 = [ {Pid, Server, Priority} || {{_User, Server, _Resource}, {_, Pid}, Priority} <- Sessions2, apply(Fhost, [Server, Host])], %% For each Pid, get its presence - Sessions4 = [ {catch ejabberd_c2s:get_presence(Pid), Server, Priority} || {Pid, Server, Priority} <- Sessions3], + Sessions4 = [ {catch get_presence(Pid), Server, Priority} || {Pid, Server, Priority} <- Sessions3], %% Filter by status Fstatus = case Status_required of <<"all">> -> @@ -1001,6 +1000,16 @@ stringize(String) -> %% Replace newline characters with other code ejabberd_regexp:greplace(String, <<"\n">>, <<"\\n">>). +get_presence(Pid) -> + Pres = #presence{from = From} = ejabberd_c2s:get_presence(Pid), + Show = case Pres of + #presence{type = unavailable} -> <<"unavailable">>; + #presence{show = undefined} -> <<"available">>; + #presence{show = S} -> atom_to_binary(S, utf8) + end, + Status = xmpp:get_text(Pres#presence.status), + {From#jid.user, From#jid.resource, Show, Status}. + get_presence(U, S) -> Pids = [ejabberd_sm:get_session_pid(U, S, R) || R <- ejabberd_sm:get_user_resources(U, S)], @@ -1009,8 +1018,7 @@ get_presence(U, S) -> [] -> {jid:to_string({U, S, <<>>}), <<"unavailable">>, <<"">>}; [SessionPid|_] -> - {_User, Resource, Show, Status} = - ejabberd_c2s:get_presence(SessionPid), + {_User, Resource, Show, Status} = get_presence(SessionPid), FullJID = jid:to_string({U, S, Resource}), {FullJID, Show, Status} end. @@ -1053,7 +1061,7 @@ user_sessions_info(User, Host) -> fun(Session) -> {_U, _S, Resource} = Session#session.usr, {Now, Pid} = Session#session.sid, - {_U, _Resource, Status, StatusText} = ejabberd_c2s:get_presence(Pid), + {_U, _Resource, Status, StatusText} = get_presence(Pid), Info = Session#session.info, Priority = Session#session.priority, Conn = proplists:get_value(conn, Info), @@ -1306,7 +1314,7 @@ push_roster_item(LU, LS, U, S, Action) -> push_roster_item(LU, LS, R, U, S, Action) -> LJID = jid:make(LU, LS, R), BroadcastEl = build_broadcast(U, S, Action), - ejabberd_sm:route(LJID, LJID, BroadcastEl), + ejabberd_sm:route(LJID, BroadcastEl), Item = build_roster_item(U, S, Action), ResIQ = build_iq_roster_push(Item), ejabberd_router:route(jid:remove_resource(LJID), LJID, ResIQ). @@ -1331,7 +1339,7 @@ build_broadcast(U, S, remove) -> %% @spec (U::binary(), S::binary(), Subs::atom()) -> any() %% Subs = both | from | to | none build_broadcast(U, S, SubsAtom) when is_atom(SubsAtom) -> - {broadcast, {item, {U, S, <<>>}, SubsAtom}}. + {item, {U, S, <<>>}, SubsAtom}. %%% %%% Last Activity diff --git a/src/mod_blocking.erl b/src/mod_blocking.erl index d2b187d26..826a7bba3 100644 --- a/src/mod_blocking.erl +++ b/src/mod_blocking.erl @@ -29,7 +29,7 @@ -protocol({xep, 191, '1.2'}). --export([start/2, stop/1, process_iq/1, +-export([start/2, stop/1, process_iq/1, c2s_handle_info/2, process_iq_set/3, process_iq_get/3, mod_opt_type/1, depends/2]). -include("ejabberd.hrl"). @@ -52,6 +52,10 @@ start(Host, Opts) -> process_iq_get, 40), ejabberd_hooks:add(privacy_iq_set, Host, ?MODULE, process_iq_set, 40), + ejabberd_hooks:add(c2s_handle_info, Host, ?MODULE, + c2s_handle_info, 40), + ejabberd_hooks:add(c2s_handle_info, Host, ?MODULE, + c2s_handle_info, 40), mod_disco:register_feature(Host, ?NS_BLOCKING), gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_BLOCKING, ?MODULE, process_iq, IQDisc). @@ -229,14 +233,12 @@ make_userlist(Name, List) -> -spec broadcast_list_update(binary(), binary(), binary(), userlist()) -> ok. broadcast_list_update(LUser, LServer, Name, UserList) -> ejabberd_sm:route(jid:make(LUser, LServer, <<"">>), - jid:make(LUser, LServer, <<"">>), - {broadcast, {privacy_list, UserList, Name}}). + {privacy_list, UserList, Name}). -spec broadcast_blocklist_event(binary(), binary(), block_event()) -> ok. broadcast_blocklist_event(LUser, LServer, Event) -> JID = jid:make(LUser, LServer, <<"">>), - ejabberd_sm:route(JID, JID, - {broadcast, {blocking, Event}}). + ejabberd_sm:route(JID, {blocking, Event}). -spec process_blocklist_get(binary(), binary(), binary()) -> {error, stanza_error()} | {result, block_list()}. @@ -251,6 +253,27 @@ process_blocklist_get(LUser, LServer, Lang) -> {result, #block_list{items = Items}} end. +-spec c2s_handle_info(ejabberd_c2s:next_state(), term()) -> ejabberd_c2s:next_state(). +c2s_handle_info({noreply, #{user := U, server := S, resource := R} = State}, + {blocking, Action}) -> + SubEl = case Action of + {block, JIDs} -> + #block{items = JIDs}; + {unblock, JIDs} -> + #unblock{items = JIDs}; + unblock_all -> + #unblock{} + end, + PushIQ = #iq{type = set, + from = jid:make(U, S), + to = jid:make(U, S, R), + id = <<"push", (randoms:get_string())/binary>>, + sub_els = [SubEl]}, + %% No need to replace active privacy list here, + %% blocking pushes are always accompanied by + %% Privacy List pushes + ejabberd_c2s:send(State, PushIQ). + -spec db_mod(binary()) -> module(). db_mod(LServer) -> DBType = gen_mod:db_type(LServer, mod_privacy), diff --git a/src/mod_caps.erl b/src/mod_caps.erl index 3a4492f5c..e2ec30305 100644 --- a/src/mod_caps.erl +++ b/src/mod_caps.erl @@ -35,10 +35,10 @@ -behaviour(gen_mod). --export([read_caps/1, caps_stream_features/2, +-export([read_caps/1, list_features/1, caps_stream_features/2, disco_features/5, disco_identity/5, disco_info/5, get_features/2, export/1, import_info/0, import/5, - import_start/2, import_stop/2]). + get_user_caps/2, import_start/2, import_stop/2]). %% gen_mod callbacks -export([start/2, start_link/2, stop/1, depends/2]). @@ -48,8 +48,7 @@ handle_cast/2, terminate/2, code_change/3]). -export([user_send_packet/4, user_receive_packet/5, - c2s_presence_in/2, c2s_filter_packet/6, - c2s_broadcast_recipients/6, mod_opt_type/1]). + c2s_presence_in/2, mod_opt_type/1]). -include("ejabberd.hrl"). -include("logger.hrl"). @@ -104,6 +103,22 @@ get_features(Host, #caps{node = Node, version = Version, end, [], SubNodes). +-spec list_features(ejabberd_c2s:state()) -> [{ljid(), caps()}]. +list_features(C2SState) -> + Rs = maps:get(caps_features, C2SState, gb_trees:empty()), + gb_trees:to_list(Rs). + +-spec get_user_caps(jid(), ejabberd_c2s:state()) -> {ok, caps()} | error. +get_user_caps(JID, C2SState) -> + Rs = maps:get(caps_features, C2SState, gb_trees:empty()), + LJID = jid:tolower(JID), + case gb_trees:lookup(LJID, Rs) of + {value, Caps} -> + {ok, Caps}; + none -> + error + end. + -spec read_caps(#presence{}) -> nothing | caps(). read_caps(Presence) -> case xmpp:get_subtag(Presence, #caps{}) of @@ -194,87 +209,40 @@ disco_info(Acc, Host, Module, Node, Lang) when is_atom(Module) -> disco_info(Acc, _, _, _Node, _Lang) -> Acc. --spec c2s_presence_in(ejabberd_c2s:state(), {jid(), jid(), presence()}) -> - ejabberd_c2s:state(). +-spec c2s_presence_in(ejabberd_c2s:state(), presence()) -> ejabberd_c2s:state(). c2s_presence_in(C2SState, - {From, To, #presence{type = Type} = Presence}) -> - Subscription = ejabberd_c2s:get_subscription(From, - C2SState), + #presence{from = From, to = To, type = Type} = Presence) -> + Subscription = ejabberd_c2s:get_subscription(From, C2SState), Insert = (Type == available) and ((Subscription == both) or (Subscription == to)), Delete = (Type == unavailable) or (Type == error), if Insert or Delete -> - LFrom = jid:tolower(From), - Rs = case ejabberd_c2s:get_aux_field(caps_resources, - C2SState) - of - {ok, Rs1} -> Rs1; - error -> gb_trees:empty() - end, - Caps = read_caps(Presence), - NewRs = case Caps of - nothing when Insert == true -> Rs; - _ when Insert == true -> - case gb_trees:lookup(LFrom, Rs) of - {value, Caps} -> Rs; - none -> - ejabberd_hooks:run(caps_add, To#jid.lserver, - [From, To, - get_features(To#jid.lserver, Caps)]), - gb_trees:insert(LFrom, Caps, Rs); - _ -> - ejabberd_hooks:run(caps_update, To#jid.lserver, - [From, To, - get_features(To#jid.lserver, Caps)]), - gb_trees:update(LFrom, Caps, Rs) - end; - _ -> gb_trees:delete_any(LFrom, Rs) - end, - ejabberd_c2s:set_aux_field(caps_resources, NewRs, - C2SState); - true -> C2SState + LFrom = jid:tolower(From), + Rs = maps:get(caps_resources, C2SState, gb_trees:empty()), + Caps = read_caps(Presence), + NewRs = case Caps of + nothing when Insert == true -> Rs; + _ when Insert == true -> + case gb_trees:lookup(LFrom, Rs) of + {value, Caps} -> Rs; + none -> + ejabberd_hooks:run(caps_add, To#jid.lserver, + [From, To, + get_features(To#jid.lserver, Caps)]), + gb_trees:insert(LFrom, Caps, Rs); + _ -> + ejabberd_hooks:run(caps_update, To#jid.lserver, + [From, To, + get_features(To#jid.lserver, Caps)]), + gb_trees:update(LFrom, Caps, Rs) + end; + _ -> gb_trees:delete_any(LFrom, Rs) + end, + C2SState#{caps_resources := NewRs}; + true -> + C2SState end. --spec c2s_filter_packet(boolean(), binary(), ejabberd_c2s:state(), - {pep_message, binary()}, jid(), stanza()) -> - boolean(). -c2s_filter_packet(InAcc, Host, C2SState, {pep_message, Feature}, To, _Packet) -> - case ejabberd_c2s:get_aux_field(caps_resources, C2SState) of - {ok, Rs} -> - LTo = jid:tolower(To), - case gb_trees:lookup(LTo, Rs) of - {value, Caps} -> - Drop = not lists:member(Feature, get_features(Host, Caps)), - {stop, Drop}; - none -> - {stop, true} - end; - _ -> InAcc - end; -c2s_filter_packet(Acc, _, _, _, _, _) -> Acc. - --spec c2s_broadcast_recipients([ljid()], binary(), ejabberd_c2s:state(), - {pep_message, binary()}, jid(), stanza()) -> - [ljid()]. -c2s_broadcast_recipients(InAcc, Host, C2SState, - {pep_message, Feature}, _From, _Packet) -> - case ejabberd_c2s:get_aux_field(caps_resources, - C2SState) - of - {ok, Rs} -> - gb_trees_fold(fun (USR, Caps, Acc) -> - case lists:member(Feature, - get_features(Host, Caps)) - of - true -> [USR | Acc]; - false -> Acc - end - end, - InAcc, Rs); - _ -> InAcc - end; -c2s_broadcast_recipients(Acc, _, _, _, _, _) -> Acc. - -spec depends(binary(), gen_mod:opts()) -> [{module(), hard | soft}]. depends(_Host, _Opts) -> []. @@ -292,15 +260,11 @@ init([Host, Opts]) -> [{max_size, MaxSize}, {life_time, LifeTime}]), ejabberd_hooks:add(c2s_presence_in, Host, ?MODULE, c2s_presence_in, 75), - ejabberd_hooks:add(c2s_filter_packet, Host, ?MODULE, - c2s_filter_packet, 75), - ejabberd_hooks:add(c2s_broadcast_recipients, Host, - ?MODULE, c2s_broadcast_recipients, 75), ejabberd_hooks:add(user_send_packet, Host, ?MODULE, user_send_packet, 75), ejabberd_hooks:add(user_receive_packet, Host, ?MODULE, user_receive_packet, 75), - ejabberd_hooks:add(c2s_stream_features, Host, ?MODULE, + ejabberd_hooks:add(c2s_post_auth_features, Host, ?MODULE, caps_stream_features, 75), ejabberd_hooks:add(s2s_stream_features, Host, ?MODULE, caps_stream_features, 75), @@ -325,15 +289,11 @@ terminate(_Reason, State) -> Host = State#state.host, ejabberd_hooks:delete(c2s_presence_in, Host, ?MODULE, c2s_presence_in, 75), - ejabberd_hooks:delete(c2s_filter_packet, Host, ?MODULE, - c2s_filter_packet, 75), - ejabberd_hooks:delete(c2s_broadcast_recipients, Host, - ?MODULE, c2s_broadcast_recipients, 75), ejabberd_hooks:delete(user_send_packet, Host, ?MODULE, user_send_packet, 75), ejabberd_hooks:delete(user_receive_packet, Host, ?MODULE, user_receive_packet, 75), - ejabberd_hooks:delete(c2s_stream_features, Host, + ejabberd_hooks:delete(c2s_post_auth_features, Host, ?MODULE, caps_stream_features, 75), ejabberd_hooks:delete(s2s_stream_features, Host, ?MODULE, caps_stream_features, 75), @@ -494,20 +454,6 @@ concat_xdata_fields(#xdata{fields = Fields} = X) -> is_binary(Var), Var /= <<"FORM_TYPE">>], [Form, $<, lists:sort(Res)]. --spec gb_trees_fold(fun((_, _, T) -> T), T, gb_trees:tree()) -> T. -gb_trees_fold(F, Acc, Tree) -> - Iter = gb_trees:iterator(Tree), - gb_trees_fold_iter(F, Acc, Iter). - --spec gb_trees_fold_iter(fun((_, _, T) -> T), T, gb_trees:iter()) -> T. -gb_trees_fold_iter(F, Acc, Iter) -> - case gb_trees:next(Iter) of - {Key, Val, NewIter} -> - NewAcc = F(Key, Val, Acc), - gb_trees_fold_iter(F, NewAcc, NewIter); - _ -> Acc - end. - -spec now_ts() -> integer(). now_ts() -> p1_time_compat:system_time(seconds). diff --git a/src/mod_client_state.erl b/src/mod_client_state.erl index 2bae7a4f8..a838088fc 100644 --- a/src/mod_client_state.erl +++ b/src/mod_client_state.erl @@ -260,20 +260,15 @@ queue_take(Stanza, Host, C2SState) -> NewState = set_queue(Rest, C2SState), {NewState, get_stanzas(Selected, Host) ++ [Stanza]}. --spec set_queue(csi_queue(), term()) -> term(). +-spec set_queue(csi_queue(), ejabberd_c2s:state()) -> ejabberd_c2s:state(). set_queue(Queue, C2SState) -> - ejabberd_c2s:set_aux_field(csi_queue, Queue, C2SState). + C2SState#{csi_queue => Queue}. --spec get_queue(term()) -> csi_queue(). +-spec get_queue(ejabberd_c2s:state()) -> csi_queue(). get_queue(C2SState) -> - case ejabberd_c2s:get_aux_field(csi_queue, C2SState) of - {ok, Queue} -> - Queue; - error -> - [] - end. + maps:get(csi_queue, C2SState, []). -spec get_stanzas(csi_queue(), binary()) -> [stanza()]. diff --git a/src/mod_last.erl b/src/mod_last.erl index 463eac051..2c17dcda3 100644 --- a/src/mod_last.erl +++ b/src/mod_last.erl @@ -37,7 +37,7 @@ process_sm_iq/1, on_presence_update/4, import_info/0, import/5, import_start/2, store_last_info/4, get_last_info/2, remove_user/2, transform_options/1, mod_opt_type/1, - opt_type/1, register_user/2, depends/2]). + opt_type/1, register_user/2, depends/2, privacy_check_packet/4]). -include("ejabberd.hrl"). -include("logger.hrl"). @@ -64,6 +64,8 @@ start(Host, Opts) -> ?NS_LAST, ?MODULE, process_local_iq, IQDisc), gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_LAST, ?MODULE, process_sm_iq, IQDisc), + ejabberd_hooks:add(privacy_check_packet, Host, ?MODULE, + privacy_check_packet, 30), ejabberd_hooks:add(register_user, Host, ?MODULE, register_user, 50), ejabberd_hooks:add(remove_user, Host, ?MODULE, @@ -143,6 +145,31 @@ process_sm_iq(#iq{from = From, to = To, lang = Lang} = IQ) -> xmpp:make_error(IQ, xmpp:err_subscription_required(Txt, Lang)) end. +privacy_check_packet(allow, C2SState, + #iq{from = From, to = To, type = T} = IQ, in) + when T == get; T == set -> + case xmpp:has_subtag(IQ, #last{}) of + true -> + Sub = ejabberd_c2s:get_subscription(From, C2SState), + if Sub == from; Sub == both -> + Pres = #presence{from = To, to = From}, + case ejabberd_hooks:run_fold( + privacy_check_packet, allow, + [C2SState, Pres, out]) of + allow -> + allow; + deny -> + {stop, deny} + end; + true -> + {stop, deny} + end; + false -> + allow + end; +privacy_check_packet(Acc, _, _, _) -> + Acc. + %% @spec (LUser::string(), LServer::string()) -> %% {ok, TimeStamp::integer(), Status::string()} | not_found | {error, Reason} -spec get_last(binary(), binary()) -> {ok, non_neg_integer(), binary()} | diff --git a/src/mod_legacy_auth.erl b/src/mod_legacy_auth.erl new file mode 100644 index 000000000..3e83680e5 --- /dev/null +++ b/src/mod_legacy_auth.erl @@ -0,0 +1,159 @@ +%%%------------------------------------------------------------------- +%%% Created : 11 Dec 2016 by Evgeny Khramtsov +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2016 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%------------------------------------------------------------------- +-module(mod_legacy_auth). +-behaviour(gen_mod). + +-protocol({xep, 78, '2.5'}). + +%% gen_mod API +-export([start/2, stop/1, depends/2, mod_opt_type/1]). +%% hooks +-export([c2s_unauthenticated_packet/2, c2s_stream_features/2]). + +-include("xmpp.hrl"). + +%%%=================================================================== +%%% API +%%%=================================================================== +start(Host, _Opts) -> + ejabberd_hooks:add(c2s_unauthenticated_packet, Host, ?MODULE, + c2s_unauthenticated_packet, 50), + ejabberd_hooks:add(c2s_pre_auth_features, Host, ?MODULE, + c2s_stream_features, 50). + +stop(Host) -> + ejabberd_hooks:delete(c2s_unauthenticated_packet, Host, ?MODULE, + c2s_unauthenticated_packet, 50), + ejabberd_hooks:delete(c2s_pre_auth_features, Host, ?MODULE, + c2s_stream_features, 50). + +depends(_Host, _Opts) -> + []. + +mod_opt_type(_) -> + []. + +c2s_unauthenticated_packet({noreply, State}, #iq{type = T, sub_els = [_]} = IQ) + when T == get; T == set -> + case xmpp:get_subtag(IQ, #legacy_auth{}) of + #legacy_auth{} = Auth -> + {stop, authenticate(State, xmpp:set_els(IQ, [Auth]))}; + false -> + {noreply, State} + end; +c2s_unauthenticated_packet(Acc, _) -> + Acc. + +c2s_stream_features(Acc, LServer) -> + case gen_mod:is_loaded(LServer, ?MODULE) of + true -> + [#legacy_auth_feature{}|Acc]; + false -> + Acc + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +authenticate(#{server := Server} = State, + #iq{type = get, sub_els = [#legacy_auth{}]} = IQ) -> + LServer = jid:nameprep(Server), + Auth = #legacy_auth{username = <<>>, password = <<>>, resource = <<>>}, + Res = case ejabberd_auth:plain_password_required(LServer) of + false -> + xmpp:make_iq_result(IQ, Auth#legacy_auth{digest = <<>>}); + true -> + xmpp:make_iq_result(IQ, Auth) + end, + ejabberd_c2s:send(State, Res); +authenticate(State, + #iq{type = set, lang = Lang, + sub_els = [#legacy_auth{username = U, + resource = R}]} = IQ) + when U == undefined; R == undefined; U == <<"">>; R == <<"">> -> + Txt = <<"Both the username and the resource are required">>, + Err = xmpp:make_error(IQ, xmpp:err_not_acceptable(Txt, Lang)), + ejabberd_c2s:send(State, Err); +authenticate(#{stream_id := StreamID, server := Server, + access := Access, ip := IP} = State, + #iq{type = set, lang = Lang, + sub_els = [#legacy_auth{username = U, + password = P0, + digest = D0, + resource = R}]} = IQ) -> + P = if is_binary(P0) -> P0; true -> <<>> end, + D = if is_binary(D0) -> D0; true -> <<>> end, + DGen = fun (PW) -> p1_sha:sha(<>) end, + JID = jid:make(U, Server, R), + case JID /= error andalso + acl:access_matches(Access, + #{usr => jid:split(JID), ip => IP}, + JID#jid.lserver) == allow of + true -> + case ejabberd_auth:check_password_with_authmodule( + U, U, JID#jid.lserver, P, D, DGen) of + {true, AuthModule} -> + case ejabberd_c2s:handle_auth_success( + U, <<"legacy">>, AuthModule, State) of + {noreply, State1} -> + State2 = State1#{user := U}, + open_session(State2, IQ, R); + Err -> + Err + end; + _ -> + Err = xmpp:make_error(IQ, xmpp:err_not_authorized()), + process_auth_failure(State, U, Err, 'not-authorized') + end; + false when JID == error -> + Err = xmpp:make_error(IQ, xmpp:err_jid_malformed()), + process_auth_failure(State, U, Err, 'jid-malformed'); + false -> + Txt = <<"Denied by ACL">>, + Err = xmpp:make_error(IQ, xmpp:err_forbidden(Txt, Lang)), + process_auth_failure(State, U, Err, 'forbidden') + end. + +open_session(State, IQ, R) -> + case ejabberd_c2s:bind(R, State) of + {ok, State1} -> + Res = xmpp:make_iq_result(IQ), + case ejabberd_c2s:send(State1, Res) of + {noreply, State2} -> + {noreply, State2#{stream_authenticated := true, + stream_state := session_established}}; + Err -> + Err + end; + {error, Err, State1} -> + Res = xmpp:make_error(IQ, Err), + ejabberd_c2s:send(State1, Res) + end. + +process_auth_failure(State, User, StanzaErr, Reason) -> + case ejabberd_c2s:send(State, StanzaErr) of + {noreply, State1} -> + ejabberd_c2s:handle_auth_failure( + User, <<"legacy">>, Reason, State1); + Err -> + Err + end. diff --git a/src/mod_offline.erl b/src/mod_offline.erl index 42bc46631..e0e36a1da 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -61,6 +61,7 @@ count_offline_messages/2, get_offline_els/2, find_x_expire/2, + c2s_handle_info/2, webadmin_page/3, webadmin_user/4, webadmin_user_parse_query/5]). @@ -156,6 +157,7 @@ init([Host, Opts]) -> ejabberd_hooks:add(disco_sm_items, Host, ?MODULE, get_sm_items, 50), ejabberd_hooks:add(disco_info, Host, ?MODULE, get_info, 50), + ejabberd_hooks:add(c2s_handle_info, Host, ?MODULE, c2s_handle_info, 50), ejabberd_hooks:add(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:add(webadmin_user, Host, @@ -211,6 +213,7 @@ terminate(_Reason, State) -> ejabberd_hooks:delete(disco_sm_identity, Host, ?MODULE, get_sm_identity, 50), ejabberd_hooks:delete(disco_sm_items, Host, ?MODULE, get_sm_items, 50), ejabberd_hooks:delete(disco_info, Host, ?MODULE, get_info, 50), + ejabberd_hooks:delete(c2s_handle_info, Host, ?MODULE, c2s_handle_info, 50), ejabberd_hooks:delete(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:delete(webadmin_user, Host, @@ -277,38 +280,28 @@ get_sm_identity(Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}, get_sm_identity(Acc, _From, _To, _Node, _Lang) -> Acc. -get_sm_items(_Acc, #jid{luser = U, lserver = S, lresource = R} = JID, +get_sm_items(_Acc, #jid{luser = U, lserver = S} = JID, #jid{luser = U, lserver = S}, ?NS_FLEX_OFFLINE, _Lang) -> - case ejabberd_sm:get_session_pid(U, S, R) of - Pid when is_pid(Pid) -> - Mod = gen_mod:db_mod(S, ?MODULE), - Hdrs = Mod:read_message_headers(U, S), - BareJID = jid:remove_resource(JID), - Pid ! dont_ask_offline, - {result, lists:map( - fun({Seq, From, _To, _TS, _El}) -> - Node = integer_to_binary(Seq), - #disco_item{jid = BareJID, - node = Node, - name = jid:to_string(From)} - end, Hdrs)}; - none -> - {result, []} - end; + ejabberd_sm:route(JID, {resend_offline, false}), + Mod = gen_mod:db_mod(S, ?MODULE), + Hdrs = Mod:read_message_headers(U, S), + BareJID = jid:remove_resource(JID), + {result, lists:map( + fun({Seq, From, _To, _TS, _El}) -> + Node = integer_to_binary(Seq), + #disco_item{jid = BareJID, + node = Node, + name = jid:to_string(From)} + end, Hdrs)}; get_sm_items(Acc, _From, _To, _Node, _Lang) -> Acc. -spec get_info([xdata()], binary(), module(), binary(), binary()) -> [xdata()]; ([xdata()], jid(), jid(), binary(), binary()) -> [xdata()]. -get_info(_Acc, #jid{luser = U, lserver = S, lresource = R}, +get_info(_Acc, #jid{luser = U, lserver = S} = JID, #jid{luser = U, lserver = S}, ?NS_FLEX_OFFLINE, Lang) -> - case ejabberd_sm:get_session_pid(U, S, R) of - Pid when is_pid(Pid) -> - Pid ! dont_ask_offline; - none -> - ok - end, + ejabberd_sm:route(JID, {resend_offline, false}), [#xdata{type = result, fields = flex_offline:encode( [{number_of_messages, count_offline_messages(U, S)}], @@ -316,6 +309,12 @@ get_info(_Acc, #jid{luser = U, lserver = S, lresource = R}, get_info(Acc, _From, _To, _Node, _Lang) -> Acc. +-spec c2s_handle_info(ejabberd_c2s:next_state(), term()) -> ejabberd_c2s:next_state(). +c2s_handle_info({noreply, State}, {resend_offline, Flag}) -> + {noreply, State#{resend_offline => Flag}}; +c2s_handle_info(Acc, _) -> + Acc. + -spec handle_offline_query(iq()) -> iq(). handle_offline_query(#iq{from = #jid{luser = U1, lserver = S1}, to = #jid{luser = U2, lserver = S2}, @@ -395,18 +394,15 @@ set_offline_tag(Msg, Node) -> xmpp:set_subtag(Msg, #offline{items = [#offline_item{node = Node}]}). -spec handle_offline_fetch(jid()) -> ok. -handle_offline_fetch(#jid{luser = U, lserver = S, lresource = R}) -> - case ejabberd_sm:get_session_pid(U, S, R) of - none -> - ok; - Pid when is_pid(Pid) -> - Pid ! dont_ask_offline, - lists:foreach( - fun({Node, El}) -> - NewEl = set_offline_tag(El, Node), - Pid ! {route, xmpp:get_from(El), xmpp:get_to(El), NewEl} - end, read_messages(U, S)) - end. +handle_offline_fetch(#jid{luser = U, lserver = S} = JID) -> + ejabberd_sm:route(JID, {resend_offline, false}), + lists:foreach( + fun({Node, El}) -> + NewEl = set_offline_tag(El, Node), + From = xmpp:get_from(El), + To = xmpp:get_to(El), + ejabberd_router:route(From, To, NewEl) + end, read_messages(U, S)). -spec fetch_msg_by_node(jid(), binary()) -> error | {ok, #offline_msg{}}. fetch_msg_by_node(To, Seq) -> diff --git a/src/mod_privacy.erl b/src/mod_privacy.erl index d6936e1b7..d76edc91d 100644 --- a/src/mod_privacy.erl +++ b/src/mod_privacy.erl @@ -35,7 +35,7 @@ process_iq_set/3, process_iq_get/3, get_user_list/3, check_packet/6, remove_user/2, encode_list_item/1, is_list_needdb/1, updated_list/3, - import_start/2, import_stop/2, + import_start/2, import_stop/2, c2s_handle_info/2, item_to_xml/1, get_user_lists/2, import/5, set_privacy_list/1, mod_opt_type/1, depends/2]). @@ -77,6 +77,8 @@ start(Host, Opts) -> updated_list, 50), ejabberd_hooks:add(remove_user, Host, ?MODULE, remove_user, 50), + ejabberd_hooks:add(c2s_handle_info, Host, ?MODULE, + c2s_handle_info, 50), gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PRIVACY, ?MODULE, process_iq, IQDisc). @@ -94,6 +96,8 @@ stop(Host) -> ?MODULE, updated_list, 50), ejabberd_hooks:delete(remove_user, Host, ?MODULE, remove_user, 50), + ejabberd_hooks:delete(c2s_handle_info, Host, ?MODULE, + c2s_handle_info, 50), gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_PRIVACY). @@ -310,13 +314,8 @@ process_lists_set(LUser, LServer, Name, [], _UserList, Lang) -> Txt = <<"No privacy list with this name found">>, {error, xmpp:err_item_not_found(Txt, Lang)}; {atomic, ok} -> - ejabberd_sm:route(jid:make(LUser, LServer, - <<"">>), - jid:make(LUser, LServer, <<"">>), - {broadcast, {privacy_list, - #userlist{name = Name, - list = []}, - Name}}), + ejabberd_sm:route(jid:make(LUser, LServer, <<"">>), + {privacy_list, #userlist{name = Name}, Name}), {result, undefined}; Err -> ?ERROR_MSG("failed to remove privacy list '~s' for user ~s@~s: ~p", @@ -334,14 +333,12 @@ process_lists_set(LUser, LServer, Name, Items, _UserList, Lang) -> case Mod:set_privacy_list(LUser, LServer, Name, List) of {atomic, ok} -> NeedDb = is_list_needdb(List), - ejabberd_sm:route(jid:make(LUser, LServer, - <<"">>), - jid:make(LUser, LServer, <<"">>), - {broadcast, {privacy_list, - #userlist{name = Name, - list = List, - needdb = NeedDb}, - Name}}), + ejabberd_sm:route(jid:make(LUser, LServer, <<"">>), + {privacy_list, + #userlist{name = Name, + list = List, + needdb = NeedDb}, + Name}), {result, undefined}; Err -> ?ERROR_MSG("failed to set privacy list '~s' " @@ -538,6 +535,23 @@ remove_user(User, Server) -> Mod = gen_mod:db_mod(LServer, ?MODULE), Mod:remove_user(LUser, LServer). +c2s_handle_info({noreply, #{privacy_list := Old, + user := U, server := S, resource := R} = State}, + {privacy_list, New, Name}) -> + List = if Old#userlist.name == New#userlist.name -> New; + true -> Old + end, + From = jid:make(U, S), + To = jid:make(U, S, R), + PushIQ = #iq{type = set, from = From, to = To, + id = <<"push", (randoms:get_string())/binary>>, + sub_els = [#privacy_query{ + lists = [#privacy_list{name = Name}]}]}, + State1 = State#{privacy_list => List}, + ejabberd_c2s:send(State1, PushIQ); +c2s_handle_info(Acc, _) -> + Acc. + -spec updated_list(userlist(), userlist(), userlist()) -> userlist(). updated_list(_, #userlist{name = OldName} = Old, #userlist{name = NewName} = New) -> diff --git a/src/mod_pubsub.erl b/src/mod_pubsub.erl index 717796fec..98d50660c 100644 --- a/src/mod_pubsub.erl +++ b/src/mod_pubsub.erl @@ -54,7 +54,8 @@ on_user_offline/3, remove_user/2, disco_local_identity/5, disco_local_features/5, disco_local_items/5, disco_sm_identity/5, - disco_sm_features/5, disco_sm_items/5]). + disco_sm_features/5, disco_sm_items/5, + c2s_handle_info/2]). %% exported iq handlers -export([iq_sm/1, process_disco_info/1, process_disco_items/1, @@ -305,6 +306,8 @@ init([ServerHost, Opts]) -> ?MODULE, remove_user, 50), ejabberd_hooks:add(anonymous_purge_hook, ServerHost, ?MODULE, remove_user, 50), + ejabberd_hooks:add(c2s_handle_info, ServerHost, + ?MODULE, c2s_handle_info, 50), gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO, ?MODULE, process_disco_info, IQDisc), gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS, @@ -912,6 +915,8 @@ terminate(_Reason, ?MODULE, remove_user, 50), ejabberd_hooks:delete(anonymous_purge_hook, ServerHost, ?MODULE, remove_user, 50), + ejabberd_hooks:delete(c2s_handle_info, ServerHost, + ?MODULE, c2s_handle_info, 50), gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO), gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS), gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_PUBSUB), @@ -2212,22 +2217,21 @@ send_items(Host, Node, _Nidx, _Type, Options, LJID, _) -> dispatch_items(Host, LJID, Node, Stanza). dispatch_items({FromU, FromS, FromR} = From, {ToU, ToS, ToR} = To, - Node, Stanza) -> + Node, Stanza) -> C2SPid = case ejabberd_sm:get_session_pid(ToU, ToS, ToR) of - ToPid when is_pid(ToPid) -> ToPid; - _ -> - R = user_resource(FromU, FromS, FromR), - case ejabberd_sm:get_session_pid(FromU, FromS, R) of - FromPid when is_pid(FromPid) -> FromPid; - _ -> undefined - end - end, + ToPid when is_pid(ToPid) -> ToPid; + _ -> + R = user_resource(FromU, FromS, FromR), + case ejabberd_sm:get_session_pid(FromU, FromS, R) of + FromPid when is_pid(FromPid) -> FromPid; + _ -> undefined + end + end, if C2SPid == undefined -> ok; - true -> - ejabberd_c2s:send_filtered(C2SPid, - {pep_message, <>}, - service_jid(From), jid:make(To), - Stanza) + true -> + C2SPid ! {send_filtered, {pep_message, <>}, + service_jid(From), jid:make(To), + Stanza} end; dispatch_items(From, To, _Node, Stanza) -> ejabberd_router:route(service_jid(From), jid:make(To), Stanza). @@ -2761,9 +2765,10 @@ get_resource_state({U, S, R}, ShowValues, JIDs) -> lists:append([{U, S, R}], JIDs); Pid -> Show = case ejabberd_c2s:get_presence(Pid) of - {_, _, <<"available">>, _} -> <<"online">>; - {_, _, State, _} -> State - end, + #presence{type = unavailable} -> <<"unavailable">>; + #presence{show = undefined} -> <<"online">>; + #presence{show = S} -> atom_to_binary(S, latin1) + end, case lists:member(Show, ShowValues) of %% If yes, item can be delivered true -> lists:append([{U, S, R}], JIDs); @@ -3008,25 +3013,56 @@ broadcast_stanza({LUser, LServer, LResource}, Publisher, Node, Nidx, Type, NodeO broadcast_stanza({LUser, LServer, <<>>}, Node, Nidx, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM), %% Handles implicit presence subscriptions SenderResource = user_resource(LUser, LServer, LResource), - case ejabberd_sm:get_session_pid(LUser, LServer, SenderResource) of - C2SPid when is_pid(C2SPid) -> - NotificationType = get_option(NodeOptions, notification_type, headline), - Stanza = add_message_type(BaseStanza, NotificationType), - %% set the from address on the notification to the bare JID of the account owner - %% Also, add "replyto" if entity has presence subscription to the account owner - %% See XEP-0163 1.1 section 4.3.1 - ejabberd_c2s:broadcast(C2SPid, - {pep_message, <<((Node))/binary, "+notify">>}, - _Sender = jid:make(LUser, LServer, <<"">>), - _StanzaToSend = add_extended_headers( - Stanza, - _ReplyTo = extended_headers([Publisher]))); - _ -> - ?DEBUG("~p@~p has no session; can't deliver ~p to contacts", [LUser, LServer, BaseStanza]) - end; + NotificationType = get_option(NodeOptions, notification_type, headline), + Stanza = add_message_type(BaseStanza, NotificationType), + %% set the from address on the notification to the bare JID of the account owner + %% Also, add "replyto" if entity has presence subscription to the account owner + %% See XEP-0163 1.1 section 4.3.1 + ejabberd_sm:route(jid:make(LUser, LServer, SenderResource), + {pep_message, <<((Node))/binary, "+notify">>, + jid:make(LUser, LServer, <<"">>), + add_extended_headers( + Stanza, extended_headers([Publisher]))}); broadcast_stanza(Host, _Publisher, Node, Nidx, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM) -> broadcast_stanza(Host, Node, Nidx, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM). +-spec c2s_handle_info(ejabberd_c2s:next_state(), term()) -> ejabberd_c2s:next_state(). +c2s_handle_info({noreply, #{server := Server} = C2SState}, + {pep_message, Feature, From, Packet}) -> + LServer = jid:nameprep(Server), + lists:foreach( + fun({USR, Caps}) -> + Features = mod_caps:get_features(LServer, Caps), + case lists:member(Feature, Features) of + true -> + To = jid:make(USR), + NewPacket = xmpp:set_from_to(Packet, From, To), + ejabberd_router:route(From, To, NewPacket); + false -> + ok + end + end, mod_caps:list_features(C2SState)), + {noreply, C2SState}; +c2s_handle_info({noreply, #{server := Server} = C2SState}, + {send_filtered, {pep_message, Feature}, From, To, Packet}) -> + LServer = jid:nameprep(Server), + case mod_caps:get_user_caps(To, C2SState) of + {ok, Caps} -> + Features = mod_caps:get_features(LServer, Caps), + case lists:member(Feature, Features) of + true -> + NewPacket = xmpp:set_from_to(Packet, From, To), + ejabberd_router:route(From, To, NewPacket); + false -> + ok + end; + error -> + ok + end, + {noreply, C2SState}; +c2s_handle_info(Acc, _) -> + Acc. + subscribed_nodes_by_jid(NotifyType, SubsByDepth) -> NodesToDeliver = fun (Depth, Node, Subs, Acc) -> NodeName = case Node#pubsub_node.nodeid of diff --git a/src/mod_register.erl b/src/mod_register.erl index b96ebecbd..515cb1066 100644 --- a/src/mod_register.erl +++ b/src/mod_register.erl @@ -34,7 +34,7 @@ -behaviour(gen_mod). -export([start/2, stop/1, stream_feature_register/2, - unauthenticated_iq_register/4, try_register/5, + c2s_unauthenticated_packet/2, try_register/5, process_iq/1, send_registration_notifications/3, transform_options/1, transform_module_options/1, mod_opt_type/1, opt_type/1, depends/2]). @@ -50,10 +50,10 @@ start(Host, Opts) -> ?NS_REGISTER, ?MODULE, process_iq, IQDisc), gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_REGISTER, ?MODULE, process_iq, IQDisc), - ejabberd_hooks:add(c2s_stream_features, Host, ?MODULE, + ejabberd_hooks:add(c2s_pre_auth_features, Host, ?MODULE, stream_feature_register, 50), - ejabberd_hooks:add(c2s_unauthenticated_iq, Host, - ?MODULE, unauthenticated_iq_register, 50), + ejabberd_hooks:add(c2s_unauthenticated_packet, Host, + ?MODULE, c2s_unauthenticated_packet, 50), ejabberd_mnesia:create(?MODULE, mod_register_ip, [{ram_copies, [node()]}, {local_content, true}, {attributes, [key, value]}]), @@ -62,10 +62,10 @@ start(Host, Opts) -> ok. stop(Host) -> - ejabberd_hooks:delete(c2s_stream_features, Host, + ejabberd_hooks:delete(c2s_pre_auth_features, Host, ?MODULE, stream_feature_register, 50), - ejabberd_hooks:delete(c2s_unauthenticated_iq, Host, - ?MODULE, unauthenticated_iq_register, 50), + ejabberd_hooks:delete(c2s_unauthenticated_packet, Host, + ?MODULE, c2s_unauthenticated_packet, 50), gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_REGISTER), gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, @@ -86,19 +86,20 @@ stream_feature_register(Acc, Host) -> Acc end. --spec unauthenticated_iq_register(empty | iq(), binary(), iq(), - {inet:ip_address(), non_neg_integer()}) -> - empty | iq(). -unauthenticated_iq_register(_Acc, Server, - #iq{sub_els = [#register{}]} = IQ, IP) -> - Address = case IP of - {A, _Port} -> A; - _ -> undefined - end, - ResIQ = process_iq(xmpp:set_from_to(IQ, jid:make(<<>>), jid:make(Server)), - Address), - xmpp:set_from_to(ResIQ, jid:make(Server), undefined); -unauthenticated_iq_register(Acc, _Server, _IQ, _IP) -> +c2s_unauthenticated_packet({noreply, #{ip := IP, server := Server} = State}, + #iq{type = T, sub_els = [_]} = IQ) + when T == set; T == get -> + case xmpp:get_subtag(IQ, #register{}) of + #register{} -> + {Address, _} = IP, + IQ1 = xmpp:set_from_to(IQ, jid:make(<<>>), jid:make(Server)), + ResIQ = process_iq(IQ1, Address), + ResIQ1 = xmpp:set_from_to(ResIQ, jid:make(Server), undefined), + {stop, ejabberd_c2s:send(State, ResIQ1)}; + false -> + {noreply, State} + end; +c2s_unauthenticated_packet(Acc, _) -> Acc. process_iq(#iq{from = From} = IQ) -> diff --git a/src/mod_roster.erl b/src/mod_roster.erl index 89578571c..a896ef055 100644 --- a/src/mod_roster.erl +++ b/src/mod_roster.erl @@ -44,7 +44,7 @@ -export([start/2, stop/1, process_iq/1, export/1, import_info/0, process_local_iq/1, get_user_roster/2, import/5, get_subscription_lists/3, get_roster/2, - import_start/2, import_stop/2, + import_start/2, import_stop/2, c2s_handle_info/2, get_in_pending_subscriptions/3, in_subscription/6, out_subscription/4, set_items/3, remove_user/2, get_jid_info/4, encode_item/1, webadmin_page/3, @@ -63,6 +63,8 @@ -include("ejabberd_web_admin.hrl"). +-define(SETS, gb_sets). + -export_type([subscription/0]). -callback init(binary(), gen_mod:opts()) -> any(). @@ -102,12 +104,14 @@ start(Host, Opts) -> remove_user, 50), ejabberd_hooks:add(resend_subscription_requests_hook, Host, ?MODULE, get_in_pending_subscriptions, 50), - ejabberd_hooks:add(roster_get_versioning_feature, Host, + ejabberd_hooks:add(c2s_post_auth_features, Host, ?MODULE, get_versioning_feature, 50), ejabberd_hooks:add(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:add(webadmin_user, Host, ?MODULE, webadmin_user, 50), + ejabberd_hooks:add(c2s_handle_info, Host, ?MODULE, + c2s_handle_info, 50), gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_ROSTER, ?MODULE, process_iq, IQDisc). @@ -128,12 +132,14 @@ stop(Host) -> ?MODULE, remove_user, 50), ejabberd_hooks:delete(resend_subscription_requests_hook, Host, ?MODULE, get_in_pending_subscriptions, 50), - ejabberd_hooks:delete(roster_get_versioning_feature, + ejabberd_hooks:delete(c2s_post_auth_features, Host, ?MODULE, get_versioning_feature, 50), ejabberd_hooks:delete(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:delete(webadmin_user, Host, ?MODULE, webadmin_user, 50), + ejabberd_hooks:delete(c2s_handle_info, Host, ?MODULE, + c2s_handle_info, 50), gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_ROSTER). @@ -417,10 +423,8 @@ process_iq_set(#iq{from = From, to = To, end. push_item(User, Server, From, Item) -> - ejabberd_sm:route(jid:make(<<"">>, <<"">>, <<"">>), - jid:make(User, Server, <<"">>), - {broadcast, {item, Item#roster.jid, - Item#roster.subscription}}), + ejabberd_sm:route(jid:make(User, Server, <<"">>), + {item, Item#roster.jid, Item#roster.subscription}), case roster_versioning_enabled(Server) of true -> push_item_version(Server, User, From, Item, @@ -460,6 +464,66 @@ push_item_version(Server, User, From, Item, end, ejabberd_sm:get_user_resources(User, Server)). +c2s_handle_info({noreply, State}, {item, JID, Sub}) -> + {noreply, roster_change(State, JID, Sub)}; +c2s_handle_info(Acc, _) -> + Acc. + +-spec roster_change(ejabberd_c2s:state(), jid(), subscription()) -> ejabberd_c2s:state(). +roster_change(#{user := U, server := S, resource := R} = State, + IJID, ISubscription) -> + LIJID = jid:tolower(IJID), + IsFrom = (ISubscription == both) or (ISubscription == from), + IsTo = (ISubscription == both) or (ISubscription == to), + PresF = maps:get(pres_f, State, ?SETS:new()), + PresT = maps:get(pres_t, State, ?SETS:new()), + OldIsFrom = ?SETS:is_element(LIJID, PresF), + FSet = if IsFrom -> ?SETS:add_element(LIJID, PresF); + true -> ?SETS:del_element(LIJID, PresF) + end, + TSet = if IsTo -> ?SETS:add_element(LIJID, PresT); + true -> ?SETS:del_element(LIJID, PresT) + end, + State1 = State#{pres_f => FSet, pres_t => TSet}, + case maps:get(pres_last, State, undefined) of + undefined -> + State1; + LastPres -> + From = jid:make(U, S, R), + PresA = maps:get(pres_a, State1, ?SETS:new()), + To = jid:make(IJID), + Cond1 = IsFrom andalso not OldIsFrom, + Cond2 = not IsFrom andalso OldIsFrom andalso + ?SETS:is_element(LIJID, PresA), + if Cond1 -> + case ejabberd_hooks:run_fold( + privacy_check_packet, allow, + [State1, LastPres, out]) of + deny -> + ok; + allow -> + Pres = xmpp:set_from_to(LastPres, From, To), + ejabberd_router:route(From, To, Pres) + end, + A = ?SETS:add_element(LIJID, PresA), + State1#{pres_a => A}; + Cond2 -> + PU = #presence{from = From, to = To, type = unavailable}, + case ejabberd_hooks:run_fold( + privacy_check_packet, allow, + [State1, PU, out]) of + deny -> + ok; + allow -> + ejabberd_router:route(From, To, PU) + end, + A = ?SETS:del_element(LIJID, PresA), + State1#{pres_a => A}; + true -> + State1 + end + end. + -spec get_subscription_lists({[ljid()], [ljid()]}, binary(), binary()) -> {[ljid()], [ljid()]}. get_subscription_lists(_Acc, User, Server) -> diff --git a/src/mod_shared_roster.erl b/src/mod_shared_roster.erl index 8ef0f41b5..e91f7481a 100644 --- a/src/mod_shared_roster.erl +++ b/src/mod_shared_roster.erl @@ -1038,11 +1038,8 @@ split_grouphost(Host, Group) -> end. broadcast_subscription(User, Server, ContactJid, Subscription) -> - ejabberd_sm:route( - jid:make(<<"">>, Server, <<"">>), - jid:make(User, Server, <<"">>), - {broadcast, {item, ContactJid, - Subscription}}). + ejabberd_sm:route(jid:make(User, Server, <<"">>), + {item, ContactJid, Subscription}). displayed_groups_update(Members, DisplayedGroups, Subscription) -> lists:foreach(fun({U, S}) -> diff --git a/src/xmpp_stream_in.erl b/src/xmpp_stream_in.erl new file mode 100644 index 000000000..6294a7893 --- /dev/null +++ b/src/xmpp_stream_in.erl @@ -0,0 +1,698 @@ +%%%------------------------------------------------------------------- +%%% Created : 26 Nov 2016 by Evgeny Khramtsov +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2016 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%------------------------------------------------------------------- +-module(xmpp_stream_in). +-behaviour(gen_server). + +-protocol({rfc, 6120}). + +%% API +-export([start/3, call/3, cast/2, reply/2, send/2, send_error/3, + get_transport/1, change_shaper/2]). + +%% gen_server callbacks +-export([init/1, handle_cast/2, handle_call/3, handle_info/2, + terminate/2, code_change/3]). + +-include("xmpp.hrl"). +-type state() :: map(). +-type next_state() :: {noreply, state()} | {stop, term(), state()}. + +-callback init(list()) -> {ok, state()} | {stop, term()} | ignore. +-callback handle_authenticated_packet(xmpp_element(), state()) -> next_state(). + +%%%=================================================================== +%%% API +%%%=================================================================== +start(Mod, Args, Opts) -> + gen_server:start(?MODULE, [Mod|Args], Opts). + +call(Ref, Msg, Timeout) -> + gen_server:call(Ref, Msg, Timeout). + +cast(Ref, Msg) -> + gen_server:cast(Ref, Msg). + +reply(Ref, Reply) -> + gen_server:reply(Ref, Reply). + +-spec send(state(), xmpp_element()) -> next_state(). +send(State, Pkt) -> + send_element(State, Pkt). + +get_transport(#{sockmod := SockMod, socket := Socket}) -> + SockMod:get_transport(Socket). + +-spec change_shaper(state(), shaper:shaper()) -> ok. +change_shaper(#{sockmod := SockMod, socket := Socket}, Shaper) -> + SockMod:change_shaper(Socket, Shaper). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== +init([Module, {SockMod, Socket}, Opts]) -> + XMLSocket = case lists:keyfind(xml_socket, 1, Opts) of + {_, XS} -> XS; + false -> false + end, + TLSEnabled = proplists:get_bool(tls, Opts), + SocketMonitor = SockMod:monitor(Socket), + case peername(SockMod, Socket) of + {ok, IP} -> + State = #{mod => Module, + socket => Socket, + sockmod => SockMod, + socket_monitor => SocketMonitor, + stream_id => new_id(), + stream_state => wait_for_stream, + stream_restarted => false, + stream_compressed => false, + stream_tlsed => TLSEnabled, + stream_version => {1,0}, + stream_authenticated => false, + xml_socket => XMLSocket, + xmlns => ?NS_CLIENT, + lang => <<"">>, + user => <<"">>, + server => <<"">>, + resource => <<"">>, + ip => IP}, + Module:init([State, Opts]); + {error, Reason} -> + {stop, Reason} + end. + +handle_cast(Cast, #{mod := Mod} = State) -> + Mod:handle_cast(Cast, State). + +handle_call(Call, From, #{mod := Mod} = State) -> + Mod:handle_call(Call, From, State). + +handle_info({'$gen_event', {xmlstreamstart, Name, Attrs}}, + #{stream_state := wait_for_stream} = State) -> + try xmpp:decode(#xmlel{name = Name, attrs = Attrs}) of + #stream_start{} = Pkt -> + case send_header(State, Pkt) of + {noreply, State1} -> + process_stream(Pkt, State1); + Err -> + Err + end; + _ -> + case send_header(State) of + {noreply, State1} -> + send_element(State1, xmpp:serr_invalid_xml()); + Err -> + Err + end + catch _:{xmpp_codec, Why} -> + case send_header(State) of + {noreply, State1} -> process_invalid_xml(Why, State1); + Err -> Err + end + end; +handle_info({'$gen_event', {xmlstreamend, _}}, #{mod := Mod} = State) -> + try Mod:handle_stream_end(State) + catch _:undef -> {stop, normal, State} + end; +handle_info({'$gen_event', {xmlstreamerror, Reason}}, #{lang := Lang}= State) -> + case send_header(State) of + {noreply, State1} -> + Err = case Reason of + <<"XML stanza is too big">> -> + xmpp:serr_policy_violation(Reason, Lang); + _ -> + xmpp:serr_not_well_formed() + end, + send_element(State1, Err); + Err -> + Err + end; +handle_info({'$gen_event', {xmlstreamelement, El}}, + #{xmlns := NS} = State) -> + try xmpp:decode(El, NS, [ignore_els]) of + Pkt -> + process_element(Pkt, State) + catch _:{xmpp_codec, Why} -> + process_invalid_xml(Why, State) + end; +handle_info({'$gen_all_state_event', {xmlstreamcdata, Data}}, + #{mod := Mod} = State) -> + try Mod:handle_cdata(Data, State) + catch _:undef -> {noreply, State} + end; +handle_info(closed, #{mod := Mod} = State) -> + try Mod:handle_stream_close(State) + catch _:undef -> {stop, normal, State} + end; +handle_info({'DOWN', MRef, _Type, _Object, _Info}, + #{socket_monitor := MRef, mod := Mod} = State) -> + try Mod:handle_stream_close(State) + catch _:undef -> {stop, normal, State} + end; +handle_info(Info, #{mod := Mod} = State) -> + Mod:handle_info(Info, State). + +terminate(Reason, #{mod := Mod, socket := Socket, + sockmod := SockMod} = State) -> + Mod:terminate(Reason, State), + send_text(State, <<"">>), + SockMod:close(Socket). + +code_change(OldVsn, #{mod := Mod} = State, Extra) -> + Mod:code_change(OldVsn, State, Extra). + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +-spec new_id() -> binary(). +new_id() -> + randoms:get_string(). + +peername(SockMod, Socket) -> + case SockMod of + gen_tcp -> inet:peername(Socket); + _ -> SockMod:peername(Socket) + end. + +process_invalid_xml(Reason, #{lang := Lang} = State) -> + Txt = xmpp:io_format_error(Reason), + send_element(State, xmpp:serr_invalid_xml(Txt, Lang)). + +process_stream(#stream_start{xmlns = XML_NS, + stream_xmlns = STREAM_NS}, + #{xmlns := NS} = State) + when XML_NS /= NS; STREAM_NS /= ?NS_STREAM -> + send_element(State, xmpp:serr_invalid_namespace()); +process_stream(#stream_start{lang = Lang}, + #{xmlns := ?NS_CLIENT, lang := DefaultLang} = State) + when size(Lang) > 35 -> + %% As stated in BCP47, 4.4.1: + %% Protocols or specifications that specify limited buffer sizes for + %% language tags MUST allow for language tags of at least 35 characters. + %% Do not store long language tag to avoid possible DoS/flood attacks + Txt = <<"Too long value of 'xml:lang' attribute">>, + send_element(State, xmpp:serr_policy_violation(Txt, DefaultLang)); +process_stream(#stream_start{to = undefined}, #{lang := Lang} = State) -> + Txt = <<"Missing 'to' attribute">>, + send_element(State, xmpp:serr_improper_addressing(Txt, Lang)); +process_stream(#stream_start{from = undefined, version = {1,0}}, + #{lang := Lang, xmlns := ?NS_SERVER, + stream_tlsed := true} = State) -> + Txt = <<"Missing 'from' attribute">>, + send_element(State, xmpp:serr_invalid_from(Txt, Lang)); +process_stream(#stream_start{to = #jid{luser = U, lresource = R}}, + #{lang := Lang} = State) when U /= <<"">>; R /= <<"">> -> + Txt = <<"Improper 'to' attribute">>, + send_element(State, xmpp:serr_improper_addressing(Txt, Lang)); +process_stream(#stream_start{to = #jid{lserver = RemoteServer}}, + #{xmlns := ?NS_COMPONENT, mod := Mod} = State) -> + State1 = State#{remote_server => RemoteServer}, + case try Mod:handle_stream_start(State1) + catch _:undef -> {noreply, State1} + end of + {noreply, State2} -> + {noreply, State2#{stream_state => wait_for_handshake}}; + Err -> + Err + end; +process_stream(#stream_start{to = #jid{server = Server}, from = From}, + #{stream_authenticated := Authenticated, + stream_restarted := StreamWasRestarted, + mod := Mod, xmlns := NS, resource := Resource, + stream_tlsed := TLSEnabled} = State) -> + case if not StreamWasRestarted -> + State1 = State#{server => Server}, + try Mod:handle_stream_start(State1) + catch _:undef -> {noreply, State1} + end; + true -> + {noreply, State} + end of + {noreply, State2} -> + State3 = if NS == ?NS_SERVER andalso TLSEnabled -> + State2#{remote_server => From#jid.lserver}; + true -> + State2 + end, + case send_features(State3) of + {noreply, State4} -> + TLSRequired = is_starttls_required(State4), + NewStreamState = + if not Authenticated and + (not TLSEnabled and TLSRequired) -> + wait_for_starttls; + not Authenticated -> + wait_for_sasl_request; + (NS == ?NS_CLIENT) and (Resource == <<"">>) -> + wait_for_bind; + true -> + session_established + end, + {noreply, State4#{stream_state => NewStreamState}}; + Err -> + Err + end; + Err -> + Err + end. + +process_element(Pkt, #{stream_state := StateName, lang := Lang} = State) -> + case Pkt of + #starttls{} when StateName == wait_for_starttls; + StateName == wait_for_sasl_request -> + process_starttls(State); + #starttls{} -> + send_element(State, #starttls_failure{}); + #sasl_auth{} when StateName == wait_for_starttls -> + send_element(State, #sasl_failure{reason = 'encryption-required'}); + #sasl_auth{} when StateName == wait_for_sasl_request -> + process_sasl_request(Pkt, State); + #sasl_auth{} -> + Txt = <<"SASL negotiation is not allowed in this state">>, + send_element(State, #sasl_failure{reason = 'not-authorized', + text = xmpp:mk_text(Txt, Lang)}); + #sasl_response{} when StateName == wait_for_starttls -> + send_element(State, #sasl_failure{reason = 'encryption-required'}); + #sasl_response{} when StateName == wait_for_sasl_response -> + process_sasl_response(Pkt, State); + #sasl_response{} -> + Txt = <<"SASL negotiation is not allowed in this state">>, + send_element(State, #sasl_failure{reason = 'not-authorized', + text = xmpp:mk_text(Txt, Lang)}); + #sasl_abort{} when StateName == wait_for_sasl_response -> + process_sasl_abort(State); + #sasl_abort{} -> + send_element(State, #sasl_failure{reason = 'aborted'}); + #sasl_success{} -> + {noreply, State}; + #compress{} when StateName == wait_for_sasl_response -> + send_element(State, #compress_failure{reason = 'setup-failed'}); + #compress{} -> + process_compress(Pkt, State); + #handshake{} when StateName == wait_for_handshake -> + process_handshake(Pkt, State); + #handshake{} -> + {noreply, State}; + _ when StateName == wait_for_sasl_request; + StateName == wait_for_handshake; + StateName == wait_for_sasl_response -> + process_unauthenticated_packet(Pkt, State); + _ when StateName == wait_for_starttls -> + Txt = <<"Use of STARTTLS required">>, + Err = xmpp:err_policy_violation(Txt, Lang), + send_error(State, Pkt, Err); + _ when StateName == wait_for_bind -> + process_bind(Pkt, State); + _ when StateName == session_established -> + process_authenticated_packet(Pkt, State) + end. + +process_unauthenticated_packet(Pkt, #{mod := Mod} = State) -> + NewPkt = set_lang(Pkt, State), + try Mod:handle_unauthenticated_packet(NewPkt, State) + catch _:undef -> + Err = xmpp:err_not_authorized(), + send_error(State, Pkt, Err) + end. + +process_authenticated_packet(Pkt, #{xmlns := NS, mod := Mod} = State) -> + Pkt1 = set_lang(Pkt, State), + case set_from_to(Pkt1, State) of + {ok, #iq{type = set, sub_els = [_]} = Pkt2} when NS == ?NS_CLIENT -> + case xmpp:get_subtag(Pkt2, #xmpp_session{}) of + #xmpp_session{} -> + send_element(State, xmpp:make_iq_result(Pkt2)); + _ -> + Mod:handle_authenticated_packet(Pkt2, State) + end; + {ok, Pkt2} -> + Mod:handle_authenticated_packet(Pkt2, State); + {error, Err} -> + send_element(State, Err) + end. + +process_bind(#iq{type = set, sub_els = [_]} = Pkt, + #{xmlns := ?NS_CLIENT, mod := Mod, lang := Lang} = State) -> + case xmpp:get_subtag(Pkt, #bind{}) of + #bind{resource = R} -> + case jid:resourceprep(R) of + error -> + Txt = <<"Malformed resource">>, + Err = xmpp:err_bad_request(Txt, Lang), + send_error(State, Pkt, Err); + _ -> + case Mod:bind(R, State) of + {ok, #{user := U, + server := S, + resource := NewR} = State1} when NewR /= <<"">> -> + Reply = #bind{jid = jid:make(U, S, NewR)}, + State2 = State1#{stream_state => session_established}, + send_element(State2, xmpp:make_iq_result(Pkt, Reply)); + {error, #stanza_error{}, State1} = Err -> + send_error(State1, Pkt, Err) + end + end; + _ -> + try Mod:handle_unbinded_packet(Pkt, State) + catch _:undef -> + Err = xmpp:err_not_authorized(), + send_error(State, Pkt, Err) + end + end; +process_bind(Pkt, #{mod := Mod} = State) -> + try Mod:handle_unbinded_packet(Pkt, State) + catch _:undef -> + Err = xmpp:err_not_authorized(), + send_error(State, Pkt, Err) + end. + +process_handshake(#handshake{} = Pkt, #{mod := Mod} = State) -> + Mod:handle_handshake(Pkt, State). + +process_compress(#compress{}, #{stream_compressed := true} = State) -> + send_element(State, #compress_failure{reason = 'setup-failed'}); +process_compress(#compress{methods = HisMethods}, + #{socket := Socket, sockmod := SockMod, mod := Mod} = State) -> + MyMethods = try Mod:compress_methods(State) + catch _:undef -> [] + end, + CommonMethods = lists_intersection(MyMethods, HisMethods), + case lists:member(<<"zlib">>, CommonMethods) of + true -> + BCompressed = fxml:element_to_binary(xmpp:encode(#compressed{})), + ZlibSocket = SockMod:compress(Socket, BCompressed), + State1 = State#{socket => ZlibSocket, + stream_id => new_id(), + stream_restarted => true, + stream_state => wait_for_stream, + stream_compressed => true}, + {noreply, State1}; + false -> + send_element(State, #compress_failure{reason = 'unsupported-method'}) + end. + +process_starttls(#{socket := Socket, + sockmod := SockMod, mod := Mod} = State) -> + TLSOpts = try Mod:tls_options(State) + catch _:undef -> [] + end, + case SockMod:starttls(Socket, TLSOpts) of + {ok, TLSSocket} -> + case send_element(State, #starttls_proceed{}) of + {noreply, State1} -> + {noreply, State1#{socket => TLSSocket, + stream_id => new_id(), + stream_restarted => true, + stream_state => wait_for_stream, + stream_tlsed => true}}; + Err -> + Err + end; + {error, _Reason} -> + send_element(State, #starttls_failure{}) + end. + +process_sasl_request(#sasl_auth{mechanism = <<"EXTERNAL">>}, + #{stream_tlsed := false} = State) -> + process_sasl_failure('encryption-required', <<"">>, State); +process_sasl_request(#sasl_auth{mechanism = Mech, text = ClientIn}, + #{mod := Mod} = State) -> + SASLState = Mod:init_sasl(State), + SASLResult = cyrsasl:server_start(SASLState, Mech, ClientIn), + process_sasl_result(SASLResult, State). + +process_sasl_response(#sasl_response{text = ClientIn}, + #{sasl_state := SASLState} = State) -> + SASLResult = cyrsasl:server_step(SASLState, ClientIn), + process_sasl_result(SASLResult, State). + +process_sasl_result({ok, Props}, State) -> + process_sasl_success(Props, <<"">>, State); +process_sasl_result({ok, Props, ServerOut}, State) -> + process_sasl_success(Props, ServerOut, State); +process_sasl_result({continue, ServerOut, NewSASLState}, State) -> + process_sasl_continue(ServerOut, NewSASLState, State); +process_sasl_result({error, Reason, User}, State) -> + process_sasl_failure(Reason, User, State); +process_sasl_result({error, Reason}, State) -> + process_sasl_failure(Reason, <<"">>, State). + +process_sasl_success(Props, ServerOut, + #{socket := Socket, sockmod := SockMod, + mod := Mod, sasl_state := SASLState} = State) -> + Mech = cyrsasl:get_mech(SASLState), + User = identity(Props), + AuthModule = proplists:get_value(auth_module, Props), + case try Mod:handle_auth_success(User, Mech, AuthModule, State) + catch _:undef -> {noreply, State} + end of + {noreply, State1} -> + SockMod:reset_stream(Socket), + case send_element(State1, #sasl_success{text = ServerOut}) of + {noreply, State2} -> + State3 = maps:remove(sasl_state, State2), + {noreply, State3#{stream_id => new_id(), + stream_authenticated => true, + stream_restarted => true, + stream_state => wait_for_stream, + user => User}}; + Err -> + Err + end; + Err -> + Err + end. + +process_sasl_continue(ServerOut, NewSASLState, State) -> + send_element(State, #sasl_challenge{text = ServerOut}), + {noreply, State#{sasl_state => NewSASLState, + stream_state => wait_for_sasl_response}}. + +process_sasl_failure(Reason, User, + #{mod := Mod, sasl_state := SASLState} = State) -> + Mech = cyrsasl:get_mech(SASLState), + case try Mod:handle_auth_failure(User, Mech, Reason, State) + catch _:undef -> {noreply, State} + end of + {noreply, State1} -> + State2 = maps:remove(sasl_state, State1), + State3 = State2#{stream_state => wait_for_sasl_request}, + send_element(State3, #sasl_failure{reason = Reason}); + Err -> + Err + end. + +process_sasl_abort(State) -> + process_sasl_failure('aborted', <<"">>, State). + +send_features(#{stream_version := {1,0}, + stream_tlsed := TLSEnabled} = State) -> + TLSRequired = is_starttls_required(State), + Features = if TLSRequired and not TLSEnabled -> + get_tls_feature(State); + true -> + get_sasl_feature(State) ++ get_compress_feature(State) + ++ get_tls_feature(State) ++ get_bind_feature(State) + ++ get_session_feature(State) ++ get_other_features(State) + end, + send_element(State, #stream_features{sub_els = Features}); +send_features(State) -> + %% clients from stone age + {noreply, State}. + +get_sasl_feature(#{stream_authenticated := false, + mod := Mod, + stream_tlsed := TLSEnabled} = State) -> + TLSRequired = is_starttls_required(State), + if TLSEnabled or not TLSRequired -> + try Mod:sasl_mechanisms(State) of + [] -> []; + List -> [#sasl_mechanisms{list = List}] + catch _:undef -> + [] + end; + true -> + [] + end; +get_sasl_feature(_) -> + []. + +get_compress_feature(#{stream_compressed := false, mod := Mod} = State) -> + try Mod:compress_methods(State) of + [] -> []; + Ms -> [#compression{methods = Ms}] + catch _:undef -> + [] + end; +get_compress_feature(_) -> + []. + +get_tls_feature(#{stream_authenticated := false, + stream_tlsed := false} = State) -> + TLSRequired = is_starttls_required(State), + [#starttls{required = TLSRequired}]; +get_tls_feature(_) -> + []. + +get_bind_feature(#{stream_authenticated := true, resource := <<"">>}) -> + [#bind{}]; +get_bind_feature(_) -> + []. + +get_session_feature(#{stream_authenticated := true, resource := <<"">>}) -> + [#xmpp_session{optional = true}]; +get_session_feature(_) -> + []. + +get_other_features(#{stream_authenticated := Auth, mod := Mod} = State) -> + try + if Auth -> Mod:authenticated_stream_features(State); + true -> Mod:unauthenticated_stream_features(State) + end + catch _:undef -> + [] + end. + +is_starttls_required(#{mod := Mod} = State) -> + try Mod:tls_required(State) + catch _:undef -> false + end. + +set_from_to(Pkt, _State) when not ?is_stanza(Pkt) -> + {ok, Pkt}; +set_from_to(Pkt, #{user := U, server := S, resource := R, + xmlns := ?NS_CLIENT}) -> + JID = jid:make(U, S, R), + From = case xmpp:get_from(Pkt) of + undefined -> JID; + F -> F + end, + if JID#jid.luser == From#jid.luser andalso + JID#jid.lserver == From#jid.lserver andalso + (JID#jid.lresource == From#jid.lresource + orelse From#jid.lresource == <<"">>) -> + To = case xmpp:get_to(Pkt) of + undefined -> jid:make(U, S); + T -> T + end, + {ok, xmpp:set_from_to(Pkt, JID, To)}; + true -> + {error, xmpp:serr_invalid_from()} + end; +set_from_to(Pkt, #{lang := Lang}) -> + From = xmpp:get_from(Pkt), + To = xmpp:get_to(Pkt), + if From == undefined -> + Txt = <<"Missing 'from' attribute">>, + {error, xmpp:serr_invalid_from(Txt, Lang)}; + To == undefined -> + Txt = <<"Missing 'to' attribute">>, + {error, xmpp:serr_improper_addressing(Txt, Lang)}; + true -> + {ok, Pkt} + end. + +send_header(State) -> + send_header(State, #stream_start{}). + +send_header(#{stream_state := wait_for_stream, + stream_id := StreamID, + stream_version := MyVersion, + lang := MyLang, + xmlns := NS, + server := DefaultServer} = State, + #stream_start{to = To, lang = HisLang, version = HisVersion}) -> + Lang = choose_lang(MyLang, HisLang), + From = case To of + #jid{} -> To; + undefined -> jid:make(DefaultServer) + end, + Version = case HisVersion of + undefined -> MyVersion; + _ -> HisVersion + end, + Header = xmpp:encode(#stream_start{version = Version, + lang = Lang, + xmlns = NS, + stream_xmlns = ?NS_STREAM, + id = StreamID, + from = From}), + State1 = State#{lang => Lang}, + case send_text(State1, fxml:element_to_header(Header)) of + ok -> {noreply, State1}; + {error, _} -> {stop, normal, State1} + end; +send_header(State, _) -> + {noreply, State}. + +send_element(#{xmlns := NS, mod := Mod} = State, Pkt) -> + El = xmpp:encode(Pkt, NS), + Data = fxml:element_to_binary(El), + case send_text(State, Data) of + ok when is_record(Pkt, stream_error) -> + {stop, normal, State}; + ok when is_record(Pkt, starttls_failure) -> + {stop, normal, State}; + Res -> + try Mod:handle_send(Res, Pkt, El, Data, State) + catch _:undef when Res == ok -> + {noreply, State}; + _:undef -> + {stop, normal, State} + end + end. + +send_error(State, Pkt, Err) when ?is_stanza(Pkt) -> + case xmpp:get_type(Pkt) of + result -> {noreply, State}; + error -> {noreply, State}; + _ -> + ErrPkt = xmpp:make_error(Pkt, Err), + send_element(State, ErrPkt) + end; +send_error(State, _, _) -> + {noreply, State}. + +send_text(#{socket := Sock, sockmod := SockMod}, Data) -> + SockMod:send(Sock, Data). + +choose_lang(Lang, <<"">>) -> Lang; +choose_lang(_, Lang) -> Lang. + +set_lang(Pkt, #{lang := MyLang, xmlns := ?NS_CLIENT}) when ?is_stanza(Pkt) -> + HisLang = xmpp:get_lang(Pkt), + Lang = choose_lang(MyLang, HisLang), + xmpp:set_lang(Pkt, Lang); +set_lang(Pkt, _) -> + Pkt. + +lists_intersection(L1, L2) -> + lists:filter( + fun(E) -> + lists:member(E, L2) + end, L1). + +identity(Props) -> + case proplists:get_value(authzid, Props, <<>>) of + <<>> -> proplists:get_value(username, Props, <<>>); + AuthzId -> AuthzId + end. -- 2.40.0