From bde6bd6cee7ae3d97724247c74fdfb110f35f9f1 Mon Sep 17 00:00:00 2001 From: Javier Arribas Date: Fri, 11 May 2018 13:21:53 +0200 Subject: [PATCH] Replacing GNURadio udp packet source with custom libpcap-based ethernet packet source --- CMakeLists.txt | 28 ++ cmake/Modules/FindPCAP.cmake | 122 ++++++++ conf/gnss-sdr_GPS_L1_2ch_udp.conf | 11 +- .../adapters/udp_signal_source.cc | 14 +- .../adapters/udp_signal_source.h | 4 +- .../gnuradio_blocks/CMakeLists.txt | 9 +- .../gnuradio_blocks/raw_ip_packet_source.cc | 289 ++++++++++++++++++ .../gnuradio_blocks/raw_ip_packet_source.h | 99 ++++++ .../gnuradio_blocks/udp_gnss_rx_source.cc | 215 ------------- .../gnuradio_blocks/udp_gnss_rx_source.h | 91 ------ 10 files changed, 561 insertions(+), 321 deletions(-) create mode 100644 cmake/Modules/FindPCAP.cmake create mode 100644 src/algorithms/signal_source/gnuradio_blocks/raw_ip_packet_source.cc create mode 100644 src/algorithms/signal_source/gnuradio_blocks/raw_ip_packet_source.h delete mode 100644 src/algorithms/signal_source/gnuradio_blocks/udp_gnss_rx_source.cc delete mode 100644 src/algorithms/signal_source/gnuradio_blocks/udp_gnss_rx_source.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 22ad70e01..5bd42db6a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1455,6 +1455,34 @@ if(ENABLE_GPROF) endif(ENABLE_GPROF) +# - Try to find libpcap include dirs and libraries +# +# Usage of this module as follows: +# +# find_package(PCAP) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# PCAP_ROOT_DIR Set this variable to the root installation of +# libpcap if the module has problems finding the +# proper installation path. +# +# Variables defined by this module: +# +# PCAP_FOUND System has libpcap, include and library dirs found +# PCAP_INCLUDE_DIR The libpcap include directories. +# PCAP_LIBRARY The libpcap library (possibly includes a thread +# library e.g. required by pf_ring's libpcap) +# HAVE_PF_RING If a found version of libpcap supports PF_RING +find_package(PCAP) +if(NOT PCAP_FOUND) + message(FATAL_ERROR "PCAP required to compile dbfcttc") +endif() +get_filename_component(PCAP_LIBRARY_DIRS ${PCAP_LIBRARY} DIRECTORY CACHE) + + + ######################################################################## # Set compiler flags diff --git a/cmake/Modules/FindPCAP.cmake b/cmake/Modules/FindPCAP.cmake new file mode 100644 index 000000000..7e0ea6995 --- /dev/null +++ b/cmake/Modules/FindPCAP.cmake @@ -0,0 +1,122 @@ + +################################################################### +# +# Copyright (c) 2006 Frederic Heem, +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# +# * Neither the name of the Telsey nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +################################################################### +# - Find pcap +# Find the PCAP includes and library +# http://www.tcpdump.org/ +# +# The environment variable PCAPDIR allows to specficy where to find +# libpcap in non standard location. +# +# PCAP_INCLUDE_DIRS - where to find pcap.h, etc. +# PCAP_LIBRARIES - List of libraries when using pcap. +# PCAP_FOUND - True if pcap found. + + +IF(EXISTS $ENV{PCAPDIR}) + FIND_PATH(PCAP_INCLUDE_DIR + NAMES + pcap/pcap.h + pcap.h + PATHS + $ENV{PCAPDIR} + NO_DEFAULT_PATH + ) + + FIND_LIBRARY(PCAP_LIBRARY + NAMES + pcap + PATHS + $ENV{PCAPDIR} + NO_DEFAULT_PATH + ) + + +ELSE(EXISTS $ENV{PCAPDIR}) + FIND_PATH(PCAP_INCLUDE_DIR + NAMES + pcap/pcap.h + pcap.h + ) + + FIND_LIBRARY(PCAP_LIBRARY + NAMES + pcap + ) + +ENDIF(EXISTS $ENV{PCAPDIR}) + +SET(PCAP_INCLUDE_DIRS ${PCAP_INCLUDE_DIR}) +SET(PCAP_LIBRARIES ${PCAP_LIBRARY}) + +IF(PCAP_INCLUDE_DIRS) + MESSAGE(STATUS "Pcap include dirs set to ${PCAP_INCLUDE_DIRS}") +ELSE(PCAP_INCLUDE_DIRS) + MESSAGE(FATAL " Pcap include dirs cannot be found") +ENDIF(PCAP_INCLUDE_DIRS) + +IF(PCAP_LIBRARIES) + MESSAGE(STATUS "Pcap library set to ${PCAP_LIBRARIES}") +ELSE(PCAP_LIBRARIES) + MESSAGE(FATAL "Pcap library cannot be found") +ENDIF(PCAP_LIBRARIES) + +#Functions +INCLUDE(CheckFunctionExists) +SET(CMAKE_REQUIRED_INCLUDES ${PCAP_INCLUDE_DIRS}) +SET(CMAKE_REQUIRED_LIBRARIES ${PCAP_LIBRARIES}) +CHECK_FUNCTION_EXISTS("pcap_breakloop" HAVE_PCAP_BREAKLOOP) +CHECK_FUNCTION_EXISTS("pcap_datalink_name_to_val" HAVE_PCAP_DATALINK_NAME_TO_VAL) +CHECK_FUNCTION_EXISTS("pcap_datalink_val_to_name" HAVE_PCAP_DATALINK_VAL_TO_NAME) +CHECK_FUNCTION_EXISTS("pcap_findalldevs" HAVE_PCAP_FINDALLDEVS) +CHECK_FUNCTION_EXISTS("pcap_freecode" HAVE_PCAP_FREECODE) +CHECK_FUNCTION_EXISTS("pcap_get_selectable_fd" HAVE_PCAP_GET_SELECTABLE_FD) +CHECK_FUNCTION_EXISTS("pcap_lib_version" HAVE_PCAP_LIB_VERSION) +CHECK_FUNCTION_EXISTS("pcap_list_datalinks" HAVE_PCAP_LIST_DATALINKS) +CHECK_FUNCTION_EXISTS("pcap_open_dead" HAVE_PCAP_OPEN_DEAD) +CHECK_FUNCTION_EXISTS("pcap_set_datalink" HAVE_PCAP_SET_DATALINK) + + +#Is pcap found ? +IF(PCAP_INCLUDE_DIRS AND PCAP_LIBRARIES) + SET( PCAP_FOUND "YES" ) +ENDIF(PCAP_INCLUDE_DIRS AND PCAP_LIBRARIES) + + +MARK_AS_ADVANCED( + PCAP_LIBRARIES + PCAP_INCLUDE_DIRS +) \ No newline at end of file diff --git a/conf/gnss-sdr_GPS_L1_2ch_udp.conf b/conf/gnss-sdr_GPS_L1_2ch_udp.conf index 25334e569..d80ffda68 100644 --- a/conf/gnss-sdr_GPS_L1_2ch_udp.conf +++ b/conf/gnss-sdr_GPS_L1_2ch_udp.conf @@ -14,9 +14,10 @@ SignalSource.implementation=UDP_Signal_Source ;SignalSource.implementation=File_Signal_Source SignalSource.filename=/home/javier/gnss/gnss-simulator/build/signal_out.bin ; <- PUT YOUR FILE HERE SignalSource.item_type=gr_complex -SignalSource.address=0.0.0.0 +SignalSource.origin_address=0.0.0.0 +SignalSource.capture_device=eth0 SignalSource.port=1234 -SignalSource.payload_bytes=1024 +SignalSource.payload_bytes=1472 SignalSource.sample_type=cbyte SignalSource.RF_channels=1 SignalSource.select_single_channel=0 @@ -32,7 +33,7 @@ SignalConditioner.implementation=Pass_Through ;SignalConditioner1.implementation=Pass_Through ;######### CHANNELS GLOBAL CONFIG ############ -Channels_1C.count=8 +Channels_1C.count=1 Channels.in_acquisition=1 ;# CHANNEL CONNECTION @@ -49,7 +50,7 @@ Acquisition_1C.threshold=60 Acquisition_1C.use_CFAR_algorithm=false Acquisition_1C.blocking=false Acquisition_1C.doppler_max=5000 -Acquisition_1C.doppler_step=250 +Acquisition_1C.doppler_step=500 Acquisition_1C.dump=false Acquisition_1C.dump_filename=./acq_dump.dat @@ -57,7 +58,7 @@ Acquisition_1C.dump_filename=./acq_dump.dat ;######### TRACKING GLOBAL CONFIG ############ Tracking_1C.implementation=GPS_L1_CA_DLL_PLL_Tracking Tracking_1C.item_type=gr_complex -Tracking_1C.dump=true +Tracking_1C.dump=false Tracking_1C.dump_filename=./tracking_ch_ Tracking_1C.pll_bw_hz=35.0; Tracking_1C.dll_bw_hz=2.0; diff --git a/src/algorithms/signal_source/adapters/udp_signal_source.cc b/src/algorithms/signal_source/adapters/udp_signal_source.cc index f24345293..ed79ff97f 100644 --- a/src/algorithms/signal_source/adapters/udp_signal_source.cc +++ b/src/algorithms/signal_source/adapters/udp_signal_source.cc @@ -54,8 +54,14 @@ UDPSignalSource::UDPSignalSource(ConfigurationInterface* configuration, default_dump_file); // network PARAMETERS + std::string default_capture_device = "eth0"; std::string default_address = "127.0.0.1"; int default_port = 1234; + std::string address = configuration->property(role + ".origin_address", default_address); + std::string capture_device = configuration->property(role + ".capture_device", default_capture_device); + int port = configuration->property(role + ".port", default_port); + int payload_bytes = configuration->property(role + ".payload_bytes", 1024); + RF_channels_ = configuration->property(role + ".RF_channels", 1); select_single_channel_ = configuration->property(role + ".select_single_channel", 0); @@ -64,19 +70,15 @@ UDPSignalSource::UDPSignalSource(ConfigurationInterface* configuration, std::string default_sample_type = "cbyte"; std::string sample_type = configuration->property(role + ".sample_type", default_sample_type); - item_type_ = configuration->property(role + ".item_type", default_item_type); - std::string address = configuration->property(role + ".address", default_address); - int port = configuration->property(role + ".port", default_port); - int payload_bytes = configuration->property(role + ".payload_bytes", 1024); if (sample_type.compare("cbyte")==0) { - udp_gnss_rx_source_ = make_udp_gnss_rx_source(sizeof(char), address, port, payload_bytes, true); + udp_gnss_rx_source_ = raw_ip_packet_source::make(capture_device, address, port, payload_bytes); demux_=gr::blocks::deinterleave::make(sizeof(char),1); }else{ std::cout<<"WARNING: Requested UDP sample type unsuported, setting sample type to cbyte\n"; - udp_gnss_rx_source_ = make_udp_gnss_rx_source(sizeof(char), address, port, payload_bytes, true); + udp_gnss_rx_source_ = raw_ip_packet_source::make(capture_device, address, port, payload_bytes); demux_=gr::blocks::deinterleave::make(sizeof(char),1); } diff --git a/src/algorithms/signal_source/adapters/udp_signal_source.h b/src/algorithms/signal_source/adapters/udp_signal_source.h index ebb8636af..b3b43289b 100644 --- a/src/algorithms/signal_source/adapters/udp_signal_source.h +++ b/src/algorithms/signal_source/adapters/udp_signal_source.h @@ -33,7 +33,7 @@ #define GNSS_SDR_UDP_SIGNAL_SOURCE_H #include "gnss_block_interface.h" -#include "udp_gnss_rx_source.h" +#include "raw_ip_packet_source.h" #include #include #include @@ -103,7 +103,7 @@ private: std::vector> float_to_complex_; std::vector> null_sinks_; - udp_gnss_rx_source_sptr udp_gnss_rx_source_; + raw_ip_packet_source::sptr udp_gnss_rx_source_; gr::blocks::deinterleave::sptr demux_; std::vector> file_sink_; boost::shared_ptr queue_; diff --git a/src/algorithms/signal_source/gnuradio_blocks/CMakeLists.txt b/src/algorithms/signal_source/gnuradio_blocks/CMakeLists.txt index 863a90390..83a7ee7b2 100644 --- a/src/algorithms/signal_source/gnuradio_blocks/CMakeLists.txt +++ b/src/algorithms/signal_source/gnuradio_blocks/CMakeLists.txt @@ -25,7 +25,7 @@ set(SIGNAL_SOURCE_GR_BLOCKS_SOURCES unpack_2bit_samples.cc unpack_spir_gss6450_samples.cc labsat23_source.cc - udp_gnss_rx_source.cc + raw_ip_packet_source.cc ) include_directories( @@ -41,5 +41,10 @@ file(GLOB SIGNAL_SOURCE_GR_BLOCKS_HEADERS "*.h") list(SORT SIGNAL_SOURCE_GR_BLOCKS_HEADERS) add_library(signal_source_gr_blocks ${SIGNAL_SOURCE_GR_BLOCKS_SOURCES} ${SIGNAL_SOURCE_GR_BLOCKS_HEADERS}) source_group(Headers FILES ${SIGNAL_SOURCE_GR_BLOCKS_HEADERS}) -target_link_libraries(signal_source_gr_blocks signal_source_lib ${GNURADIO_RUNTIME_LIBRARIES} ${Boost_LIBRARIES}) +target_link_libraries(signal_source_gr_blocks + signal_source_lib + ${GNURADIO_RUNTIME_LIBRARIES} + ${Boost_LIBRARIES} + ${PCAP_LIBRARIES} +) add_dependencies(signal_source_gr_blocks glog-${glog_RELEASE}) diff --git a/src/algorithms/signal_source/gnuradio_blocks/raw_ip_packet_source.cc b/src/algorithms/signal_source/gnuradio_blocks/raw_ip_packet_source.cc new file mode 100644 index 000000000..5ec4c4a2c --- /dev/null +++ b/src/algorithms/signal_source/gnuradio_blocks/raw_ip_packet_source.cc @@ -0,0 +1,289 @@ +/*! + * \file raw_ip_packet_source.cc + * + * \brief Receives ip frames containing samples in UDP frame encapsulation + * using a high performance packet capture library (libpcap) + * \author Javier Arribas jarribas (at) cttc.es + * ------------------------------------------------------------------------- + * + * Copyright (C) 2010-2018 (see AUTHORS file for a list of contributors) + * + * GNSS-SDR is a software defined Global Navigation + * Satellite Systems receiver + * + * This file is part of GNSS-SDR. + * + * GNSS-SDR is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * GNSS-SDR is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNSS-SDR. If not, see . + * + * ------------------------------------------------------------------------- + */ + + +#include +#include "raw_ip_packet_source.h" + +#include +#include + + +#define FIFO_SIZE 1000000 + + +/* 4 bytes IP address */ +typedef struct gr_ip_address{ + u_char byte1; + u_char byte2; + u_char byte3; + u_char byte4; +}gr_ip_address; + +/* IPv4 header */ +typedef struct gr_ip_header{ + u_char ver_ihl; // Version (4 bits) + Internet header length (4 bits) + u_char tos; // Type of service + u_short tlen; // Total length + u_short identification; // Identification + u_short flags_fo; // Flags (3 bits) + Fragment offset (13 bits) + u_char ttl; // Time to live + u_char proto; // Protocol + u_short crc; // Header checksum + gr_ip_address saddr; // Source address + gr_ip_address daddr; // Destination address + u_int op_pad; // Option + Padding +}gr_ip_header; + +/* UDP header*/ +typedef struct gr_udp_header{ + u_short sport; // Source port + u_short dport; // Destination port + u_short len; // Datagram length + u_short crc; // Checksum +}gr_udp_header; + +raw_ip_packet_source::sptr +raw_ip_packet_source::make(std::string src_device, std::string origin_address, int udp_port, int udp_packet_size) +{ + return gnuradio::get_initial_sptr + (new raw_ip_packet_source(src_device, origin_address, udp_port, udp_packet_size)); +} + +/* + * The private constructor + */ +raw_ip_packet_source::raw_ip_packet_source(std::string src_device, std::string origin_address, int udp_port, int udp_packet_size) +: gr::sync_block("raw_ip_packet_source", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(1, 1, sizeof(char))) +{ + + // constructor code here + std::cout<<"Start Ethernet packet capture\n"; + + d_src_device=src_device; + d_udp_port=udp_port; + d_udp_payload_size=udp_packet_size; + d_fifo_full=false; + d_last_frame_counter=0; + d_num_rx_errors=0; + + //allocate signal samples buffer + fifo_buff=new char[FIFO_SIZE]; + fifo_read_ptr=0; + fifo_write_ptr=0; + fifo_items=0; + + //open the ethernet device + if (open()==true) + { + // start pcap capture thread + d_pcap_thread=new boost::thread(boost::bind(&raw_ip_packet_source::my_pcap_loop_thread,this,descr)); + }else{ + exit(1); //ethernet error! + } +} + +bool raw_ip_packet_source::open() +{ + char errbuf[PCAP_ERRBUF_SIZE]; + boost::mutex::scoped_lock lock(d_mutex); // hold mutex for duration of this function + /* open device for reading */ + descr = pcap_open_live(d_src_device.c_str(),1500,1,1000,errbuf); + if(descr == NULL) + { + std::cout<<"Error openning Ethernet device "<join(); + pcap_close(descr); + } + + delete fifo_buff; + std::cout<<"Stop Ethernet packet capture\n"; + +} + +void raw_ip_packet_source::static_pcap_callback(u_char *args, const struct pcap_pkthdr* pkthdr, + const u_char* packet) +{ + raw_ip_packet_source *bridge=(raw_ip_packet_source*) args; + bridge->pcap_callback(args, pkthdr, packet); +} + +void raw_ip_packet_source::pcap_callback(u_char *args, const struct pcap_pkthdr* pkthdr, + const u_char* packet) +{ + boost::mutex::scoped_lock lock(d_mutex); // hold mutex for duration of this function + + gr_ip_header *ih; + gr_udp_header *uh; + + // eth frame parameters + // **** UDP RAW PACKET DECODER **** + if ((packet[12]==0x08) & (packet[13]==0x00)) //IP FRAME + { + + /* retireve the position of the ip header */ + ih = (gr_ip_header *) (packet + + 14); //length of ethernet header + + /* retireve the position of the udp header */ + u_int ip_len; + ip_len = (ih->ver_ihl & 0xf) * 4; + uh = (gr_udp_header *) ((u_char*)ih + ip_len); + + /* convert from network byte order to host byte order */ + u_short sport,dport; + dport = ntohs( uh->dport ); + sport = ntohs( uh->sport ); + if (dport==d_udp_port) + { + // print ip addresses and udp ports +// printf("%d.%d.%d.%d.%d -> %d.%d.%d.%d.%d\n", +// ih->saddr.byte1, +// ih->saddr.byte2, +// ih->saddr.byte3, +// ih->saddr.byte4, +// sport, +// ih->daddr.byte1, +// ih->daddr.byte2, +// ih->daddr.byte3, +// ih->daddr.byte4, +// dport); +// std::cout<<"d_udp_port:"<=d_udp_payload_size) + { + //write all in a single memcpy + memcpy(&fifo_buff[fifo_write_ptr],&udp_payload[0],d_udp_payload_size); //size in bytes + fifo_write_ptr+=d_udp_payload_size; + if (fifo_write_ptr==FIFO_SIZE) fifo_write_ptr=0; + fifo_items+=d_udp_payload_size; + }else{ + //two step wrap write + memcpy(&fifo_buff[fifo_write_ptr],&udp_payload[0],aligned_write_items); //size in bytes + fifo_write_ptr=d_udp_payload_size-aligned_write_items; + memcpy(&fifo_buff[0],&udp_payload[aligned_write_items],fifo_write_ptr); //size in bytes + fifo_items+=d_udp_payload_size; + } + }else{ + std::cout<<"Ou"<=num_samples_readed) + { + //read all in a single memcpy + memcpy(&((char*)output_items[0])[0],&fifo_buff[fifo_read_ptr],num_samples_readed); + fifo_read_ptr=fifo_read_ptr+num_samples_readed; //increase the fifo pointer + if (fifo_read_ptr==FIFO_SIZE) fifo_read_ptr=0; + }else{ + //two step wrap read + memcpy(&((char*)output_items[0])[0],&fifo_buff[fifo_read_ptr],aligned_read_items); + fifo_read_ptr=num_samples_readed-aligned_read_items;//increase the fifo pointer considering the rollover + memcpy(&((char*)output_items[0])[aligned_read_items],&fifo_buff[0],fifo_read_ptr); + } + + fifo_items=fifo_items-num_samples_readed; + + // Tell runtime system how many output items we produced. + //std::cout<<"fifo_items:"<. + * + * ------------------------------------------------------------------------- + */ + + +#ifndef INCLUDED_RAW_IP_PACKET_SOURCE_H +#define INCLUDED_RAW_IP_PACKET_SOURCE_H + +#include +#include +#include +#include +#include +#include +#include +#include + +class raw_ip_packet_source : virtual public gr::sync_block +{ +private: + boost::mutex d_mutex; + + pcap_t* descr; //ethernet pcap device descriptor + int fifo_pipe[2]; + + char *fifo_buff; + + int fifo_read_ptr; + int fifo_write_ptr; + int fifo_items; + int d_sock_raw; + int d_udp_port; + struct sockaddr_in si_me; + std::string d_src_device; + std::string d_origin_address; + int d_udp_payload_size; + bool d_fifo_full; + + int d_last_frame_counter; + int d_num_rx_errors; + + + boost::thread *d_pcap_thread; + /*! + * \brief + * Opens the ethernet device using libpcap raw capture mode + * If any of these fail, the fuction retuns the error and exits. + */ + bool open(); + + void my_pcap_loop_thread(pcap_t *pcap_handle); + + void pcap_callback(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char* packet); + + static void static_pcap_callback(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char* packet); + + +public: + + typedef boost::shared_ptr sptr; + static sptr make(std::string src_device, std::string origin_address, int udp_port, int udp_packet_size); + raw_ip_packet_source(std::string src_device, std::string origin_address, int udp_port, int udp_packet_size); + ~raw_ip_packet_source(); + + // Where all the action really happens + int work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items); +}; + +#endif /* INCLUDED_RAW_IP_PACKET_SOURCE_H */ + diff --git a/src/algorithms/signal_source/gnuradio_blocks/udp_gnss_rx_source.cc b/src/algorithms/signal_source/gnuradio_blocks/udp_gnss_rx_source.cc deleted file mode 100644 index ef24c829b..000000000 --- a/src/algorithms/signal_source/gnuradio_blocks/udp_gnss_rx_source.cc +++ /dev/null @@ -1,215 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2007-2010,2013 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * GNU Radio is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3, or (at your option) - * any later version. - * - * GNU Radio is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with GNU Radio; see the file COPYING. If not, write to - * the Free Software Foundation, Inc., 51 Franklin Street, - * Boston, MA 02110-1301, USA. - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include "udp_gnss_rx_source.h" -#include -#include -#include -#include -#include -#include -#include - -const int udp_gnss_rx_source::BUF_SIZE_PAYLOADS = - gr::prefs::singleton()->get_long("udp_blocks", "buf_size_payloads", 50); - -udp_gnss_rx_source_sptr -make_udp_gnss_rx_source(size_t itemsize, - const std::string &ipaddr, int port, - int payload_size, bool eof) -{ - return gnuradio::get_initial_sptr - (new udp_gnss_rx_source(itemsize, ipaddr, port, - payload_size, eof)); -} - -udp_gnss_rx_source::udp_gnss_rx_source(size_t itemsize, - const std::string &host, int port, - int payload_size, bool eof) -: sync_block("udp_gnss_rx_source", - gr::io_signature::make(0, 0, 0), - gr::io_signature::make(1, 1, itemsize)), - d_itemsize(itemsize), d_payload_size(payload_size), - d_eof(eof), d_connected(false), d_residual(0), d_sent(0), d_offset(0) -{ - // Give us some more room to play. - d_rxbuf = new char[4*d_payload_size]; - d_residbuf = new char[BUF_SIZE_PAYLOADS*d_payload_size]; - - connect(host, port); -} - -udp_gnss_rx_source::~udp_gnss_rx_source() -{ - if(d_connected) - disconnect(); - - delete [] d_rxbuf; - delete [] d_residbuf; -} - -void -udp_gnss_rx_source::connect(const std::string &host, int port) -{ - if(d_connected) - disconnect(); - - d_host = host; - d_port = static_cast(port); - - std::string s_port; - s_port = (boost::format("%d")%d_port).str(); - - if(host.size() > 0) { - boost::asio::ip::udp::resolver resolver(d_io_service); - boost::asio::ip::udp::resolver::query query(d_host, s_port, - boost::asio::ip::resolver_query_base::passive); - d_endpoint = *resolver.resolve(query); - - d_socket = new boost::asio::ip::udp::socket(d_io_service); - d_socket->open(d_endpoint.protocol()); - - boost::asio::socket_base::reuse_address roption(true); - d_socket->set_option(roption); - - d_socket->bind(d_endpoint); - - start_receive(); - d_udp_thread = gr::thread::thread(boost::bind(&udp_gnss_rx_source::run_io_service, this)); - d_connected = true; - } -} - -void -udp_gnss_rx_source::disconnect() -{ - gr::thread::scoped_lock lock(d_setlock); - - if(!d_connected) - return; - - d_io_service.reset(); - d_io_service.stop(); - d_udp_thread.join(); - - d_socket->close(); - delete d_socket; - - d_connected = false; -} - -// Return port number of d_socket -int -udp_gnss_rx_source::get_port(void) -{ - //return d_endpoint.port(); - return d_socket->local_endpoint().port(); -} - -void -udp_gnss_rx_source::start_receive() -{ - d_socket->async_receive_from(boost::asio::buffer((void*)d_rxbuf, d_payload_size), d_endpoint_rcvd, - boost::bind(&udp_gnss_rx_source::handle_read, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); -} - -void -udp_gnss_rx_source::handle_read(const boost::system::error_code& error, - size_t bytes_transferred) -{ - if(!error) { - { - boost::lock_guard lock(d_udp_mutex); - if(d_eof && (bytes_transferred == 0)) { - // If we are using EOF notification, test for it and don't - // add anything to the output. - d_residual = WORK_DONE; - d_cond_wait.notify_one(); - return; - } - else { - // Make sure we never go beyond the boundary of the - // residual buffer. This will just drop the last bit of - // data in the buffer if we've run out of room. - if((int)(d_residual + bytes_transferred) >= (BUF_SIZE_PAYLOADS*d_payload_size)) { - GR_LOG_WARN(d_logger, "Too much data; dropping packet."); - } - else { - // otherwise, copy received data into local buffer for - // copying later. - memcpy(d_residbuf+d_residual, d_rxbuf, bytes_transferred); - d_residual += bytes_transferred; - } - } - d_cond_wait.notify_one(); - } - } - start_receive(); -} - -int -udp_gnss_rx_source::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) -{ - gr::thread::scoped_lock l(d_setlock); - - char *out = (char*)output_items[0]; - - // Use async receive_from to get data from UDP buffer and wait - // on a conditional signal before proceeding. We use this - // because the conditional wait is interruptable while a - // synchronous receive_from is not. - boost::unique_lock lock(d_udp_mutex); - - //use timed_wait to avoid permanent blocking in the work function - d_cond_wait.timed_wait(lock, boost::posix_time::milliseconds(10)); - - if (d_residual < 0) { - return d_residual; - } - - int bytes_left_in_buffer = (int)(d_residual - d_sent); - int bytes_to_send = std::min(d_itemsize * noutput_items, bytes_left_in_buffer); - - // Copy the received data in the residual buffer to the output stream - memcpy(out, d_residbuf+d_sent, bytes_to_send); - int nitems = bytes_to_send/d_itemsize; - - // Keep track of where we are if we don't have enough output - // space to send all the data in the residbuf. - if (bytes_to_send == bytes_left_in_buffer) { - d_residual = 0; - d_sent = 0; - } - else { - d_sent += bytes_to_send; - } - - return nitems; -} diff --git a/src/algorithms/signal_source/gnuradio_blocks/udp_gnss_rx_source.h b/src/algorithms/signal_source/gnuradio_blocks/udp_gnss_rx_source.h deleted file mode 100644 index 5bab10b4a..000000000 --- a/src/algorithms/signal_source/gnuradio_blocks/udp_gnss_rx_source.h +++ /dev/null @@ -1,91 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2007-2010,2013 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * GNU Radio is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3, or (at your option) - * any later version. - * - * GNU Radio is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with GNU Radio; see the file COPYING. If not, write to - * the Free Software Foundation, Inc., 51 Franklin Street, - * Boston, MA 02110-1301, USA. - */ - -#ifndef INCLUDED_GR_UDP_GNSS_RX_SOURCE_IMPL_H -#define INCLUDED_GR_UDP_GNSS_RX_SOURCE_IMPL_H - -#include -#include -#include -#include - -class udp_gnss_rx_source; - -typedef boost::shared_ptr udp_gnss_rx_source_sptr; - -udp_gnss_rx_source_sptr make_udp_gnss_rx_source(size_t itemsize, - const std::string &ipaddr, int port, - int payload_size, bool eof); - - - - class udp_gnss_rx_source : public gr::blocks::udp_source - { - private: - size_t d_itemsize; - int d_payload_size; // maximum transmission unit (packet length) - bool d_eof; // look for an EOF signal - bool d_connected; // are we connected? - char *d_rxbuf; // get UDP buffer items - char *d_residbuf; // hold buffer between calls - ssize_t d_residual; // hold information about number of bytes stored in residbuf - ssize_t d_sent; // track how much of d_residbuf we've outputted - size_t d_offset; // point to residbuf location offset - - static const int BUF_SIZE_PAYLOADS; //!< The d_residbuf size in multiples of d_payload_size - - std::string d_host; - unsigned short d_port; - - boost::asio::ip::udp::socket *d_socket; - boost::asio::ip::udp::endpoint d_endpoint; - boost::asio::ip::udp::endpoint d_endpoint_rcvd; - boost::asio::io_service d_io_service; - - gr::thread::condition_variable d_cond_wait; - gr::thread::mutex d_udp_mutex; - gr::thread::thread d_udp_thread; - - void start_receive(); - void handle_read(const boost::system::error_code& error, - size_t bytes_transferred); - void run_io_service() { d_io_service.run(); } - - public: - udp_gnss_rx_source(size_t itemsize, - const std::string &host, int port, - int payload_size, bool eof); - ~udp_gnss_rx_source(); - - void connect(const std::string &host, int port); - void disconnect(); - - int payload_size() { return d_payload_size; } - int get_port(); - - int work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items); - }; - - -#endif /* INCLUDED_GR_UDP_GNSS_RX_SOURCE_H */