From: Lang Hames Date: Sun, 8 Jan 2017 20:09:35 +0000 (+0000) Subject: [Orc][RPC] Lock the pending results data structure when installing new result X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=c103185b6b12334cb06afbcd028f5777f9a4299c;p=llvm [Orc][RPC] Lock the pending results data structure when installing new result handlers, make abandonPendingResults public API. This should make installing asynchronous result handlers thread safe. The abandonPendingResults method is made public so that clients can disconnect from a remote even if they have asynchronous handlers awaing results from that remote. The asynchronous handlers will all receive "abandoned result" errors as their argument. git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@291399 91177308-0d34-0410-b5e6-96231b3b80d8 --- diff --git a/include/llvm/ExecutionEngine/Orc/RPCUtils.h b/include/llvm/ExecutionEngine/Orc/RPCUtils.h index 588deb020ac..37e2e66e5af 100644 --- a/include/llvm/ExecutionEngine/Orc/RPCUtils.h +++ b/include/llvm/ExecutionEngine/Orc/RPCUtils.h @@ -788,15 +788,21 @@ public: return FnIdOrErr.takeError(); } - // Allocate a sequence number. - auto SeqNo = SequenceNumberMgr.getSequenceNumber(); - assert(!PendingResponses.count(SeqNo) && - "Sequence number already allocated"); + SequenceNumberT SeqNo; // initialized in locked scope below. + { + // Lock the pending responses map and sequence number manager. + std::lock_guard Lock(ResponsesMutex); + + // Allocate a sequence number. + SeqNo = SequenceNumberMgr.getSequenceNumber(); + assert(!PendingResponses.count(SeqNo) && + "Sequence number already allocated"); - // Install the user handler. - PendingResponses[SeqNo] = + // Install the user handler. + PendingResponses[SeqNo] = detail::createResponseHandler( std::move(Handler)); + } // Open the function call message. if (auto Err = C.startSendMessage(FnId, SeqNo)) { @@ -863,6 +869,24 @@ public: return detail::ReadArgs(Args...); } + /// Abandon all outstanding result handlers. + /// + /// This will call all currently registered result handlers to receive an + /// "abandoned" error as their argument. This is used internally by the RPC + /// in error situations, but can also be called directly by clients who are + /// disconnecting from the remote and don't or can't expect responses to their + /// outstanding calls. (Especially for outstanding blocking calls, calling + /// this function may be necessary to avoid dead threads). + void abandonPendingResponses() { + // Lock the pending responses map and sequence number manager. + std::lock_guard Lock(ResponsesMutex); + + for (auto &KV : PendingResponses) + KV.second->abandon(); + PendingResponses.clear(); + SequenceNumberMgr.reset(); + } + protected: // The LaunchPolicy type allows a launch policy to be specified when adding // a function handler. See addHandlerImpl. @@ -888,28 +912,32 @@ protected: wrapHandler(std::move(Handler), std::move(Launch)); } - // Abandon all outstanding results. - void abandonPendingResponses() { - for (auto &KV : PendingResponses) - KV.second->abandon(); - PendingResponses.clear(); - SequenceNumberMgr.reset(); - } - Error handleResponse(SequenceNumberT SeqNo) { - auto I = PendingResponses.find(SeqNo); - if (I == PendingResponses.end()) { - abandonPendingResponses(); - return orcError(OrcErrorCode::UnexpectedRPCResponse); + using Handler = typename decltype(PendingResponses)::mapped_type; + Handler PRHandler; + + { + // Lock the pending responses map and sequence number manager. + std::unique_lock Lock(ResponsesMutex); + auto I = PendingResponses.find(SeqNo); + + if (I != PendingResponses.end()) { + PRHandler = std::move(I->second); + PendingResponses.erase(I); + SequenceNumberMgr.releaseSequenceNumber(SeqNo); + } else { + // Unlock the pending results map to prevent recursive lock. + Lock.unlock(); + abandonPendingResponses(); + return orcError(OrcErrorCode::UnexpectedRPCResponse); + } } - auto PRHandler = std::move(I->second); - PendingResponses.erase(I); - SequenceNumberMgr.releaseSequenceNumber(SeqNo); + assert(PRHandler && + "If we didn't find a response handler we should have bailed out"); if (auto Err = PRHandler->handleResponse(C)) { abandonPendingResponses(); - SequenceNumberMgr.reset(); return Err; } @@ -1016,6 +1044,7 @@ protected: std::map Handlers; + std::mutex ResponsesMutex; detail::SequenceNumberManager SequenceNumberMgr; std::map>> PendingResponses;