]> granicus.if.org Git - ejabberd/commitdiff
SIP Outbound (RFC 5626) support
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>
Fri, 6 Jun 2014 05:32:07 +0000 (09:32 +0400)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Fri, 6 Jun 2014 05:36:45 +0000 (09:36 +0400)
src/mod_sip.erl
src/mod_sip_registrar.erl

index 8f7dba9cbd1d6a44c84299beee9e485aa8847327..fd36fb5ac18b9b2a33c58ac369f19840e273ac8e 100644 (file)
@@ -68,8 +68,8 @@ message_in(#sip{type = request, method = M} = Req, SIPSock)
         Action ->
             request(Req, SIPSock, undefined, Action)
     end;
-message_in(ping, _SIPSock) ->
-    pong;
+message_in(ping, SIPSock) ->
+    mod_sip_registrar:ping(SIPSock);
 message_in(_, _) ->
     ok.
 
@@ -162,8 +162,9 @@ action(#sip{method = <<"REGISTER">>, type = request, hdrs = Hdrs,
             uri = #uri{user = <<"">>} = URI} = Req, SIPSock) ->
     case at_my_host(URI) of
        true ->
-           case esip:get_hdrs('require', Hdrs) of
-               [_|_] = Require ->
+           Require = esip:get_hdrs('require', Hdrs) -- supported(),
+           case Require of
+               [_|_] ->
                    {unsupported, Require};
                _ ->
                    {_, ToURI, _} = esip:get_hdr('to', Hdrs),
@@ -189,8 +190,9 @@ action(#sip{method = Method, hdrs = Hdrs, type = request} = Req, SIPSock) ->
         0 ->
             loop;
         _ ->
-            case esip:get_hdrs('proxy-require', Hdrs) of
-                [_|_] = Require ->
+           Require = esip:get_hdrs('proxy-require', Hdrs) -- supported(),
+            case Require of
+                [_|_] ->
                     {unsupported, Require};
                 _ ->
                     {_, ToURI, _} = esip:get_hdr('to', Hdrs),
@@ -253,9 +255,13 @@ check_auth(#sip{method = Method, hdrs = Hdrs, body = Body}, AuthHdr, _SIPSock) -
 allow() ->
     [<<"OPTIONS">>, <<"REGISTER">>].
 
+supported() ->
+    [<<"path">>, <<"outbound">>].
+
 process(#sip{method = <<"OPTIONS">>} = Req, _) ->
     make_response(Req, #sip{type = response, status = 200,
-                            hdrs = [{'allow', allow()}]});
+                            hdrs = [{'allow', allow()},
+                                   {'supported', supported()}]});
 process(#sip{method = <<"REGISTER">>} = Req, _) ->
     make_response(Req, #sip{type = response, status = 400});
 process(Req, _) ->
index 2102af8518fc302811616cfccce6dd9ac6e39bb5..da2c473c26e54b3ba54cb72ae84ff154d186c262 100644 (file)
@@ -12,7 +12,7 @@
 -behaviour(?GEN_SERVER).
 
 %% API
--export([start_link/0, request/2, find_sockets/2]).
+-export([start_link/0, request/2, find_sockets/2, ping/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 
 -define(CALL_TIMEOUT, timer:seconds(30)).
 -define(DEFAULT_EXPIRES, 3600).
-
--record(binding, {socket = #sip_socket{},
-                 call_id = <<"">> :: binary(),
-                 cseq = 0 :: non_neg_integer(),
-                 timestamp = now() :: erlang:timestamp(),
-                 contact :: {binary(), #uri{}, [{binary(), binary()}]},
-                 tref = make_ref() :: reference(),
-                 expires = 0 :: non_neg_integer()}).
+-define(FLOW_TIMEOUT_DATAGRAM, 29).
+-define(FLOW_TIMEOUT_STREAM, 180).
 
 -record(sip_session, {us = {<<"">>, <<"">>} :: {binary(), binary()},
-                     bindings = [] :: [#binding{}]}).
+                     socket = #sip_socket{} :: #sip_socket{},
+                     call_id = <<"">> :: binary(),
+                     cseq = 0 :: non_neg_integer(),
+                     timestamp = now() :: erlang:timestamp(),
+                     contact :: {binary(), #uri{}, [{binary(), binary()}]},
+                     tref = make_ref() :: reference(),
+                     mref = make_ref() :: reference(),
+                     expires = 0 :: non_neg_integer()}).
 
 -record(state, {}).
 
@@ -53,6 +54,8 @@ request(#sip{hdrs = Hdrs} = Req, SIPSock) ->
     CallID = esip:get_hdr('call-id', Hdrs),
     CSeq = esip:get_hdr('cseq', Hdrs),
     Expires = esip:get_hdr('expires', Hdrs, ?DEFAULT_EXPIRES),
+    Supported = esip:get_hdrs('supported', Hdrs),
+    IsOutboundSupported = lists:member(<<"outbound">>, Supported),
     case esip:get_hdrs('contact', Hdrs) of
         [<<"*">>] when Expires == 0 ->
             case unregister_session(US, CallID, CSeq) of
@@ -74,6 +77,7 @@ request(#sip{hdrs = Hdrs} = Req, SIPSock) ->
            end;
         [{_, _URI, _Params}|_] = Contacts ->
            ContactsWithExpires = make_contacts_with_expires(Contacts, Expires),
+           ContactsHaveManyRegID = contacts_have_many_reg_id(Contacts),
            Expires1 = lists:max([E || {_, E} <- ContactsWithExpires]),
            MinExpires = min_expires(),
            if Expires1 > 0, Expires1 < MinExpires ->
@@ -81,19 +85,31 @@ request(#sip{hdrs = Hdrs} = Req, SIPSock) ->
                      Req, #sip{type = response,
                                status = 423,
                                hdrs = [{'min-expires', MinExpires}]});
+              ContactsHaveManyRegID ->
+                   mod_sip:make_response(
+                     Req, #sip{type = response, status = 400,
+                               reason = <<"Multiple 'reg-id' parameter">>});
               true ->
                    case register_session(US, SIPSock, CallID, CSeq,
+                                         IsOutboundSupported,
                                          ContactsWithExpires) of
                        {ok, Res} ->
                            ?INFO_MSG("~s SIP session for user ~s@~s from ~s",
                                      [Res, LUser, LServer,
                                       inet_parse:ntoa(PeerIP)]),
                            Cs = prepare_contacts_to_send(ContactsWithExpires),
+                           Require = case need_ob_hdrs(
+                                            Contacts, IsOutboundSupported) of
+                                         true -> [{'require', [<<"outbound">>]},
+                                                  {'flow-timer',
+                                                   get_flow_timeout(LServer, SIPSock)}];
+                                         false -> []
+                                     end,
                            mod_sip:make_response(
                              Req,
                              #sip{type = response,
                                   status = 200,
-                                  hdrs = [{'contact', Cs}]});
+                                  hdrs = [{'contact', Cs}|Require]});
                        {error, Why} ->
                            {Status, Reason} = make_status(Why),
                            mod_sip:make_response(
@@ -104,12 +120,12 @@ request(#sip{hdrs = Hdrs} = Req, SIPSock) ->
             end;
        [] ->
            case mnesia:dirty_read(sip_session, US) of
-               [#sip_session{bindings = Bindings}] ->
+               [_|_] = Sessions ->
                    ContactsWithExpires =
                        lists:map(
-                         fun(#binding{contact = Contact, expires = Es}) ->
+                         fun(#sip_session{contact = Contact, expires = Es}) ->
                                  {Contact, Es}
-                         end, Bindings),
+                         end, Sessions),
                    Cs = prepare_contacts_to_send(ContactsWithExpires),
                    mod_sip:make_response(
                      Req, #sip{type = response, status = 200,
@@ -127,28 +143,42 @@ request(#sip{hdrs = Hdrs} = Req, SIPSock) ->
 
 find_sockets(U, S) ->
     case mnesia:dirty_read(sip_session, {U, S}) of
-       [#sip_session{bindings = Bindings}] ->
+       [_|_] = Sessions ->
            lists:map(
-             fun(#binding{contact = {_, URI, _},
+             fun(#sip_session{contact = {_, URI, _},
                           socket = Socket}) ->
                      {Socket, URI}
-             end, Bindings);
+             end, Sessions);
        [] ->
            []
     end.
 
+ping(#sip_socket{type = Type} = SIPSocket) ->
+    case mnesia:dirty_index_read(sip_session, SIPSocket, #sip_session.socket) of
+       [] when Type == udp ->
+           error;
+       [] ->
+           drop;
+       [_|_] ->
+           pong
+    end.
+
 %%%===================================================================
 %%% gen_server callbacks
 %%%===================================================================
 init([]) ->
+    update_table(),
     mnesia:create_table(sip_session,
                        [{ram_copies, [node()]},
+                        {type, bag},
                         {attributes, record_info(fields, sip_session)}]),
+    mnesia:add_table_index(sip_session, mref),
+    mnesia:add_table_index(sip_session, socket),
     mnesia:add_table_copy(sip_session, node(), ram_copies),
     {ok, #state{}}.
 
-handle_call({write, Session}, _From, State) ->
-    Res = write_session(Session),
+handle_call({write, Sessions, Supported}, _From, State) ->
+    Res = write_session(Sessions, Supported),
     {reply, Res, State};
 handle_call({delete, US, CallID, CSeq}, _From, State) ->
     Res = delete_session(US, CallID, CSeq),
@@ -160,8 +190,8 @@ handle_call(_Request, _From, State) ->
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({write, Session}, State) ->
-    write_session(Session),
+handle_info({write, Sessions, Supported}, State) ->
+    write_session(Sessions, Supported),
     {noreply, State};
 handle_info({delete, US, CallID, CSeq}, State) ->
     delete_session(US, CallID, CSeq),
@@ -169,6 +199,14 @@ handle_info({delete, US, CallID, CSeq}, State) ->
 handle_info({timeout, TRef, US}, State) ->
     delete_expired_session(US, TRef),
     {noreply, State};
+handle_info({'DOWN', MRef, process, _Pid, _Reason}, State) ->
+    case mnesia:dirty_index_read(sip_session, MRef, #sip_session.mref) of
+       [Session] ->
+           mnesia:dirty_delete_object(Session);
+       _ ->
+           ok
+    end,
+    {noreply, State};
 handle_info(_Info, State) ->
     ?ERROR_MSG("got unexpected info: ~p", [_Info]),
     {noreply, State}.
@@ -182,107 +220,101 @@ code_change(_OldVsn, State, _Extra) ->
 %%%===================================================================
 %%% Internal functions
 %%%===================================================================
-register_session(US, SIPSocket, CallID, CSeq, ContactsWithExpires) ->
-    Bindings = lists:map(
+register_session(US, SIPSocket, CallID, CSeq, IsOutboundSupported,
+                ContactsWithExpires) ->
+    Sessions = lists:map(
                 fun({Contact, Expires}) ->
-                        #binding{socket = SIPSocket,
-                                 call_id = CallID,
-                                 cseq = CSeq,
-                                 timestamp = now(),
-                                 contact = Contact,
-                                 expires = Expires}
+                        #sip_session{us = US,
+                                     socket = SIPSocket,
+                                     call_id = CallID,
+                                     cseq = CSeq,
+                                     timestamp = now(),
+                                     contact = Contact,
+                                     expires = Expires}
                 end, ContactsWithExpires),
-    Session = #sip_session{us = US, bindings = Bindings},
-    call({write, Session}).
+    Msg = {write, Sessions, IsOutboundSupported},
+    call(Msg).
 
 unregister_session(US, CallID, CSeq) ->
     Msg = {delete, US, CallID, CSeq},
     call(Msg).
 
-write_session(#sip_session{us = {U, S} = US, bindings = NewBindings}) ->
-    PrevBindings = case mnesia:dirty_read(sip_session, US) of
-                      [#sip_session{bindings = PrevBindings1}] ->
-                          PrevBindings1;
-                      [] ->
-                          []
-                  end,
+write_session([#sip_session{us = {U, S} = US}|_] = NewSessions,
+             IsOutboundSupported) ->
+    PrevSessions = mnesia:dirty_read(sip_session, US),
     Res = lists:foldl(
            fun(_, {error, _} = Err) ->
                    Err;
-              (#binding{call_id = CallID,
-                        expires = Expires,
-                        cseq = CSeq} = Binding, {Add, Keep, Del}) ->
-                   case find_binding(Binding, PrevBindings) of
-                       {ok, #binding{call_id = CallID, cseq = PrevCSeq}}
+              (#sip_session{call_id = CallID,
+                            expires = Expires,
+                            cseq = CSeq} = Session, {Add, Del}) ->
+                   case find_session(Session, PrevSessions,
+                                     IsOutboundSupported) of
+                       {ok, normal, #sip_session{call_id = CallID,
+                                                 cseq = PrevCSeq}}
                          when PrevCSeq > CSeq ->
                            {error, cseq_out_of_order};
-                       {ok, PrevBinding} when Expires == 0 ->
-                           {Add, Keep -- [PrevBinding], [PrevBinding|Del]};
-                       {ok, PrevBinding} ->
-                           {[Binding|Add], Keep -- [PrevBinding], Del};
+                       {ok, _Type, PrevSession} when Expires == 0 ->
+                           {Add, [PrevSession|Del]};
+                       {ok, _Type, PrevSession} ->
+                           {[Session|Add], [PrevSession|Del]};
                        {error, notfound} when Expires == 0 ->
                            {error, notfound};
                        {error, notfound} ->
-                           {[Binding|Add], Keep, Del}
+                           {[Session|Add], Del}
                    end
-           end, {[], PrevBindings, []}, NewBindings),
+           end, {[], []}, NewSessions),
     MaxSessions = ejabberd_sm:get_max_user_sessions(U, S),
     case Res of
        {error, Why} ->
            {error, Why};
-       {AddBindings, KeepBindings, DelBindings} ->
+       {AddSessions, DelSessions} ->
            MaxSessions = ejabberd_sm:get_max_user_sessions(U, S),
-           AllBindings = AddBindings ++ KeepBindings,
-           if length(AllBindings) > MaxSessions ->
+           AllSessions = AddSessions ++ PrevSessions -- DelSessions,
+           if length(AllSessions) > MaxSessions ->
                    {error, too_many_sessions};
               true ->
                    lists:foreach(
-                     fun(#binding{tref = TRef}) ->
-                             erlang:cancel_timer(TRef)
-                     end, DelBindings),
-                   AddBindings1 = lists:map(
-                                    fun(#binding{tref = TRef,
-                                                 expires = Expires} = Binding) ->
-                                            erlang:cancel_timer(TRef),
-                                            NewTRef = erlang:start_timer(
-                                                        Expires * 1000, self(), US),
-                                            Binding#binding{tref = NewTRef}
-                                    end, AddBindings),
-                   AllBindings1 = AddBindings1 ++ KeepBindings,
-                   case AllBindings1 of
-                       [] ->
-                           mnesia:dirty_delete(sip_session, US),
+                     fun(#sip_session{tref = TRef, mref = MRef} = Session) ->
+                             erlang:cancel_timer(TRef),
+                             catch erlang:demonitor(MRef, [flush]),
+                             mnesia:dirty_delete_object(Session)
+                     end, DelSessions),
+                   lists:foreach(
+                     fun(Session) ->
+                             NewSession = set_monitor_and_timer(
+                                            Session, IsOutboundSupported),
+                             mnesia:dirty_write(NewSession)
+                     end, AddSessions),
+                   case {AllSessions, AddSessions} of
+                       {[], _} ->
+                           {ok, unregister};
+                       {_, []} ->
                            {ok, unregister};
                        _ ->
-                           mnesia:dirty_write(
-                             #sip_session{us = US, bindings = AllBindings1}),
-                           if length(DelBindings) == length(NewBindings) ->
-                                   {ok, unregister};
-                              true ->
-                                   {ok, register}
-                           end
+                           {ok, register}
                    end
            end
     end.
 
 delete_session(US, CallID, CSeq) ->
     case mnesia:dirty_read(sip_session, US) of
-       [#sip_session{bindings = Bindings}] ->
+       [_|_] = Sessions ->
            case lists:all(
-                  fun(B) when B#binding.call_id == CallID,
-                              B#binding.cseq > CSeq ->
+                  fun(S) when S#sip_session.call_id == CallID,
+                              S#sip_session.cseq > CSeq ->
                           false;
                      (_) ->
                           true
-                  end, Bindings) of
+                  end, Sessions) of
                true ->
                    ContactsWithExpires =
                        lists:map(
-                         fun(#binding{contact = Contact,
-                                      tref = TRef}) ->
+                         fun(#sip_session{contact = Contact,
+                                          tref = TRef}) ->
                                  erlang:cancel_timer(TRef),
                                  {Contact, 0}
-                         end, Bindings),
+                         end, Sessions),
                    mnesia:dirty_delete(sip_session, US),
                    {ok, ContactsWithExpires};
                false ->
@@ -294,19 +326,17 @@ delete_session(US, CallID, CSeq) ->
 
 delete_expired_session(US, TRef) ->
     case mnesia:dirty_read(sip_session, US) of
-       [#sip_session{bindings = Bindings}] ->
+       [_|_] = Sessions ->
            case lists:filter(
-                  fun(#binding{tref = TRef1}) when TRef1 == TRef ->
-                          false;
+                  fun(#sip_session{tref = TRef1}) when TRef1 == TRef ->
+                          true;
                      (_) ->
-                          true
-                  end, Bindings) of
+                          false
+                  end, Sessions) of
+               [Session|_] ->
+                   mnesia:dirty_delete_object(Session);
                [] ->
-                   mnesia:dirty_delete(sip_session, US);
-               NewBindings ->
-                   mnesia:dirty_write(sip_session,
-                                      #sip_session{us = US,
-                                                   bindings = NewBindings})
+                   ok
            end;
        [] ->
            ok
@@ -355,15 +385,55 @@ prepare_contacts_to_send(ContactsWithExpires) ->
              {Name, URI, Params1}
       end, ContactsWithExpires).
 
-find_binding(#binding{contact = {_, URI1, _}} = OrigBinding,
-            [#binding{contact = {_, URI2, _}} = Binding|Bindings]) ->
+contacts_have_many_reg_id(Contacts) ->
+    Sum = lists:foldl(
+           fun({_Name, _URI, Params}, Acc) ->
+                   case get_ob_params(Params) of
+                       error ->
+                           Acc;
+                       {_, _} ->
+                           Acc + 1
+                   end
+           end, 0, Contacts),
+    if Sum > 1 ->
+           true;
+       true ->
+           false
+    end.
+
+find_session(#sip_session{contact = {_, URI, Params}}, Sessions,
+            IsOutboundSupported) ->
+    if IsOutboundSupported ->
+           case get_ob_params(Params) of
+               {InstanceID, RegID} ->
+                   find_session_by_ob({InstanceID, RegID}, Sessions);
+               error ->
+                   find_session_by_uri(URI, Sessions)
+           end;
+       true ->
+           find_session_by_uri(URI, Sessions)
+    end.
+
+find_session_by_ob({InstanceID, RegID},
+                  [#sip_session{contact = {_, _, Params}} = Session|Sessions]) ->
+    case get_ob_params(Params) of
+       {InstanceID, RegID} ->
+           {ok, flow, Session};
+       _ ->
+           find_session_by_ob({InstanceID, RegID}, Sessions)
+    end;
+find_session_by_ob(_, []) ->
+    {error, notfound}.
+
+find_session_by_uri(URI1,
+                   [#sip_session{contact = {_, URI2, _}} = Session|Sessions]) ->
     case cmp_uri(URI1, URI2) of
        true ->
-           {ok, Binding};
+           {ok, normal, Session};
        false ->
-           find_binding(OrigBinding, Bindings)
+           find_session_by_uri(URI1, Sessions)
     end;
-find_binding(_, []) ->
+find_session_by_uri(_, []) ->
     {error, notfound}.
 
 %% TODO: this is *totally* wrong.
@@ -384,3 +454,78 @@ make_status(too_many_sessions) ->
     {503, <<"Too Many Registered Sessions">>};
 make_status(_) ->
     {500, esip:reason(500)}.
+
+get_ob_params(Params) ->
+    case esip:get_param(<<"+sip.instance">>, Params) of
+       <<>> ->
+           error;
+       InstanceID ->
+           case to_integer(esip:get_param(<<"reg-id">>, Params),
+                           0, (1 bsl 32)-1) of
+               {ok, RegID} ->
+                   {InstanceID, RegID};
+               error ->
+                   error
+           end
+    end.
+
+need_ob_hdrs(_Contacts, _IsOutboundSupported = false) ->
+    false;
+need_ob_hdrs(Contacts, _IsOutboundSupported = true) ->
+    lists:any(
+      fun({_Name, _URI, Params}) ->
+             case get_ob_params(Params) of
+                 error -> false;
+                 {_, _} -> true
+             end
+      end, Contacts).
+
+get_flow_timeout(LServer, #sip_socket{type = Type}) ->
+    {Option, Default} =
+       case Type of
+           udp -> {flow_timeout_datagram, ?FLOW_TIMEOUT_DATAGRAM};
+           _ -> {flow_timeout_stream, ?FLOW_TIMEOUT_STREAM}
+       end,
+    gen_mod:get_module_opt(
+      LServer, mod_sip, Option,
+      fun(I) when is_integer(I), I>0 -> I end,
+      Default).
+
+update_table() ->
+    Fields = record_info(fields, sip_session),
+    case catch mnesia:table_info(sip_session, attributes) of
+       Fields ->
+           ok;
+       [_|_] ->
+           mnesia:delete_table(sip_session);
+       {'EXIT', _} ->
+           ok
+    end.
+
+set_monitor_and_timer(#sip_session{expires = Expires} = Session,
+                     _IsOutboundSupported = false) ->
+    set_timer(Session, Expires);
+set_monitor_and_timer(#sip_session{socket = SIPSock,
+                                  mref = MRef,
+                                  expires = Expires,
+                                  us = {_, LServer},
+                                  contact = {_, _, Params}} = Session,
+                     _IsOutboundSupported = true) ->
+    case get_ob_params(Params) of
+       error ->
+           set_timer(Session, Expires);
+       {_, _} ->
+           FlowTimeout = get_flow_timeout(LServer, SIPSock),
+           Timeout = lists:min([FlowTimeout, Expires]),
+           NewSession = set_timer(Session, Timeout),
+           NewMRef = if SIPSock#sip_socket.type == udp ->
+                             MRef;
+                        true ->
+                             erlang:monitor(process, SIPSock#sip_socket.pid)
+                     end,
+           NewSession#sip_session{mref = NewMRef}
+    end.
+
+set_timer(#sip_session{us = US} = Session, Timeout) ->
+    TRef = erlang:start_timer(Timeout * 1000, self(), US),
+    Session#sip_session{tref = TRef}.