< Summary - Syki

Information
Class: Syki.Daemon.Events.DomainEventsProcessorDbListener
Assembly: Daemon
File(s): /home/runner/work/syki/syki/Daemon/Events/DomainEventsProcessorDbListener.cs
Tag: 4_16869239191
Line coverage
89%
Covered lines: 35
Uncovered lines: 4
Coverable lines: 39
Total lines: 77
Line coverage: 89.7%
Branch coverage
87%
Covered branches: 7
Total branches: 8
Branch coverage: 87.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
ExecuteAsync() | 100% | 6 | 6 | 100%
<ExecuteAsync() | 100% | 1 | 1 | 100%
<ExecuteAsync() | 50% | 2 | 2 | 78.94%
CreateTrigger() | 100% | 1 | 1 | 100%

File(s)

/home/runner/work/syki/syki/Daemon/Events/DomainEventsProcessorDbListener.cs

# | Line | Line coverage
 1using Dapper;
 2using Npgsql;
 3
 4namespace Syki.Daemon.Events;
 5
 26public class DomainEventsProcessorDbListener(IConfiguration configuration, IServiceScopeFactory serviceScopeFactory) : B
 7{
 28    private readonly SemaphoreSlim _throttler = new(5);
 9
 10    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
 11    {
 212        await using var connection = new NpgsqlConnection(configuration.Database().ConnectionString);
 213        await connection.OpenAsync(stoppingToken);
 14
 215        await CreateTrigger(connection);
 16
 217        _ = Task.Run(async () =>
 218        {
 219            var processor = new DomainEventsProcessor(serviceScopeFactory);
 220            await processor.Run();
 421        }, CancellationToken.None);
 22
 223        connection.Notification += async (o, e) =>
 224        {
 251425            if (!await _throttler.WaitAsync(0, CancellationToken.None))
 026                return;
 227
 251428            _ = Task.Run(async () =>
 251429            {
 251430                try
 251431                {
 251432                    var processor = new DomainEventsProcessor(serviceScopeFactory);
 251433                    await processor.Run();
 251434                }
 035                catch (Exception ex)
 251436                {
 037                    Console.WriteLine($"Error processing domain event: {ex.Message}");
 038                }
 251439                finally
 251440                {
 251441                    _throttler.Release();
 251442                }
 502843            }, CancellationToken.None);
 251644        };
 45
 246        await using (var cmd = new NpgsqlCommand("LISTEN new_domain_event;", connection))
 47        {
 248            await cmd.ExecuteNonQueryAsync(stoppingToken);
 49        }
 50
 251851        while (!stoppingToken.IsCancellationRequested)
 52        {
 251653            await connection.WaitAsync(stoppingToken).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
 54        }
 255    }
 56
 57    private static async Task CreateTrigger(NpgsqlConnection connection)
 58    {
 59        const string sql = @"
 60            CREATE OR REPLACE FUNCTION notify_new_domain_event_function()
 61            RETURNS trigger
 62            LANGUAGE 'plpgsql'
 63            AS $BODY$
 64            BEGIN
 65                PERFORM pg_notify('new_domain_event', '');
 66                RETURN NEW;
 67            END
 68            $BODY$;
 69
 70            CREATE OR REPLACE TRIGGER notify_new_domain_event_trigger
 71            AFTER INSERT ON syki.domain_events
 72            EXECUTE PROCEDURE notify_new_domain_event_function();
 73        ";
 74
 275        await connection.ExecuteAsync(sql);
 276    }
 77}