< Summary - Syki

Information
Class: Syki.Daemon.Commands.CommandsProcessor
Assembly: Daemon
File(s): /home/runner/work/syki/syki/Daemon/Commands/CommandsProcessor.cs
Tag: 4_16869239191
Line coverage
88%
Covered lines: 53
Uncovered lines: 7
Coverable lines: 60
Total lines: 100
Line coverage: 88.3%
Branch coverage
67%
Covered branches: 23
Total branches: 34
Branch coverage: 67.6%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
.cctor()100%11100%
Run()100%11100%
Process()67.64%453478.78%
GetData(...)100%11100%
GetHandler(...)100%11100%

File(s)

/home/runner/work/syki/syki/Daemon/Commands/CommandsProcessor.cs

#LineLine coverage
 1using Newtonsoft.Json;
 2using System.Diagnostics;
 3using Syki.Back.Database;
 4using Syki.Daemon.Configs;
 5using System.Collections.Concurrent;
 6
 7namespace Syki.Daemon.Commands;
 8
 23809public class CommandsProcessor(IServiceScopeFactory serviceScopeFactory)
 10{
 211    private static readonly ActivitySource _activitySource = new (OpenTelemetryConfigs.CommandsProcessing);
 12
 13    public async Task Run()
 14    {
 238015        using var scope = serviceScopeFactory.CreateScope();
 238016        var ctx = scope.ServiceProvider.GetRequiredService<SykiDbContext>();
 17
 238018        await Process(scope, ctx, Guid.CreateVersion7());
 238019    }
 20
 21    private static async Task Process(IServiceScope scope, SykiDbContext ctx, Guid processorId)
 22    {
 469823        var commands = await ctx.Commands.FromSqlRaw(Sql, processorId).AsNoTracking().ToListAsync();
 707824        if (commands.Count == 0) return;
 25
 231826        var sw = Stopwatch.StartNew();
 27
 1001228        foreach (var command in commands)
 29        {
 268830            using (var activity = _activitySource.StartActivity($"Process {command.Type.Split('.').Last()}", ActivityKin
 31            {
 268832                activity?.SetTag("command.id", command.Id);
 268833                activity?.SetTag("command.type", command.Type.Split('.').Last());
 268834                activity?.SetTag("command.institutionId", command.InstitutionId);
 35
 36                try
 37                {
 268838                    sw.Restart();
 39
 268840                    ctx.ChangeTracker.Clear();
 268841                    ctx.Attach(command);
 42
 268843                    await ctx.Database.BeginTransactionAsync();
 268844                    ctx.Database.AutoSavepointsEnabled = false;
 45
 268846                    dynamic data = GetData(command);
 268847                    dynamic handler = GetHandler(scope, command);
 268848                    await handler.Handle(command.Id, data);
 268849                }
 050                catch (Exception ex)
 51                {
 052                    ctx.ChangeTracker.Clear();
 053                    ctx.Attach(command);
 054                    command.Error = ex.Message + ex.InnerException?.Message;
 055                    activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
 056                    activity?.AddException(ex);
 057                }
 58
 268859                sw.Stop();
 268860                command.Processed(sw.Elapsed.TotalMilliseconds);
 268861                activity?.SetTag("command.duration", command.Duration);
 62
 268863                await ctx.SaveChangesAsync();
 268864                await ctx.Database.CommitTransactionAsync();
 268865            }
 268866        }
 67
 231868        await Process(scope, ctx, processorId);
 469869    }
 70
 271    private static readonly ConcurrentDictionary<string, Type> _types = new();
 72    private static dynamic GetData(Command command)
 73    {
 268874        var type = _types.GetOrAdd(command.Type, typeof(Command).Assembly.GetType(command.Type)!);
 268875        dynamic data = JsonConvert.DeserializeObject(command.Data, type)!;
 268876        return data;
 77    }
 78
 279    private static readonly ConcurrentDictionary<string, Type> _handlers = new();
 80    private static dynamic GetHandler(IServiceScope scope, Command command)
 81    {
 268882        var handlerType = _handlers.GetOrAdd(command.Type, typeof(ICommand).Assembly.GetType($"{command.Type}Handler")!)
 268883        dynamic handler = scope.ServiceProvider.GetRequiredService(handlerType);
 268884        return handler;
 85    }
 86
 287    private static readonly string Sql = @"
 288        UPDATE syki.commands
 289        SET processor_id = {0}, status = 'Processing'
 290        WHERE ctid IN (
 291            SELECT ctid
 292            FROM syki.commands
 293            WHERE processor_id IS NULL AND (not_before IS NULL OR not_before < NOW())
 294            ORDER BY created_at
 295            FOR UPDATE SKIP LOCKED
 296            LIMIT 10
 297        )
 298        RETURNING *;
 299    ";
 100}