diff --git a/src/core/system_parameters/rtcm.cc b/src/core/system_parameters/rtcm.cc index f9adf58d8..fd10254c7 100644 --- a/src/core/system_parameters/rtcm.cc +++ b/src/core/system_parameters/rtcm.cc @@ -33,6 +33,7 @@ #include // for std::fmod #include // for strtol #include // for std::stringstream +#include #include // for to_upper_copy #include #include @@ -41,7 +42,11 @@ using google::LogMessage; DEFINE_int32(RTCM_Ref_Station_ID, 1234, "Reference Station ID in RTCM messages"); +DEFINE_int32(RTCM_Port, 2101 , "TCP port of the RTCM message server"); +// 2101 is the standard RTCM port according to the Internet Assigned Numbers Authority (IANA) +// https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xml +DEFINE_string(Remote_RTCM_Server, "localhost", "Remote RTCM server address"); Rtcm::Rtcm() @@ -49,9 +54,86 @@ Rtcm::Rtcm() Rtcm::reset_data_fields(); preamble = std::bitset<8>("11010011"); reserved_field = std::bitset<6>("000000"); + rtcm_message_queue = std::make_shared< concurrent_queue >(); + // for each server, do: + boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), FLAGS_RTCM_Port); + servers.emplace_back(io_service, endpoint, rtcm_message_queue); + server_is_running = false; } +Rtcm::~Rtcm() +{} + + + +// ***************************************************************************************************** +// +// TCP Server / Client functions +// +// ***************************************************************************************************** +void Rtcm::run_server() +{ + std::cout << "Starting a TCP Server on port " << FLAGS_RTCM_Port << std::endl; + try + { + std::thread t([&]{ io_service.run(); }); + server_is_running = true; + t.detach(); + } + catch (std::exception& e) + { + std::cerr << "Exception: " << e.what() << "\n"; + } +} + + +void Rtcm::stop_service() +{ + io_service.stop(); +} + + +void Rtcm::stop_server() +{ + std::cout << "Stopping TCP Server on port " << FLAGS_RTCM_Port << std::endl; + Rtcm::stop_service(); + server_is_running = false; +} + + +void Rtcm::run_client() +{ + std::cout << "Starting a TCP Client on port " << FLAGS_RTCM_Port << std::endl; + std::string remote_host = FLAGS_Remote_RTCM_Server; + std::string remote_port = std::to_string(FLAGS_RTCM_Port); + boost::asio::ip::tcp::resolver resolver(io_service); + auto endpoint_iterator = resolver.resolve({ remote_host.c_str(), remote_port.c_str() }); + + clients.emplace_back(io_service, endpoint_iterator); + try + { + std::thread t([&](){ io_service.run(); }); + t.detach(); + } + catch (std::exception& e) + { + std::cerr << "Exception: " << e.what() << "\n"; + } +} + + +void Rtcm::stop_client() +{ + std::cout << "Stopping TCP Client on port " << FLAGS_RTCM_Port << std::endl; + Rtcm::stop_service(); +} + + +void Rtcm::send_message(const std::string & msg) +{ + rtcm_message_queue->push(msg); +} // ***************************************************************************************************** diff --git a/src/core/system_parameters/rtcm.h b/src/core/system_parameters/rtcm.h index 98eeb0122..b87049a5e 100644 --- a/src/core/system_parameters/rtcm.h +++ b/src/core/system_parameters/rtcm.h @@ -34,12 +34,18 @@ #include +#include #include +#include +#include #include +#include #include #include +#include #include #include +#include "concurrent_queue.h" #include "gnss_synchro.h" #include "galileo_fnav_message.h" #include "gps_navigation_message.h" @@ -79,6 +85,7 @@ class Rtcm { public: Rtcm(); //(data.begin(), data.end())), + buffer_(boost::asio::buffer(*data_)) + { + } + + // Implement the ConstBufferSequence requirements. + typedef boost::asio::const_buffer value_type; + typedef const boost::asio::const_buffer* const_iterator; + const boost::asio::const_buffer* begin() const { return &buffer_; } + const boost::asio::const_buffer* end() const { return &buffer_ + 1; } + + private: + std::shared_ptr > data_; + boost::asio::const_buffer buffer_; + }; + + + class Rtcm_listener + { + public: + virtual ~Rtcm_listener() {} + virtual void deliver(const Message_buffer & msg) = 0; + }; + + + class Rtcm_session + : public Rtcm_listener, + public std::enable_shared_from_this + { + public: + Rtcm_session(boost::asio::ip::tcp::socket socket, std::shared_ptr< concurrent_queue > & queue) : socket_(std::move(socket)) , queue_(queue) { } + + void start() + { + std::cout << "Starting a session: " << std::endl; + listeners_.insert(shared_from_this()); + do_send_messages(); + } + + void deliver(const Message_buffer & msg) + { + bool write_in_progress = !write_msgs_.empty(); + std::cout << "Pushing a message " << std::endl; + write_msgs_.push_back(msg); + if (!write_in_progress) + { + do_write(); + } + } + + private: + void do_send_messages() + { + std::string message; + queue_->wait_and_pop(message); + Message_buffer buffer_out(message); + for (auto listener: listeners_) + { + listener->deliver(buffer_out); + } + do_send_messages(); + } + + void do_write() + { + auto self(shared_from_this()); + boost::asio::async_write(socket_, write_msgs_.front(), [this, self](boost::system::error_code ec, std::size_t /*length*/) + { + if(!ec) + { + std::cout << "RTCM message sent." << std::endl; + write_msgs_.pop_front(); + if(!write_msgs_.empty()) + { + do_write(); + } + } + else + { + std::cout << "Erasing listener" << std::endl; + listeners_.erase(shared_from_this()); + } + }); + } + + boost::asio::ip::tcp::socket socket_; + std::shared_ptr< concurrent_queue > & queue_; + std::string read_msg_; + std::deque write_msgs_; + std::set< std::shared_ptr > listeners_; + }; + + + class Tcp_server + { + public: + Tcp_server(boost::asio::io_service& io_service, const boost::asio::ip::tcp::endpoint& endpoint, std::shared_ptr< concurrent_queue > & queue) + : acceptor_(io_service), queue_(queue), + socket_(io_service) + { + acceptor_.open(endpoint.protocol()); + acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); + acceptor_.bind(endpoint); + acceptor_.listen(); + do_accept(); + } + + private: + void do_accept() + { + acceptor_.async_accept(socket_, [this](boost::system::error_code ec) + { + if (!ec) + { + std::cout << "Starting RTCM TCP server session..." << std::endl; + std::thread session([&]{ std::make_shared(std::move(socket_), queue_)->start(); }); + session.detach(); + std::cout << "Accepting new connections " << std::endl; + } + else + { + std::cout << "Error starting a new session: " << ec << std::endl; + } + do_accept(); + }); + } + + boost::asio::ip::tcp::acceptor acceptor_; + boost::asio::ip::tcp::socket socket_; + std::shared_ptr< concurrent_queue > & queue_; + }; + + + class Tcp_client + { + public: + Tcp_client(boost::asio::io_service& io_service, + boost::asio::ip::tcp::resolver::iterator endpoint_iterator) + : io_service_(io_service), + socket_(io_service) + { + do_connect(endpoint_iterator); + } + + void close() + { + io_service_.post([this]() { socket_.close(); }); + } + + private: + void do_connect(boost::asio::ip::tcp::resolver::iterator endpoint_iterator) + { + std::cout << "Connecting to server..." << std::endl; + boost::asio::async_connect(socket_, endpoint_iterator, + [this](boost::system::error_code ec, boost::asio::ip::tcp::resolver::iterator) + { + if (!ec) + { + std::cout << "Connected." << std::endl; + do_read_message(); + } + else + { + std::cout << "Server is down." << std::endl; + } + }); + } + + void do_read_message() + { + // Waiting for data and reading forever, until connection is closed. + for(;;) + { + std::array buf; + boost::system::error_code error; + + size_t len = socket_.read_some(boost::asio::buffer(buf), error); + + if(error == boost::asio::error::eof) + break; // // Connection closed cleanly by peer. + else if(error) + { + std::cout << "Error: " << error << std::endl; + socket_.close(); + break; + } + std::cout << "Received message: "; + std::cout.write(buf.data(), len); + std::cout << std::endl; + } + + std::cout << std::endl; + socket_.close(); + std::cout << "Connection closed by the server. Good bye." << std::endl; + } + + boost::asio::io_service& io_service_; + boost::asio::ip::tcp::socket socket_; + }; + + boost::asio::io_service io_service; + std::shared_ptr< concurrent_queue > rtcm_message_queue; + std::thread t; + std::list servers; + std::list clients; + bool server_is_running; + void stop_service(); + // // Transport Layer // diff --git a/src/tests/formats/rtcm_test.cc b/src/tests/formats/rtcm_test.cc index 5dee39d04..9790590ca 100644 --- a/src/tests/formats/rtcm_test.cc +++ b/src/tests/formats/rtcm_test.cc @@ -30,6 +30,7 @@ */ #include +#include #include "rtcm.h" TEST(Rtcm_Test, Hex_to_bin) @@ -519,3 +520,39 @@ TEST(Rtcm_Test, MSM1) EXPECT_EQ(psrng4_s, read_psrng4_s_2); } + +TEST(Rtcm_Test, InstantiateServer) +{ + auto rtcm = std::make_shared(); + rtcm->run_server(); + std::string msg("Hello"); + rtcm->send_message(msg); + std::string test3 = "ff"; + std::string test3_bin = rtcm->hex_to_bin(test3); + EXPECT_EQ(0, test3_bin.compare("11111111")); + rtcm->stop_server(); + std::string test6 = "0011"; + std::string test6_hex = rtcm->bin_to_hex(test6); + EXPECT_EQ(0, test6_hex.compare("3")); + long unsigned int expected1 = 42; + EXPECT_EQ(expected1, rtcm->bin_to_uint("00101010")); + rtcm->run_server(); + std::string test4_bin = rtcm->hex_to_bin(test3); + std::string s("Testing"); + rtcm->send_message(s); + rtcm->stop_server(); + EXPECT_EQ(0, test4_bin.compare("11111111")); +} + + +TEST(Rtcm_Test, InstantiateClient) +{ + auto rtcm = std::make_shared(); + rtcm->run_client(); + std::string test3 = "ff"; + std::string test3_bin = rtcm->hex_to_bin(test3); + EXPECT_EQ(0, test3_bin.compare("11111111")); + rtcm->stop_client(); +} + +