< Summary - Syki

Information
Class: Syki.Back.Commands.CommandsProcessor
Assembly: Back
File(s): /home/runner/work/syki/syki/Back/Commands/CommandsProcessor.cs
Tag: 56_26538939494
Line coverage
61%
Covered lines: 56
Uncovered lines: 35
Coverable lines: 91
Total lines: 146
Line coverage: 61.5%
Branch coverage
42%
Covered branches: 12
Total branches: 28
Branch coverage: 42.8%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

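| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 100% | 1 | 1 | 100% |
| .cctor() | 100% | 1 | 1 | 100% |
| Execute() | 100% | 1 | 1 | 100% |
| Process() | 46.15% | 51 | 26 | 66.66% |
| CreateRetryCommand(...) | 0% | 6 | 2 | 0% |
| GetInvoker(...) | 100% | 1 | 1 | 100% |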

File(s)

/home/runner/work/syki/syki/Back/Commands/CommandsProcessor.cs

# | Line | Line coverage
 1using Quartz;
 2using System.Diagnostics;
 3using Syki.Back.Domain.Enums;
 4using System.Collections.Concurrent;
 5using Syki.Back.Commands.Domain.Commands;
 6
 7namespace Syki.Back.Commands;
 8
 3289public class CommandsProcessor(IServiceScopeFactory serviceScopeFactory) : IJob
 10{
 211    private static readonly ActivitySource _activitySource = new (OpenTelemetryConfigs.CommandsProcessing);
 12
 13    public async Task Execute(IJobExecutionContext context)
 14    {
 32815        using var scope = serviceScopeFactory.CreateScope();
 32816        var ctx = scope.ServiceProvider.GetRequiredService<SykiDbContext>();
 17
 32818        await Process(scope, ctx);
 32819    }
 20
 21    private static async Task Process(IServiceScope scope, SykiDbContext ctx)
 22    {
 32823        var processorId = Guid.NewGuid();
 32824        var sw = Stopwatch.StartNew();
 25
 026        while (true)
 27        {
 63228            var commands = await ctx.Commands.FromSqlRaw(Sql, processorId).AsNoTracking().ToListAsync();
 63229            if (commands.Count == 0) break;
 30
 121631            foreach (var command in commands)
 32            {
 30433                using (var activity = _activitySource.StartActivity($"Process {command.Type}", ActivityKind.Consumer, co
 34                {
 30435                    activity?.SetTag("command.id", command.Id);
 30436                    activity?.SetTag("command.type", command.Type);
 30437                    ctx.ActivityId = command.ActivityId!;
 30438                    ctx.Operation = command.Type;
 30439                    ctx.CommandLogs = [];
 40
 41                    try
 42                    {
 30443                        sw.Restart();
 44
 30445                        ctx.ChangeTracker.Clear();
 30446                        ctx.Attach(command);
 47
 30448                        await ctx.Database.BeginTransactionAsync();
 30449                        ctx.Database.AutoSavepointsEnabled = false;
 50
 30451                        var invoker = GetInvoker(command);
 30452                        await invoker.Invoke(scope.ServiceProvider, command.Id, command.Data);
 30453                    }
 054                    catch (Exception ex)
 55                    {
 056                        if (ctx.Database.CurrentTransaction is not null)
 057                            await ctx.Database.RollbackTransactionAsync();
 58
 059                        ctx.ChangeTracker.Clear();
 060                        ctx.Attach(command);
 061                        command.Error = ex.Message + ex.InnerException?.Message;
 062                        activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
 063                        activity?.AddException(ex);
 064                        await ctx.Database.BeginTransactionAsync();
 065                    }
 66
 30467                    sw.Stop();
 30468                    command.Logs = ctx.CommandLogs;
 30469                    command.Processed(sw.Elapsed.TotalMilliseconds);
 30470                    activity?.SetTag("command.duration", command.Duration);
 71
 30472                    if (command.Error.HasValue() && command.MaxRetries > 0)
 73                    {
 074                        var retryCommand = CreateRetryCommand(command, ctx.ActivityId);
 075                        ctx.Commands.Add(retryCommand);
 76                    }
 77
 30478                    await ctx.SaveChangesAsync();
 30479                    await ctx.Database.CommitTransactionAsync();
 30480                }
 30481            }
 82
 30483            if (ctx.HasPendingCommands)
 84            {
 085                var scheduler = await scope.ServiceProvider.GetRequiredService<ISchedulerFactory>().GetScheduler();
 086                await scheduler.TriggerCommandsProcessorJob();
 87            }
 88        }
 32889    }
 90
 91    private static Command CreateRetryCommand(Command failedCommand, string activityId)
 92    {
 093        var invoker = GetInvoker(failedCommand);
 094        var data = invoker.Deserialize(failedCommand.Data);
 95
 096        var originalId = failedCommand.OriginalId ?? failedCommand.Id;
 97
 098        var retryAttempt = failedCommand.RetryAttempt + 1;
 099        var delaySeconds = CommandBackoffStrategies.GetDelaySeconds(
 0100            failedCommand.BackoffStrategy, failedCommand.BaseDelaySeconds, retryAttempt);
 101
 0102        var retryCommand = new Command(
 0103            failedCommand.InstitutionId,
 0104            data,
 0105            originalId: originalId,
 0106            activityId: activityId,
 0107            maxRetries: failedCommand.MaxRetries - 1,
 0108            delaySeconds: delaySeconds,
 0109            backoffStrategy: failedCommand.BackoffStrategy,
 0110            baseDelaySeconds: failedCommand.BaseDelaySeconds)
 0111        {
 0112            Type = failedCommand.Type,
 0113            RetryAttempt = retryAttempt,
 0114        };
 115
 0116        return retryCommand;
 117    }
 118
 2119    private static readonly ConcurrentDictionary<string, ICommandInvoker> _invokers = new();
 120    private static ICommandInvoker GetInvoker(Command command)
 121    {
 304122        return _invokers.GetOrAdd(command.Type, typeName =>
 304123        {
 4124            var commandType = CommandConfigs.GetCommandType(typeName);
 4125            var invokerType = typeof(CommandInvoker<>).MakeGenericType(commandType);
 4126            return (ICommandInvoker)Activator.CreateInstance(invokerType)!;
 304127        });
 128    }
 129
 130    /// <summary>
 131    /// See <see cref="CommandStatus"/> for status mapping
 132    /// </summary>
 2133    private static readonly string Sql = @"
 2134        UPDATE syki.commands
 2135        SET processor_id = {0}, status = 2
 2136        WHERE ctid IN (
 2137            SELECT ctid
 2138            FROM syki.commands
 2139            WHERE processor_id IS NULL AND status = 0 AND (not_before IS NULL OR not_before < NOW())
 2140            ORDER BY created_at
 2141            FOR UPDATE SKIP LOCKED
 2142            LIMIT 10
 2143        )
 2144        RETURNING *;
 2145    ";
 146}