Подавать данные в tf.contrib.data.Dataset, как очередь

О tf.contrib.data.Dataset (от TensorFlow 1.2 см. здесь и здесь) использование: Способ получения данных не подходит никоим образом, как я обычно получаю данные. В моем случае у меня есть поток, и я получаю данные там, и я не знаю заранее, когда он закончится, но я вижу, когда он закончится. Затем я жду, пока не обработаю все буферы, а затем закончил одну эпоху. Как я могу получить эту логику с помощью Dataset?

Обратите внимание, что я предпочитаю интерфейс Dataset через интерфейс QueueBase, потому что он дает мне интерфейс итератора, который я могу повторно инициализировать и даже reset для другого Dataset. Это более мощно по сравнению с очередями, которые не могут быть повторно открыты в настоящее время после их закрытия (см. здесь и здесь).

Возможно, аналогичный вопрос или тот же вопрос: как я могу обернуть вокруг Dataset в очереди? У меня есть некоторый поток, который где-то считывает некоторые данные и который может его кормить и как-то ставить в очередь. Как получить данные в Dataset? Я мог бы повторить несколько фиктивных тензоров бесконечных раз, а затем использовать map, чтобы просто вернуть мой queue.dequeue(), но это действительно возвращает меня ко всем исходным проблемам с очередью, то есть как снова открыть очередь.

Ответы

Ответ 1

Новый метод Dataset.from_generator() позволяет определить Dataset, который подается генератором Python. (Чтобы использовать эту функцию в настоящее время, вы должны загрузить ночную сборку TensorFlow или создать ее самостоятельно из источника. Она будет частью TensorFlow 1.4.)

Самый простой способ реализовать ваш пример - заменить ваш принимающий поток генератором с псевдокодом следующим образом:

def receiver():
  while True:
    next_element = ...  # Receive next element from external source.
                        # Note that this method may block.

    end_of_epoch = ...  # Decide whether or not to stop based on next_element.

    if not end_of_epoch:
      yield next_element  # Note: you may need to convert this to an array.
    else:
      return  # Returning will signal OutOfRangeError on downstream iterators.

dataset = tf.contrib.data.Dataset.from_generator(receiver, output_types=...)

# You can chain other `Dataset` methods after the generator. For example:
dataset = dataset.prefetch(...)  # This will start a background thread
                                 # to prefetch elements from `receiver()`.

dataset = dataset.repeat(...)  # Note that each repetition will call
                               # `receiver()` again, and start from
                               # a fresh state.

dataset = dataset.batch(...)

Возможны более сложные топологии. Например, вы можете использовать Dataset.interleave() для создания нескольких приемников параллельно.