2016-01-04 8 views
36

Я пытаюсь выполнить предварительную выборку данных обучения, чтобы скрыть латентность ввода-вывода. Я хотел бы написать собственный код Python, который загружает данные с диска и препроцессы данных (например, путем добавления контекстного окна). Другими словами, один поток выполняет предварительную обработку данных, а другой - обучение. Возможно ли это в TensorFlow?Как предварительно запрограммировать данные с помощью пользовательской функции python в тензорном потоке

Обновление: У меня есть рабочий пример, основанный на примере @ mrry.

import numpy as np 
import tensorflow as tf 
import threading 

BATCH_SIZE = 5 
TRAINING_ITERS = 4100 

feature_input = tf.placeholder(tf.float32, shape=[128]) 
label_input = tf.placeholder(tf.float32, shape=[128]) 

q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]]) 
enqueue_op = q.enqueue([label_input, feature_input]) 

label_batch, feature_batch = q.dequeue_many(BATCH_SIZE) 
c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128]) 

sess = tf.Session() 

def load_and_enqueue(sess, enqueue_op, coord): 
    with open('dummy_data/features.bin') as feature_file, open('dummy_data/labels.bin') as label_file: 
    while not coord.should_stop(): 
     feature_array = np.fromfile(feature_file, np.float32, 128) 
     if feature_array.shape[0] == 0: 
     print('reach end of file, reset using seek(0,0)') 
     feature_file.seek(0,0) 
     label_file.seek(0,0) 
     continue 
     label_value = np.fromfile(label_file, np.float32, 128) 

     sess.run(enqueue_op, feed_dict={feature_input: feature_array, 
             label_input: label_value}) 

coord = tf.train.Coordinator() 
t = threading.Thread(target=load_and_enqueue, args=(sess,enqueue_op, coord)) 
t.start() 

for i in range(TRAINING_ITERS): 
    sum = sess.run(c) 
    print('train_iter='+str(i)) 
    print(sum) 

coord.request_stop() 
coord.join([t]) 
+3

Я только что сделал записную книжку о очередях, которая также объясняет аналогичный случай использования, я надеюсь, что он может быть полезен и другим: https://gist.github.com/akiross/23b6ae42812841bb79af4976a2525cf9 – AkiRoss

ответ

49

Это общий случай использования, и большинство реализаций используют очередей TensorFlow в отвязать код предварительной обработки из учебного кода. Существует a tutorial on how to use queues, но основные шаги заключаются в следующем:

  1. Определить очередь, q, что будет буфер препроцессированный данных. TensorFlow поддерживает простой tf.FIFOQueue, который производит элементы в том порядке, в котором они были установлены, и более продвинутый tf.RandomShuffleQueue, который производит элементы в произвольном порядке. Элемент очереди представляет собой набор из одного или нескольких тензоров (которые могут иметь разные типы и формы). Все очереди поддерживают одноэлементные (,) и пакетные (enqueue_many, dequeue_many) операции, но для использования пакетных операций вы должны указать формы каждого тензора в элементе очереди при построении очереди.

  2. Создайте подграф, который помещает предварительно обработанные элементы в очередь. Один из способов сделать это - определить некоторые tf.placeholder() ops для тензоров, соответствующих одному входному примеру, затем передать их q.enqueue(). (Если ваша предварительная обработка производит партию сразу, вместо этого вы должны использовать q.enqueue_many().) Вы также можете включить операторы TensorFlow на этом подграфе.

  3. Создайте подграф, который проводит обучение. Это будет выглядеть как обычный график TensorFlow, но получит его вход, вызвав q.dequeue_many(BATCH_SIZE).

  4. Начало сеанса.

  5. Создайте один или несколько потоков, которые выполняют вашу логику предварительной обработки, а затем выполните команду enqueue op, подавая предварительно обработанные данные. Вы можете найти полезные классы полезности tf.train.Coordinator и tf.train.QueueRunner.

  6. Запустите свой учебный график (оптимизатор и т. Д.) Как обычно.

EDIT: Вот простой load_and_enqueue() функция и фрагмент кода, чтобы вы начали:

# Features are length-100 vectors of floats 
feature_input = tf.placeholder(tf.float32, shape=[100]) 
# Labels are scalar integers. 
label_input = tf.placeholder(tf.int32, shape=[]) 

# Alternatively, could do: 
# feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100]) 
# label_batch_input = tf.placeholder(tf.int32, shape=[None]) 

q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []]) 
enqueue_op = q.enqueue([feature_input, label_input]) 

# For batch input, do: 
# enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input]) 

feature_batch, label_batch = q.dequeue_many(BATCH_SIZE) 
# Build rest of model taking label_batch, feature_batch as input. 
# [...] 
train_op = ... 

sess = tf.Session() 

def load_and_enqueue(): 
    with open(...) as feature_file, open(...) as label_file: 
    while True: 
     feature_array = numpy.fromfile(feature_file, numpy.float32, 100) 
     if not feature_array: 
     return 
     label_value = numpy.fromfile(feature_file, numpy.int32, 1)[0] 

     sess.run(enqueue_op, feed_dict={feature_input: feature_array, 
             label_input: label_value}) 

# Start a thread to enqueue data asynchronously, and hide I/O latency. 
t = threading.Thread(target=load_and_enqueue) 
t.start() 

for _ in range(TRAINING_EPOCHS): 
    sess.run(train_op) 
+1

Спасибо за ваш совет. У меня есть еще один вопрос. В моем эксперименте функция обучения и метка хранятся в двух отдельных двоичных файлах. Должен ли я строить две очереди, одну для функции и одну для метки? Если мы хотим получить случайную пару (функцию, метку) из двух очередей, как я могу убедиться, что функция соответствует правильной метке? Другими словами, как я могу гарантировать сопоставление «один к одному»? –

+0

Чтобы сохранить сопоставление «один к одному», вы должны создать отдельную очередь, где каждый элемент является кортежем тензора характеристик и тензора метки. Вы можете сделать это, указав список типов (и фигур) в конструктор очереди. Это гарантирует, что компоненты одного и того же кортежа всегда удаляются вместе. – mrry

+0

Функции и метки хранятся отдельно в двух больших двоичных файлах. Поэтому мне нужно создать feat_queue = tf.train.string_input_producer (feat_filenames) и label_queue = tf.train.string_input_producer (label_filenames). Тогда у меня также будет два tf.FixedLengthRecordReader, чтобы получить ум от feat_queue и ярлык от label_queue отдельно. Наконец, я помещаю [feat, label] в другую очередь. Вот проблема. Когда я использую FixedLengthRecordReader для получения навыка и метки, правильно ли они отображаются правильно? –

6

Другими словами, один поток выполняет предварительную обработку данных, а другой делает обучение. Возможно ли это в TensorFlow?

Да, это так. Решение mrry работает, но проще.

выборки данных

tf.py_func обертывания функции питона и использует его в качестве оператора TensorFlow. Поэтому мы можем загружать данные по sess.run() каждый раз.Проблема с этим подходом заключается в том, что данные загружаются в течение sess.run() через основной поток.

Минимальный пример:

def get_numpy_tensor(): 
    return np.array([[1,2],[3,4]], dtype=np.float32) 
tensorflow_tensor = tf.py_func(get_numpy_tensor, [], tf.float32) 

Более сложный пример:

def get_numpy_tensors(): 
    # Load data from the disk into numpy arrays. 
    input = np.array([[1,2],[3,4]], dtype=np.float32) 
    target = np.int32(1) 
    return input, target 
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32]) 

tensorflow_input, tensorflow_target = 2*tensorflow_input, 2*tensorflow_target 

sess = tf.InteractiveSession() 
numpy_input, numpy_target = sess.run([tensorflow_input, tensorflow_target]) 
assert np.all(numpy_input==np.array([[2,4],[6,8]])) and numpy_target==2 

Предварительная выборка данных в другом потоке

Для очереди наши данные в другом потоке (так что sess.run() не будет придется ждать данных), мы можем использовать tf.train.batch() у наших операторов от tf.py_func().

Минимальный пример:

tensor_shape = get_numpy_tensor().shape 
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32, shapes=[tensor_shape]) 
# Run `tf.train.start_queue_runners()` once session is created. 

Мы можем опустить аргумент shapes если tensorflow_tensor имеет свою форму указания:

tensor_shape = get_numpy_tensor().shape 
tensorflow_tensor.set_shape(tensor_shape) 
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32) 
# Run `tf.train.start_queue_runners()` once session is created. 

Более сложный пример:

input_shape, target_shape = (2, 2),() 
def get_numpy_tensors(): 
    input = np.random.rand(*input_shape).astype(np.float32) 
    target = np.random.randint(10, dtype=np.int32) 
    print('f', end='') 
    return input, target 
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32]) 
batch_size = 2 
tensorflow_inputs, tensorflow_targets = tf.train.batch([tensorflow_input, tensorflow_target], batch_size, shapes=[input_shape, target_shape], capacity=2) 
# Internal queue will contain at most `capasity=2` times `batch_size=2` elements `[tensorflow_input, tensorflow_target]`. 

tensorflow_inputs, tensorflow_targets = 2*tensorflow_inputs, 2*tensorflow_targets 

sess = tf.InteractiveSession() 
tf.train.start_queue_runners() # Internally, `tf.train.batch` uses a QueueRunner, so we need to ask tf to start it. 
for _ in range(10): 
    numpy_inputs, numpy_targets = sess.run([tensorflow_inputs, tensorflow_targets]) 
    assert numpy_inputs.shape==(batch_size, *input_shape) and numpy_targets.shape==(batch_size, *target_shape) 
    print('r', end='') 

# Prints `fffffrrffrfrffrffrffrffrffrffrf`. 

В случае get_numpy_tensor() возвращает партию тензоров, n tf.train.batch(..., enqueue_many=True) поможет.

 Смежные вопросы

  • Нет связанных вопросов^_^