< Summary - Syki

Information
Class: Syki.Daemon.Events.DomainEventsProcessor
Assembly: Daemon
File(s): /home/runner/work/syki/syki/Daemon/Events/DomainEventsProcessor.cs
Tag: 21_17346963026
Line coverage
91%
Covered lines: 52
Uncovered lines: 5
Coverable lines: 57
Total lines: 98
Line coverage: 91.2%
Branch coverage
65%
Covered branches: 21
Total branches: 32
Branch coverage: 65.6%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
.cctor()100%11100%
Run()100%11100%
Process()65.62%373283.33%
GetData(...)100%11100%
GetHandler(...)100%11100%

File(s)

/home/runner/work/syki/syki/Daemon/Events/DomainEventsProcessor.cs

#LineLine coverage
 1using Newtonsoft.Json;
 2using Syki.Back.Events;
 3using System.Diagnostics;
 4using Syki.Back.Database;
 5using Syki.Daemon.Configs;
 6using System.Collections.Concurrent;
 7
 8namespace Syki.Daemon.Events;
 9
 265810public class DomainEventsProcessor(IServiceScopeFactory serviceScopeFactory)
 11{
 212    private static readonly ActivitySource _activitySource = new (OpenTelemetryConfigs.DomainEventsProcessing);
 13
 14    public async Task Run()
 15    {
 265816        using var scope = serviceScopeFactory.CreateScope();
 265817        var ctx = scope.ServiceProvider.GetRequiredService<SykiDbContext>();
 18
 265819        await Process(scope, ctx, Guid.CreateVersion7());
 265820    }
 21
 22    private static async Task Process(IServiceScope scope, SykiDbContext ctx, Guid processorId)
 23    {
 518424        var events = await ctx.DomainEvents.FromSqlRaw(Sql, processorId).ToListAsync();
 784225        if (events.Count == 0) return;
 26
 252627        await ctx.Database.BeginTransactionAsync();
 252628        ctx.Database.AutoSavepointsEnabled = false;
 252629        var sw = Stopwatch.StartNew();
 30
 1036431        foreach (var evt in events)
 32        {
 265633            using (var activity = _activitySource.StartActivity($"Process {evt.Type.Split('.').Last()}", ActivityKind.Co
 34            {
 265635                activity?.SetTag("event.id", evt.Id);
 265636                activity?.SetTag("event.type", evt.Type.Split('.').Last());
 265637                activity?.SetTag("event.institutionId", evt.InstitutionId);
 38
 39                try
 40                {
 265641                    sw.Restart();
 265642                    dynamic data = GetData(evt);
 265643                    dynamic handler = GetHandler(scope, evt);
 265644                    await handler.Handle(evt.InstitutionId, evt.Id, data);
 265645                }
 046                catch (Exception ex)
 47                {
 048                    evt.Error = ex.Message + ex.InnerException?.Message;
 049                    activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
 050                    activity?.AddException(ex);
 051                }
 52                finally
 53                {
 265654                    sw.Stop();
 265655                    evt.Processed(sw.Elapsed.TotalMilliseconds);
 265656                    activity?.SetTag("event.duration", evt.Duration);
 57                }
 265658            }
 265659        }
 60
 252661        await ctx.SaveChangesAsync();
 252662        await ctx.Database.CommitTransactionAsync();
 63
 252664        ctx.ChangeTracker.Clear();
 65
 252666        await Process(scope, ctx, processorId);
 518467    }
 68
 269    private static readonly ConcurrentDictionary<string, Type> _types = new();
 70    private static dynamic GetData(DomainEvent evt)
 71    {
 265672        var type = _types.GetOrAdd(evt.Type, typeof(DomainEvent).Assembly.GetType(evt.Type)!);
 265673        dynamic data = JsonConvert.DeserializeObject(evt.Data, type)!;
 265674        return data;
 75    }
 76
 277    private static readonly ConcurrentDictionary<string, Type> _handlers = new();
 78    private static dynamic GetHandler(IServiceScope scope, DomainEvent evt)
 79    {
 265680        var handlerType = _handlers.GetOrAdd(evt.Type, typeof(IDomainEvent).Assembly.GetType($"{evt.Type}Handler")!);
 265681        dynamic handler = scope.ServiceProvider.GetRequiredService(handlerType);
 265682        return handler;
 83    }
 84
 285    private static readonly string Sql = @"
 286        UPDATE syki.domain_events
 287        SET processor_id = {0}, status = 'Processing'
 288        WHERE ctid IN (
 289            SELECT ctid
 290            FROM syki.domain_events
 291            WHERE processor_id IS NULL
 292            ORDER BY occurred_at
 293            FOR UPDATE SKIP LOCKED
 294            LIMIT 100
 295        )
 296        RETURNING *;
 297    ";
 98}