mirror of
https://github.com/gnss-sdr/gnss-sdr
synced 2025-07-03 18:42:56 +00:00
adding a TPC client and server
This commit is contained in:
parent
49523f7f9d
commit
cf75c669c8
@ -33,6 +33,7 @@
|
|||||||
#include <cmath> // for std::fmod
|
#include <cmath> // for std::fmod
|
||||||
#include <cstdlib> // for strtol
|
#include <cstdlib> // for strtol
|
||||||
#include <sstream> // for std::stringstream
|
#include <sstream> // for std::stringstream
|
||||||
|
#include <thread>
|
||||||
#include <boost/algorithm/string.hpp> // for to_upper_copy
|
#include <boost/algorithm/string.hpp> // for to_upper_copy
|
||||||
#include <boost/dynamic_bitset.hpp>
|
#include <boost/dynamic_bitset.hpp>
|
||||||
#include <gflags/gflags.h>
|
#include <gflags/gflags.h>
|
||||||
@ -41,7 +42,11 @@
|
|||||||
using google::LogMessage;
|
using google::LogMessage;
|
||||||
|
|
||||||
DEFINE_int32(RTCM_Ref_Station_ID, 1234, "Reference Station ID in RTCM messages");
|
DEFINE_int32(RTCM_Ref_Station_ID, 1234, "Reference Station ID in RTCM messages");
|
||||||
|
DEFINE_int32(RTCM_Port, 2101 , "TCP port of the RTCM message server");
|
||||||
|
// 2101 is the standard RTCM port according to the Internet Assigned Numbers Authority (IANA)
|
||||||
|
// https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xml
|
||||||
|
|
||||||
|
DEFINE_string(Remote_RTCM_Server, "localhost", "Remote RTCM server address");
|
||||||
|
|
||||||
|
|
||||||
Rtcm::Rtcm()
|
Rtcm::Rtcm()
|
||||||
@ -49,9 +54,86 @@ Rtcm::Rtcm()
|
|||||||
Rtcm::reset_data_fields();
|
Rtcm::reset_data_fields();
|
||||||
preamble = std::bitset<8>("11010011");
|
preamble = std::bitset<8>("11010011");
|
||||||
reserved_field = std::bitset<6>("000000");
|
reserved_field = std::bitset<6>("000000");
|
||||||
|
rtcm_message_queue = std::make_shared< concurrent_queue<std::string> >();
|
||||||
|
// for each server, do:
|
||||||
|
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), FLAGS_RTCM_Port);
|
||||||
|
servers.emplace_back(io_service, endpoint, rtcm_message_queue);
|
||||||
|
server_is_running = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Rtcm::~Rtcm()
|
||||||
|
{}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// *****************************************************************************************************
|
||||||
|
//
|
||||||
|
// TCP Server / Client functions
|
||||||
|
//
|
||||||
|
// *****************************************************************************************************
|
||||||
|
void Rtcm::run_server()
|
||||||
|
{
|
||||||
|
std::cout << "Starting a TCP Server on port " << FLAGS_RTCM_Port << std::endl;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
std::thread t([&]{ io_service.run(); });
|
||||||
|
server_is_running = true;
|
||||||
|
t.detach();
|
||||||
|
}
|
||||||
|
catch (std::exception& e)
|
||||||
|
{
|
||||||
|
std::cerr << "Exception: " << e.what() << "\n";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Rtcm::stop_service()
|
||||||
|
{
|
||||||
|
io_service.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Rtcm::stop_server()
|
||||||
|
{
|
||||||
|
std::cout << "Stopping TCP Server on port " << FLAGS_RTCM_Port << std::endl;
|
||||||
|
Rtcm::stop_service();
|
||||||
|
server_is_running = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Rtcm::run_client()
|
||||||
|
{
|
||||||
|
std::cout << "Starting a TCP Client on port " << FLAGS_RTCM_Port << std::endl;
|
||||||
|
std::string remote_host = FLAGS_Remote_RTCM_Server;
|
||||||
|
std::string remote_port = std::to_string(FLAGS_RTCM_Port);
|
||||||
|
boost::asio::ip::tcp::resolver resolver(io_service);
|
||||||
|
auto endpoint_iterator = resolver.resolve({ remote_host.c_str(), remote_port.c_str() });
|
||||||
|
|
||||||
|
clients.emplace_back(io_service, endpoint_iterator);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
std::thread t([&](){ io_service.run(); });
|
||||||
|
t.detach();
|
||||||
|
}
|
||||||
|
catch (std::exception& e)
|
||||||
|
{
|
||||||
|
std::cerr << "Exception: " << e.what() << "\n";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Rtcm::stop_client()
|
||||||
|
{
|
||||||
|
std::cout << "Stopping TCP Client on port " << FLAGS_RTCM_Port << std::endl;
|
||||||
|
Rtcm::stop_service();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Rtcm::send_message(const std::string & msg)
|
||||||
|
{
|
||||||
|
rtcm_message_queue->push(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// *****************************************************************************************************
|
// *****************************************************************************************************
|
||||||
|
@ -34,12 +34,18 @@
|
|||||||
|
|
||||||
|
|
||||||
#include <bitset>
|
#include <bitset>
|
||||||
|
#include <deque>
|
||||||
#include <map>
|
#include <map>
|
||||||
|
#include <memory>
|
||||||
|
#include <set>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
#include <boost/asio.hpp>
|
||||||
#include <boost/crc.hpp>
|
#include <boost/crc.hpp>
|
||||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||||
|
#include "concurrent_queue.h"
|
||||||
#include "gnss_synchro.h"
|
#include "gnss_synchro.h"
|
||||||
#include "galileo_fnav_message.h"
|
#include "galileo_fnav_message.h"
|
||||||
#include "gps_navigation_message.h"
|
#include "gps_navigation_message.h"
|
||||||
@ -79,6 +85,7 @@ class Rtcm
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
Rtcm(); //<! Default constructor
|
Rtcm(); //<! Default constructor
|
||||||
|
~Rtcm();
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Prints message type 1001 (L1-Only GPS RTK Observables)
|
* \brief Prints message type 1001 (L1-Only GPS RTK Observables)
|
||||||
@ -261,6 +268,14 @@ public:
|
|||||||
|
|
||||||
bool check_CRC(const std::string & message); //<! Checks that the CRC of a RTCM package is correct
|
bool check_CRC(const std::string & message); //<! Checks that the CRC of a RTCM package is correct
|
||||||
|
|
||||||
|
void run_server();
|
||||||
|
void stop_server();
|
||||||
|
|
||||||
|
void run_client();
|
||||||
|
void stop_client();
|
||||||
|
|
||||||
|
void send_message(const std::string & message);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
//
|
//
|
||||||
// Generation of messages content
|
// Generation of messages content
|
||||||
@ -324,6 +339,222 @@ private:
|
|||||||
unsigned int msm_lock_time_indicator(unsigned int lock_time_period_s);
|
unsigned int msm_lock_time_indicator(unsigned int lock_time_period_s);
|
||||||
unsigned int msm_extended_lock_time_indicator(unsigned int lock_time_period_s);
|
unsigned int msm_extended_lock_time_indicator(unsigned int lock_time_period_s);
|
||||||
|
|
||||||
|
//
|
||||||
|
// Classes for TCP communication
|
||||||
|
//
|
||||||
|
class Message_buffer
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
// Construct from a std::string.
|
||||||
|
explicit Message_buffer(const std::string& data)
|
||||||
|
: data_(new std::vector<char>(data.begin(), data.end())),
|
||||||
|
buffer_(boost::asio::buffer(*data_))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implement the ConstBufferSequence requirements.
|
||||||
|
typedef boost::asio::const_buffer value_type;
|
||||||
|
typedef const boost::asio::const_buffer* const_iterator;
|
||||||
|
const boost::asio::const_buffer* begin() const { return &buffer_; }
|
||||||
|
const boost::asio::const_buffer* end() const { return &buffer_ + 1; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::shared_ptr<std::vector<char> > data_;
|
||||||
|
boost::asio::const_buffer buffer_;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
class Rtcm_listener
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
virtual ~Rtcm_listener() {}
|
||||||
|
virtual void deliver(const Message_buffer & msg) = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
class Rtcm_session
|
||||||
|
: public Rtcm_listener,
|
||||||
|
public std::enable_shared_from_this<Rtcm_session>
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
Rtcm_session(boost::asio::ip::tcp::socket socket, std::shared_ptr< concurrent_queue<std::string> > & queue) : socket_(std::move(socket)) , queue_(queue) { }
|
||||||
|
|
||||||
|
void start()
|
||||||
|
{
|
||||||
|
std::cout << "Starting a session: " << std::endl;
|
||||||
|
listeners_.insert(shared_from_this());
|
||||||
|
do_send_messages();
|
||||||
|
}
|
||||||
|
|
||||||
|
void deliver(const Message_buffer & msg)
|
||||||
|
{
|
||||||
|
bool write_in_progress = !write_msgs_.empty();
|
||||||
|
std::cout << "Pushing a message " << std::endl;
|
||||||
|
write_msgs_.push_back(msg);
|
||||||
|
if (!write_in_progress)
|
||||||
|
{
|
||||||
|
do_write();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
void do_send_messages()
|
||||||
|
{
|
||||||
|
std::string message;
|
||||||
|
queue_->wait_and_pop(message);
|
||||||
|
Message_buffer buffer_out(message);
|
||||||
|
for (auto listener: listeners_)
|
||||||
|
{
|
||||||
|
listener->deliver(buffer_out);
|
||||||
|
}
|
||||||
|
do_send_messages();
|
||||||
|
}
|
||||||
|
|
||||||
|
void do_write()
|
||||||
|
{
|
||||||
|
auto self(shared_from_this());
|
||||||
|
boost::asio::async_write(socket_, write_msgs_.front(), [this, self](boost::system::error_code ec, std::size_t /*length*/)
|
||||||
|
{
|
||||||
|
if(!ec)
|
||||||
|
{
|
||||||
|
std::cout << "RTCM message sent." << std::endl;
|
||||||
|
write_msgs_.pop_front();
|
||||||
|
if(!write_msgs_.empty())
|
||||||
|
{
|
||||||
|
do_write();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
std::cout << "Erasing listener" << std::endl;
|
||||||
|
listeners_.erase(shared_from_this());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
boost::asio::ip::tcp::socket socket_;
|
||||||
|
std::shared_ptr< concurrent_queue<std::string> > & queue_;
|
||||||
|
std::string read_msg_;
|
||||||
|
std::deque<Message_buffer> write_msgs_;
|
||||||
|
std::set< std::shared_ptr<Rtcm_listener> > listeners_;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
class Tcp_server
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
Tcp_server(boost::asio::io_service& io_service, const boost::asio::ip::tcp::endpoint& endpoint, std::shared_ptr< concurrent_queue<std::string> > & queue)
|
||||||
|
: acceptor_(io_service), queue_(queue),
|
||||||
|
socket_(io_service)
|
||||||
|
{
|
||||||
|
acceptor_.open(endpoint.protocol());
|
||||||
|
acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
|
||||||
|
acceptor_.bind(endpoint);
|
||||||
|
acceptor_.listen();
|
||||||
|
do_accept();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
void do_accept()
|
||||||
|
{
|
||||||
|
acceptor_.async_accept(socket_, [this](boost::system::error_code ec)
|
||||||
|
{
|
||||||
|
if (!ec)
|
||||||
|
{
|
||||||
|
std::cout << "Starting RTCM TCP server session..." << std::endl;
|
||||||
|
std::thread session([&]{ std::make_shared<Rtcm_session>(std::move(socket_), queue_)->start(); });
|
||||||
|
session.detach();
|
||||||
|
std::cout << "Accepting new connections " << std::endl;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
std::cout << "Error starting a new session: " << ec << std::endl;
|
||||||
|
}
|
||||||
|
do_accept();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
boost::asio::ip::tcp::acceptor acceptor_;
|
||||||
|
boost::asio::ip::tcp::socket socket_;
|
||||||
|
std::shared_ptr< concurrent_queue<std::string> > & queue_;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
class Tcp_client
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
Tcp_client(boost::asio::io_service& io_service,
|
||||||
|
boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
|
||||||
|
: io_service_(io_service),
|
||||||
|
socket_(io_service)
|
||||||
|
{
|
||||||
|
do_connect(endpoint_iterator);
|
||||||
|
}
|
||||||
|
|
||||||
|
void close()
|
||||||
|
{
|
||||||
|
io_service_.post([this]() { socket_.close(); });
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
void do_connect(boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
|
||||||
|
{
|
||||||
|
std::cout << "Connecting to server..." << std::endl;
|
||||||
|
boost::asio::async_connect(socket_, endpoint_iterator,
|
||||||
|
[this](boost::system::error_code ec, boost::asio::ip::tcp::resolver::iterator)
|
||||||
|
{
|
||||||
|
if (!ec)
|
||||||
|
{
|
||||||
|
std::cout << "Connected." << std::endl;
|
||||||
|
do_read_message();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
std::cout << "Server is down." << std::endl;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void do_read_message()
|
||||||
|
{
|
||||||
|
// Waiting for data and reading forever, until connection is closed.
|
||||||
|
for(;;)
|
||||||
|
{
|
||||||
|
std::array<char, 10> buf;
|
||||||
|
boost::system::error_code error;
|
||||||
|
|
||||||
|
size_t len = socket_.read_some(boost::asio::buffer(buf), error);
|
||||||
|
|
||||||
|
if(error == boost::asio::error::eof)
|
||||||
|
break; // // Connection closed cleanly by peer.
|
||||||
|
else if(error)
|
||||||
|
{
|
||||||
|
std::cout << "Error: " << error << std::endl;
|
||||||
|
socket_.close();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
std::cout << "Received message: ";
|
||||||
|
std::cout.write(buf.data(), len);
|
||||||
|
std::cout << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cout << std::endl;
|
||||||
|
socket_.close();
|
||||||
|
std::cout << "Connection closed by the server. Good bye." << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
boost::asio::io_service& io_service_;
|
||||||
|
boost::asio::ip::tcp::socket socket_;
|
||||||
|
};
|
||||||
|
|
||||||
|
boost::asio::io_service io_service;
|
||||||
|
std::shared_ptr< concurrent_queue<std::string> > rtcm_message_queue;
|
||||||
|
std::thread t;
|
||||||
|
std::list<Rtcm::Tcp_server> servers;
|
||||||
|
std::list<Rtcm::Tcp_client> clients;
|
||||||
|
bool server_is_running;
|
||||||
|
void stop_service();
|
||||||
|
|
||||||
//
|
//
|
||||||
// Transport Layer
|
// Transport Layer
|
||||||
//
|
//
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <thread>
|
||||||
#include "rtcm.h"
|
#include "rtcm.h"
|
||||||
|
|
||||||
TEST(Rtcm_Test, Hex_to_bin)
|
TEST(Rtcm_Test, Hex_to_bin)
|
||||||
@ -519,3 +520,39 @@ TEST(Rtcm_Test, MSM1)
|
|||||||
EXPECT_EQ(psrng4_s, read_psrng4_s_2);
|
EXPECT_EQ(psrng4_s, read_psrng4_s_2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
TEST(Rtcm_Test, InstantiateServer)
|
||||||
|
{
|
||||||
|
auto rtcm = std::make_shared<Rtcm>();
|
||||||
|
rtcm->run_server();
|
||||||
|
std::string msg("Hello");
|
||||||
|
rtcm->send_message(msg);
|
||||||
|
std::string test3 = "ff";
|
||||||
|
std::string test3_bin = rtcm->hex_to_bin(test3);
|
||||||
|
EXPECT_EQ(0, test3_bin.compare("11111111"));
|
||||||
|
rtcm->stop_server();
|
||||||
|
std::string test6 = "0011";
|
||||||
|
std::string test6_hex = rtcm->bin_to_hex(test6);
|
||||||
|
EXPECT_EQ(0, test6_hex.compare("3"));
|
||||||
|
long unsigned int expected1 = 42;
|
||||||
|
EXPECT_EQ(expected1, rtcm->bin_to_uint("00101010"));
|
||||||
|
rtcm->run_server();
|
||||||
|
std::string test4_bin = rtcm->hex_to_bin(test3);
|
||||||
|
std::string s("Testing");
|
||||||
|
rtcm->send_message(s);
|
||||||
|
rtcm->stop_server();
|
||||||
|
EXPECT_EQ(0, test4_bin.compare("11111111"));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
TEST(Rtcm_Test, InstantiateClient)
|
||||||
|
{
|
||||||
|
auto rtcm = std::make_shared<Rtcm>();
|
||||||
|
rtcm->run_client();
|
||||||
|
std::string test3 = "ff";
|
||||||
|
std::string test3_bin = rtcm->hex_to_bin(test3);
|
||||||
|
EXPECT_EQ(0, test3_bin.compare("11111111"));
|
||||||
|
rtcm->stop_client();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user