< Summary - Syki

Information
Class: Syki.Back.Commands.CommandsProcessor
Assembly: Back
File(s): /home/runner/work/syki/syki/Back/Commands/CommandsProcessor.cs
Tag: 97_27801654829
Line coverage
61%
Covered lines: 56
Uncovered lines: 35
Coverable lines: 91
Total lines: 145
Line coverage: 61.5%
Branch coverage
42%
Covered branches: 12
Total branches: 28
Branch coverage: 42.8%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

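| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 100% | 1 | 1 | 100% |
| .cctor() | 100% | 1 | 1 | 100% |
| Execute() | 100% | 1 | 1 | 100% |
| Process() | 46.15% | 51 | 26 | 66.66% |
| CreateRetryCommand(...) | 0% | 6 | 2 | 0% |
| GetInvoker(...) | 100% | 1 | 1 | 100% |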

File(s)

/home/runner/work/syki/syki/Back/Commands/CommandsProcessor.cs

# | Line | Line coverage
 1using Quartz;
 2using System.Diagnostics;
 3using System.Collections.Concurrent;
 4using Syki.Back.Commands.Domain.Commands;
 5
 6namespace Syki.Back.Commands;
 7
 3528public class CommandsProcessor(IServiceScopeFactory serviceScopeFactory) : IJob
 9{
 210    private static readonly ActivitySource _activitySource = new (OpenTelemetryConfigs.CommandsProcessing);
 11
 12    public async Task Execute(IJobExecutionContext context)
 13    {
 35214        using var scope = serviceScopeFactory.CreateScope();
 35215        var ctx = scope.ServiceProvider.GetRequiredService<SykiDbContext>();
 16
 35217        await Process(scope, ctx);
 35218    }
 19
 20    private static async Task Process(IServiceScope scope, SykiDbContext ctx)
 21    {
 35222        var processorId = Guid.NewGuid();
 35223        var sw = Stopwatch.StartNew();
 24
 025        while (true)
 26        {
 68027            var commands = await ctx.Commands.FromSqlRaw(Sql, processorId).AsNoTracking().ToListAsync();
 68028            if (commands.Count == 0) break;
 29
 131230            foreach (var command in commands)
 31            {
 32832                using (var activity = _activitySource.StartActivity($"Process {command.Type}", ActivityKind.Consumer, co
 33                {
 32834                    activity?.SetTag("command.id", command.Id);
 32835                    activity?.SetTag("command.type", command.Type);
 32836                    ctx.ActivityId = command.ActivityId!;
 32837                    ctx.Operation = command.Type;
 32838                    ctx.CommandLogs = [];
 39
 40                    try
 41                    {
 32842                        sw.Restart();
 43
 32844                        ctx.ChangeTracker.Clear();
 32845                        ctx.Attach(command);
 46
 32847                        await ctx.Database.BeginTransactionAsync();
 32848                        ctx.Database.AutoSavepointsEnabled = false;
 49
 32850                        var invoker = GetInvoker(command);
 32851                        await invoker.Invoke(scope.ServiceProvider, command.Id, command.Data);
 32852                    }
 053                    catch (Exception ex)
 54                    {
 055                        if (ctx.Database.CurrentTransaction is not null)
 056                            await ctx.Database.RollbackTransactionAsync();
 57
 058                        ctx.ChangeTracker.Clear();
 059                        ctx.Attach(command);
 060                        command.Error = ex.Message + ex.InnerException?.Message;
 061                        activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
 062                        activity?.AddException(ex);
 063                        await ctx.Database.BeginTransactionAsync();
 064                    }
 65
 32866                    sw.Stop();
 32867                    command.Logs = ctx.CommandLogs;
 32868                    command.Processed(sw.Elapsed.TotalMilliseconds);
 32869                    activity?.SetTag("command.duration", command.Duration);
 70
 32871                    if (command.Error.HasValue() && command.MaxRetries > 0)
 72                    {
 073                        var retryCommand = CreateRetryCommand(command, ctx.ActivityId);
 074                        ctx.Commands.Add(retryCommand);
 75                    }
 76
 32877                    await ctx.SaveChangesAsync();
 32878                    await ctx.Database.CommitTransactionAsync();
 32879                }
 32880            }
 81
 32882            if (ctx.HasPendingCommands)
 83            {
 084                var scheduler = await scope.ServiceProvider.GetRequiredService<ISchedulerFactory>().GetScheduler();
 085                await scheduler.TriggerCommandsProcessorJob();
 86            }
 87        }
 35288    }
 89
 90    private static Command CreateRetryCommand(Command failedCommand, string activityId)
 91    {
 092        var invoker = GetInvoker(failedCommand);
 093        var data = invoker.Deserialize(failedCommand.Data);
 94
 095        var originalId = failedCommand.OriginalId ?? failedCommand.Id;
 96
 097        var retryAttempt = failedCommand.RetryAttempt + 1;
 098        var delaySeconds = CommandBackoffStrategies.GetDelaySeconds(
 099            failedCommand.BackoffStrategy, failedCommand.BaseDelaySeconds, retryAttempt);
 100
 0101        var retryCommand = new Command(
 0102            failedCommand.InstitutionId,
 0103            data,
 0104            originalId: originalId,
 0105            activityId: activityId,
 0106            maxRetries: failedCommand.MaxRetries - 1,
 0107            delaySeconds: delaySeconds,
 0108            backoffStrategy: failedCommand.BackoffStrategy,
 0109            baseDelaySeconds: failedCommand.BaseDelaySeconds)
 0110        {
 0111            Type = failedCommand.Type,
 0112            RetryAttempt = retryAttempt,
 0113        };
 114
 0115        return retryCommand;
 116    }
 117
 2118    private static readonly ConcurrentDictionary<string, ICommandInvoker> _invokers = new();
 119    private static ICommandInvoker GetInvoker(Command command)
 120    {
 328121        return _invokers.GetOrAdd(command.Type, typeName =>
 328122        {
 4123            var commandType = CommandConfigs.GetCommandType(typeName);
 4124            var invokerType = typeof(CommandInvoker<>).MakeGenericType(commandType);
 4125            return (ICommandInvoker)Activator.CreateInstance(invokerType)!;
 328126        });
 127    }
 128
 129    /// <summary>
 130    /// See <see cref="CommandStatus"/> for status mapping
 131    /// </summary>
 2132    private static readonly string Sql = @"
 2133        UPDATE syki.commands
 2134        SET processor_id = {0}, status = 2
 2135        WHERE ctid IN (
 2136            SELECT ctid
 2137            FROM syki.commands
 2138            WHERE processor_id IS NULL AND status = 0 AND (not_before IS NULL OR not_before < NOW())
 2139            ORDER BY created_at
 2140            FOR UPDATE SKIP LOCKED
 2141            LIMIT 10
 2142        )
 2143        RETURNING *;
 2144    ";
 145}