
From: git
Subject: [Commit-gnuradio] [gnuradio] 01/03: digital: Major update to header/payload demuxer (HPD)
Date: Mon, 2 May 2016 04:54:38 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 5283e459f6d83aea2d5133fecb0a11a873f0c3ad
Author: Martin Braun <address@hidden>
Date:   Mon Apr 4 22:19:07 2016 -0700

    digital: Major update to header/payload demuxer (HPD)
    
    - Added padding feature (trigger/tag can now be off by some items)
    - Payload offset can also be specified
    - Fixed some index counting bugs
    - More and better unit tests, cleaned up the unit test file
    - Cleanups:
      - Consistent whitespace
      - Consistent use of size_t and other types
      - Used more enums where it increases readability
---
 gr-digital/grc/digital_header_payload_demux.xml    |   7 +
 .../gnuradio/digital/header_payload_demux.h        |  91 ++-
 gr-digital/lib/header_payload_demux_impl.cc        | 670 ++++++++++++---------
 gr-digital/lib/header_payload_demux_impl.h         |  75 ++-
 .../python/digital/qa_header_payload_demux.py      | 451 ++++++++++----
 5 files changed, 865 insertions(+), 429 deletions(-)
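
The padding arithmetic described in the header documentation of this commit can be tried out in isolation. What follows is a minimal standalone sketch, not code from the commit: the names header_window, padded_header_window, payload_start and padding_is_valid are hypothetical. The window calculation assumes one item per symbol and no guard interval, and the limit on payload_offset is assumed to apply to its magnitude.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>

// Hypothetical helper type (not part of GNU Radio): the range of input
// items that is copied to the header port.
struct header_window {
    std::int64_t start;   // index of the first item copied
    std::int64_t length;  // number of items copied
};

// Header window for a trigger on item `trigger_index`.
// Assumes items_per_symbol == 1 and guard_interval == 0.
header_window padded_header_window(std::int64_t trigger_index,
                                   int header_len,
                                   int header_padding)
{
    if (header_len < 1 || header_padding < 0) {
        throw std::invalid_argument("header_len >= 1 and header_padding >= 0 required");
    }
    // The padding is prepended and appended, so it counts twice.
    return header_window{trigger_index - header_padding,
                         static_cast<std::int64_t>(header_len) + 2 * header_padding};
}

// Index of the first payload item. A negative payload_offset moves the
// payload start earlier. Assumption: |payload_offset| <= header_padding.
std::int64_t payload_start(std::int64_t trigger_index,
                           int header_len,
                           int header_padding,
                           int payload_offset)
{
    if (payload_offset > header_padding || payload_offset < -header_padding) {
        throw std::out_of_range("payload_offset must not exceed header_padding");
    }
    return trigger_index + header_len + payload_offset;
}

// Mirrors the constructor check: if symbols are output or a guard interval
// is used, the padding must be a whole number of symbols.
bool padding_is_valid(int header_padding,
                      int items_per_symbol,
                      int guard_interval,
                      bool output_symbols)
{
    assert(items_per_symbol >= 1);
    const int leftover_items = header_padding % items_per_symbol;
    return !((output_symbols || guard_interval != 0) && leftover_items != 0);
}
```

With the numbers from the documentation example (trigger on item 100, header_len 20, header_padding 2), padded_header_window() gives a window of 24 items starting at item 98, and payload_start() gives 120 for a payload_offset of 0.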

diff --git a/gr-digital/grc/digital_header_payload_demux.xml b/gr-digital/grc/digital_header_payload_demux.xml
index 24c6c5b..a2fe80e 100644
--- a/gr-digital/grc/digital_header_payload_demux.xml
+++ b/gr-digital/grc/digital_header_payload_demux.xml
@@ -13,6 +13,7 @@
          $timing_tag_key,
           $samp_rate,
           $special_tags,
+          $header_padding,
     )</make>
   <param>
     <name>Header Length (Symbols)</name>
@@ -20,6 +21,12 @@
     <type>int</type>
   </param>
   <param>
+    <name>Header Padding (Uncertainty / Symbols)</name>
+    <key>header_padding</key>
+    <value>0</value>
+    <type>int</type>
+  </param>
+  <param>
     <name>Items per symbol</name>
     <key>items_per_symbol</key>
     <type>int</type>
diff --git a/gr-digital/include/gnuradio/digital/header_payload_demux.h b/gr-digital/include/gnuradio/digital/header_payload_demux.h
index 303bebb..bcd6bd1 100644
--- a/gr-digital/include/gnuradio/digital/header_payload_demux.h
+++ b/gr-digital/include/gnuradio/digital/header_payload_demux.h
@@ -1,5 +1,5 @@
 /* -*- c++ -*- */
-/* Copyright 2012 Free Software Foundation, Inc.
+/* Copyright 2012-2016 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
@@ -29,7 +29,7 @@ namespace gr {
   namespace digital {
 
     /*!
-     * \brief Header/Payload demuxer.
+     * \brief Header/Payload demuxer (HPD).
      * \ingroup packet_operators_blk
      *
      * \details
@@ -58,6 +58,9 @@ namespace gr {
      * and taken as the payload length. The payload, together with the header data
      * as tags, is then copied to output 1.
      *
+     * If the header demodulation fails, the header must send a PMT with value
+     * pmt::PMT_F. The state gets reset and the header is ignored.
+     *
      * \section hpd_item_sizes Symbols, Items and Item Sizes
      *
      * To generically and transparently handle different kinds of modulations,
@@ -68,20 +71,66 @@ namespace gr {
      * grouping items. In OFDM, we usually don't care about individual samples, but
      * we do care about full OFDM symbols, so we set \p items_per_symbol to the
      * IFFT / FFT length of the OFDM modulator / demodulator.
-     * For most single-carrier modulations, this value can be set to 1 (the default
-     * value).
+     * For single-carrier modulations, this value can be set to the number of
+     * samples per symbol, to handle data in number of symbols, or to 1 to
+     * handle data in number of samples.
      * If specified, \p guard_interval items are discarded before every symbol.
      * This is useful for demuxing bursts of OFDM signals.
      *
      * On the output, we can deal with symbols directly by setting \p output_symbols
      * to true. In that case, the output item size is the <em>symbol size</em>.
      *
-     * \b Example: OFDM with 48 sub-carriers, using a length-64 IFFT on the modulator,
-     * and a cyclic-prefix length of 16 samples. In this case, the itemsize is
-     * `sizeof(gr_complex)`, because we're receiving complex samples. One OFDM symbol
-     * has 64 samples, hence \p items_per_symbol is set to 64, and \p guard_interval to
-     * 16. The header length is specified in number of OFDM symbols. Because we want to
-     * deal with full OFDM symbols, we set \p output_symbols to true.
+     * \b Example: OFDM with 48 sub-carriers, using a length-64 IFFT on the
+     * modulator, and a cyclic-prefix length of 16 samples. In this case,
+     * \p itemsize is `sizeof(gr_complex)`, because we're receiving complex
+     * samples. One OFDM symbol has 64 samples, hence \p items_per_symbol is
+     * set to 64, and \p guard_interval to 16. The header length is specified
+     * in number of OFDM symbols. Because we want to deal with full OFDM
+     * symbols, we set \p output_symbols to true.
+     *
+     * \b Example: PSK-modulated signals, with 4 samples per symbol. Again,
+     * \p itemsize is `sizeof(gr_complex)` because we're still dealing with
+     * complex samples. \p items_per_symbol is 4, because one item is one
+     * sample. \p guard_interval must be set to 0. The header length is
+     * given in number of PSK symbols.
+     *
+     * \section hpd_uncertainty Handling timing uncertainty on the trigger
+     *
+     * By default, the assumption is made that the trigger arrives on *exactly*
+     * the sample that the header starts. These triggers typically come from
+     * timing synchronization algorithms which may be suboptimal, and have a
+     * known timing uncertainty (e.g., we know the trigger might be a sample
+     * too early or too late).
+     *
+     * The demuxer has an option for this case, the \p header_padding. If this
+     * value is non-zero, it specifies the number of items that are prepended
+     * and appended to the header before copying it to the header output.
+     *
+     * Example: Say our synchronization algorithm can be off by up to two
+     * samples, and the header length is 20 samples. So we set \p header_len
+     * to 20, and \p header_padding to 2.
+     * Now assume a trigger arrives on sample index 100. We copy a total of
+     * 24 samples to the header port, starting at sample index 98.
+     *
+     * The payload is *not* padded. Let's say the header demod reports a
+     * payload length of 100. In the previous examples, we would copy 100
+     * samples to the payload port, starting at sample index 120 (this means
+     * the padded samples appended to the header are copied to both ports!).
+     * However, the header demodulator has the option to specify a payload
+     * offset, which cannot exceed the padding value. To do this, include
+     * a key `payload_offset` in the message sent back to the HPD. A negative
+     * value means the payload starts earlier than otherwise.
+     * (If you wanted to always pad the payload, you could set `payload_offset`
+     * to `-header_padding` and increase the reported length of the payload).
+     *
+     * Because the padding is specified in number of items, and not symbols,
+     * this value must be a multiple of the number of items per symbol *if*
+     * either \p output_symbols is true, or a guard interval is specified (or
+     * both). Note that in practice, it is rare that both a guard interval is
+     * specified *and* a padding value is required. The difference between the
+     * padding value and a guard interval is that a guard interval is part of
+     * the signal, and comes with *every* symbol, whereas the header padding
+     * is added to only the header, and is not by design.
      *
      * \section hpd_tag_handling Tag Handling
      *
@@ -95,12 +144,14 @@ namespace gr {
      * it belongs to this packet or the following. In this case, it is possible that the
      * tag might be propagated twice.
      *
-     * Tags outside of packets are generally discarded. If this information is important,
-     * there are two additional mechanisms to preserve the tags:
+     * Tags outside of packets are generally discarded. If there are tags that
+     * carry important information that must not be lost, there are two
+     * additional mechanisms to preserve the tags:
      * - Timing tags might be relevant to know \b when a packet was received. By
      *   specifying the name of a timestamp tag and the sample rate at this block, it
      *   keeps track of the time and will add the time to the first item of every packet.
-     *   The name of the timestamp tag is usually 'rx_time' (see gr::uhd::usrp_source::make()).
+     *   The name of the timestamp tag is usually 'rx_time' (see, e.g.,
+     *   gr::uhd::usrp_source::make()).
      *   The time value must be specified in the UHD time format.
      * - Other tags are simply stored and updated. As an example, the user might want to know the
      *   rx frequency, which UHD stores in the rx_freq tag. In this case, add the tag name 'rx_freq'
@@ -124,18 +175,20 @@ namespace gr {
        * \param timing_tag_key The name of the tag with timing information, usually 'rx_time' or empty (this means timing info is discarded)
        * \param samp_rate Sampling rate at the input. Necessary to calculate the rx time of packets.
        * \param special_tags A vector of strings denoting tags which shall be preserved (see \ref hpd_tag_handling)
+       * \param header_padding A number of items that is appended and prepended to the header.
        */
       static sptr make(
-          int header_len,
-          int items_per_symbol=1,
-          int guard_interval=0,
+          const int header_len,
+          const int items_per_symbol=1,
+          const int guard_interval=0,
           const std::string &length_tag_key="frame_len",
           const std::string &trigger_tag_key="",
-          bool output_symbols=false,
-          size_t itemsize=sizeof(gr_complex),
+          const bool output_symbols=false,
+          const size_t itemsize=sizeof(gr_complex),
           const std::string &timing_tag_key="",
           const double samp_rate=1.0,
-          const std::vector<std::string> &special_tags=std::vector<std::string>()
+          const std::vector<std::string> &special_tags=std::vector<std::string>(),
+          const size_t header_padding=0
       );
     };
 
diff --git a/gr-digital/lib/header_payload_demux_impl.cc b/gr-digital/lib/header_payload_demux_impl.cc
index 89428fa..f887ea1 100644
--- a/gr-digital/lib/header_payload_demux_impl.cc
+++ b/gr-digital/lib/header_payload_demux_impl.cc
@@ -1,5 +1,5 @@
 /* -*- c++ -*- */
-/* Copyright 2012-2014 Free Software Foundation, Inc.
+/* Copyright 2012-2016 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
@@ -23,10 +23,10 @@
 #include "config.h"
 #endif
 
-#include <climits>
-#include <boost/format.hpp>
-#include <gnuradio/io_signature.h>
 #include "header_payload_demux_impl.h"
+#include <gnuradio/io_signature.h>
+#include <boost/format.hpp>
+#include <climits>
 
 namespace gr {
   namespace digital {
@@ -55,55 +55,63 @@ namespace gr {
 
     enum out_port_indexes_t {
       PORT_HEADER = 0,
-      PORT_PAYLOAD = 1
+      PORT_PAYLOAD = 1,
+      PORT_INPUTDATA = 0,
+      PORT_TRIGGER = 1
     };
 
 #define msg_port_id pmt::mp("header_data")
 
     header_payload_demux::sptr
     header_payload_demux::make(
-       int header_len,
-       int items_per_symbol,
-       int guard_interval,
-       const std::string &length_tag_key,
-       const std::string &trigger_tag_key,
-       bool output_symbols,
-       size_t itemsize,
-       const std::string &timing_tag_key,
-       const double samp_rate,
-       const std::vector<std::string> &special_tags
+        int header_len,
+        int items_per_symbol,
+        int guard_interval,
+        const std::string &length_tag_key,
+        const std::string &trigger_tag_key,
+        bool output_symbols,
+        size_t itemsize,
+        const std::string &timing_tag_key,
+        const double samp_rate,
+        const std::vector<std::string> &special_tags,
+        const size_t header_padding
     ){
       return gnuradio::get_initial_sptr (
-         new header_payload_demux_impl(
-           header_len,
-           items_per_symbol,
-           guard_interval,
-           length_tag_key,
-           trigger_tag_key,
-           output_symbols,
-           itemsize,
-           timing_tag_key,
-           samp_rate,
-           special_tags
-         )
+          new header_payload_demux_impl(
+            header_len,
+            items_per_symbol,
+            guard_interval,
+            length_tag_key,
+            trigger_tag_key,
+            output_symbols,
+            itemsize,
+            timing_tag_key,
+            samp_rate,
+            special_tags,
+            header_padding
+          )
       );
     }
 
     header_payload_demux_impl::header_payload_demux_impl(
-       int header_len,
-       int items_per_symbol,
-       int guard_interval,
-       const std::string &length_tag_key,
-       const std::string &trigger_tag_key,
-       bool output_symbols,
-       size_t itemsize,
-       const std::string &timing_tag_key,
-       const double samp_rate,
-       const std::vector<std::string> &special_tags
+        int header_len,
+        int items_per_symbol,
+        int guard_interval,
+        const std::string &length_tag_key,
+        const std::string &trigger_tag_key,
+        bool output_symbols,
+        size_t itemsize,
+        const std::string &timing_tag_key,
+        const double samp_rate,
+        const std::vector<std::string> &special_tags,
+        const size_t header_padding
     ) : block("header_payload_demux",
-                     io_signature::make2(1, 2, itemsize, sizeof(char)),
-                     io_signature::make(2, 2, (output_symbols ? itemsize * items_per_symbol : itemsize))),
+          io_signature::make2(1, 2, itemsize, sizeof(char)),
+          io_signature::make(2, 2, (output_symbols ? itemsize * items_per_symbol : itemsize))),
       d_header_len(header_len),
+      d_header_padding_symbols(header_padding / items_per_symbol),
+      d_header_padding_items(header_padding % items_per_symbol),
+      d_header_padding_total_items(header_padding),
       d_items_per_symbol(items_per_symbol),
       d_gi(guard_interval),
       d_len_tag_key(pmt::string_to_symbol(length_tag_key)),
@@ -113,32 +121,42 @@ namespace gr {
       d_uses_trigger_tag(!trigger_tag_key.empty()),
       d_state(STATE_FIND_TRIGGER),
       d_curr_payload_len(0),
+      d_curr_payload_offset(0),
       d_payload_tag_keys(0),
       d_payload_tag_values(0),
       d_track_time(!timing_tag_key.empty()),
       d_timing_key(pmt::intern(timing_tag_key)),
+      d_payload_offset_key(pmt::intern("payload_offset")),
       d_last_time_offset(0),
       d_last_time(pmt::make_tuple(pmt::from_uint64(0L), pmt::from_double(0.0))),
       d_sampling_time(1.0/samp_rate)
     {
       if (d_header_len < 1) {
-       throw std::invalid_argument("Header length must be at least 1 symbol.");
+        throw std::invalid_argument("Header length must be at least 1 symbol.");
+      }
+      if (header_padding < 0) {
+        throw std::invalid_argument("Header padding must be non-negative.");
       }
       if (d_items_per_symbol < 1 || d_gi < 0 || d_itemsize < 1) {
-       throw std::invalid_argument("Items and symbol sizes must be at least 1.");
+        throw std::invalid_argument("Items and symbol sizes must be at least 1.");
       }
       if (d_output_symbols) {
-       set_relative_rate(1.0 / (d_items_per_symbol + d_gi));
+        set_relative_rate(1.0 / (d_items_per_symbol + d_gi));
       } else {
-       set_relative_rate((double)d_items_per_symbol / (d_items_per_symbol + d_gi));
-       set_output_multiple(d_items_per_symbol);
+        set_relative_rate((double)d_items_per_symbol / (d_items_per_symbol + d_gi));
+        set_output_multiple(d_items_per_symbol);
+      }
+      if ((d_output_symbols || d_gi) && d_header_padding_items) {
+        throw std::invalid_argument(
+            "If output_symbols is true or a guard interval is given, padding must be a multiple of items_per_symbol!"
+        );
       }
       set_tag_propagation_policy(TPP_DONT);
       message_port_register_in(msg_port_id);
       set_msg_handler(msg_port_id, boost::bind(&header_payload_demux_impl::parse_header_data_msg, this, _1));
-      for (unsigned i = 0; i < special_tags.size(); i++) {
-       d_special_tags.push_back(pmt::string_to_symbol(special_tags[i]));
-       d_special_tags_last_value.push_back(pmt::PMT_NIL);
+      for (size_t i = 0; i < special_tags.size(); i++) {
+        d_special_tags.push_back(pmt::string_to_symbol(special_tags[i]));
+        d_special_tags_last_value.push_back(pmt::PMT_NIL);
       }
     }
 
@@ -146,144 +164,219 @@ namespace gr {
     {
     }
 
+    // forecast() depends on state:
+    // - When waiting for a header, we require at least the header length
+    // - when waiting for a payload, we require at least the payload length
+    // - Otherwise, pretend this is a sync block with a decimation/interpolation
+    //   depending on symbol size and if we output symbols or items
     void
-    header_payload_demux_impl::forecast (int noutput_items, gr_vector_int &ninput_items_required)
-    {
+    header_payload_demux_impl::forecast(
+        int noutput_items,
+        gr_vector_int &ninput_items_required
+    ) {
       int n_items_reqd = 0;
       if (d_state == STATE_HEADER) {
-       n_items_reqd = d_header_len * (d_items_per_symbol + d_gi);
+        n_items_reqd = d_header_len * (d_items_per_symbol + d_gi)
+                       + 2*d_header_padding_total_items;
       } else if (d_state == STATE_PAYLOAD) {
-       n_items_reqd = d_curr_payload_len * (d_items_per_symbol + d_gi);
+        n_items_reqd = d_curr_payload_len * (d_items_per_symbol + d_gi);
       } else {
-       n_items_reqd = noutput_items * (d_items_per_symbol + d_gi);
-       if (!d_output_symbols) {
-         // here, noutput_items is an integer multiple of d_items_per_symbol!
-         n_items_reqd /= d_items_per_symbol;
-       }
+        n_items_reqd = noutput_items * (d_items_per_symbol + d_gi);
+        if (!d_output_symbols) {
+          // Here, noutput_items is an integer multiple of d_items_per_symbol!
+          n_items_reqd /= d_items_per_symbol;
+        }
       }
 
       for (unsigned i = 0; i < ninput_items_required.size(); i++) {
-       ninput_items_required[i] = n_items_reqd;
+        ninput_items_required[i] = n_items_reqd;
       }
     }
 
 
-    inline bool
-    header_payload_demux_impl::check_items_available(
-       int n_symbols,
-       gr_vector_int &ninput_items,
-       int noutput_items,
-       int nread
-    )
-    {
-      // Check there's enough items on the input
-      if ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[0]-nread)
-         || (ninput_items.size() == 2 && ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[1]-nread)))) {
-       return false;
-      }
-
+    bool header_payload_demux_impl::check_buffers_ready(
+        int output_symbols_reqd,
+        int extra_output_items_reqd,
+        int noutput_items,
+        int input_items_reqd,
+        gr_vector_int &ninput_items,
+        int n_items_read
+    ) {
       // Check there's enough space on the output buffer
       if (d_output_symbols) {
-       if (noutput_items < n_symbols) {
-         return false;
-       }
+        if (noutput_items < output_symbols_reqd + extra_output_items_reqd) {
+          return false;
+        }
       } else {
-       if (noutput_items < n_symbols * d_items_per_symbol) {
-         return false;
-       }
+        if (noutput_items < (output_symbols_reqd * d_items_per_symbol) + extra_output_items_reqd) {
+          return false;
+        }
+      }
+
+      // Check there's enough items on the input
+      if (input_items_reqd > (ninput_items[0]-n_items_read)
+          || (ninput_items.size() == 2 && (input_items_reqd > (ninput_items[1]-n_items_read)))) {
+        return false;
       }
 
+      // All good
       return true;
     }
 
 
     int
-    header_payload_demux_impl::general_work (int noutput_items,
-                       gr_vector_int &ninput_items,
-                       gr_vector_const_void_star &input_items,
-                       gr_vector_void_star &output_items)
-    {
-      const unsigned char *in = (const unsigned char *) input_items[0];
+    header_payload_demux_impl::general_work(
+            int noutput_items,
+            gr_vector_int &ninput_items,
+            gr_vector_const_void_star &input_items,
+            gr_vector_void_star &output_items
+    ) {
+      const unsigned char *in = (const unsigned char *) input_items[PORT_INPUTDATA];
       unsigned char *out_header = (unsigned char *) output_items[PORT_HEADER];
       unsigned char *out_payload = (unsigned char *) output_items[PORT_PAYLOAD];
 
-      int nread = 0;
-      int trigger_offset = 0;
-
+      const int n_input_items = (ninput_items.size() == 2) ?
+                                std::min(ninput_items[0], ninput_items[1]) :
+                                ninput_items[0];
+      // Items read going into general_work()
+      const uint64_t n_items_read_base = nitems_read(PORT_INPUTDATA);
+      // Items read during this call to general_work()
+      int n_items_read = 0;
+
+      #define CONSUME_ITEMS(items_to_consume) \
+          update_special_tags( \
+              n_items_read_base + n_items_read, \
+              n_items_read_base + n_items_read + (items_to_consume) \
+          ); \
+          consume_each(items_to_consume); \
+          n_items_read += (items_to_consume); \
+          in += (items_to_consume) * d_itemsize;
       switch (d_state) {
-       case STATE_WAIT_FOR_MSG:
-         // In an ideal world, this would never be called
-         return 0;
-
-       case STATE_HEADER_RX_FAIL:
-         update_special_tags(0, 1);
-         consume_each (1);
-         in += d_itemsize;
-         nread++;
-         d_state = STATE_FIND_TRIGGER;
-         // The following break was added to this state as well as STATE_FIND_TRIGGER
-      // and STATE_HEADER. There appears to be a bug somewhere in this code without
-      // the breaks that can lead to failure of this block. With the breaks in the code
-      // testing has shown more stable performance with various block paramters.
-      // If an offset calculation bug is found and fixed, it should be possible to 
-      // remove these breaks for some performance increase.
-    break; 
-
-       case STATE_FIND_TRIGGER:
-         trigger_offset = find_trigger_signal(nread, noutput_items, input_items);
-         if (trigger_offset == -1) {
-           update_special_tags(0, noutput_items - nread);
-           consume_each(noutput_items - nread);
-           break;
-         }
-         update_special_tags(0, trigger_offset);
-         consume_each (trigger_offset);
-         in += trigger_offset * d_itemsize;
-         d_state = STATE_HEADER;
-    break;
-
-       case STATE_HEADER:
-         if (check_items_available(d_header_len, ninput_items, noutput_items, nread)) {
-           copy_n_symbols(in, out_header, PORT_HEADER, d_header_len);
-           d_state = STATE_WAIT_FOR_MSG;
-           add_special_tags();
-           produce(
-               PORT_HEADER,
-               d_header_len * (d_output_symbols ? 1 : d_items_per_symbol)
-           );
-         }
-         break;
-
-       case STATE_HEADER_RX_SUCCESS:
-         for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) {
-           add_item_tag(
-               PORT_PAYLOAD,
-               nitems_written(PORT_PAYLOAD),
-               d_payload_tag_keys[i],
-               d_payload_tag_values[i]
-           );
-         }
-         nread += d_header_len * (d_items_per_symbol + d_gi);
-         update_special_tags(0, nread);
-         consume_each (nread);
-         in += nread * d_itemsize;
-         d_state = STATE_PAYLOAD;
-    break;
-
-       case STATE_PAYLOAD:
-         if (check_items_available(d_curr_payload_len, ninput_items, noutput_items, nread)) {
-           // The -1 because we won't consume the last item, it might hold the next trigger.
-           update_special_tags(0, (d_curr_payload_len - 1) * (d_items_per_symbol + d_gi));
-           copy_n_symbols(in, out_payload, PORT_PAYLOAD, d_curr_payload_len);
-           produce(PORT_PAYLOAD, d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol));
-           consume_each ((d_curr_payload_len - 1) * (d_items_per_symbol + d_gi)); // Same here
-           set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol + d_gi));
-           d_state = STATE_FIND_TRIGGER;
-         }
-       break;
-
-       default:
-         throw std::runtime_error("invalid state");
+        case STATE_WAIT_FOR_MSG:
+          // In an ideal world, this would never be called
+          // parse_header_data_msg() is the only place that can kick us out
+          // of this state.
+          return 0;
+
+        case STATE_HEADER_RX_FAIL:
+          // Actions:
+          // - Consume a single item to make sure we're not deleting any other
+          //   info
+          CONSUME_ITEMS(1);
+          d_state = STATE_FIND_TRIGGER;
+          break;
+
+        case STATE_FIND_TRIGGER: {
+          // Assumptions going into this state:
+          // - No other state was active for this call to general_work()
+          //   - i.e. n_items_read == 0
+          // Start looking for a trigger after any header padding.
+          // The trigger offset is relative to 'in'.
+          // => The absolute trigger offset is on n_items_read_base + n_items_read + trigger_offset
+          const int max_rel_offset = n_input_items - n_items_read;
+          const int trigger_offset = find_trigger_signal(
+              d_header_padding_total_items,
+              max_rel_offset,
+              n_items_read_base + n_items_read,
+              (input_items.size() == 2) ?
+                  ((const unsigned char *) input_items[PORT_TRIGGER]) + n_items_read : NULL
+          );
+          if (trigger_offset < max_rel_offset) {
+            d_state = STATE_HEADER;
+          }
+          // If we're using padding, don't consume everything, or we might
+          // end up with not enough items before the trigger
+          const int items_to_consume = trigger_offset - d_header_padding_total_items;
+          CONSUME_ITEMS(items_to_consume);
+          break;
+        } /* case STATE_FIND_TRIGGER */
+
+        case STATE_HEADER:
+          // Assumptions going into this state:
+          // - The first items on `in' are the header samples (including padding)
+          //   - So we can just copy from the beginning of `in'
+          // - The trigger is on item index `d_header_padding * d_items_per_symbol'
+          // Actions:
+          // - Copy the entire header (including padding) to the header port
+          //   - Special tags are added to the header port
+          if (check_buffers_ready(
+                d_header_len + 2*d_header_padding_symbols,
+                d_header_padding_items,
+                noutput_items,
+                d_header_len * (d_items_per_symbol + d_gi) + 2*d_header_padding_total_items,
+                ninput_items,
+                n_items_read)) {
+            add_special_tags();
+            copy_n_symbols(
+                in,
+                out_header,
+                PORT_HEADER,
+                n_items_read_base + n_items_read,
+                d_header_len+2*d_header_padding_symbols, // Number of symbols to copy
+                2*d_header_padding_items
+            );
+            d_state = STATE_WAIT_FOR_MSG;
+          }
+          break;
+
+        case STATE_HEADER_RX_SUCCESS:
+          // Copy tags from header to payload
+          for (size_t i = 0; i < d_payload_tag_keys.size(); i++) {
+            add_item_tag(
+                PORT_PAYLOAD,
+                nitems_written(PORT_PAYLOAD),
+                d_payload_tag_keys[i],
+                d_payload_tag_values[i]
+            );
+          }
+          // Consume header from input
+          {
+            // Consume the padding only once, we leave the second
+            // part in there because it might be part of the payload
+            const int items_to_consume =
+                            d_header_len * (d_items_per_symbol + d_gi)
+                            + d_header_padding_total_items
+                            + d_curr_payload_offset;
+            CONSUME_ITEMS(items_to_consume);
+            d_curr_payload_offset = 0;
+            d_state = STATE_PAYLOAD;
+          }
+          break;
+
+        case STATE_PAYLOAD:
+          // Assumptions:
+          // - Input buffer is in the right spot to just start copying
+          if (check_buffers_ready(
+                d_curr_payload_len,
+                0,
+                noutput_items,
+                d_curr_payload_len * (d_items_per_symbol + d_gi),
+                ninput_items,
+                n_items_read)) {
+            // Write payload
+            copy_n_symbols(
+                in,
+                out_payload,
+                PORT_PAYLOAD,
+                n_items_read_base + n_items_read,
+                d_curr_payload_len
+            );
+            // Consume payload
+            // We can't consume the full payload, because we need to hold off
+            // at least the padding value. We'll use a minimum padding of 1
+            // item here.
+            const int items_padding = std::max(d_header_padding_total_items, 1);
+            const int items_to_consume =
+                      d_curr_payload_len * (d_items_per_symbol + d_gi)
+                      - items_padding;
+            CONSUME_ITEMS(items_to_consume);
+            set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol + d_gi));
+            d_state = STATE_FIND_TRIGGER;
+          }
+          break;
+
+        default:
+          throw std::runtime_error("invalid state");
       } /* switch */
 
       return WORK_CALLED_PRODUCE;
@@ -292,35 +385,41 @@ namespace gr {
 
     int
     header_payload_demux_impl::find_trigger_signal(
-       int nread,
-       int noutput_items,
-       gr_vector_const_void_star &input_items)
-    {
-      if (input_items.size() == 2) {
-       unsigned char *in_trigger = (unsigned char *) input_items[1];
-       in_trigger += nread;
-       for (int i = 0; i < noutput_items-nread; i++) {
-         if (in_trigger[i]) {
-           return i;
-         }
-       }
+          int skip_items,
+          int max_rel_offset,
+          uint64_t base_offset,
+          const unsigned char *in_trigger
+    ) {
+      int rel_offset = max_rel_offset;
+      if (max_rel_offset < skip_items) {
+        return rel_offset;
+      }
+      if (in_trigger) {
+        for (int i = skip_items; i < max_rel_offset; i++) {
+          if (in_trigger[i]) {
+            rel_offset = i;
+            break;
+          }
+        }
       }
       if (d_uses_trigger_tag) {
         std::vector<tag_t> tags;
-        get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items, d_trigger_tag_key);
-        uint64_t min_offset = ULLONG_MAX;
-        int tag_index = -1;
-        for (unsigned i = 0; i < tags.size(); i++) {
-          if (tags[i].offset < min_offset) {
-            tag_index = (int) i;
-            min_offset = tags[i].offset;
+        get_tags_in_range(
+            tags,
+            PORT_INPUTDATA,
+            base_offset + skip_items,
+            base_offset + max_rel_offset,
+            d_trigger_tag_key
+        );
+        if (!tags.empty()) {
+          std::sort(tags.begin(), tags.end(), tag_t::offset_compare);
+          const int tag_rel_offset = tags[0].offset - base_offset;
+          if (tag_rel_offset < rel_offset) {
+            rel_offset = tag_rel_offset;
           }
         }
-        if (tag_index != -1) {
-          return min_offset - nitems_read(0);
-        }
       }
-      return -1;
+      return rel_offset;
     } /* find_trigger_signal() */
 
 
@@ -332,77 +431,100 @@ namespace gr {
       d_state = STATE_HEADER_RX_FAIL;
 
       if (pmt::is_integer(header_data)) {
-       d_curr_payload_len = pmt::to_long(header_data);
-       d_payload_tag_keys.push_back(d_len_tag_key);
-       d_payload_tag_values.push_back(header_data);
-       d_state = STATE_HEADER_RX_SUCCESS;
+        d_curr_payload_len = pmt::to_long(header_data);
+        d_payload_tag_keys.push_back(d_len_tag_key);
+        d_payload_tag_values.push_back(header_data);
+        d_state = STATE_HEADER_RX_SUCCESS;
       } else if (pmt::is_dict(header_data)) {
-       pmt::pmt_t dict_items(pmt::dict_items(header_data));
-       while (!pmt::is_null(dict_items)) {
-         pmt::pmt_t this_item(pmt::car(dict_items));
-         d_payload_tag_keys.push_back(pmt::car(this_item));
-         d_payload_tag_values.push_back(pmt::cdr(this_item));
-         if (pmt::equal(pmt::car(this_item), d_len_tag_key)) {
-           d_curr_payload_len = pmt::to_long(pmt::cdr(this_item));
-           d_state = STATE_HEADER_RX_SUCCESS;
-         }
-         dict_items = pmt::cdr(dict_items);
-       }
-       if (d_state == STATE_HEADER_RX_FAIL) {
-         GR_LOG_CRIT(d_logger, "no length tag passed from header data");
-       }
+        pmt::pmt_t dict_items(pmt::dict_items(header_data));
+        while (!pmt::is_null(dict_items)) {
+          pmt::pmt_t this_item(pmt::car(dict_items));
+          d_payload_tag_keys.push_back(pmt::car(this_item));
+          d_payload_tag_values.push_back(pmt::cdr(this_item));
+          if (pmt::equal(pmt::car(this_item), d_len_tag_key)) {
+            d_curr_payload_len = pmt::to_long(pmt::cdr(this_item));
+            d_state = STATE_HEADER_RX_SUCCESS;
+          }
+          if (pmt::equal(pmt::car(this_item), d_payload_offset_key)) {
+            d_curr_payload_offset = pmt::to_long(pmt::cdr(this_item));
+            if (std::abs(d_curr_payload_offset) > d_header_padding_total_items) {
+              GR_LOG_CRIT(d_logger, "Payload offset exceeds padding");
+              d_state = STATE_HEADER_RX_FAIL;
+              return;
+            }
+          }
+          dict_items = pmt::cdr(dict_items);
+        }
+        if (d_state == STATE_HEADER_RX_FAIL) {
+          GR_LOG_CRIT(d_logger, "no payload length passed from header data");
+        }
       } else if (header_data == pmt::PMT_F || pmt::is_null(header_data)) {
-       GR_LOG_INFO(d_logger, boost::format("Parser returned %1%") % pmt::write_string(header_data));
+        GR_LOG_INFO(d_logger, boost::format("Parser returned %1%") % pmt::write_string(header_data));
       } else {
-       GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data));
+        GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data));
       }
       if (d_state == STATE_HEADER_RX_SUCCESS)
       {
-       if ((d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1)/2) {
-         d_state = STATE_HEADER_RX_FAIL;
-         GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len);
-       } else {
-         set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol));
-       }
+        if (d_curr_payload_len < 0) {
+          GR_LOG_WARN(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len);
+          d_curr_payload_len = 0;
+          d_state = STATE_HEADER_RX_FAIL;
+        }
+        if ((d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1)/2) {
+          d_state = STATE_HEADER_RX_FAIL;
+          GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len);
+        } else {
+          set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol));
+        }
       }
     } /* parse_header_data_msg() */
 
 
     void
     header_payload_demux_impl::copy_n_symbols(
-       const unsigned char *in,
-       unsigned char *out,
-       int port,
-       int n_symbols
-    )
-    {
+        const unsigned char *in,
+        unsigned char *out,
+        int port,
+        const uint64_t n_items_read_base,
+        int n_symbols,
+        int n_padding_items
+    ) {
       // Copy samples
       if (d_gi) {
-       for (int i = 0; i < n_symbols; i++) {
-         memcpy((void *) out, (void *) (in + d_gi * d_itemsize), d_items_per_symbol * d_itemsize);
-         in  += d_itemsize * (d_items_per_symbol + d_gi);
-         out += d_itemsize * d_items_per_symbol;
-       }
+        // Here we know n_padding_items must be 0 (see contract),
+        // because all padding items will be part of n_symbols
+        for (int i = 0; i < n_symbols; i++) {
+          memcpy(
+              (void *) out,
+              (void *) (in + d_gi * d_itemsize),
+              d_items_per_symbol * d_itemsize
+          );
+          in  += d_itemsize * (d_items_per_symbol + d_gi);
+          out += d_itemsize * d_items_per_symbol;
+        }
       } else {
-       memcpy(
-           (void *) out,
-           (void *) in,
-           n_symbols * d_items_per_symbol * d_itemsize
-       );
+        memcpy(
+            (void *) out,
+            (void *) in,
+            (n_symbols * d_items_per_symbol + n_padding_items) * d_itemsize
+        );
       }
       // Copy tags
       std::vector<tag_t> tags;
       get_tags_in_range(
-          tags, 0,
-          nitems_read(0),
-          nitems_read(0) + n_symbols * (d_items_per_symbol + d_gi)
+          tags,
+          PORT_INPUTDATA,
+          n_items_read_base,
+          n_items_read_base
+            + n_symbols * (d_items_per_symbol + d_gi)
+            + n_padding_items
       );
       for (size_t t = 0; t < tags.size(); t++) {
         // The trigger tag is *not* propagated
         if (tags[t].key == d_trigger_tag_key) {
           continue;
         }
-        int new_offset = tags[t].offset - nitems_read(0);
+        int new_offset = tags[t].offset - n_items_read_base;
         if (d_output_symbols) {
           new_offset /= (d_items_per_symbol + d_gi);
         } else if (d_gi) {
@@ -418,43 +540,49 @@ namespace gr {
             tags[t].value
         );
       }
+      // Advance write pointers
+      // Items to produce might actually be symbols
+      const int items_to_produce = d_output_symbols ?
+                            n_symbols :
+                            (n_symbols * d_items_per_symbol + n_padding_items);
+      produce(port, items_to_produce);
     } /* copy_n_symbols() */
 
     void
     header_payload_demux_impl::update_special_tags(
-       int range_start,
-       int range_end
+        uint64_t range_start,
+        uint64_t range_end
     ){
       if (d_track_time) {
-       std::vector<tag_t> tags;
-       get_tags_in_range(tags, 0,
-           nitems_read(0) + range_start,
-           nitems_read(0) + range_end,
-           d_timing_key
-       );
-       for (unsigned t = 0; t < tags.size(); t++) {
-         if(tags[t].offset >= d_last_time_offset) {
-           d_last_time = tags[t].value;
-           d_last_time_offset = tags[t].offset;
-         }
-       }
+        std::vector<tag_t> tags;
+        get_tags_in_range(
+            tags,
+            PORT_INPUTDATA,
+            range_start,
+            range_end,
+            d_timing_key
+        );
+        if (!tags.empty()) {
+          std::sort(tags.begin(), tags.end(), tag_t::offset_compare);
+          d_last_time = tags.back().value;
+          d_last_time_offset = tags.back().offset;
+        }
       }
 
       std::vector<tag_t> tags;
-      for (unsigned i = 0; i < d_special_tags.size(); i++) {
-       uint64_t offset = 0;
-       // TODO figure out if it's better to get all tags at once instead of doing this for every tag individually
-       get_tags_in_range(tags, 0,
-           nitems_read(0) + range_start,
-           nitems_read(0) + range_end,
-           d_special_tags[i]
-       );
-       for (unsigned t = 0; t < tags.size(); t++) {
-         if(tags[t].offset >= offset) {
-           d_special_tags_last_value[i] = tags[t].value;
-           offset = tags[t].offset;
-         }
-       }
+      for (size_t i = 0; i < d_special_tags.size(); i++) {
+        // TODO figure out if it's better to get all tags at once instead of doing this for every tag individually
+        get_tags_in_range(
+            tags,
+            PORT_INPUTDATA, // Read from port 0
+            range_start,
+            range_end,
+            d_special_tags[i]
+        );
+        std::sort(tags.begin(), tags.end(), tag_t::offset_compare);
+        for (size_t t = 0; t < tags.size(); t++) {
+          d_special_tags_last_value[i] = tags[t].value;
+        }
       }
     } /* update_special_tags() */
 
@@ -462,24 +590,24 @@ namespace gr {
     header_payload_demux_impl::add_special_tags(
     ){
       if (d_track_time) {
-       add_item_tag(
-           PORT_HEADER,
-           nitems_written(PORT_HEADER),
-           d_timing_key,
-           _update_pmt_time(
-             d_last_time,
-             d_sampling_time * (nitems_read(0) - d_last_time_offset)
-           )
-       );
+        add_item_tag(
+            PORT_HEADER,
+            nitems_written(PORT_HEADER),
+            d_timing_key,
+            _update_pmt_time(
+              d_last_time,
+              d_sampling_time * (nitems_read(PORT_INPUTDATA) - d_last_time_offset)
+            )
+        );
       }
 
       for (unsigned i = 0; i < d_special_tags.size(); i++) {
-       add_item_tag(
-           PORT_HEADER,
-           nitems_written(PORT_HEADER),
-           d_special_tags[i],
-           d_special_tags_last_value[i]
-       );
+        add_item_tag(
+            PORT_HEADER,
+            nitems_written(PORT_HEADER),
+            d_special_tags[i],
+            d_special_tags_last_value[i]
+        );
       }
     } /* add_special_tags() */
 
diff --git a/gr-digital/lib/header_payload_demux_impl.h b/gr-digital/lib/header_payload_demux_impl.h
index 1d45dc7..0a70e7d 100644
--- a/gr-digital/lib/header_payload_demux_impl.h
+++ b/gr-digital/lib/header_payload_demux_impl.h
@@ -1,18 +1,18 @@
 /* -*- c++ -*- */
-/* Copyright 2012 Free Software Foundation, Inc.
- * 
+/* Copyright 2012-2016 Free Software Foundation, Inc.
+ *
  * This file is part of GNU Radio
- * 
+ *
  * GNU Radio is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
  * the Free Software Foundation; either version 3, or (at your option)
  * any later version.
- * 
+ *
  * GNU Radio is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
- * 
+ *
  * You should have received a copy of the GNU General Public License
  * along with GNU Radio; see the file COPYING.  If not, write to
  * the Free Software Foundation, Inc., 51 Franklin Street,
@@ -31,6 +31,9 @@ namespace gr {
     {
      private:
       int d_header_len; //!< Number of bytes per header
+      const int d_header_padding_symbols; //!< Symbols header padding
+      const int d_header_padding_items; //!< Items header padding
+      const int d_header_padding_total_items; //!< Items header padding
       int d_items_per_symbol; //!< Bytes per symbol
       int d_gi; //!< Bytes per guard interval
       pmt::pmt_t d_len_tag_key; //!< Key of length tag
@@ -40,10 +43,12 @@ namespace gr {
       bool d_uses_trigger_tag; //!< If a trigger tag is used
       int d_state; //!< Current read state
       int d_curr_payload_len; //!< Length of the next payload (symbols)
+      int d_curr_payload_offset; //!< Offset of the next payload (symbols)
       std::vector<pmt::pmt_t> d_payload_tag_keys; //!< Temporary buffer for PMTs that go on the payload (keys)
       std::vector<pmt::pmt_t> d_payload_tag_values; //!< Temporary buffer for PMTs that go on the payload (values)
       bool d_track_time; //!< Whether or not to keep track of the rx time
       pmt::pmt_t d_timing_key; //!< Key of the timing tag (usually 'rx_time')
+      pmt::pmt_t d_payload_offset_key; //!< Key of payload offset (usually 'payload_offset')
       uint64_t d_last_time_offset; //!< Item number of the last time tag
       pmt::pmt_t d_last_time; //!< The actual time that was indicated
       double d_sampling_time; //!< Inverse sampling rate
@@ -53,7 +58,14 @@ namespace gr {
       // Helper functions to make the state machine more readable
 
       //! Checks if there are enough items on the inputs and enough space on the output buffers to copy \p n_symbols symbols
-      inline bool check_items_available(int n_symbols, gr_vector_int &ninput_items, int noutput_items, int nread);
+      bool check_buffers_ready(
+          int output_symbols_reqd,
+          int extra_output_items_reqd,
+          int noutput_items,
+          int input_items_reqd,
+          gr_vector_int &ninput_items,
+          int n_items_read
+      );
 
       //! Message handler: Reads the result from the header demod and sets length tag (and other tags)
       void parse_header_data_msg(pmt::pmt_t header_data);
@@ -62,49 +74,54 @@ namespace gr {
       //  Searches input 1 (if active), then the tags. Returns the offset in the input buffer
       //  (or -1 if none is found)
       int find_trigger_signal(
-       int nread,
-       int noutput_items,
-       gr_vector_const_void_star &input_items);
+          int skip_items,
+          int noutput_items,
+          uint64_t base_offset,
+          const unsigned char *in_trigger
+      );
 
       //! Copies n symbols from in to out, makes sure tags are propagated properly. Does neither consume nor produce.
       void copy_n_symbols(
-         const unsigned char *in,
-         unsigned char *out,
-         int port,
-         int n_symbols
+          const unsigned char *in,
+          unsigned char *out,
+          int port,
+          const uint64_t n_items_read_base,
+          int n_symbols,
+          int n_padding_items=0
       );
 
       //! Scans a given range for tags in d_special_tags
       void update_special_tags(
-         int range_start,
-         int range_end
+          uint64_t range_start,
+          uint64_t range_end
       );
 
       //! Adds all tags in d_special_tags and timing info to the first item of the header.
       void add_special_tags();
 
-
      public:
       header_payload_demux_impl(
-       int header_len,
-       int items_per_symbol,
-       int guard_interval,
-       const std::string &length_tag_key,
-       const std::string &trigger_tag_key,
-       bool output_symbols,
-       size_t itemsize,
-       const std::string &timing_tag_key,
-       const double samp_rate,
-       const std::vector<std::string> &special_tags
+          const int header_len,
+          const int items_per_symbol,
+          const int guard_interval,
+          const std::string &length_tag_key,
+          const std::string &trigger_tag_key,
+          const bool output_symbols,
+          const size_t itemsize,
+          const std::string &timing_tag_key,
+          const double samp_rate,
+          const std::vector<std::string> &special_tags,
+          const size_t header_padding
       );
       ~header_payload_demux_impl();
 
       void forecast (int noutput_items, gr_vector_int &ninput_items_required);
 
       int general_work(int noutput_items,
-                      gr_vector_int &ninput_items,
-                      gr_vector_const_void_star &input_items,
-                      gr_vector_void_star &output_items);
+          gr_vector_int &ninput_items,
+          gr_vector_const_void_star &input_items,
+          gr_vector_void_star &output_items
+      );
     };
 
   } // namespace digital
diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py
index 8006d44..f36d710 100755
--- a/gr-digital/python/digital/qa_header_payload_demux.py
+++ b/gr-digital/python/digital/qa_header_payload_demux.py
@@ -1,29 +1,69 @@
 #!/usr/bin/env python
-# Copyright 2012 Free Software Foundation, Inc.
-# 
+# Copyright 2012-2016 Free Software Foundation, Inc.
+#
 # This file is part of GNU Radio
-# 
+#
 # GNU Radio is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 3, or (at your option)
 # any later version.
-# 
+#
 # GNU Radio is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 # GNU General Public License for more details.
-# 
+#
 # You should have received a copy of the GNU General Public License
 # along with GNU Radio; see the file COPYING.  If not, write to
 # the Free Software Foundation, Inc., 51 Franklin Street,
 # Boston, MA 02110-1301, USA.
-# 
+#
 
+from __future__ import print_function
 import time
-
-from gnuradio import gr, gr_unittest, digital, blocks
+import random
+import numpy
+from gnuradio import gr
+from gnuradio import gr_unittest
+from gnuradio import digital
+from gnuradio import blocks
 import pmt
 
+def make_tag(key, value, offset):
+    tag = gr.tag_t()
+    tag.offset = offset
+    tag.key = pmt.string_to_symbol(key)
+    tag.value = pmt.to_pmt(value)
+    return tag
+
+
+class HeaderToMessageBlock(gr.sync_block):
+    """
+    Helps with testing the HPD. Receives a header, stores it, posts
+    a predetermined message.
+    """
+    def __init__(self, itemsize, header_len, messages, header_is_symbol=False):
+        gr.sync_block.__init__(
+            self,
+            name="HeaderToMessageBlock",
+            in_sig=[itemsize],
+            out_sig=[itemsize],
+        )
+        self.header_len = header_len
+        self.message_port_register_out(pmt.intern('header_data'))
+        self.messages = messages
+        self.msg_count = 0
+
+    def work(self, input_items, output_items):
+        for i in xrange(len(input_items[0])/self.header_len):
+            msg = self.messages[self.msg_count] or False
+            #print("Sending message: {0}".format(msg))
+            self.message_port_pub(pmt.intern('header_data'), pmt.to_pmt(msg))
+            self.msg_count += 1
+        output_items[0][:] = input_items[0][:]
+        return len(input_items[0])
+
+
 class qa_header_payload_demux (gr_unittest.TestCase):
 
     def setUp (self):
@@ -32,6 +72,36 @@ class qa_header_payload_demux (gr_unittest.TestCase):
     def tearDown (self):
         self.tb = None
 
+    def connect_all_blocks(self,
+            data_src, trigger_src,
+            hpd,
+            mock_header_demod,
+            payload_sink, header_sink
+    ):
+        """
+        Connect the standard HPD test flowgraph
+        """
+        self.tb.connect(data_src,    (hpd, 0))
+        if trigger_src is not None:
+            self.tb.connect(trigger_src, (hpd, 1))
+        self.tb.connect((hpd, 0), mock_header_demod)
+        self.tb.connect(mock_header_demod, header_sink)
+        self.tb.msg_connect(
+                mock_header_demod, 'header_data',
+                hpd, 'header_data'
+        )
+        self.tb.connect((hpd, 1), payload_sink)
+
+    def run_tb(self, payload_sink, payload_len, header_sink, header_len, timeout=30):
+        stop_time = time.time() + timeout
+        self.tb.start()
+        while len(payload_sink.data()) < payload_len and \
+                len(header_sink.data()) < header_len and \
+                time.time() < stop_time:
+            time.sleep(.2)
+        self.tb.stop()
+        self.tb.wait()
+
     def test_001_t (self):
         """ Simplest possible test: put in zeros, then header,
         then payload, trigger signal, try to demux.
@@ -45,25 +115,13 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         trigger_signal = [0,] * len(data_signal)
         trigger_signal[n_zeros] = 1
         # This is dropped:
-        testtag1 = gr.tag_t()
-        testtag1.offset = 0
-        testtag1.key = pmt.string_to_symbol('tag1')
-        testtag1.value = pmt.from_long(0)
+        testtag1 = make_tag('tag1', 0, 0)
         # This goes on output 0, item 0:
-        testtag2 = gr.tag_t()
-        testtag2.offset = n_zeros
-        testtag2.key = pmt.string_to_symbol('tag2')
-        testtag2.value = pmt.from_long(23)
+        testtag2 = make_tag('tag2', 23, n_zeros)
         # This goes on output 0, item 2:
-        testtag3 = gr.tag_t()
-        testtag3.offset = n_zeros + len(header) - 1
-        testtag3.key = pmt.string_to_symbol('tag3')
-        testtag3.value = pmt.from_long(42)
+        testtag3 = make_tag('tag3', 42, n_zeros + len(header) - 1)
         # This goes on output 1, item 3:
-        testtag4 = gr.tag_t()
-        testtag4.offset = n_zeros + len(header) + 3
-        testtag4.key = pmt.string_to_symbol('tag4')
-        testtag4.value = pmt.from_long(314)
+        testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
         data_src = blocks.vector_source_f(
                 data_signal,
                 False,
@@ -73,26 +131,17 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         hpd = digital.header_payload_demux(
             len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float
         )
+        mock_header_demod = HeaderToMessageBlock(
+                numpy.float32,
+                len(header),
+                [len(payload)]
+        )
         self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you
-        header_sink = blocks.vector_sink_f()
         payload_sink = blocks.vector_sink_f()
-
-        self.tb.connect(data_src,    (hpd, 0))
-        self.tb.connect(trigger_src, (hpd, 1))
-        self.tb.connect((hpd, 0), header_sink)
-        self.tb.connect((hpd, 1), payload_sink)
-        self.tb.start()
-        time.sleep(.2) # Need this, otherwise, the next message is ignored
-        hpd.to_basic_block()._post(
-                pmt.intern('header_data'),
-                pmt.from_long(len(payload))
-        )
-        while len(payload_sink.data()) < len(payload):
-            time.sleep(.2)
-        self.tb.stop()
-        self.tb.wait()
-
-        self.assertEqual(header_sink.data(),  header)
+        header_sink = blocks.vector_sink_f()
+        self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink)
+        self.run_tb(payload_sink, len(payload), header_sink, len(header))
+        self.assertEqual(header_sink.data(), header)
         self.assertEqual(payload_sink.data(), payload)
         ptags_header = []
         for tag in header_sink.tags():
@@ -122,30 +171,15 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         payload = tuple(range(5, 20))
         data_signal = (0,) * n_zeros + header + payload
         # Trigger tag
-        trigger_tag = gr.tag_t()
-        trigger_tag.offset = n_zeros
-        trigger_tag.key = pmt.string_to_symbol('detect')
-        trigger_tag.value = pmt.PMT_T
+        trigger_tag = make_tag('detect', True, n_zeros)
         # This is dropped:
-        testtag1 = gr.tag_t()
-        testtag1.offset = 0
-        testtag1.key = pmt.string_to_symbol('tag1')
-        testtag1.value = pmt.from_long(0)
+        testtag1 = make_tag('tag1', 0, 0)
         # This goes on output 0, item 0:
-        testtag2 = gr.tag_t()
-        testtag2.offset = n_zeros
-        testtag2.key = pmt.string_to_symbol('tag2')
-        testtag2.value = pmt.from_long(23)
+        testtag2 = make_tag('tag2', 23, n_zeros)
         # This goes on output 0, item 2:
-        testtag3 = gr.tag_t()
-        testtag3.offset = n_zeros + len(header) - 1
-        testtag3.key = pmt.string_to_symbol('tag3')
-        testtag3.value = pmt.from_long(42)
+        testtag3 = make_tag('tag3', 42, n_zeros + len(header) - 1)
         # This goes on output 1, item 3:
-        testtag4 = gr.tag_t()
-        testtag4.offset = n_zeros + len(header) + 3
-        testtag4.key = pmt.string_to_symbol('tag4')
-        testtag4.value = pmt.from_long(314)
+        testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
         data_src = blocks.vector_source_f(
                 data_signal,
                 False,
@@ -157,21 +191,14 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you
         header_sink = blocks.vector_sink_f()
         payload_sink = blocks.vector_sink_f()
-
-        self.tb.connect(data_src,    (hpd, 0))
-        self.tb.connect((hpd, 0), header_sink)
-        self.tb.connect((hpd, 1), payload_sink)
-        self.tb.start()
-        time.sleep(.2) # Need this, otherwise, the next message is ignored
-        hpd.to_basic_block()._post(
-                pmt.intern('header_data'),
-                pmt.from_long(len(payload))
+        mock_header_demod = HeaderToMessageBlock(
+                numpy.float32,
+                len(header),
+                [len(payload)]
         )
-        while len(payload_sink.data()) < len(payload):
-            time.sleep(.2)
-        self.tb.stop()
-        self.tb.wait()
-
+        self.connect_all_blocks(data_src, None, hpd, mock_header_demod, payload_sink, header_sink)
+        self.run_tb(payload_sink, len(payload), header_sink, len(header))
+        # Check results
         self.assertEqual(header_sink.data(),  header)
         self.assertEqual(payload_sink.data(), payload)
         ptags_header = []
@@ -193,8 +220,143 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         ]
         self.assertEqual(expected_tags_payload, ptags_payload)
 
+    def test_001_headerpadding (self):
+        """ Like test 1, but with header padding. """
+        n_zeros = 3
+        header = (1, 2, 3)
+        header_padding = 1
+        payload = tuple(range(5, 20))
+        data_signal = (0,) * n_zeros + header + payload
+        trigger_signal = [0,] * len(data_signal)
+        trigger_signal[n_zeros] = 1
+        # This is dropped:
+        testtag1 = make_tag('tag1', 0, 0)
+        # This goes on output 0, item 0:
+        testtag2 = make_tag('tag2', 23, n_zeros)
+        # This goes on output 0, item 2:
+        testtag3 = make_tag('tag3', 42, n_zeros + len(header) - 1)
+        # This goes on output 1, item 3:
+        testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
+        data_src = blocks.vector_source_f(
+                data_signal,
+                False,
+                tags=(testtag1, testtag2, testtag3, testtag4)
+        )
+        trigger_src = blocks.vector_source_b(trigger_signal, False)
+        hpd = digital.header_payload_demux(
+            len(header),
+            1, # Items per symbol
+            0, # Guard interval
+            "frame_len", # TSB tag key
+            "detect", # Trigger tag key
+            False, # No symbols please
+            gr.sizeof_float, # Item size
+            "", # Timing tag key
+            1.0, # Samp rate
+            (), # No special tags
+            header_padding
+        )
+        mock_header_demod = HeaderToMessageBlock(
+            numpy.float32,
+            len(header),
+            [len(payload)]
+        )
+        header_sink = blocks.vector_sink_f()
+        payload_sink = blocks.vector_sink_f()
+        self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink)
+        self.run_tb(payload_sink, len(payload), header_sink, len(header)+2)
+        # Check values
+        # Header now is padded:
+        self.assertEqual(header_sink.data(),  (0,) + header + (payload[0],))
+        self.assertEqual(payload_sink.data(), payload)
+        ptags_header = []
+        for tag in header_sink.tags():
+            ptag = gr.tag_to_python(tag)
+            ptags_header.append({'key': ptag.key, 'offset': ptag.offset})
+        expected_tags_header = [
+                {'key': 'tag2', 'offset': 1},
+                {'key': 'tag3', 'offset': 3},
+        ]
+        self.assertEqual(expected_tags_header, ptags_header)
+        ptags_payload = []
+        for tag in payload_sink.tags():
+            ptag = gr.tag_to_python(tag)
+            ptags_payload.append({'key': ptag.key, 'offset': ptag.offset})
+        expected_tags_payload = [
+                {'key': 'frame_len', 'offset': 0},
+                {'key': 'tag4', 'offset': 3},
+        ]
+        self.assertEqual(expected_tags_payload, ptags_payload)
+
+    def test_001_headerpadding_payload_offset (self):
+        """ Like test 1, but with header padding + payload offset. """
+        n_zeros = 3
+        header = (1, 2, 3)
+        header_padding = 1
+        payload_offset = -1
+        payload = tuple(range(5, 20))
+        data_signal = (0,) * n_zeros + header + payload + (0,) * 100
+        trigger_signal = [0,] * len(data_signal)
+        trigger_signal[n_zeros] = 1
+        # This goes on output 1, item 3 + 1 (for payload offset)
+        testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
+        data_src = blocks.vector_source_f(
+                data_signal,
+                False,
+                tags=(testtag4,)
+        )
+        trigger_src = blocks.vector_source_b(trigger_signal, False)
+        hpd = digital.header_payload_demux(
+            len(header),
+            1, # Items per symbol
+            0, # Guard interval
+            "frame_len", # TSB tag key
+            "detect", # Trigger tag key
+            False, # No symbols please
+            gr.sizeof_float, # Item size
+            "", # Timing tag key
+            1.0, # Samp rate
+            (), # No special tags
+            header_padding
+        )
+        self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you
+        header_sink = blocks.vector_sink_f()
+        payload_sink = blocks.vector_sink_f()
+        self.tb.connect(data_src,    (hpd, 0))
+        self.tb.connect(trigger_src, (hpd, 1))
+        self.tb.connect((hpd, 0), header_sink)
+        self.tb.connect((hpd, 1), payload_sink)
+        self.tb.start()
+        time.sleep(.2) # Need this, otherwise, the next message is ignored
+        hpd.to_basic_block()._post(
+                pmt.intern('header_data'),
+                pmt.to_pmt({'frame_len': len(payload), 'payload_offset': payload_offset})
+        )
+        while len(payload_sink.data()) < len(payload):
+            time.sleep(.2)
+        self.tb.stop()
+        self.tb.wait()
+        # Header is now padded:
+        self.assertEqual(header_sink.data(),  (0,) + header + (payload[0],))
+        # Payload is now offset:
+        self.assertEqual(
+                payload_sink.data(),
+                data_signal[n_zeros + len(header) + payload_offset:n_zeros + len(header) + payload_offset + len(payload)]
+        )
+        ptags_payload = {}
+        for tag in payload_sink.tags():
+            ptag = gr.tag_to_python(tag)
+            ptags_payload[ptag.key] = ptag.offset
+        expected_tags_payload = {
+                'frame_len': 0,
+                'payload_offset': 0,
+                'tag4': 3 - payload_offset,
+        }
+        self.assertEqual(expected_tags_payload, ptags_payload)
+
+
     def test_002_symbols (self):
-        """ 
+        """
         Same as before, but operate on symbols
         """
         n_zeros = 1
@@ -207,25 +369,13 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         trigger_signal = [0,] * len(data_signal)
         trigger_signal[n_zeros] = 1
         # This is dropped:
-        testtag1 = gr.tag_t()
-        testtag1.offset = 0
-        testtag1.key = pmt.string_to_symbol('tag1')
-        testtag1.value = pmt.from_long(0)
+        testtag1 = make_tag('tag1', 0, 0)
         # This goes on output 0, item 0 (from the GI)
-        testtag2 = gr.tag_t()
-        testtag2.offset = n_zeros
-        testtag2.key = pmt.string_to_symbol('tag2')
-        testtag2.value = pmt.from_long(23)
+        testtag2 = make_tag('tag2', 23, n_zeros)
         # This goes on output 0, item 0 (middle of the header symbol)
-        testtag3 = gr.tag_t()
-        testtag3.offset = n_zeros + gi + 1
-        testtag3.key = pmt.string_to_symbol('tag3')
-        testtag3.value = pmt.from_long(42)
+        testtag3 = make_tag('tag3', 42, n_zeros + gi + 1)
         # This goes on output 1, item 1 (middle of the first payload symbol)
-        testtag4 = gr.tag_t()
-        testtag4.offset = n_zeros + (gi + items_per_symbol) * 2 + 1
-        testtag4.key = pmt.string_to_symbol('tag4')
-        testtag4.value = pmt.from_long(314)
+        testtag4 = make_tag('tag4', 314, n_zeros + (gi + items_per_symbol) * 2 + 1)
         data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4))
         trigger_src = blocks.vector_source_b(trigger_signal, False)
         hpd = digital.header_payload_demux(
@@ -291,25 +441,20 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         trigger_signal[n_zeros] = 1
         trigger_signal[len(data_signal)] = 1
         trigger_signal[len(data_signal)+len(header_fail)+n_zeros] = 1
-        tx_signal = data_signal + header_fail + (0,) * n_zeros + header + payload2 + (0,) * 1000
+        print("Triggers at: {0} {1} {2}".format(
+            n_zeros,
+            len(data_signal),
+            len(data_signal)+len(header_fail)+n_zeros)
+        )
+        tx_signal = data_signal + \
+                header_fail + (0,) * n_zeros + \
+                header + payload2 + (0,) * 1000
         # Timing tag: This is preserved and updated:
-        timing_tag = gr.tag_t()
-        timing_tag.offset = 0
-        timing_tag.key = pmt.string_to_symbol('rx_time')
-        timing_tag.value = pmt.to_pmt((0, 0))
+        timing_tag = make_tag('rx_time', (0, 0), 0)
         # Rx freq tags:
-        rx_freq_tag1 = gr.tag_t()
-        rx_freq_tag1.offset = 0
-        rx_freq_tag1.key = pmt.string_to_symbol('rx_freq')
-        rx_freq_tag1.value = pmt.from_double(1.0)
-        rx_freq_tag2 = gr.tag_t()
-        rx_freq_tag2.offset = 29
-        rx_freq_tag2.key = pmt.string_to_symbol('rx_freq')
-        rx_freq_tag2.value = pmt.from_double(1.5)
-        rx_freq_tag3 = gr.tag_t()
-        rx_freq_tag3.offset = 30
-        rx_freq_tag3.key = pmt.string_to_symbol('rx_freq')
-        rx_freq_tag3.value = pmt.from_double(2.0)
+        rx_freq_tag1 = make_tag('rx_freq', 1.0, 0)
+        rx_freq_tag2 = make_tag('rx_freq', 1.5, 29)
+        rx_freq_tag3 = make_tag('rx_freq', 2.0, 30)
         ### Flow graph
         data_src = blocks.vector_source_f(
             tx_signal, False,
@@ -388,6 +533,92 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         self.assertEqual(tags_header, tags_expected_header)
         self.assertEqual(tags_payload, tags_expected_payload)
 
+    def test_004_fuzz(self):
+        """
+        Long random test
+        """
+        def create_signal(
+                n_bursts,
+                header_len,
+                max_gap,
+                max_burstsize,
+                fail_rate,
+        ):
+            signal = []
+            indexes = []
+            burst_sizes = []
+            total_payload_len = 0
+            for burst_count in xrange(n_bursts):
+                gap_size = random.randint(0, max_gap)
+                signal += [0] * gap_size
+                is_failure = random.random() < fail_rate
+                if not is_failure:
+                    burst_size = random.randint(0, max_burstsize)
+                else:
+                    burst_size = 0
+                total_payload_len += burst_size
+                indexes += [len(signal)]
+                signal += [1] * header_len
+                signal += [2] * burst_size
+                burst_sizes += [burst_size]
+            return (signal, indexes, total_payload_len, burst_sizes)
+        def indexes_to_triggers(indexes, signal_len):
+            """
+            Convert indexes to a mix of trigger signals and tags
+            """
+            trigger_signal = [0] * signal_len
+            trigger_tags = []
+            for index in indexes:
+                if random.random() > 0.5:
+                    trigger_signal[index] = 1
+                else:
+                    trigger_tags += [make_tag('detect', True, index)]
+            return (trigger_signal, trigger_tags)
+        ### Go, go, go
+        # The divide-by-20 means we'll usually get the same random seed
+        # between the first run and the XML run.
+        random_seed = int(time.time()/20)
+        random.seed(random_seed)
+        print("Random seed: {0}".format(random_seed))
+        n_bursts = 400
+        header_len = 5
+        max_gap = 50
+        max_burstsize = 100
+        fail_rate = 0.05
+        signal, indexes, total_payload_len, burst_sizes = create_signal(
+            n_bursts, header_len, max_gap, max_burstsize, fail_rate
+        )
+        trigger_signal, trigger_tags = indexes_to_triggers(indexes, len(signal))
+        # Flow graph
+        data_src = blocks.vector_source_f(
+            signal, False,
+            tags=trigger_tags
+        )
+        trigger_src = blocks.vector_source_b(trigger_signal, False)
+        hpd = digital.header_payload_demux(
+            header_len=header_len,
+            items_per_symbol=1,
+            guard_interval=0,
+            length_tag_key="frame_len",
+            trigger_tag_key="detect",
+            output_symbols=False,
+            itemsize=gr.sizeof_float,
+            timing_tag_key='rx_time',
+            samp_rate=1.0,
+            special_tags=('rx_freq',),
+        )
+        mock_header_demod = HeaderToMessageBlock(
+                numpy.float32,
+                header_len,
+                burst_sizes
+        )
+        header_sink = blocks.vector_sink_f()
+        payload_sink = blocks.vector_sink_f()
+        self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink)
+        self.run_tb(payload_sink, total_payload_len, header_sink, header_len*n_bursts)
+        self.assertEqual(header_sink.data(), tuple([1]*header_len*n_bursts))
+        self.assertEqual(payload_sink.data(), tuple([2]*total_payload_len))
+
 if __name__ == '__main__':
     gr_unittest.run(qa_header_payload_demux, "qa_header_payload_demux.xml")
 
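Note on the expected tag offsets in the padding/offset test in this patch: the payload slice compared against payload_sink.data() starts at n_zeros + len(header) + payload_offset, and 'tag4' is placed at absolute offset n_zeros + len(header) + 3, so it is expected at 3 - payload_offset on the payload output. The sketch below only reproduces that arithmetic in plain Python. The helper name payload_tag_offset and the numeric values are hypothetical and are not part of GNU Radio or of this commit.

```python
def payload_tag_offset(abs_offset, trigger_index, header_len, payload_offset=0):
    """Offset of a tag relative to the first payload item.

    Hypothetical helper for illustration only. It assumes the payload
    starts at trigger_index + header_len + payload_offset, which is the
    slice the test above compares against payload_sink.data().
    """
    payload_start = trigger_index + header_len + payload_offset
    return abs_offset - payload_start


# Illustrative values (assumptions, not taken from the test file):
n_zeros = 3          # items before the trigger
header_len = 3       # header length in items
payload_offset = -1  # payload starts one item early

# 'tag4' sits three items after the nominal end of the header.
tag4_abs = n_zeros + header_len + 3
tag4_rel = payload_tag_offset(tag4_abs, n_zeros, header_len, payload_offset)

# Same expression the test uses for the expected 'tag4' offset.
assert tag4_rel == 3 - payload_offset
```

With payload_offset set to 0 the same helper gives a relative offset of 3, i.e. the tag position measured from the nominal end of the header.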


