TensorFlow: как я могу несколько раз оценивать очередь данных проверки во время обучения?
TL;DR
Как я могу оценить набор валидаций после каждой итерации обучения K, используя отдельные очереди для обучения и валидации данных, не прибегая к отдельным tf.Sessions
в нескольких процессах? Кажется, что нет чистого способа достичь этого, учитывая мою конкретную проблему, и мое текущее обходное решение (которое, как я думал, будет работать) дает мне поведение undefined. Помогите!
Вся история
Я хочу оценить набор валидаций для каждой итерации обучения K, и я не могу понять, как правильно реализовать это в TensorFlow. Это должно быть одной из самых распространенных операций, но, похоже, API/архитектура TensorFlow работает против меня здесь или, по крайней мере, делает вещи излишне трудными.
Мои предположения:
- [A1] Модель мультипроцесса для обучения/валидации, описанная здесь https://www.tensorflow.org/how_tos/reading_data/#multiple_input_pipelines, не применима к моей проблеме, так как у меня есть предположить, что недостаточно памяти GPU для загрузки переменных дважды.
- [A2] Я хочу оценить на валидацию, установленную каждую итерацию обучения K.
- [A3] Как данные обучения, так и данные проверки не могут быть просто прочитаны с диска, но создаются "на лету". Это делает невозможным достоверное предварительное вычисление размера проверки, установленного заранее.
- [A4] Набор проверки слишком велик, чтобы предварительно вычислить и сохранить на диске.
- [A5] Эффективный размер установленного размера не обязательно кратен размеру партии.
Конвейер ввода для обучения настроен следующим образом:
- A
tf.train.slice_input_producer()
генерирует (перетасованный) список имен файлов, каждый из которых ссылается на исходные входные данные.
- Функция генерации пользовательских данных генерирует переменное количество учебных примеров/меток из каждого фрагмента исходных входных данных.
- Сгенерированные учебные примеры/метки помещаются в очередь через
tf.train.shuffle_batch()
перед подачей в сеть.
В связи с [A3], [A4], [A5], входной конвейер проверки правильности настроен почти одинаковым образом, за исключением того, что конечная очередь ввода генерируется через tf.train.batch()
, поскольку перетасовка нежелательна. Из-за вышеприведенных предположений подход на основе feed_dict также неосуществим, а также, по-видимому, несовместим с использованием функции более высокого уровня, такой как tf.train.batch
.
Однако простая реализация с использованием двух разных наборов очередей для обучения и проверки не работает. Насколько я понимаю, у меня есть два варианта:
-
[B1] Задайте аргумент num_epochs
для проверки tf.train.slice_input_producer
на None
.
В этом случае набор проверки циклически проходит бесконечно, но мне нужно заранее знать размер проверки, чтобы явно ограничить количество партий для оценки за прогон через набор проверки. Кроме того, если размер установленного размера проверки не делится на размер партии, я всегда буду тянуть немного больше в последней партии. Поскольку это каждый раз меняет порядок оценки данных валидации, это неприемлемо.
-
[B2] Установите аргумент num_epochs
для проверки tf.train.slice_input_producer
на 1
и дополнительно установите аргумент allow_smaller_final_batch
функции tf.train.batch
на True
.
В этом случае набор проверки выполняется циклически через ровно один раз, после чего соответствующая очередь закрывается навсегда. По умолчанию, это сделает оценку, установленную два или более раза невозможным. Поскольку я не знаю, как правильно открыть очередь в TensorFlow, мне нужно обойти это ограничение.
Из-за больших ограничений опции [B1] я решил вместо этого решить проблемы с вариантом [B2]. Код (псевдо), описывающий мой текущий подход, выглядит следующим образом:
Тренировочный цикл должен быть достаточно каноническим. Каждый K-итерации вызывается функция оценки набора валидаций.
Обратите внимание, что я запускаю только очереди с именем, начинающимся с "train_"; это очередь, созданная для сбора собранных данных обучения. Для этого я создал две вспомогательные функции, get_queues_by_name
и start_queue_runners
.
def train_loop(train_ops, vali_ops, ...):
with tf.Session() as sess:
coord = tf.train.Coordinator()
sess.run([tf.initialize_all_variables(), tf.initialize_local_variables()])
load_latest_snapshot(sess, loader, snapshot_file)
# Launch the queue runners
queues = get_queues_by_name("train")
threads = start_queue_runners(sess, coord, queues)
try:
for step in range(start_iteration, num_train_iterations):
# Runs the session on validation set
if step % K == 0:
validation_results = run_validation(vali_ops, snapshot_file)
# TRAINING:
# ...
except Exception as e:
coord.request_stop(e)
finally:
coord.request_stop()
coord.join(threads)
Вспомогательные функции выглядят следующим образом:
def get_queues_by_name(name):
"""Retrieves all queues that contain the string given by 'name'"""
all_queues = tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
return [q for q in all_queues if name in q.name]
def start_queue_runners(session, coordinator, queues):
"""Similar to tf.train.start_queue_runners but now accepts a list of queues instead of a graph collection"""
with session.graph.as_default():
threads = []
for queue in queues:
log("Queue", "Starting queue '%s'" % queue.name, level=2)
threads.extend(queue.create_threads(session, coordinator, daemon=True, start=True))
return threads
В функции run_validation
мой выбранный метод обхода проблемы с закрытой очередью заключается в создании нового tf.Session
. Я также запускаю потоки, связанные с набором данных для проверки достоверности очереди.
def run_validation(ops, snapshot_file): # Called inside train_loop()
results = None
loader = tf.train.Saver()
with tf.Session() as sess:
coord = tf.train.Coordinator()
sess.run([tf.initialize_local_variables()])
load_latest_snapshot(sess, loader, snapshot_file)
# Launch the queue runners
queues = get_queues_by_name("eval")
threads = start_queue_runners(sess, coord, queues)
# Performs the inference in batches
try:
# Evaluate validation set:
results = eval_in_batches(ops, sess)
except Exception as e:
coord.request_stop(e)
finally:
coord.request_stop()
coord.join(threads)
return results
Я не знаю, является ли создание нового tf.Session
хорошей идеей, но это похоже на единственный способ выполнить перезапуск очереди проверки. В идеале я также не хотел бы повторно загружать моментальный снимок модели, поскольку это кажется концептуально ненужным.
Проблема с этим кодом заключается в том, что во время работы я вижу неустойчивое поведение / undefined, например, NaN или Inf, появляющиеся внутри сети во время оценки набора валидации. Это, по-видимому, происходит преимущественно, когда очередь заданий проверки заполняется одновременно с заполнением очереди обучающих наборов (поскольку очередь обучения открыта во время оценки набора валидации). Например, это очень часто происходит, если я оцениваю набор проверки на итерации 0 (когда обе очереди еще нужно заполнить). Кажется, что очереди обучения/валидации разделяют какое-то глобальное состояние, хотя они работают в другом сеансе.
Может кто-нибудь объяснить, почему это происходит, и как я могу решить это более чисто, принимая во внимание мои приведенные выше предположения [A1] - [A5]?
Ответы
Ответ 1
В настоящее время я сталкиваюсь с аналогичной проблемой. До сих пор я вообще избегал любых очередей и просто загружал данные через feed_dict
, но я, очевидно, теряю некоторую производительность, не используя очереди и parallelism (хотя я все еще доволен текущей скоростью, так как я сделал то же самое в Theano ранее). Теперь я хочу перепроектировать это и использовать очереди и наткнулся на эту проблему. Есть this, this, this связанные с этим проблемы.
В настоящее время я думаю об этом:
-
В обучении я хочу использовать RandomShuffleQueue
, что делает его еще более сложным. Я думаю, что я просто проигнорирую проблему, и после того, как поток читателя, который вставляет тензоры в очередь, заканчивается, я дам тренировку остановиться, поэтому я освобожу оставшиеся предметы до capacity
для этой эпохи и просто использую ее для следующей эпоха. Возможно, чтобы сделать это детерминированным, я проверяю поток поезда, который я все еще читаю из очереди, пока не осталось только min_after_dequeue
элементов.
-
В оценке я хочу использовать тот же график и тот же сеанс. Я могу использовать tf.cond
для чтения из другой отдельной очереди вместо RandomShuffleQueue
. Или я мог бы использовать feed_dict
в оценке. Если бы я использовал отдельную очередь, я бы использовал FIFOQueue
и тщательно отслеживал, что делаю правильное количество шагов. Я мог бы также ввести еще один фиктивный тензор, который я в очереди в очередь, который дает мне флаг end_of_epoch
или так, поэтому я знаю в eval-потоке, когда останавливаться.
В TensorFlow 1.2 будет tf.contrib.data
интерфейс (оставить комментарий, обзор документации, документация по API), которая предоставляет API tf.contrib.data.Dataset
, который также поддерживает перетасовку, похожую на tf.RandomShuffleQueue
, а также периодические и циклические операции над несколькими эпохами. Кроме того, вы получаете доступ к данным, создавая над ним итератор, и вы можете reset итератор. Некоторые связанные вопросы StackOverflow здесь и здесь.