coral
A C++ library for distributed co-simulation
async.hpp
Go to the documentation of this file.
1 
10 #ifndef CORAL_ASYNC_HPP
11 #define CORAL_ASYNC_HPP
12 
13 #include <cassert>
14 #include <exception>
15 #include <functional>
16 #include <future>
17 #include <memory>
18 #include <thread>
19 #include <utility>
20 
21 #include "zmq.hpp"
22 
23 #include "coral/config.h"
24 #include "coral/error.hpp"
25 #include "coral/log.hpp"
26 #include "coral/net/reactor.hpp"
27 #include "coral/net/zmqx.hpp"
28 #include "coral/util.hpp"
29 
30 
31 namespace coral
32 {
34 namespace async
35 {
36 
37 
// CommThreadTask: holds the `Type` alias, a std::function returning void that
// gives the signature of tasks run by the background thread.
// NOTE(review): the struct header and part of the parameter list sit on
// listing lines that are absent from this rendering; only the StackData&
// parameter is visible. The shutdown task lambda further down in this file
// takes (coral::net::Reactor&, StackData&, std::promise<void>), which is
// presumably the full shape -- confirm against the real header.
48 template<typename StackData, typename Result>
50 {
73  using Type = std::function<void(
75  StackData&,
77 };
78 
79 // Specialisation of the above for `StackData = void`.
80 template<typename Result>
81 struct CommThreadTask<void, Result>
82 {
    // Task signature for threads that carry no stack data.
    // NOTE(review): the parameter list is on listing lines absent from this
    // rendering. The `void` shutdown task in this file is a lambda taking
    // (coral::net::Reactor&, std::promise<void>), so presumably the
    // StackData& parameter is simply dropped -- confirm.
83  using Type = std::function<void(
86 };
87 
88 
89 namespace detail
90 {
    // Type aliases for the task slot shared between the foreground object
    // and the background thread. `Type` is the callable that the messaging
    // loop invokes as `myTask(r, data)`.
    // NOTE(review): the `using Type = ...` line is absent from this rendering.
91  template<typename StackData>
92  struct CommThreadAnyTask
93  {
95  using SharedPtr = std::shared_ptr<Type>;
96  using WeakPtr = std::weak_ptr<Type>;
97  };
98 
    // Same aliases for threads without stack data; here the messaging loop
    // invokes the callable as `myTask(r)`.
    // NOTE(review): the `using Type = ...` line is absent from this rendering.
99  template<>
100  struct CommThreadAnyTask<void>
101  {
103  using SharedPtr = std::shared_ptr<Type>;
104  using WeakPtr = std::weak_ptr<Type>;
105  };
106 } // namespace detail
107 
108 
// CommThread: creates and controls a background communications thread.
// The object is move-only. The foreground side talks to the thread through
// `m_socket` and observes its termination through `m_threadStatus`.
// NOTE(review): the class header, the parameter of Execute() and one private
// member (used elsewhere in this file as `m_nextTask`) are on listing lines
// absent from this rendering.
161 template<typename StackData>
163 {
164 public:
    // Creates the background thread.
169  explicit CommThread();
170 
    // If the object is still active, shuts the background thread down.
181  ~CommThread() CORAL_NOEXCEPT;
182 
183  // Copying is disabled
184  CommThread(const CommThread&) = delete;
185  CommThread& operator=(const CommThread&) = delete;
186 
    // Move construction: takes over `other`'s state and leaves it inactive.
191  CommThread(CommThread&& other) CORAL_NOEXCEPT;
192 
    // Move assignment: shuts down this object's thread first, then takes
    // over `other`'s state and leaves it inactive.
197  CommThread& operator=(CommThread&& other) CORAL_NOEXCEPT;
198 
    // Executes a task asynchronously in the background thread; the returned
    // future is tied to the promise handed to the task.
252  template<typename Result>
253  std::future<Result> Execute(
255 
    // Terminates the background thread in a controlled manner.
272  void Shutdown();
273 
    // Returns whether the CommThread object is active.
287  bool Active() const CORAL_NOEXCEPT;
288 
289 private:
290  // Waits for the background thread to terminate and performs cleanup
291  void WaitForThreadTermination();
    // Shuts down without letting exceptions escape (used by the destructor
    // and by move assignment).
292  void DestroySilently() CORAL_NOEXCEPT;
293 
    // True from construction until thread termination has been awaited, or
    // until the object has been moved from.
294  bool m_active;
    // Foreground end of the PAIR socket connected to the background thread.
295  zmq::socket_t m_socket;
    // Becomes ready when the background thread finishes; carries its
    // exception if it ended with one.
296  std::future<void> m_threadStatus;
298 };
299 
300 
// CommThreadDead: exception signalling an error that caused a CommThread's
// background thread to terminate unexpectedly.
// NOTE(review): the class header (name and base class) is on a listing line
// absent from this rendering; `what()` is marked `override`, so a base with a
// virtual `what()` is implied.
309 {
310 public:
    // Stores the exception that ended the background thread.
312  CommThreadDead(std::exception_ptr originalException) CORAL_NOEXCEPT;
313 
    // Returns a pointer to the exception that caused the thread to terminate.
315  std::exception_ptr OriginalException() const CORAL_NOEXCEPT;
316 
318  const char* what() const CORAL_NOEXCEPT override;
319 
320 private:
321  std::exception_ptr m_originalException;
322 };
323 
324 
325 // =============================================================================
326 // Templates
327 // =============================================================================
328 
329 namespace detail
330 {
// Body of the background thread: runs a Reactor on `bgSocket` and executes
// one task per notification received from the foreground thread.
//
// bgSocket  Background end of the socket pair; every incoming message means
//           that a task has been stored in `*nextTask`.
// nextTask  Slot shared with the foreground thread. It is captured by value
//           in the handler, so the slot stays alive as long as the handler.
//
// `data` is default-constructed here and passed by reference to every task,
// so it lives for the whole duration of `reactor.Run()`.
// Returns when `reactor.Run()` returns; exceptions from it propagate.
331  template<typename StackData>
332  void CommThreadMessagingLoop(
333  zmq::socket_t& bgSocket,
334  typename CommThreadAnyTask<StackData>::SharedPtr nextTask)
335  {
336  coral::net::Reactor reactor;
337  StackData data;
338  reactor.AddSocket(
339  bgSocket,
340  [nextTask, &data] (coral::net::Reactor& r, zmq::socket_t& s) {
     // Consume the notification message; its content is not used.
341  char dummy;
342  s.recv(&dummy, 1);
343 
344  // Now, the foreground thread is blocked waiting for our
345  // reply, so we can freely access `nextTask`.
     // Swapping with a default-constructed (empty) function takes the task
     // out and leaves the shared slot empty for the next Execute() call.
346  typename CommThreadAnyTask<StackData>::Type myTask;
347  swap(*nextTask, myTask);
348 
349  // Unblock foreground thread again and then run the task.
350  s.send("", 0);
351  myTask(r, data);
352  });
353  reactor.Run();
354  }
355 
// Specialisation of the background messaging loop for threads without stack
// data: same handshake as the generic version, but no `data` object is
// created and tasks are invoked with the reactor only.
//
// bgSocket  Background end of the socket pair; every incoming message means
//           that a task has been stored in `*nextTask`.
// nextTask  Slot shared with the foreground thread, captured by value.
356  template<>
357  inline void CommThreadMessagingLoop<void>(
358  zmq::socket_t& bgSocket,
359  typename CommThreadAnyTask<void>::SharedPtr nextTask)
360  {
361  coral::net::Reactor reactor;
362  reactor.AddSocket(
363  bgSocket,
364  [nextTask] (coral::net::Reactor& r, zmq::socket_t& s) {
     // Consume the notification message; its content is not used.
365  char dummy;
366  s.recv(&dummy, 1);
367 
368  // Now, the foreground thread is blocked waiting for our
369  // reply, so we can freely access `nextTask`.
     // The swap leaves the shared slot empty for the next task.
370  CommThreadAnyTask<void>::Type myTask;
371  swap(*nextTask, myTask);
372 
373  // Unblock foreground thread again and then run the task.
374  s.send("", 0);
375  myTask(r);
376  });
377  reactor.Run();
378  }
379 
380  // Note: Some of the parameters are only shared_ptr because VS2013 has
381  // a bug that prevents the use of move-only objects as arguments to the
382  // thread function.
// Entry point of the background thread. Runs the messaging loop and reports
// how it ended through `statusNotifier`: a value if the loop returned, an
// exception if it threw. Nothing escapes this function.
// NOTE(review): the first parameter (used below as `bgSocket`, dereferenced
// like a pointer) and the argument of set_exception() are on listing lines
// absent from this rendering.
383  template<typename StackData>
384  void CommThreadBackground(
386  std::shared_ptr<std::promise<void>> statusNotifier,
387  typename CommThreadAnyTask<StackData>::SharedPtr nextTask)
388  CORAL_NOEXCEPT
389  {
390  try {
     // `nextTask` is moved into the loop, so this function keeps no owning
     // reference to the task slot once the loop has returned.
391  CommThreadMessagingLoop<StackData>(
392  *bgSocket,
393  std::move(nextTask));
394 
395  // We should possibly use set_value_at_thread_exit() and
396  // set_exception_at_thread_exit() in the following, but those are
397  // not supported in GCC 4.9.
398  statusNotifier->set_value();
399  } catch (...) {
400  statusNotifier->set_exception(
402  }
403 
404  // This is to avoid the potential race condition where the background
405  // thread dies after the foreground thread has sent a task notification
406  // and is waiting to receive an acknowledgement.
407  bgSocket->send("", 0);
408  }
409 } // namespace detail
410 
411 
// Constructor: creates the background thread.
// Sets up a pair of ZMQ_PAIR sockets joined over a uniquely named inproc
// endpoint, a promise/future pair for observing thread termination, and the
// shared task slot, then starts the thread detached.
// NOTE(review): the signature line is absent from this rendering.
412 template<typename StackData>
414  : m_active{true}
415  , m_socket{coral::net::zmqx::GlobalContext(), ZMQ_PAIR}
416  , m_threadStatus{}
417  , m_nextTask{}
418 {
419  auto bgSocket =
420  std::make_shared<zmq::socket_t>(coral::net::zmqx::GlobalContext(), ZMQ_PAIR);
    // Linger period 0 on both ends: closing a socket discards pending
    // messages instead of waiting for them to be delivered.
421  bgSocket->setsockopt(ZMQ_LINGER, 0);
422  m_socket.setsockopt(ZMQ_LINGER, 0);
    // A random UUID keeps the endpoint name unique per CommThread object.
    // The background end binds before the foreground end connects.
423  const auto endpoint = "inproc://" + coral::util::RandomUUID();
424  bgSocket->bind(endpoint);
425  m_socket.connect(endpoint);
426 
    // The promise goes to the thread; this object keeps only the future.
427  auto statusNotifier = std::make_shared<std::promise<void>>();
428  m_threadStatus = statusNotifier->get_future();
429 
    // The slot starts out empty. Execute() obtains it via m_nextTask.lock()
    // and treats an expired pointer as "thread is dead", so the thread is
    // the intended owner of the slot.
430  auto sharedTask =
431  std::make_shared<typename detail::CommThreadAnyTask<StackData>::Type>(/*empty*/);
432  m_nextTask = sharedTask;
433 
    // The thread is detached; its termination is observed through
    // m_threadStatus rather than through join().
434  std::thread{&detail::CommThreadBackground<StackData>,
435  bgSocket, statusNotifier, sharedTask}.detach();
436 }
437 
438 
// Destructor: if the object is still active, shuts the background thread
// down; exceptions are handled inside DestroySilently() and do not escape.
// NOTE(review): the signature line is absent from this rendering.
439 template<typename StackData>
441 {
442  DestroySilently();
443 }
444 
445 
// Move constructor: takes over `other`'s socket, status future and task slot
// and copies its active flag.
// NOTE(review): the signature line is absent from this rendering.
446 template<typename StackData>
448  : m_active{other.m_active}
449  , m_socket{std::move(other.m_socket)}
450  , m_threadStatus{std::move(other.m_threadStatus)}
451  , m_nextTask{std::move(other.m_nextTask)}
452 {
    // Mark the source inactive so that its destructor does not attempt a
    // shutdown of the thread now owned by this object.
453  other.m_active = false;
454 }
455 
456 
// Move assignment: first shuts down the thread currently owned by this
// object (if any), then takes over `other`'s state and marks it inactive.
// NOTE(review): the signature line is absent from this rendering.
// NOTE(review): there is no self-assignment guard; with `x = std::move(x)`
// the DestroySilently() call would shut down the very thread being assigned,
// leaving an inactive object -- confirm this is acceptable for callers.
457 template<typename StackData>
459  CORAL_NOEXCEPT
460 {
461  DestroySilently();
462  m_active = other.m_active;
463  m_socket = std::move(other.m_socket);
464  m_threadStatus = std::move(other.m_threadStatus);
465  m_nextTask = std::move(other.m_nextTask);
466  other.m_active = false;
467  return *this;
468 }
469 
470 
471 namespace detail
472 {
// Helper that adapts a user task plus its promise to the uniform callable
// type stored in the shared task slot.
// NOTE(review): the `task` parameter of WrapTask() is on a listing line
// absent from this rendering.
473  template<typename StackData, typename Result>
474  struct CommThreadFunctions
475  {
     // Returns a callable taking (Reactor&, StackData&) that forwards to
     // `task` and hands it the promise.
476  static typename CommThreadAnyTask<StackData>::Type WrapTask(
478  std::promise<Result> promise)
479  {
     // std::promise is move-only, so it is placed behind a shared_ptr to
     // keep the capturing lambda copyable.
480  const auto sharedPromise =
481  std::make_shared<typename std::promise<Result>>(std::move(promise));
482  return [task, sharedPromise]
483  (coral::net::Reactor& reactor, StackData& data)
484  {
     // The promise is moved out of the shared holder here, so the wrapper
     // is presumably meant to be invoked only once.
485  task(reactor, data, std::move(*sharedPromise));
486  };
487  }
488  };
489 
490 
// Specialisation of the task-wrapping helper for threads without stack data:
// the returned callable takes only the reactor.
// NOTE(review): the `task` parameter of WrapTask() is on a listing line
// absent from this rendering.
491  template<typename Result>
492  struct CommThreadFunctions<void, Result>
493  {
494  static typename CommThreadAnyTask<void>::Type WrapTask(
496  std::promise<Result> promise)
497  {
     // std::promise is move-only, so it is placed behind a shared_ptr to
     // keep the capturing lambda copyable.
498  const auto sharedPromise =
499  std::make_shared<typename std::promise<Result>>(std::move(promise));
500  return [task, sharedPromise]
501  (coral::net::Reactor& reactor)
502  {
     // The promise is moved out of the shared holder here, so the wrapper
     // is presumably meant to be invoked only once.
503  task(reactor, std::move(*sharedPromise));
504  };
505  }
506  };
507 } // namespace detail
508 
509 
// Executes a task asynchronously in the background thread and returns the
// future belonging to the promise that is handed to the task.
// The call blocks until the background thread has acknowledged the task,
// not until the task has finished.
// NOTE(review): the signature lines and the head of the logging call near
// the end are on listing lines absent from this rendering.
510 template<typename StackData>
511 template<typename Result>
514 {
    // Rejects an empty task (input check on `task`).
516  CORAL_INPUT_CHECK(task);
517 
518  auto promise = std::promise<Result>{};
519  auto future = promise.get_future();
520 
521  if (auto sharedTask = m_nextTask.lock()) {
    // The slot must be empty: the background handler swaps the previous
    // task out before it sends its acknowledgement.
522  assert(!(*sharedTask));
523  *sharedTask = detail::CommThreadFunctions<StackData, Result>::WrapTask(
524  std::move(task),
525  std::move(promise));
526 
527  // Notify background thread that a task is ready and wait for it
528  // to acknowledge it before returning.
529  m_socket.send("", 0);
530  char dummy;
531  m_socket.recv(&dummy, 1);
532  return future;
533  } else {
534  // The weak_ptr has expired, meaning that the thread (which holds
535  // a shared_ptr to the same object) must be dead.
536  WaitForThreadTermination();
537 
538  // The above function should have thrown. If it didn't, it probably
539  // means that calling code did something stupid.
541  coral::log::error,
542  "CommThread background thread has terminated silently and "
543  "unexpectedly. Perhaps Reactor::Stop() was called?");
544  std::terminate();
545  }
546 }
547 
548 
549 namespace detail
550 {
// Shared body of the shutdown task: stops the reactor's messaging loop and
// then fulfils the promise so that the future returned for this task
// becomes ready.
551  inline void CommThreadShutdown(
552  coral::net::Reactor& reactor,
553  std::promise<void> promise)
554  {
555  reactor.Stop();
556  promise.set_value();
557  }
558 
// Builds the task that Shutdown() submits for threads with stack data.
// The StackData argument is ignored; the task only forwards to
// CommThreadShutdown().
559  template<typename StackData>
560  typename CommThreadTask<StackData, void>::Type CommThreadShutdownTask()
561  {
562  return [] (coral::net::Reactor& r, StackData&, std::promise<void> p)
563  {
564  CommThreadShutdown(r, std::move(p));
565  };
566  }
567 
// Builds the shutdown task for threads without stack data; the task takes
// only the reactor and the promise and forwards to CommThreadShutdown().
568  template<>
569  inline typename CommThreadTask<void, void>::Type CommThreadShutdownTask<void>()
570  {
571  return [] (coral::net::Reactor& r, std::promise<void> p)
572  {
573  CommThreadShutdown(r, std::move(p));
574  };
575  }
576 } // namespace detail
577 
578 
// Terminates the background thread in a controlled manner: submits the
// shutdown task, then waits for the thread to end and cleans up.
// The future returned by Execute() is discarded; completion is observed
// through WaitForThreadTermination() instead.
// NOTE(review): the signature line is absent from this rendering.
579 template<typename StackData>
581 {
582  Execute<void>(detail::CommThreadShutdownTask<StackData>());
583  WaitForThreadTermination();
584 }
585 
586 
// Returns whether the CommThread object is active, i.e. the current value
// of m_active. The flag is cleared when thread termination has been awaited
// and when the object has been moved from.
587 template<typename StackData>
588 bool CommThread<StackData>::Active() const CORAL_NOEXCEPT
589 {
590  return m_active;
591 }
592 
593 
// Waits for the background thread to terminate and performs cleanup.
// Preconditions (asserted): the object is active and still holds a valid
// status future.
// NOTE(review): the signature line and the statement inside every catch
// branch are on listing lines absent from this rendering, so what each
// handler throws cannot be stated from here.
594 template<typename StackData>
596  {
597  assert(m_active);
598  assert(m_threadStatus.valid());
599  try {
     // The scope guard runs when this try block is left, whether get()
     // returns or throws: it marks the object inactive and closes the
     // foreground socket.
600  const auto cleanup = coral::util::OnScopeExit([this] ()
601  {
602  m_active = false;
603  m_socket.close();
604 #ifdef _MSC_VER
605  // Visual Studio does not "reset" the future after get().
606  // See: http://stackoverflow.com/q/33899615
607  m_threadStatus = std::future<void>{};
608 #endif
609  });
     // Blocks until the thread has set its status; rethrows the thread's
     // exception if it stored one.
610  m_threadStatus.get();
611  }
612 #ifdef _MSC_VER
613  // Visual Studio (versions up to and including 2015, at least)
614  // has a bug where std::current_exception() returns a null
615  // exception pointer if it is called during stack unwinding.
616  // We have to do the best we can in this case, and special-case
617  // some well-known exceptions.
618  // See: http://stackoverflow.com/q/29652438
619  catch (const zmq::error_t& e) {
620  if (std::current_exception()) {
622  } else {
624  }
625  }
626  catch (const std::runtime_error& e) {
627  if (std::current_exception()) {
629  } else {
631  }
632  }
633  catch (const std::logic_error& e) {
634  if (std::current_exception()) {
636  } else {
638  }
639  }
640  catch (const std::exception& e) {
641  if (std::current_exception()) {
643  } else {
645  std::runtime_error(e.what())));
646  }
647  }
648 #else
649  catch (...) {
651  }
652 #endif
653  }
654 
655 
656 #ifdef _MSC_VER
657 # pragma warning(push)
658 # pragma warning(disable: 4101) // Unreferenced local variable 'e'
659 #endif
660 
// Shuts the background thread down without letting exceptions escape; a
// no-op apart from the final assertions when the object is not active.
// NOTE(review): the statement inside the inner try block and the heads of
// both logging calls are on listing lines absent from this rendering.
661 template<typename StackData>
662 void CommThread<StackData>::DestroySilently() CORAL_NOEXCEPT
663 {
664  if (Active()) {
665  try {
666  Shutdown();
667  } catch (const CommThreadDead& e) {
668  try {
670  } catch (const std::exception& e) {
672  boost::format("Unexpected exception thrown in CommThread destructor: %s")
673  % e.what());
674  }
675  } catch (const std::exception& e) {
677  boost::format("Unexpected exception thrown in CommThread destructor: %s")
678  % e.what());
679  }
680  }
    // Postconditions: inactive, socket closed (or moved from), and no
    // pending status future.
681  assert(!Active());
682  assert(!static_cast<void*>(m_socket));
683  assert(!m_threadStatus.valid());
684 }
685 
686 #ifdef _MSC_VER
687 # pragma warning(pop)
688 #endif
689 
690 
691 }} // namespace
692 #endif // header guard
~CommThread() CORAL_NOEXCEPT
If the CommThread object is still active, shuts down the background thread and waits for it to termin...
Definition: async.hpp:440
An exception that signals an error that has caused CommThread's background thread to terminate unexpe...
Definition: async.hpp:308
CommThread()
Creates the background thread.
Definition: async.hpp:413
T make_exception_ptr(T...args)
std::string RandomUUID()
Returns a string that contains a random UUID.
Contains the coral::net::Reactor class and related functionality.
Creates and controls a background communications thread.
Definition: async.hpp:162
Module header for coral::net::zmqx.
T current_exception(T...args)
void Stop()
Stops the messaging loop.
T terminate(T...args)
void AddSocket(zmq::socket_t &socket, SocketHandler handler)
Adds a handler for the given socket.
bool Active() const CORAL_NOEXCEPT
Returns whether the CommThread object is active.
Definition: async.hpp:588
T get_future(T...args)
#define CORAL_LOG_DEBUG(...)
If either of the macros CORAL_LOG_DEBUG_ENABLED or CORAL_LOG_TRACE_ENABLED is defined, this is equivalent to calling Log(debug, args), except that the file and line number are also logged. Otherwise, it is a no-op.
Definition: log.hpp:75
void Shutdown()
Terminates the background thread in a controlled manner.
Definition: async.hpp:580
T what(T...args)
std::future< Result > Execute(typename CommThreadTask< StackData, Result >::Type task)
Executes a task asynchronously in the background thread.
Definition: async.hpp:512
ScopeGuard< Action > OnScopeExit(Action action)
Scope guard.
Definition: util.hpp:189
Definition: variable_io.hpp:28
Main header file for coral::util.
zmq::context_t & GlobalContext()
Returns a reference to a global ZMQ context.
STL class.
T move(T...args)
#define CORAL_INPUT_CHECK(test)
Checks the value of one or more function input parameters, and throws an std::invalid_argument if the...
Definition: error.hpp:103
Main header file for coral::error.
STL class.
Contains the Type member alias, which defines the signature for functions executed asynchronously by ...
Definition: async.hpp:49
#define CORAL_PRECONDITION_CHECK(test)
Throws a coral::error::PreconditionViolation if the given boolean expression evaluates to false...
Definition: error.hpp:168
std::exception_ptr OriginalException() const CORAL_NOEXCEPT
Returns a pointer to the exception that caused the thread to terminate.
STL class.
void Run()
Runs the messaging loop.
T set_value(T...args)
Main header file for coral::log (but also contains a few macros).
T detach(T...args)
An implementation of the reactor pattern.
Definition: reactor.hpp:41
T rethrow_exception(T...args)
void Log(Level level, const char *message) CORAL_NOEXCEPT
Writes a plain C string to the global logger.
STL class.