From 7618fab0472a626ab43e6663077fbf92da4996f5 Mon Sep 17 00:00:00 2001 From: Lang Hames Date: Tue, 24 Jan 2017 06:13:47 +0000 Subject: [PATCH] [Orc][RPC] Refactor ParallelCallGroup to decouple it from RPCEndpoint. This refactor allows parallel calls to be made via an arbitrary async call dispatcher. In particular, this allows ParallelCallGroup to be used with derived RPC classes that expose custom async RPC call operations. git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@292891 91177308-0d34-0410-b5e6-96231b3b80d8 --- include/llvm/ExecutionEngine/Orc/RPCUtils.h | 59 +++++++++---------- .../ExecutionEngine/Orc/RPCUtilsTest.cpp | 16 ++--- 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/include/llvm/ExecutionEngine/Orc/RPCUtils.h b/include/llvm/ExecutionEngine/Orc/RPCUtils.h index fcebc418f4c..e739c726298 100644 --- a/include/llvm/ExecutionEngine/Orc/RPCUtils.h +++ b/include/llvm/ExecutionEngine/Orc/RPCUtils.h @@ -1276,24 +1276,40 @@ public: } }; +/// Asynchronous dispatch for a function on an RPC endpoint. +template +class RPCAsyncDispatch { +public: + RPCAsyncDispatch(RPCClass &Endpoint) : Endpoint(Endpoint) {} + + template + Error operator()(HandlerT Handler, const ArgTs &... Args) const { + return Endpoint.template appendCallAsync(std::move(Handler), Args...); + } + +private: + RPCClass &Endpoint; +}; + +/// Construct an asynchronous dispatcher from an RPC endpoint and a Func. +template +RPCAsyncDispatch rpcAsyncDispatch(RPCEndpointT &Endpoint) { + return RPCAsyncDispatch(Endpoint); +} + /// \brief Allows a set of asynchrounous calls to be dispatched, and then /// waited on as a group. -template class ParallelCallGroup { +class ParallelCallGroup { public: - /// \brief Construct a parallel call group for the given RPC. - ParallelCallGroup(RPCClass &RPC) : RPC(RPC), NumOutstandingCalls(0) {} - + ParallelCallGroup() = default; ParallelCallGroup(const ParallelCallGroup &) = delete; ParallelCallGroup &operator=(const ParallelCallGroup &) = delete; /// \brief Make as asynchronous call. - /// - /// Does not issue a send call to the RPC's channel. The channel may use this - /// to batch up subsequent calls. A send will automatically be sent when wait - /// is called. - template - Error appendCall(HandlerT Handler, const ArgTs &... Args) { + template + Error call(const AsyncDispatcher &AsyncDispatch, HandlerT Handler, + const ArgTs &... Args) { // Increment the count of outstanding calls. This has to happen before // we invoke the call, as the handler may (depending on scheduling) // be run immediately on another thread, and we don't want the decrement @@ -1316,38 +1332,21 @@ public: return Err; }; - return RPC.template appendCallAsync(std::move(WrappedHandler), - Args...); - } - - /// \brief Make an asynchronous call. - /// - /// The same as appendCall, but also calls send on the channel immediately. - /// Prefer appendCall if you are about to issue a "wait" call shortly, as - /// this may allow the channel to better batch the calls. - template - Error call(HandlerT Handler, const ArgTs &... Args) { - if (auto Err = appendCall(std::move(Handler), Args...)) - return Err; - return RPC.sendAppendedCalls(); + return AsyncDispatch(std::move(WrappedHandler), Args...); } /// \brief Blocks until all calls have been completed and their return value /// handlers run. - Error wait() { - if (auto Err = RPC.sendAppendedCalls()) - return Err; + void wait() { std::unique_lock Lock(M); while (NumOutstandingCalls > 0) CV.wait(Lock); - return Error::success(); } private: - RPCClass &RPC; std::mutex M; std::condition_variable CV; - uint32_t NumOutstandingCalls; + uint32_t NumOutstandingCalls = 0; }; /// @brief Convenience class for grouping RPC Functions into APIs that can be diff --git a/unittests/ExecutionEngine/Orc/RPCUtilsTest.cpp b/unittests/ExecutionEngine/Orc/RPCUtilsTest.cpp index 9abf401af41..d21a4acc08d 100644 --- a/unittests/ExecutionEngine/Orc/RPCUtilsTest.cpp +++ b/unittests/ExecutionEngine/Orc/RPCUtilsTest.cpp @@ -405,10 +405,11 @@ TEST(DummyRPC, TestParallelCallGroup) { { int A, B, C; - ParallelCallGroup PCG(Client); + ParallelCallGroup PCG; { - auto Err = PCG.appendCall( + auto Err = PCG.call( + rpcAsyncDispatch(Client), [&A](Expected Result) { EXPECT_TRUE(!!Result) << "Async int(int) response handler failed"; A = *Result; @@ -418,7 +419,8 @@ TEST(DummyRPC, TestParallelCallGroup) { } { - auto Err = PCG.appendCall( + auto Err = PCG.call( + rpcAsyncDispatch(Client), [&B](Expected Result) { EXPECT_TRUE(!!Result) << "Async int(int) response handler failed"; B = *Result; @@ -428,7 +430,8 @@ TEST(DummyRPC, TestParallelCallGroup) { } { - auto Err = PCG.appendCall( + auto Err = PCG.call( + rpcAsyncDispatch(Client), [&C](Expected Result) { EXPECT_TRUE(!!Result) << "Async int(int) response handler failed"; C = *Result; @@ -443,10 +446,7 @@ TEST(DummyRPC, TestParallelCallGroup) { EXPECT_FALSE(!!Err) << "Client failed to handle response from void(bool)"; } - { - auto Err = PCG.wait(); - EXPECT_FALSE(!!Err) << "Third parallel call failed for int(int)"; - } + PCG.wait(); EXPECT_EQ(A, 2) << "First parallel call returned bogus result"; EXPECT_EQ(B, 4) << "Second parallel call returned bogus result"; -- 2.40.0