< Summary - Syki

Information
Class: Syki.Daemon.Commands.CommandsProcessorDbListener
Assembly: Daemon
File(s): /home/runner/work/syki/syki/Daemon/Commands/CommandsProcessorDbListener.cs
Tag: 4_16869239191
Line coverage
90%
Covered lines: 38
Uncovered lines: 4
Coverable lines: 42
Total lines: 150
Line coverage: 90.4%
Branch coverage
87%
Covered branches: 7
Total branches: 8
Branch coverage: 87.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
ExecuteAsync()100%66100%
<ExecuteAsync()100%11100%
<ExecuteAsync()50%2278.94%
CreateCommandTrigger()100%11100%
CreateCommandBatchTrigger()100%11100%

File(s)

/home/runner/work/syki/syki/Daemon/Commands/CommandsProcessorDbListener.cs

#LineLine coverage
 1using Dapper;
 2using Npgsql;
 3
 4namespace Syki.Daemon.Commands;
 5
 26public class CommandsProcessorDbListener(IConfiguration configuration, IServiceScopeFactory serviceScopeFactory) : Backg
 7{
 28    private readonly SemaphoreSlim _throttler = new(10);
 9
 10    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
 11    {
 212        await using var connection = new NpgsqlConnection(configuration.Database().ConnectionString);
 13
 214        await connection.OpenAsync(stoppingToken);
 15
 216        await CreateCommandTrigger(connection);
 217        await CreateCommandBatchTrigger(connection);
 18
 219        _ = Task.Run(async () =>
 220        {
 221            var processor = new CommandsProcessor(serviceScopeFactory);
 222            await processor.Run();
 423        }, CancellationToken.None);
 24
 225        connection.Notification += async (o, e) =>
 226        {
 237827            if (!await _throttler.WaitAsync(0, CancellationToken.None))
 028                return;
 229
 237830            _ = Task.Run(async () =>
 237831            {
 237832                try
 237833                {
 237834                    var processor = new CommandsProcessor(serviceScopeFactory);
 237835                    await processor.Run();
 237836                }
 037                catch (Exception ex)
 237838                {
 039                    Console.WriteLine($"Error processing command: {ex.Message}");
 040                }
 237841                finally
 237842                {
 237843                    _throttler.Release();
 237844                }
 475645            }, CancellationToken.None);
 238046        };
 47
 248        await using (var cmd = new NpgsqlCommand("LISTEN new_command;", connection))
 49        {
 250            await cmd.ExecuteNonQueryAsync(stoppingToken);
 51        }
 52
 238253        while (!stoppingToken.IsCancellationRequested)
 54        {
 238055            await connection.WaitAsync(stoppingToken).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
 56        }
 257    }
 58
 59    private static async Task CreateCommandTrigger(NpgsqlConnection connection)
 60    {
 61        const string sql = @"
 62            CREATE OR REPLACE FUNCTION notify_new_command_function()
 63            RETURNS trigger
 64            LANGUAGE 'plpgsql'
 65            AS $BODY$
 66            BEGIN
 67                PERFORM pg_notify('new_command', '');
 68                RETURN NEW;
 69            END
 70            $BODY$;
 71
 72            CREATE OR REPLACE TRIGGER notify_new_command_trigger
 73            AFTER INSERT ON syki.commands
 74            EXECUTE PROCEDURE notify_new_command_function();
 75        ";
 76
 277        await connection.ExecuteAsync(sql);
 278    }
 79
 80    private static async Task CreateCommandBatchTrigger(NpgsqlConnection connection)
 81    {
 82        const string sql = @"
 83            CREATE OR REPLACE FUNCTION update_command_batch_function()
 84            RETURNS trigger
 85            LANGUAGE 'plpgsql'
 86            AS $BODY$
 87            DECLARE
 88                batch_status text;
 89                batch_success boolean;
 90                batch_next_command_id uuid;
 91            BEGIN
 92                IF NEW.batch_id IS NULL THEN
 93                    RETURN NEW;
 94                END IF;
 95
 96                SELECT
 97                    status, next_command_id INTO batch_status, batch_next_command_id
 98                FROM syki.command_batches
 99                WHERE id = NEW.batch_id
 100                FOR UPDATE;
 101
 102                IF batch_status = 'Error' THEN
 103                    RETURN NEW;
 104                END IF;
 105
 106                IF batch_status = 'Pending' THEN
 107                    UPDATE syki.command_batches
 108                    SET status = 'Processing'
 109                    WHERE id = NEW.batch_id;
 110                END IF;
 111
 112                IF NEW.status = 'Error' THEN
 113                    UPDATE syki.command_batches
 114                    SET status = 'Error'
 115                    WHERE id = NEW.batch_id;
 116                    RETURN NEW;
 117                END IF;
 118
 119                SELECT count(1) = 0 INTO batch_success
 120                FROM syki.commands
 121                WHERE batch_id = NEW.batch_id AND status <> 'Success';
 122
 123                IF NOT batch_success THEN
 124                    RETURN NEW;
 125                END IF;
 126
 127                UPDATE syki.command_batches
 128                SET status = 'Success', processed_at = now()
 129                WHERE id = NEW.batch_id;
 130
 131                IF batch_next_command_id IS NOT NULL THEN
 132                    UPDATE syki.commands
 133                    SET status = 'Pending'
 134                    WHERE id = batch_next_command_id;
 135
 136                    PERFORM pg_notify('new_command', '');
 137                END IF;
 138
 139                RETURN NEW;
 140            END
 141            $BODY$;
 142
 143            CREATE OR REPLACE TRIGGER update_command_batch_trigger
 144            AFTER UPDATE ON syki.commands
 145            FOR EACH ROW EXECUTE PROCEDURE update_command_batch_function();
 146        ";
 147
 2148        await connection.ExecuteAsync(sql);
 2149    }
 150}