Пытаюсь создать надежный механизм выполнения задач


Я создал простой асинхронный двигатель исполнения, и я хотел бы помочь определить места, он может быть улучшен. Основной функцией системы является мониторинг msmq и услуги брокера очередей и выполнить любые задачи, которые приходят через них.

Вот код:

using System;
using System.Collections.Generic;
using System.Linq;
using Castle.Windsor;
using ThreadingTasks = System.Threading.Tasks;

public class Engine
{
    /// <summary>
    /// Start's the Engine and spins up appropriate message listeners
    /// </summary>
    public void Start()
    {
        _logger.Info("Starting the Engine, calling GetAsync for all queues");
        _cancelToken = false;
        GetAsync(AsyncLaunch.All);
    }

    /// <summary>
    /// Sends the stop message to the engine which will let currently running tasks finish
    /// and stop future tasks from being picked up. 
    /// </summary>
    public void Stop()
    {
        _logger.Info("Stopping the Engine");
        _cancelToken = true;
    }

    protected enum AsyncLaunch { All, MSMQ, ServiceBroker };

    /// <summary>
    /// Starts all appropriate asynchronous Message listeners
    /// </summary>
    /// <param name="async"></param>
    protected void GetAsync(AsyncLaunch async)
    {
        if (_cancelToken)
            return;
        _logger.Debug("Attempting to get messages to process Asynchronously");
        if (SingleThreaded)
        {
            _logger.Debug("Engine is running in single-threaded mode and must wait till all tasks finish");
            ThreadingTasks.Task.WaitAll(_threadTasks.ToArray());
            _threadTasks.Clear();
        }

        if (async == AsyncLaunch.ServiceBroker || async == AsyncLaunch.All)
            _serviceBroker.GetAsync(x => RunServiceBrokerJob(x));

        if (async == AsyncLaunch.MSMQ || async == AsyncLaunch.All)
            _msmq.GetManyAsync(x => RunMSMQJob(x));
    }

    /// <summary>
    /// Runs the job for the given message and after the job has launched, calls 
    /// GetAsync to get the next ServiceBroker message
    /// </summary>
    protected void RunServiceBrokerJob(TransportMessage msg)
    {
        RunJob(msg);
        GetAsync(AsyncLaunch.ServiceBroker);
    }

    /// <summary>
    /// Runs the job for the given collection of messages and after the job has 
    /// launched, calls GetAsync to get the next MSMQ message
    /// </summary>
    protected void RunMSMQJob(IEnumerable<TransportMessage> msgs)
    {
        RunJob(msgs);
        GetAsync(AsyncLaunch.MSMQ);
    }


    /// <summary>
    /// Initializes and executes the appropriate job for the given collection of messages
    /// </summary>
    /// <param name="msgs">A collection of messages passed into a Job</param>
    protected ThreadingTasks.Task RunJob(IEnumerable<TransportMessage> msgs)
    {
        try
        {
            var task = ThreadingTasks.Task.Factory.StartNew(() =>
            {
                try
                {
                    _eventSink.Broadcast(msgs);
                }
                catch (Exception ex)
                {
                    _logger.Error(ex, "Error when attempting to run jobs {0} using message types {1} with values {2}", string.Join(",", msgs.Select(x => x.Id).ToArray()), string.Join(",", msgs.Select(x => x.Name).Distinct().ToArray()), string.Join(",",msgs.Select(x => x.ToString())));
                }

            });

            if (SingleThreaded)
                _threadTasks.Add(task);

            return task; 
        }
        catch (Exception ex)
        {
            _logger.Error(ex, "Error when attempting to invoke Task for a collection of {0} messages with ids {1} of type(s) {2}", msgs.Count(), string.Join(",", msgs.Select(x => x.Id).ToArray()), string.Join(",", msgs.Select(x => x.Name).Distinct().ToArray()));
        }

        return null; 
    }

    /// <summary>
    /// Initializes and executes the appropriate job for the given message
    /// </summary>
    /// <param name="msg">Message containing information pertinent to the executed Job</param>
    protected ThreadingTasks.Task RunJob(TransportMessage msg)
    {
        _logger.Info("RunJob was invoked for msg id " + msg.Id);
        try
        {
            var task = ThreadingTasks.Task.Factory.StartNew(() =>
            {
                _logger.Debug("Attempting to Run Job with ID: " + msg.Id);
                try
                {
                    _eventSink.Broadcast(msg);
                }
                catch (Exception ex)
                {
                    _logger.Error(ex, "Error when attempting to run job {0} using message type {1} with values {2}", msg.Id, msg.GetType(), msg.ToString());
                }

            });

            if (SingleThreaded)
                _threadTasks.Add(task);

            return task; 
        }
        catch (Exception ex)
        {
            _logger.Error(ex, "Error when attempting to invoke Task for message with id {0} of type {1}", msg.Id, msg.GetType());
        }

        return null; 
    }

    private void ValidateObjectIsNotNull(object obj, string objectName)
    {
        if (obj == null) throw new ArgumentNullException(objectName + " can not be null");
    }

    /// <summary>
    /// Instantiate a new engine used to access messaging queues and launch Tasks
    /// </summary>
    public Engine(IWindsorContainer container, IEventSink eventSink, IEngineConfiguration config, ILogging logger, IDataAccess<MessageBase> serviceBroker, IDataAccess<TransportMessage> msmq)
    {
        _eventSink = eventSink;
        _config = config;
        _threadTasks = new List<ThreadingTasks.Task>();
        _logger = logger;
        _container = container;
        _serviceBroker = serviceBroker;
        _msmq = msmq;

        //ThreadCount 0 is unlimited threads
        if (SingleThreaded || _config.ThreadCount == 0)
            return;

        // We need at least 3 threads, otherwise on a single CPU box we are stuck with a single thread
        // and only two threads might stop one of the job pollers from running
        int minThreadCount = System.Math.Max(Environment.ProcessorCount, 3);

        // do not set maxThreadCount less than the minThreadCount
        int maxThreadCount = System.Math.Max(minThreadCount, _config.ThreadCount);

        System.Threading.ThreadPool.SetMaxThreads(maxThreadCount, maxThreadCount);
    }

    private readonly IEngineConfiguration _config;
    private readonly IEventSink _eventSink;
    private readonly IList<ThreadingTasks.Task> _threadTasks;
    private readonly ILogging _logger;
    private readonly IWindsorContainer _container;
    private readonly IDataAccess<MessageBase> _serviceBroker;
    private readonly IDataAccess<TransportMessage> _msmq;
    private bool _cancelToken;

    private bool SingleThreaded { get { return _config.ThreadCount == 1; } }
}

Пару вопросов у меня: - Если задачи ТПЛ взорвать, я не есть хороший способ получения этой информации - Ничто так не ограничивает, как "жадный" этот двигатель, то есть он будет стаскивать задачи, даже если она очень занята, возможно, получения других экземпляров этого двигателя работы

Любые предложения с благодарностью, но таковы две главные проблемы для меня.



2149
6
задан 5 октября 2011 в 09:10 Источник Поделиться
Комментарии
1 ответ


  1. С getasync - это плохое название для этого метода. StartListening кажется, что выразить лучше то, что он делает.

  2. AsyncLaunch - это плохое название для этого перечисления. Кажется, для описания того, какие цели должны быть услышаны, так почему бы не назвать это ListenTarget.

  3. перечисления могут быть использованы в качестве флагов:

    enum ListenTarget
    {
    MSMQ = 1 << 0,
    ServiceBroker = 1 << 1,
    All = ~0,
    }

    После тестирования код становится:

    if (async & AsyncLaunch.ServiceBroker)
    _serviceBroker.GetAsync(x => RunServiceBrokerJob(x));

    if (async & AsyncLaunch.MSMQ)
    _msmq.GetManyAsync(x => RunMSMQJob(x));

    кроме того, вы можете пройти в любой произвольной комбинации целей (в случае, если вы добавить больше позже).


  4. ValidateIsNotNull должен быть переименован в состоянии более четко, что он делает: ThrowIfObjectIsNull.

  5. У вас есть много повторяющегося кода в два RunJob методов. В одном сообщении может быть сокращен до одной строки кода RunJobs(новый [] { сообщение }).

3
ответ дан 7 октября 2013 в 09:10 Источник Поделиться