-record(pubsub_state,
{
stateid ,% :: {jlib:ljid(), mod_pubsub:nodeIdx()},
+ nodeidx ,% :: mod_pubsub:nodeIdx(),
items = [] ,% :: [mod_pubsub:itemId(),...],
affiliation = 'none',% :: mod_pubsub:affiliation(),
subscriptions = [] % :: [{mod_pubsub:subscription(), mod_pubsub:subId()}]
-record(pubsub_item,
{
itemid ,% :: {mod_pubsub:itemId(), mod_pubsub:nodeIdx()},
+ nodeidx ,% :: mod_pubsub:nodeIdx(),
creation = {unknown, unknown},% :: {erlang:timestamp(), jlib:ljid()},
modification = {unknown, unknown},% :: {erlang:timestamp(), jlib:ljid()},
payload = [] % :: mod_pubsub:payload()
-callback get_items(nodeIdx(), jid(), undefined | rsm_set()) ->
{result, {[pubsubItem()], undefined | rsm_set()}}.
+-callback get_last_items(nodeIdx(), jid(), undefined | rsm_set()) ->
+ {result, {[pubsubItem()], undefined | rsm_set()}}.
+
-callback get_item(NodeIdx :: nodeIdx(),
ItemId :: itemId(),
JID :: jid(),
-type(pubsubState() ::
#pubsub_state{
stateid :: {Entity::ljid(), Nidx::mod_pubsub:nodeIdx()},
+ nodeidx :: Nidx::mod_pubsub:nodeIdx(),
items :: [ItemId::mod_pubsub:itemId()],
affiliation :: Affs::mod_pubsub:affiliation(),
subscriptions :: [{Sub::mod_pubsub:subscription(), SubId::mod_pubsub:subId()}]
-type(pubsubItem() ::
#pubsub_item{
itemid :: {ItemId::mod_pubsub:itemId(), Nidx::mod_pubsub:nodeIdx()},
+ nodeidx :: Nidx::mod_pubsub:nodeIdx(),
creation :: {erlang:timestamp(), ljid()},
modification :: {erlang:timestamp(), ljid()},
payload :: mod_pubsub:payload()
{PS, RG} = get_presence_and_roster_permissions(Host, From, Owners, AccessModel, AllowedGroups),
node_call(Host, Type, get_items, [Nidx, From, AccessModel, PS, RG, undefined, RSM]).
-get_last_item(Host, Type, Nidx, LJID) ->
- case get_cached_item(Host, Nidx) of
- undefined -> get_last_item(Host, Type, Nidx, LJID, gen_mod:db_type(serverhost(Host), ?MODULE));
- LastItem -> LastItem
- end.
-get_last_item(Host, Type, Nidx, LJID, mnesia) ->
- case node_action(Host, Type, get_items, [Nidx, LJID, undefined]) of
- {result, {[LastItem|_], _}} -> LastItem;
- _ -> undefined
- end;
-get_last_item(Host, Type, Nidx, LJID, sql) ->
- case node_action(Host, Type, get_last_items, [Nidx, LJID, 1]) of
- {result, [LastItem]} -> LastItem;
- _ -> undefined
- end;
-get_last_item(_Host, _Type, _Nidx, _LJID, _) ->
- undefined.
-
-get_last_items(Host, Type, Nidx, LJID, Number) ->
- get_last_items(Host, Type, Nidx, LJID, Number, gen_mod:db_type(serverhost(Host), ?MODULE)).
-get_last_items(Host, Type, Nidx, LJID, Number, mnesia) ->
- case node_action(Host, Type, get_items, [Nidx, LJID, undefined]) of
- {result, {Items, _}} -> lists:sublist(Items, Number);
- _ -> []
- end;
-get_last_items(Host, Type, Nidx, LJID, Number, sql) ->
- case node_action(Host, Type, get_last_items, [Nidx, LJID, Number]) of
+get_last_items(Host, Type, Nidx, LJID, Count) ->
+ case node_action(Host, Type, get_last_items, [Nidx, LJID, Count]) of
{result, Items} -> Items;
_ -> []
- end;
-get_last_items(_Host, _Type, _Nidx, _LJID, _Number, _) ->
- [].
+ end.
%% @doc <p>Resend the items of a node to the user.</p>
%% @todo use cache-last-item feature
send_items(Host, Node, Nidx, Type, Options, LJID, last) ->
- case get_last_item(Host, Type, Nidx, LJID) of
- undefined ->
- ok;
- LastItem ->
+ case get_last_items(Host, Type, Nidx, LJID, 1) of
+ [LastItem] ->
Stanza = items_event_stanza(Node, Options, [LastItem]),
- dispatch_items(Host, LJID, Node, Stanza)
+ dispatch_items(Host, LJID, Node, Stanza);
+ _ ->
+ ok
end;
send_items(Host, Node, Nidx, Type, Options, LJID, Number) when Number > 0 ->
Stanza = items_event_stanza(Node, Options, get_last_items(Host, Type, Nidx, Number, LJID)),
Error
end.
-get_subscriptions_for_send_last(Host, PType, mnesia, JID, LJID, BJID) ->
+get_subscriptions_for_send_last(Host, PType, sql, JID, LJID, BJID) ->
+ {result, Subs} = node_action(Host, PType,
+ get_entity_subscriptions_for_send_last,
+ [Host, JID]),
+ [{Node, Sub, SubId, SubJID}
+ || {Node, Sub, SubId, SubJID} <- Subs,
+ Sub =:= subscribed, (SubJID == LJID) or (SubJID == BJID)];
+get_subscriptions_for_send_last(Host, PType, _, JID, LJID, BJID) ->
{result, Subs} = node_action(Host, PType,
get_entity_subscriptions,
[Host, JID]),
[{Node, Sub, SubId, SubJID}
|| {Node, Sub, SubId, SubJID} <- Subs,
Sub =:= subscribed, (SubJID == LJID) or (SubJID == BJID),
- match_option(Node, send_last_published_item, on_sub_and_presence)];
-get_subscriptions_for_send_last(Host, PType, sql, JID, LJID, BJID) ->
- case catch node_action(Host, PType,
- get_entity_subscriptions_for_send_last,
- [Host, JID])
- of
- {result, Subs} ->
- [{Node, Sub, SubId, SubJID}
- || {Node, Sub, SubId, SubJID} <- Subs,
- Sub =:= subscribed, (SubJID == LJID) or (SubJID == BJID)];
- _ ->
- []
- end;
-get_subscriptions_for_send_last(_Host, _PType, _, _JID, _LJID, _BJID) ->
- [].
+ match_option(Node, send_last_published_item, on_sub_and_presence)].
-spec set_subscriptions(host(), binary(), jid(), [ps_subscription()]) ->
{result, undefined} | {error, stanza_error()}.
-spec node_owners_action(host(), binary(), nodeIdx(), [ljid()]) -> [ljid()].
node_owners_action(Host, Type, Nidx, []) ->
- case gen_mod:db_type(serverhost(Host), ?MODULE) of
- sql ->
- case node_action(Host, Type, get_node_affiliations, [Nidx]) of
- {result, Affs} -> [LJID || {LJID, Aff} <- Affs, Aff =:= owner];
- _ -> []
- end;
- _ ->
- []
+ case node_action(Host, Type, get_node_affiliations, [Nidx]) of
+ {result, Affs} -> [LJID || {LJID, Aff} <- Affs, Aff =:= owner];
+ _ -> []
end;
node_owners_action(_Host, _Type, _Nidx, Owners) ->
Owners.
-spec node_owners_call(host(), binary(), nodeIdx(), [ljid()]) -> [ljid()].
node_owners_call(Host, Type, Nidx, []) ->
- case gen_mod:db_type(serverhost(Host), ?MODULE) of
- sql ->
- case node_call(Host, Type, get_node_affiliations, [Nidx]) of
- {result, Affs} -> [LJID || {LJID, Aff} <- Affs, Aff =:= owner];
- _ -> []
- end;
- _ ->
- []
+ case node_call(Host, Type, get_node_affiliations, [Nidx]) of
+ {result, Affs} -> [LJID || {LJID, Aff} <- Affs, Aff =:= owner];
+ _ -> []
end;
node_owners_call(_Host, _Type, _Nidx, Owners) ->
Owners.
Tree -> Tree
end.
--spec tree(host(), binary() | atom()) -> atom().
+-spec tree(host(), binary()) -> atom().
tree(_Host, <<"virtual">>) ->
nodetree_virtual; % special case, virtual does not use any backend
tree(Host, Name) ->
- case gen_mod:db_type(serverhost(Host), ?MODULE) of
- mnesia -> aux:binary_to_atom(<<"nodetree_", Name/binary>>);
- sql -> aux:binary_to_atom(<<"nodetree_", Name/binary, "_sql">>);
- _ -> Name
- end.
+ submodule(Host, <<"nodetree_", Name/binary>>).
--spec plugin(host(), binary() | atom()) -> atom().
+-spec plugin(host(), binary()) -> atom().
plugin(Host, Name) ->
- case gen_mod:db_type(serverhost(Host), ?MODULE) of
- mnesia -> aux:binary_to_atom(<<"node_", Name/binary>>);
- sql -> aux:binary_to_atom(<<"node_", Name/binary, "_sql">>);
- _ -> Name
- end.
+ submodule(Host, <<"node_", Name/binary>>).
-spec plugins(host()) -> [binary()].
plugins(Host) ->
Plugins -> Plugins
end.
--spec subscription_plugin(host()) -> pubsub_subscription |
- pubsub_subscription_sql |
- none.
+-spec subscription_plugin(host()) -> atom().
subscription_plugin(Host) ->
+ submodule(Host, <<"pubsub_subscription">>).
+
+-spec submodule(host(), binary()) -> atom().
+submodule(Host, Name) ->
case gen_mod:db_type(serverhost(Host), ?MODULE) of
- mnesia -> pubsub_subscription;
- sql -> pubsub_subscription_sql;
- _ -> none
+ mnesia -> aux:binary_to_atom(Name);
+ Type -> aux:binary_to_atom(<<Name/binary, "_",
+ (jlib:atom_to_binary(Type))/binary>>)
end.
-spec config(binary(), any()) -> any().
{error, xmpp:err_internal_server_error(ErrTxt, ?MYLANG)}
end;
Other ->
- ?ERROR_MSG("unsupported backend: ~p~n", [Other]),
- ErrTxt = <<"Database failure">>,
- {error, xmpp:err_internal_server_error(ErrTxt, ?MYLANG)}
+ case catch Fun() of
+ {'EXIT', _} ->
+ ?ERROR_MSG("unsupported backend: ~p~n", [Other]),
+ ErrTxt = <<"Database failure">>,
+ {error, xmpp:err_internal_server_error(ErrTxt, ?MYLANG)};
+ Result ->
+ Result
+ end
end.
%% @doc <p>node plugin call.</p>
end,
catch ejabberd_sql:SqlFun(ServerHost, Fun);
_ ->
- {unsupported, DBType}
+ catch Fun()
end,
case Res of
{result, Result} ->
get_subscriptions/2, set_subscriptions/4,
get_pending_nodes/2, get_states/1, get_state/2,
set_state/1, get_items/7, get_items/3, get_item/7,
+ get_last_items/3,
get_item/2, set_item/1, get_item_name/3, node_to_path/1,
path_to_node/1]).
node_flat:get_items(Nidx, JID, AccessModel,
PresenceSubscription, RosterGroup, SubId, RSM).
+get_last_items(Nidx, From, Count) ->
+ node_flat:get_last_items(Nidx, From, Count).
+
get_item(Nidx, ItemId) ->
node_flat:get_item(Nidx, ItemId).
get_subscriptions/2, set_subscriptions/4,
get_pending_nodes/2, get_states/1, get_state/2,
set_state/1, get_items/7, get_items/3, get_item/7,
+ get_last_items/3,
get_item/2, set_item/1, get_item_name/3, node_to_path/1,
path_to_node/1]).
node_flat:get_items(Nidx, JID, AccessModel,
PresenceSubscription, RosterGroup, SubId, RSM).
+get_last_items(Nidx, From, Count) ->
+ node_flat:get_last_items(Nidx, From, Count).
+
get_item(Nidx, ItemId) ->
node_flat:get_item(Nidx, ItemId).
get_subscriptions/2, set_subscriptions/4,
get_pending_nodes/2, get_states/1, get_state/2,
set_state/1, get_items/7, get_items/3, get_item/7,
+ get_last_items/3,
get_item/2, set_item/1, get_item_name/3, node_to_path/1,
path_to_node/1]).
node_hometree:get_items(Nidx, JID, AccessModel,
PresenceSubscription, RosterGroup, SubId, RSM).
+get_last_items(Nidx, From, Count) ->
+ node_hometree:get_last_items(Nidx, From, Count).
+
get_item(Nidx, ItemId) ->
node_hometree:get_item(Nidx, ItemId).
get_subscriptions/2, set_subscriptions/4,
get_pending_nodes/2, get_states/1, get_state/2,
set_state/1, get_items/7, get_items/3, get_item/7,
+ get_last_items/3,
get_item/2, set_item/1, get_item_name/3, node_to_path/1,
path_to_node/1]).
node_hometree:get_items(Nidx, JID, AccessModel,
PresenceSubscription, RosterGroup, SubId, RSM).
+get_last_items(Nidx, From, Count) ->
+ node_hometree:get_last_items(Nidx, From, Count).
+
get_item(Nidx, ItemId) ->
node_hometree:get_item(Nidx, ItemId).
get_subscriptions/2, set_subscriptions/4,
get_pending_nodes/2, get_states/1, get_state/2,
set_state/1, get_items/7, get_items/3, get_item/7,
+ get_last_items/3,
get_item/2, set_item/1, get_item_name/3, node_to_path/1,
- path_to_node/1, can_fetch_item/2, is_subscribed/1]).
+ path_to_node/1, can_fetch_item/2, is_subscribed/1, transform/1]).
init(_Host, _ServerHost, _Opts) ->
%pubsub_subscription:init(Host, ServerHost, Opts),
ejabberd_mnesia:create(?MODULE, pubsub_state,
- [{disc_copies, [node()]},
+ [{disc_copies, [node()]}, {index, [nodeidx]},
{type, ordered_set},
{attributes, record_info(fields, pubsub_state)}]),
ejabberd_mnesia:create(?MODULE, pubsub_item,
- [{disc_only_copies, [node()]},
+ [{disc_only_copies, [node()]}, {index, [nodeidx]},
{attributes, record_info(fields, pubsub_item)}]),
ejabberd_mnesia:create(?MODULE, pubsub_orphan,
[{disc_copies, [node()]},
create_node(Nidx, Owner) ->
OwnerKey = jid:tolower(jid:remove_resource(Owner)),
set_state(#pubsub_state{stateid = {OwnerKey, Nidx},
- affiliation = owner}),
+ nodeidx = Nidx, affiliation = owner}),
{result, {default, broadcast}}.
delete_node(Nodes) ->
payload = Payload};
_ ->
#pubsub_item{itemid = {ItemId, Nidx},
+ nodeidx = Nidx,
creation = {Now, GenKey},
modification = PubId,
payload = Payload}
%% ```get_states(Nidx) ->
%% node_default:get_states(Nidx).'''</p>
get_states(Nidx) ->
- States = case catch mnesia:match_object(
- #pubsub_state{stateid = {'_', Nidx}, _ = '_'}) of
+ States = case catch mnesia:index_read(pubsub_state, Nidx, #pubsub_state.nodeidx) of
List when is_list(List) -> List;
_ -> []
end,
StateId = {Key, Nidx},
case catch mnesia:read({pubsub_state, StateId}) of
[State] when is_record(State, pubsub_state) -> State;
- _ -> #pubsub_state{stateid = StateId}
+ _ -> #pubsub_state{stateid = StateId, nodeidx = Nidx}
end.
%% @doc <p>Write a state into database.</p>
%% <p>PubSub plugins can store the items where they wants (for example in a
%% relational database), or they can even decide not to persist any items.</p>
get_items(Nidx, _From, _RSM) ->
- Items = mnesia:match_object(#pubsub_item{itemid = {'_', Nidx}, _ = '_'}),
+ Items = mnesia:index_read(pubsub_item, Nidx, #pubsub_item.nodeidx),
{result, {lists:reverse(lists:keysort(#pubsub_item.modification, Items)), undefined}}.
get_items(Nidx, JID, AccessModel, PresenceSubscription, RosterGroup, _SubId, RSM) ->
get_items(Nidx, JID, RSM)
end.
+get_last_items(Nidx, From, Count) when Count > 0 ->
+ {result, {Items, _}} = get_items(Nidx, From, undefined),
+ {result, lists:sublist(Items, Count)};
+get_last_items(_Nidx, _From, _Count) ->
+ {result, []}.
+
%% @doc <p>Returns an item (one item list), given its reference.</p>
get_item(Nidx, ItemId) ->
true -> {value, H};
_ -> first_in_list(Pred, T)
end.
+
+transform({pubsub_state, {Id, Nidx}, Is, A, Ss}) ->
+ {pubsub_state, {Id, Nidx}, Nidx, Is, A, Ss};
+transform({pubsub_item, {Id, Nidx}, C, M, P}) ->
+ {pubsub_item, {Id, Nidx}, Nidx, C, M, P};
+transform(Rec) ->
+ Rec.
"where nodeid=%(Nidx)d and jid=%(J)s")) of
{selected, [{SJID, Aff, Subs}]} ->
#pubsub_state{stateid = {decode_jid(SJID), Nidx},
+ nodeidx = Nidx,
affiliation = decode_affiliation(Aff),
subscriptions = decode_subscriptions(Subs)};
_ ->
- #pubsub_state{stateid = {JID, Nidx}}
+ #pubsub_state{stateid = {JID, Nidx}, nodeidx = Nidx}
end.
set_state(State) ->
get_subscriptions/2, set_subscriptions/4,
get_pending_nodes/2, get_states/1, get_state/2,
set_state/1, get_items/7, get_items/3, get_item/7,
+ get_last_items/3,
get_item/2, set_item/1, get_item_name/3, node_to_path/1,
path_to_node/1]).
node_flat:get_items(Nidx, JID, AccessModel,
PresenceSubscription, RosterGroup, SubId, RSM).
+get_last_items(Nidx, From, Count) ->
+ node_flat:get_last_items(Nidx, From, Count).
+
get_item(Nidx, ItemId) ->
node_flat:get_item(Nidx, ItemId).
get_subscriptions/2, set_subscriptions/4,
get_pending_nodes/2, get_states/1, get_state/2,
set_state/1, get_items/7, get_items/3, get_item/7,
+ get_last_items/3,
get_item/2, set_item/1, get_item_name/3, node_to_path/1,
path_to_node/1]).
get_items(Nidx, JID, AccessModel, PresenceSubscription, RosterGroup, SubId, RSM) ->
node_pep:get_items(Nidx, JID, AccessModel, PresenceSubscription, RosterGroup, SubId, RSM).
+get_last_items(Nidx, From, Count) ->
+ node_pep:get_last_items(Nidx, From, Count).
+
get_item(Nidx, ItemId) ->
node_pep:get_item(Nidx, ItemId).
get_subscriptions/2, set_subscriptions/4,
get_pending_nodes/2, get_states/1, get_state/2,
set_state/1, get_items/7, get_items/3, get_item/7,
+ get_last_items/3,
get_item/2, set_item/1, get_item_name/3, node_to_path/1,
path_to_node/1]).
node_flat:get_items(Nidx, JID, AccessModel,
PresenceSubscription, RosterGroup, SubId, RSM).
+get_last_items(Nidx, From, Count) ->
+ node_flat:get_last_items(Nidx, From, Count).
+
get_item(Nidx, ItemId) ->
node_flat:get_item(Nidx, ItemId).
get_subscriptions/2, set_subscriptions/4,
get_pending_nodes/2, get_states/1, get_state/2,
set_state/1, get_items/7, get_items/3, get_item/7,
+ get_last_items/3,
get_item/2, set_item/1, get_item_name/3, node_to_path/1,
path_to_node/1, get_entity_subscriptions_for_send_last/2]).
node_flat_sql:get_items(Nidx, JID, AccessModel,
PresenceSubscription, RosterGroup, SubId, RSM).
+get_last_items(Nidx, From, Count) ->
+ node_flat_sql:get_last_items(Nidx, From, Count).
+
get_item(Nidx, ItemId) ->
node_flat_sql:get_item(Nidx, ItemId).
get_subscriptions/2, set_subscriptions/4,
get_pending_nodes/2, get_states/1, get_state/2,
set_state/1, get_items/7, get_items/3, get_item/7,
+ get_last_items/3,
get_item/2, set_item/1, get_item_name/3, node_to_path/1,
path_to_node/1]).
node_flat:get_items(Nidx, JID, AccessModel,
PresenceSubscription, RosterGroup, SubId, RSM).
+get_last_items(Nidx, From, Count) ->
+ node_flat:get_last_items(Nidx, From, Count).
+
get_item(Nidx, ItemId) ->
node_flat:get_item(Nidx, ItemId).
get_subscriptions/2, set_subscriptions/4,
get_pending_nodes/2, get_states/1, get_state/2,
set_state/1, get_items/7, get_items/3, get_item/7,
+ get_last_items/3,
get_item/2, set_item/1, get_item_name/3, node_to_path/1,
path_to_node/1, depends/3]).
node_flat:get_items(Nidx, JID, AccessModel,
PresenceSubscription, RosterGroup, SubId, RSM).
+get_last_items(Nidx, From, Count) ->
+ node_flat:get_last_items(Nidx, From, Count).
+
get_item(Nidx, ItemId) ->
node_flat:get_item(Nidx, ItemId).
get_subscriptions/2, set_subscriptions/4,
get_pending_nodes/2, get_states/1, get_state/2,
set_state/1, get_items/7, get_items/3, get_item/7,
+ get_last_items/3,
get_item/2, set_item/1, get_item_name/3, node_to_path/1,
path_to_node/1]).
node_flat:get_items(Nidx, JID, AccessModel,
PresenceSubscription, RosterGroup, SubId, RSM).
+get_last_items(Nidx, From, Count) ->
+ node_flat:get_last_items(Nidx, From, Count).
+
get_item(Nidx, ItemId) ->
node_flat:get_item(Nidx, ItemId).
get_subscriptions/2, set_subscriptions/4,
get_pending_nodes/2, get_states/1, get_state/2,
set_state/1, get_items/7, get_items/3, get_item/7,
+ get_last_items/3,
get_item/2, set_item/1, get_item_name/3, node_to_path/1,
path_to_node/1]).
node_flat:get_items(Nidx, JID, AccessModel,
PresenceSubscription, RosterGroup, SubId, RSM).
+get_last_items(Nidx, From, Count) ->
+ node_flat:get_last_items(Nidx, From, Count).
+
get_item(Nidx, ItemId) ->
node_flat:get_item(Nidx, ItemId).