diff --git a/docs/doxygen/other/reference_docs.dox b/docs/doxygen/other/reference_docs.dox index 78acda714..3759c876a 100644 --- a/docs/doxygen/other/reference_docs.dox +++ b/docs/doxygen/other/reference_docs.dox @@ -40,7 +40,7 @@ Check the Galileo website of the European Space Agency. There is a website with Galileo constellation status information from the International GNSS Service. -\li Galileo E5, E6, and E1: European GNSS (Galileo) Open Service. Signal In Space Interface Control Document. Ref: OS SIS ICD, Issue 1.1, European Commission, Sept. 2010. +\li Galileo E5, E6, and E1: European GNSS (Galileo) Open Service. Signal In Space Interface Control Document. Ref: OS SIS ICD, Issue 1.2, European Commission, Nov. 2015. \li European GNSS (Galileo) Open Service Signal-In-Space Operational Status Definition, European Commission, Sept. 2015. diff --git a/src/core/system_parameters/rtcm.cc b/src/core/system_parameters/rtcm.cc index f9adf58d8..0aaf8a1ab 100644 --- a/src/core/system_parameters/rtcm.cc +++ b/src/core/system_parameters/rtcm.cc @@ -30,9 +30,11 @@ #include "rtcm.h" #include // for std::reverse +#include // std::chrono::seconds #include // for std::fmod #include // for strtol #include // for std::stringstream +#include #include // for to_upper_copy #include #include @@ -41,7 +43,12 @@ using google::LogMessage; DEFINE_int32(RTCM_Ref_Station_ID, 1234, "Reference Station ID in RTCM messages"); +DEFINE_int32(RTCM_Port, 2101 , "TCP port of the RTCM message server"); +// 2101 is the standard RTCM port according to the Internet Assigned Numbers Authority (IANA) +// https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xml +DEFINE_string(Remote_RTCM_Server, "localhost", "Remote RTCM server address"); +DEFINE_int32(Remote_RTCM_Port, 2101 , "Remote TCP port of the RTCM message server"); Rtcm::Rtcm() @@ -49,9 +56,100 @@ Rtcm::Rtcm() Rtcm::reset_data_fields(); preamble = std::bitset<8>("11010011"); reserved_field = std::bitset<6>("000000"); + rtcm_message_queue = std::make_shared< concurrent_queue >(); + // for each server, do: + boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), FLAGS_RTCM_Port); + servers.emplace_back(io_service, endpoint); + server_is_running = false; } +Rtcm::~Rtcm() +{} + + + +// ***************************************************************************************************** +// +// TCP Server / Client helper classes +// +// ***************************************************************************************************** +void Rtcm::run_server() +{ + std::cout << "Starting a TCP Server on port " << FLAGS_RTCM_Port << std::endl; + try + { + std::thread tq([&]{ std::make_shared(io_service, rtcm_message_queue, FLAGS_RTCM_Port)->do_read_queue(); }); + tq.detach(); + + std::thread t([&]{ io_service.run(); }); + server_is_running = true; + t.detach(); + } + catch (std::exception& e) + { + std::cerr << "Exception: " << e.what() << "\n"; + } +} + + +void Rtcm::stop_service() +{ + io_service.stop(); +} + + +void Rtcm::stop_server() +{ + std::cout << "Stopping TCP Server on port " << FLAGS_RTCM_Port << std::endl; + rtcm_message_queue->push("Goodbye"); // this terminates tq + Rtcm::stop_service(); + servers.front().close_server(); + std::this_thread::sleep_for(std::chrono::seconds(1)); + server_is_running = false; +} + + +void Rtcm::run_client() +{ + std::cout << "Starting a TCP Client on port " << FLAGS_Remote_RTCM_Port << std::endl; + std::string remote_host = FLAGS_Remote_RTCM_Server; + std::string remote_port = std::to_string(FLAGS_Remote_RTCM_Port); + boost::asio::ip::tcp::resolver resolver(io_service); + auto endpoint_iterator = resolver.resolve({ remote_host.c_str(), remote_port.c_str() }); + + clients.emplace_back(io_service, endpoint_iterator); + try + { + std::thread t([&](){ io_service.run(); }); + t.detach(); + } + catch (std::exception& e) + { + std::cerr << "Exception: " << e.what() << "\n"; + } +} + + +void Rtcm::stop_client() +{ + std::cout << "Stopping TCP Client on port " << FLAGS_Remote_RTCM_Port << std::endl; + clients.front().close(); + std::this_thread::sleep_for(std::chrono::seconds(1)); + Rtcm::stop_service(); +} + + +void Rtcm::send_message(const std::string & msg) +{ + rtcm_message_queue->push(msg); +} + + +bool Rtcm::is_server_running() +{ + return server_is_running; +} // ***************************************************************************************************** diff --git a/src/core/system_parameters/rtcm.h b/src/core/system_parameters/rtcm.h index 98eeb0122..203a9c287 100644 --- a/src/core/system_parameters/rtcm.h +++ b/src/core/system_parameters/rtcm.h @@ -34,12 +34,18 @@ #include +#include #include +#include +#include #include +#include #include #include +#include #include #include +#include "concurrent_queue.h" #include "gnss_synchro.h" #include "galileo_fnav_message.h" #include "gps_navigation_message.h" @@ -79,6 +85,7 @@ class Rtcm { public: Rtcm(); // max_body_length) + body_length_ = max_body_length; + } + + bool decode_header() + { + char header[header_length + 1] = ""; + std::strncat(header, data_, header_length); + if(header[0] != 'G' || header[1] != 'S') + { + return false; + } + + char header2_[header_length - 1] = ""; + std::strncat(header2_, data_ + 2 , header_length - 2); + body_length_ = std::atoi(header2_); + if(body_length_ == 0) + { + return false; + } + + if (body_length_ > max_body_length) + { + body_length_ = 0; + return false; + } + return true; + } + + void encode_header() + { + char header[header_length + 1] = ""; + std::sprintf(header, "GS%4d", static_cast(body_length_)); + std::memcpy(data_, header, header_length); + } + + private: + char data_[header_length + max_body_length]; + std::size_t body_length_; + }; + + + class Rtcm_Listener + { + public: + virtual ~Rtcm_Listener() {} + virtual void deliver(const Rtcm_Message & msg) = 0; + }; + + + class Rtcm_Listener_Room + { + public: + void join(std::shared_ptr participant) + { + participants_.insert(participant); + for (auto msg: recent_msgs_) + participant->deliver(msg); + } + + void leave(std::shared_ptr participant) + { + participants_.erase(participant); + } + + void deliver(const Rtcm_Message & msg) + { + recent_msgs_.push_back(msg); + while (recent_msgs_.size() > max_recent_msgs) + recent_msgs_.pop_front(); + + for (auto participant: participants_) + participant->deliver(msg); + } + + private: + std::set > participants_; + enum { max_recent_msgs = 1 }; + std::deque recent_msgs_; + }; + + + class Rtcm_Session + : public Rtcm_Listener, + public std::enable_shared_from_this + { + public: + Rtcm_Session(boost::asio::ip::tcp::socket socket, Rtcm_Listener_Room & room) : socket_(std::move(socket)), room_(room) { } + + void start() + { + room_.join(shared_from_this()); + do_read_message_header(); + } + + void deliver(const Rtcm_Message & msg) + { + bool write_in_progress = !write_msgs_.empty(); + write_msgs_.push_back(msg); + if (!write_in_progress) + { + do_write(); + } + } + + private: + void do_read_message_header() + { + auto self(shared_from_this()); + boost::asio::async_read(socket_, + boost::asio::buffer(read_msg_.data(), Rtcm_Message::header_length), + [this, self](boost::system::error_code ec, std::size_t /*length*/) + { + if (!ec && read_msg_.decode_header()) + { + do_read_message_body(); + } + else + { + std::cout << "Closing connection with client from " << socket_.remote_endpoint().address() << std::endl; + room_.leave(shared_from_this()); + } + }); + } + + void do_read_message_body() + { + auto self(shared_from_this()); + boost::asio::async_read(socket_, + boost::asio::buffer(read_msg_.body(), read_msg_.body_length()), + [this, self](boost::system::error_code ec, std::size_t /*length*/) + { + if (!ec) + { + room_.deliver(read_msg_); + //std::cout << "Delivered message (session): "; + //std::cout.write(read_msg_.body(), read_msg_.body_length()); + //std::cout << std::endl; + do_read_message_header(); + } + else + { + std::cout << "Closing connection with client from " << socket_.remote_endpoint().address() << std::endl; + room_.leave(shared_from_this()); + } + }); + } + + void do_write() + { + auto self(shared_from_this()); + boost::asio::async_write(socket_, + boost::asio::buffer(write_msgs_.front().body(), + write_msgs_.front().body_length()), [this, self](boost::system::error_code ec, std::size_t /*length*/) + { + if(!ec) + { + write_msgs_.pop_front(); + if(!write_msgs_.empty()) + { + do_write(); + } + } + else + { + std::cout << "Closing connection with client from " << socket_.remote_endpoint().address() << std::endl; + room_.leave(shared_from_this()); + } + }); + } + + boost::asio::ip::tcp::socket socket_; + Rtcm_Listener_Room & room_; + Rtcm_Message read_msg_; + std::deque write_msgs_; + }; + + + class Tcp_Internal_Client + : public std::enable_shared_from_this + { + public: + Tcp_Internal_Client(boost::asio::io_service& io_service, + boost::asio::ip::tcp::resolver::iterator endpoint_iterator) + : io_service_(io_service), socket_(io_service) + { + do_connect(endpoint_iterator); + } + + void close() + { + io_service_.post([this]() { socket_.close(); }); + } + + void write(const Rtcm_Message & msg) + { + io_service_.post( + [this, msg]() + { + bool write_in_progress = !write_msgs_.empty(); + write_msgs_.push_back(msg); + if (!write_in_progress) + { + do_write(); + } + }); + } + + private: + void do_connect(boost::asio::ip::tcp::resolver::iterator endpoint_iterator) + { + boost::asio::async_connect(socket_, endpoint_iterator, + [this](boost::system::error_code ec, boost::asio::ip::tcp::resolver::iterator) + { + if (!ec) + { + do_read_message(); + } + else + { + std::cout << "Server is down." << std::endl; + } + }); + } + + void do_read_message() + { + boost::asio::async_read(socket_, + boost::asio::buffer(read_msg_.data(), 1029), + [this](boost::system::error_code ec, std::size_t /*length*/) + { + if (!ec ) + { + do_read_message(); + } + else + { + std::cout << "Error in client" << std::endl; + socket_.close(); + } + }); + } + + void do_write() + { + + boost::asio::async_write(socket_, + boost::asio::buffer(write_msgs_.front().data(), write_msgs_.front().length()), + [this](boost::system::error_code ec, std::size_t /*length*/) + { + if (!ec) + { + write_msgs_.pop_front(); + if (!write_msgs_.empty()) + { + do_write(); + } + } + else + { + socket_.close(); + } + }); + } + + boost::asio::io_service& io_service_; + boost::asio::ip::tcp::socket socket_; + Rtcm_Message read_msg_; + std::deque write_msgs_; + }; + + + class Queue_Reader + { + public: + Queue_Reader(boost::asio::io_service& io_service, std::shared_ptr< concurrent_queue > & queue, int port) : queue_(queue) + { + boost::asio::ip::tcp::resolver resolver(io_service); + std::string host("localhost"); + std::string port_str = std::to_string(port); + auto queue_endpoint_iterator = resolver.resolve({ host.c_str(), port_str.c_str() }); + c = std::make_shared(io_service, queue_endpoint_iterator); + } + + void do_read_queue() + { + for(;;) + { + std::string message; + Rtcm_Message msg; + queue_->wait_and_pop(message); + if(message.compare("Goodbye") == 0) break; + const char *char_msg = message.c_str(); + msg.body_length(message.length()); + std::memcpy(msg.body(), char_msg, msg.body_length()); + msg.encode_header(); + c->write(msg); + } + } + private: + std::shared_ptr c; + std::shared_ptr< concurrent_queue > & queue_; + }; + + + class Tcp_Server + { + public: + Tcp_Server(boost::asio::io_service& io_service, const boost::asio::ip::tcp::endpoint& endpoint) + : io_service_(io_service), acceptor_(io_service), socket_(io_service) + { + acceptor_.open(endpoint.protocol()); + acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); + acceptor_.bind(endpoint); + acceptor_.listen(); + do_accept(); + } + + void close_server() + { + socket_.close(); + acceptor_.close(); + } + + private: + void do_accept() + { + acceptor_.async_accept(socket_, [this](boost::system::error_code ec) + { + if (!ec) + { + if(first_client) + { + std::cout << "The TCP Server is up and running. Accepting connections ..." << std::endl; + first_client = false; + } + else + { + std::cout << "Starting RTCM TCP server session..." << std::endl; + std::cout << "Serving client from " << socket_.remote_endpoint().address() << std::endl; + } + std::make_shared(std::move(socket_), room_)->start(); + } + else + { + std::cout << "Error when invoking a RTCM session. " << ec << std::endl; + } + do_accept(); + }); + } + + boost::asio::io_service& io_service_; + boost::asio::ip::tcp::acceptor acceptor_; + boost::asio::ip::tcp::socket socket_; + Rtcm_Listener_Room room_; + bool first_client = true; + }; + + + class Tcp_Client + { + public: + Tcp_Client(boost::asio::io_service& io_service, + boost::asio::ip::tcp::resolver::iterator endpoint_iterator) + : io_service_(io_service), socket_(io_service) + { + do_connect(endpoint_iterator); + } + + void close() + { + io_service_.post([this]() { socket_.close(); }); + } + + private: + void do_connect(boost::asio::ip::tcp::resolver::iterator endpoint_iterator) + { + std::cout << "Connecting to server..." << std::endl; + boost::asio::async_connect(socket_, endpoint_iterator, + [this](boost::system::error_code ec, boost::asio::ip::tcp::resolver::iterator) + { + if (!ec) + { + std::cout << "Connected." << std::endl; + do_read_message(); + } + else + { + std::cout << "Server is down." << std::endl; + } + }); + } + + void do_read_message() + { + // Waiting for data and reading forever, until connection is closed. + for(;;) + { + std::array buf; + boost::system::error_code error; + + size_t len = socket_.read_some(boost::asio::buffer(buf), error); + + if(error == boost::asio::error::eof) + break; // // Connection closed cleanly by peer. + else if(error) + { + std::cout << "Error: " << error << std::endl; + //socket_.close(); + break; + } + std::cout << "Received message: "; + std::cout.write(buf.data(), len); + std::cout << std::endl; + } + + std::cout << std::endl; + socket_.close(); + std::cout << "Connection closed by the server. Good bye." << std::endl; + } + + boost::asio::io_service& io_service_; + boost::asio::ip::tcp::socket socket_; + }; + + + boost::asio::io_service io_service; + std::shared_ptr< concurrent_queue > rtcm_message_queue; + std::thread t; + std::thread tq; + std::list servers; + std::list clients; + bool server_is_running; + void stop_service(); + // // Transport Layer // diff --git a/src/tests/formats/rtcm_test.cc b/src/tests/formats/rtcm_test.cc index 5dee39d04..91716f363 100644 --- a/src/tests/formats/rtcm_test.cc +++ b/src/tests/formats/rtcm_test.cc @@ -30,6 +30,7 @@ */ #include +#include #include "rtcm.h" TEST(Rtcm_Test, Hex_to_bin) @@ -519,3 +520,41 @@ TEST(Rtcm_Test, MSM1) EXPECT_EQ(psrng4_s, read_psrng4_s_2); } + +TEST(Rtcm_Test, InstantiateServer) +{ + auto rtcm = std::make_shared(); + rtcm->run_server(); + std::string msg("Hello"); + rtcm->send_message(msg); + std::string test3 = "ff"; + std::string test3_bin = rtcm->hex_to_bin(test3); + EXPECT_EQ(0, test3_bin.compare("11111111")); + rtcm->stop_server(); + std::string test6 = "0011"; + std::string test6_hex = rtcm->bin_to_hex(test6); + EXPECT_EQ(0, test6_hex.compare("3")); + long unsigned int expected1 = 42; + EXPECT_EQ(expected1, rtcm->bin_to_uint("00101010")); + rtcm->run_server(); + std::string test4_bin = rtcm->hex_to_bin(test3); + std::string s("Testing"); + rtcm->send_message(s); + rtcm->stop_server(); + EXPECT_EQ(0, test4_bin.compare("11111111")); +} + +/* +TEST(Rtcm_Test, InstantiateClient) +{ + auto rtcm = std::make_shared(); + rtcm->run_client(); + std::string test3 = "ff"; + std::string test3_bin = rtcm->hex_to_bin(test3); + EXPECT_EQ(0, test3_bin.compare("11111111")); + rtcm->stop_client(); + std::string test3_bin2 = rtcm->hex_to_bin(test3); + EXPECT_EQ(0, test3_bin2.compare("11111111")); +} */ + +