tf.data: Combining multiple from_generator() datasets to create batches padded across time windows
I am working on a timeseries problem where each timeseries is fairly long (10^3–10^4 timesteps), and the timeseries have different lengths.



For each sequence, I can define a Python generator that yields values one timestep at a time. I am using the tf.data.Dataset.from_generator() constructor to wrap these generators into the tf.data API. The documentation suggests using from_generator() along with the tf.contrib.data.parallel_interleave() transformation to parallelize the extraction from my Python generators.



My downstream use for these data is a stateful RNN (e.g. LSTM or GRU). I want to chunk up the timeseries into smaller (~10^2) windows and use each chunk as a training example (i.e., truncated BPTT). Since my data are streaming, I think that means saving up window_size timesteps of each generator before passing it on through the pipeline, to be batched with the other generators' data. I also want to save the RNN state across these chunks so I can still learn long-term dependencies.



My issue comes with wanting to create padded batches of these generators' batched outputs. Ideally, I would like to present windows of the generator outputs to my neural network, with padding as necessary when some subset of the generators exhaust themselves before the others. I know that if I consume the entire output of each generator, I can do this with Dataset.padded_batch() (and can then slice the padded batch across the time dimension into windowed chunks as necessary). However, I want to pass each window to the neural network as it becomes available. If one of the generators exhausts itself before the others, I want to pad it with the padding value until all the others are exhausted as well, so I can reset the RNN state and begin the next batch of generators with a fresh initial RNN state. I am stuck here because the dataset resulting from the tf.contrib.data.parallel_interleave() transformation discards each generator once it is exhausted, and the timeseries do not keep a consistent ordering across the samples it produces.
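
For concreteness, the consume-everything version I can already do amounts to pad-then-slice. A minimal NumPy sketch of that step (toy data, not the streaming pipeline I want):

import numpy as np

window_length = 4
series = [np.arange(n, dtype=np.float32) for n in [1, 3, 5]]  # toy series

# pad every series with inf up to a common multiple of window_length
padded_len = -(-max(len(s) for s in series) // window_length) * window_length
padded = np.full((len(series), padded_len), np.inf, dtype=np.float32)
for i, s in enumerate(series):
    padded[i, :len(s)] = s

# slice the time axis into window_length chunks: (batch, n_windows, window_length)
windows = padded.reshape(len(series), -1, window_length)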



Here is a small example of the streaming pipeline I am stuck on:



import numpy as np
import tensorflow as tf

def stepwise_generator(length):
    for i in range(length):
        yield i

lengths = list(range(1, 10, 2))  # [1, 3, 5, 7, 9]

window_length = 4
batch_size = 3

dataset = tf.data.Dataset.from_tensor_slices(lengths)

gen = lambda length: tf.data.Dataset.from_generator(
    stepwise_generator, tf.float32, output_shapes=[], args=(length,)
).batch(window_length)  # this batching saves window_length timesteps per generator

dataset = dataset.apply(
    tf.contrib.data.parallel_interleave(gen, cycle_length=batch_size)
)

dataset = dataset.padded_batch(batch_size, padded_shapes=(-1,), padding_values=np.inf)
# batching 3 generators at once, and padding exhausted ones with inf.
# using a batch_size no greater than cycle_length above means we
# shouldn't start a new generator mid-batch (I think)

iterator = dataset.make_one_shot_iterator()
tensor = iterator.get_next()

outs = []
with tf.Session() as sess:
    while True:
        try:
            out = sess.run(tensor)
            outs.append(out)
        except tf.errors.OutOfRangeError:
            break

print(np.asarray(outs))


Output:



[[[ 0. inf inf inf]    # batch 1
  [ 0.  1.  2. inf]
  [ 0.  1.  2.  3.]]

 [[ 4. inf inf inf]    # batch 2 - the generator at index -1 in the
  [ 0.  1.  2.  3.]    # previous batch gets cycled to index 0 and two
  [ 0.  1.  2.  3.]]   # new generators are initiated

 [[ 4.  5.  6. inf]    # batch 3 - more generator cycling, and the one at
  [ 4.  5.  6.  7.]    # index 1 also gets cycled to index 2 in the same
  [ 8. inf inf inf]]]  # batch (because we have run out of generators in
                       # parallel_interleave)


My desired output would be something like



[[[ 0. inf inf inf]   # batch 1
  [ 0.  1.  2. inf]
  [ 0.  1.  2.  3.]]

 [[inf]               # batch 2 - the leftover timestep from a padded
  [inf]               # batch of the first 3 generators
  [ 4.]]

 [[ 0.  1.  2.  3.]   # batch 3 - only two generators are left so this is
  [ 0.  1.  2.  3.]]  # an end-of-epoch smaller batch

 [[ 4.  5.  6. inf]   # batch 4
  [ 4.  5.  6.  7.]]

 [[inf]               # batch 5
  [ 8.]]]


Here, the internal states of the RNNs would be reset after batches 2 and 5.
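
Masking the padding downstream is not the hard part; a sketch of the masks I would build from each batch (assuming inf only ever appears as padding, with tensor being the window from iterator.get_next() above):

pad_mask = tf.is_inf(tensor)                   # True at padded timesteps
loss_weights = tf.cast(~pad_mask, tf.float32)  # zero padded steps out of the loss
row_done = tf.reduce_all(pad_mask, axis=1)     # an all-padding row means that series
                                               # ended in an earlier window of the group

The hard part is getting the batches to arrive in that grouped, consistently ordered form in the first place.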



Again, the desired output would be simple to create if I consumed the entirety of each generator's output, then padded, batched, and sliced. But I want to produce batches as the generators make data available, since each one may be receiving its data in real time from e.g. a separate simulation.










      python tensorflow tensorflow-datasets






edited Nov 9 at 21:40

asked Nov 9 at 20:52

mattw
62



