5 #include "Connection.h" 9 #include "Exceptions.h" 11 #include "Redispatch.h" 13 #include "EventService.h" 14 #include "TypeService.h" 16 #include <Atlas/Objects/Encoder.h> 17 #include <Atlas/Objects/Operation.h> 18 #include <Atlas/Objects/Entity.h> 19 #include <sigc++/bind.h> 21 #include <Atlas/Codecs/Bach.h> 29 using Atlas::Objects::Root;
30 using Atlas::Objects::Entity::RootEntity;
31 using Atlas::Objects::smart_dynamic_cast;
40 ObjectsDecoder(factories), m_connection(connection) {
// Override called with an object produced by the decoder (the enclosing class
// initialises an ObjectsDecoder base in its constructor). The object is moved
// on to the owning Connection's objectArrived(); nothing else is done here.
43 void objectArrived(Root obj)
override {
44 m_connection.objectArrived(std::move(obj));
48 Connection::Connection(boost::asio::io_service& io_service,
50 std::string clientName,
51 const std::string& host,
55 _eventService(eventService),
59 m_defaultRouter(nullptr),
68 std::string clientName,
72 _eventService(eventService),
75 _localSocket(
std::move(socket)),
77 m_defaultRouter(nullptr),
85 Connection::~Connection() {
97 if (!_localSocket.empty()) {
107 warning() <<
"duplicate disconnect on Connection that's already disconnecting";
112 warning() <<
"called disconnect on already disconnected Connection";
124 _socket->getEncoder().streamObjectsMessage(Logout());
// Works through the operations queued in m_opDeque.
// NOTE(review): several lines of this function are missing from this listing
// (the loop/guard around the dequeue and the call that handles `op`), so only
// the visible statements are described.
147 void Connection::dispatch() {
// Move the operation at the head of the queue into a local instead of copying it.
151 RootOperation op = std::move(
m_opDeque.front());
// Destroy the Redispatch objects that cleanupRedispatch() parked in
// m_finishedRedispatches (they are held there by unique_ptr).
157 m_finishedRedispatches.clear();
162 error() <<
"called send on closed connection";
173 std::stringstream debugStream;
175 Atlas::Codecs::Bach debugCodec(debugStream, debugStream, *
this );
176 Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec);
177 debugEncoder.streamObjectsMessage(obj);
178 debugStream << std::flush;
180 debug() <<
"sending:" << debugStream.str();
183 _socket->getEncoder().streamObjectsMessage(obj);
// Associates `router` with `toId` in m_toRouters; dispatchOp() looks routers up
// in that table by the operation's TO value. The subscript assignment
// overwrites whatever was stored for `toId` before; no duplicate check is made.
// The pointer is stored as-is, with no ownership wrapper.
187 void Connection::registerRouterForTo(
Router* router,
const std::string& toId) {
188 m_toRouters[toId] = router;
// Removes the m_toRouters entry for `toId`.
// `router` is only used by the assert, which checks that the entry being
// removed is the router the caller believes is registered; with assertions
// compiled out the parameter is unused.
// NOTE(review): assuming m_toRouters is a std::map-style container, the
// subscript inside the assert default-inserts an entry for an unknown id; the
// erase on the next line removes it again.
191 void Connection::unregisterRouterForTo(
Router* router,
const std::string& toId) {
192 assert(m_toRouters[toId] == router);
193 m_toRouters.erase(toId);
// Associates `router` with `fromId` in m_fromRouters; dispatchOp() looks
// routers up in that table by the operation's FROM value. An existing entry
// for the same id is overwritten without any check.
196 void Connection::registerRouterForFrom(
Router* router,
const std::string& fromId) {
197 m_fromRouters[fromId] = router;
// Removes the m_fromRouters entry for `fromId`. Unlike unregisterRouterForTo()
// this takes no router argument and does not assert which router was stored.
200 void Connection::unregisterRouterForFrom(
const std::string& fromId) {
201 m_fromRouters.erase(fromId);
// Installs `router` as m_defaultRouter, which dispatchOp() consults after the
// FROM/TO router tables.
// An error is logged when a default router is already installed or when
// `router` is null.
// NOTE(review): the lines between the error log and the assignment are missing
// from this listing; presumably the error branch returns early so the
// assignment below is skipped — confirm against the full source.
204 void Connection::setDefaultRouter(
Router* router) {
205 if (m_defaultRouter || !router) {
206 error() <<
"setDefaultRouter duplicate set or null argument";
210 m_defaultRouter = router;
// Resets m_defaultRouter to null. setDefaultRouter() rejects a call while a
// default router is set, so this must run before a different one is installed.
213 void Connection::clearDefaultRouter() {
214 m_defaultRouter =
nullptr;
229 debug() <<
"Connection unlocked in DISCONNECTING, closing socket";
236 warning() <<
"Connection unlocked in spurious state : this may cause a failure later";
248 warning() <<
"called refreshServerInfo while not connected, ignoring";
252 m_info.status = ServerInfo::QUERYING;
// Entry point for an object received from the peer (the decoder override
// forwards here).
// NOTE(review): the conditionals that choose between the two debug outputs and
// that act on the cast result are missing from this listing; only the visible
// statements are described.
258 void Connection::objectArrived(Root obj) {
// Verbose trace: serialise the whole object through a Bach codec into a
// string stream and log the resulting text.
260 std::stringstream debugStream;
261 Atlas::Codecs::Bach debugCodec(debugStream, debugStream, *
this );
262 Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec);
263 debugEncoder.streamObjectsMessage(obj);
264 debugStream << std::flush;
266 debug() <<
"received:" << debugStream.str();
// Short trace: log only the object's parent (type) name.
268 debug() <<
"received op:" << obj->getParent();
// Only operations are expected on this path; anything that does not cast to
// RootOperation is reported as an error (presumably when the cast yields an
// invalid pointer — the check itself is not visible here).
270 RootOperation op = smart_dynamic_cast<RootOperation>(obj);
274 error() <<
"Con::objectArrived got non-op";
// Offers `op` to the registered handlers in a fixed order, as far as the
// visible lines show:
//   1. m_responder
//   2. the router registered for the op's FROM id
//   3. the router registered for the op's TO id
//   4. handleServerInfo(), for an INFO op whose TO is defaulted
//   5. m_defaultRouter
// A warning is logged when no step reports Router::HANDLED. Atlas exceptions
// raised along the way are caught and logged together with the op.
// NOTE(review): many lines are missing from this listing (the `try`, the
// bodies of the `HANDLED` branches — presumably early returns — and at least
// one enclosing condition); confirm details against the full source.
278 void Connection::dispatchOp(
const RootOperation& op) {
// Remember whether TO is still at its default value; used for the INFO check
// further down.
280 bool anonymous = op->isDefaultTo();
282 Router::RouterResult rr = m_responder->handleOp(op);
283 if (rr == Router::HANDLED) {
// FROM-based routing is attempted only when FROM has been set on the op.
288 if (!op->isDefaultFrom()) {
289 auto R = m_fromRouters.find(op->getFrom());
290 if (R != m_fromRouters.end()) {
291 rr = R->second->handleOperation(op);
292 if (rr == Router::HANDLED) {
// TO-based routing. NOTE(review): the condition guarding this lookup is not
// visible; presumably it is skipped for anonymous ops.
300 auto R = m_toRouters.find(op->getTo());
301 if (R != m_toRouters.end()) {
302 rr = R->second->handleOperation(op);
303 if (rr == Router::HANDLED) {
306 }
// No router for this TO id: warn, but only if some TO routers are registered.
else if (!m_toRouters.empty()) {
307 warning() <<
"received op with TO=" << op->getTo() <<
", but no router is registered for that id";
// An INFO op with a defaulted TO is treated as server information.
312 if (op->getClassNo() == INFO_NO && anonymous) {
313 handleServerInfo(op);
// Last resort: the default router, if one has been installed.
318 if (m_defaultRouter) {
319 rr = m_defaultRouter->handleOperation(op);
321 if (rr != Router::HANDLED) {
322 warning() <<
"no-one handled op:" << op;
324 }
// Atlas exceptions are logged here rather than passed up to the caller.
catch (
const Atlas::Exception& ae) {
325 error() <<
"caught Atlas exception: '" << ae.getDescription() <<
326 "' while dispatching op:\n" << op;
342 void Connection::handleTimeout(
const std::string& msg) {
// Handles an INFO operation passed on by dispatchOp() as server information.
// NOTE(review): lines are missing from this listing between the visible
// statements (what happens after the error log, how `svr` is consumed, and the
// condition around the type-service initialisation); confirm against the full
// source.
346 void Connection::handleServerInfo(
const RootOperation& op) {
// Only the first argument of the op is examined, and only if there is one.
347 if (!op->getArgs().empty()) {
348 RootEntity svr = smart_dynamic_cast<RootEntity>(op->getArgs().front());
// The argument must cast to a RootEntity; otherwise an error is logged.
349 if (!svr.isValid()) {
350 error() <<
"server INFO argument object is broken";
// Notify listeners connected to the GotServerInfo signal.
355 GotServerInfo.emit();
361 m_typeService->init();
// Timeout hook for the disconnect sequence: passes a fixed message to
// handleTimeout(). Any further statements in this function are not visible in
// this listing.
365 void Connection::onDisconnectTimeout() {
366 handleTimeout(
"timed out waiting for disconnection");
371 RootOperation op = smart_dynamic_cast<RootOperation>(obj);
372 assert(op.isValid());
376 std::stringstream debugStream;
377 Atlas::Codecs::Bach debugCodec(debugStream, debugStream, *
this );
378 Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec);
379 debugEncoder.streamObjectsMessage(obj);
380 debugStream << std::flush;
382 debug() <<
"posted for re-dispatch:" << debugStream.str();
// Takes ownership of `r`: the raw pointer is wrapped in a unique_ptr and
// appended to m_finishedRedispatches instead of being deleted here. The object
// is destroyed later, when dispatch() clears that container. Callers must not
// delete `r` themselves after this call.
386 void Connection::cleanupRedispatch(
Redispatch* r) {
387 m_finishedRedispatches.push_back(std::unique_ptr<Redispatch>(r));
391 static std::int64_t _nextSerial = 1001;
395 return _nextSerial++;
void onConnect() override
derived-class notification when connection and negotiation is completed
void setStatus(Status sc) override
Status
possible states for the connection
virtual int connectLocal(const std::string &socket)
void handleFailure(const std::string &msg) override
Process failures (to track when reconnection should be permitted)
void hardDisconnect(bool emit)
int disconnect()
Initiate disconnection from the server.
void postForDispatch(const Atlas::Objects::Root &obj)
virtual int connectRemote(const std::string &host, short port)
connection fully established
OpDeque m_opDeque
store of all the received ops waiting to be dispatched
clean disconnection in progress
sigc::signal< bool > Disconnecting
Connection(boost::asio::io_service &io_service, EventService &eventService, std::string clientName, const std::string &host, short port)
Create an unconnected instance.
Handles polling of the IO system as well as making sure that registered handlers are run on the main ...
virtual void send(const Atlas::Objects::Root &obj)
Transmit an Atlas::Objects instance to the server.
std::int64_t getNewSerialno()
operation serial number sequencing
sigc::signal< void, Status > StatusChanged
indicates a status change on the connection
const short _port
port of the server
Status _status
current status of the connection
sigc::signal< void, const std::string & > Failure
void processServer(const Atlas::Objects::Entity::RootEntity &svr)
virtual void onConnect()
derived-class notification when connection and negotiation is completed
Underlying Atlas connection, providing a send interface, and receive (dispatch) system.
void getServerInfo(ServerInfo &) const