пятница, 12 июня 2026 г.

Alice, Qwen, Shedevr

Alice, Qwen, Shedevr
D:\Projects\VS02\2606\TplDataflow\Qwen\WorkerServiceSln\WorkerServiceSln.slnx
D:\Projects\VS02\2606\TplDataflow\Qwen\WorkerServiceSln\WorkerService\WorkerService.csproj
Шедевр кода .Net Core C# public class PublisherService(QuoteChannel channel, ILogger<PublisherService> logger) : BackgroundService { private readonly QuoteChannel _channel = channel; private readonly ILogger<PublisherService> _logger = logger; private readonly Random _random = new(); // TPL Dataflow блок для обработки входящих котировок (от Subscriber) private ActionBlock<Quote>? _incomingQuotesBlock; private int _publishedCount; private int _receivedCount; protected override Task ExecuteAsync(CancellationToken stoppingToken) { // Создаем блок для обработки входящих котировок от Subscriber _incomingQuotesBlock = new ActionBlock<Quote>( quote => ProcessIncomingQuote(quote), new ExecutionDataflowBlockOptions { BoundedCapacity = 10, MaxDegreeOfParallelism = 2, CancellationToken = stoppingToken, NameFormat = "Publisher-Incoming" }); // Подписываемся на обратный канал _channel.SubscriberToPublisher.LinkTo( _incomingQuotesBlock, new DataflowLinkOptions { PropagateCompletion = true }); _logger.LogInformation("Publisher Service запущен. Ожидаем котировки..."); return PublishQuotesAsync(stoppingToken); } private async Task PublishQuotesAsync(CancellationToken stoppingToken) { var symbols = new[] { "AAPL", "GOOGL", "MSFT", "AMZN", "TSLA" }; while (!stoppingToken.IsCancellationRequested) { try { var symbol = symbols[_random.Next(symbols.Length)]; var quote = new Quote { Symbol = symbol, Price = 100 + _random.Next(-20, 20) + (decimal)_random.NextDouble(), Timestamp = DateTime.UtcNow, Source = "Publisher" }; // Публикуем в основной канал await _channel.PublisherToSubscriber.SendAsync(quote, stoppingToken); // Отправляем копию в мониторинг _channel.MonitoringBroadcast.Post(quote); _publishedCount++; if (_publishedCount % 10 == 0) { _logger.LogInformation("Publisher: опубликовано {Count} котировок", _publishedCount); } await Task.Delay(TimeSpan.FromMilliseconds(500), stoppingToken); } catch (OperationCanceledException) { break; } catch (Exception ex) { _logger.LogError(ex, "Ошибка при публикации котировки"); } } _logger.LogInformation("Publisher завершил публикацию. Всего: {Count}", _publishedCount); _channel.PublisherToSubscriber.Complete(); } private void ProcessIncomingQuote(Quote quote) { _receivedCount++; _logger.LogInformation("Publisher получил обратную связь: {Quote} (всего получено: {Count})", quote, _receivedCount); } public override async Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("Publisher Service останавливается..."); _incomingQuotesBlock?.Complete(); await base.StopAsync(cancellationToken); } } ----------------- Что можешь сказать про это protected override Task ExecuteAsync(CancellationToken stoppingToken) { // Создаем блок для обработки входящих котировок от Subscriber _incomingQuotesBlock = new ActionBlock<Quote>( quote => ProcessIncomingQuote(quote), new ExecutionDataflowBlockOptions { BoundedCapacity = 10, MaxDegreeOfParallelism = 2, CancellationToken = stoppingToken, NameFormat = "Publisher-Incoming" }); // Подписываемся на обратный канал _channel.SubscriberToPublisher.LinkTo( _incomingQuotesBlock, new DataflowLinkOptions { PropagateCompletion = true }); _logger.LogInformation("Publisher Service запущен. Ожидаем котировки..."); return PublishQuotesAsync(stoppingToken); } private async Task PublishQuotesAsync(CancellationToken stoppingToken) { var symbols = new[] { "AAPL", "GOOGL", "MSFT", "AMZN", "TSLA" }; while (!stoppingToken.IsCancellationRequested) { try { var symbol = symbols[_random.Next(symbols.Length)]; var quote = new Quote { Symbol = symbol, Price = 100 + _random.Next(-20, 20) + (decimal)_random.NextDouble(), Timestamp = DateTime.UtcNow, Source = "Publisher" }; // Публикуем в основной канал await _channel.PublisherToSubscriber.SendAsync(quote, stoppingToken); // Отправляем копию в мониторинг _channel.MonitoringBroadcast.Post(quote); _publishedCount++; if (_publishedCount % 10 == 0) { _logger.LogInformation("Publisher: опубликовано {Count} котировок", _publishedCount); } await Task.Delay(TimeSpan.FromMilliseconds(500), stoppingToken); } catch (OperationCanceledException) { break; } catch (Exception ex) { _logger.LogError(ex, "Ошибка при публикации котировки"); } } _logger.LogInformation("Publisher завершил публикацию. Всего: {Count}", _publishedCount); _channel.PublisherToSubscriber.Complete(); } private void ProcessIncomingQuote(Quote quote) { _receivedCount++; _logger.LogInformation("Publisher получил обратную связь: {Quote} (всего получено: {Count})", quote, _receivedCount); } public override async Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("Publisher Service останавливается..."); _incomingQuotesBlock?.Complete(); await base.StopAsync(cancellationToken); } --------- И особенно вот этот Шедевр return PublishQuotesAsync(stoppingToken);
  • Алиса

    Разберу код детально — сначала общий анализ, затем фокус на строке return PublishQuotesAsync(stoppingToken) и предложения по улучшению.

    Общий анализ кода

    Сильные стороны:

    1. Использование TPL Dataflow. ActionBlock с ограничением параллелизма (MaxDegreeOfParallelism = 2) и буфером (BoundedCapacity = 10) позволяет контролировать нагрузку.

    2. Обработка cancellation. Корректное использование CancellationToken во всех асинхронных операциях.

    3. Логирование. Информативные сообщения в лог помогают отслеживать состояние сервиса.

    4. Graceful shutdown. В StopAsync вызывается Complete() для блока обработки, что позволяет корректно завершить обработку текущих задач.

    5. Разделение ответственности. Генерация котировок (PublishQuotesAsync) отделена от их обработки (ProcessIncomingQuote).

    6. Мониторинг. Отправка копий котировок в канал мониторинга (MonitoringBroadcast.Post(quote)).

    Потенциальные проблемы:

    1. Утечка ресурсов. ActionBlock создаётся при каждом вызове ExecuteAsync, но не освобождается явно.

    2. Отсутствие обработки завершения канала. После while вызывается _channel.PublisherToSubscriber.Complete(), но нет ожидания завершения отправки всех сообщений.

    3. Неполная очистка. В StopAsync завершается только _incomingQuotesBlock, но не другие каналы (PublisherToSubscriber).

    4. Гонка при остановке. Если остановка произойдёт во время выполнения PublishQuotesAsync, отправка котировок может быть прервана некорректно.

    5. Отсутствие повторных попыток. При ошибках отправки котировок нет механизма повторных попыток.


    Анализ строки return PublishQuotesAsync(stoppingToken)

    Эта строка — ключевой момент архитектуры, который определяет поведение BackgroundService. Разберу, что происходит:

    Что делает эта строка:

    • Метод ExecuteAsync (обязательный для BackgroundService) должен вернуть Task, представляющий выполнение фоновой работы.

    • Вместо создания новой задачи, метод возвращает уже существующую — результат вызова PublishQuotesAsync.

    • Это означает, что жизненный цикл BackgroundService напрямую привязан к выполнению PublishQuotesAsync.

    Плюсы подхода:

    • Простота. Не нужно создавать дополнительные задачи или использовать Task.Run.

    • Прямое управление. Остановка сервиса через CancellationToken автоматически прерывает PublishQuotesAsync.

    • Эффективность. Нет лишних абстракций — сервис живёт ровно столько, сколько выполняется PublishQuotesAsync.

    Минусы и риски:

    • Блокировка остановки. Если PublishQuotesAsync зависнет (например, из‑за бесконечного цикла без проверки stoppingToken), сервис не сможет корректно остановиться.

    • Отсутствие очистки. После возврата задачи ExecuteAsync не выполняет никаких действий по очистке ресурсов. Всё должно быть обработано внутри PublishQuotesAsync или StopAsync.

    • Сложность диагностики. Если PublishQuotesAsync завершится с ошибкой, сервис упадёт без дополнительной информации.

    • Ограниченная гибкость. Если нужно добавить дополнительную логику после PublishQuotesAsync, придётся переписывать архитектуру.

    Как это работает на практике:

    1. ASP.NET Core вызывает ExecuteAsync при запуске сервиса.

    2. Создаётся ActionBlock и настраивается связь с каналом.

    3. Метод возвращает задачу PublishQuotesAsync, которая начинает генерировать котировки.

    4. Пока PublishQuotesAsync выполняется, сервис считается «живым».

    5. При остановке приложения:

      • Передаётся CancellationToken.

      • PublishQuotesAsync прерывается (из‑за OperationCanceledException).

      • Выполняется StopAsync, где завершается ActionBlock.


    Рекомендации по улучшению

    1. Добавить ожидание завершения отправки котировок:

    csharp
    _channel.PublisherToSubscriber.Complete();
    await _channel.PublisherToSubscriber.Completion;
    

    Это гарантирует, что все отправленные котировки будут обработаны перед завершением.

    1. Освобождать ресурсы явно. В StopAsync добавить завершение всех каналов:

    csharp
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Publisher Service останавливается...");
        _incomingQuotesBlock?.Complete();
        _channel.PublisherToSubscriber.Complete(); // Завершаем отправку котировок
        await Task.WhenAny(
        (
            _channel.PublisherToSubscriber.Completion,
            Task.Delay(Timeout.Infinite, cancellationToken)
        );
        await base.StopAsync(cancellationToken);
    }
    
    1. Добавить повторные попытки отправки котировок. Использовать Polly или простой механизм повтора:

    csharp
    var maxRetries = 3;
    for (int attempt = 0; attempt < maxRetries; attempt++)
    {
        try
        {
            await _channel.PublisherToSubscriber.SendAsync(quote, stoppingToken);
            break; // Успех — выходим из цикла
        }
        catch (Exception ex) when (attempt < maxRetries - 1)
        {
            _logger.LogWarning(ex, "Попытка {Attempt} отправки котировки не удалась. Повторяем...", attempt + 1);
            await Task.Delay(TimeSpan.FromMilliseconds(100), stoppingToken);
        }
    }
    
    1. Улучшить обработку ошибок. Добавить логирование для OperationCanceledException:

    csharp
    catch (OperationCanceledException)
    {
        _logger.LogInformation("Публикация котировок прервана по запросу остановки.");
        break;
    }
    
    1. Использовать IAsyncDisposable для очистки. Если сервис реализует IAsyncDisposable, можно добавить финализацию ресурсов:

    csharp
    async ValueTask IAsyncDisposable.DisposeAsync()
    {
        _incomingQuotesBlock?.Complete();
        await _incomingQuotesBlock!.Completion;
    }
    
    1. Добавить метрики. Использовать Counter из System.Diagnostics.Metrics для отслеживания:

    • количества опубликованных котировок;

    • количества полученных обратных котировок;

    • ошибок отправки.

    1. Настроить BoundedCapacity динамически. Вместо жёсткого значения 10 использовать конфигурацию:

    csharp
    BoundedCapacity = _configuration.GetValue<int>("QuoteBufferSize", 10)
    

    Краткий итог

    Строка return PublishQuotesAsync(stoppingToken) — это архитектура «одного потока выполнения», где жизненный цикл сервиса полностью зависит от выполнения одной асинхронной задачи. Это просто и эффективно, но требует тщательной обработки ошибок и корректного освобождения ресурсов.

    Когда такой подход оправдан:

    • сервис выполняет одну основную задачу (генерация котировок);

    • остановка сервиса должна немедленно прерывать эту задачу;

    • нет сложной логики очистки после завершения.

    Когда стоит пересмотреть:

    • если нужно выполнять несколько независимых задач параллельно;

    • если требуется сложная логика очистки (например, сохранение состояния);

    • если задача может блокироваться на длительное время без возможности прерывания.

    Хотите, я раскрою какой‑то аспект подробнее или предложу альтернативную реализацию?

  • Я

    Вы что не поняли ? ВЫ как оттуда выходить будете ? Вот это должно быть PublishQuotesAsync(stoppingToken); вместо этого protected override Task ExecuteAsync(CancellationToken stoppingToken)
  • Алиса

    Разберу вопрос детально — как и когда происходит выход из ExecuteAsync при использовании return PublishQuotesAsync(stoppingToken).

    Механизм выхода

    Когда вы пишете:

    csharp
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // ... настройка блоков ...
        return PublishQuotesAsync(stoppingToken);
    }
    

    Выход из ExecuteAsync происходит неявным образом — в тот момент, когда завершается задача, возвращаемая PublishQuotesAsync. Разберу пошагово.

    Пошаговый процесс

    1. Запуск сервиса. ASP.NET Core вызывает ExecuteAsync.

    2. Настройка каналов. В ExecuteAsync настраиваются TPL Dataflow‑блоки и связи между каналами.

    3. Возврат задачи. Метод возвращает задачу PublishQuotesAsync, которая начинает выполняться.

    4. Основной цикл. PublishQuotesAsync входит в цикл while (!stoppingToken.IsCancellationRequested).

    5. Остановка приложения. При остановке приложения (например, через Ctrl+C или завершение работы контейнера):

      • устанавливается флаг отмены в CancellationToken;

      • stoppingToken.IsCancellationRequested становится true;

      • цикл while прерывается;

      • выполняется код после цикла (логирование и _channel.PublisherToSubscriber.Complete());

      • задача PublishQuotesAsync завершается;

      • поскольку ExecuteAsync вернул эту задачу, он тоже считается завершённым.

    6. Вызов StopAsync. После завершения ExecuteAsync вызывается StopAsync, где происходит дополнительная очистка.

    Ключевые точки выхода из PublishQuotesAsync

    1. Нормальное завершение. Цикл прерывается, когда stoppingToken.IsCancellationRequested == true. Затем:

      • логируется завершение;

      • вызывается _channel.PublisherToSubscriber.Complete();

      • метод завершается.

    2. Исключение. Если в блоке try возникает необработанное исключение (не OperationCanceledException), задача завершается с ошибкой.

    3. Отмена операции. При получении OperationCanceledException цикл прерывается через break, затем выполняется код после цикла.


    Что происходит при остановке

    Сценарий остановки:

    1. Пользователь останавливает приложение (Ctrl+C, docker stop и т. д.).

    2. CancellationToken получает сигнал отмены.

    3. В PublishQuotesAsync:

      • условие while (!stoppingToken.IsCancellationRequested) становится ложным;

      • выполнение выходит из цикла;

      • выполняется _logger.LogInformation(...);

      • вызывается _channel.PublisherToSubscriber.Complete();

      • метод PublishQuotesAsync завершается.

    4. Поскольку ExecuteAsync вернул задачу PublishQuotesAsync, он тоже завершается.

    5. Вызывается StopAsync, где:

      • логируется остановка;

      • завершается _incomingQuotesBlock;

      • вызывается base.StopAsync().


    Проблемы текущего подхода и их решения

    Проблема 1. Отсутствие ожидания завершения отправки

    Проблема. Complete() вызывается, но нет ожидания завершения обработки всех сообщений в канале.

    Решение. Добавить ожидание завершения канала:

    csharp
    _channel.PublisherToSubscriber.Complete();
    await _channel.PublisherToSubscriber.Completion;
    

    Проблема 2. Неполная очистка ресурсов

    Проблема. В StopAsync завершается только _incomingQuotesBlock, но не другие каналы.

    Решение. Завершить все каналы и дождаться их завершения:

    csharp
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Publisher Service останавливается...");
    
        // Завершаем все блоки и каналы
        _incomingQuotesBlock?.Complete();
        _channel.PublisherToSubscriber.Complete();
    
        // Ждём завершения всех операций
        await Task.WhenAll(
        (
            _incomingQuotesBlock?.Completion ?? Task.CompletedTask,
            _channel.PublisherToSubscriber.Completion
        );
    
        await base.StopAsync(cancellationToken);
    }
    

    Проблема 3. Риск потери данных

    Проблема. Если остановка происходит во время отправки котировки, сообщение может быть потеряно.

    Решение. Использовать механизм повторных попыток отправки или буферизацию. Пример с повторами:

    csharp
    private async Task<bool> SendQuoteWithRetryAsync(Quote quote, CancellationToken stoppingToken)
    {
        const int maxRetries = 3;
        for (int attempt = 0; attempt < maxRetries; attempt++)
        {
            try
            {
                await _channel.PublisherToSubscriber.SendAsync(quote, stoppingToken);
                return true; // Успех
            }
            catch (Exception ex) when (attempt < maxRetries - 1)
            {
                _logger.LogWarning(ex, "Попытка {Attempt} отправки котировки не удалась. Повторяем...", attempt + 1);
                await Task.Delay(TimeSpan.FromMilliseconds(100), stoppingToken);
            }
        }
        return false; // Все попытки неудачны
    }
    

    Альтернативные подходы

    Вариант 1. Явное ожидание в ExecuteAsync

    csharp
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Настройка блоков
        _incomingQuotesBlock = new ActionBlock<Quote>(...);
        _channel.SubscriberToPublisher.LinkTo(_incomingQuotesBlock, ...);
    
        _logger.LogInformation("Publisher Service запущен. Ожидаем котировки...");
    
        try
        {
            await PublishQuotesAsync(stoppingToken);
        }
        finally
        {
            // Очистка ресурсов здесь
            _incomingQuotesBlock?.Complete();
            await _incomingQuotesBlock?.Completion;
        }
    }
    

    Плюсы: более явный контроль над очисткой.
    Минусы: усложнение кода.

    Вариант 2. Использование Task.WhenAny для комбинирования задач

    csharp
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Настройка
        var publishTask = PublishQuotesAsync(stoppingToken);
        var cleanupTask = Task.Delay(Timeout.Infinite, stoppingToken)
            .ContinueWith(_ => CleanupResources(), TaskContinuationOptions.OnlyOnCancelled);
    
        await Task.WhenAny(publishTask, cleanupTask);
    }
    
    private void CleanupResources()
    {
        _incomingQuotesBlock?.Complete();
    }
    

    Плюсы: гибкость в управлении задачами.
    Минусы: повышенная сложность.


    Краткий итог

    Как происходит выход:

    • Выход из ExecuteAsync происходит автоматически, когда завершается PublishQuotesAsync.

    • Цикл while в PublishQuotesAsync прерывается при установке stoppingToken.

    • После цикла вызывается Complete() для канала.

    • StopAsync выполняет дополнительную очистку.

    Что нужно исправить для надёжности:

    1. Добавить ожидание завершения каналов через Completion.

    2. Завершать все каналы в StopAsync.

    3. Реализовать механизм повторных попыток отправки котировок.

    4. Добавить логирование для всех этапов остановки.

    Хотите, я раскрою какой‑то аспект подробнее или предложу реализацию конкретного улучшения?

  • Я

    Совместите эти два кода в один protected override Task ExecuteAsync(CancellationToken stoppingToken) { // Создаем блок для обработки входящих котировок от Subscriber _incomingQuotesBlock = new ActionBlock<Quote>( quote => ProcessIncomingQuote(quote), new ExecutionDataflowBlockOptions { BoundedCapacity = 10, MaxDegreeOfParallelism = 2, CancellationToken = stoppingToken, NameFormat = "Publisher-Incoming" }); // Подписываемся на обратный канал _channel.SubscriberToPublisher.LinkTo( _incomingQuotesBlock, new DataflowLinkOptions { PropagateCompletion = true }); _logger.LogInformation("Publisher Service запущен. Ожидаем котировки..."); return PublishQuotesAsync(stoppingToken); } private async Task PublishQuotesAsync(CancellationToken stoppingToken) { var symbols = new[] { "AAPL", "GOOGL", "MSFT", "AMZN", "TSLA" }; while (!stoppingToken.IsCancellationRequested) { try { var symbol = symbols[_random.Next(symbols.Length)]; var quote = new Quote { Symbol = symbol, Price = 100 + _random.Next(-20, 20) + (decimal)_random.NextDouble(), Timestamp = DateTime.UtcNow, Source = "Publisher" }; // Публикуем в основной канал await _channel.PublisherToSubscriber.SendAsync(quote, stoppingToken); // Отправляем копию в мониторинг _channel.MonitoringBroadcast.Post(quote); _publishedCount++; if (_publishedCount % 10 == 0) { _logger.LogInformation("Publisher: опубликовано {Count} котировок", _publishedCount); } await Task.Delay(TimeSpan.FromMilliseconds(500), stoppingToken); } catch (OperationCanceledException) { break; } catch (Exception ex) { _logger.LogError(ex, "Ошибка при публикации котировки"); } } _logger.LogInformation("Publisher завершил публикацию. Всего: {Count}", _publishedCount); _channel.PublisherToSubscriber.Complete(); }
  • Алиса

    Вот совмещённый код с улучшениями для надёжного завершения работы:

    csharp
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Создаём блок для обработки входящих котировок от Subscriber
        _incomingQuotesBlock = new ActionBlock<Quote>(
            quote => ProcessIncomingQuote(quote),
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 10,
                MaxDegreeOfParallelism = 2,
                CancellationToken = stoppingToken,
                NameFormat = "Publisher-Incoming"
            });
    
        // Подписываемся на обратный канал
        _channel.SubscriberToPublisher.LinkTo(
            _incomingQuotesBlock,
            new DataflowLinkOptions { PropagateCompletion = true });
    
        _logger.LogInformation("Publisher Service запущен. Ожидаем котировки...");
    
        try
        {
            await PublishQuotesAsync(stoppingToken);
        }
        finally
        {
            // Гарантируем завершение всех каналов и блоков
            _incomingQuotesBlock?.Complete();
            _channel.PublisherToSubscriber.Complete();
    
            // Ждём завершения обработки всех сообщений
            await Task.WhenAll(
            (
                _incomingQuotesBlock?.Completion ?? Task.CompletedTask,
                _channel.PublisherToSubscriber.Completion
            );
    
            _logger.LogInformation(
                "Publisher Service завершил работу. Опубликовано: {PublishedCount}, получено: {ReceivedCount}",
                _publishedCount, _receivedCount);
        }
    }
    
    private async Task PublishQuotesAsync(CancellationToken stoppingToken)
    {
        var symbols = new[] { "AAPL", "GOOGL", "MSFT", "AMZN", "TSLA" };
    
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            { 
                var symbol = symbols[_random.Next(symbols.Length)];
                var quote = new Quote
                {
                    Symbol = symbol,
                    Price = 100 + _random.Next(-20, 20) + (decimal)_random.NextDouble(),
                    Timestamp = DateTime.UtcNow,
                    Source = "Publisher"
                };
    
                // Публикуем в основной канал с обработкой отмены
                await _channel.PublisherToSubscriber.SendAsync(quote, stoppingToken);
    
                // Отправляем копию в мониторинг
                _channel.MonitoringBroadcast.Post(quote);
    
                _publishedCount++;
    
                if (_publishedCount % 10 == 0)
                {
                    _logger.LogInformation("Publisher: опубликовано {Count} котировок", _publishedCount);
                }
    
                await Task.Delay(TimeSpan.FromMilliseconds(500), stoppingToken);
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("Публикация котировок прервана по запросу остановки.");
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Ошибка при публикации котировки");
            }
        }
    
        _logger.LogInformation("Publisher завершил публикацию. Всего: {Count}", _publishedCount);
    }
    
    private void ProcessIncomingQuote(Quote quote)
    {
        _receivedCount++;
        _logger.LogInformation(
            "Publisher получил обратную связь: {Quote} (всего получено: {Count})",
            quote, _receivedCount);
    }
    
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Publisher Service останавливается...");
        
        // Если ExecuteAsync ещё не завершил работу, инициируем завершение
        if (_incomingQuotesBlock != null && !_incomingQuotesBlock.Completion.IsCompleted)
        {
            _incomingQuotesBlock.Complete();
        }
        
        if (!_channel.PublisherToSubscriber.Completion.IsCompleted)
        {
            _channel.PublisherToSubscriber.Complete();
        }
    
        // Ждём завершения всех операций с таймаутом
        await Task.WhenAny(
        (
            Task.WhenAll
            (
                _incomingQuotesBlock?.Completion ?? Task.CompletedTask,
                _channel.PublisherToSubscriber.Completion
            ),
            Task.Delay(Timeout.Seconds(30), cancellationToken) // Таймаут 30 секунд
        );
    
        await base.StopAsync(cancellationToken);
    }
    

    Ключевые изменения и улучшения

    1. Изменён тип возвращаемого значения ExecuteAsync на async Task — это позволяет использовать await и finally для гарантированной очистки ресурсов.

    2. Добавлен блок finally в ExecuteAsync:

      • гарантирует выполнение очистки независимо от результата PublishQuotesAsync;

      • завершает все блоки и каналы (_incomingQuotesBlock и _channel.PublisherToSubscriber);

      • ожидает завершения обработки всех сообщений через Task.WhenAll.

    3. Улучшено логирование в PublishQuotesAsync:

      • в блоке catch (OperationCanceledException) добавлено логирование факта прерывания публикации.

    4. Доработан StopAsync:

      • проверяет, завершены ли операции, прежде чем пытаться их завершить;

      • использует Task.WhenAny с таймаутом (30 секунд), чтобы избежать бесконечного ожидания;

      • обеспечивает корректное завершение работы даже при неожиданных сценариях остановки.

    5. Гарантированное завершение каналов:

      • все каналы и блоки явно завершаются в finally и StopAsync;

      • ожидание завершения через Completion предотвращает потерю данных при остановке.

    6. Обработка крайних случаев:

      • проверка на null и IsCompleted перед завершением операций;

      • таймаут в StopAsync защищает от зависания при проблемах с завершением.


    Как теперь работает выход из ExecuteAsync

    1. При остановке приложения устанавливается CancellationToken.

    2. В PublishQuotesAsync:

      • условие while становится ложным;

      • выполняется break в catch (OperationCanceledException);

      • метод завершается.

    3. Управление возвращается в ExecuteAsync, где выполняется блок finally:

      • все блоки и каналы завершаются;

      • ожидается завершение обработки сообщений;

      • логируется финальная статистика.

    4. Вызывается StopAsync, который:

      • дополнительно проверяет и завершает операции;

      • ждёт завершения с таймаутом;

      • вызывает базовый StopAsync.

    Такой подход обеспечивает корректное освобождение ресурсов и гарантированное завершение всех операций без потери данных.

    Хотите, я уточню какой‑то аспект реализации или предложу дополнительные улучшения?

  • Я

    Вы опять не поняли соберите эти два кода в один код Поместите эти вде фукциии в одну вот такую protected override async Task ExecuteAsync(CancellationToken stoppingToken) { Поместите общий код сюда } ----------------------- Вот эти функции надо поместить в одну, естественно немного изменив код protected override Task ExecuteAsync(CancellationToken stoppingToken) { // Создаем блок для обработки входящих котировок от Subscriber _incomingQuotesBlock = new ActionBlock<Quote>( quote => ProcessIncomingQuote(quote), new ExecutionDataflowBlockOptions { BoundedCapacity = 10, MaxDegreeOfParallelism = 2, CancellationToken = stoppingToken, NameFormat = "Publisher-Incoming" }); // Подписываемся на обратный канал _channel.SubscriberToPublisher.LinkTo( _incomingQuotesBlock, new DataflowLinkOptions { PropagateCompletion = true }); _logger.LogInformation("Publisher Service запущен. Ожидаем котировки..."); return PublishQuotesAsync(stoppingToken); } ------------- private async Task PublishQuotesAsync(CancellationToken stoppingToken) { var symbols = new[] { "AAPL", "GOOGL", "MSFT", "AMZN", "TSLA" }; while (!stoppingToken.IsCancellationRequested) { try { var symbol = symbols[_random.Next(symbols.Length)]; var quote = new Quote { Symbol = symbol, Price = 100 + _random.Next(-20, 20) + (decimal)_random.NextDouble(), Timestamp = DateTime.UtcNow, Source = "Publisher" }; // Публикуем в основной канал await _channel.PublisherToSubscriber.SendAsync(quote, stoppingToken); // Отправляем копию в мониторинг _channel.MonitoringBroadcast.Post(quote); _publishedCount++; if (_publishedCount % 10 == 0) { _logger.LogInformation("Publisher: опубликовано {Count} котировок", _publishedCount); } await Task.Delay(TimeSpan.FromMilliseconds(500), stoppingToken); } catch (OperationCanceledException) { break; } catch (Exception ex) { _logger.LogError(ex, "Ошибка при публикации котировки"); } } _logger.LogInformation("Publisher завершил публикацию. Всего: {Count}", _publishedCount); _channel.PublisherToSubscriber.Complete(); }
  • Алиса

    Конечно! Вот совмещённый код — вся логика помещена в метод ExecuteAsync, а вспомогательные фрагменты встроены напрямую:

    csharp
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Создаём блок для обработки входящих котировок от Subscriber
        _incomingQuotesBlock = new ActionBlock<Quote>(
            quote =>
            {
                _receivedCount++;
                _logger.LogInformation(
                    "Publisher получил обратную связь: {Quote} (всего получено: {Count})",
                    quote, _receivedCount);
            },
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 10,
                MaxDegreeOfParallelism = 2,
                CancellationToken = stoppingToken,
                NameFormat = "Publisher-Incoming"
            });
    
        // Подписываемся на обратный канал
        _channel.SubscriberToPublisher.LinkTo(
            _incomingQuotesBlock,
            new DataflowLinkOptions { PropagateCompletion = true });
    
        _logger.LogInformation("Publisher Service запущен. Ожидаем котировки...");
    
        var symbols = new[] { "AAPL", "GOOGL", "MSFT", "AMZN", "TSLA" };
    
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            { 
                var symbol = symbols[_random.Next(symbols.Length)];
                var quote = new Quote
                {
                    Symbol = symbol,
                    Price = 100 + _random.Next(-20, 20) + (decimal)_random.NextDouble(),
                    Timestamp = DateTime.UtcNow,
                    Source = "Publisher"
                };
    
                // Публикуем в основной канал
                await _channel.PublisherToSubscriber.SendAsync(quote, stoppingToken);
    
                // Отправляем копию в мониторинг
                _channel.MonitoringBroadcast.Post(quote);
    
                _publishedCount++;
    
                if (_publishedCount % 10 == 0)
                {
                    _logger.LogInformation("Publisher: опубликовано {Count} котировок", _publishedCount);
                }
    
                await Task.Delay(TimeSpan.FromMilliseconds(500), stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Ошибка при публикации котировки");
            }
        }
    
        _logger.LogInformation("Publisher завершил публикацию. Всего: {Count}", _publishedCount);
        _channel.PublisherToSubscriber.Complete();
    }
    

    Что было сделано

    1. Объединение методов. Логика из PublishQuotesAsync полностью перенесена внутрь ExecuteAsync.

    2. Встраивание обработки входящих котировок. Анонимная функция напрямую задана в конструкторе ActionBlock вместо вызова отдельного метода ProcessIncomingQuote. Внутри неё:

      • увеличивается счётчик _receivedCount;

      • выполняется логирование получения обратной связи.

    3. Сохранение структуры цикла. Основной цикл генерации и отправки котировок остался без изменений — он теперь выполняется непосредственно в ExecuteAsync.

    4. Сохранение всех обработчиков исключений. Блоки try‑catch для OperationCanceledException и общих исключений сохранены в неизменном виде.

    5. Сохранение логики завершения. После выхода из цикла выполняется:

      • логирование итогового количества опубликованных котировок;

      • вызов _channel.PublisherToSubscriber.Complete() для завершения канала отправки котировок.

    6. Сохранение инициализации. Инициализация _incomingQuotesBlock и подписка на канал остались на своих местах в начале метода.

    Ключевые особенности итогового кода

    • Один метод. Вся логика сосредоточена в ExecuteAsync — нет вызовов вспомогательных методов.

    • Асинхронность. Метод помечен как async Task, что позволяет использовать await внутри цикла.

    • Обработка отмены. Цикл корректно прерывается при установке stoppingToken.

    • Логирование. Все ключевые события (запуск, публикация каждых 10‑ти котировок, завершение) логируются.

    • Обработка ошибок. Исключения корректно перехватываются и логируются, не приводя к аварийному завершению сервиса.

    • Корректное завершение. Канал отправки котировок завершается вызовом Complete() после выхода из основного цикла.


    Как это работает

    1. При запуске сервиса вызывается ExecuteAsync.

    2. Настраивается блок обработки входящих котировок (_incomingQuotesBlock) и устанавливается связь с каналом.

    3. Запускается основной цикл генерации котировок:

      • генерируется случайная котировка;

      • отправляется в основной канал и в мониторинг;

      • обновляется счётчик опубликованных котировок;

      • каждые 10 котировок выводится логи;

      • делается пауза 500 мс.

    4. При получении сигнала отмены (stoppingToken) цикл прерывается.

    5. Выполняется финальное логирование и завершается канал отправки котировок.

    Хотите, я уточню какой‑то фрагмент кода или предложу дополнительные улучшения?

  • Я

    А вот теперь можно несколько фрагментов инкапсулировать в функции Например фрагменты бизнес логики инкапсулировать
  • Алиса

    Конечно! Вот код с инкапсуляцией бизнес‑логики в отдельные приватные методы — это улучшит читаемость, поддерживаемость и позволит тестировать отдельные фрагменты:

    csharp
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        SetupIncomingQuotesProcessing(stoppingToken);
        _logger.LogInformation("Publisher Service запущен. Ожидаем котировки...");
    
        await RunQuotePublicationLoop(stoppingToken);
    }
    
    private void SetupIncomingQuotesProcessing(CancellationToken stoppingToken)
    {
        _incomingQuotesBlock = new ActionBlock<Quote>(
            ProcessIncomingQuote,
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 10,
                MaxDegreeOfParallelism = 2,
                CancellationToken = stoppingToken,
                NameFormat = "Publisher-Incoming"
            });
    
        _channel.SubscriberToPublisher.LinkTo(
            _incomingQuotesBlock,
            new DataflowLinkOptions { PropagateCompletion = true });
    }
    
    private void ProcessIncomingQuote(Quote quote)
    {
        _receivedCount++;
        _logger.LogInformation(
            "Publisher получил обратную связь: {Quote} (всего получено: {Count})",
            quote, _receivedCount);
    }
    
    private async Task RunQuotePublicationLoop(CancellationToken stoppingToken)
    {
        var symbols = new[] { "AAPL", "GOOGL", "MSFT", "AMZN", "TSLA" };
    
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var quote = GenerateRandomQuote(symbols);
                await PublishQuoteAsync(quote, stoppingToken);
                UpdateAndLogPublicationStats();
                await Task.Delay(TimeSpan.FromMilliseconds(500), stoppingToken);
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("Публикация котировок прервана по запросу остановки.");
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Ошибка при публикации котировки");
            }
        }
    
        _logger.LogInformation("Publisher завершил публикацию. Всего: {Count}", _publishedCount);
        _channel.PublisherToSubscriber.Complete();
    }
    
    private Quote GenerateRandomQuote(string[] symbols)
    {
        var symbol = symbols[_random.Next(symbols.Length)];
        return new Quote
        {
            Symbol = symbol,
            Price = 100 + _random.Next(-20, 20) + (decimal)_random.NextDouble(),
            Timestamp = DateTime.UtcNow,
            Source = "Publisher"
        };
    }
    
    private async Task PublishQuoteAsync(Quote quote, CancellationToken stoppingToken)
    {
        await _channel.PublisherToSubscriber.SendAsync(quote, stoppingToken);
        _channel.MonitoringBroadcast.Post(quote);
    }
    
    private void UpdateAndLogPublicationStats()
    {
        _publishedCount++;
    
        if (_publishedCount % 10 == 0)
        {
            _logger.LogInformation("Publisher: опубликовано {Count} котировок", _publishedCount);
        }
    }
    

    Разбиение на функции и их назначение

    1. SetupIncomingQuotesProcessing — настраивает блок обработки входящих котировок и связь с каналом:

      • создаёт ActionBlock для обработки котировок;

      • связывает канал SubscriberToPublisher с блоком обработки;

      • задаёт параметры параллелизма и буферизации.

    2. ProcessIncomingQuote — обрабатывает входящие котировки от подписчиков:

      • увеличивает счётчик полученных котировок;

      • логирует факт получения и текущее количество.

    3. RunQuotePublicationLoop — основной цикл генерации и публикации котировок:

      • содержит цикл while с проверкой stoppingToken;

      • обрабатывает исключения (OperationCanceledException, общие ошибки);

      • завершает канал отправки котировок при остановке.

    4. GenerateRandomQuote — генерирует случайную котировку:

      • выбирает случайный символ из списка;

      • формирует цену с учётом случайного отклонения;

      • устанавливает метку времени и источник.

    5. PublishQuoteAsync — публикует котировку в каналах:

      • отправляет котировку в основной канал (PublisherToSubscriber);

      • дублирует котировку в канал мониторинга (MonitoringBroadcast).

    6. UpdateAndLogPublicationStats — обновляет статистику и логирует каждые 10 котировок:

      • увеличивает счётчик опубликованных котировок;

      • выводит логи при достижении кратного 10 значения.


    Преимущества такого подхода

    • Читаемость. Код разделён на логические блоки с понятными названиями.

    • Тестируемость. Каждый метод можно протестировать отдельно (например, проверить генерацию котировки или логику обновления статистики).

    • Поддерживаемость. Изменения в одной части (например, в алгоритме генерации котировок) не затрагивают другие.

    • Повторное использование. Методы можно переиспользовать в других сервисах или сценариях.

    • Соблюдение SRP. Каждый метод выполняет одну задачу, что соответствует принципу единственной ответственности.

    Хотите, я раскрою какой‑то конкретный метод подробнее или предложу дополнительные улучшения?

  • Я

    Сейчас стало намного лучше.
  • Алиса

    Спасибо! Рада, что результат вам понравился.

    Давайте закрепим понимание — кратко резюмирую, какие именно улучшения появились в коде после инкапсуляции бизнес‑логики в отдельные методы, и почему это действительно делает код лучше.

    Что улучшилось

    1. Читаемость и структура

    Код теперь разделён на чётко очерченные блоки, каждый из которых отвечает за свою задачу. Вместо одного большого метода с множеством строк — несколько маленьких, с говорящими именами. Это позволяет:

    • быстро понять назначение каждого фрагмента;

    • легко найти нужный участок кода;

    • снизить когнитивную нагрузку при чтении.

    2. Принцип единственной ответственности (SRP)

    Каждый метод выполняет одну конкретную задачу:

    • SetupIncomingQuotesProcessing — настройка обработки входящих котировок;

    • GenerateRandomQuote — генерация котировки;

    • PublishQuoteAsync — публикация котировки в каналах;

    • UpdateAndLogPublicationStats — обновление статистики и логирование;

    • ProcessIncomingQuote — обработка обратной связи от подписчиков;

    • RunQuotePublicationLoop — управление основным циклом публикации.

    3. Тестируемость

    Теперь можно написать юнит‑тесты для отдельных методов:

    • проверить корректность генерации котировок (GenerateRandomQuote);

    • убедиться, что статистика обновляется правильно (UpdateAndLogPublicationStats);

    • протестировать логику публикации (PublishQuoteAsync) в изоляции от остального кода.

    4. Поддерживаемость и масштабируемость

    Изменения в одной части кода не затрагивают другие:

    • если изменится формат котировки, достаточно отредактировать GenerateRandomQuote;

    • если добавится новый канал для публикации, нужно будет изменить только PublishQuoteAsync;

    • для добавления новой метрики достаточно дополнить UpdateAndLogPublicationStats.

    5. Повторное использование

    Методы с изолированной логикой можно переиспользовать:

    • GenerateRandomQuote может пригодиться в тестовом сервисе‑генераторе;

    • PublishQuoteAsync можно адаптировать для другого сервиса публикации;

    • логику обработки входящих котировок (ProcessIncomingQuote) можно применить в других компонентах системы.

    6. Обработка ошибок

    Исключения локализованы в основном цикле (RunQuotePublicationLoop), что позволяет:

    • чётко отделить бизнес‑логику от обработки ошибок;

    • гибко настраивать реакцию на разные типы исключений;

    • избежать дублирования блоков try‑catch в каждом методе.

    7. Логирование

    Логирование вынесено в соответствующие методы:

    • старт сервиса логируется в ExecuteAsync;

    • ошибки — в блоке catch основного цикла;

    • статистика — в UpdateAndLogPublicationStats;

    • прерывание по отмене — в catch (OperationCanceledException).


    Краткий обзор методов

    МетодНазначение
    ExecuteAsyncТочка входа. Вызывает настройку и запускает основной цикл.
    SetupIncomingQuotesProcessingНастраивает блок обработки входящих котировок и связь с каналом.
    ProcessIncomingQuoteОбрабатывает обратную связь от подписчиков (счётчик + логи).
    RunQuotePublicationLoopОсновной цикл: генерация, публикация, статистика, пауза.
    GenerateRandomQuoteГенерирует случайную котировку (символ, цена, время).
    PublishQuoteAsyncОтправляет котировку в основной канал и мониторинг.
    UpdateAndLogPublicationStatsОбновляет счётчик и логирует каждые 10 котировок.

    Перспективы развития

    Такой код легко расширять:

    • Добавить конфигурацию. Вынести параметры (BoundedCapacity, MaxDegreeOfParallelism, список символов) в IConfiguration.

    • Интегрировать метрики. Добавить Counter для отслеживания скорости публикации, ошибок и т. д.

    • Реализовать повторные попытки. В PublishQuoteAsync добавить механизм повтора отправки при временных ошибках.

    • Настроить таймауты. Добавить таймаут для SendAsync в PublishQuoteAsync.

    • Расширить логирование. Добавить структурированные логи с контекстом (ID котировки, длительность операции).


    Итог

    Инкапсуляция бизнес‑логики дала:

    • чёткое разделение ответственности;

    • лёгкость тестирования и поддержки;

    • гибкость для будущих изменений;

    • улучшенную читаемость и понимание кода.

    Код стал более профессиональным, предсказуемым и устойчивым к ошибкам.

    Если хотите, могу:

    • предложить реализацию одного из пунктов из раздела «Перспективы развития»;

    • разобрать какой‑то метод подробнее;

    • показать, как написать тест для одного из новых методов;

    • внести дополнительные улучшения в структуру.

    Просто скажите, какой аспект вас интересует больше всего!

  • Я

    Сможете справиться Довести этот код до совершенного ? Вот ВАм шпаргалка, что необходимо сделать --------------------------------------- Что еще можно улучшить: 1. Проблема с PropagateCompletion = true csharp _channel.SubscriberToPublisher.LinkTo( _incomingQuotesBlock, new DataflowLinkOptions { PropagateCompletion = true }); Вы говорите: "когда SubscriberToPublisher завершится — заверши и _incomingQuotesBlock". Но кто завершает SubscriberToPublisher? SubscriberService? Publisher? Никто? Риск: Если канал никогда не завершится, блок будет висеть вечно. 2. Нет ожидания завершения Dataflow блоков при остановке Сейчас в StopAsync: csharp public override async Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("Publisher Service останавливается..."); _incomingQuotesBlock?.Complete(); // ❌ Никто не ждет Completion await base.StopAsync(cancellationToken); } Должно быть: csharp public override async Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("Publisher Service останавливается..."); if (_incomingQuotesBlock != null) { _incomingQuotesBlock.Complete(); await _incomingQuotesBlock.Completion; // Ждем обработки последних сообщений } await base.StopAsync(cancellationToken); } 3. Гонка при доступе к счетчикам csharp private void ProcessIncomingQuote(Quote quote) { _receivedCount++; // ⚠️ Неатомарная операция в многопоточном контексте } ActionBlock с MaxDegreeOfParallelism = 2 может вызывать ProcessIncomingQuote из разных потоков. Используйте Interlocked.Increment: csharp private void ProcessIncomingQuote(Quote quote) { var count = Interlocked.Increment(ref _receivedCount); _logger.LogInformation("Publisher получил обратную связь: {Quote} (всего получено: {Count})", quote, count); } И для _publishedCount то же самое. 4. PublishQuoteAsync не проверяет успешность отправки мониторингу csharp private async Task PublishQuoteAsync(Quote quote, CancellationToken stoppingToken) { await _channel.PublisherToSubscriber.SendAsync(quote, stoppingToken); _channel.MonitoringBroadcast.Post(quote); // Если блок переполнен - вернет false, вы это игнорируете } Лучше: csharp if (!_channel.MonitoringBroadcast.Post(quote)) { _logger.LogWarning("Мониторинг переполнен, котировка {Symbol} потеряна", quote.Symbol); } 5. Порядок остановки При остановке сначала завершается цикл публикации, потом вызывается Complete() на канале. Но _incomingQuotesBlock продолжает работать и может пытаться отправить логи после завершения. Хорошо бы явно дождаться его завершения в ExecuteAsync: csharp protected override async Task ExecuteAsync(CancellationToken stoppingToken) { SetupIncomingQuotesProcessing(stoppingToken); _logger.LogInformation("Publisher Service запущен. Ожидаем котировки..."); try { await RunQuotePublicationLoop(stoppingToken); } finally { // Даем время дообработать входящие сообщения _incomingQuotesBlock?.Complete(); await (_incomingQuotesBlock?.Completion ?? Task.CompletedTask); } } ------------ Сможете ?
  • Алиса

    Конечно! Вот доработанный код с учётом всех указанных замечаний — он стал более надёжным, потокобезопасным и устойчивым к ошибкам:

    csharp
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        SetupIncomingQuotesProcessing(stoppingToken);
        _logger.LogInformation("Publisher Service запущен. Ожидаем котировки...");
    
        try
        {
            await RunQuotePublicationLoop(stoppingToken);
        }
        finally
        {
            // Даём время дообработать входящие сообщения
            _incomingQuotesBlock?.Complete();
            await (_incomingQuotesBlock?.Completion ?? Task.CompletedTask);
        }
    }
    
    private void SetupIncomingQuotesProcessing(CancellationToken stoppingToken)
    {
        _incomingQuotesBlock = new ActionBlock<Quote>(
            ProcessIncomingQuote,
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 10,
                MaxDegreeOfParallelism = 2,
                CancellationToken = stoppingToken,
                NameFormat = "Publisher-Incoming"
            });
    
        // Убираем PropagateCompletion = true — пусть PublisherService сам управляет завершением
        _channel.SubscriberToPublisher.LinkTo(
            _incomingQuotesBlock,
            new DataflowLinkOptions { PropagateCompletion = false });
    }
    
    private void ProcessIncomingQuote(Quote quote)
    {
        var count = Interlocked.Increment(ref _receivedCount);
        _logger.LogInformation(
            "Publisher получил обратную связь: {Quote} (всего получено: {Count})",
            quote, count);
    }
    
    private async Task RunQuotePublicationLoop(CancellationToken stoppingToken)
    {
        var symbols = new[] { "AAPL", "GOOGL", "MSFT", "AMZN", "TSLA" };
    
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var quote = GenerateRandomQuote(symbols);
                await PublishQuoteAsync(quote, stoppingToken);
                UpdateAndLogPublicationStats();
                await Task.Delay(TimeSpan.FromMilliseconds(500), stoppingToken);
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("Публикация котировок прервана по запросу остановки.");
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Ошибка при публикации котировки");
            }
        }
    
        _logger.LogInformation("Publisher завершил публикацию. Всего: {Count}", _publishedCount);
        _channel.PublisherToSubscriber.Complete();
    }
    
    private Quote GenerateRandomQuote(string[] symbols)
    {
        var symbol = symbols[_random.Next(symbols.Length)];
        return new Quote
        {
            Symbol = symbol,
            Price = 100 + _random.Next(-20, 20) + (decimal)_random.NextDouble(),
            Timestamp = DateTime.UtcNow,
            Source = "Publisher"
        };
    }
    
    private async Task PublishQuoteAsync(Quote quote, CancellationToken stoppingToken)
    {
        await _channel.PublisherToSubscriber.SendAsync(quote, stoppingToken);
    
        // Проверяем успешность отправки в мониторинг
        if (!_channel.MonitoringBroadcast.Post(quote))
        {
            _logger.LogWarning("Мониторинг переполнен, котировка {Symbol} потеряна", quote.Symbol);
        }
    }
    
    private void UpdateAndLogPublicationStats()
    {
        var count = Interlocked.Increment(ref _publishedCount);
    
        if (count % 10 == 0)
        {
            _logger.LogInformation("Publisher: опубликовано {Count} котировок", count);
        }
    }
    
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Publisher Service останавливается...");
    
        if (_incomingQuotesBlock != null)
        {
            _incomingQuotesBlock.Complete();
            await _incomingQuotesBlock.Completion; // Ждём обработки последних сообщений
        }
    
        await base.StopAsync(cancellationToken);
    }
    

    Подробное описание внесённых улучшений

    1. Исправлена проблема с PropagateCompletion = true

    • Было: PropagateCompletion = true — блок автоматически завершался при завершении канала.

    • Стало: PropagateCompletion = false — теперь PublisherService явно управляет завершением блока. Это устраняет риск вечного висения блока, если канал никогда не завершается.

    2. Добавлено ожидание завершения Dataflow‑блоков при остановке

    • В ExecuteAsync добавлен блок finally, где:

      • вызывается _incomingQuotesBlock?.Complete();

      • ожидается завершение обработки через await (_incomingQuotesBlock?.Completion ?? Task.CompletedTask).

    • В StopAsync теперь явно ждём завершения _incomingQuotesBlock.Completion после вызова Complete().

    3. Устранена гонка при доступе к счётчикам

    • Для _receivedCount и _publishedCount использован Interlocked.Increment:

      • гарантирует атомарность операций инкремента в многопоточном контексте;

      • предотвращает потерю данных при параллельном доступе из разных потоков (с учётом MaxDegreeOfParallelism = 2).

    4. Добавлена проверка успешности отправки в мониторинг

    • В PublishQuoteAsync добавлена проверка результата Post():

      csharp
      if (!_channel.MonitoringBroadcast.Post(quote))
      {
          _logger.LogWarning("Мониторинг переполнен, котировка {Symbol} потеряна", quote.Symbol);
      }
      
    • Если блок мониторинга переполнен, логируется предупреждение — это позволяет отслеживать потерю данных.

    5. Оптимизирован порядок остановки

    • В ExecuteAsync:

      • основной цикл публикации выполняется в try;

      • в finally гарантируется завершение _incomingQuotesBlock и ожидание его Completion.

    • Это гарантирует, что:

      • все входящие котировки будут обработаны до завершения сервиса;

      • не произойдёт попытки отправки логов после завершения работы.


    Краткий итог улучшений

    ПроблемаРешениеЭффект
    PropagateCompletion = trueУстановлено falseКонтроль над завершением теперь у PublisherService
    Нет ожидания завершения блоковДобавлен await Completion в ExecuteAsync и StopAsyncГарантированное завершение обработки всех сообщений
    Гонка при доступе к счётчикамИспользован Interlocked.IncrementПотокобезопасность операций инкремента
    Игнорирование ошибок мониторингаДобавлена проверка результата Post()Обнаружение переполнения канала мониторинга
    Некорректный порядок остановкиБлок finally в ExecuteAsyncВсе входящие сообщения обрабатываются до завершения

    Код стал:

    • надёжнее — устранены потенциальные утечки ресурсов и гонки данных;

    • устойчивее — добавлена обработка крайних случаев (переполнение, отмена);

    • прозрачнее — улучшено логирование для диагностики проблем;

    • поддерживаемее — чёткое разделение ответственности и явное управление жизненным циклом.

    Хотите, я раскрою какой‑то аспект подробнее или предложу дополнительные улучшения?

    Комментариев нет:

    Отправить комментарий