| | 1 | | using Newtonsoft.Json; |
| | 2 | | using System.Diagnostics; |
| | 3 | | using Syki.Back.Database; |
| | 4 | | using Syki.Daemon.Configs; |
| | 5 | | using System.Collections.Concurrent; |
| | 6 | |
|
| | 7 | | namespace Syki.Daemon.Commands; |
| | 8 | |
|
| 2380 | 9 | | public class CommandsProcessor(IServiceScopeFactory serviceScopeFactory) |
| | 10 | | { |
| 2 | 11 | | private static readonly ActivitySource _activitySource = new (OpenTelemetryConfigs.CommandsProcessing); |
| | 12 | |
|
| | 13 | | public async Task Run() |
| | 14 | | { |
| 2380 | 15 | | using var scope = serviceScopeFactory.CreateScope(); |
| 2380 | 16 | | var ctx = scope.ServiceProvider.GetRequiredService<SykiDbContext>(); |
| | 17 | |
|
| 2380 | 18 | | await Process(scope, ctx, Guid.CreateVersion7()); |
| 2380 | 19 | | } |
| | 20 | |
|
| | 21 | | private static async Task Process(IServiceScope scope, SykiDbContext ctx, Guid processorId) |
| | 22 | | { |
| 4698 | 23 | | var commands = await ctx.Commands.FromSqlRaw(Sql, processorId).AsNoTracking().ToListAsync(); |
| 7078 | 24 | | if (commands.Count == 0) return; |
| | 25 | |
|
| 2318 | 26 | | var sw = Stopwatch.StartNew(); |
| | 27 | |
|
| 10012 | 28 | | foreach (var command in commands) |
| | 29 | | { |
| 2688 | 30 | | using (var activity = _activitySource.StartActivity($"Process {command.Type.Split('.').Last()}", ActivityKin |
| | 31 | | { |
| 2688 | 32 | | activity?.SetTag("command.id", command.Id); |
| 2688 | 33 | | activity?.SetTag("command.type", command.Type.Split('.').Last()); |
| 2688 | 34 | | activity?.SetTag("command.institutionId", command.InstitutionId); |
| | 35 | |
|
| | 36 | | try |
| | 37 | | { |
| 2688 | 38 | | sw.Restart(); |
| | 39 | |
|
| 2688 | 40 | | ctx.ChangeTracker.Clear(); |
| 2688 | 41 | | ctx.Attach(command); |
| | 42 | |
|
| 2688 | 43 | | await ctx.Database.BeginTransactionAsync(); |
| 2688 | 44 | | ctx.Database.AutoSavepointsEnabled = false; |
| | 45 | |
|
| 2688 | 46 | | dynamic data = GetData(command); |
| 2688 | 47 | | dynamic handler = GetHandler(scope, command); |
| 2688 | 48 | | await handler.Handle(command.Id, data); |
| 2688 | 49 | | } |
| 0 | 50 | | catch (Exception ex) |
| | 51 | | { |
| 0 | 52 | | ctx.ChangeTracker.Clear(); |
| 0 | 53 | | ctx.Attach(command); |
| 0 | 54 | | command.Error = ex.Message + ex.InnerException?.Message; |
| 0 | 55 | | activity?.SetStatus(ActivityStatusCode.Error, ex.Message); |
| 0 | 56 | | activity?.AddException(ex); |
| 0 | 57 | | } |
| | 58 | |
|
| 2688 | 59 | | sw.Stop(); |
| 2688 | 60 | | command.Processed(sw.Elapsed.TotalMilliseconds); |
| 2688 | 61 | | activity?.SetTag("command.duration", command.Duration); |
| | 62 | |
|
| 2688 | 63 | | await ctx.SaveChangesAsync(); |
| 2688 | 64 | | await ctx.Database.CommitTransactionAsync(); |
| 2688 | 65 | | } |
| 2688 | 66 | | } |
| | 67 | |
|
| 2318 | 68 | | await Process(scope, ctx, processorId); |
| 4698 | 69 | | } |
| | 70 | |
|
| 2 | 71 | | private static readonly ConcurrentDictionary<string, Type> _types = new(); |
| | 72 | | private static dynamic GetData(Command command) |
| | 73 | | { |
| 2688 | 74 | | var type = _types.GetOrAdd(command.Type, typeof(Command).Assembly.GetType(command.Type)!); |
| 2688 | 75 | | dynamic data = JsonConvert.DeserializeObject(command.Data, type)!; |
| 2688 | 76 | | return data; |
| | 77 | | } |
| | 78 | |
|
| 2 | 79 | | private static readonly ConcurrentDictionary<string, Type> _handlers = new(); |
| | 80 | | private static dynamic GetHandler(IServiceScope scope, Command command) |
| | 81 | | { |
| 2688 | 82 | | var handlerType = _handlers.GetOrAdd(command.Type, typeof(ICommand).Assembly.GetType($"{command.Type}Handler")!) |
| 2688 | 83 | | dynamic handler = scope.ServiceProvider.GetRequiredService(handlerType); |
| 2688 | 84 | | return handler; |
| | 85 | | } |
| | 86 | |
|
| 2 | 87 | | private static readonly string Sql = @" |
| 2 | 88 | | UPDATE syki.commands |
| 2 | 89 | | SET processor_id = {0}, status = 'Processing' |
| 2 | 90 | | WHERE ctid IN ( |
| 2 | 91 | | SELECT ctid |
| 2 | 92 | | FROM syki.commands |
| 2 | 93 | | WHERE processor_id IS NULL AND (not_before IS NULL OR not_before < NOW()) |
| 2 | 94 | | ORDER BY created_at |
| 2 | 95 | | FOR UPDATE SKIP LOCKED |
| 2 | 96 | | LIMIT 10 |
| 2 | 97 | | ) |
| 2 | 98 | | RETURNING *; |
| 2 | 99 | | "; |
| | 100 | | } |