Простой универсальный пул


Вот наспех сделал пул потоков, чтобы удовлетворить мои требования и даже для моих будущих проектов. Он работает нормально и выполнения задач по назначению, но я хочу еще улучшить, и я не уверен, если это правильный способ разработки поток из пула, так как я самоучка, энтузиаст, и слабоват как юрист язык.

Задачи хранятся в очереди. Они выполняются в порядке, только когда поток простоя, доступный из пула. Новые задачи может оказаться в очереди. Для инициализации бассейн, нужно указать общее количество потоков и bool переменная, которая будет использоваться, чтобы расторгнуть бассейн и потоки изящно.

#include "stdafx.h"
#include <iostream>
#include <memory>
#include <thread>
#include <vector>
#include <queue>    
#include <chrono>    
#include <mutex>    
#include <condition_variable>

using namespace std;

typedef function<void(void)> task_t;

class thread_t
{
public:
    thread_t(int id, bool& running, condition_variable& cv)
        :id_(id)
        , running_(running)
        , idle_notify_cv_(cv)
    {
        idle_ = true;
        thread_ = new thread([=]() { run(); });
    }

    ~thread_t()
    {
        notify();
        cout << id_ << "  stopping \n";
        thread_->join();
    }

    void push(task_t task)
    {
        task_ = task;
        idle_ = false;
        cv_.notify_one();
    }

    void notify()
    {
        cv_.notify_all();
    }

    bool is_idle() const
    {
        return idle_;
    }
    int get_id() const
    {
        return id_;
    }

private:
    void run()
    {
        cout << id_ << "  starting  \n";
        while (running_)
        {
            unique_lock<mutex> lock(mu_);
            cv_.wait(lock, [=]() { return idle_ == false || !running_; });
            if (!running_) return;
            task_();
            cout << id_ << " :work done  \n";
            idle_ = true;
            idle_notify_cv_.notify_all();
        }
    }
private:
    condition_variable& idle_notify_cv_;
    mutex mu_;
    condition_variable cv_;
    task_t task_;
    thread* thread_;
    bool idle_;
    int id_;
    bool& running_;
};

class pool
{
public:
    pool(int n, bool& running)
        :nthreads_(n)
        ,running_(running)
    {
        if (n > std::thread::hardware_concurrency()) nthreads_ = n = std::thread::hardware_concurrency()-1;
        for (int i = 0; i < n; i++)
        {
            threads_.push_back(make_unique<thread_t >(i, running_, idle_notify_cv_));
        }

        pool_thread_ = new thread([=]() { run(); });
    }

    void push(task_t task)
    {
        unique_lock<mutex> lock(write_queue_mu_);
        tasks_.push(task);
        idle_notify_cv_.notify_one();
    }

    int get_idle()
    {
        for (int i = 0; i < nthreads_; i++)
        {
            if (threads_[i]->is_idle())
            {
                return i;
            }
        }
        return -1;
    }

    void run()
    {
        cout << " pool thread started \n " ;
        while (running_)
        {
            int idle;
            if (!tasks_.empty() && (idle = get_idle()) != -1)
            {
                unique_lock<mutex> lock(write_queue_mu_);
                idle_notify_cv_.wait(lock, [=]() { return idle != -1 || !running_; });
                if (!running_) return;
                auto task = tasks_.front();
                tasks_.pop(); 
                lock.unlock();
                cout << " thread# " << threads_[idle]->get_id() << " assigned a task \n";
                threads_[idle]->push(task);
            }
        }
    }


    ~pool()
    {
        pool_thread_->join();
        cout << " thread pool destroyed \n ";
    }

private:
    mutex write_queue_mu_;
    queue<task_t> tasks_;
    vector<unique_ptr<thread_t>> threads_;
    int nthreads_;
    bool& running_;
    condition_variable idle_notify_cv_;
    thread* pool_thread_;
};



int main()
{
    bool running = true;
    pool pool1(2, running);

    task_t task1 = []()
    {
        this_thread::sleep_for(chrono::seconds(2s));
        cout << " Task 1 executed \n";
    };

    task_t task2 = []()
    {
        this_thread::sleep_for(chrono::seconds(1s));
        cout << " Task 2 executed \n";
    };

    task_t task3= []()
    {
        this_thread::sleep_for(chrono::seconds(2s));
        cout << " Task 3 executed \n";
    };

    task_t task4 = []()
    {
        this_thread::sleep_for(chrono::seconds(1s));
        cout << " Task 4 executed \n";
    };

    pool1.push(task1);
    pool1.push(task2);
    pool1.push(task3);
    pool1.push(task4);

    this_thread::sleep_for(chrono::seconds(5s));
    running = false;

    return 0;
}


482
6
задан 17 марта 2018 в 01:03 Источник Поделиться
Комментарии
3 ответа

Неопределенное поведение

Ваш код содержит неопределенное поведение из-за "гонки данных". Чтение-доступ на thread_t::idle_ в thread_t::is_idle не синхронизируется с возможностью записи в thread_t::run. Это гонки данных. То же имеет место thread_t::push. Я могу push новое задание, а старое настоящее время обрабатываются.

Не использовать using namespace std

Не используйте using namespace std если ваши имена, скорее всего, столкнется с теми из стандартной библиотеки, и никогда не используйте его в заголовке. Вы должны были использовать thread_t так std::thread уже существует. Однако имена, которые заканчиваются _t зарезервированы в POSIX. Обычно это игнорировать, хотя,.

Общий дизайн

Ваш общий дизайн хороший, но некоторые функции кажутся странными. Почему thread_t::notify() частью публичного интерфейса? Делает pool::get_idle возвращает количество свободных потоков или первый код? Пользователь должен быть в состоянии назвать pool::run() или thread_t::run()?

Перенести эти функции в private раздел. Облегчу ваши занятия в использовании и трудно злоупотреблять.

Кстати, pool::run может содержать ошибки. После idle != -1if), лямбда принимает idle копия ([=]). idle'ы значение никогда не изменится в этой точке, так что проверить лямбда-лишнее.

Удалить запрещенные функции

thread_tС конструктор копирования должен сделать явно удалены, а также его копия задания. В зависимости от вашего варианта использования, может быть, даже хотят, чтобы предотвратить движения:

thread_t(thread_t&&) = delete;
thread_t(const thread_t&) = delete;
thread_t& operator=(thread_t&&) = delete;
thread_t& operator=(const thread_t&) = delete;

Добавьте документации

Это зависит от того, вы хотите, чтобы повторно использовать ваш код в (будущем) проекта, но то, что сейчас вам понятно может быть не так ясно в несколько дней/недель/месяцев. Пока мы здесь, использовать код форматирования. Некоторые из ваших один-заявление ifесть брекеты, друга не, напр.

        // nice
if (threads_[i]->is_idle())
{
return i;
}

против

    // not so nice :(
if (n > std::thread::hardware_concurrency()) nthreads_ = n = std::thread::hardware_concurrency()-1;

Кроме того, что вы застряли в одной вмятины/брекеты стиль, и это здорово.

Уменьшить сложность

У вас обоих idle_notify_cv_ в качестве справочного и cv_ в качестве рефери. Я думаю, у тебя было два std::condition_variableы сначала а потом убрали одну. Если это так, то я предлагаю вам снять одну из них в следующий раз. Компилятор скажет вам, где эта переменная используется, и вы можете сделать каждом конкретном случае решение о целесообразности его еще нужно зарегестрироваться или могут быть удалены.

Разделение функциональности

Это скорее игрушка для вас, чтобы возиться с std::threadно для библиотеки разделить свои функции в нескольких файлах, например

thread.h
thread.cpp
thread_pool.h
thread_pool.cpp

Если вы разделяете вашу реализацию из своего заголовка, время перекомпиляции должны получить вниз, если ваше приложение растет в размерах. Это не будет заметно на вашей текущей программе, хотя.

6
ответ дан 17 марта 2018 в 02:03 Источник Поделиться

Это тест анализ вышеуказанных пула потоков, нажав 100 задач в бассейн.

Результат показывает, многопоточная гораздо быстрее (в этом примере х10), чем обычный вызов функции , но в некоторых простых случаях может различаться. Я хочу заказать 1-2 потоков в пуле для критических сроков задач в следующем шаге.

Я не могу редактировать свой пост, но std::this_thread::sleep_for(std::chrono::milliseconds(1)) могут вставить в бассейн::выполнить() цикл while для предотвращения блокировки во время толчка

int main()
{
bool running = true;
thread_pool pool(10, running);

task_t task = []()
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
};

auto start = std::chrono::system_clock::now();
for(int i= 0;i<100;i++)
pool.push_task(task);
while (!pool.empty())
{
std::this_thread::sleep_for(std::chrono::nanoseconds(30));
}
auto stop = std::chrono::system_clock::now();
std::cout <<"Multithreaded took "<< (std::chrono::duration_cast<std::chrono::milliseconds>(stop - start)).count()/1000 << " sec\n";

start = std::chrono::system_clock::now();
for (int i = 0; i<100; i++)
task();
stop = std::chrono::system_clock::now();
std::cout <<"Plain function took "<< (std::chrono::duration_cast<std::chrono::milliseconds>(stop - start)).count()/1000 << " sec\n";

running = false;
return 0;
}

Вот интересный результат ( исключая бассейн время установки):
enter image description here

2
ответ дан 17 марта 2018 в 10:03 Источник Поделиться

Один очень важный вопрос до сих пор не упоминал. Вы должны окончательно изменить активный подождать цикл, когда ожидает потока в run метод:

void run()
{
while (running_)
{
if (!tasks_.empty() && (idle = get_idle()) != -1)
{
// ...
}
}
}

Этот активный цикл потребляет процессорное время, когда есть задача, ожидающая, но не доступный рабочий поток. Так что не только все рабочие заняты своей работой, но вы также держит занят диспетчером. Лучше использовать некоторые сигнализации механизм, который будет приостановить run способ и возобновить, когда некоторые задачи завершается.

И еще одна деталь. Прохождение ссылка running флаг pool конструктор в принципе не очень хорошая идея. Обратите внимание если вы пройдете false в конструктор и позже вы решите активировать свой бассейн, изменив ссылку, она просто не будет - ничего не начинается run метод снова. Вы должны рассмотреть вопрос об осуществлении включения/выключения логики в публичных методов, а не простой логический флаг.

0
ответ дан 19 марта 2018 в 07:03 Источник Поделиться