From 4c5f97bb9ac4984b6e5466c5287e4fdb98e07ae8 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Sat, 15 Apr 2017 15:38:48 +0300 Subject: [PATCH] Add Riak as mod_proxy65 RAM backend --- src/mod_proxy65_riak.erl | 111 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 src/mod_proxy65_riak.erl diff --git a/src/mod_proxy65_riak.erl b/src/mod_proxy65_riak.erl new file mode 100644 index 000000000..dd3cd64b5 --- /dev/null +++ b/src/mod_proxy65_riak.erl @@ -0,0 +1,111 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov +%%% Created : 15 Apr 2017 by Evgeny Khramtsov +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2017 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%------------------------------------------------------------------- +-module(mod_proxy65_riak). +-behaviour(mod_proxy65). + +%% API +-export([init/0, register_stream/2, unregister_stream/1, activate_stream/4]). + +-include("logger.hrl"). + +-record(proxy65, {sid :: binary(), + pid_t :: pid(), + pid_i :: pid() | undefined, + jid_i :: binary() | undefined}). + +%%%=================================================================== +%%% API +%%%=================================================================== +init() -> + clean_table(). + +register_stream(SID, Pid) -> + case ejabberd_riak:get(proxy65, proxy65_schema(), SID) of + {error, notfound} -> + ejabberd_riak:put(#proxy65{sid = SID, pid_t = Pid}, + proxy65_schema()); + {ok, #proxy65{pid_i = undefined} = R} -> + ejabberd_riak:put(R#proxy65{pid_i = Pid}, + proxy65_schema()); + {ok, _} -> + {error, conflict}; + {error, _} -> + {error, db_failure} + end. + +unregister_stream(SID) -> + case ejabberd_riak:delete(proxy65, SID) of + ok -> ok; + {error, _} -> {error, db_failure} + end. + +activate_stream(SID, IJID, MaxConnections, _Node) -> + try + case ejabberd_riak:get(proxy65, proxy65_schema(), SID) of + {ok, #proxy65{pid_t = TPid, pid_i = IPid, + jid_i = undefined} = R} when is_pid(IPid) -> + {ok, Num} = ejabberd_riak:count_by_index( + proxy65, <<"jid_i">>, IJID), + if Num >= MaxConnections -> + {error, {limit, IPid, TPid}}; + true -> + ok = ejabberd_riak:put( + R#proxy65{jid_i = IJID}, + proxy65_schema(), + [{'2i', [{<<"jid_i">>, IJID}]}]), + {ok, IPid, TPid} + end; + {ok, #proxy65{jid_i = JID}} -> + if is_binary(JID) -> {error, conflict}; + true -> {error, notfound} + end; + {error, _} = Err -> + Err + end + catch _:{badmatch, {error, _}} -> + {error, db_failure} + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +proxy65_schema() -> + {record_info(fields, proxy65), #proxy65{}}. + +clean_table() -> + ?INFO_MSG("Cleaning Riak 'proxy65' table...", []), + case ejabberd_riak:get(proxy65, proxy65_schema()) of + {ok, Rs} -> + lists:foreach( + fun(#proxy65{sid = SID, pid_t = TPid, pid_i = IPid}) -> + if node(TPid) == node() orelse + (is_pid(IPid) andalso node(IPid) == node()) -> + ejabberd_riak:delete(proxy65, SID); + true -> + ok + end + end, Rs); + {error, Reason} = Err -> + ?ERROR_MSG("Failed to clean Riak 'proxy65' table: ~p", [Reason]), + Err + end. -- 2.40.0