Планировщик построен с наблюдаемых П2 (продолжение)


Наверное, было бы слишком просто, если бы старый Scheduler работал как предназначено быть так просто. Но глаза-открытия обзор показал, что время от времени он пропустит одну секунду из-за 14 мс таймер incaccuracy. Конечно, это будет иметь плохие последствия для расписания (они не будут стрелять, если их второй был пропущен).


Я проводил когда-то эксперименты с различными решениями, чтобы исправить отсутствует вторая ошибка , и я думаю, я нашел решение, которое работает хорошо. Я не хочу, чтобы это было слишком сложным, я не (в настоящее время) написание любых систем необходима для жизни. Он просто должен убедиться, что ничего не пропало. Без этого наблюдаемого будет аккумулировать все опубликованные значения и первый подписчик будет получать их все. Мы не хотим, чтобы это произошло, потому что мы заинтересованы в том, что теперь и сейчас в истории.


Так как Scheduler работать? Я превратил его в горячую наблюдаемых с Publish + Connect. Я хочу, чтобы это было тиканье regardles если есть кто-нибудь слушает.

Новый Scheduler также можете проверить, сколько рабочих мест уже запущен и спад начался новый.

Как я могу исправить отсутствует второй вопрос? В Scheduler издательство метки точно так же, как и раньше, но теперь она дополнительно проверяет, если разница между последним и текущим временем превышает 1s. Если это так, он публикует недостающую секунду до публикации текущей. Это достигается за счет перехода от Select для SelectMany. Два интервала, опубликованных практически в тот же момент не очень важно. Более важным является, чтобы гарантировать, что ничего не осталось, и что рабочие места в любую секунду могут быть вызваны. Я думаю, это крайне маловероятно, что это вызовет какие-нибудь проблемы.

public class Scheduler : IDisposable
{
    public const int UnlimitedJobParallelism = -1;

    private const int ZeroJobCounter = 0;

    private IConnectableObservable<DateTime> _scheduler;

    private IDisposable _disconnect;

    private readonly ConcurrentDictionary<Job, int> _jobCounters;

    public Scheduler(TimeSpan period, IDateTime dateTime)
    {
        var lastTimestamp = dateTime.Now();
        _scheduler =
            Observable
                .Interval(period)
                .SelectMany(_ =>
                {
                    // If we missed one second due to time inaccuracy, 
                    // this makes sure to publish the missing second too
                    // so that all jobs at that second can also be triggered.
                    var now = dateTime.Now();
                    var timestampDifference = now.Second - lastTimestamp.Second;
                    var timestamps =
                        Enumerable
                            // When starting the scheduler it might occur 
                            // that the difference is a negative number (-59).
                            // If this is the case, make it 1.
                            .Range(1, timestampDifference < 0 ? 1 : timestampDifference)
                            .Select(second => lastTimestamp.AddSeconds(second));
                    lastTimestamp = now;
                    return timestamps.Dump();
                })
                // Share the actual data values and not just the observable instance.
                .Publish();

        // Turn this into a hot observable and start ticking
        // regardless if there is anything listening.
        _disconnect = _scheduler.Connect();
        _jobCounters = new ConcurrentDictionary<Job, int>();
    }

    public Scheduler()
    : this(TimeSpan.FromSeconds(1), new LocalDateTime())
    { }

    public IDisposable Schedule(Job job)
    {
        var cronExpression = CronExpression.Parse(job.CronExpression);
        var unschedule =
            _scheduler
                .Where(cronExpression.Contains)
                .Subscribe(async timestamp =>
                {
                    if (CanExecute(job))
                    {
                        _jobCounters.AddOrUpdate(job, 1, (j, c) => c + 1);
                        await job
                            .ExecuteAsync(timestamp)
                            .ContinueWith(_ =>
                                _jobCounters
                                    .AddOrUpdate(job, 1, (j, c) => c - 1)
                            );
                    }
                    else
                    {
                        $"'{job.Name}' is alredy running!".Dump();
                    }
                });

        return Disposable.Create(() =>
        {
            _jobCounters.TryRemove(job, out var _);
            unschedule.Dispose();
        });
    }

    private bool CanExecute(Job job)
    {
        // Reversed the second condition to make the left side consistent.
        return
            job.MaxDegreeOfParallelism == UnlimitedJobParallelism ||
            job.MaxDegreeOfParallelism > _jobCounters.GetOrAdd(job, j => ZeroJobCounter);
    }

    public void Dispose()
    {
        // Stop ticking.
        _disconnect.Dispose();
    }
}

Есть также два новых помощников. Расширений для планировщика и Job класс.

public static class SchedulerExtensions
{
    public static IDisposable Schedule(this Scheduler scheduler, string cronExpression, Func<DateTime, Task> execute, int maxDegreeOfParallelism = 1)
    {
        return scheduler.Schedule(new Job
        {
            CronExpression = cronExpression,
            ExecuteAsync = execute,
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        });
    }
}

public class Job
{
    public string Name { get; set; }

    public string CronExpression { get; set; }

    public Func<DateTime, Task> ExecuteAsync { get; set; }

    public int MaxDegreeOfParallelism { get; set; } = 1;
}

Пример

Использование не изменится, просто подписи.

void Main()
{
    var scheduler = new Scheduler();

    // Let the scheduler tick for a moment...
    Thread.Sleep(3300);

    scheduler.Schedule("0/1 * * * * * *", async schedule =>
    {
        Console.WriteLine($"DEBUG: {schedule} - {schedule:ss.fff} [{Thread.CurrentThread.ManagedThreadId}]");
        await Task.Delay(1100);
    }, maxDegreeOfParallelism: Scheduler.UnlimitedJobParallelism);

    scheduler.Schedule("0/2 * * * * * *", async schedule =>
    {
        Console.WriteLine($"ACTION-1: {schedule} - {schedule:ss.fff}");
        await Task.Delay(1500);

    }, maxDegreeOfParallelism: Scheduler.UnlimitedJobParallelism);

    scheduler.Schedule(new Job
    {
        Name = "Test-job",
        CronExpression = "0/3 * * * * * *",
        ExecuteAsync = async schedule =>
        {
            Console.WriteLine($"ACTION-2: {schedule} - {schedule:ss.fff}");
            await Task.Delay(4000);
        },
        MaxDegreeOfParallelism = 1
    });
}


Комментарии