10 #ifndef CORAL_ASYNC_HPP 11 #define CORAL_ASYNC_HPP 23 #include "coral/config.h" 48 template<
typename StackData,
typename Result>
80 template<
typename Result>
91 template<
typename StackData>
92 struct CommThreadAnyTask
100 struct CommThreadAnyTask<void>
161 template<
typename StackData>
252 template<
typename Result>
287 bool Active()
const CORAL_NOEXCEPT;
291 void WaitForThreadTermination();
292 void DestroySilently() CORAL_NOEXCEPT;
295 zmq::socket_t m_socket;
318 const char* what()
const CORAL_NOEXCEPT
override;
331 template<
typename StackData>
332 void CommThreadMessagingLoop(
333 zmq::socket_t& bgSocket,
334 typename CommThreadAnyTask<StackData>::SharedPtr nextTask)
346 typename CommThreadAnyTask<StackData>::Type myTask;
347 swap(*nextTask, myTask);
357 inline void CommThreadMessagingLoop<void>(
358 zmq::socket_t& bgSocket,
359 typename CommThreadAnyTask<void>::SharedPtr nextTask)
370 CommThreadAnyTask<void>::Type myTask;
371 swap(*nextTask, myTask);
383 template<
typename StackData>
384 void CommThreadBackground(
387 typename CommThreadAnyTask<StackData>::SharedPtr nextTask)
391 CommThreadMessagingLoop<StackData>(
398 statusNotifier->set_value();
400 statusNotifier->set_exception(
407 bgSocket->send(
"", 0);
412 template<
typename StackData>
421 bgSocket->setsockopt(ZMQ_LINGER, 0);
422 m_socket.setsockopt(ZMQ_LINGER, 0);
424 bgSocket->bind(endpoint);
425 m_socket.connect(endpoint);
427 auto statusNotifier = std::make_shared<std::promise<void>>();
428 m_threadStatus = statusNotifier->get_future();
431 std::make_shared<typename detail::CommThreadAnyTask<StackData>::Type>();
432 m_nextTask = sharedTask;
434 std::thread{&detail::CommThreadBackground<StackData>,
435 bgSocket, statusNotifier, sharedTask}.
detach();
439 template<
typename StackData>
446 template<
typename StackData>
448 : m_active{other.m_active}
450 , m_threadStatus{
std::move(other.m_threadStatus)}
451 , m_nextTask{
std::move(other.m_nextTask)}
453 other.m_active =
false;
457 template<
typename StackData>
462 m_active = other.m_active;
464 m_threadStatus =
std::move(other.m_threadStatus);
465 m_nextTask =
std::move(other.m_nextTask);
466 other.m_active =
false;
473 template<
typename StackData,
typename Result>
474 struct CommThreadFunctions
476 static typename CommThreadAnyTask<StackData>::Type WrapTask(
480 const auto sharedPromise =
481 std::make_shared<typename std::promise<Result>>(
std::move(promise));
482 return [task, sharedPromise]
485 task(reactor, data,
std::move(*sharedPromise));
491 template<
typename Result>
492 struct CommThreadFunctions<void, Result>
494 static typename CommThreadAnyTask<void>::Type WrapTask(
498 const auto sharedPromise =
499 std::make_shared<typename std::promise<Result>>(
std::move(promise));
500 return [task, sharedPromise]
503 task(reactor,
std::move(*sharedPromise));
510 template<
typename StackData>
511 template<
typename Result>
521 if (
auto sharedTask = m_nextTask.lock()) {
522 assert(!(*sharedTask));
523 *sharedTask = detail::CommThreadFunctions<StackData, Result>::WrapTask(
529 m_socket.send(
"", 0);
531 m_socket.recv(&dummy, 1);
536 WaitForThreadTermination();
542 "CommThread background thread has terminated silently and " 543 "unexpectedly. Perhaps Reactor::Stop() was called?");
551 inline void CommThreadShutdown(
559 template<
typename StackData>
579 template<
typename StackData>
582 Execute<void>(detail::CommThreadShutdownTask<StackData>());
583 WaitForThreadTermination();
587 template<
typename StackData>
594 template<
typename StackData>
598 assert(m_threadStatus.valid());
610 m_threadStatus.get();
619 catch (
const zmq::error_t& e) {
657 # pragma warning(push) 658 # pragma warning(disable: 4101) // Unreferenced local variable 'e' 661 template<
typename StackData>
672 boost::format(
"Unexpected exception thrown in CommThread destructor: %s")
677 boost::format(
"Unexpected exception thrown in CommThread destructor: %s")
682 assert(!static_cast<void*>(m_socket));
683 assert(!m_threadStatus.valid());
687 # pragma warning(pop) 692 #endif // header guard ~CommThread() CORAL_NOEXCEPT
If the CommThread object is still active, shuts down the background thread and waits for it to terminate.
Definition: async.hpp:440
An exception that signals an error that has caused CommThread's background thread to terminate unexpectedly.
Definition: async.hpp:308
CommThread()
Creates the background thread.
Definition: async.hpp:413
T make_exception_ptr(T...args)
std::string RandomUUID()
Returns a string that contains a random UUID.
Contains the coral::net::Reactor class and related functionality.
Creates and controls a background communications thread.
Definition: async.hpp:162
Module header for coral::net::zmqx.
T current_exception(T...args)
void Stop()
Stops the messaging loop.
void AddSocket(zmq::socket_t &socket, SocketHandler handler)
Adds a handler for the given socket.
bool Active() const CORAL_NOEXCEPT
Returns whether the CommThread object is active.
Definition: async.hpp:588
#define CORAL_LOG_DEBUG(...)
If either of the macros CORAL_LOG_DEBUG_ENABLED or CORAL_LOG_TRACE_ENABLED is defined, this is equivalent to calling Log(debug, args), except that the file and line number are also logged. Otherwise, it is a no-op.
Definition: log.hpp:75
void Shutdown()
Terminates the background thread in a controlled manner.
Definition: async.hpp:580
std::future< Result > Execute(typename CommThreadTask< StackData, Result >::Type task)
Executes a task asynchronously in the background thread.
Definition: async.hpp:512
ScopeGuard< Action > OnScopeExit(Action action)
Scope guard.
Definition: util.hpp:189
Definition: variable_io.hpp:28
Main header file for coral::util.
zmq::context_t & GlobalContext()
Returns a reference to a global ZMQ context.
#define CORAL_INPUT_CHECK(test)
Checks the value of one or more function input parameters, and throws an std::invalid_argument if the...
Definition: error.hpp:103
Main header file for coral::error.
Contains the Type member alias, which defines the signature for functions executed asynchronously by ...
Definition: async.hpp:49
#define CORAL_PRECONDITION_CHECK(test)
Throws a coral::error::PreconditionViolation if the given boolean expression evaluates to false...
Definition: error.hpp:168
std::exception_ptr OriginalException() const CORAL_NOEXCEPT
Returns a pointer to the exception that caused the thread to terminate.
void Run()
Runs the messaging loop.
Main header file for coral::log (but also contains a few macros).
An implementation of the reactor pattern.
Definition: reactor.hpp:41
T rethrow_exception(T...args)
void Log(Level level, const char *message) CORAL_NOEXCEPT
Writes a plain C string to the global logger.