coral
A C++ library for distributed co-simulation
slave_agent.hpp
Go to the documentation of this file.
1 
10 #ifndef CORAL_BUS_SLAVE_AGENT_HPP
11 #define CORAL_BUS_SLAVE_AGENT_HPP
12 
13 #include <chrono>
14 #include <exception>
15 #include <string>
16 #include <vector>
17 
18 #include "boost/bimap.hpp"
19 #include "boost/bimap/multiset_of.hpp"
20 #include "zmq.hpp"
21 
22 #include "coral/config.h"
24 #include "coral/model.hpp"
25 #include "coral/net.hpp"
26 #include "coral/net/reactor.hpp"
27 #include "coral/net/zmqx.hpp"
28 #include "coral/slave/instance.hpp"
29 #include "execution.pb.h"
30 
31 
32 namespace coral
33 {
34 namespace bus
35 {
36 
37 
43 {
44 public:
// Constructs a new SlaveAgent.
//
// NOTE(review): the original parameter documentation is not part of this
// listing; the notes below are inferred from the rest of the header and
// should be confirmed against the upstream source.
//   reactor                 - reactor on which the agent registers event
//                             handlers (these refer back to `this`).
//   slaveInstance           - the slave being controlled; the class holds a
//                             reference member (m_slaveInstance), so this
//                             object presumably must outlive the agent.
//   controlEndpoint         - presumably where the agent listens for
//                             incoming messages from the master.
//   dataPubEndpoint         - presumably where the agent publishes its
//                             output variable values.
//   masterInactivityTimeout - presumably how long the master may stay
//                             silent before timeout handling kicks in.
64  SlaveAgent(
65  coral::net::Reactor& reactor,
66  coral::slave::Instance& slaveInstance,
67  const coral::net::Endpoint& controlEndpoint,
68  const coral::net::Endpoint& dataPubEndpoint,
69  std::chrono::milliseconds masterInactivityTimeout);
70 
71  // Class can't be copied or moved because it leaks references to `this`
72  // through Reactor event handlers.
// All four copy/move operations are therefore explicitly deleted.
73  SlaveAgent(const SlaveAgent&) = delete;
74  SlaveAgent& operator=(const SlaveAgent&) = delete;
75  SlaveAgent(SlaveAgent&&) = delete;
76  SlaveAgent& operator=(SlaveAgent&&) = delete;
77 
87 
96 
97 private:
98  /*
99  \brief Responds to a message from the master.
100 
101  On input, `msg` must be the message received from the master, and on
102  output it will contain the slave's reply. Internally, the function
103  forwards to the handler function that corresponds to the slave's current
 state, i.e. the one that m_stateHandler points to at the time of the call.
104  */
105  void RequestReply(std::vector<zmq::message_t>& msg);
106 
107  // Each of these functions corresponds to one of the slave's possible states.
108  // On input, `msg` is a message from the master node, and when the function
109  // returns, `msg` must contain the reply. If the message triggers a state
110  // change, the handler function must update m_stateHandler to point to the
111  // function for the new state.
// All handlers share one signature so that each of them can be stored in
// m_stateHandler.
112  void NotConnectedHandler(std::vector<zmq::message_t>& msg);
113  void ConnectedHandler(std::vector<zmq::message_t>& msg);
114  void ReadyHandler(std::vector<zmq::message_t>& msg);
115  void PublishedHandler(std::vector<zmq::message_t>& msg);
116  void StepFailedHandler(std::vector<zmq::message_t>& msg);
117 
118  // Performs the "describe" operation, including filling `msg` with a
119  // reply message.
120  void HandleDescribe(std::vector<zmq::message_t>& msg);
121 
122  // Performs the "set variables" operation for ReadyHandler(), including
123  // filling `msg` with a reply message.
124  void HandleSetVars(std::vector<zmq::message_t>& msg);
125 
126  // Performs the "set peers" operation for ReadyHandler(), including
127  // filling `msg` with a reply message.
128  void HandleSetPeers(std::vector<zmq::message_t>& msg);
129 
130  // Performs the "prime" operation for ReadyHandler(), including
131  // filling `msg` with a reply message.
// NOTE(review): the comment calls this operation "prime" while the function
// is named HandleResendVars; confirm which term the protocol uses and align
// the two.
132  void HandleResendVars(std::vector<zmq::message_t>& msg);
133 
134  // Performs the time step for ReadyHandler()
// NOTE(review): the meaning of the bool result is not documented here;
// presumably it reports whether the step succeeded (compare
// StepFailedHandler) - confirm against the implementation.
135  bool Step(const coralproto::execution::StepData& stepData);
136 
137  // Publishes all variable values (used by HandleResendVars() and Step()).
138  void PublishAll();
139 
140  // A pointer to the handler function for the current state.
// This is a pointer to a member function; RequestReply() forwards incoming
// messages to it, and the state handlers reassign it on state changes.
141  void (SlaveAgent::* m_stateHandler)(std::vector<zmq::message_t>&);
142 
143  // Class that handles timeouts in master-slave communication
144  class Timeout
145  {
146  public:
147  Timeout(
148  coral::net::Reactor& reactor,
149  std::chrono::milliseconds timeout);
150  ~Timeout() CORAL_NOEXCEPT;
151  Timeout(const Timeout&) = delete;
152  Timeout& operator=(const Timeout&) = delete;
153  Timeout(Timeout&&) = delete;
154  Timeout& operator=(Timeout&&) = delete;
155  void Reset();
156  void SetTimeout(std::chrono::milliseconds timeout);
157  private:
158  coral::net::Reactor& m_reactor;
159  int m_timerID;
160  };
161 
162  // A less-than comparison functor for Variable objects, so we can put
163  // them in a std::map.
164  struct VariableLess
165  {
166  bool operator()(const coral::model::Variable& a, const coral::model::Variable& b) const
167  {
168  return ((a.Slave() << 16) + a.ID()) < ((b.Slave() << 16) + b.ID());
169  }
170  };
171 
172  // A class which keeps track of connections to our input variables and the
173  // values we receive for them.
174  class Connections
175  {
176  public:
177  // Connects to the publisher endpoints
// `endpoints` points to the first element of an array of `endpointsSize`
// endpoints.
178  void Connect(
179  const coral::net::Endpoint* endpoints,
180  std::size_t endpointsSize);
181 
182  // Establishes a connection between a remote output variable and one of
183  // our input variables, breaking any existing connections to that input.
184  void Couple(
185  coral::model::Variable remoteOutput,
186  coral::model::VariableID localInput);
187 
188  // Waits until all data has been received for the time step specified
189  // by `stepID` and updates the slave instance with the new values.
// NOTE(review): the meaning of the bool result is not documented here;
// presumably false means the data did not arrive within `timeout` - confirm.
190  bool Update(
191  coral::slave::Instance& slaveInstance,
192  coral::model::StepID stepID,
193  std::chrono::milliseconds timeout);
194 
195  private:
196  // Breaks a connection to a local input variable, if any.
197  void Decouple(coral::model::VariableID localInput);
198 
199  // A bidirectional mapping between output variables and input variables.
// The output-variable side is a multiset ordered by VariableLess.
// NOTE(review): this listing jumps from line 201 to line 203, so the second
// template argument of boost::bimap (the input-variable side) and the
// closing `>` are missing here; restore them from the upstream header.
200  typedef boost::bimap<
201  boost::bimaps::multiset_of<coral::model::Variable, VariableLess>,
203  ConnectionBimap;
204 
205  ConnectionBimap m_connections;
// Handles subscriptions to, and reception of, variable values.
206  coral::bus::VariableSubscriber m_subscriber;
207  };
208 
// The slave instance driven by this agent (held by reference, not owned).
209  coral::slave::Instance& m_slaveInstance;
// Timeout handling for master-slave communication.
210  Timeout m_masterInactivityTimeout;
// NOTE(review): presumably the timeout passed to Connections::Update() when
// waiting for input variable values - confirm against the implementation.
211  std::chrono::milliseconds m_variableRecvTimeout;
212 
// Request-reply socket; presumably the one on which master messages arrive.
213  coral::net::zmqx::RepSocket m_control;
// Publishes variable values on the network.
214  coral::bus::VariablePublisher m_publisher;
// Connections to our input variables and the values received for them.
215  Connections m_connections;
216  coral::model::SlaveID m_id; // The slave's ID number in the current execution
217 
218  coral::model::StepID m_currentStepID; // ID of ongoing or just completed step
219 };
220 
221 
223 class Shutdown : public std::exception
224 {
225 public:
226  const char* what() const CORAL_NOEXCEPT override { return "Normal shutdown requested by master"; }
227 };
228 
229 
230 }} // namespace
231 #endif // header guard
An object that identifies a variable in a simulation, and which consists of a slave ID and a variable ID.
Definition: model.hpp:270
Contains the coral::net::Reactor class and related functionality.
Main module header for coral::net.
Exception thrown when the slave receives a TERMINATE command.
Definition: slave_agent.hpp:223
A class which handles subscriptions to and receiving of variable values.
Definition: variable_io.hpp:95
Module header for coral::net::zmqx.
A class which contains the state of the slave and takes care of responding to requests from the master.
Definition: slave_agent.hpp:42
A protocol/transport independent endpoint address specification.
Definition: net.hpp:34
coral::net::Endpoint BoundDataPubEndpoint() const
The endpoint to which the slave is publishing its output data.
Defines the coral::bus::VariablePublisher and coral::bus::VariableSubscriber classes.
Definition: variable_io.hpp:28
A class which handles publishing of variable values on the network.
Definition: variable_io.hpp:35
STL class.
SlaveAgent(coral::net::Reactor &reactor, coral::slave::Instance &slaveInstance, const coral::net::Endpoint &controlEndpoint, const coral::net::Endpoint &dataPubEndpoint, std::chrono::milliseconds masterInactivityTimeout)
Constructs a new SlaveAgent.
Defines the coral::slave::Instance interface.
coral::net::Endpoint BoundControlEndpoint() const
The endpoint on which the slave is listening for incoming messages from the master.
An implementation of the reactor pattern.
Definition: reactor.hpp:41
A server socket for communication with one or more client nodes in a request-reply pattern.
Definition: zmqx.hpp:224
Main module header for coral::model.
An interface for classes that represent slave instances.
Definition: instance.hpp:42