tf.data: Combining multiple from_generator() datasets to create batches padded across time windows
I am working on a timeseries problem where each timeseries is fairly long (10^3-10^4 timesteps), and each timeseries has a different length.
For each sequence, I can define a Python generator that yields values one timestep at a time. I am using the tf.data.Dataset.from_generator() constructor to wrap these generators into the tf.data API. The documentation suggests using from_generator() along with the tf.contrib.data.parallel_interleave() transformation to parallelize the extraction from my Python generators.
My downstream use for these data is a stateful RNN (e.g. LSTM or GRU). I want to chunk up each timeseries into smaller (~10^2 timestep) windows and use each chunk as a training example (i.e., truncated BPTT). Since my data are streaming, I think that means saving up window_size timesteps from each generator before passing them on through the pipeline, to be batched with the other generators' data. I also want to save the RNN state across these chunks so I can still learn long-term dependencies.
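To be concrete about what I mean by saving state across chunks, here is a minimal TF 1.x sketch (not my actual model; the names window, state_in, etc. are made up for illustration): the final GRU state from one window is fed back as the initial state for the next window, and zeroed whenever a fresh group of timeseries begins.
import numpy as np
import tensorflow as tf

num_units = 32
window = tf.placeholder(tf.float32, [None, None, 1])      # [batch, time, features]
state_in = tf.placeholder(tf.float32, [None, num_units])  # GRU state carried between windows

cell = tf.nn.rnn_cell.GRUCell(num_units)
outputs, state_out = tf.nn.dynamic_rnn(cell, window, initial_state=state_in)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    state = np.zeros((3, num_units), np.float32)  # fresh state for a new group of series
    for chunk in np.random.rand(5, 3, 4, 1).astype(np.float32):  # stand-in windowed chunks
        # run one window and carry the final state into the next window
        state = sess.run(state_out, {window: chunk, state_in: state})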
My issue comes with wanting to create padded batches of these generators' windowed outputs. Ideally, I would present windows of the generator outputs to my neural network, with padding as necessary when some subset of the generators exhaust themselves before the others. I know that if I consume the entire output of each generator and then use Dataset.padded_batch(), I can do this (and can then slice the padded batch across the time dimension into windowed chunks as necessary). However, I want to pass each window to the neural network as it becomes available. If one of the generators exhausts itself before the others, I want to pad its output with the padding value until all the others have finished, so I can reset the RNN state and begin the next batch of generators with an empty initial RNN state. I am stuck here because the dataset resulting from the tf.contrib.data.parallel_interleave() transformation discards each generator when it becomes exhausted, and does not maintain a consistent ordering of the timeseries across samples.
Here is a small example:
import numpy as np
import tensorflow as tf

def stepwise_generator(length):
    for i in range(length):
        yield i

lengths = list(range(1, 10, 2))  # [1, 3, 5, 7, 9]
window_length = 4
batch_size = 3

dataset = tf.data.Dataset.from_tensor_slices(lengths)
gen = lambda length: tf.data.Dataset.from_generator(
    stepwise_generator, tf.float32, output_shapes=[], args=(length,)
).batch(window_length)  # this batching saves window_length timesteps per generator

dataset = dataset.apply(
    tf.contrib.data.parallel_interleave(gen, cycle_length=batch_size)
)
dataset = dataset.padded_batch(batch_size, (-1,), np.inf)
# batching 3 generators at once, and padding exhausted ones with inf.
# using a batch_size no larger than cycle_length above means we
# shouldn't start a new generator mid-batch (I think)

iterator = dataset.make_one_shot_iterator()
tensor = iterator.get_next()

outs = []
with tf.Session() as sess:
    while True:
        try:
            out = sess.run(tensor)
            outs.append(out)
        except tf.errors.OutOfRangeError:
            break

print(np.asarray(outs))
Output:
[[[ 0. inf inf inf]   # batch 1
  [ 0.  1.  2. inf]
  [ 0.  1.  2.  3.]]

 [[ 4. inf inf inf]   # batch 2 - the generator in index -1 in the
  [ 0.  1.  2.  3.]   # previous batch gets cycled to index 0 and two
  [ 0.  1.  2.  3.]]  # new generators are initiated

 [[ 4.  5.  6. inf]   # batch 3 - more generator cycling, and the one in
  [ 4.  5.  6.  7.]   # index 1 also gets cycled to index 2 in the same
  [ 8. inf inf inf]]] # batch (because we have run out of generators in
                      # parallel_interleave)
My desired output would be something like
[[[ 0. inf inf inf]   # batch 1
  [ 0.  1.  2. inf]
  [ 0.  1.  2.  3.]]

 [[inf]               # batch 2 - the leftover timestep from a padded
  [inf]               # batch of the first 3 generators
  [ 4.]]

 [[ 0.  1.  2.  3.]   # batch 3 - only two generators are left so this is
  [ 0.  1.  2.  3.]]  # an end-of-epoch smaller batch

 [[ 4.  5.  6. inf]   # batch 4
  [ 4.  5.  6.  7.]]

 [[inf]               # batch 5
  [ 8.]]]
Here, the internal states of the RNNs would be reset after batches 2 and 5.
Again, the desired output is simple to create if I consume the entirety of each generator's output and then pad, batch, and slice (a rough sketch of that follows below), but I want to produce batches as the generators (each of which may be receiving data in real time from e.g. a separate simulation) make them available.
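For reference, here is my own rough approximation of that consume-everything-first approach (it assumes no series exceeds some cap, here 10000 timesteps): each full series is batched into a single element, the series are padded against each other, and the time axis is sliced into windows only after everything has been read.
import numpy as np
import tensorflow as tf

def stepwise_generator(length):
    for i in range(length):
        yield i

lengths = [1, 3, 5, 7, 9]
window_length = 4

# wrap each generator, but swallow the whole series into one element
full_series = lambda length: tf.data.Dataset.from_generator(
    stepwise_generator, tf.float32, output_shapes=[], args=(length,)
).batch(10000)  # assumed upper bound on series length

dataset = tf.data.Dataset.from_tensor_slices(lengths)
dataset = dataset.flat_map(full_series)
dataset = dataset.padded_batch(len(lengths), (-1,), np.inf)  # pad all series to the max length

padded = dataset.make_one_shot_iterator().get_next()
with tf.Session() as sess:
    series = sess.run(padded)  # shape [num_series, max_length]
    # slice the padded block into window_length chunks along the time axis
    windows = [series[:, t:t + window_length]
               for t in range(0, series.shape[1], window_length)]
This works, but it produces nothing until every series has finished, which is exactly what I want to avoid.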
python tensorflow tensorflow-datasets
asked Nov 9 at 20:52 by mattw (62), edited Nov 9 at 21:40