Distributed TensorFlow | VGG 16


I am trying to build a distributed VGG 16 model on a single server.

I have one GPU server available, with 1 CPU and two GPUs. I have code that treats them as separate servers and implements distributed TensorFlow on top of them.
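
For reference, I launch one ps process and two worker processes on the same machine, roughly like this (the real host:port values are masked as xxxx:yyy* in the code below; the flags are the ones defined in main.py, and each worker is pinned to one GPU via --CUDA_VISIBLE_DEVICES):

python main.py --job_name=ps --task_index=0
python main.py --job_name=worker --task_index=0 --CUDA_VISIBLE_DEVICES=0
python main.py --job_name=worker --task_index=1 --CUDA_VISIBLE_DEVICES=1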

The main problem I see is the runtime: the distributed version currently takes 20 minutes per epoch, while the same model on a single GPU takes 180 seconds, so I assume something is wrong in my code.

Can someone tell me what bottleneck I am creating?

Below is the code for the distributed TensorFlow version. I want to sort this out first.

Here is my hardware and software configuration:

  • TensorFlow 1.2
  • CUDA 7.5
  • cuDNN 5.1

I have one utility file and one main file.

conv_utils.py

import tensorflow as tf

import numpy as np
import numpy.random as rnd
from numpy import genfromtxt
import pandas as pd

import matplotlib
import matplotlib.pyplot as plt
import seaborn as sb

from PIL import Image, ImageOps

from sklearn.preprocessing import Imputer
from sklearn.model_selection import train_test_split
import sklearn.preprocessing as skp
import sklearn

import time
import csv
import random

path = "***"

def reset_graph(seed=42):
    tf.reset_default_graph()
    tf.set_random_seed(seed)
    np.random.seed(seed)

def return_config():
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config.log_device_placement = False
    #config.gpu_options.allocator_type = 'BFC'
    return config

cpu = '/cpu:0'
gpu1 = '/gpu:0'
gpu2 = '/gpu:1'

def parameters(var_name, shape, var_type):
    var = tf.placeholder(dtype=var_type, shape=shape, name=var_name)
    return var  

def weights(var_name, shape, var_type):
    var = tf.get_variable(name=var_name, shape=shape, dtype=var_type,
                          initializer=tf.contrib.layers.xavier_initializer(seed=0))
    return var

def import_images(name):
    img = Image.open(name)
    img = np.asarray(img)
    return img

def one_hot_encoding(data):
    a = tf.one_hot(data,len(np.unique(data)))
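    # note: this creates a fresh tf.Session on every call just to evaluate the one-hot tensor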
    with tf.Session(config=return_config()) as sess:
        b = sess.run(a)
    return b

def label_import_oh(file_name):
    input_labels = np.genfromtxt(path + file_name, dtype='U', skip_header=1, delimiter=',')
    input_label_op = input_labels[:,1]
    le = skp.LabelEncoder()
    input_label_le = le.fit_transform(input_label_op)

    input_label_oh = one_hot_encoding(input_label_le)
    return input_labels, input_label_oh

def iterate_minibatches(inputs, targets, batchsize, shuffle=False):
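    # yields full minibatches only; a trailing batch smaller than batchsize is dropped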
    assert len(inputs) == len(targets)
    if shuffle:
        indices = np.arange(len(inputs))
        np.random.shuffle(indices)
    for start_idx in range(0, len(inputs) - batchsize + 1, batchsize):
        if shuffle:
            excerpt = indices[start_idx:start_idx + batchsize]
        else:
            excerpt = slice(start_idx, start_idx + batchsize)
        yield inputs[excerpt], targets[excerpt]
    print("Out of mini batches")

def conv(inp, weight, stride, conv_type):
    conv = tf.nn.conv2d(inp, weight, strides=[1, stride, stride, 1], padding=conv_type)
    conv_nr = tf.nn.local_response_normalization(conv)
    conv_dp = tf.nn.dropout(conv_nr, keep_prob= 0.7)
    conv_relu = tf.nn.relu(conv_dp)
    return conv_relu

def conv2d(inp, num_filters, filter_size, padding_type, stride=1, acti=tf.nn.relu, bias=True, is_train=True):
    conv = tf.layers.conv2d(inputs=inp,
                           strides= stride,
                           filters=num_filters,
                           padding = padding_type,
                           kernel_size=[filter_size, filter_size],
                           activation=acti,
                           trainable=is_train,
                           use_bias=bias)
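    # tf.layers.batch_normalization registers its moving-average update ops in tf.GraphKeys.UPDATE_OPS (they must be run explicitly in TF 1.x)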
    bn = tf.layers.batch_normalization(conv, training=is_train)
    dp = tf.layers.dropout(bn, rate=0.2, training=is_train)
    return dp    

def max_pool(inp, pfilter_size, pstride, pconv_type):
    max_pool = tf.nn.max_pool(inp, ksize=[1, pfilter_size, pfilter_size, 1], strides=[1, pstride, pstride, 1], padding=pconv_type)
    return max_pool

def avg_pool(inp, filter_size, stride, conv_type):
    avg_pool = tf.nn.avg_pool(inp, ksize=[1, filter_size, filter_size, 1], strides=[1, stride, stride, 1], padding=conv_type)
    return avg_pool

def avg_pool2d(inp, filter_size, stride, conv_type):
    avg_pool = tf.layers.average_pooling2d(inp, pool_size=[filter_size, filter_size], strides=stride, padding=conv_type)
    return avg_pool

def max_pool2d(inp, filter_size, stride, conv_type):
    max_pool = tf.layers.max_pooling2d(inp, pool_size=[filter_size, filter_size], strides=stride, padding=conv_type)
    return max_pool

def import_image_resize(name, size):
    img = Image.open(name)
    img = img.resize((size,size), Image.ANTIALIAS)
    img = np.asarray(img)
    return img

main.py

from conv_utils import *

def import_data():

    with tf.device('/cpu:0'):

        vgg_raw_file, vgg_labels = label_import_oh('Dog breed dataset//labels.csv')
        data = []
        for i in range(len(vgg_raw_file[:,0])):
            data.append(import_image_resize(path + 'Dog breed dataset//train//' + vgg_raw_file[i,0] +'.jpg', 229))  

        vgg_data = np.asarray(data)

        X_train, X_test, Y_train, Y_test = train_test_split(vgg_data, vgg_labels, test_size= 0.1)
        unique_label = vgg_labels.shape[1]
        X_train = X_train/255
        X_test = X_test/255
        return X_train, Y_train, X_test, Y_test, unique_label

def main():

    fsize = 3
    fstride = 1
    conv_type='SAME'

    pfsize = 2
    pstride = 2
    pconv_type='VALID'

    mini_batch_size = 16

    parameter_server = ["xxxx:yyyy"]
    worker_server = ["xxxx:yyyz", "xxxx:yyyu"]

    cluster = tf.train.ClusterSpec({'ps' : parameter_server, 'worker' : worker_server})
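    # one ps and two workers, all on the same physical machine; each worker process sees one GPU via CUDA_VISIBLE_DEVICES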

    if job == "ps":
        print('starting ps')
        server = tf.train.Server(cluster, job_name="ps", task_index = task, config=return_config())
        server.join()

    if job == "worker":

        X_train, Y_train, X_test, Y_test, unique_label = import_data()

        is_chief = (task == 0)
        server = tf.train.Server(cluster, job_name="worker", task_index = task, config=return_config())

        # replica_device_setter places the compute ops on the worker's CPU here
        worker_device = "/job:%s/task:%d/cpu:0" % (job, task)

        with tf.device(tf.train.replica_device_setter(cluster = cluster, worker_device = worker_device)):

            global_step = tf.Variable(0,dtype=tf.int32,trainable=False,name='global_step')

            with tf.name_scope('input'):
                X = tf.placeholder(dtype=tf.float32, shape=(None,X_train.shape[1], X_train.shape[2], X_train.shape[3]))
                Y = tf.placeholder(dtype=tf.float32, shape=(None, Y_train.shape[1]))

            with tf.name_scope('model_build'):

                NC = [64, 128, 128, 256, 256, 256, 512, 512, 512, 512, 512, 512, 4096, 4096, 1000, unique_label]
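                # per-layer convolution filter counts, followed by the fully connected layer sizes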

                conv64 = conv2d(X, NC[0], fsize, conv_type, stride=1)
                maxpool1 = max_pool2d(conv64, pfsize, pstride, pconv_type)

                conv128 = conv2d(maxpool1, NC[1], fsize, conv_type, stride=1)
                conv128a = conv2d(conv128, NC[2], fsize, conv_type, stride=1)
                maxpool2 = max_pool2d(conv128a, pfsize, pstride, pconv_type)

                conv256a = conv2d(maxpool2, NC[3], fsize, conv_type, stride=1)
                conv256b = conv2d(conv256a, NC[4], fsize, conv_type, stride=1)
                conv256c = conv2d(conv256b, NC[5], fsize, conv_type, stride=1)
                maxpool3 = max_pool2d(conv256c, pfsize, pstride, pconv_type)

                conv512a = conv2d(maxpool3, NC[6], fsize, conv_type, stride=1)
                conv512b = conv2d(conv512a, NC[7], fsize, conv_type, stride=1)
                conv512c = conv2d(conv512b, NC[8], fsize, conv_type, stride=1)
                maxpool4 = max_pool2d(conv512c, pfsize, pstride, pconv_type)

                conv512d = conv2d(maxpool4, NC[9], fsize, conv_type, stride=1)
                conv512e = conv2d(conv512d, NC[10], fsize, conv_type, stride=1)
                conv512f = conv2d(conv512e, NC[11], fsize, conv_type, stride=1)
                maxpool5 = max_pool2d(conv512f, pfsize, pstride, pconv_type)

                flatten = tf.contrib.layers.flatten(maxpool5)
                fc1 = tf.layers.dense(flatten, units=NC[12], activation=tf.nn.relu)
                fc2 = tf.layers.dense(fc1, units=NC[13], activation=tf.nn.relu)
                fc3 = tf.layers.dense(fc2, units=NC[14], activation=tf.nn.relu)
                Z = tf.layers.dense(fc3, units=NC[15], activation=None)

            with tf.name_scope('cost'):
                cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits= Z, labels=Y))

            with tf.name_scope('accuracy'):
                predict = tf.argmax(Z, 1)
                correct_predict = tf.equal(predict, tf.argmax(Y, 1))
                accuracy = tf.reduce_mean(tf.cast(correct_predict, tf.float32))

            optimizer = tf.train.AdamOptimizer(learning_rate)
            optimizer1 = tf.train.SyncReplicasOptimizer(optimizer,
                                                        replicas_to_aggregate=len(worker_server),
                                                        total_num_replicas=len(worker_server))
            opt = optimizer1.minimize(cost, global_step=global_step)

        init_token_op = optimizer1.get_init_tokens_op()
        chief_queue_runner = optimizer1.get_chief_queue_runner()  # started by the chief once the session is ready
        init_op = tf.global_variables_initializer()

        total_splits = len(worker_server)
        size_of_data = int(X_train.shape[0]/total_splits)
        total_steps = epoch * int( size_of_data / mini_batch_size)


        sv = tf.train.Supervisor(
                                is_chief=is_chief,
                                init_op=init_op,
                                global_step = global_step
                                )

        print('Starting training on worker %d'%task)

        print('The size of the data is : ',size_of_data)
        if task == (len(worker_server) - 1):
            X_data = X_train[(task * size_of_data):]
            Y_data = Y_train[(task * size_of_data):]
        else:
            X_data = X_train[(task * size_of_data):(task + 1) * size_of_data]
            Y_data = Y_train[(task * size_of_data):(task + 1) * size_of_data]

        start=time.time()
        with sv.prepare_or_wait_for_session(server.target, config=return_config()) as sess:

            if is_chief:
                sv.start_queue_runners(sess, [chief_queue_runner])
                sess.run(init_token_op)

            for i in range(epoch):
                #if sv.should_stop() : break
                for X_batch, Y_batch in iterate_minibatches(X_data, Y_data, mini_batch_size, shuffle=True):
                    _, _cost = sess.run([opt, cost], feed_dict={X:X_batch, Y:Y_batch})
                    #print('step : ', gs, ' worker : ', task, 'epoch : ', epoch)

                if i % 5 == 0:
                    _a = sess.run([accuracy], feed_dict={X:X_test, Y:Y_test})
                    print('The accuracy is : ', _a)

            if is_chief:
                time.sleep(15)  # crude barrier so the chief does not shut down before the other worker finishes


            if not is_chief:
                sv.stop()                                       

        end = time.time() - start
        print('Done, the total time taken is: ', end)

if __name__ == "__main__":

    import argparse
    import os

    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--job_name",
        type=str,
        default="",
        help="One of 'ps', 'worker'"
    )
    parser.add_argument(
        "--task_index",
        type=int,
        default=0,
        help="Index of the task within the job"
    )
    parser.add_argument(
        "--CUDA_VISIBLE_DEVICES",
        type=int,
        default=0,
        help="The GPU on which the code should run"
    )
    parser.add_argument(
        "--learning_rate",
        type=float,
        default=0.001,
        help="The learning rate for the model"
    )
    parser.add_argument(
        "--epoch",
        type=int,
        default=1,
        help="The number of epochs to train for"
    )

    flags, unparsed = parser.parse_known_args()
    job = flags.job_name
    task = flags.task_index
    CUDA_VISIBLE_DEVICES = flags.CUDA_VISIBLE_DEVICES
    learning_rate = flags.learning_rate
    epoch = flags.epoch

    os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"
    os.environ["CUDA_VISIBLE_DEVICES"]="%d" % CUDA_VISIBLE_DEVICES
    main()

