diff options
| author | Saumit <justsaumit@protonmail.com> | 2025-09-27 02:14:26 +0530 |
|---|---|---|
| committer | Saumit <justsaumit@protonmail.com> | 2025-09-27 02:14:26 +0530 |
| commit | 82e03978b89938219958032efb1448cc76baa181 (patch) | |
| tree | 626f3e54d52ecd49be0ed3bee30abacc0453d081 /src/accounting/Consumer.cs | |
Initial snapshot - OpenTelemetry demo 2.1.3 -f
Diffstat (limited to 'src/accounting/Consumer.cs')
| -rw-r--r-- | src/accounting/Consumer.cs | 153 |
1 files changed, 153 insertions, 0 deletions
diff --git a/src/accounting/Consumer.cs b/src/accounting/Consumer.cs new file mode 100644 index 0000000..89ede49 --- /dev/null +++ b/src/accounting/Consumer.cs @@ -0,0 +1,153 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Confluent.Kafka; +using Microsoft.Extensions.Logging; +using Oteldemo; +using Microsoft.EntityFrameworkCore; +using System.Diagnostics; + +namespace Accounting; + +internal class DBContext : DbContext +{ + public DbSet<OrderEntity> Orders { get; set; } + public DbSet<OrderItemEntity> CartItems { get; set; } + public DbSet<ShippingEntity> Shipping { get; set; } + + protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) + { + var connectionString = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING"); + + optionsBuilder.UseNpgsql(connectionString).UseSnakeCaseNamingConvention(); + } +} + + +internal class Consumer : IDisposable +{ + private const string TopicName = "orders"; + + private ILogger _logger; + private IConsumer<string, byte[]> _consumer; + private bool _isListening; + private DBContext? _dbContext; + private static readonly ActivitySource MyActivitySource = new("Accounting.Consumer"); + + public Consumer(ILogger<Consumer> logger) + { + _logger = logger; + + var servers = Environment.GetEnvironmentVariable("KAFKA_ADDR") + ?? throw new ArgumentNullException("KAFKA_ADDR"); + + _consumer = BuildConsumer(servers); + _consumer.Subscribe(TopicName); + + _logger.LogInformation($"Connecting to Kafka: {servers}"); + _dbContext = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING") == null ? null : new DBContext(); + } + + public void StartListening() + { + _isListening = true; + + try + { + while (_isListening) + { + try + { + using var activity = MyActivitySource.StartActivity("order-consumed", ActivityKind.Internal); + var consumeResult = _consumer.Consume(); + ProcessMessage(consumeResult.Message); + } + catch (ConsumeException e) + { + _logger.LogError(e, "Consume error: {0}", e.Error.Reason); + } + } + } + catch (OperationCanceledException) + { + _logger.LogInformation("Closing consumer"); + + _consumer.Close(); + } + } + + private void ProcessMessage(Message<string, byte[]> message) + { + try + { + var order = OrderResult.Parser.ParseFrom(message.Value); + Log.OrderReceivedMessage(_logger, order); + + if (_dbContext == null) + { + return; + } + + var orderEntity = new OrderEntity + { + Id = order.OrderId + }; + _dbContext.Add(orderEntity); + foreach (var item in order.Items) + { + var orderItem = new OrderItemEntity + { + ItemCostCurrencyCode = item.Cost.CurrencyCode, + ItemCostUnits = item.Cost.Units, + ItemCostNanos = item.Cost.Nanos, + ProductId = item.Item.ProductId, + Quantity = item.Item.Quantity, + OrderId = order.OrderId + }; + + _dbContext.Add(orderItem); + } + + var shipping = new ShippingEntity + { + ShippingTrackingId = order.ShippingTrackingId, + ShippingCostCurrencyCode = order.ShippingCost.CurrencyCode, + ShippingCostUnits = order.ShippingCost.Units, + ShippingCostNanos = order.ShippingCost.Nanos, + StreetAddress = order.ShippingAddress.StreetAddress, + City = order.ShippingAddress.City, + State = order.ShippingAddress.State, + Country = order.ShippingAddress.Country, + ZipCode = order.ShippingAddress.ZipCode, + OrderId = order.OrderId + }; + _dbContext.Add(shipping); + _dbContext.SaveChanges(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Order parsing failed:"); + } + } + + private IConsumer<string, byte[]> BuildConsumer(string servers) + { + var conf = new ConsumerConfig + { + GroupId = $"accounting", + BootstrapServers = servers, + // https://github.com/confluentinc/confluent-kafka-dotnet/tree/07de95ed647af80a0db39ce6a8891a630423b952#basic-consumer-example + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit = true + }; + + return new ConsumerBuilder<string, byte[]>(conf) + .Build(); + } + + public void Dispose() + { + _isListening = false; + _consumer?.Dispose(); + } +} |
