OSDN Git Service

[Orc][RPC] Lock the pending results data structure when installing new result
authorLang Hames <lhames@gmail.com>
Sun, 8 Jan 2017 20:09:35 +0000 (20:09 +0000)
committerLang Hames <lhames@gmail.com>
Sun, 8 Jan 2017 20:09:35 +0000 (20:09 +0000)
handlers, make abandonPendingResults public API.

This should make installing asynchronous result handlers thread safe.

The abandonPendingResults method is made public so that clients can disconnect
from a remote even if they have asynchronous handlers awaing results from that
remote. The asynchronous handlers will all receive "abandoned result" errors as
their argument.

git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@291399 91177308-0d34-0410-b5e6-96231b3b80d8

include/llvm/ExecutionEngine/Orc/RPCUtils.h

index 588deb0..37e2e66 100644 (file)
@@ -788,15 +788,21 @@ public:
       return FnIdOrErr.takeError();
     }
 
-    // Allocate a sequence number.
-    auto SeqNo = SequenceNumberMgr.getSequenceNumber();
-    assert(!PendingResponses.count(SeqNo) &&
-           "Sequence number already allocated");
+    SequenceNumberT SeqNo; // initialized in locked scope below.
+    {
+      // Lock the pending responses map and sequence number manager.
+      std::lock_guard<std::mutex> Lock(ResponsesMutex);
+
+      // Allocate a sequence number.
+      SeqNo = SequenceNumberMgr.getSequenceNumber();
+      assert(!PendingResponses.count(SeqNo) &&
+             "Sequence number already allocated");
 
-    // Install the user handler.
-    PendingResponses[SeqNo] =
+      // Install the user handler.
+      PendingResponses[SeqNo] =
         detail::createResponseHandler<ChannelT, typename Func::ReturnType>(
             std::move(Handler));
+    }
 
     // Open the function call message.
     if (auto Err = C.startSendMessage(FnId, SeqNo)) {
@@ -863,6 +869,24 @@ public:
     return detail::ReadArgs<ArgTs...>(Args...);
   }
 
+  /// Abandon all outstanding result handlers.
+  ///
+  /// This will call all currently registered result handlers to receive an
+  /// "abandoned" error as their argument. This is used internally by the RPC
+  /// in error situations, but can also be called directly by clients who are
+  /// disconnecting from the remote and don't or can't expect responses to their
+  /// outstanding calls. (Especially for outstanding blocking calls, calling
+  /// this function may be necessary to avoid dead threads).
+  void abandonPendingResponses() {
+    // Lock the pending responses map and sequence number manager.
+    std::lock_guard<std::mutex> Lock(ResponsesMutex);
+
+    for (auto &KV : PendingResponses)
+      KV.second->abandon();
+    PendingResponses.clear();
+    SequenceNumberMgr.reset();
+  }
+
 protected:
   // The LaunchPolicy type allows a launch policy to be specified when adding
   // a function handler. See addHandlerImpl.
@@ -888,28 +912,32 @@ protected:
         wrapHandler<Func>(std::move(Handler), std::move(Launch));
   }
 
-  // Abandon all outstanding results.
-  void abandonPendingResponses() {
-    for (auto &KV : PendingResponses)
-      KV.second->abandon();
-    PendingResponses.clear();
-    SequenceNumberMgr.reset();
-  }
-
   Error handleResponse(SequenceNumberT SeqNo) {
-    auto I = PendingResponses.find(SeqNo);
-    if (I == PendingResponses.end()) {
-      abandonPendingResponses();
-      return orcError(OrcErrorCode::UnexpectedRPCResponse);
+    using Handler = typename decltype(PendingResponses)::mapped_type;
+    Handler PRHandler;
+
+    {
+      // Lock the pending responses map and sequence number manager.
+      std::unique_lock<std::mutex> Lock(ResponsesMutex);
+      auto I = PendingResponses.find(SeqNo);
+
+      if (I != PendingResponses.end()) {
+        PRHandler = std::move(I->second);
+        PendingResponses.erase(I);
+        SequenceNumberMgr.releaseSequenceNumber(SeqNo);
+      } else {
+        // Unlock the pending results map to prevent recursive lock.
+        Lock.unlock();
+        abandonPendingResponses();
+        return orcError(OrcErrorCode::UnexpectedRPCResponse);
+      }
     }
 
-    auto PRHandler = std::move(I->second);
-    PendingResponses.erase(I);
-    SequenceNumberMgr.releaseSequenceNumber(SeqNo);
+    assert(PRHandler &&
+           "If we didn't find a response handler we should have bailed out");
 
     if (auto Err = PRHandler->handleResponse(C)) {
       abandonPendingResponses();
-      SequenceNumberMgr.reset();
       return Err;
     }
 
@@ -1016,6 +1044,7 @@ protected:
 
   std::map<FunctionIdT, WrappedHandlerFn> Handlers;
 
+  std::mutex ResponsesMutex;
   detail::SequenceNumberManager<SequenceNumberT> SequenceNumberMgr;
   std::map<SequenceNumberT, std::unique_ptr<detail::ResponseHandler<ChannelT>>>
       PendingResponses;