From 82e03978b89938219958032efb1448cc76baa181 Mon Sep 17 00:00:00 2001 From: Saumit Date: Sat, 27 Sep 2025 02:14:26 +0530 Subject: Initial snapshot - OpenTelemetry demo 2.1.3 -f --- src/accounting/Consumer.cs | 153 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 src/accounting/Consumer.cs (limited to 'src/accounting/Consumer.cs') diff --git a/src/accounting/Consumer.cs b/src/accounting/Consumer.cs new file mode 100644 index 0000000..89ede49 --- /dev/null +++ b/src/accounting/Consumer.cs @@ -0,0 +1,153 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Confluent.Kafka; +using Microsoft.Extensions.Logging; +using Oteldemo; +using Microsoft.EntityFrameworkCore; +using System.Diagnostics; + +namespace Accounting; + +internal class DBContext : DbContext +{ + public DbSet Orders { get; set; } + public DbSet CartItems { get; set; } + public DbSet Shipping { get; set; } + + protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) + { + var connectionString = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING"); + + optionsBuilder.UseNpgsql(connectionString).UseSnakeCaseNamingConvention(); + } +} + + +internal class Consumer : IDisposable +{ + private const string TopicName = "orders"; + + private ILogger _logger; + private IConsumer _consumer; + private bool _isListening; + private DBContext? _dbContext; + private static readonly ActivitySource MyActivitySource = new("Accounting.Consumer"); + + public Consumer(ILogger logger) + { + _logger = logger; + + var servers = Environment.GetEnvironmentVariable("KAFKA_ADDR") + ?? throw new ArgumentNullException("KAFKA_ADDR"); + + _consumer = BuildConsumer(servers); + _consumer.Subscribe(TopicName); + + _logger.LogInformation($"Connecting to Kafka: {servers}"); + _dbContext = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING") == null ? null : new DBContext(); + } + + public void StartListening() + { + _isListening = true; + + try + { + while (_isListening) + { + try + { + using var activity = MyActivitySource.StartActivity("order-consumed", ActivityKind.Internal); + var consumeResult = _consumer.Consume(); + ProcessMessage(consumeResult.Message); + } + catch (ConsumeException e) + { + _logger.LogError(e, "Consume error: {0}", e.Error.Reason); + } + } + } + catch (OperationCanceledException) + { + _logger.LogInformation("Closing consumer"); + + _consumer.Close(); + } + } + + private void ProcessMessage(Message message) + { + try + { + var order = OrderResult.Parser.ParseFrom(message.Value); + Log.OrderReceivedMessage(_logger, order); + + if (_dbContext == null) + { + return; + } + + var orderEntity = new OrderEntity + { + Id = order.OrderId + }; + _dbContext.Add(orderEntity); + foreach (var item in order.Items) + { + var orderItem = new OrderItemEntity + { + ItemCostCurrencyCode = item.Cost.CurrencyCode, + ItemCostUnits = item.Cost.Units, + ItemCostNanos = item.Cost.Nanos, + ProductId = item.Item.ProductId, + Quantity = item.Item.Quantity, + OrderId = order.OrderId + }; + + _dbContext.Add(orderItem); + } + + var shipping = new ShippingEntity + { + ShippingTrackingId = order.ShippingTrackingId, + ShippingCostCurrencyCode = order.ShippingCost.CurrencyCode, + ShippingCostUnits = order.ShippingCost.Units, + ShippingCostNanos = order.ShippingCost.Nanos, + StreetAddress = order.ShippingAddress.StreetAddress, + City = order.ShippingAddress.City, + State = order.ShippingAddress.State, + Country = order.ShippingAddress.Country, + ZipCode = order.ShippingAddress.ZipCode, + OrderId = order.OrderId + }; + _dbContext.Add(shipping); + _dbContext.SaveChanges(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Order parsing failed:"); + } + } + + private IConsumer BuildConsumer(string servers) + { + var conf = new ConsumerConfig + { + GroupId = $"accounting", + BootstrapServers = servers, + // https://github.com/confluentinc/confluent-kafka-dotnet/tree/07de95ed647af80a0db39ce6a8891a630423b952#basic-consumer-example + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit = true + }; + + return new ConsumerBuilder(conf) + .Build(); + } + + public void Dispose() + { + _isListening = false; + _consumer?.Dispose(); + } +} -- cgit v1.2.3