| | 1 | | using Dapper; |
| | 2 | | using Npgsql; |
| | 3 | |
|
| | 4 | | namespace Syki.Daemon.Events; |
| | 5 | |
|
| 2 | 6 | | public class DomainEventsProcessorDbListener(IConfiguration configuration, IServiceScopeFactory serviceScopeFactory) : B |
| | 7 | | { |
| 2 | 8 | | private readonly SemaphoreSlim _throttler = new(5); |
| | 9 | |
|
/// <summary>
/// Listens on the PostgreSQL <c>new_domain_event</c> channel over a dedicated connection and
/// starts a <c>DomainEventsProcessor</c> run for each notification, with concurrency bounded
/// by <c>_throttler</c>.
/// </summary>
/// <param name="stoppingToken">Signalled when the host is shutting down; ends the wait loop.</param>
/// <returns>A task that completes when the listener stops.</returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    await using var connection = new NpgsqlConnection(configuration.Database().ConnectionString);
    await connection.OpenAsync(stoppingToken);

    await CreateTrigger(connection);

    // Deliberately synchronous: an async lambda on an event compiles to async void, where any
    // exception is unobservable. Wait(0) only probes the semaphore and never blocks.
    connection.Notification += (_, _) =>
    {
        // No free slot: drop this notification.
        // NOTE(review): presumably the in-flight processor runs pick up the new rows — confirm
        // against DomainEventsProcessor.Run.
        if (!_throttler.Wait(0))
        {
            return;
        }

        _ = Task.Run(async () =>
        {
            try
            {
                await RunProcessorAsync();
            }
            finally
            {
                _throttler.Release();
            }
        }, CancellationToken.None);
    };

    await using (var cmd = new NpgsqlCommand("LISTEN new_domain_event;", connection))
    {
        await cmd.ExecuteNonQueryAsync(stoppingToken);
    }

    // Startup run, launched only after LISTEN is active so that an event inserted while the
    // service is starting is either seen by this run or announced by a notification.
    // It does not take a throttler slot, matching the previous startup behaviour.
    _ = Task.Run(() => RunProcessorAsync(), CancellationToken.None);

    try
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Notifications are dispatched to the handler above while this call is pending.
            await connection.WaitAsync(stoppingToken);
        }
    }
    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
    {
        // Normal shutdown. Any other failure is allowed to propagate: suppressing every
        // exception would turn a persistently failing WaitAsync into a tight, silent loop.
    }
}

/// <summary>
/// Runs one <c>DomainEventsProcessor</c> pass and reports any failure to the console, so that
/// fire-and-forget callers never leave an unobserved exception behind.
/// </summary>
private async Task RunProcessorAsync()
{
    try
    {
        var processor = new DomainEventsProcessor(serviceScopeFactory);
        await processor.Run();
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error processing domain event: {ex.Message}");
    }
}
| | 56 | |
|
// DDL that (re)creates the notify function and the AFTER INSERT trigger on syki.domain_events.
// The function sends an empty payload on the 'new_domain_event' channel and returns NEW.
private const string NotifyTriggerSql = @"
        CREATE OR REPLACE FUNCTION notify_new_domain_event_function()
            RETURNS trigger
            LANGUAGE 'plpgsql'
        AS $BODY$
        BEGIN
            PERFORM pg_notify('new_domain_event', '');
            RETURN NEW;
        END
        $BODY$;

        CREATE OR REPLACE TRIGGER notify_new_domain_event_trigger
        AFTER INSERT ON syki.domain_events
        EXECUTE PROCEDURE notify_new_domain_event_function();
    ";

/// <summary>
/// Installs (or replaces) the database function and trigger that publish a
/// <c>new_domain_event</c> notification after inserts into <c>syki.domain_events</c>.
/// </summary>
/// <param name="connection">Connection on which the DDL batch is executed.</param>
/// <returns>A task that completes once the batch has been executed.</returns>
private static async Task CreateTrigger(NpgsqlConnection connection)
{
    // The affected-row count reported by Dapper is meaningless for DDL, so it is discarded.
    _ = await connection.ExecuteAsync(NotifyTriggerSql);
}
| | 77 | | } |