]> granicus.if.org Git - ejabberd/commitdiff
implemented backend-independed key-value cacheing table; applied to mod_caps.erl...
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>
Mon, 30 Aug 2010 04:02:47 +0000 (14:02 +1000)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Mon, 30 Aug 2010 04:02:47 +0000 (14:02 +1000)
src/cache_tab.erl [new file with mode: 0644]
src/cache_tab_sup.erl [new file with mode: 0644]
src/ejabberd_sup.erl
src/mod_caps.erl

diff --git a/src/cache_tab.erl b/src/cache_tab.erl
new file mode 100644 (file)
index 0000000..e2551ec
--- /dev/null
@@ -0,0 +1,480 @@
+%%%-------------------------------------------------------------------
+%%% File    : cache_tab.erl
+%%% Author  : Evgeniy Khramtsov <ekhramtsov@process-one.net>
+%%% Description : Caching key-value table
+%%%
+%%% Created : 29 Aug 2010 by Evgeniy Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(cache_tab).
+
+-define(GEN_SERVER, gen_server).
+
+-behaviour(?GEN_SERVER).
+
+%% API
+-export([start_link/4, new/2, delete/1, delete/3, lookup/3,
+        insert/4, info/2, tab2list/1, setopts/2, all/0, test/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+        terminate/2, code_change/3]).
+
+-include("ejabberd.hrl").
+
+-record(state, {tab = treap:empty(),
+               name,
+               size = 0,
+               owner,
+               max_size,
+               life_time,
+               warn,
+               hits = 0,
+               miss = 0,
+               procs_num,
+               cache_missed,
+               shrink_size}).
+
+-define(PROCNAME, ?MODULE).
+-define(CALL_TIMEOUT, 60000).
+
+%% Defaults
+-define(MAX_SIZE, 1000).
+-define(WARN, true).
+-define(CACHE_MISSED, true).
+-define(LIFETIME, 600). %% 10 minutes
+
+%%====================================================================
+%% API
+%%====================================================================
+start_link(Proc, Tab, Opts, Owner) ->
+    ?GEN_SERVER:start_link(
+      {local, Proc}, ?MODULE, [Tab, Opts, get_proc_num(), Owner], []).
+
+new(Tab, Opts) ->
+    Res = lists:flatmap(
+           fun(Proc) ->
+                   Spec = {{Tab, Proc},
+                           {?MODULE, start_link,
+                            [Proc, Tab, Opts, self()]},
+                           permanent,
+                           brutal_kill,
+                           worker,
+                           [?MODULE]},
+                   case supervisor:start_child(cache_tab_sup, Spec) of
+                       {ok, _Pid} ->
+                           [ok];
+                       R ->
+                           [R]
+                   end
+           end, get_all_procs(Tab)),
+    case lists:filter(fun(ok) -> false; (_) -> true end, Res) of
+       [] ->
+           ok;
+       Err ->
+           {error, Err}
+    end.
+
+delete(Tab) ->
+    lists:foreach(
+      fun(Proc) ->
+             supervisor:terminate_child(cache_tab_sup, {Tab, Proc}),
+             supervisor:delete_child(cache_tab_sup, {Tab, Proc})
+      end, get_all_procs(Tab)).
+
+delete(Tab, Key, F) ->
+    ?GEN_SERVER:call(
+      get_proc_by_hash(Tab, Key), {delete, Key, F}, ?CALL_TIMEOUT).
+
+lookup(Tab, Key, F) ->
+    ?GEN_SERVER:call(
+      get_proc_by_hash(Tab, Key), {lookup, Key, F}, ?CALL_TIMEOUT).
+
+insert(Tab, Key, Val, F) ->
+    ?GEN_SERVER:call(
+      get_proc_by_hash(Tab, Key), {insert, Key, Val, F}, ?CALL_TIMEOUT).
+
+info(Tab, Info) ->
+    case lists:map(
+          fun(Proc) ->
+                  ?GEN_SERVER:call(Proc, {info, Info}, ?CALL_TIMEOUT)
+          end, get_all_procs(Tab)) of
+       Res when Info == size ->
+           {ok, lists:sum(Res)};
+       Res when Info == all ->
+           {ok, Res};
+       Res when Info == ratio ->
+           {H, M} = lists:foldl(
+                      fun({Hits, Miss}, {HitsAcc, MissAcc}) ->
+                              {HitsAcc + Hits, MissAcc + Miss}
+                      end, {0, 0}, Res),
+           {ok, [{hits, H}, {miss, M}]};
+       _ ->
+           {error, badarg}
+    end.
+
+setopts(Tab, Opts) ->
+    lists:foreach(
+      fun(Proc) ->
+             ?GEN_SERVER:call(Proc, {setopts, Opts}, ?CALL_TIMEOUT)
+      end, get_all_procs(Tab)).
+
+tab2list(Tab) ->
+    lists:flatmap(
+      fun(Proc) ->
+             ?GEN_SERVER:call(Proc, tab2list, ?CALL_TIMEOUT)
+      end, get_all_procs(Tab)).
+
+all() ->
+    lists:usort(
+      [Tab || {{Tab, _}, _, _, _} <- supervisor:which_children(cache_tab_sup)]).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+init([Tab, Opts, N, Pid]) ->
+    State = #state{procs_num = N,
+                  owner = Pid,
+                  name = Tab},
+    {ok, do_setopts(State, Opts)}.
+
+handle_call({lookup, Key, F}, _From, #state{tab = T} = State) ->
+    case treap:lookup(Key, T) of
+       {ok, _Prio, Val} ->
+           Hits = State#state.hits,
+           NewState = treap_update(Key, Val, State#state{hits = Hits + 1}),
+           case Val of
+               '$cached_mismatch' ->
+                   {reply, error, NewState};
+               _ ->
+                   {reply, {ok, Val}, NewState}
+           end;
+       _ ->
+           case catch F() of
+               {ok, Val} ->
+                   Miss = State#state.miss,
+                   NewState = treap_insert(Key, Val, State),
+                   {reply, {ok, Val}, NewState#state{miss = Miss + 1}};
+               {'EXIT', Reason} ->
+                   print_error(lookup, [Key], Reason, State),
+                   {reply, error, State};
+               _ ->
+                   Miss = State#state.miss,
+                   NewState = State#state{miss = Miss + 1},
+                   if State#state.cache_missed ->
+                           {reply, error,
+                            treap_insert(Key, '$cached_mismatch', NewState)};
+                      true ->
+                           {reply, error, NewState}
+                   end
+           end
+    end;
+handle_call({insert, Key, Val, F}, _From, #state{tab = T} = State) ->
+    case treap:lookup(Key, T) of
+       {ok, _Prio, Val} ->
+           {reply, ok, State};
+       Res ->
+           case catch F() of
+               {'EXIT', Reason} ->
+                   print_error(insert, [Key, Val], Reason, State),
+                   {reply, ok, State};
+               _ ->
+                   NewState = case Res of
+                                  {ok, _, _} ->
+                                      treap_update(Key, Val, State);
+                                  _ ->
+                                      treap_insert(Key, Val, State)
+                              end,
+                   {reply, ok, NewState}
+           end
+    end;
+handle_call({delete, Key, F}, _From, State) ->
+    NewState = treap_delete(Key, State),
+    case catch F() of
+       {'EXIT', Reason} ->
+           print_error(delete, [Key], Reason, State);
+       _ ->
+           ok
+    end,
+    {reply, ok, NewState};
+handle_call({info, Info}, _From, State) ->
+    Res = case Info of
+             size ->
+                 State#state.size;
+             ratio ->
+                 {State#state.hits, State#state.miss};
+             all ->
+                 [{max_size, State#state.max_size},
+                  {life_time, State#state.life_time},
+                  {shrink_size, State#state.shrink_size},
+                  {size, State#state.size},
+                  {owner, State#state.owner},
+                  {hits, State#state.hits},
+                  {miss, State#state.miss},
+                  {cache_missed, State#state.cache_missed},
+                  {warn, State#state.warn}];
+             _ ->
+                 badarg
+         end,
+    {reply, Res, State};
+handle_call(tab2list, _From, #state{tab = T} = State) ->
+    Res = treap:fold(
+           fun({Key, _, Val}, Acc) ->
+                   [{Key, Val}|Acc]
+           end, [], T),
+    {reply, Res, State};
+handle_call({setopts, Opts}, _From, State) ->
+    {reply, ok, do_setopts(State, Opts)};
+handle_call(_Request, _From, State) ->
+    Reply = ok,
+    {reply, Reply, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+do_setopts(#state{procs_num = N} = State, Opts) ->
+    MaxSize = case {proplists:get_value(max_size, Opts),
+                   State#state.max_size} of
+                 {MS, _} when is_integer(MS), MS > 0 ->
+                     round(MS/N);
+                 {unlimited, _} ->
+                     unlimited;
+                 {_, undefined} ->
+                     round(?MAX_SIZE/N);
+                 {_, MS} ->
+                     MS
+             end,
+    LifeTime = case {proplists:get_value(life_time, Opts),
+                    State#state.life_time} of
+                  {LT, _} when is_integer(LT), LT > 0 ->
+                      LT*1000;
+                  {unlimited, _} ->
+                      unlimited;
+                  {_, undefined} ->
+                      ?LIFETIME*1000;
+                  {_, LT} ->
+                      LT
+              end,
+    ShrinkSize = case {proplists:get_value(shrink_size, Opts),
+                      State#state.shrink_size} of
+                    {SS, _} when is_integer(SS), SS > 0 ->
+                        round(SS/N);
+                    _ when is_integer(MaxSize) ->
+                        round(MaxSize/2);
+                    _ ->
+                        unlimited
+                end,
+    Warn = case {proplists:get_value(warn, Opts),
+                State#state.warn} of
+              {true, _} ->
+                  true;
+              {false, _} ->
+                  false;
+              {_, undefined} ->
+                  ?WARN;
+              {_, W} ->
+                  W
+          end,
+    CacheMissed = case proplists:get_value(
+                        cache_missed, Opts, State#state.cache_missed) of
+                     false ->
+                         false;
+                     true ->
+                         true;
+                     _ ->
+                         ?CACHE_MISSED
+                 end,
+    State#state{max_size = MaxSize,
+               warn = Warn,
+               life_time = LifeTime,
+               cache_missed = CacheMissed,
+               shrink_size = ShrinkSize}.
+
+get_proc_num() ->
+    erlang:system_info(logical_processors).
+
+get_proc_by_hash(Tab, Term) ->
+    N = erlang:phash2(Term, get_proc_num()) + 1,
+    get_proc(Tab, N).
+
+get_proc(Tab, N) ->
+    list_to_atom(atom_to_list(?PROCNAME) ++ "_" ++
+                atom_to_list(Tab) ++ "_" ++ integer_to_list(N)).
+
+get_all_procs(Tab) ->
+    [get_proc(Tab, N) || N <- lists:seq(1, get_proc_num())].
+
+now_priority() ->
+    {MSec, Sec, USec} = now(),
+    -((MSec*1000000 + Sec)*1000000 + USec).
+
+treap_update(Key, Val, #state{tab = T} = State) ->
+    Priority = now_priority(),
+    NewT = treap:insert(Key, Priority, Val, T),
+    State#state{tab = NewT}.
+
+treap_insert(Key, Val, State) ->
+    State1 = clean_treap(State),
+    #state{size = Size} = State2 = shrink_treap(State1),
+    treap_update(Key, Val, State2#state{size = Size+1}).
+
+treap_delete(Key, #state{tab = T, size = Size} = State) ->
+    case treap:lookup(Key, T) of
+       {ok, _, _} ->
+           NewT = treap:delete(Key, T),            
+           clean_treap(State#state{tab = NewT, size = Size-1});
+       _ ->
+           State
+    end.
+
+clean_treap(#state{tab = T, size = Size, life_time = LifeTime} = State) ->
+    if is_integer(LifeTime) ->
+           Priority = now_priority(),
+           {Cleaned, NewT} = clean_treap(T, Priority + LifeTime, 0),
+           State#state{size = Size - Cleaned, tab = NewT};
+       true ->
+           State
+    end.
+
+clean_treap(Treap, CleanPriority, N) ->
+    case treap:is_empty(Treap) of
+        true ->
+            {N, Treap};
+        false ->
+            {_Key, Priority, _Value} = treap:get_root(Treap),
+            if Priority > CleanPriority ->
+                    clean_treap(treap:delete_root(Treap), CleanPriority, N+1);
+              true ->
+                    {N, Treap}
+            end
+    end.
+
+shrink_treap(#state{tab = T,
+                   max_size = MaxSize,
+                   shrink_size = ShrinkSize,
+                   warn = Warn,
+                   size = Size} = State) when Size >= MaxSize ->
+    if Warn ->
+           ?WARNING_MSG("shrinking table:~n"
+                        "** Table: ~p~n"
+                        "** Processes Number: ~p~n"
+                        "** Max Size: ~p items~n"
+                        "** Shrink Size: ~p items~n"
+                        "** Life Time: ~p microseconds~n"
+                        "** Hits/Miss: ~p/~p~n"
+                        "** Owner: ~p~n"
+                        "** Cache Missed: ~p~n"
+                        "** Instruction: you have to tune cacheing options"
+                        " if this message repeats too frequently",
+                        [State#state.name, State#state.procs_num,
+                         MaxSize, ShrinkSize, State#state.life_time,
+                         State#state.hits, State#state.miss,
+                         State#state.owner, State#state.cache_missed]);
+       true ->
+           ok
+    end,
+    {Shrinked, NewT} = shrink_treap(T, ShrinkSize, 0),
+    State#state{tab = NewT, size = Size - Shrinked};
+shrink_treap(State) ->
+    State.
+
+shrink_treap(T, ShrinkSize, ShrinkSize) ->
+    {ShrinkSize, T};
+shrink_treap(T, ShrinkSize, N) ->
+    case treap:is_empty(T) of
+       true ->
+           {N, T};
+       false ->
+           shrink_treap(treap:delete_root(T), ShrinkSize, N+1)
+    end.
+
+print_error(Operation, Args, Reason, State) ->
+    ?ERROR_MSG("callback failed:~n"
+              "** Tab: ~p~n"
+              "** Owner: ~p~n"
+              "** Operation: ~p~n"
+              "** Args: ~p~n"
+              "** Reason: ~p",
+              [State#state.name, State#state.owner,
+               Operation, Args, Reason]).
+
+%%--------------------------------------------------------------------
+%%% Tests
+%%--------------------------------------------------------------------
+test() ->
+    LifeTime = 2,
+    ok = new(test_tbl, [{life_time, LifeTime}, {max_size, unlimited}]),
+    check([]),
+    ok = insert(test_tbl, "key", "value", fun() -> ok end),
+    check([{"key", "value"}]),
+    {ok, "value"} = lookup(test_tbl, "key", fun() -> error end),
+    check([{"key", "value"}]),
+    io:format("** waiting for ~p seconds to check if cleaner works fine...~n",
+             [LifeTime+1]),
+    timer:sleep(timer:seconds(LifeTime+1)),
+    ok = insert(test_tbl, "key1", "value1", fun() -> ok end),
+    check([{"key1", "value1"}]),
+    ok = delete(test_tbl, "key1", fun() -> ok end),
+    {ok, "value"} = lookup(test_tbl, "key", fun() -> {ok, "value"} end),
+    check([{"key", "value"}]),
+    ok = delete(test_tbl, "key", fun() -> ok end),
+    check([]),
+    %% io:format("** testing buggy callbacks...~n"),
+    %% delete(test_tbl, "key", fun() -> erlang:error(badarg) end),
+    %% insert(test_tbl, "key", "val", fun() -> erlang:error(badarg) end),
+    %% lookup(test_tbl, "key", fun() -> erlang:error(badarg) end),
+    check([]),
+    delete(test_tbl),
+    test1().
+
+test1() ->
+    MaxSize = 10,
+    ok = new(test_tbl, [{max_size, MaxSize}, {shrink_size, 1}, {warn, false}]),
+    lists:foreach(
+      fun(N) ->
+             ok = insert(test_tbl, N, N, fun() -> ok end)
+      end, lists:seq(1, MaxSize*get_proc_num())),
+    {ok, MaxSize} = info(test_tbl, size),
+    delete(test_tbl),
+    success.
+%%     io:format("** testing speed, this may take a while...~n"),
+%%     test2(1000),
+%%     test2(10000),
+%%     test2(100000),
+%%     test2(1000000).
+
+%% test2(Iter) ->
+%%     ok = new(test_tbl, [{max_size, unlimited}, {life_time, unlimited}]),
+%%     L = lists:seq(1, Iter),
+%%     T1 = now(),
+%%     lists:foreach(
+%%       fun(N) ->
+%%           ok = insert(test_tbl, N, N, fun() -> ok end)
+%%       end, L),
+%%     io:format("** average insert (size = ~p): ~p usec~n",
+%%           [Iter, round(timer:now_diff(now(), T1)/Iter)]),
+%%     T2 = now(),
+%%     lists:foreach(
+%%       fun(N) ->
+%%           {ok, N} = lookup(test_tbl, N, fun() -> ok end)
+%%       end, L),
+%%     io:format("** average lookup (size = ~p): ~p usec~n",
+%%           [Iter, round(timer:now_diff(now(), T2)/Iter)]),
+%%     {ok, Iter} = info(test_tbl, size),
+%%     delete(test_tbl).
+
+check(List) ->
+    Size = length(List),
+    {ok, Size} = info(test_tbl, size),
+    List = tab2list(test_tbl).
diff --git a/src/cache_tab_sup.erl b/src/cache_tab_sup.erl
new file mode 100644 (file)
index 0000000..d417c54
--- /dev/null
@@ -0,0 +1,34 @@
+%%%-------------------------------------------------------------------
+%%% File    : cache_tab_sup.erl
+%%% Author  : Evgeniy Khramtsov <ekhramtsov@process-one.net>
+%%% Description : 
+%%%
+%%% Created : 30 Aug 2010 by Evgeniy Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(cache_tab_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+%%====================================================================
+%% API functions
+%%====================================================================
+start_link() ->
+    supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+%%====================================================================
+%% Supervisor callbacks
+%%====================================================================
+init([]) ->
+    {ok, {{one_for_one,10,1}, []}}.
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
index f0119326a1214f8abf2737473a389a7ad9ba6cb6..7dc3068edf8732279500e43f76fdfd2300746f2e 100644 (file)
@@ -177,6 +177,13 @@ init([]) ->
         infinity,
         supervisor,
         [ejabberd_tmp_sup]},
+    CacheTabSupervisor =
+       {cache_tab_sup,
+        {cache_tab_sup, start_link, []},
+        permanent,
+        infinity,
+        supervisor,
+        [cache_tab_sup]},
     {ok, {{one_for_one, 10, 1},
          [Hooks,
           NodeGroups,
@@ -196,6 +203,7 @@ init([]) ->
           IQSupervisor,
           STUNSupervisor,
           FrontendSocketSupervisor,
+          CacheTabSupervisor,
           Listener]}}.
 
 
index d9f4f30c8f42b1cea3a533604f06a60973f3a090..01056fbe5d2a23658e1f948c0ed5ec92f6ae16a6 100644 (file)
@@ -98,11 +98,12 @@ get_features(#caps{node = Node, version = Version, exts = Exts}) ->
     SubNodes = [Version | Exts],
     lists:foldl(
       fun(SubNode, Acc) ->
-             case mnesia:dirty_read({caps_features,
-                                     node_to_binary(Node, SubNode)}) of
-                 [] ->
+             BinaryNode = node_to_binary(Node, SubNode),
+             case cache_tab:lookup(caps_features, BinaryNode,
+                                   caps_read_fun(BinaryNode)) of
+                 error ->
                      Acc;
-                 [#caps_features{features = Features}] ->
+                 {ok, Features} ->
                      binary_to_features(Features) ++ Acc
              end
       end, [], SubNodes).
@@ -196,12 +197,23 @@ disco_info(Acc, _Host, _Module, _Node, _Lang) ->
 %%====================================================================
 %% gen_server callbacks
 %%====================================================================
-init([Host, _Opts]) ->
+init([Host, Opts]) ->
+    case catch mnesia:table_info(caps_features, storage_type) of
+       {'EXIT', _} ->
+           ok;
+       disc_only_copies ->
+           ok;
+       _ ->
+           mnesia:delete_table(caps_features)
+    end,
     mnesia:create_table(caps_features,
-                       [{disc_copies, [node()]},
+                       [{disc_only_copies, [node()]},
                         {local_content, true},
                         {attributes, record_info(fields, caps_features)}]),
-    mnesia:add_table_copy(caps_features, node(), disc_copies),
+    mnesia:add_table_copy(caps_features, node(), disc_only_copies),
+    MaxSize = gen_mod:get_opt(cache_size, Opts, 1000),
+    LifeTime = gen_mod:get_opt(cache_life_time, Opts, timer:hours(24)),
+    cache_tab:new(caps_features, [{max_size, MaxSize}, {life_time, LifeTime}]),
     ejabberd_hooks:add(user_send_packet, Host,
                       ?MODULE, user_send_packet, 75),
     ejabberd_hooks:add(c2s_stream_features, Host,
@@ -252,8 +264,9 @@ code_change(_OldVsn, State, _Extra) ->
 feature_request(Host, From, Caps, [SubNode | Tail] = SubNodes) ->
     Node = Caps#caps.node,
     BinaryNode = node_to_binary(Node, SubNode),
-    case mnesia:dirty_read({caps_features, BinaryNode}) of
-       [] ->
+    case cache_tab:lookup(caps_features, BinaryNode,
+                         caps_read_fun(BinaryNode)) of
+       error ->
            IQ = #iq{type = get,
                     xmlns = ?NS_DISCO_INFO,
                     sub_el = [{xmlelement, "query",
@@ -284,11 +297,13 @@ feature_response(#iq{type = result,
                            (_) ->
                                 []
                         end, Els),
-           mnesia:dirty_write(
-             #caps_features{node_pair = BinaryNode,
-                            features = features_to_binary(Features)});
+           BinaryFeatures = features_to_binary(Features),
+           cache_tab:insert(
+             caps_features, BinaryNode, BinaryFeatures,
+             caps_write_fun(BinaryNode, BinaryFeatures));
        false ->
-           mnesia:dirty_write(#caps_features{node_pair = BinaryNode})
+           cache_tab:insert(caps_features, BinaryNode, [],
+                            caps_write_fun(BinaryNode, []))
     end,
     feature_request(Host, From, Caps, SubNodes);
 feature_response(timeout, _Host, _From, _Caps, _SubNodes) ->
@@ -297,7 +312,8 @@ feature_response(_IQResult, Host, From, Caps, [SubNode | SubNodes]) ->
     %% We got type=error or invalid type=result stanza, so
     %% we cache empty feature not to probe the client permanently
     BinaryNode = node_to_binary(Caps#caps.node, SubNode),
-    mnesia:dirty_write(#caps_features{node_pair = BinaryNode}),
+    cache_tab:insert(caps_features, BinaryNode, [],
+                    caps_write_fun(BinaryNode, [])),
     feature_request(Host, From, Caps, SubNodes).
 
 node_to_binary(Node, SubNode) ->
@@ -306,6 +322,23 @@ node_to_binary(Node, SubNode) ->
 features_to_binary(L) -> [list_to_binary(I) || I <- L].
 binary_to_features(L) -> [binary_to_list(I) || I <- L].
 
+caps_read_fun(Node) ->
+    fun() ->
+           case mnesia:dirty_read({caps_features, Node}) of
+               [#caps_features{features = Features}] ->
+                   {ok, Features};
+               _ ->
+                   error
+           end
+    end.
+
+caps_write_fun(Node, Features) ->
+    fun() ->
+           mnesia:dirty_write(
+             #caps_features{node_pair = Node,
+                            features = Features})
+    end.
+
 make_my_disco_hash(Host) ->
     JID = jlib:make_jid("", Host, ""),
     case {ejabberd_hooks:run_fold(disco_local_features,