Алгоритм многопоточности для сверхпроизводительных параллельных вычислений и производитель-потребитель очереди при использовании POSIX потоков


Следующий производитель-потребитель алгоритма за кусок программного обеспечения, что я повышаю, чтобы воспользоваться преимуществами многоядерных процессоров. Предполагаемая платформа-это некоторая разновидность Linux работает на EC2 в КВД cc4x.большие кластера, которые имеют 2 х Intel процессор Xeon X5570, четырехъядерный “Нихалем” архитектура процессоров (2*4=8 ядер). Программное обеспечение работает генетический алгоритм для оптимизации искусственных нейронных сетей.

Мой доминирующей проблемой является производительность. Емкость ОЗУ и HD-качестве не проблема, но времени ЦП и все остальное, что задержки обработки. Сейчас я сделал несколько (указано ниже), надеюсь тривиально, компромиссы, чтобы сделать программы портативные для Mac ОС X, которая находится на моем домашнем компьютере, который я использую для кодирования/отладки. Отмечу несколько других мелких вопросов в комментариях, особенно неопределенность теме-Безопасность в потребительскую функцию. Это мой первый раз работаю с потоками. Советы/критика любого рода приветствуется.

#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <vector>

#define NUM_THREADS 3     //will be >= to # of cores
#define N           30

//globals
sem_t* producer_gate;
sem_t* consumer_gate;
pthread_mutex_t access_queued =PTHREAD_MUTEX_INITIALIZER;
int queued;
int completed;

//a dummy class for testing thread-safeness
class the_class
{
public:

    void find_num()
    {
        //make sure completion is non-instant and variable
        double num = rand();
        for(double q=0; 1; q++)
            if(num == q)
                return;
    }    
};

//the consumer function for handling the parallelizable code
void* Consumer(void* argument)
{

    std::vector <the_class>* p_v_work = (std::vector <the_class>*) argument ;

    while(1)
    {
        sem_wait(consumer_gate);
        pthread_mutex_lock (&access_queued);
        int index = queued;
        queued--;
        pthread_mutex_unlock (&access_queued);
        (*p_v_work)[index-1].find_num();
        completed++;                            //<-- thread safe??
        std::cout << "\n" << index;
        if(completed == N)
            sem_post(producer_gate);
    }

    return NULL;
}

int main () 
{
    srand(5);

    //holds data to be processed and the methods for processing it
    std::vector <the_class> work(N);
    std::vector <the_class>* p_work = &work;

    sem_unlink("producer_gate");
    sem_unlink("consumer_gate");
    //can't use `sem_init();` under Mac OS X so use sem_open instead
    producer_gate = sem_open("producer_gate", O_CREAT, 0700, 0);           
    consumer_gate = sem_open("consumer_gate", O_CREAT, 0700, N);
    completed   = 0;
    queued      = N;            


    pthread_attr_t attr;
    pthread_attr_init (&attr);
    pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); //no joining necessary

    pthread_t threads[NUM_THREADS];
    for(int q=0; q<NUM_THREADS; q++)
        //on OS X, a pthread_t must be provided or a bus error occurs. relevant to performance?
        pthread_create(&threads[q], &attr, Consumer, (void*) p_work );


    for( int q=0; q < 4; q++)
    {
        sem_wait(producer_gate);
        std::cout << "\nDONE\n";

        ////////////////////////////////////////////////////////
        //  Summate work done and layout work for next iteration
        ////////////////////////////////////////////////////////

        completed   = 0;
        queued      = N;  
        for(int q=0;q<N;q++)
            sem_post(consumer_gate);  //some way to just set this instead of incrementing?
    }

    sem_wait(producer_gate);    
    std::cout << "\n\nCompleted (:   !!\n";

}


Комментарии
1 ответ

Стиль

Я вижу нечто, что выглядит как потокобезопасные очереди абстракция, которая таинственным образом реализован как набор глобальных переменных и один аргумент. Это довольно известный подвид контейнера, чтобы написать (или найти уже существующий, который соответствует вашим целям)

template <typename T>
class MTQueue
{
public:
// simple operations will lock & unlock on every call
void push(T const &);
T pop();

// ...
};

Просто передать указатель на этот поток функцию, и ваши потоки могут поп - по желанию.
Обратите внимание, это может тривиально использовать std::queue или любой другой контейнер внутренне (и можно ФИФО, а не ЛИФО) - все, что ты делаешь-это объединение некоторых контейнер с мьютексом и применение блокировки семантики при изменении (или проверять) ее.

Я тоже вижу что-то похожее на барьер (использование семафоров), но я подозреваю, что ваши потребители могут начать уничтожать как только что-то там в очереди, так что вам просто нужен способ для производителей, чтобы ждать, пока они все сделали. Мы могли бы добавить метод, чтобы ждать, пока очередь пуста, но это не говорит нам, работы полно. Должен ли быть результат? Если это так, используйте другой экземпляр того же типа: производитель может появляться результаты параллельно, не нужно ждать, пока последний из них не рассчитывается.

Производительности

Есть несколько подходов, чтобы сделать это быстро:

Пакетирование

Рассмотреть относительную стоимость синхронизации заключается в выталкивание элемента из очереди, и действительно работает: это может быть быстрее в целом, чтобы добавить более сложный интерфейс, так что вы можете толкать/поп несколько элементов с той же синхронизации, например.

  // more elaborate interface to amortise synchronization cost?
void push(std::vector<T> const &);
void pop(std::vector<T> &out);

или вы могли бы просто переключиться на пуша/выталкивание пакета работ объекта вместо одного задания: тот же эффект, может оставить вас с более простой и более многоразовых очереди.

Альтернативные Стратегии Синхронизации

Взаимного исключения только один способ обмена данными между несколькими ядрами, и честно говоря, это, наверное, хорошо подходит сюда (особенно если откалибровать правильно размер пакета).
Однако, если вы действительно хотите, чтобы застрять в, посмотреть замок бесплатно или неблокирующих очередей, а также - для некоторых рабочих нагрузок и количества ядер, они могут подойти вам лучше.

4
ответ дан 14 ноября 2011 в 05:11 Источник Поделиться