Пользовательский поток-бассейн/класс очереди


Я хотел, чтобы класс, который выполняет ряд задач, но только определенное количество в то же время (например, для загрузки различного контента в интернете и сохранить общую скорость загрузки на хорошем уровне). Класс я написал, кажется, работает, но есть, вероятно, вещи, которые можно улучшить.

  1. Эти замки имеют смысл? Должно быть также другими замками?
  2. У меня есть метод и событие, которое имеет значение только при использовании конкретного конструктора. Есть ли способ, чтобы улучшить это?
  3. Класс я использую для задач ThreadStart. Это хорошая идея?
  4. Может быть лучше имена методов/имена классов.
  5. Есть какие-то общие ошибки (например, что больше потоков, чем максимальное значение, будут выполнены)?

Есть, вероятно, больше очков, что можно было бы улучшить. Также, если кто знает хорошие библиотеки с открытым исходным кодом (или родной .Чистый класс даже), который делает только то, что мой класс должен сделать, я был бы заинтересован в этом тоже.

public class ThreadQueue
{
    private readonly HashSet<ThreadStart> WorkingThreads = new HashSet<ThreadStart>();
    private readonly Queue<ThreadStart> Queue = new Queue<ThreadStart>();

    private bool RaiseCompleteEventIfQueueEmpty = false;

    private int ThreadsMaxCount;

    public ThreadQueue(int threadsMaxCount)
    {
        ThreadsMaxCount = threadsMaxCount;
    }

    /// <summary>
    /// Creates a new thread queue with a maximum number of threads and the tasks that should be executed.
    /// </summary>
    /// <param name="threadsMaxCount">The maximum number of currently active threads.</param>
    /// <param name="tasks">The tasks that should be executed by the queue.</param>
    public ThreadQueue(int threadsMaxCount, ThreadStart[] tasks) : this(threadsMaxCount)
    {
        RaiseCompleteEventIfQueueEmpty = true;
        foreach (ThreadStart task in tasks)
        {
            Queue.Enqueue(task);
        }
    }

    /// <summary>
    /// Starts to execute tasks. Used in conjunction with the constructor in which all tasks are provided.
    /// </summary>
    public void Start()
    {
        CheckQueue();
    }

    private readonly object addlock = new object();
    /// <summary>
    /// Adds a task and runs it if a execution slot is free. Otherwise it will be enqueued.
    /// </summary>
    /// <param name="task">The task that should be executed.</param>
    public void AddTask(ThreadStart task)
    {
        lock (addlock)
        {
            if (WorkingThreads.Count == ThreadsMaxCount)
            {
                Queue.Enqueue(task);
            }
            else
            {
                StartThread(task);
            }
        }
    }

    /// <summary>
    /// Starts the execution of a task.
    /// </summary>
    /// <param name="task">The task that should be executed.</param>
    private void StartThread(ThreadStart task)
    {
        WorkingThreads.Add(task);
        BackgroundWorker thread = new BackgroundWorker();
        thread.DoWork += delegate { task.Invoke(); };
        thread.RunWorkerCompleted += delegate { ThreadCompleted(task); };
        thread.RunWorkerAsync();
    }

    private void ThreadCompleted(ThreadStart start)
    {
        WorkingThreads.Remove(start);
        CheckQueue();
        if (Queue.Count == 0 && WorkingThreads.Count == 0 && RaiseCompleteEventIfQueueEmpty) OnCompleted();
    }

    private readonly object checklock = new object();
    /// <summary>
    /// Checks if the queue contains tasks and runs as many as there are free execution slots.
    /// </summary>
    private void CheckQueue()
    {
        lock (checklock)
        {
            while (Queue.Count > 0 && WorkingThreads.Count < ThreadsMaxCount)
            {
                StartThread(Queue.Dequeue());
            }
            if (Queue.Count == 0 && WorkingThreads.Count == 0 && RaiseCompleteEventIfQueueEmpty) OnCompleted();
        }
    }

    /// <summary>
    /// Raised when all tasks have been completed. Will only be used if the ThreadQueue has been initialized with all the tasks it should execute.
    /// </summary>
    public event EventHandler Completed;

    /// <summary>
    /// Raises the Completed event.
    /// </summary>
    protected void OnCompleted()
    {
        if (Completed != null)
        {
            Completed(this, null);
        }
    }
}


22355
16
задан 6 февраля 2011 в 12:02 Источник Поделиться
Комментарии
4 ответа

Есть много, что нужно учитывать при внедрении резьбовых очередь и я бы не рекомендовал делать это вручную, если у вас есть действительно хорошая причина, потому что существующие решения являются проверенными и не очень специфический дизайн должен и некоторые хардкор кодирования, чтобы пойти с ним вы, скорее всего, возится с чем-то менее эффективным/мощной, чем существующие варианты. Что сказал, как упражнение, это может быть очень познавательно вызов и я постараюсь, чтобы рассмотреть его как таковой; производство соответствующих примечаниях ниже.

Технически это просто вопрос предпочтений, но, вообще говоря, частные переменные-члены не имеют заглавной первой буквы, чтобы помочь дифференцировать между представителями публичных и внутренних деталей реализации (типовые .Чистый стиль верблюжьего для членов общественных и pascalCase для полей/переменных/параметров).

Похоже, вы используете полное событие для двух различных значений; вы должны сделать это два отдельных события. Я бы не стал заморачиваться с RaiseCompleteEventIfQueueEmpty, вместо того, чтобы просто иметь ItemComplete событие и QueueComplete событие или что-то подобное и всегда вызывают события; если абонент не волнует тот или другой (или так же) они просто не подключать обработчики к ним.

Есть проблемы синхронизации из-за вашей блокировки. Потому что ваш AddTask и CheckQueue методы используют различные блокировки объектов можно добавлять и доступе к очереди одновременно. Потому что очереди не по своей природе потокобезопасными, это может привести к проблемам. Вы лучше использовать один замок между двумя. Если вы на фреймворк 4.0 вы можете рассмотреть возможность использования коллекции concurrentqueue вместо.

BackgroundWorker содержит положения, призванные сделать его легко установить простые фоновые задачи отдельно от потока пользовательского интерфейса. BackgroundWorker является компонентом, который технически означает, что вы должны утилизировать ее, когда вы закончите (не из собственных нить) хотя я не думаю, что ее реализация зависит в данный момент, что может измениться и вы хотите следить за надлежащей практики в отношении интерфейс IDisposable. Это не выглядит, как вам нужно много удобства, которые она предлагает и в основном использовать его для события завершения, которые вы могли бы довольно легко реализовать себя. Вы бы иметь больше контроля и меньше накладных расходов, используя нитки , которые должны также держать вас осознавать все, что происходит в чем-то таком низком уровне. Вот краткое обсуждение на переполнение стека. Также стоит отметить: BackgroundWorker использует метод BeginInvoke внутренне, который выполняет работу на поток из пула потоков.

В связи с использованием ThreadStart, это просто делегат, как и любой другой, но если вы желаете, чтобы распараллелить работу, вполне вероятно, что вы хотите, чтобы обеспечить контекст или нагрузки для каждого потока. Это можно обойти эту несколько способов, в том числе с использованием блокад при определении делегата, но это грязный и не подходит для всех ситуаций. Класс Thread, который использует ThreadStart, также поддерживает ParameterizedThreadStart. Так вы используете вызвать вы, вероятно, может принять любой делегат и любое количество параметров, пожалуй, вот так:

private readonly Queue<KeyValuePair<Delegate, object[]>> Queue = new Queue<KeyValuePair<Delegate, object[]>>();
private readonly object queueSync = new object();

public void AddTask<F>(F task, params object[] parameters) where F : Delegate
{
lock (queueSync)
{
if (WorkingThreads.Count == ThreadsMaxCount)
{
Queue.Enqueue(new KeyValuePair<Delegate, object[]>(task, parameters));
}
else
{
StartThread(task, parameters);
}
}
}

private void StartThread(Delegate task, object[] parameters)
{
WorkingThreads.Add(task);
BackgroundWorker thread = new BackgroundWorker();
thread.DoWork += delegate { task.Invoke(parameters); };
thread.RunWorkerCompleted += delegate { ThreadCompleted(task); };
thread.RunWorkerAsync();
}

Предупреждение: этот код не учитывает другие мои предложения, и была написана специальная в текстовом редакторе, так что это не компиляции или тестирования; идея-это все, что я пытаюсь продемонстрировать. Есть несколько более эффективные способы, чтобы сделать использование дженериков и выбор делегата в целом, и вы, вероятно, также хотят, чтобы обеспечить контекст данных в случае завершения.

Вы, вероятно, следует предоставить немного больше информации в завершение мероприятия так, что прикрепленный код видите что завершена (рассмотреть один обработчик может быть использован для много потоков и еще много бассейнов). Использовать объект, наследуемый от EventArgs содержит задачи (и параметры, если вы берете этот совет).


В связи с Даррагые ответ и свои комментарии: если вы хотите использовать пул потоков , но нужно знать, когда работа будет завершена, вы можете использовать параллельных задач библиотека, которая теперь поставляется с 4.0 для использования задач объекты, которые по умолчанию назначены с помощью класса ThreadPool. Кроме того, если в ваши задачи будет очень долго и вы все еще хотите работать на свои собственные темы или задать свой собственный параллелизм правил можно реализовать в планировщик задач и продолжать использовать существующие шаблоны/классы. Это будет способствовать простота эксплуатации и обслуживания, а если в какой-то момент Вы хотите использовать другой механизм планирования это потребует минимальных изменений. Следует отметить (как можно увидеть в документации выше), что там уже есть поддержка длительные задачи не с помощью класса ThreadPool даже в дефолтном планировщике.

Следующие бесплатные "книги" есть много хорошей информации, как на новые расширения параллелизма в рамках (ТПЛ, язык PLINQ, параллельно.Для и т. д.) а также сведения о примитивы синхронизации и сырых шаблонов, используемых для управления многопоточной нагрузкой:

Модели параллельного программирования: понимание и применение параллельных моделей с .Чистая Framework 4 в

Параллельное программирование с Microsoft .Чистая

Многопоточность в C#

Большинство из них содержат очень похожую информацию, но каждый из них имеет свой собственный стиль, а также различные сочные крупицы мудрости.

19
ответ дан 9 февраля 2011 в 05:02 Источник Поделиться

Я знаю, что ты уже отмечен один из ответов в качестве ответа, и это правильно, это очень полная и полезная. Я добавляю этот ответ, как никто и не отметился, и это является частью первого вопроса: 1. У этих замков имеет смысла? Должно быть также другими замками?

В отношении второй половины, что, должно быть также другие замки. Да. Вы используете для поиска HashSet для отслеживания рабочих потоков. Добавление, удаление и получение графа этот набор без каких-либо замок. Это катастрофа замедленного действия. Вам нужно мьютекс вокруг вашего чтения и записи для поиска HashSet. Как уже упоминалось, если вы можете использовать .Net версии 4.0, то есть коллекции concurrentbag коллекции, которые вы можете использовать. В противном случае нужно добавить еще один замок объект и блокировку доступа к вашему поиска HashSet.

4
ответ дан 9 февраля 2011 в 08:02 Источник Поделиться

Мы используем смарт пул потоков. Это простые и проверенные. Вы должны быть в состоянии выполнить ваши требования с меньшими усилиями и большей уверенностью, опираясь на него, а не реализации собственных.

3
ответ дан 9 февраля 2011 в 07:02 Источник Поделиться

В ThreadCompleted похоже, вы поднимаете oncompleted в два раза - один раз в этот способ себя и еще один в CheckQueue способ

2
ответ дан 6 февраля 2011 в 12:02 Источник Поделиться