Минималистский динамический планировщик задач


Динамическая политика выполнения не сделать разрез для C++17. Представленный на вашу критику, минималистский динамического планировщика задач для использования до СТД::асинхронные разбирается...

Меня интересует не только в качество кода и корректности, а также, является ли сама стратегия может быть улучшена при сохранении простоты.

Задач будет осуществляться через std::async. Стратегия заключается в ногу с тем, как многие новые потоки были запущены с помощью планировщика, но еще не завершена. Если это число+1, равна или превышает количество ядер, а новые задачи будут запланированы как launch::deferredиначе launch::async. Очевидно, что не может быть сделано идеально, но если задача редко получает запланированные "неправильно", это не катастрофа, пока все пыхтит.

Я определил обертку для std::будущее называется dj::t_future. Он держит памятку, какая политика была использована для запуска потока. (Это то, что нужно? Какой лучше способ можно РАИИ?) Есть два конструктора для t_futureвсе вырезать-и-вставить, потому что я не выяснить, как использовать только один, который охватывает как функции, которые возвращают значения и пустота функций. Поле боя в dj::t_future<T>::get() показывает свидетельство произошла перестрелка с участием животных исключений. (Разве хорошие парни побеждают?)

Если вы изучите код, Пожалуйста, скажите, что вы думаете, даже в маловероятном событие, что вы не видите ничего критиковать.

Увидеть обновленный код в ответе я написал.

Вот оригинал, только я исправлена одна глупая ошибка, которая могла поднять свою уродливую голову "только заголовок" режим. Я украсил num_threads с static inline, который не имеет смысла. Если статический, встроенный избыточна.

#include <future>
#include <atomic>
#include <thread>

namespace dj {

    namespace global { // Wasa bug. This was "static". I blame my fingers.
        inline std::atomic<unsigned> num_threads{ 1 };
    };

    namespace {
        struct finish {
            std::launch policy;
            finish(std::launch p) : policy(p) {}
            ~finish() {
                if (policy != std::launch::deferred) {
                    global::num_threads -= 1;
                }
            }
        };
    }

    // Wrapper for std::future that remembers whether thread was spawned
    // and decrements thread count on "get" completion when appropriate.
    template<class T>
    class t_future {
        std::future<T> fut;
        std::launch policy;
    public:
        t_future(std::future<T> &&fut, std::launch policy)
            : policy(policy)
            , fut(std::move(fut))
        {}

        T get() {
            struct finish finally(policy); // In case get() throws exception

            T ret;
            try { ret = fut.get(); } catch (...) { throw; } // See notes below
            return ret;
        }
    };

    // Copy-paste-edit of the above for functions returning void.
    // Is there a way to avoid this?
    template<>
    class t_future<void> {
        std::future<void> fut;
        std::launch policy;
    public:
        t_future(std::future<void> &&fut, std::launch policy)
            : policy(policy)
            , fut(std::move(fut))
        {}

        void get() {
            struct finish finally(policy); // In case get() throws exception

            // Why do I need this try/catch/throw?
            try { fut.get(); } catch (...) { throw; } 
            // without it, the program does not abort when 
            // exception is not caught in user code.
        }
    };

    // dj::async sets std::launch policy automatically 
    // based on how many threads have
    // already been launched.
    template<typename F, typename... Args>
    auto async(F& f, Args&&... args) {
        using ret_t = decltype(f(std::forward<Args>(args)...));

        auto policy{ std::launch::deferred };
        unsigned count{ 0 };
        unsigned current { global::num_threads };
        while(current < std::thread::hardware_concurrency() 
          && ++count < 1'000) {
            if (global::num_threads.compare_exchange_strong(current, current + 1)) {
                policy = std::launch::async;
                break;
            }
        }

        auto fut = std::async(policy, f, std::forward<Args>(args)...);
        return t_future<ret_t> {std::move(fut), policy};
    }
}

// Code to review ends here

// Minimal test code begins here...
#include <chrono>
static void busy_sleep(long double time) noexcept
{
    using duration_t = std::chrono::duration<long long, std::nano>;
    const auto end = std::chrono::high_resolution_clock::now() 
         + duration_t(static_cast<long long> (time * 1e9));
    do {
        ;
    } while (std::chrono::high_resolution_clock::now() < end);
}

#include <random>
std::default_random_engine re;
std::uniform_real_distribution<double> ud(.02, .5);

#include <iostream>
int main() {
    auto f = []() {
        std::cout << "(";  
        auto spin = ud(re);
        busy_sleep(spin);
        std::cout << ")";  
    };
    std::vector<dj::t_future<void>> threads;
    for (unsigned i = 0; i < 2*std::thread::hardware_concurrency() ; ++i) {
        threads.emplace_back(dj::async(f));
    }
    for (auto& t: threads) {
        t.get();
    }
    return 0;
}


Комментарии
1 ответ

Замечания

Без фактического времени все это просто домыслы ...

Похоже, вы в основном используя deferred как стратегия, когда вы думаете, что бассейн переполнен, что означает, что задача будет выполнена, когда кто-то звонит get() на будущее. Я не уверен, если это реальное улучшение по сдаче в аренду ОС обрабатывать переподписки.

Например, если есть кто-нить сидит с кучей мелких задач, они будут запланированы асинхронного до предела, то они получат отсрочку. Как на cppreference (http://en.cppreference.com/w/cpp/thread/async) тех, кто будет выполняться на Первом wait() или get() звонок на будущее, так как только основной поток хочет, чтобы результаты его позову фьючерсов, что означает, что deferred будут называется однопоточным из основного потока. Или фьючерсы должны быть передан другим потокам для выполнения, которая, кажется, не очень удобно.

num_threads является глобальной, если dj:async используется только с небольшим количеством потоков, которые могли бы быть ок, если он используется большее количество нитей, стоимость того, чтобы поделиться этой стоимости между ядрами может негативно сказаться на производительности.

Если вы выполняете петлю вокруг сравнивать и обмена вы могли бы также подумать об использовании weak а не сильным.

2
ответ дан 12 марта 2018 в 02:03 Источник Поделиться