| | | 1 | | using Dapper; |
| | | 2 | | using Npgsql; |
| | | 3 | | |
| | | 4 | | namespace Syki.Daemon.Events; |
| | | 5 | | |
| | 2 | 6 | | public class DomainEventsProcessorDbListener(IConfiguration configuration, IServiceScopeFactory serviceScopeFactory) : B |
| | | 7 | | { |
| | 2 | 8 | | private readonly SemaphoreSlim _throttler = new(5); |
| | | 9 | | |
| | | 10 | | protected override async Task ExecuteAsync(CancellationToken stoppingToken) |
| | | 11 | | { |
| | 2 | 12 | | await using var connection = new NpgsqlConnection(configuration.Database().ConnectionString); |
| | 2 | 13 | | await connection.OpenAsync(stoppingToken); |
| | | 14 | | |
| | 2 | 15 | | await CreateTrigger(connection); |
| | | 16 | | |
| | 2 | 17 | | _ = Task.Run(async () => |
| | 2 | 18 | | { |
| | 2 | 19 | | var processor = new DomainEventsProcessor(serviceScopeFactory); |
| | 2 | 20 | | await processor.Run(); |
| | 4 | 21 | | }, CancellationToken.None); |
| | | 22 | | |
| | 2 | 23 | | connection.Notification += async (o, e) => |
| | 2 | 24 | | { |
| | 2656 | 25 | | if (!await _throttler.WaitAsync(0, CancellationToken.None)) |
| | 0 | 26 | | return; |
| | 2 | 27 | | |
| | 2656 | 28 | | _ = Task.Run(async () => |
| | 2656 | 29 | | { |
| | 2656 | 30 | | try |
| | 2656 | 31 | | { |
| | 2656 | 32 | | var processor = new DomainEventsProcessor(serviceScopeFactory); |
| | 2656 | 33 | | await processor.Run(); |
| | 2656 | 34 | | } |
| | 0 | 35 | | catch (Exception ex) |
| | 2656 | 36 | | { |
| | 0 | 37 | | Console.WriteLine($"Error processing domain event: {ex.Message}"); |
| | 0 | 38 | | } |
| | 2656 | 39 | | finally |
| | 2656 | 40 | | { |
| | 2656 | 41 | | _throttler.Release(); |
| | 2656 | 42 | | } |
| | 5312 | 43 | | }, CancellationToken.None); |
| | 2658 | 44 | | }; |
| | | 45 | | |
| | 2 | 46 | | await using (var cmd = new NpgsqlCommand("LISTEN new_domain_event;", connection)) |
| | | 47 | | { |
| | 2 | 48 | | await cmd.ExecuteNonQueryAsync(stoppingToken); |
| | | 49 | | } |
| | | 50 | | |
| | 2660 | 51 | | while (!stoppingToken.IsCancellationRequested) |
| | | 52 | | { |
| | 2658 | 53 | | await connection.WaitAsync(stoppingToken).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); |
| | | 54 | | } |
| | 2 | 55 | | } |
| | | 56 | | |
| | | 57 | | private static async Task CreateTrigger(NpgsqlConnection connection) |
| | | 58 | | { |
| | | 59 | | const string sql = @" |
| | | 60 | | CREATE OR REPLACE FUNCTION notify_new_domain_event_function() |
| | | 61 | | RETURNS trigger |
| | | 62 | | LANGUAGE 'plpgsql' |
| | | 63 | | AS $BODY$ |
| | | 64 | | BEGIN |
| | | 65 | | PERFORM pg_notify('new_domain_event', ''); |
| | | 66 | | RETURN NEW; |
| | | 67 | | END |
| | | 68 | | $BODY$; |
| | | 69 | | |
| | | 70 | | CREATE OR REPLACE TRIGGER notify_new_domain_event_trigger |
| | | 71 | | AFTER INSERT ON syki.domain_events |
| | | 72 | | EXECUTE PROCEDURE notify_new_domain_event_function(); |
| | | 73 | | "; |
| | | 74 | | |
| | 2 | 75 | | await connection.ExecuteAsync(sql); |
| | 2 | 76 | | } |
| | | 77 | | } |