1
0
mirror of https://github.com/gnss-sdr/gnss-sdr synced 2024-06-24 22:13:15 +00:00

Redesign of the TCP server

Now the TCP server serves RTCM messages to multiple clients
concurrently, and without loosing messages.
This commit is contained in:
Carles Fernandez 2015-12-22 18:45:07 +01:00
parent 651877af45
commit e1c6137597
3 changed files with 357 additions and 75 deletions

View File

@ -30,6 +30,7 @@
#include "rtcm.h"
#include <algorithm> // for std::reverse
#include <chrono> // std::chrono::seconds
#include <cmath> // for std::fmod
#include <cstdlib> // for strtol
#include <sstream> // for std::stringstream
@ -47,6 +48,7 @@ DEFINE_int32(RTCM_Port, 2101 , "TCP port of the RTCM message server");
// https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xml
DEFINE_string(Remote_RTCM_Server, "localhost", "Remote RTCM server address");
DEFINE_int32(Remote_RTCM_Port, 2101 , "Remote TCP port of the RTCM message server");
Rtcm::Rtcm()
@ -57,7 +59,7 @@ Rtcm::Rtcm()
rtcm_message_queue = std::make_shared< concurrent_queue<std::string> >();
// for each server, do:
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), FLAGS_RTCM_Port);
servers.emplace_back(io_service, endpoint, rtcm_message_queue);
servers.emplace_back(io_service, endpoint);
server_is_running = false;
}
@ -69,7 +71,7 @@ Rtcm::~Rtcm()
// *****************************************************************************************************
//
// TCP Server / Client functions
// TCP Server / Client helper classes
//
// *****************************************************************************************************
void Rtcm::run_server()
@ -77,6 +79,9 @@ void Rtcm::run_server()
std::cout << "Starting a TCP Server on port " << FLAGS_RTCM_Port << std::endl;
try
{
std::thread tq([&]{ std::make_shared<Queue_Reader>(io_service, rtcm_message_queue, FLAGS_RTCM_Port)->do_read_queue(); });
tq.detach();
std::thread t([&]{ io_service.run(); });
server_is_running = true;
t.detach();
@ -97,6 +102,8 @@ void Rtcm::stop_service()
void Rtcm::stop_server()
{
std::cout << "Stopping TCP Server on port " << FLAGS_RTCM_Port << std::endl;
rtcm_message_queue->push("Goodbye"); // this kills tq
std::this_thread::sleep_for(std::chrono::seconds(1));
Rtcm::stop_service();
server_is_running = false;
}
@ -104,9 +111,9 @@ void Rtcm::stop_server()
void Rtcm::run_client()
{
std::cout << "Starting a TCP Client on port " << FLAGS_RTCM_Port << std::endl;
std::cout << "Starting a TCP Client on port " << FLAGS_Remote_RTCM_Port << std::endl;
std::string remote_host = FLAGS_Remote_RTCM_Server;
std::string remote_port = std::to_string(FLAGS_RTCM_Port);
std::string remote_port = std::to_string(FLAGS_Remote_RTCM_Port);
boost::asio::ip::tcp::resolver resolver(io_service);
auto endpoint_iterator = resolver.resolve({ remote_host.c_str(), remote_port.c_str() });
@ -125,7 +132,9 @@ void Rtcm::run_client()
void Rtcm::stop_client()
{
std::cout << "Stopping TCP Client on port " << FLAGS_RTCM_Port << std::endl;
std::cout << "Stopping TCP Client on port " << FLAGS_Remote_RTCM_Port << std::endl;
clients.front().close();
std::this_thread::sleep_for(std::chrono::seconds(1));
Rtcm::stop_service();
}
@ -136,6 +145,12 @@ void Rtcm::send_message(const std::string & msg)
}
bool Rtcm::is_server_running()
{
return server_is_running;
}
// *****************************************************************************************************
//
// TRANSPORT LAYER AS DEFINED AT RTCM STANDARD 10403.2

View File

@ -250,12 +250,12 @@ public:
bool divergence_free,
bool more_messages);
unsigned int lock_time(const Gps_Ephemeris & eph, double obs_time, const Gnss_Synchro & gnss_synchro); //<! Returns the time period in which GPS L1 signals have been continually tracked.
unsigned int lock_time(const Gps_Ephemeris & eph, double obs_time, const Gnss_Synchro & gnss_synchro); //<! Returns the time period in which GPS L1 signals have been continually tracked.
unsigned int lock_time(const Gps_CNAV_Ephemeris & eph, double obs_time, const Gnss_Synchro & gnss_synchro); //<! Returns the time period in which GPS L2 signals have been continually tracked.
unsigned int lock_time(const Galileo_Ephemeris & eph, double obs_time, const Gnss_Synchro & gnss_synchro); //<! Returns the time period in which Galileo signals have been continually tracked.
unsigned int lock_time(const Galileo_Ephemeris & eph, double obs_time, const Gnss_Synchro & gnss_synchro); //<! Returns the time period in which Galileo signals have been continually tracked.
std::string bin_to_hex(const std::string& s); //<! Returns a string of hexadecimal symbols from a string of binary symbols
std::string hex_to_bin(const std::string& s); //<! Returns a string of binary symbols from a string of hexadecimal symbols
std::string bin_to_hex(const std::string& s); //<! Returns a string of hexadecimal symbols from a string of binary symbols
std::string hex_to_bin(const std::string& s); //<! Returns a string of binary symbols from a string of hexadecimal symbols
unsigned long int bin_to_uint(const std::string& s); //<! Returns an unsigned long int from a string of binary symbols
long int bin_to_int(const std::string& s); //<! Returns a long int from a string of binary symbols
@ -268,13 +268,14 @@ public:
bool check_CRC(const std::string & message); //<! Checks that the CRC of a RTCM package is correct
void run_server();
void stop_server();
void run_server(); //<! Starts running the server
void stop_server(); //<! Stops the server
void run_client();
void stop_client();
void run_client(); //<! Starts running the client
void stop_client(); //<! Stops the client
void send_message(const std::string & message);
void send_message(const std::string & message); //<! Sends a message through the server
bool is_server_running(); //<! Returns true if the server is running, false otherwise
private:
//
@ -342,54 +343,147 @@ private:
//
// Classes for TCP communication
//
class Message_buffer
class Rtcm_Message
{
public:
// Construct from a std::string.
explicit Message_buffer(const std::string& data)
: data_(new std::vector<char>(data.begin(), data.end())),
buffer_(boost::asio::buffer(*data_))
enum { header_length = 6 };
enum { max_body_length = 1029 };
Rtcm_Message()
: body_length_(0)
{ }
const char* data() const
{
return data_;
}
// Implement the ConstBufferSequence requirements.
typedef boost::asio::const_buffer value_type;
typedef const boost::asio::const_buffer* const_iterator;
const boost::asio::const_buffer* begin() const { return &buffer_; }
const boost::asio::const_buffer* end() const { return &buffer_ + 1; }
char* data()
{
return data_;
}
std::size_t length() const
{
return header_length + body_length_;
}
const char* body() const
{
return data_ + header_length;
}
char* body()
{
return data_ + header_length;
}
std::size_t body_length() const
{
return body_length_;
}
void body_length(std::size_t new_length)
{
body_length_ = new_length;
if (body_length_ > max_body_length)
body_length_ = max_body_length;
}
bool decode_header()
{
char header[header_length + 1] = "";
std::strncat(header, data_, header_length);
if(header[0] != 'G' || header[1] != 'S')
{
return false;
}
char header2_[header_length - 1] = "";
std::strncat(header2_, data_ + 2 , header_length - 2);
body_length_ = std::atoi(header2_);
if(body_length_ == 0)
{
return false;
}
if (body_length_ > max_body_length)
{
body_length_ = 0;
return false;
}
return true;
}
void encode_header()
{
char header[header_length + 1] = "";
std::sprintf(header, "GS%4d", static_cast<int>(body_length_));
std::memcpy(data_, header, header_length);
}
private:
std::shared_ptr<std::vector<char> > data_;
boost::asio::const_buffer buffer_;
char data_[header_length + max_body_length];
std::size_t body_length_;
};
class Rtcm_listener
class Rtcm_Listener
{
public:
virtual ~Rtcm_listener() {}
virtual void deliver(const Message_buffer & msg) = 0;
virtual ~Rtcm_Listener() {}
virtual void deliver(const Rtcm_Message & msg) = 0;
};
class Rtcm_session
: public Rtcm_listener,
public std::enable_shared_from_this<Rtcm_session>
class Rtcm_Listener_Room
{
public:
Rtcm_session(boost::asio::ip::tcp::socket socket, std::shared_ptr< concurrent_queue<std::string> > & queue) : socket_(std::move(socket)) , queue_(queue) { }
void join(std::shared_ptr<Rtcm_Listener> participant)
{
participants_.insert(participant);
for (auto msg: recent_msgs_)
participant->deliver(msg);
}
void leave(std::shared_ptr<Rtcm_Listener> participant)
{
participants_.erase(participant);
}
void deliver(const Rtcm_Message & msg)
{
recent_msgs_.push_back(msg);
while (recent_msgs_.size() > max_recent_msgs)
recent_msgs_.pop_front();
for (auto participant: participants_)
participant->deliver(msg);
}
private:
std::set<std::shared_ptr<Rtcm_Listener> > participants_;
enum { max_recent_msgs = 1 };
std::deque<Rtcm_Message> recent_msgs_;
};
class Rtcm_Session
: public Rtcm_Listener,
public std::enable_shared_from_this<Rtcm_Session>
{
public:
Rtcm_Session(boost::asio::ip::tcp::socket socket, Rtcm_Listener_Room & room) : socket_(std::move(socket)), room_(room) { }
void start()
{
std::cout << "Starting a session: " << std::endl;
listeners_.insert(shared_from_this());
do_send_messages();
room_.join(shared_from_this());
do_read_message_header();
}
void deliver(const Message_buffer & msg)
void deliver(const Rtcm_Message & msg)
{
bool write_in_progress = !write_msgs_.empty();
std::cout << "Pushing a message " << std::endl;
write_msgs_.push_back(msg);
if (!write_in_progress)
{
@ -398,26 +492,57 @@ private:
}
private:
void do_send_messages()
void do_read_message_header()
{
std::string message;
queue_->wait_and_pop(message);
Message_buffer buffer_out(message);
for (auto listener: listeners_)
{
listener->deliver(buffer_out);
}
do_send_messages();
auto self(shared_from_this());
boost::asio::async_read(socket_,
boost::asio::buffer(read_msg_.data(), Rtcm_Message::header_length),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec && read_msg_.decode_header())
{
do_read_message_body();
}
else
{
std::cout << "Closing connection with client from " << socket_.remote_endpoint().address() << std::endl;
room_.leave(shared_from_this());
}
});
}
void do_read_message_body()
{
auto self(shared_from_this());
boost::asio::async_read(socket_,
boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
room_.deliver(read_msg_);
//std::cout << "Delivered message (session): ";
//std::cout.write(read_msg_.body(), read_msg_.body_length());
//std::cout << std::endl;
do_read_message_header();
}
else
{
std::cout << "Closing connection with client from " << socket_.remote_endpoint().address() << std::endl;
room_.leave(shared_from_this());
}
});
}
void do_write()
{
auto self(shared_from_this());
boost::asio::async_write(socket_, write_msgs_.front(), [this, self](boost::system::error_code ec, std::size_t /*length*/)
{
boost::asio::async_write(socket_,
boost::asio::buffer(write_msgs_.front().body(),
write_msgs_.front().body_length()), [this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if(!ec)
{
std::cout << "RTCM message sent." << std::endl;
write_msgs_.pop_front();
if(!write_msgs_.empty())
{
@ -426,26 +551,151 @@ private:
}
else
{
std::cout << "Erasing listener" << std::endl;
listeners_.erase(shared_from_this());
std::cout << "Closing connection with client from " << socket_.remote_endpoint().address() << std::endl;
room_.leave(shared_from_this());
}
});
}
boost::asio::ip::tcp::socket socket_;
Rtcm_Listener_Room & room_;
Rtcm_Message read_msg_;
std::deque<Rtcm_Message> write_msgs_;
};
class Tcp_Internal_Client
: public std::enable_shared_from_this<Tcp_Internal_Client>
{
public:
Tcp_Internal_Client(boost::asio::io_service& io_service,
boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
: io_service_(io_service), socket_(io_service)
{
do_connect(endpoint_iterator);
}
void close()
{
io_service_.post([this]() { socket_.close(); });
}
void write(const Rtcm_Message & msg)
{
io_service_.post(
[this, msg]()
{
bool write_in_progress = !write_msgs_.empty();
write_msgs_.push_back(msg);
if (!write_in_progress)
{
do_write();
}
});
}
private:
void do_connect(boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
{
boost::asio::async_connect(socket_, endpoint_iterator,
[this](boost::system::error_code ec, boost::asio::ip::tcp::resolver::iterator)
{
if (!ec)
{
do_read_message();
}
else
{
std::cout << "Server is down." << std::endl;
}
});
}
void do_read_message()
{
boost::asio::async_read(socket_,
boost::asio::buffer(read_msg_.data(), 1029),
[this](boost::system::error_code ec, std::size_t length)
{
if (!ec )
{
do_read_message();
}
else
{
std::cout << "Error in client" << std::endl;
socket_.close();
}
});
}
void do_write()
{
boost::asio::async_write(socket_,
boost::asio::buffer(write_msgs_.front().data(), write_msgs_.front().length()),
[this](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
write_msgs_.pop_front();
if (!write_msgs_.empty())
{
do_write();
}
}
else
{
socket_.close();
}
});
}
boost::asio::io_service& io_service_;
boost::asio::ip::tcp::socket socket_;
std::shared_ptr< concurrent_queue<std::string> > & queue_;
std::string read_msg_;
std::deque<Message_buffer> write_msgs_;
std::set< std::shared_ptr<Rtcm_listener> > listeners_;
Rtcm_Message read_msg_;
std::deque<Rtcm_Message> write_msgs_;
};
class Tcp_server
class Queue_Reader
{
public:
Tcp_server(boost::asio::io_service& io_service, const boost::asio::ip::tcp::endpoint& endpoint, std::shared_ptr< concurrent_queue<std::string> > & queue)
: acceptor_(io_service), queue_(queue),
socket_(io_service)
Queue_Reader(boost::asio::io_service& io_service, std::shared_ptr< concurrent_queue<std::string> > & queue, int port) : queue_(queue)
{
boost::asio::ip::tcp::resolver resolver(io_service);
std::string host("localhost");
std::string port_str = std::to_string(port);
auto queue_endpoint_iterator = resolver.resolve({ host.c_str(), port_str.c_str() });
c = std::make_shared<Tcp_Internal_Client>(io_service, queue_endpoint_iterator);
}
void do_read_queue()
{
for(;;)
{
std::string message;
Rtcm_Message msg;
queue_->wait_and_pop(message);
if(message.compare("Goodbye") == 0) break;
const char *char_msg = message.c_str();
msg.body_length(message.length());
std::memcpy(msg.body(), char_msg, msg.body_length());
msg.encode_header();
c->write(msg);
}
}
private:
std::shared_ptr<Tcp_Internal_Client> c;
std::shared_ptr< concurrent_queue<std::string> > & queue_;
};
class Tcp_Server
{
public:
Tcp_Server(boost::asio::io_service& io_service, const boost::asio::ip::tcp::endpoint& endpoint)
: io_service_(io_service), acceptor_(io_service), socket_(io_service)
{
acceptor_.open(endpoint.protocol());
acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
@ -454,40 +704,53 @@ private:
do_accept();
}
private:
void close_server()
{
socket_.close();
acceptor_.close();
}
private:
void do_accept()
{
acceptor_.async_accept(socket_, [this](boost::system::error_code ec)
{
if (!ec)
{
std::cout << "Starting RTCM TCP server session..." << std::endl;
std::thread session([&]{ std::make_shared<Rtcm_session>(std::move(socket_), queue_)->start(); });
session.detach();
std::cout << "Accepting new connections " << std::endl;
if(first_client)
{
std::cout << "The TCP Server is up and running. Accepting connections ..." << std::endl;
first_client = false;
}
else
{
std::cout << "Starting RTCM TCP server session..." << std::endl;
std::cout << "Serving client from " << socket_.remote_endpoint().address() << std::endl;
}
std::make_shared<Rtcm_Session>(std::move(socket_), room_)->start();
}
else
{
std::cout << "Error starting a new session: " << ec << std::endl;
std::cout << "Error when invoking a RTCM session. " << ec << std::endl;
}
do_accept();
});
}
boost::asio::io_service& io_service_;
boost::asio::ip::tcp::acceptor acceptor_;
std::shared_ptr< concurrent_queue<std::string> > & queue_;
boost::asio::ip::tcp::socket socket_;
Rtcm_Listener_Room room_;
bool first_client = true;
};
class Tcp_client
class Tcp_Client
{
public:
Tcp_client(boost::asio::io_service& io_service,
Tcp_Client(boost::asio::io_service& io_service,
boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
: io_service_(io_service),
socket_(io_service)
: io_service_(io_service), socket_(io_service)
{
do_connect(endpoint_iterator);
}
@ -521,7 +784,7 @@ private:
// Waiting for data and reading forever, until connection is closed.
for(;;)
{
std::array<char, 10> buf;
std::array<char, 1029> buf;
boost::system::error_code error;
size_t len = socket_.read_some(boost::asio::buffer(buf), error);
@ -531,7 +794,7 @@ private:
else if(error)
{
std::cout << "Error: " << error << std::endl;
socket_.close();
//socket_.close();
break;
}
std::cout << "Received message: ";
@ -548,11 +811,13 @@ private:
boost::asio::ip::tcp::socket socket_;
};
boost::asio::io_service io_service;
std::shared_ptr< concurrent_queue<std::string> > rtcm_message_queue;
std::thread t;
std::list<Rtcm::Tcp_server> servers;
std::list<Rtcm::Tcp_client> clients;
std::thread tq;
std::list<Rtcm::Tcp_Server> servers;
std::list<Rtcm::Tcp_Client> clients;
bool server_is_running;
void stop_service();

View File

@ -553,6 +553,8 @@ TEST(Rtcm_Test, InstantiateClient)
std::string test3_bin = rtcm->hex_to_bin(test3);
EXPECT_EQ(0, test3_bin.compare("11111111"));
rtcm->stop_client();
std::string test3_bin2 = rtcm->hex_to_bin(test3);
EXPECT_EQ(0, test3_bin2.compare("11111111"));
}