7 #include "Metaserver.h" 9 #include "ServerInfo.h" 11 #include "EventService.h" 12 #include "Exceptions.h" 14 #include <Atlas/Objects/Operation.h> 15 #include <Atlas/Objects/RootEntity.h> 16 #include <sigc++/slot.h> 29 #define snprintf _snprintf 35 using Atlas::Objects::smart_dynamic_cast;
36 using Atlas::Objects::Root;
37 using Atlas::Objects::Entity::RootEntity;
41 char* pack_uint32(uint32_t data,
char* buffer,
unsigned int& size);
43 char* unpack_uint32(uint32_t& dest,
char* buffer);
45 const char* META_SERVER_PORT =
"8453";
48 const uint32_t CKEEP_ALIVE = 2,
57 const uint32_t LIST_RESP2 = 999;
63 ObjectsDecoder(factories), m_meta(meta) {
66 void objectArrived(Root obj)
override {
67 m_meta.objectArrived(std::move(obj));
71 Meta::Meta(boost::asio::io_service& io_service,
73 std::string metaServer,
74 unsigned int maxQueries) :
75 m_factories(new
Atlas::Objects::Factories()),
76 m_io_service(io_service),
77 m_event_service(eventService),
80 m_metaHost(
std::move(metaServer)),
81 m_maxActiveQueries(maxQueries),
83 m_resolver(io_service),
85 m_metaTimer(io_service),
86 m_receive_stream(&m_receive_buffer),
87 m_send_buffer(new boost::asio::streambuf()),
88 m_send_stream(m_send_buffer.get()),
95 unsigned int max_half_open = FD_SETSIZE;
96 if (m_maxActiveQueries > (max_half_open - 2)) {
97 m_maxActiveQueries = max_half_open - 2;
125 error() <<
"called queryServerByIndex with invalid server list";
129 if (index >= m_gameServers.size()) {
130 error() <<
"called queryServerByIndex with bad server index " << index;
134 if (m_gameServers[index].status == ServerInfo::QUERYING) {
135 warning() <<
"called queryServerByIndex on server already being queried";
139 internalQuery(index);
143 if (!m_activeQueries.empty()) {
144 warning() <<
"called meta::refresh() while doing another query, ignoring";
148 if (m_status ==
VALID) {
150 m_lastValidList = m_gameServers;
153 m_gameServers.clear();
160 m_activeQueries.clear();
165 if (!m_lastValidList.empty()) {
166 m_gameServers = m_lastValidList;
170 m_gameServers.clear();
172 m_nextQuery = m_gameServers.size();
176 if (index >= m_gameServers.size()) {
177 error() <<
"passed out-of-range index " << index <<
178 " to getInfoForServer";
179 throw BaseException(
"Out of bounds exception when getting server info.");
181 return m_gameServers[index];
186 return m_gameServers.size();
190 boost::asio::ip::udp::resolver::query query(m_metaHost, META_SERVER_PORT);
191 m_resolver.async_resolve(query,
192 [&](
const boost::system::error_code& ec, boost::asio::ip::udp::resolver::iterator iterator) {
193 if (!ec && iterator != boost::asio::ip::udp::resolver::iterator()) {
201 void Meta::connect(
const boost::asio::ip::udp::endpoint& endpoint) {
202 m_socket.open(boost::asio::ip::udp::v4());
203 m_socket.async_connect(endpoint, [&](boost::system::error_code ec) {
208 unsigned int dsz = 0;
209 pack_uint32(CKEEP_ALIVE, m_data.data(), dsz);
210 this->m_send_stream << std::string(m_data.data(), dsz) << std::flush;
212 this->setupRecvCmd();
215 this->startTimeout();
217 this->doFailure(
"Couldn't open connection to metaserver " + this->m_metaHost);
223 if (m_socket.is_open()) {
226 m_metaTimer.cancel();
229 void Meta::startTimeout() {
230 m_metaTimer.cancel();
231 m_metaTimer.expires_from_now(std::chrono::seconds(8));
232 m_metaTimer.async_wait([&](boost::system::error_code ec) {
240 void Meta::do_read() {
241 if (m_socket.is_open()) {
242 m_socket.async_receive(m_receive_buffer.prepare(DATA_BUFFER_SIZE),
243 [
this](boost::system::error_code ec, std::size_t length) {
245 m_receive_buffer.commit(length);
252 if (ec != boost::asio::error::operation_aborted) {
253 this->doFailure(std::string(
"Connection to the meta-server failed: ") + ec.message());
261 if (m_socket.is_open()) {
262 if (m_send_buffer->size() != 0) {
263 std::shared_ptr<boost::asio::streambuf> send_buffer(std::move(m_send_buffer));
264 m_send_buffer = std::make_unique<boost::asio::streambuf>();
265 m_send_stream.rdbuf(m_send_buffer.get());
266 m_socket.async_send(send_buffer->data(),
267 [&, send_buffer](boost::system::error_code ec, std::size_t length) {
269 send_buffer->consume(length);
271 if (ec != boost::asio::error::operation_aborted) {
272 this->doFailure(std::string(
"Connection to the meta-server failed: ") + ec.message());
280 void Meta::gotData() {
284 void Meta::deleteQuery(
MetaQuery* query) {
285 auto I = std::find_if(m_activeQueries.begin(), m_activeQueries.end(), [&](
const std::unique_ptr<MetaQuery>& entry){
return entry.get() == query;});
287 if (I != m_activeQueries.end()) {
288 auto containedQuery = I->release();
289 m_activeQueries.erase(I);
293 delete containedQuery;
296 if (m_activeQueries.empty() && m_nextQuery == m_gameServers.size()) {
302 error() <<
"Tried to delete meta server query which wasn't " 303 "among the active queries. This indicates an error " 304 "with the flow in Metaserver.";
309 if (m_bytesToRecv == 0) {
310 error() <<
"No bytes to receive when calling recv().";
314 m_receive_stream.peek();
315 std::streambuf* iobuf = m_receive_stream.rdbuf();
316 std::streamsize len = std::min(m_bytesToRecv, iobuf->in_avail());
318 iobuf->sgetn(m_dataPtr, len);
319 m_bytesToRecv -= len;
328 if (m_bytesToRecv > 0) {
329 error() <<
"Fragment data received by Meta::recv";
335 unpack_uint32(op, m_data.data());
342 if (m_bytesToRecv && m_receive_stream.rdbuf()->in_avail())
346 void Meta::recvCmd(uint32_t op) {
349 setupRecvData(1, HANDSHAKE);
353 doFailure(
"Got list range error from Metaserver");
357 setupRecvData(2, LIST_RESP);
361 doFailure(
"Unknown Meta server command");
366 void Meta::processCmd() {
368 error() <<
"Command received when not expecting any. It will be ignored. The command was: " << m_gotCmd;
375 unpack_uint32(stamp, m_data.data());
377 unsigned int dsz = 0;
378 m_dataPtr = pack_uint32(CLIENTSHAKE, m_data.data(), dsz);
379 pack_uint32(stamp, m_dataPtr, dsz);
381 m_send_stream << std::string(m_data.data(), dsz) << std::flush;
384 m_metaTimer.cancel();
392 uint32_t total_servers;
393 m_dataPtr = unpack_uint32(total_servers, m_data.data());
394 if (!m_gameServers.empty()) {
395 if (total_servers != m_totalServers) {
396 warning() <<
"Server total in new packet has changed. " << total_servers <<
":" << m_totalServers;
399 m_totalServers = total_servers;
401 unpack_uint32(m_packed, m_dataPtr);
408 setupRecvData(m_packed, LIST_RESP2);
411 if (m_gameServers.empty()) {
413 assert(m_nextQuery == 0);
414 m_gameServers.reserve(m_totalServers);
420 m_dataPtr = m_data.data();
423 m_dataPtr = unpack_uint32(ip, m_dataPtr);
426 snprintf(buf, 32,
"%u.%u.%u.%u",
428 (ip & 0x0000FF00) >> 8u,
429 (ip & 0x00FF0000) >> 16u,
430 (ip & 0xFF000000) >> 24u
437 if (m_gameServers.size() < m_totalServers) {
439 listReq((
unsigned int) m_gameServers.size());
453 std::stringstream ss;
454 ss <<
"Unknown Meta server command: " << m_gotCmd;
460 void Meta::listReq(
unsigned int base) {
461 unsigned int dsz = 0;
462 char* _dataPtr = pack_uint32(LIST_REQ, m_data.data(), dsz);
463 pack_uint32(base, _dataPtr, dsz);
465 m_send_stream << std::string(m_data.data(), dsz) << std::flush;
472 void Meta::setupRecvCmd() {
474 m_bytesToRecv =
sizeof(uint32_t);
475 m_dataPtr = m_data.data();
478 void Meta::setupRecvData(
int words, uint32_t got) {
480 m_bytesToRecv = words *
sizeof(uint32_t);
481 m_dataPtr = m_data.data();
488 char* pack_uint32(uint32_t data,
char* buffer,
unsigned int& size) {
491 netorder = htonl(data);
492 memcpy(buffer, &netorder,
sizeof(uint32_t));
493 size +=
sizeof(uint32_t);
494 return buffer +
sizeof(uint32_t);
499 char* unpack_uint32(uint32_t& dest,
char* buffer) {
502 memcpy(&netorder, buffer,
sizeof(uint32_t));
503 dest = ntohl(netorder);
504 return buffer +
sizeof(uint32_t);
507 void Meta::internalQuery(
size_t index) {
508 assert(index < m_gameServers.size());
511 auto q = std::make_unique<MetaQuery>(m_io_service, *m_decoder, *
this, sv.host, index);
515 sv.status = ServerInfo::INVALID;
517 m_activeQueries.emplace_back(std::move(q));
518 sv.status = ServerInfo::QUERYING;
522 void Meta::objectArrived(Root obj) {
523 Info info = smart_dynamic_cast<Info>(obj);
524 if (!info.isValid()) {
525 error() <<
"Meta::objectArrived, failed to convert object to INFO op";
530 auto refno = info->getRefno();
531 QuerySet::iterator Q;
533 for (Q = m_activeQueries.begin(); Q != m_activeQueries.end(); ++Q)
534 if ((*Q)->getQueryNo() == refno)
break;
536 if (Q == m_activeQueries.end()) {
537 error() <<
"Couldn't locate query for meta-query reply";
541 RootEntity svr = smart_dynamic_cast<RootEntity>(info->getArgs().front());
542 if (!svr.isValid()) {
543 error() <<
"Query INFO argument object is broken";
545 if ((*Q)->getServerIndex() >= m_gameServers.size()) {
546 error() <<
"Got server info with out of bounds index.";
548 ServerInfo& sv = m_gameServers[(*Q)->getServerIndex()];
551 sv.
ping = (int) (*Q)->getElapsed();
557 deleteQuery(Q->get());
562 void Meta::doFailure(
const std::string& msg) {
567 void Meta::dispatch() {
571 void Meta::metaTimeout() {
573 m_metaTimer.cancel();
576 doFailure(
"Connection to the meta-server timed out");
579 void Meta::queryFailure(
MetaQuery* q,
const std::string& msg) {
583 m_gameServers[q->getServerIndex()].status = ServerInfo::INVALID;
590 while ((m_activeQueries.size() < m_maxActiveQueries) && (m_nextQuery < m_gameServers.size())) {
591 internalQuery(m_nextQuery++);
stream / socket connection in progress
Handles polling of the IO system as well as making sure that registered handlers are run on the main ...
Atlas negotiation in progress.
void processServer(const Atlas::Objects::Entity::RootEntity &svr)
void runOnMainThread(const std::function< void()> &handler, std::shared_ptr< bool > activeMarker=std::make_shared< bool >(true))
Adds a handler which will be run on the main thread.