
Event-Driven Architecture Patterns in C#: A Complete Developer Guide

Master Event-Driven Architecture patterns with this comprehensive guide. Learn how to build scalable, resilient systems using C#, MassTransit, RabbitMQ, and Azure Service Bus. Explore real-world implementation patterns including Event Sourcing, CQRS, Saga patterns, and modern microservices communication strategies used by industry leaders.

By Junrill Galvez

Event-Driven Architecture (EDA) has become the backbone of modern distributed systems. After spending years building everything from e-commerce platforms to financial trading systems, I've learned that understanding EDA patterns isn't just academic—it's essential for creating systems that can handle real-world complexity and scale.

Whether you're working on microservices, building reactive systems, or designing cloud-native applications, event-driven patterns will fundamentally change how you think about system design.

Why Event-Driven Architecture Matters

In traditional synchronous systems, services are tightly coupled through direct API calls. Event-driven architecture changes this by using asynchronous events to coordinate different components of a system, creating several key advantages:

  • Loose Coupling - Services don't need to know about each other directly
  • Scalability - Components can scale independently based on event load
  • Resilience - System continues functioning even if individual services fail
  • Real-time Responsiveness - Events enable immediate reactions to state changes
  • Business Alignment - Events naturally model business processes

An event is a record of a change in state, such as an item being placed in a shopping cart on an e-commerce website. The power lies in how we architect systems around these events.

Core Event-Driven Patterns

1. Publish-Subscribe (Pub/Sub) Pattern

Publish-subscribe is the foundation of event-driven systems: publishers emit events without knowing who will consume them.

    using System;
    using System.Threading.Tasks;

    // Application-level consumer abstraction used by the examples in this guide.
    // It is not MassTransit's IConsumer<T>, which exposes Consume(ConsumeContext<T>).
    public interface IConsumer<in T> where T : class
    {
        Task ConsumeAsync(T @event);
    }

    // Event definition
    public record OrderCreatedEvent(
        Guid OrderId,
        Guid CustomerId,
        decimal TotalAmount,
        DateTime CreatedAt);

    // Publisher service
    public class OrderService
    {
        private readonly IMessagePublisher _publisher;

        public OrderService(IMessagePublisher publisher)
        {
            _publisher = publisher;
        }

        public async Task<Order> CreateOrderAsync(CreateOrderRequest request)
        {
            var order = new Order
            {
                Id = Guid.NewGuid(),
                CustomerId = request.CustomerId,
                TotalAmount = request.TotalAmount,
                CreatedAt = DateTime.UtcNow
            };

            // Save to database (persistence helper not shown)
            await SaveOrderAsync(order);

            // Publish event - decoupled from consumers
            await _publisher.PublishAsync(new OrderCreatedEvent(
                order.Id,
                order.CustomerId,
                order.TotalAmount,
                order.CreatedAt));

            return order;
        }
    }

    // Consumer services
    public class InventoryService : IConsumer<OrderCreatedEvent>
    {
        public async Task ConsumeAsync(OrderCreatedEvent @event)
        {
            // Reserve inventory items
            await ReserveInventoryAsync(@event.OrderId);
            Console.WriteLine($"Inventory reserved for order {@event.OrderId}");
        }
    }

    public class EmailService : IConsumer<OrderCreatedEvent>
    {
        public async Task ConsumeAsync(OrderCreatedEvent @event)
        {
            // Send confirmation email
            await SendOrderConfirmationAsync(@event.CustomerId, @event.OrderId);
            Console.WriteLine($"Order confirmation sent for {@event.OrderId}");
        }
    }
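To make the decoupling concrete, here is a minimal in-process sketch of the same idea. `InMemoryBus`, `OrderPlaced`, and `PubSubDemo` are hypothetical names introduced only for this illustration; a real system would place a broker such as RabbitMQ between the publisher and its subscribers.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical in-process bus: maps an event type to its subscribed handlers.
public sealed class InMemoryBus
{
    private readonly Dictionary<Type, List<Func<object, Task>>> _handlers = new();

    public void Subscribe<T>(Func<T, Task> handler) where T : class
    {
        if (!_handlers.TryGetValue(typeof(T), out var list))
        {
            list = new List<Func<object, Task>>();
            _handlers[typeof(T)] = list;
        }
        list.Add(e => handler((T)e));
    }

    public async Task PublishAsync<T>(T message) where T : class
    {
        // The publisher never sees who is subscribed; no subscribers is not an error.
        if (!_handlers.TryGetValue(typeof(T), out var list)) return;
        foreach (var handler in list)
        {
            await handler(message);
        }
    }
}

public record OrderPlaced(Guid OrderId);

public static class PubSubDemo
{
    // Registers two independent subscribers and publishes a single event.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var bus = new InMemoryBus();
        bus.Subscribe<OrderPlaced>(e => { log.Add("inventory"); return Task.CompletedTask; });
        bus.Subscribe<OrderPlaced>(e => { log.Add("email"); return Task.CompletedTask; });
        await bus.PublishAsync(new OrderPlaced(Guid.NewGuid()));
        return log;
    }
}
```

Adding a third subscriber requires no change to the publishing code, which is the property the pattern is meant to provide.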

2. Event Sourcing Pattern

Instead of storing current state, store the sequence of events that led to that state. Current state is then rebuilt by replaying those events in order.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text.Json;
    using System.Threading.Tasks;
    using Microsoft.EntityFrameworkCore;

    // Domain events
    public abstract record DomainEvent(Guid AggregateId, DateTime OccurredAt);

    public record AccountOpenedEvent(Guid AccountId, string AccountNumber, decimal InitialBalance, DateTime OccurredAt)
        : DomainEvent(AccountId, OccurredAt);

    public record MoneyDepositedEvent(Guid AccountId, decimal Amount, DateTime OccurredAt)
        : DomainEvent(AccountId, OccurredAt);

    public record MoneyWithdrawnEvent(Guid AccountId, decimal Amount, DateTime OccurredAt)
        : DomainEvent(AccountId, OccurredAt);

    // Aggregate that builds state from events
    public class BankAccount
    {
        public Guid Id { get; private set; }
        public string AccountNumber { get; private set; } = string.Empty;
        public decimal Balance { get; private set; }
        public DateTime CreatedAt { get; private set; }

        private readonly List<DomainEvent> _uncommittedEvents = new();

        // Create account from events (Event Sourcing)
        public static BankAccount FromEvents(IEnumerable<DomainEvent> events)
        {
            var account = new BankAccount();
            foreach (var @event in events)
            {
                account.Apply(@event);
            }
            return account;
        }

        // Apply events to build state
        private void Apply(DomainEvent @event)
        {
            switch (@event)
            {
                case AccountOpenedEvent opened:
                    Id = opened.AccountId;
                    AccountNumber = opened.AccountNumber;
                    Balance = opened.InitialBalance;
                    CreatedAt = opened.OccurredAt;
                    break;

                case MoneyDepositedEvent deposited:
                    Balance += deposited.Amount;
                    break;

                case MoneyWithdrawnEvent withdrawn:
                    Balance -= withdrawn.Amount;
                    break;
            }
        }

        // Business operations that produce events
        public void Deposit(decimal amount)
        {
            if (amount <= 0)
                throw new ArgumentException("Amount must be positive");

            var @event = new MoneyDepositedEvent(Id, amount, DateTime.UtcNow);
            Apply(@event);
            _uncommittedEvents.Add(@event);
        }

        public void Withdraw(decimal amount)
        {
            if (amount <= 0)
                throw new ArgumentException("Amount must be positive");
            if (Balance < amount)
                throw new InvalidOperationException("Insufficient funds");

            var @event = new MoneyWithdrawnEvent(Id, amount, DateTime.UtcNow);
            Apply(@event);
            _uncommittedEvents.Add(@event);
        }

        public IReadOnlyList<DomainEvent> GetUncommittedEvents() => _uncommittedEvents.AsReadOnly();

        public void MarkEventsAsCommitted() => _uncommittedEvents.Clear();
    }

    // Event store implementation
    public interface IEventStore
    {
        Task SaveEventsAsync(Guid aggregateId, IEnumerable<DomainEvent> events);
        Task<IEnumerable<DomainEvent>> GetEventsAsync(Guid aggregateId);
    }

    // Persistence row for one event. A C# class cannot declare a member with
    // its own name, so the serialized body is stored in Payload.
    public class StoredEvent
    {
        public Guid AggregateId { get; set; }
        public string EventType { get; set; } = string.Empty;
        public string Payload { get; set; } = string.Empty;
        public DateTime OccurredAt { get; set; }
    }

    public class EventStore : IEventStore
    {
        private readonly IDbContext _context;

        public EventStore(IDbContext context) => _context = context;

        public async Task SaveEventsAsync(Guid aggregateId, IEnumerable<DomainEvent> events)
        {
            foreach (var @event in events)
            {
                var stored = new StoredEvent
                {
                    AggregateId = aggregateId,
                    EventType = @event.GetType().Name,
                    // Pass the runtime type; otherwise only the base DomainEvent
                    // properties would be serialized.
                    Payload = JsonSerializer.Serialize(@event, @event.GetType()),
                    OccurredAt = @event.OccurredAt
                };

                _context.Events.Add(stored);
            }

            await _context.SaveChangesAsync();
        }

        public async Task<IEnumerable<DomainEvent>> GetEventsAsync(Guid aggregateId)
        {
            // Timestamps can tie; a per-aggregate sequence number is a more
            // reliable ordering key in production.
            var events = await _context.Events
                .Where(e => e.AggregateId == aggregateId)
                .OrderBy(e => e.OccurredAt)
                .ToListAsync();

            // DeserializeEvent maps EventType back to a CLR type (not shown)
            return events.Select(DeserializeEvent);
        }
    }
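One consequence of storing events rather than state is that the balance at any earlier point can be recomputed by replaying a prefix of the history. The sketch below is a simplified, self-contained illustration of that idea; `LedgerEvent`, `Opened`, `Deposited`, `Withdrawn`, and `Replay` are hypothetical stand-ins for the domain events, not the article's types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public abstract record LedgerEvent;
public record Opened(decimal InitialBalance) : LedgerEvent;
public record Deposited(decimal Amount) : LedgerEvent;
public record Withdrawn(decimal Amount) : LedgerEvent;

public static class Replay
{
    // Pure fold step: previous balance plus one event gives the next balance.
    public static decimal Apply(decimal balance, LedgerEvent e) => e switch
    {
        Opened o => o.InitialBalance,
        Deposited d => balance + d.Amount,
        Withdrawn w => balance - w.Amount,
        _ => throw new InvalidOperationException($"Unknown event type {e.GetType().Name}")
    };

    // Replays only the first `upToCount` events, giving the state at that point in history.
    public static decimal BalanceAfter(IEnumerable<LedgerEvent> history, int upToCount) =>
        history.Take(upToCount).Aggregate(0m, Apply);
}
```

For a history of `Opened(100)`, `Deposited(50)`, `Withdrawn(30)`, `BalanceAfter(history, 3)` is 120 and `BalanceAfter(history, 2)` is 150, the balance before the withdrawal.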

3. CQRS (Command Query Responsibility Segregation)

CQRS is often combined with event-driven patterns to handle command and query operations separately.

    using System;
    using System.Threading.Tasks;

    // Commands (Write Operations)
    public record CreateProductCommand(string Name, decimal Price, int StockQuantity);
    public record UpdateProductPriceCommand(Guid ProductId, decimal NewPrice);

    // Queries (Read Operations)
    public record GetProductQuery(Guid ProductId);
    public record SearchProductsQuery(string SearchTerm, int PageSize, int Page);

    // Events raised by the write side (shapes inferred from how they are used below)
    public record ProductCreatedEvent(Guid ProductId, string Name, decimal Price, int StockQuantity, DateTime OccurredAt)
        : DomainEvent(ProductId, OccurredAt);
    public record ProductPriceUpdatedEvent(Guid ProductId, decimal NewPrice, DateTime OccurredAt)
        : DomainEvent(ProductId, OccurredAt);

    // Command Handler
    public class ProductCommandHandler
    {
        private readonly IEventStore _eventStore;
        private readonly IMessagePublisher _publisher;

        public ProductCommandHandler(IEventStore eventStore, IMessagePublisher publisher)
        {
            _eventStore = eventStore;
            _publisher = publisher;
        }

        public async Task<Guid> HandleAsync(CreateProductCommand command)
        {
            var productId = Guid.NewGuid();
            var @event = new ProductCreatedEvent(
                productId,
                command.Name,
                command.Price,
                command.StockQuantity,
                DateTime.UtcNow);

            // Saving and publishing are two separate writes; the Outbox Pattern
            // later in this guide closes the gap between them.
            await _eventStore.SaveEventsAsync(productId, new DomainEvent[] { @event });
            await _publisher.PublishAsync(@event);

            return productId;
        }
    }

    // Query Handler (reads from optimized read models)
    public class ProductQueryHandler
    {
        private readonly IProductReadRepository _readRepository;

        public ProductQueryHandler(IProductReadRepository readRepository) => _readRepository = readRepository;

        public async Task<ProductReadModel?> HandleAsync(GetProductQuery query)
        {
            return await _readRepository.GetByIdAsync(query.ProductId);
        }

        public async Task<PagedResult<ProductReadModel>> HandleAsync(SearchProductsQuery query)
        {
            return await _readRepository.SearchAsync(query.SearchTerm, query.Page, query.PageSize);
        }
    }

    // Read Model Projector (updates read models from events)
    public class ProductProjector : IConsumer<ProductCreatedEvent>, IConsumer<ProductPriceUpdatedEvent>
    {
        private readonly IProductReadRepository _readRepository;

        public ProductProjector(IProductReadRepository readRepository) => _readRepository = readRepository;

        public async Task ConsumeAsync(ProductCreatedEvent @event)
        {
            var readModel = new ProductReadModel
            {
                Id = @event.ProductId,
                Name = @event.Name,
                Price = @event.Price,
                StockQuantity = @event.StockQuantity,
                CreatedAt = @event.OccurredAt
            };

            await _readRepository.SaveAsync(readModel);
        }

        public async Task ConsumeAsync(ProductPriceUpdatedEvent @event)
        {
            var product = await _readRepository.GetByIdAsync(@event.ProductId);
            if (product != null)
            {
                product.Price = @event.NewPrice;
                product.UpdatedAt = @event.OccurredAt;
                await _readRepository.UpdateAsync(product);
            }
        }
    }

4. Saga Pattern (Process Manager)

A saga manages a long-running business process that spans multiple services, issuing compensating actions when a later step fails.

    using System;
    using System.Threading.Tasks;

    // Saga state
    public class OrderProcessingSaga
    {
        public Guid OrderId { get; set; }
        public Guid CustomerId { get; set; }
        public decimal TotalAmount { get; set; }
        public bool PaymentProcessed { get; set; }
        public bool InventoryReserved { get; set; }
        public bool ShippingScheduled { get; set; }
        public SagaStatus Status { get; set; } = SagaStatus.Started;
    }

    public enum SagaStatus
    {
        Started,
        PaymentProcessing,
        InventoryReserving,
        Shipping,
        Completed,
        Failed,
        Compensating
    }

    // Saga implementation
    public class OrderProcessingSagaHandler :
        IConsumer<OrderCreatedEvent>,
        IConsumer<PaymentProcessedEvent>,
        IConsumer<PaymentFailedEvent>,
        IConsumer<InventoryReservedEvent>,
        IConsumer<InventoryReservationFailedEvent>,
        IConsumer<ShippingScheduledEvent>
    {
        private readonly ISagaRepository _sagaRepository;
        private readonly IMessagePublisher _publisher;

        public OrderProcessingSagaHandler(ISagaRepository sagaRepository, IMessagePublisher publisher)
        {
            _sagaRepository = sagaRepository;
            _publisher = publisher;
        }

        // Fails loudly when an event arrives for an order with no saga state
        private async Task<OrderProcessingSaga> LoadSagaAsync(Guid orderId) =>
            await _sagaRepository.GetByOrderIdAsync(orderId)
            ?? throw new InvalidOperationException($"No saga found for order {orderId}");

        public async Task ConsumeAsync(OrderCreatedEvent @event)
        {
            var saga = new OrderProcessingSaga
            {
                OrderId = @event.OrderId,
                CustomerId = @event.CustomerId,
                TotalAmount = @event.TotalAmount,
                Status = SagaStatus.PaymentProcessing
            };

            await _sagaRepository.SaveAsync(saga);

            // Start the saga by requesting payment
            await _publisher.PublishAsync(new ProcessPaymentCommand(
                @event.OrderId,
                @event.CustomerId,
                @event.TotalAmount));
        }

        public async Task ConsumeAsync(PaymentProcessedEvent @event)
        {
            var saga = await LoadSagaAsync(@event.OrderId);
            saga.PaymentProcessed = true;
            saga.Status = SagaStatus.InventoryReserving;

            await _sagaRepository.SaveAsync(saga);

            // Next step: reserve inventory
            await _publisher.PublishAsync(new ReserveInventoryCommand(@event.OrderId));
        }

        public async Task ConsumeAsync(PaymentFailedEvent @event)
        {
            var saga = await LoadSagaAsync(@event.OrderId);
            saga.Status = SagaStatus.Failed;

            await _sagaRepository.SaveAsync(saga);

            // Publish order failed event
            await _publisher.PublishAsync(new OrderFailedEvent(@event.OrderId, "Payment failed"));
        }

        public async Task ConsumeAsync(InventoryReservedEvent @event)
        {
            var saga = await LoadSagaAsync(@event.OrderId);
            saga.InventoryReserved = true;
            saga.Status = SagaStatus.Shipping;

            await _sagaRepository.SaveAsync(saga);

            // Final step: schedule shipping
            await _publisher.PublishAsync(new ScheduleShippingCommand(@event.OrderId));
        }

        public async Task ConsumeAsync(InventoryReservationFailedEvent @event)
        {
            var saga = await LoadSagaAsync(@event.OrderId);
            saga.Status = SagaStatus.Compensating;

            await _sagaRepository.SaveAsync(saga);

            // Compensate: refund payment. A complete saga would also handle the
            // refund confirmation and move the status to Failed.
            await _publisher.PublishAsync(new RefundPaymentCommand(@event.OrderId, saga.TotalAmount));
        }

        public async Task ConsumeAsync(ShippingScheduledEvent @event)
        {
            var saga = await LoadSagaAsync(@event.OrderId);
            saga.ShippingScheduled = true;
            saga.Status = SagaStatus.Completed;

            await _sagaRepository.SaveAsync(saga);

            // Order processing complete
            await _publisher.PublishAsync(new OrderCompletedEvent(@event.OrderId));
        }
    }

Implementation with MassTransit and RabbitMQ

MassTransit with RabbitMQ is a popular choice for implementing event-driven architecture in .NET applications. Here's a practical setup:

    using System.Threading.Tasks;
    using MassTransit;
    using Microsoft.AspNetCore.Builder;
    using Microsoft.Extensions.DependencyInjection;

    // Program.cs - Service configuration
    public class Program
    {
        public static void Main(string[] args)
        {
            var builder = WebApplication.CreateBuilder(args);

            // Expose the publisher abstraction to application services
            builder.Services.AddScoped<IMessagePublisher, MassTransitMessagePublisher>();

            // Add MassTransit with RabbitMQ.
            // Types registered here must implement MassTransit's IConsumer<T>
            // (Consume(ConsumeContext<T>)), not the simplified ConsumeAsync-style
            // interface used in the earlier examples.
            builder.Services.AddMassTransit(x =>
            {
                // Register consumers
                x.AddConsumer<OrderCreatedEventConsumer>();
                x.AddConsumer<InventoryConsumer>();
                x.AddConsumer<EmailConsumer>();
                x.AddConsumer<OrderProcessingSagaHandler>();

                x.UsingRabbitMq((context, cfg) =>
                {
                    // Default local-development credentials; load real ones from configuration
                    cfg.Host("localhost", "/", h =>
                    {
                        h.Username("guest");
                        h.Password("guest");
                    });

                    // Configure endpoints
                    cfg.ReceiveEndpoint("order-events", e =>
                    {
                        e.ConfigureConsumer<OrderCreatedEventConsumer>(context);
                    });

                    cfg.ReceiveEndpoint("inventory-events", e =>
                    {
                        e.ConfigureConsumer<InventoryConsumer>(context);
                    });

                    cfg.ReceiveEndpoint("email-events", e =>
                    {
                        e.ConfigureConsumer<EmailConsumer>(context);
                    });

                    cfg.ReceiveEndpoint("order-saga", e =>
                    {
                        e.ConfigureConsumer<OrderProcessingSagaHandler>(context);
                    });
                });
            });

            var app = builder.Build();
            app.Run();
        }
    }

    // Message publisher interface
    public interface IMessagePublisher
    {
        Task PublishAsync<T>(T message) where T : class;
    }

    // MassTransit implementation
    public class MassTransitMessagePublisher : IMessagePublisher
    {
        private readonly IPublishEndpoint _publishEndpoint;

        public MassTransitMessagePublisher(IPublishEndpoint publishEndpoint)
        {
            _publishEndpoint = publishEndpoint;
        }

        public async Task PublishAsync<T>(T message) where T : class
        {
            await _publishEndpoint.Publish(message);
        }
    }

Event-Driven Architecture with Azure Service Bus

For cloud-native applications, Azure Service Bus provides enterprise-grade messaging:

    using System;
    using MassTransit;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;

    // Azure Service Bus configuration
    public static class ServiceBusExtensions
    {
        public static IServiceCollection AddAzureServiceBus(
            this IServiceCollection services,
            string connectionString)
        {
            services.AddMassTransit(x =>
            {
                x.AddConsumer<OrderCreatedEventConsumer>();
                x.AddConsumer<PaymentConsumer>();

                x.UsingAzureServiceBus((context, cfg) =>
                {
                    cfg.Host(connectionString);

                    cfg.SubscriptionEndpoint<OrderCreatedEvent>("order-processing", e =>
                    {
                        e.ConfigureConsumer<OrderCreatedEventConsumer>(context);
                    });

                    cfg.SubscriptionEndpoint<PaymentProcessedEvent>("payment-processing", e =>
                    {
                        e.ConfigureConsumer<PaymentConsumer>(context);
                    });
                });
            });

            return services;
        }
    }

    // Usage in Program.cs (GetConnectionString returns null when the key is missing)
    var serviceBusConnection = builder.Configuration.GetConnectionString("ServiceBus")
        ?? throw new InvalidOperationException("Connection string 'ServiceBus' is not configured");
    builder.Services.AddAzureServiceBus(serviceBusConnection);

Handling Event Processing Challenges

The Outbox Pattern

The Outbox Pattern ensures reliable event publishing by storing events in the same database transaction as business data. A background process then reads the outbox table and publishes each pending event to the broker:

    using System;
    using System.Linq;
    using System.Text.Json;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.EntityFrameworkCore;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;

    public class OutboxEvent
    {
        public Guid Id { get; set; }
        public string EventType { get; set; } = string.Empty;
        public string EventData { get; set; } = string.Empty;
        public DateTime CreatedAt { get; set; }
        public bool Published { get; set; }
    }

    public class OrderService
    {
        private readonly IDbContext _context;

        public OrderService(IDbContext context) => _context = context;

        public async Task<Order> CreateOrderAsync(CreateOrderRequest request)
        {
            using var transaction = await _context.Database.BeginTransactionAsync();

            try
            {
                // Save business data
                var order = new Order { /* ... */ };
                _context.Orders.Add(order);

                // Save event to outbox (same transaction)
                var eventData = new OrderCreatedEvent(order.Id, order.CustomerId, order.TotalAmount, DateTime.UtcNow);
                var outboxEvent = new OutboxEvent
                {
                    Id = Guid.NewGuid(),
                    EventType = nameof(OrderCreatedEvent),
                    EventData = JsonSerializer.Serialize(eventData),
                    CreatedAt = DateTime.UtcNow,
                    Published = false
                };

                _context.OutboxEvents.Add(outboxEvent);

                await _context.SaveChangesAsync();
                await transaction.CommitAsync();

                return order;
            }
            catch
            {
                await transaction.RollbackAsync();
                throw;
            }
        }
    }

    // Background service to publish outbox events
    public class OutboxPublisher : BackgroundService
    {
        private readonly IServiceProvider _serviceProvider;
        private readonly ILogger<OutboxPublisher> _logger;

        public OutboxPublisher(IServiceProvider serviceProvider, ILogger<OutboxPublisher> logger)
        {
            _serviceProvider = serviceProvider;
            _logger = logger;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                using var scope = _serviceProvider.CreateScope();
                var context = scope.ServiceProvider.GetRequiredService<IDbContext>();
                var publisher = scope.ServiceProvider.GetRequiredService<IMessagePublisher>();

                var unpublishedEvents = await context.OutboxEvents
                    .Where(e => !e.Published)
                    .OrderBy(e => e.CreatedAt)
                    .Take(100)
                    .ToListAsync(stoppingToken);

                foreach (var outboxEvent in unpublishedEvents)
                {
                    try
                    {
                        // DeserializeEvent maps EventType back to a CLR type (not shown)
                        var eventData = DeserializeEvent(outboxEvent);
                        await publisher.PublishAsync(eventData);

                        // If the process stops between publishing and saving, the event is
                        // published again on the next pass, so consumers must be idempotent.
                        outboxEvent.Published = true;
                        await context.SaveChangesAsync(stoppingToken);
                    }
                    catch (Exception ex)
                    {
                        // Log error and continue with next event
                        // Consider implementing retry logic with exponential backoff
                        _logger.LogError(ex, "Failed to publish outbox event {EventId}", outboxEvent.Id);
                    }
                }

                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
        }
    }
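The publishing loop leaves retry handling as a comment. One way to fill that in, sketched under the assumption that failures are transient and that publishing is safe to repeat, is a small exponential-backoff helper. `Retry`, `BackoffDelay`, and `ExecuteAsync` are hypothetical names introduced here; they are not part of MassTransit or of the original code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Retry
{
    // Delay before retry number `attempt` (1-based): baseDelay * 2^(attempt - 1), capped at maxDelay.
    public static TimeSpan BackoffDelay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        var factor = Math.Pow(2, attempt - 1);
        var ms = Math.Min(baseDelay.TotalMilliseconds * factor, maxDelay.TotalMilliseconds);
        return TimeSpan.FromMilliseconds(ms);
    }

    // Runs `action` up to maxAttempts times, waiting between failed attempts.
    // The exception from the final attempt propagates to the caller.
    public static async Task ExecuteAsync(
        Func<Task> action,
        int maxAttempts,
        TimeSpan baseDelay,
        TimeSpan maxDelay,
        CancellationToken ct = default)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action();
                return;
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                await Task.Delay(BackoffDelay(attempt, baseDelay, maxDelay), ct);
            }
        }
    }
}
```

With a base delay of 100 ms and a cap of one second, the waits are 100 ms, 200 ms, 400 ms, 800 ms, and then one second for every later attempt. Adding random jitter to each delay is a common refinement that this sketch omits.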

Idempotency and Deduplication

    using System;
    using System.Threading.Tasks;

    public class IdempotentConsumer<T> : IConsumer<T> where T : class, IEventWithId
    {
        private readonly IProcessedEventStore _processedEventStore;
        private readonly IConsumer<T> _innerConsumer;

        public IdempotentConsumer(IProcessedEventStore processedEventStore, IConsumer<T> innerConsumer)
        {
            _processedEventStore = processedEventStore;
            _innerConsumer = innerConsumer;
        }

        public async Task ConsumeAsync(T @event)
        {
            // Check if already processed
            if (await _processedEventStore.IsProcessedAsync(@event.EventId))
            {
                // Event already processed, skip
                return;
            }

            // Process the event; an exception propagates before the event
            // is marked, so a failed event can be redelivered
            await _innerConsumer.ConsumeAsync(@event);

            // Mark as processed
            await _processedEventStore.MarkAsProcessedAsync(@event.EventId);
        }
    }

    public interface IEventWithId
    {
        Guid EventId { get; }
    }

    public record OrderCreatedEvent(
        Guid EventId, // Add event ID for idempotency
        Guid OrderId,
        Guid CustomerId,
        decimal TotalAmount,
        DateTime CreatedAt) : IEventWithId;
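The check-then-mark sequence is not atomic: two deliveries of the same event that arrive concurrently can both pass the `IsProcessedAsync` check. A minimal sketch of one way around this is to claim the event ID atomically before processing and release the claim on failure. `InMemoryProcessedEvents` and `IdempotentHandler` are hypothetical names, and the in-memory dictionary stands in for what would normally be a database table with a unique constraint on the event ID.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class InMemoryProcessedEvents
{
    private readonly ConcurrentDictionary<Guid, byte> _seen = new();

    // Returns true only for the first caller that claims this event ID.
    public bool TryClaim(Guid eventId) => _seen.TryAdd(eventId, 0);

    // Releases a claim so a failed event can be redelivered and retried.
    public void Release(Guid eventId) => _seen.TryRemove(eventId, out _);
}

public sealed class IdempotentHandler
{
    private readonly InMemoryProcessedEvents _store = new();

    // Runs `handler` at most once per event ID; returns false for duplicates.
    public async Task<bool> HandleOnceAsync(Guid eventId, Func<Task> handler)
    {
        if (!_store.TryClaim(eventId))
        {
            return false; // duplicate delivery, skip
        }

        try
        {
            await handler();
            return true;
        }
        catch
        {
            _store.Release(eventId);
            throw;
        }
    }
}
```

The trade-off is that a duplicate arriving while the first delivery is still in flight is skipped rather than queued, which is usually the desired behavior when the broker redelivers on failure.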

Real-World Industry Applications

E-Commerce Platform

Modern e-commerce platforms like Amazon use event-driven architectures extensively:

  • Order Processing: Order created → Payment processed → Inventory reserved → Shipping scheduled
  • Recommendation Engine: User behavior events → Real-time personalization
  • Inventory Management: Stock level changes → Automatic reordering

Financial Services

Banks and fintech companies rely on event-driven patterns:

  • Transaction Processing: Account debited → Real-time balance updates → Fraud detection
  • Risk Management: Market events → Portfolio rebalancing → Compliance reporting
  • Customer Onboarding: KYC completed → Account activation → Welcome campaign

Streaming Platforms

Netflix, Spotify, and similar platforms use events for:

  • Content Recommendations: View completed → Update user preferences → Generate recommendations
  • Analytics: User interactions → Real-time dashboards → Content optimization
  • Content Distribution: New content uploaded → Global CDN distribution → User notifications

Architecture Deployment Patterns

Centralized Event Hub

A centralized event hub routes every event through one shared broker, giving a single place to manage topics and subscriptions:

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    // Centralized event hub configuration
    public class CentralizedEventHub
    {
        private readonly IServiceBusClient _serviceBusClient;
        private readonly Dictionary<Type, string> _eventTopics;

        public CentralizedEventHub(IServiceBusClient serviceBusClient, Dictionary<Type, string> eventTopics)
        {
            _serviceBusClient = serviceBusClient;
            _eventTopics = eventTopics;
        }

        public async Task PublishAsync<T>(T @event) where T : class
        {
            // Throws KeyNotFoundException for event types with no registered topic
            var topicName = _eventTopics[typeof(T)];
            await _serviceBusClient.PublishAsync(topicName, @event);
        }

        public async Task SubscribeAsync<T>(string subscriptionName, Func<T, Task> handler) where T : class
        {
            var topicName = _eventTopics[typeof(T)];
            await _serviceBusClient.SubscribeAsync(topicName, subscriptionName, handler);
        }
    }

Decentralized Event Mesh

For large organizations, event mesh provides distributed event routing:

    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    // Event mesh configuration
    public class EventMeshConfiguration
    {
        public Dictionary<string, ServiceEndpoint> Services { get; set; } = new();
        public Dictionary<string, List<string>> EventRouting { get; set; } = new();
    }

    public class EventMeshRouter
    {
        private readonly EventMeshConfiguration _config;

        public EventMeshRouter(EventMeshConfiguration config) => _config = config;

        public async Task RouteEventAsync<T>(T @event, string sourceService) where T : class
        {
            var eventType = typeof(T).Name;

            if (_config.EventRouting.TryGetValue(eventType, out var targetServices))
            {
                // ForwardEventToServiceAsync delivers to one target service (not shown)
                var tasks = targetServices.Select(service =>
                    ForwardEventToServiceAsync(@event, service));

                await Task.WhenAll(tasks);
            }
        }
    }

Best Practices and Common Pitfalls

Do's

  1. Design Events as Business Facts: Events should represent meaningful business occurrences
  2. Make Events Immutable: Once published, events should never change
  3. Include Event Metadata: Timestamps, correlation IDs, event versions
  4. Plan for Schema Evolution: Use backward-compatible event schemas
  5. Implement Proper Error Handling: Dead letter queues, retry policies, circuit breakers
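As a small illustration of backward-compatible schema evolution, the sketch below adds a field with a default value so that events written before the field existed still deserialize, and unknown fields from newer producers are ignored. `OrderCreatedPayload` and `SchemaEvolutionDemo` are hypothetical names, and `System.Text.Json` with its default options is an assumption about the serializer in use.

```csharp
using System;
using System.Text.Json;

public sealed class OrderCreatedPayload
{
    public Guid OrderId { get; init; }
    public decimal TotalAmount { get; init; }

    // Added in a later schema version. The initializer supplies the value
    // for older events whose JSON has no Currency property.
    public string Currency { get; init; } = "USD";
}

public static class SchemaEvolutionDemo
{
    public static OrderCreatedPayload Read(string json) =>
        JsonSerializer.Deserialize<OrderCreatedPayload>(json)
        ?? throw new JsonException("Payload was null");
}
```

Reading an old payload such as `{"OrderId":"11111111-1111-1111-1111-111111111111","TotalAmount":42.5}` yields `Currency == "USD"`, while a newer payload that carries `"Currency":"EUR"` plus a field this consumer does not know about still deserializes, with the extra field skipped.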

Don'ts

  1. Don't Include Sensitive Data: Events may be logged or replicated
  2. Don't Make Events Too Granular: Avoid event spam
  3. Don't Ignore Ordering: Some business processes require event ordering
  4. Don't Forget About Monitoring: Track event flow and processing times

Performance Considerations

    using System.Collections.Generic;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    // Batch processing for high-throughput scenarios
    public class BatchEventProcessor<T> : IConsumer<T> where T : class
    {
        private const int BatchSize = 100;

        private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
        private readonly Task _worker;

        public BatchEventProcessor()
        {
            // Start the background loop that drains the channel; without it,
            // queued events would never be processed
            _worker = Task.Run(ProcessBatchesAsync);
        }

        public async Task ConsumeAsync(T @event)
        {
            await _channel.Writer.WriteAsync(@event);
        }

        private async Task ProcessBatchesAsync()
        {
            var batch = new List<T>(BatchSize);

            while (await _channel.Reader.WaitToReadAsync())
            {
                while (batch.Count < BatchSize && _channel.Reader.TryRead(out var item))
                {
                    batch.Add(item);
                }

                // Partial batches are flushed as soon as the channel drains,
                // so no separate flush timer is needed
                if (batch.Count > 0)
                {
                    await ProcessBatchAsync(new List<T>(batch));
                    batch.Clear();
                }
            }
        }

        private Task ProcessBatchAsync(List<T> events)
        {
            // Process events in batch for better performance
            // Database bulk operations, batch API calls, etc.
            return Task.CompletedTask;
        }
    }

Testing Event-Driven Systems

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using Microsoft.VisualStudio.TestTools.UnitTesting;

    [TestClass]
    public class EventDrivenSystemTests
    {
        [TestMethod]
        public async Task OrderCreated_ShouldTriggerInventoryReservation()
        {
            // Arrange
            var eventBus = new InMemoryEventBus();
            var inventoryService = new InventoryService(eventBus);
            var orderService = new OrderService(eventBus);

            var orderRequest = new CreateOrderRequest
            {
                CustomerId = Guid.NewGuid(),
                Items = new[] { new OrderItem { ProductId = Guid.NewGuid(), Quantity = 1 } }
            };

            // Act
            await orderService.CreateOrderAsync(orderRequest);

            // Assert
            // InMemoryEventBus below only records messages, so this assumes the test
            // wiring also delivers OrderCreatedEvent to InventoryService
            var publishedEvents = eventBus.GetPublishedEvents<InventoryReservationRequestedEvent>();
            Assert.AreEqual(1, publishedEvents.Count());
        }

        [TestMethod]
        public async Task EventProcessing_ShouldBeIdempotent()
        {
            // Arrange
            var consumer = new OrderCreatedEventConsumer();
            var orderEvent = new OrderCreatedEvent(Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid(), 100m, DateTime.UtcNow);

            // Act - Process same event twice
            await consumer.ConsumeAsync(orderEvent);
            await consumer.ConsumeAsync(orderEvent);

            // Assert - Should only process once
            Assert.AreEqual(1, consumer.ProcessedOrdersCount);
        }
    }

    // Test doubles for event bus
    public class InMemoryEventBus : IMessagePublisher
    {
        private readonly List<object> _publishedEvents = new();

        public Task PublishAsync<T>(T message) where T : class
        {
            _publishedEvents.Add(message);
            return Task.CompletedTask;
        }

        public IEnumerable<T> GetPublishedEvents<T>() where T : class
        {
            return _publishedEvents.OfType<T>();
        }
    }

Monitoring and Observability

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Logging;

    // Event tracing and metrics
    public class EventMetrics
    {
        private readonly IMetricsLogger _metrics;

        public EventMetrics(IMetricsLogger metrics) => _metrics = metrics;

        public Task TrackEventProcessed<T>(T @event, TimeSpan processingTime)
        {
            _metrics.Counter("events_processed_total")
                .WithTag("event_type", typeof(T).Name)
                .Increment();

            _metrics.Histogram("event_processing_duration_ms")
                .WithTag("event_type", typeof(T).Name)
                .Record(processingTime.TotalMilliseconds);

            return Task.CompletedTask;
        }

        public Task TrackEventFailed<T>(T @event, Exception ex)
        {
            _metrics.Counter("events_failed_total")
                .WithTag("event_type", typeof(T).Name)
                .WithTag("error_type", ex.GetType().Name)
                .Increment();

            return Task.CompletedTask;
        }
    }

    // Distributed tracing with correlation IDs
    public record EventMetadata(
        string CorrelationId,
        string CausationId,
        DateTime Timestamp,
        string Source);

    public abstract record BaseEvent(EventMetadata Metadata);

    public record OrderCreatedEvent(
        EventMetadata Metadata,
        Guid OrderId,
        Guid CustomerId,
        decimal TotalAmount) : BaseEvent(Metadata);

    // Correlation tracking middleware
    public class EventCorrelationMiddleware<T> : IConsumer<T> where T : BaseEvent
    {
        private readonly IConsumer<T> _next;
        private readonly ILogger<EventCorrelationMiddleware<T>> _logger;

        public EventCorrelationMiddleware(IConsumer<T> next, ILogger<EventCorrelationMiddleware<T>> logger)
        {
            _next = next;
            _logger = logger;
        }

        public async Task ConsumeAsync(T @event)
        {
            using var scope = _logger.BeginScope(new Dictionary<string, object>
            {
                ["CorrelationId"] = @event.Metadata.CorrelationId,
                ["CausationId"] = @event.Metadata.CausationId,
                ["EventType"] = typeof(T).Name,
                ["Source"] = @event.Metadata.Source
            });

            _logger.LogInformation("Processing event {EventType} with correlation {CorrelationId}",
                typeof(T).Name, @event.Metadata.CorrelationId);

            await _next.ConsumeAsync(@event);

            _logger.LogInformation("Completed processing event {EventType}", typeof(T).Name);
        }
    }
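The `IMetricsLogger` abstraction is application-defined. As one concrete option, .NET ships `System.Diagnostics.Metrics`, and the sketch below records the same three measurements with it. `EventInstrumentation`, `MeasureAsync`, and the meter name `Sample.Events` are hypothetical choices for this example; exporting the measurements to a backend is a separate configuration step not shown here.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading.Tasks;

public sealed class EventInstrumentation : IDisposable
{
    private readonly Meter _meter = new("Sample.Events"); // placeholder meter name
    private readonly Counter<long> _processed;
    private readonly Counter<long> _failed;
    private readonly Histogram<double> _durationMs;

    public EventInstrumentation()
    {
        _processed = _meter.CreateCounter<long>("events_processed_total");
        _failed = _meter.CreateCounter<long>("events_failed_total");
        _durationMs = _meter.CreateHistogram<double>("event_processing_duration_ms");
    }

    // Wraps a handler call, counting success or failure and timing the call either way.
    public async Task MeasureAsync<T>(T @event, Func<T, Task> handler)
    {
        var tag = new KeyValuePair<string, object?>("event_type", typeof(T).Name);
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await handler(@event);
            _processed.Add(1, tag);
        }
        catch (Exception ex)
        {
            _failed.Add(1, tag, new KeyValuePair<string, object?>("error_type", ex.GetType().Name));
            throw;
        }
        finally
        {
            _durationMs.Record(stopwatch.Elapsed.TotalMilliseconds, tag);
        }
    }

    public void Dispose() => _meter.Dispose();
}
```

Wrapping the handler call keeps the timing and the success or failure count in one place, so individual consumers do not need to remember to report metrics.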

Migration Strategies

Moving from monolithic to event-driven architecture requires careful planning:

Strangler Fig Pattern

    using System.Threading.Tasks;
    using Microsoft.Extensions.Configuration;

    // Legacy service adapter
    public class LegacyOrderServiceAdapter : IOrderService
    {
        private readonly ILegacyOrderSystem _legacySystem;
        private readonly IModernEventBus _eventBus;
        private readonly IConfiguration _config;

        public LegacyOrderServiceAdapter(ILegacyOrderSystem legacySystem, IModernEventBus eventBus, IConfiguration config)
        {
            _legacySystem = legacySystem;
            _eventBus = eventBus;
            _config = config;
        }

        public async Task<Order> CreateOrderAsync(CreateOrderRequest request)
        {
            // Determine routing based on feature flags
            if (_config.GetValue<bool>("UseModernOrderProcessing"))
            {
                return await CreateOrderModernAsync(request);
            }

            // Fallback to legacy system
            var legacyOrder = await _legacySystem.CreateOrderAsync(request);

            // Publish event for new consumers while maintaining legacy flow
            // (CreateEventMetadata builds the correlation metadata; not shown)
            await _eventBus.PublishAsync(new OrderCreatedEvent(
                CreateEventMetadata(),
                legacyOrder.Id,
                legacyOrder.CustomerId,
                legacyOrder.TotalAmount));

            return legacyOrder;
        }

        private async Task<Order> CreateOrderModernAsync(CreateOrderRequest request)
        {
            // New event-driven implementation
            var order = new Order { /* ... */ };
            await _eventBus.PublishAsync(new OrderCreatedEvent(
                CreateEventMetadata(),
                order.Id,
                order.CustomerId,
                order.TotalAmount));

            return order;
        }
    }

Event Bridge Pattern

    using System.Threading.Tasks;

    // Bridge between legacy and modern systems
    public class LegacyEventBridge : IConsumer<OrderCreatedEvent>
    {
        private readonly ILegacyNotificationSystem _legacyNotifications;
        private readonly ILegacyInventorySystem _legacyInventory;

        public LegacyEventBridge(ILegacyNotificationSystem legacyNotifications, ILegacyInventorySystem legacyInventory)
        {
            _legacyNotifications = legacyNotifications;
            _legacyInventory = legacyInventory;
        }

        public async Task ConsumeAsync(OrderCreatedEvent @event)
        {
            // Translate modern events to legacy system calls
            await _legacyNotifications.SendOrderConfirmationAsync(
                @event.CustomerId,
                @event.OrderId);

            await _legacyInventory.ReserveInventoryAsync(@event.OrderId);
        }
    }

Security Considerations

Event-driven systems require specific security measures:

    using System;
    using System.Threading.Tasks;

    // Event encryption and signing
    public class SecureEventPublisher : IMessagePublisher
    {
        private readonly IMessagePublisher _innerPublisher;
        private readonly IEventEncryption _encryption;
        private readonly IEventSigning _signing;

        public SecureEventPublisher(IMessagePublisher innerPublisher, IEventEncryption encryption, IEventSigning signing)
        {
            _innerPublisher = innerPublisher;
            _encryption = encryption;
            _signing = signing;
        }

        public async Task PublishAsync<T>(T message) where T : class
        {
            // Sign the event for integrity
            var signedMessage = await _signing.SignAsync(message);

            // Encrypt sensitive data
            var encryptedMessage = await _encryption.EncryptAsync(signedMessage);

            await _innerPublisher.PublishAsync(encryptedMessage);
        }
    }

    // Event authorization
    public class AuthorizedEventConsumer<T> : IConsumer<T> where T : class
    {
        private readonly IConsumer<T> _innerConsumer;
        private readonly IEventAuthorizationService _authService;

        public AuthorizedEventConsumer(IConsumer<T> innerConsumer, IEventAuthorizationService authService)
        {
            _innerConsumer = innerConsumer;
            _authService = authService;
        }

        public async Task ConsumeAsync(T @event)
        {
            // Verify consumer has permission to process this event type
            // (GetCurrentServiceIdentity resolves the calling service; not shown)
            if (!await _authService.CanConsumeAsync<T>(GetCurrentServiceIdentity()))
            {
                throw new UnauthorizedAccessException($"Service not authorized to consume {typeof(T).Name}");
            }

            await _innerConsumer.ConsumeAsync(@event);
        }
    }
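The `IEventSigning` interface is left abstract. A minimal sketch of what signing for integrity could look like, assuming a shared secret key between publisher and consumer and a JSON string payload, uses HMAC-SHA256 from the .NET base class library. `EventSigner` is a hypothetical name, and key distribution and rotation are out of scope here.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class EventSigner
{
    // Computes an HMAC-SHA256 tag over the UTF-8 bytes of the payload, as a hex string.
    public static string Sign(string payload, byte[] key)
    {
        var mac = HMACSHA256.HashData(key, Encoding.UTF8.GetBytes(payload));
        return Convert.ToHexString(mac);
    }

    // Recomputes the tag and compares in constant time; malformed signatures are rejected.
    public static bool Verify(string payload, string signatureHex, byte[] key)
    {
        byte[] provided;
        try
        {
            provided = Convert.FromHexString(signatureHex);
        }
        catch (FormatException)
        {
            return false;
        }

        var expected = HMACSHA256.HashData(key, Encoding.UTF8.GetBytes(payload));
        return CryptographicOperations.FixedTimeEquals(expected, provided);
    }
}
```

A consumer would call `Verify` before deserializing the payload and discard the message when it returns false. An asymmetric signature would be the alternative when consumers should not be able to forge events, at the cost of more key management.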

Advanced Patterns

Event Sourcing with Snapshots

    using System;
    using System.Threading.Tasks;

    // Wraps an event store with snapshot support. It does not implement IEventStore
    // itself, because it exposes aggregate-level operations rather than raw event access.
    public class SnapshotEventStore
    {
        private readonly IEventStore _eventStore;
        private readonly ISnapshotStore _snapshotStore;

        public SnapshotEventStore(IEventStore eventStore, ISnapshotStore snapshotStore)
        {
            _eventStore = eventStore;
            _snapshotStore = snapshotStore;
        }

        public async Task<T> GetAggregateAsync<T>(Guid aggregateId) where T : AggregateRoot, new()
        {
            // Try to load from snapshot first
            var snapshot = await _snapshotStore.GetSnapshotAsync<T>(aggregateId);
            var fromVersion = 0;
            T aggregate;

            if (snapshot != null)
            {
                aggregate = snapshot.Aggregate;
                fromVersion = snapshot.Version;
            }
            else
            {
                aggregate = new T();
            }

            // Load events since snapshot. This assumes the event store offers a
            // GetEventsAsync(aggregateId, fromVersion) overload in addition to
            // the single-argument method shown earlier.
            var events = await _eventStore.GetEventsAsync(aggregateId, fromVersion);
            aggregate.LoadFromHistory(events);

            return aggregate;
        }

        public async Task SaveAggregateAsync<T>(T aggregate) where T : AggregateRoot
        {
            await _eventStore.SaveEventsAsync(aggregate.Id, aggregate.GetUncommittedEvents());

            // Clear pending events first so the snapshot does not capture them
            aggregate.MarkEventsAsCommitted();

            // Create snapshot every N events. The modulo check only fires when a save
            // lands exactly on a multiple of 100; saves that add several events at
            // once can step over it.
            if (aggregate.Version % 100 == 0)
            {
                var snapshot = new AggregateSnapshot<T>
                {
                    AggregateId = aggregate.Id,
                    Aggregate = aggregate,
                    Version = aggregate.Version,
                    CreatedAt = DateTime.UtcNow
                };

                await _snapshotStore.SaveSnapshotAsync(snapshot);
            }
        }
    }

Event Replay and Recovery

    using System;
    using System.Threading.Tasks;

    public class EventReplayService
    {
        private readonly IEventStore _eventStore;
        private readonly IMessagePublisher _publisher;

        public EventReplayService(IEventStore eventStore, IMessagePublisher publisher)
        {
            _eventStore = eventStore;
            _publisher = publisher;
        }

        public async Task ReplayEventsAsync(DateTime fromDate, DateTime? toDate = null)
        {
            var endDate = toDate ?? DateTime.UtcNow;
            var events = await _eventStore.GetEventsByDateRangeAsync(fromDate, endDate);

            foreach (var @event in events)
            {
                try
                {
                    await _publisher.PublishAsync(@event);
                    await Task.Delay(10); // Small delay to prevent overwhelming consumers
                }
                catch (Exception ex)
                {
                    // Log error but continue with other events
                    Console.WriteLine($"Failed to replay event {@event.GetType().Name}: {ex.Message}");
                }
            }
        }

        public async Task RecoverFromEventStoreAsync(Guid aggregateId)
        {
            var events = await _eventStore.GetEventsAsync(aggregateId);

            // Republish the stored history for any missing projections.
            // An aggregate rebuilt via LoadFromHistory has no uncommitted events,
            // so the stored events themselves are what must be republished.
            // Consumers need to be idempotent, since they may see events again.
            foreach (var domainEvent in events)
            {
                await _publisher.PublishAsync(domainEvent);
            }
        }
    }

Performance Optimization

Event Streaming and Batching

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class HighThroughputEventProcessor
{
    private readonly IEventProcessor _processor;
    private readonly SemaphoreSlim _semaphore;

    public HighThroughputEventProcessor(IEventProcessor processor, int maxConcurrency = 100)
    {
        _processor = processor ?? throw new ArgumentNullException(nameof(processor));
        _semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    }

    public async Task ProcessEventStreamAsync(IAsyncEnumerable<DomainEvent> eventStream)
    {
        var tasks = new List<Task>();

        await foreach (var @event in eventStream)
        {
            // Pauses reading from the stream once maxConcurrency events are in flight
            await _semaphore.WaitAsync();

            tasks.Add(ProcessEventAsync(@event));

            // Periodically drop successfully completed tasks so the list stays bounded.
            // Faulted tasks are kept so Task.WhenAll below still surfaces their exceptions.
            if (tasks.Count > 1000)
            {
                tasks.RemoveAll(t => t.IsCompletedSuccessfully);
            }
        }

        // Wait for all remaining tasks
        await Task.WhenAll(tasks);
    }

    private async Task ProcessEventAsync(DomainEvent @event)
    {
        try
        {
            await _processor.ProcessAsync(@event);
        }
        finally
        {
            // Always free the slot, even if processing throws
            _semaphore.Release();
        }
    }
}
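
On .NET 6 and later, the same bounded-concurrency behavior can be had from the built-in `Parallel.ForEachAsync`, which accepts an `IAsyncEnumerable<T>` and a `MaxDegreeOfParallelism` limit. The sketch below is one possible alternative to the hand-rolled semaphore, not a drop-in replacement. `BoundedProcessing`, `ProcessAsync`, and `Range` are hypothetical names, and the peak-concurrency tracking is there only to make the limit observable.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedProcessing
{
    // Hypothetical helper: runs handler over the stream with at most maxConcurrency
    // invocations in flight, and returns the highest concurrency actually observed.
    public static async Task<int> ProcessAsync<T>(
        IAsyncEnumerable<T> source, int maxConcurrency, Func<T, Task> handler)
    {
        var gate = new object();
        var inFlight = 0;
        var peak = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxConcurrency };

        await Parallel.ForEachAsync(source, options, async (item, cancellationToken) =>
        {
            lock (gate)
            {
                inFlight++;
                peak = Math.Max(peak, inFlight);
            }

            try
            {
                await handler(item);
            }
            finally
            {
                lock (gate) { inFlight--; }
            }
        });

        return peak;
    }

    // Small async stream used to exercise ProcessAsync.
    public static async IAsyncEnumerable<int> Range(int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }
}
```

Calling `BoundedProcessing.ProcessAsync(BoundedProcessing.Range(50), 4, handler)` should never report a peak above 4. Compared with the semaphore version, there is no task list to prune, and exceptions thrown by the handler propagate from the awaited call.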

Event-Driven Architecture Governance

Event Schema Registry

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public interface IEventSchemaRegistry
{
    Task<EventSchema> GetSchemaAsync(string eventType, int version);
    Task RegisterSchemaAsync(EventSchema schema);
    Task<bool> IsCompatibleAsync(EventSchema newSchema, EventSchema oldSchema);
}

public class EventSchema
{
    public string EventType { get; set; } = string.Empty;
    public int Version { get; set; }
    public string JsonSchema { get; set; } = string.Empty;
    public CompatibilityLevel Compatibility { get; set; }
    public DateTime CreatedAt { get; set; }
}

public enum CompatibilityLevel
{
    None,
    Backward,
    Forward,
    Full
}

// Event versioning example
public abstract record BaseOrderEvent(EventMetadata Metadata) : BaseEvent(Metadata);

public record OrderCreatedEventV1(
    EventMetadata Metadata,
    Guid OrderId,
    Guid CustomerId,
    decimal TotalAmount
) : BaseOrderEvent(Metadata);

public record OrderCreatedEventV2(
    EventMetadata Metadata,
    Guid OrderId,
    Guid CustomerId,
    decimal TotalAmount,
    string Currency,
    List<OrderItem> Items
) : BaseOrderEvent(Metadata);

// Event version handler
public class VersionedOrderEventConsumer :
    IConsumer<OrderCreatedEventV1>,
    IConsumer<OrderCreatedEventV2>
{
    public async Task ConsumeAsync(OrderCreatedEventV1 @event)
    {
        // V1 events carry no currency or line items, so fall back to defaults
        await ProcessOrderAsync(@event.OrderId, @event.TotalAmount, "USD", new List<OrderItem>());
    }

    public async Task ConsumeAsync(OrderCreatedEventV2 @event)
    {
        // Handle V2 events with full data
        await ProcessOrderAsync(@event.OrderId, @event.TotalAmount, @event.Currency, @event.Items);
    }

    private Task ProcessOrderAsync(Guid orderId, decimal amount, string currency, List<OrderItem> items)
    {
        // Common processing logic goes here
        return Task.CompletedTask;
    }
}
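
An alternative to handling every version in every consumer is to "upcast" old events to the newest shape at the edge, so that downstream code only ever sees one version. The sketch below shows the idea with simplified records that leave out `EventMetadata`. `OrderLine`, `OrderCreatedV1`, `OrderCreatedV2`, and `OrderCreatedUpcaster` are hypothetical names, and the "USD" default mirrors the fallback used in the consumer above.

```csharp
using System;
using System.Collections.Generic;

// Simplified, hypothetical event shapes for this illustration.
public record OrderLine(string Sku, int Quantity);

public record OrderCreatedV1(Guid OrderId, Guid CustomerId, decimal TotalAmount);

public record OrderCreatedV2(
    Guid OrderId,
    Guid CustomerId,
    decimal TotalAmount,
    string Currency,
    IReadOnlyList<OrderLine> Items);

public static class OrderCreatedUpcaster
{
    // Assumption: orders created before V2 were all priced in USD.
    public const string DefaultCurrency = "USD";

    // Converts a V1 event to the V2 shape, filling the new fields with defaults.
    public static OrderCreatedV2 Upcast(OrderCreatedV1 v1) =>
        new(v1.OrderId, v1.CustomerId, v1.TotalAmount, DefaultCurrency, Array.Empty<OrderLine>());
}
```

With an upcaster in place, a consumer can implement only the V2 handler and call `OrderCreatedUpcaster.Upcast` when a V1 event arrives. The trade-off is that default values chosen here become part of the event contract, so they deserve the same review as a schema change.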

Industry Adoption and Future Trends

Event-driven architecture is now widely used in modern distributed systems, and several major technology companies are known for building on it:

  • Netflix uses event-driven patterns for their recommendation engine and content delivery
  • Uber employs event sourcing for their dispatch system and surge pricing
  • Amazon leverages events throughout their entire e-commerce platform
  • Microsoft offers event-driven services on Azure, such as Event Grid and Service Bus

Current industry trends include:

  1. Event Mesh Architectures - Distributed event routing across multiple cloud providers
  2. Real-time Analytics - Stream processing with Apache Kafka and Azure Stream Analytics
  3. Event-driven Microservices - Complete decoupling of service dependencies
  4. Serverless Event Processing - Azure Functions, AWS Lambda for event handling
  5. GraphQL Subscriptions - Real-time data updates in modern web applications

Conclusion

Event-driven architecture isn't just a technical pattern—it's a fundamental shift in how we think about building software systems. After implementing EDA in everything from small startups to enterprise systems, I can confidently say that the investment in understanding these patterns pays dividends in system reliability, scalability, and maintainability.

The key is to start small. Pick a bounded context in your application, identify the business events that matter, and begin implementing event-driven patterns there. As you gain confidence and see the benefits, you can expand the approach to other parts of your system.

Remember that event-driven architecture is not a silver bullet. It introduces complexity in terms of eventual consistency, distributed debugging, and operational overhead. However, for systems that need to scale, integrate multiple services, or respond to real-time events, it's often the most elegant solution available.

The patterns and examples we've covered, from basic pub/sub to advanced saga orchestration, are established building blocks of distributed systems design. As you implement these patterns in your own projects, you'll discover that event-driven thinking changes not just how you architect systems, but how you model business processes and think about software design as a whole.

Additional Resources for Mastery

To deepen your understanding of event-driven architecture and stay current with industry practices, explore these valuable resources:

Microsoft Official Documentation

  • Event-driven architecture style - Azure Architecture Center - Comprehensive guide to event-driven patterns in cloud applications
  • Azure Service Bus documentation - Learn Microsoft's enterprise messaging platform
  • Azure Event Grid documentation - Event routing service for building event-driven applications

Industry-Leading Resources

  • Martin Fowler's Event Sourcing articles - Foundational articles on event sourcing patterns from a thought leader
  • Microservices.io Event-driven Architecture - Chris Richardson's comprehensive guide to microservices patterns
  • Event Store documentation - Learn from the creators of a purpose-built event store database

Practical Implementation Guides

  • MassTransit documentation - A widely used .NET service bus framework with extensive examples
  • Building Event-Driven Microservices by Adam Bellemare - O'Reilly book covering practical implementation strategies
  • Kafka: The Definitive Guide - Essential reading for understanding distributed event streaming

Community and Advanced Learning

  • GOTO Conference talks on Event-Driven Architecture - Regular conference presentations from industry experts
  • DDD Community resources - Domain-Driven Design community discussions on event modeling
  • Microsoft's .NET Microservices eBook - Free comprehensive guide with event-driven examples

These resources will help you master both the theoretical foundations and practical implementation details needed to build world-class event-driven systems.
