1
0
mirror of https://github.com/gnss-sdr/gnss-sdr synced 2024-12-15 12:40:35 +00:00

Fix RTCM server

Replace private member name from io_service to io_context, remove socket as private member in Tcp_Server class
Improve messages printed in terminal
This commit is contained in:
Carles Fernandez 2018-06-11 00:05:07 +02:00
parent 5b00ad1dc1
commit bd37a64260
No known key found for this signature in database
GPG Key ID: 4C583C52B0C3877D
2 changed files with 38 additions and 25 deletions

View File

@ -53,7 +53,7 @@ Rtcm::Rtcm(unsigned short port)
reserved_field = std::bitset<6>("000000"); reserved_field = std::bitset<6>("000000");
rtcm_message_queue = std::make_shared<concurrent_queue<std::string> >(); rtcm_message_queue = std::make_shared<concurrent_queue<std::string> >();
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), RTCM_port); boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), RTCM_port);
servers.emplace_back(io_service, endpoint); servers.emplace_back(io_context, endpoint);
server_is_running = false; server_is_running = false;
} }
@ -85,13 +85,13 @@ Rtcm::~Rtcm()
// ***************************************************************************************************** // *****************************************************************************************************
void Rtcm::run_server() void Rtcm::run_server()
{ {
std::cout << "Starting a TCP Server on port " << RTCM_port << std::endl; std::cout << "Starting a TCP/IP server of RTCM messages on port " << RTCM_port << std::endl;
try try
{ {
std::thread tq([&] { std::make_shared<Queue_Reader>(io_service, rtcm_message_queue, RTCM_port)->do_read_queue(); }); std::thread tq([&] { std::make_shared<Queue_Reader>(io_context, rtcm_message_queue, RTCM_port)->do_read_queue(); });
tq.detach(); tq.detach();
std::thread t([&] { io_service.run(); }); std::thread t([&] { io_context.run(); });
server_is_running = true; server_is_running = true;
t.detach(); t.detach();
} }
@ -104,13 +104,13 @@ void Rtcm::run_server()
void Rtcm::stop_service() void Rtcm::stop_service()
{ {
io_service.stop(); io_context.stop();
} }
void Rtcm::stop_server() void Rtcm::stop_server()
{ {
std::cout << "Stopping TCP Server on port " << RTCM_port << std::endl; std::cout << "Stopping TCP/IP server on port " << RTCM_port << std::endl;
rtcm_message_queue->push("Goodbye"); // this terminates tq rtcm_message_queue->push("Goodbye"); // this terminates tq
Rtcm::stop_service(); Rtcm::stop_service();
servers.front().close_server(); servers.front().close_server();

View File

@ -749,21 +749,21 @@ private:
: public std::enable_shared_from_this<Tcp_Internal_Client> : public std::enable_shared_from_this<Tcp_Internal_Client>
{ {
public: public:
Tcp_Internal_Client(boost::asio::io_service& io_service, Tcp_Internal_Client(boost::asio::io_service& io_context,
boost::asio::ip::tcp::resolver::iterator endpoint_iterator) boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
: io_service_(io_service), socket_(io_service) : io_context_(io_context), socket_(io_context)
{ {
do_connect(endpoint_iterator); do_connect(endpoint_iterator);
} }
inline void close() inline void close()
{ {
io_service_.post([this]() { socket_.close(); }); io_context_.post([this]() { socket_.close(); });
} }
inline void write(const Rtcm_Message& msg) inline void write(const Rtcm_Message& msg)
{ {
io_service_.post( io_context_.post(
[this, msg]() { [this, msg]() {
bool write_in_progress = !write_msgs_.empty(); bool write_in_progress = !write_msgs_.empty();
write_msgs_.push_back(msg); write_msgs_.push_back(msg);
@ -827,7 +827,7 @@ private:
}); });
} }
boost::asio::io_service& io_service_; boost::asio::io_service& io_context_;
boost::asio::ip::tcp::socket socket_; boost::asio::ip::tcp::socket socket_;
Rtcm_Message read_msg_; Rtcm_Message read_msg_;
std::deque<Rtcm_Message> write_msgs_; std::deque<Rtcm_Message> write_msgs_;
@ -837,13 +837,13 @@ private:
class Queue_Reader class Queue_Reader
{ {
public: public:
Queue_Reader(boost::asio::io_service& io_service, std::shared_ptr<concurrent_queue<std::string> >& queue, int port) : queue_(queue) Queue_Reader(boost::asio::io_service& io_context, std::shared_ptr<concurrent_queue<std::string> >& queue, int port) : queue_(queue)
{ {
boost::asio::ip::tcp::resolver resolver(io_service); boost::asio::ip::tcp::resolver resolver(io_context);
std::string host("localhost"); std::string host("localhost");
std::string port_str = std::to_string(port); std::string port_str = std::to_string(port);
auto queue_endpoint_iterator = resolver.resolve({host.c_str(), port_str.c_str()}); auto queue_endpoint_iterator = resolver.resolve({host.c_str(), port_str.c_str()});
c = std::make_shared<Tcp_Internal_Client>(io_service, queue_endpoint_iterator); c = std::make_shared<Tcp_Internal_Client>(io_context, queue_endpoint_iterator);
} }
inline void do_read_queue() inline void do_read_queue()
@ -871,8 +871,8 @@ private:
class Tcp_Server class Tcp_Server
{ {
public: public:
Tcp_Server(boost::asio::io_service& io_service, const boost::asio::ip::tcp::endpoint& endpoint) Tcp_Server(boost::asio::io_service& io_context, const boost::asio::ip::tcp::endpoint& endpoint)
: acceptor_(io_service), socket_(io_service) : acceptor_(io_context)
{ {
acceptor_.open(endpoint.protocol()); acceptor_.open(endpoint.protocol());
acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
@ -883,44 +883,57 @@ private:
inline void close_server() inline void close_server()
{ {
socket_.close();
acceptor_.close(); acceptor_.close();
} }
private: private:
inline void do_accept() inline void do_accept()
{ {
acceptor_.async_accept(socket_, [this](boost::system::error_code ec) { acceptor_.async_accept([this](boost::system::error_code ec, boost::asio::ip::tcp::socket socket_) {
if (!ec) if (!ec)
{ {
if (first_client) if (first_client)
{ {
std::cout << "The TCP Server is up and running. Accepting connections ..." << std::endl; std::cout << "The TCP/IP server of RTCM messages is up and running. Accepting connections ..." << std::endl;
first_client = false; first_client = false;
} }
else else
{ {
std::cout << "Starting RTCM TCP server session..." << std::endl; std::cout << "Starting RTCM TCP/IP server session..." << std::endl;
std::cout << "Serving client from " << socket_.remote_endpoint().address() << std::endl; boost::system::error_code ec2;
LOG(INFO) << "Serving client from " << socket_.remote_endpoint().address(); boost::asio::ip::tcp::endpoint endpoint = socket_.remote_endpoint(ec2);
if (ec2)
{
// Error creating remote_endpoint
std::cout << "Error getting remote IP address, closing session." << std::endl;
LOG(INFO) << "Error getting remote IP address";
start_session = false;
}
else
{
std::string remote_addr = endpoint.address().to_string();
std::cout << "Serving client from " << remote_addr << std::endl;
LOG(INFO) << "Serving client from " << remote_addr;
}
} }
std::make_shared<Rtcm_Session>(std::move(socket_), room_)->start(); if (start_session) std::make_shared<Rtcm_Session>(std::move(socket_), room_)->start();
} }
else else
{ {
std::cout << "Error when invoking a RTCM session. " << ec << std::endl; std::cout << "Error when invoking a RTCM session. " << ec << std::endl;
} }
start_session = true;
do_accept(); do_accept();
}); });
} }
boost::asio::ip::tcp::acceptor acceptor_; boost::asio::ip::tcp::acceptor acceptor_;
boost::asio::ip::tcp::socket socket_;
Rtcm_Listener_Room room_; Rtcm_Listener_Room room_;
bool first_client = true; bool first_client = true;
bool start_session = true;
}; };
boost::asio::io_service io_service; boost::asio::io_service io_context;
std::shared_ptr<concurrent_queue<std::string> > rtcm_message_queue; std::shared_ptr<concurrent_queue<std::string> > rtcm_message_queue;
std::thread t; std::thread t;
std::thread tq; std::thread tq;