| | 1 | | using Dapper; |
| | 2 | | using Npgsql; |
| | 3 | | using Newtonsoft.Json; |
| | 4 | |
|
| | 5 | | namespace Syki.Daemon.Tasks; |
| | 6 | |
|
/// <summary>
/// Claims the unassigned rows of <c>syki.tasks</c>, runs each one through its handler and
/// records the outcome (processing time and error text, if any) back on the row.
/// </summary>
public class SykiTasksProcessor(IConfiguration configuration, IServiceScopeFactory serviceScopeFactory)
{
    /// <summary>
    /// Executes one processing pass: stamps every task that has no <c>processor_id</c> with a
    /// fresh id, loads the stamped tasks that are not yet processed, and handles them one by one.
    /// </summary>
    /// <remarks>
    /// A failure while resolving or running a single task is stored in that task's
    /// <c>error</c> column and does not stop the remaining tasks of the batch.
    /// </remarks>
    public async Task Run()
    {
        using var scope = serviceScopeFactory.CreateScope();
        await using var connection = new NpgsqlConnection(configuration.Database().ConnectionString);

        const string pickRowsSql = @"
            UPDATE syki.tasks
            SET processor_id = @ProcessorId
            WHERE processor_id IS NULL
        ";

        // A new id per pass lets the SELECT below fetch exactly the rows claimed by this pass.
        var processorId = Guid.NewGuid();
        var rows = await connection.ExecuteAsync(pickRowsSql, new { processorId });
        if (rows == 0) return;

        const string sql = @"
            SELECT * FROM syki.tasks
            WHERE processor_id = @ProcessorId AND processed_at IS NULL
        ";

        var tasks = await connection.QueryAsync<SykiTask>(sql, new { processorId });

        const string update = @"
            UPDATE syki.tasks
            SET processed_at = now(), error = @Error
            WHERE id = @Id
        ";

        foreach (var task in tasks)
        {
            string? error = null;

            try
            {
                // Payload and handler are resolved inside the try on purpose: the pick query
                // above only selects rows whose processor_id is NULL, so if an unknown type or
                // an unregistered handler aborted the loop, the rest of this already-claimed
                // batch would never be marked as processed by this class.
                dynamic data = GetData(task);
                dynamic handler = GetHandler(scope, task);

                await handler.Handle(data);
            }
            catch (Exception ex)
            {
                error = ex.Message + ex.InnerException?.Message;
            }

            await connection.ExecuteAsync(update, new { task.Id, error });
        }
    }

    /// <summary>
    /// Deserializes the JSON payload of <paramref name="task"/> into the type named by
    /// <c>task.Type</c>, looked up in the assembly that declares <c>SykiTask</c>.
    /// </summary>
    /// <exception cref="InvalidOperationException">The type name cannot be resolved.</exception>
    private static dynamic GetData(SykiTask task)
    {
        var type = typeof(SykiTask).Assembly.GetType(task.Type)
            ?? throw new InvalidOperationException($"Task type '{task.Type}' was not found.");

        dynamic data = JsonConvert.DeserializeObject(task.Data, type)!;
        return data;
    }

    /// <summary>
    /// Resolves the handler for <paramref name="task"/> from the scope's service provider.
    /// The handler type is <c>Syki.Daemon.Tasks.{Name}Handler</c>, where <c>{Name}</c> is the
    /// last dot-separated segment of <c>task.Type</c>.
    /// </summary>
    /// <exception cref="InvalidOperationException">The handler type cannot be resolved.</exception>
    private static dynamic GetHandler(IServiceScope scope, SykiTask task)
    {
        var type = task.Type.Split(".").Last();
        var handlerTypeName = $"Syki.Daemon.Tasks.{type}Handler";

        var handlerType = typeof(SykiTasksProcessor).Assembly.GetType(handlerTypeName)
            ?? throw new InvalidOperationException($"Handler type '{handlerTypeName}' was not found.");

        dynamic handler = scope.ServiceProvider.GetRequiredService(handlerType);
        return handler;
    }
}