Производителями и потребителями с использованием потока данных TPL


Я написал следующий класс, который должен выступать в качестве потребителя сообщения, которые передаются от производителей и направляет их на сервис Amazon SQS, мне пошли на компромисс, начав работу в конструкторе, и ждать, и завершения его в метод Dispose, Я сделал этот компромисс, чтобы сделать работу клиентов с помощью этого класса легче и потому что я думал, что с processorTask и клиент ЗК должны быть утилизированы в любом случае, то она должна быть тонкой, чтобы использовать метод Dispose, чтобы убедиться, что они закончили со своими обязанностями.

Я хотел бы, чтобы он был рассмотрен на какой-либо улучшение.

public class BatchMessagePublisher : IMessagePublisher
{
    private const int PublishBatchSize = 10;

    private readonly IAmazonSQS _sqsClient;
    private readonly ILogger<BatchMessagePublisher> _logger;
    private readonly string _queueUrl;
    private readonly bool _isFifo;
    private readonly BatchBlock<Message> _messageBatcher;
    private readonly Task _processorTask;

    public BatchMessagePublisher(IAmazonSQS sqsClient, ILogger<BatchMessagePublisher> logger, string queueUrl, bool isFifo)
    {
        _sqsClient = sqsClient;
        _logger = logger;
        _queueUrl = queueUrl;
        _isFifo = isFifo;

        _messageBatcher = new BatchBlock<Message>(PublishBatchSize, new GroupingDataflowBlockOptions
        {
            BoundedCapacity = 1000
        });

        _processorTask = StartAsync();
    }


    private SendMessageBatchRequestEntry CreateBatchRequestEntry(Message message)
    {
        var request = new SendMessageBatchRequestEntry
        {
            Id = message.MessageId,
            MessageBody = message.MessageBody,
            MessageAttributes =
            {
                ["MessageType"] = new MessageAttributeValue
                {
                    DataType = "String",
                    StringValue = message.MessageType
                }
            }
        };

        if (_isFifo)
        {
            request.MessageGroupId = message.GroupId;
            request.MessageDeduplicationId = message.MessageId;
        }

        return request;
    }

    private async Task StartAsync()
    {
        while (!_messageBatcher.Completion.IsCompleted)
        {
            var batch = (await _messageBatcher.ReceiveAsync().ConfigureAwait(false))
                .Select(CreateBatchRequestEntry)
                .ToList();

            _logger.LogDebug($"Publishing {PublishBatchSize} messages...");

            var messageIds = string.Join(", ", batch.Select(r => r.Id));

            try
            {
                var batchResponse = await _sqsClient.SendMessageBatchAsync(_queueUrl,batch);

                if (batchResponse.Failed.Any())
                {
                    var errors = string.Join(", ", batchResponse.Failed.Select(f => f.Message));
                    throw new MigrationException(
                        $"Failed to publish batch of messages with IDs {messageIds}: {errors}");
                }

                _logger.LogDebug($"Published batch of messages with IDs {messageIds}");
            }
            catch (AmazonServiceException ex)
            {
                throw new MigrationException(
                    $"Failed to publish batch of messages with IDs {messageIds}.", ex);
            }
        }
    }

    public void PublishMessages(params Message[] messages)
    {
        foreach (var message in messages)
        {
            _messageBatcher.Post(message);
        }
    }

    public void Dispose()
    {
        _messageBatcher.TriggerBatch();
        _messageBatcher.Complete();
        _processorTask.Wait();
        _processorTask.Dispose();
        _sqsClient.Dispose();
    }
}


Комментарии