< Summary - Syki

Information
Class: Syki.Back.Webhooks.ReceivedWebhookEventsProcessor
Assembly: Back
File(s): /home/runner/work/syki/syki/Back/Webhooks/ReceivedWebhookEventsProcessor.cs
Tag: 97_27801654829
Line coverage
0%
Covered lines: 0
Uncovered lines: 65
Coverable lines: 65
Total lines: 110
Line coverage: 0%
Branch coverage
0%
Covered branches: 0
Total branches: 26
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

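| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| .ctor(...) | 100% | 2 | 1 | 0% |
| .cctor() | 100% | 2 | 1 | 0% |
| Execute() | 100% | 2 | 1 | 0% |
| Process() | 0% | 702 | 26 | 0% |
| GetInvoker(...) | 100% | 2 | 1 | 0% |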

File(s)

/home/runner/work/syki/syki/Back/Webhooks/ReceivedWebhookEventsProcessor.cs

#LineLine coverage
 1using Quartz;
 2using System.Diagnostics;
 3using System.Collections.Concurrent;
 4
 5namespace Syki.Back.Webhooks;
 6
 07public class ReceivedWebhookEventsProcessor(IServiceScopeFactory serviceScopeFactory) : IJob
 8{
 09    private static readonly ActivitySource _activitySource = new(OpenTelemetryConfigs.WebhookEventsProcessing);
 10
 11    public async Task Execute(IJobExecutionContext context)
 12    {
 013        using var scope = serviceScopeFactory.CreateScope();
 014        var ctx = scope.ServiceProvider.GetRequiredService<SykiDbContext>();
 15
 016        await Process(scope, ctx);
 017    }
 18
 19    private static async Task Process(IServiceScope scope, SykiDbContext ctx)
 20    {
 021        var processorId = Guid.NewGuid();
 022        var commandsCreated = false;
 23
 024        while (true)
 25        {
 026            var events = await ctx.ReceivedWebhookEvents.FromSqlRaw(Sql, processorId).ToListAsync();
 027            if (events.Count == 0) break;
 28
 029            foreach (var evt in events)
 30            {
 031                using var activity = _activitySource.StartActivity($"Process {evt.Type}", ActivityKind.Consumer);
 032                activity?.SetTag("webhook_event.id", evt.Id);
 033                activity?.SetTag("webhook_event.type", evt.Type);
 034                activity?.SetTag("webhook_event.source", evt.Source.ToString());
 35
 36                try
 37                {
 038                    ctx.ChangeTracker.Clear();
 039                    ctx.Attach(evt);
 40
 041                    await ctx.Database.BeginTransactionAsync();
 42
 043                    if (WebhookConfigs.TryGetEventType(evt.Type, out _))
 44                    {
 045                        var invoker = GetInvoker(evt.Type);
 046                        await invoker.Invoke(scope.ServiceProvider, evt);
 047                        commandsCreated = commandsCreated || evt.CommandId != null;
 48                    }
 49                    else
 50                    {
 051                        evt.Status = ReceivedWebhookEventStatus.Ignored;
 052                        evt.ProcessedAt = DateTime.UtcNow;
 53                    }
 54
 055                    await ctx.SaveChangesAsync();
 056                    await ctx.Database.CommitTransactionAsync();
 057                }
 058                catch (Exception ex)
 59                {
 060                    if (ctx.Database.CurrentTransaction is not null)
 061                        await ctx.Database.RollbackTransactionAsync();
 62
 063                    ctx.ChangeTracker.Clear();
 064                    ctx.Attach(evt);
 065                    evt.Status = ReceivedWebhookEventStatus.Error;
 066                    evt.Error = ex.Message + ex.InnerException?.Message;
 067                    evt.ProcessedAt = DateTime.UtcNow;
 068                    await ctx.SaveChangesAsync();
 69
 070                    activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
 071                    activity?.AddException(ex);
 072                }
 073            }
 74        }
 75
 076        if (commandsCreated)
 77        {
 078            var scheduler = await scope.ServiceProvider.GetRequiredService<ISchedulerFactory>().GetScheduler();
 079            await scheduler.TriggerCommandsProcessorJob();
 80        }
 081    }
 82
 083    private static readonly ConcurrentDictionary<string, IWebhookEventInvoker> _invokers = new();
 84    private static IWebhookEventInvoker GetInvoker(string eventType)
 85    {
 086        return _invokers.GetOrAdd(eventType, type =>
 087        {
 088            WebhookConfigs.TryGetEventType(type, out var webhookEventType);
 089            var invokerType = typeof(WebhookEventInvoker<>).MakeGenericType(webhookEventType);
 090            return (IWebhookEventInvoker)Activator.CreateInstance(invokerType)!;
 091        });
 92    }
 93
 94    /// <summary>
 95    /// See <see cref="ReceivedWebhookEventStatus"/> for status mapping
 96    /// </summary>
 097    private static readonly string Sql = @"
 098        UPDATE web.received_webhook_events
 099        SET processor_id = {0}, status = 1
 0100        WHERE ctid IN (
 0101            SELECT ctid
 0102            FROM web.received_webhook_events
 0103            WHERE processor_id IS NULL AND status = 0
 0104            ORDER BY created_at
 0105            FOR UPDATE SKIP LOCKED
 0106            LIMIT 10
 0107        )
 0108        RETURNING *;
 0109    ";
 110}