< Summary - Syki

Information
Class: Syki.Daemon.Events.DomainEventsProcessor
Assembly: Daemon
File(s): /home/runner/work/syki/syki/Daemon/Events/DomainEventsProcessor.cs
Tag: 4_16869239191
Line coverage
91%
Covered lines: 52
Uncovered lines: 5
Coverable lines: 57
Total lines: 98
Line coverage: 91.2%
Branch coverage
65%
Covered branches: 21
Total branches: 32
Branch coverage: 65.6%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
.cctor()100%11100%
Run()100%11100%
Process()65.62%373283.33%
GetData(...)100%11100%
GetHandler(...)100%11100%

File(s)

/home/runner/work/syki/syki/Daemon/Events/DomainEventsProcessor.cs

#LineLine coverage
 1using Newtonsoft.Json;
 2using Syki.Back.Events;
 3using System.Diagnostics;
 4using Syki.Back.Database;
 5using Syki.Daemon.Configs;
 6using System.Collections.Concurrent;
 7
 8namespace Syki.Daemon.Events;
 9
 251610public class DomainEventsProcessor(IServiceScopeFactory serviceScopeFactory)
 11{
 212    private static readonly ActivitySource _activitySource = new (OpenTelemetryConfigs.DomainEventsProcessing);
 13
 14    public async Task Run()
 15    {
 251616        using var scope = serviceScopeFactory.CreateScope();
 251617        var ctx = scope.ServiceProvider.GetRequiredService<SykiDbContext>();
 18
 251619        await Process(scope, ctx, Guid.CreateVersion7());
 251620    }
 21
 22    private static async Task Process(IServiceScope scope, SykiDbContext ctx, Guid processorId)
 23    {
 489424        var events = await ctx.DomainEvents.FromSqlRaw(Sql, processorId).ToListAsync();
 741025        if (events.Count == 0) return;
 26
 237827        await ctx.Database.BeginTransactionAsync();
 237828        ctx.Database.AutoSavepointsEnabled = false;
 237829        var sw = Stopwatch.StartNew();
 30
 978431        foreach (var evt in events)
 32        {
 251433            using (var activity = _activitySource.StartActivity($"Process {evt.Type.Split('.').Last()}", ActivityKind.Co
 34            {
 251435                activity?.SetTag("event.id", evt.Id);
 251436                activity?.SetTag("event.type", evt.Type.Split('.').Last());
 251437                activity?.SetTag("event.institutionId", evt.InstitutionId);
 38
 39                try
 40                {
 251441                    sw.Restart();
 251442                    dynamic data = GetData(evt);
 251443                    dynamic handler = GetHandler(scope, evt);
 251444                    await handler.Handle(evt.InstitutionId, evt.Id, data);
 251445                }
 046                catch (Exception ex)
 47                {
 048                    evt.Error = ex.Message + ex.InnerException?.Message;
 049                    activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
 050                    activity?.AddException(ex);
 051                }
 52                finally
 53                {
 251454                    sw.Stop();
 251455                    evt.Processed(sw.Elapsed.TotalMilliseconds);
 251456                    activity?.SetTag("event.duration", evt.Duration);
 57                }
 251458            }
 251459        }
 60
 237861        await ctx.SaveChangesAsync();
 237862        await ctx.Database.CommitTransactionAsync();
 63
 237864        ctx.ChangeTracker.Clear();
 65
 237866        await Process(scope, ctx, processorId);
 489467    }
 68
 269    private static readonly ConcurrentDictionary<string, Type> _types = new();
 70    private static dynamic GetData(DomainEvent evt)
 71    {
 251472        var type = _types.GetOrAdd(evt.Type, typeof(DomainEvent).Assembly.GetType(evt.Type)!);
 251473        dynamic data = JsonConvert.DeserializeObject(evt.Data, type)!;
 251474        return data;
 75    }
 76
 277    private static readonly ConcurrentDictionary<string, Type> _handlers = new();
 78    private static dynamic GetHandler(IServiceScope scope, DomainEvent evt)
 79    {
 251480        var handlerType = _handlers.GetOrAdd(evt.Type, typeof(IDomainEvent).Assembly.GetType($"{evt.Type}Handler")!);
 251481        dynamic handler = scope.ServiceProvider.GetRequiredService(handlerType);
 251482        return handler;
 83    }
 84
 285    private static readonly string Sql = @"
 286        UPDATE syki.domain_events
 287        SET processor_id = {0}, status = 'Processing'
 288        WHERE ctid IN (
 289            SELECT ctid
 290            FROM syki.domain_events
 291            WHERE processor_id IS NULL
 292            ORDER BY occurred_at
 293            FOR UPDATE SKIP LOCKED
 294            LIMIT 100
 295        )
 296        RETURNING *;
 297    ";
 98}