From ddfbca5830a4089d652faf3d2407554d9569007f Mon Sep 17 00:00:00 2001
From: Evgeniy Khramtsov <ekhramtsov@process-one.net>
Date: Fri, 6 Jun 2014 13:52:55 +0400
Subject: [PATCH] Use a different timer for flow control

---
 src/mod_sip_registrar.erl | 112 ++++++++++++++++++++++----------------
 1 file changed, 64 insertions(+), 48 deletions(-)

diff --git a/src/mod_sip_registrar.erl b/src/mod_sip_registrar.erl
index da2c473c2..dcc761754 100644
--- a/src/mod_sip_registrar.erl
+++ b/src/mod_sip_registrar.erl
@@ -33,8 +33,9 @@
 		      cseq = 0 :: non_neg_integer(),
 		      timestamp = now() :: erlang:timestamp(),
 		      contact :: {binary(), #uri{}, [{binary(), binary()}]},
-		      tref = make_ref() :: reference(),
-		      mref = make_ref() :: reference(),
+		      flow_tref :: reference(),
+		      reg_tref = make_ref() :: reference(),
+		      conn_mref = make_ref() :: reference(),
 		      expires = 0 :: non_neg_integer()}).
 
 -record(state, {}).
@@ -153,15 +154,8 @@ find_sockets(U, S) ->
 	    []
     end.
 
-ping(#sip_socket{type = Type} = SIPSocket) ->
-    case mnesia:dirty_index_read(sip_session, SIPSocket, #sip_session.socket) of
-	[] when Type == udp ->
-	    error;
-	[] ->
-	    drop;
-	[_|_] ->
-	    pong
-    end.
+ping(SIPSocket) ->
+    call({ping, SIPSocket}).
 
 %%%===================================================================
 %%% gen_server callbacks
@@ -172,7 +166,7 @@ init([]) ->
 			[{ram_copies, [node()]},
 			 {type, bag},
 			 {attributes, record_info(fields, sip_session)}]),
-    mnesia:add_table_index(sip_session, mref),
+    mnesia:add_table_index(sip_session, conn_mref),
     mnesia:add_table_index(sip_session, socket),
     mnesia:add_table_copy(sip_session, node(), ram_copies),
     {ok, #state{}}.
@@ -183,6 +177,9 @@ handle_call({write, Sessions, Supported}, _From, State) ->
 handle_call({delete, US, CallID, CSeq}, _From, State) ->
     Res = delete_session(US, CallID, CSeq),
     {reply, Res, State};
+handle_call({ping, SIPSocket}, _From, State) ->
+    Res = process_ping(SIPSocket),
+    {reply, Res, State};
 handle_call(_Request, _From, State) ->
     Reply = ok,
     {reply, Reply, State}.
@@ -200,7 +197,7 @@ handle_info({timeout, TRef, US}, State) ->
     delete_expired_session(US, TRef),
     {noreply, State};
 handle_info({'DOWN', MRef, process, _Pid, _Reason}, State) ->
-    case mnesia:dirty_index_read(sip_session, MRef, #sip_session.mref) of
+    case mnesia:dirty_index_read(sip_session, MRef, #sip_session.conn_mref) of
 	[Session] ->
 	    mnesia:dirty_delete_object(Session);
 	_ ->
@@ -275,7 +272,8 @@ write_session([#sip_session{us = {U, S} = US}|_] = NewSessions,
 		    {error, too_many_sessions};
 	       true ->
 		    lists:foreach(
-		      fun(#sip_session{tref = TRef, mref = MRef} = Session) ->
+		      fun(#sip_session{reg_tref = TRef,
+				       conn_mref = MRef} = Session) ->
 			      erlang:cancel_timer(TRef),
 			      catch erlang:demonitor(MRef, [flush]),
 			      mnesia:dirty_delete_object(Session)
@@ -311,7 +309,7 @@ delete_session(US, CallID, CSeq) ->
 		    ContactsWithExpires =
 			lists:map(
 			  fun(#sip_session{contact = Contact,
-					   tref = TRef}) ->
+					   reg_tref = TRef}) ->
 				  erlang:cancel_timer(TRef),
 				  {Contact, 0}
 			  end, Sessions),
@@ -327,17 +325,14 @@ delete_session(US, CallID, CSeq) ->
 delete_expired_session(US, TRef) ->
     case mnesia:dirty_read(sip_session, US) of
 	[_|_] = Sessions ->
-	    case lists:filter(
-		   fun(#sip_session{tref = TRef1}) when TRef1 == TRef ->
-			   true;
-		      (_) ->
-			   false
-		   end, Sessions) of
-		[Session|_] ->
-		    mnesia:dirty_delete_object(Session);
-		[] ->
-		    ok
-	    end;
+	    lists:foreach(
+	      fun(#sip_session{reg_tref = T1,
+			       flow_tref = T2} = Session)
+		    when T1 == TRef; T2 == TRef ->
+		      mnesia:dirty_delete_object(Session);
+		 (_) ->
+		      ok
+	      end, Sessions);
 	[] ->
 	    ok
     end.
@@ -502,30 +497,51 @@ update_table() ->
 	    ok
     end.
 
-set_monitor_and_timer(#sip_session{expires = Expires} = Session,
-		      _IsOutboundSupported = false) ->
-    set_timer(Session, Expires);
-set_monitor_and_timer(#sip_session{socket = SIPSock,
-				   mref = MRef,
+set_monitor_and_timer(#sip_session{socket = #sip_socket{type = Type,
+							pid = Pid} = SIPSock,
+				   conn_mref = MRef,
 				   expires = Expires,
 				   us = {_, LServer},
 				   contact = {_, _, Params}} = Session,
-		      _IsOutboundSupported = true) ->
-    case get_ob_params(Params) of
-	error ->
-	    set_timer(Session, Expires);
-	{_, _} ->
-	    FlowTimeout = get_flow_timeout(LServer, SIPSock),
-	    Timeout = lists:min([FlowTimeout, Expires]),
-	    NewSession = set_timer(Session, Timeout),
-	    NewMRef = if SIPSock#sip_socket.type == udp ->
-			      MRef;
-			 true ->
-			      erlang:monitor(process, SIPSock#sip_socket.pid)
-		      end,
-	    NewSession#sip_session{mref = NewMRef}
+		      IsOutboundSupported) ->
+    RegTRef = set_timer(Session, Expires),
+    Session1 = Session#sip_session{reg_tref = RegTRef},
+    if IsOutboundSupported ->
+	    case get_ob_params(Params) of
+		error ->
+		    Session1;
+		{_, _} ->
+		    FlowTimeout = get_flow_timeout(LServer, SIPSock),
+		    FlowTRef = set_timer(Session1, FlowTimeout),
+		    NewMRef = if Type == udp -> MRef;
+				 true -> erlang:monitor(process, Pid)
+			      end,
+		    Session1#sip_session{conn_mref = NewMRef,
+					 flow_tref = FlowTRef}
+	    end;
+       true ->
+	    Session1
     end.
 
-set_timer(#sip_session{us = US} = Session, Timeout) ->
-    TRef = erlang:start_timer(Timeout * 1000, self(), US),
-    Session#sip_session{tref = TRef}.
+set_timer(#sip_session{us = US}, Timeout) ->
+    erlang:start_timer(Timeout * 1000, self(), US).
+
+process_ping(SIPSocket) ->
+    ErrResponse = if SIPSocket#sip_socket.type == udp -> error;
+		     true -> drop
+		  end,
+    Sessions = mnesia:dirty_index_read(
+		 sip_session, SIPSocket, #sip_session.socket),
+    lists:foldl(
+      fun(#sip_session{flow_tref = TRef,
+		       us = {_, LServer}} = Session, _)
+	    when TRef /= undefined ->
+	      erlang:cancel_timer(TRef),
+	      mnesia:dirty_delete_object(Session),
+	      Timeout = get_flow_timeout(LServer, SIPSocket),
+	      NewTRef = set_timer(Session, Timeout),
+	      mnesia:dirty_write(
+		Session#sip_session{flow_tref = NewTRef});
+	 (_, Acc) ->
+	      Acc
+      end, ErrResponse, Sessions).
-- 
2.40.0