Правильно создавая задания для опроса и отправки заданий ребенок с отменой для каждого


Я создаю класс диспетчеру, что само по себе является длительная задача, которая может быть отменена в любое время пользователем. Эта задача будет опрашивать базу данных, чтобы увидеть, если есть любая работа, что должно быть сделано, и работать до X [5] # дочерних задач.

Насколько я могу сказать - это работает прекрасно, но у меня есть несколько вопросов/опасений по поводу кода. Более или менее, поскольку я не мог найти другой пример такого, я все правильно делаю? Есть вещи, которые я мог бы улучшить?

  1. Я с помощью ConcurrentDictionary для отслеживания ребенка задачи, которые выполняются. Этот словарь хранит RequestKey , что обрабатывается, и CancellationTokenSource для этой задачи.

    • Это лучший способ, чтобы сделать это? В StartDownloadProcess (это задание ребенку) я создаю CancellationTokenSource и добавления его в словарь, затем начинаются задачи. Я добавил продолжение к нему, которая затем удаляет элемент из словаря , когда обработка будет завершена, так что его не вызвали в отмена способ.
  2. В задании ребенку я передаю токен отмены, чтобы метод, который фактически выполняет работу. Тогда этот процесс будет проверять, чтобы увидеть, если он нуждается, чтобы прервать путем проверки того, что маркер периодически. Это правильно?

  3. В отменить метод, я создаю копию ключей в словаре, обходя его и стараясь открыть и удалить элемент из словаря и выдача отменить запрос.

    • Это лучший способ, чтобы сделать это? Нужно ли мне ждать, чтобы увидеть, если задача на самом деле отменили? Я Могу?
    • Я должен избавляться от ТТ?
  4. Я делаю нить.Спать в основных задач. Это хорошо или плохо? Я должен использовать SpinWait вместо? Есть ли другой способ/лучшее средство для первичного опроса ложитесь спать и запустите в определенные интервалы?

Примечание: в StartDownloadProcess я использую пока(истина) цикл до тех пор, пока задача не завершится или будет отменена, чтобы выполнить итерации до Дж > requestKey. В реальном коде не будет , пока петля. Было бы просто запустить новую задачу и запустить процесс загрузки.

/// <summary>
/// Primary dispatcher token source
/// </summary>
CancellationTokenSource primaryTokenSource;
/// <summary>
/// A collection of Worker Tokens which can be used to cancel worker tasks and keep track of how many
/// there are.
/// </summary>
ConcurrentDictionary<int, CancellationTokenSource> workerTokens = new ConcurrentDictionary<int, CancellationTokenSource>();

/// <summary>
/// Runs this instance.
/// </summary>
public void Run() {
  //  Only one dispatcher can be running
  if (IsRunning)
    return;

  //  Create a new token source
  primaryTokenSource = new CancellationTokenSource();
  //  Create the cancellation token to pass into the Task
  CancellationToken token = primaryTokenSource.Token;

  //  Set flag on
  IsRunning = true;

  //  Fire off the dispatcher
  Task.Factory.StartNew(
    () => {
      //  Loop forever
      while (true) {
        //  If there are more than 5 threads running, don't add a new one
        if (workerTokens.Count < 5) {
          //  Check to see if we've been cancelled
          if (token.IsCancellationRequested)
            return;

          //  Check to see if there are pending requests
          int? requestKey = null;

          //  Query database (removed)
          requestKey = new Random().Next(1550);

          //  If we got a request, start processing it
          if (requestKey != null) {
            //  Check to see if we've been cancelled before running the child task
            if (token.IsCancellationRequested)
              return;

            //  Start the child downloader task
            StartDownloadProcess(requestKey.Value);
          }
        } else {
          //  Do nothing, we've exceeded our max tasks
          Console.WriteLine("MAX TASKS RUNNING, NOT STARTING NEW");
        }

        //  Sleep for the alloted time
        Thread.Sleep(Properties.Settings.Default.PollingInterval);
    }
  }, token)
  //  Turn running flag off
  .ContinueWith((t) => IsRunning = false)
  //  Notify that we've finished
  .ContinueWith(OnDispatcherStopped);
}

/// <summary>
/// Starts the download process.
/// </summary>
/// <param name="requestKey">The request key.</param>
private void StartDownloadProcess(int requestKey) {
  CancellationTokenSource workerTokenSource = new CancellationTokenSource();
  CancellationToken token = workerTokenSource.Token;

  //  Add the token source to the queue
  workerTokens.GetOrAdd(requestKey, workerTokenSource);

  //  Start the child downloader task
  Task.Factory.StartNew(
    () => {
      int j = 0;
      while (true) {
        if (token.IsCancellationRequested) {
          Console.WriteLine("Sub-Task Cancelled {0}", requestKey);
          return;
        }

        //  Create a new downloader, pass it the RequestKey and token
        //var downloader = new Downloader(requestKey, token);
        //downloader.Run();

        //  Simulate work
        Thread.Sleep(250);
        Console.WriteLine("SUB-Task {0} is RUNNING! - #{1}", requestKey, j);

        //  Simulate - automatically end task when j > requestkey
        if (j++ > requestKey) {
          Console.WriteLine("SUB TASK {0} IS ENDING!", requestKey);
          return;
        }
      }
    },
    token
  ).ContinueWith((t) => {
    //  If we ended naturally, the cancellationtoken will need to be removed from the dictionary
    CancellationTokenSource source = null;
    workerTokens.TryRemove(requestKey, out source);
  });
}

/// <summary>
/// Cancels this instance.
/// </summary>
public void Cancel() {
  //  Cancel the primary task first so new new child tasks are created
  if (primaryTokenSource != null)
    primaryTokenSource.Cancel();

  //  Iterate over running cancellation sources and terminate them
  foreach (var item in workerTokens.Keys.ToList()) {
    CancellationTokenSource source = null;
    if (workerTokens.TryRemove(item, out source)) {
      source.Cancel();
    }
  }
}

Кроме того, не показано в примере выше: несколько событий также может быть поднят с помощью в рамках задачи. Эти события все выглядеть следующим образом:

public event EventHandler DispatcherStarted;
private void OnDispatcherStarted() {
  EventHandler handler = DispatcherStarted;
  if (handler != null) 
    Task.Factory.StartNew(() => handler(this, EventArgs.Empty), CancellationToken.None, TaskCreationOptions.None, taskScheduler).Wait();      
}

В метод run() метод, в различных точках, он назвал бы OnDispatcher*() , чтобы поднять события таким образом, абонент может подписаться и получать уведомления. Те задачи, что событие будет работать на основном потоке.

Я смотрел в диспетчер родовых и передав опроса объекта, который проверяет базу данных. В случае успеха, он создает задачу ребенка и переходит в параметр(ы), которые он должен. Я столкнулся некоторые проблемы, такие как передача данных по, которой предметы, чтобы пройти в интерфейсы/классы/функции<,,,>/Действий<> и т. д. Как я могу превратить это в общих диспетчер/Поллер, который работает, который возвращает параметры (я думал, словарь), который затем создает задачу-ребенка, который использует эти параметры, а также поддерживает отмены и уведомления о событии?



3514
1
задан 6 сентября 2011 в 07:09 Источник Поделиться
Комментарии
1 ответ

Ошибка в коде: если вы имеете 5 или более выполняемых задач, нет задач будет всегда отвечать для отмены() запрос на CancellationToken.

2
ответ дан 25 мая 2012 в 08:05 Источник Поделиться