Потокобезопасный класс с большим количеством фоновых рабочих и замки


Пытаясь рассмотреть свой собственный код, чтобы быть уверенным, я реализовал потокобезопасности правильно. Мне нужны дополнительные мнения, потому что я что-то пропустил. Количество замков выглядит как уродливый код я также не уверен о прохождении std::shared_ptr для std::thread. В принципе, любая критика приветствуется. Если вы видите что-то не относящиеся к многопоточности, но все равно неправильно/плохо-практики, пожалуйста, поделитесь.

Общая идея - HardwareManager контролирует несколько IBusManagerС. HardwareManager запускает несколько потоков для обновления его состояния: _threadUpdate звонки update() способ и _updateBusThreads потоки звонков updateBus() для обновления каждого автобуса. getState() метод HardwareManager можно назвать несколько потоков из разных частей программы, поэтому он должен мне потокобезопасным.

#include <atomic>
#include <chrono>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

struct IoState
{
    IoState() {};

    std::vector<bool> ios;
    std::vector<bool> directions;

    //...
};

/**
 * State interface.
 */
class IDeviceState
{
public:

    IDeviceState() {};
    virtual ~IDeviceState();

    virtual std::vector<IoState> getIoStates() const = 0;
};

/**
 * Bus manager Interface.
 */
class IBusManager
{
public:

    IBusManager() {};
    virtual ~IBusManager();

    virtual bool initialize() = 0;
    virtual void update() = 0;
    virtual std::vector<IDeviceState> getState() const = 0;
} ;

class HardwareManager
{
public:

    HardwareManager();

    void start();
    void stop();

    std::vector<std::vector<IDeviceState>> getDeviceStates() const;

private:

    std::vector<std::vector<IDeviceState>> _deviceStates;
    std::vector<std::shared_ptr<IBusManager>> _busManagers; //pointer because intended to be passed to the corresponding thread

    std::thread _updateThread;
    std::vector<std::shared_ptr<std::thread>> _updateBusThreads; //pointer because std::thread is immutable?

    std::atomic<bool> _isShutdown;

    std::mutex _mutexBusManagers;
    mutable std::mutex _mutexDeviceStates;
    std::mutex _mutexUpdateThread;
    std::mutex _mutexUpdateBusThreads;

    /**
     * Initializes all resources. Starts background worker threads.
     */
    bool initialize();

    /**
     * Joins all background threads.
     */
    bool shutdown();

    /**
     * Main update loop. Executed my _updateThread.
     */
    void update();

    /**
     * Bus manager update loop. Executed by _updateBusThreads.
     * @param busManager bus manager to be updated
     */
    void updateBus(std::shared_ptr<IBusManager> busManager);

    //...
};

void HardwareManager::start()
{
    _isShutdown = false;
    auto result = initialize();

    if (!result)
    {
        _isShutdown = true;
    }
}

void HardwareManager::stop()
{
    _isShutdown = false;
    shutdown();
}

//...

bool HardwareManager::shutdown()
{
    try
    {
        //wait for the bus loops
        std::lock_guard<std::mutex> lockUpdateBusThreads(_mutexUpdateBusThreads);

        for (auto& updateBusThread : _updateBusThreads)
        {
            if (updateBusThread->joinable())
            {
                updateBusThread->join();
            }
        }

        //wait for the main loop
        std::unique_lock<std::mutex> lockUpdateThread(_mutexUpdateThread);

        if (_updateThread.joinable())
        {
            _updateThread.join();
        }
    }
    catch (const std::exception& ex)
    {
        return false;
    }

    return true;
}

bool HardwareManager::initialize()
{
    try
    {
        //lock
        std::unique_lock<std::mutex> lockBusManagers(_mutexBusManagers);
        std::unique_lock<std::mutex> lockDeviceStates(_mutexDeviceStates);
        std::unique_lock<std::mutex> lockUpdateBusThreads(_mutexUpdateBusThreads);

        //allocate
        _updateBusThreads = std::vector<std::shared_ptr<std::thread>>(_busManagers.size());
        _deviceStates = std::vector<std::vector<IDeviceState>>(_busManagers.size());

        //initialize resource and background threads
        for (auto& busManager : _busManagers)
        {
            auto result = busManager->initialize();

            if (!result)
            {
                return false;
            }

            auto state = busManager->getState();
            _deviceStates.push_back(state);

            auto thread = std::make_shared<std::thread>(&HardwareManager::updateBus, this, busManager);
            _updateBusThreads.push_back(thread);
        }

        //unlock
        lockBusManagers.unlock();
        lockDeviceStates.unlock();
        lockUpdateBusThreads.unlock();

        //launch main update
        std::lock_guard<std::mutex> lockUpdateThread(_mutexUpdateThread);
        _updateThread = std::thread(&HardwareManager::update, this);
    }
    catch (const std::exception& ex)
    {
        return false;
    }

    return true;
}

void HardwareManager::update()
{
    while (!_isShutdown)
    {
        std::lock_guard<std::mutex> lockBusManagers(_mutexBusManagers);
        std::lock_guard<std::mutex> lockDeviceStates(_mutexDeviceStates);

        for (std::size_t i = 0; i < _busManagers.size(); ++i)
        {
            _deviceStates[i] = _busManagers[i]->getState(); //getState is supposed to be thread-safe
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

void HardwareManager::updateBus(std::shared_ptr<IBusManager> busManager)
{
    while (!_isShutdown)
    {
        busManager->update(); //update is supposed to be thread-safe

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

std::vector<std::vector<IDeviceState>> HardwareManager::getDeviceStates() const
{
    std::lock_guard<std::mutex> lockDeviceStates(_mutexDeviceStates);
    return _deviceStates;
}

//...

В HardwareManager используется такой (упрощенный пример):

//...

HardwareManager manager;
manager.start();

//pass state with network clients/web interface
std::thread broadcastToNetworkThread = std::thread([&]{
    while (true)
    {
        network.send(manager.getState());
    }
});

//pass state to main task controller
while(true)
{
    controller.updateIo(manager.getState());
}


126
2
задан 5 февраля 2018 в 11:02 Источник Поделиться
Комментарии
1 ответ

Я не вижу ничего особенно важного я хочу пожаловаться.
Кажется очень твердым код.


Когда у вас есть несколько замков.
Всегда возникает вопрос, с тупика, вызванных приобретением замков в неправильном порядке. Чтобы избежать этого, вы должны использовать std::lock() чтобы убедиться, что все замки приобретаются в правильном порядке.

Так что такие вещи, как это:

    std::unique_lock<std::mutex> lockBusManagers(_mutexBusManagers);
std::unique_lock<std::mutex> lockDeviceStates(_mutexDeviceStates);
std::unique_lock<std::mutex> lockUpdateBusThreads(_mutexUpdateBusThreads);

Должно быть сделано так:

    // When you use unique lock you indicate your attention to lock
// first. But don't specifically acquire them
std::unique_lock<std::mutex> lockBusManagers(_mutexBusManagers, std::defer_lock);
std::unique_lock<std::mutex> lockDeviceStates(_mutexDeviceStates, std::defer_lock);
std::unique_lock<std::mutex> lockUpdateBusThreads(_mutexUpdateBusThreads, std::defer_lock);

// Then use `std::lock()` to acquire the locks.
// This guarantees that the locks are acquired in the correct order.
// Correct: Same order every where they are used in combination.
std::lock(_mutexBusManagers, _mutexDeviceStates, _mutexUpdateBusThreads);

Я хотел бы отметить, что вы, кажется, всегда используется последовательный порядок (поэтому не должно быть никаких проблем в коде). Но не делает это делает ваш код очень хрупким для поддержания. Даже если вы правильно напишите это в какой-то момент кто-то другой будет поддерживать его, и они не могут быть осторожны. Поэтому вам нужно остерегаться этих.


Избежать соблазна выпустить замки вручную.

    //unlock
lockBusManagers.unlock();
lockDeviceStates.unlock();
lockUpdateBusThreads.unlock();

Предпочитаю пусть std::unique_lock деструктор не разбудив.


В боол HardwareManager::initialize() вы ловите все std::exceptions и возвращать true/false, чтобы указать успех. Это нормально, на внутренний интерфейс. Но следует отметить, что не все исключения являются производными от std::exception. Я бы поймать (...) или пометить функцию как noexcept (или оба).


Это очень подозрительно:

std::this_thread::sleep_for(std::chrono::milliseconds(10));

У вас есть ошибка синхронизации в коде или есть конкретные требования, которые вы реализуете здесь. Если есть какие-то ограничения, вы должны документ с комментарием. Если это ошибка (ГРМ или иначе), что вы не компенсируя, то вы также должны подтвердить это документально.

1
ответ дан 5 февраля 2018 в 05:02 Источник Поделиться