// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 using Confluent.Kafka; using Microsoft.Extensions.Logging; using Oteldemo; using Microsoft.EntityFrameworkCore; using System.Diagnostics; namespace Accounting; internal class DBContext : DbContext { public DbSet Orders { get; set; } public DbSet CartItems { get; set; } public DbSet Shipping { get; set; } protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { var connectionString = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING"); optionsBuilder.UseNpgsql(connectionString).UseSnakeCaseNamingConvention(); } } internal class Consumer : IDisposable { private const string TopicName = "orders"; private ILogger _logger; private IConsumer _consumer; private bool _isListening; private DBContext? _dbContext; private static readonly ActivitySource MyActivitySource = new("Accounting.Consumer"); public Consumer(ILogger logger) { _logger = logger; var servers = Environment.GetEnvironmentVariable("KAFKA_ADDR") ?? throw new ArgumentNullException("KAFKA_ADDR"); _consumer = BuildConsumer(servers); _consumer.Subscribe(TopicName); _logger.LogInformation($"Connecting to Kafka: {servers}"); _dbContext = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING") == null ? null : new DBContext(); } public void StartListening() { _isListening = true; try { while (_isListening) { try { using var activity = MyActivitySource.StartActivity("order-consumed", ActivityKind.Internal); var consumeResult = _consumer.Consume(); ProcessMessage(consumeResult.Message); } catch (ConsumeException e) { _logger.LogError(e, "Consume error: {0}", e.Error.Reason); } } } catch (OperationCanceledException) { _logger.LogInformation("Closing consumer"); _consumer.Close(); } } private void ProcessMessage(Message message) { try { var order = OrderResult.Parser.ParseFrom(message.Value); Log.OrderReceivedMessage(_logger, order); if (_dbContext == null) { return; } var orderEntity = new OrderEntity { Id = order.OrderId }; _dbContext.Add(orderEntity); foreach (var item in order.Items) { var orderItem = new OrderItemEntity { ItemCostCurrencyCode = item.Cost.CurrencyCode, ItemCostUnits = item.Cost.Units, ItemCostNanos = item.Cost.Nanos, ProductId = item.Item.ProductId, Quantity = item.Item.Quantity, OrderId = order.OrderId }; _dbContext.Add(orderItem); } var shipping = new ShippingEntity { ShippingTrackingId = order.ShippingTrackingId, ShippingCostCurrencyCode = order.ShippingCost.CurrencyCode, ShippingCostUnits = order.ShippingCost.Units, ShippingCostNanos = order.ShippingCost.Nanos, StreetAddress = order.ShippingAddress.StreetAddress, City = order.ShippingAddress.City, State = order.ShippingAddress.State, Country = order.ShippingAddress.Country, ZipCode = order.ShippingAddress.ZipCode, OrderId = order.OrderId }; _dbContext.Add(shipping); _dbContext.SaveChanges(); } catch (Exception ex) { _logger.LogError(ex, "Order parsing failed:"); } } private IConsumer BuildConsumer(string servers) { var conf = new ConsumerConfig { GroupId = $"accounting", BootstrapServers = servers, // https://github.com/confluentinc/confluent-kafka-dotnet/tree/07de95ed647af80a0db39ce6a8891a630423b952#basic-consumer-example AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = true }; return new ConsumerBuilder(conf) .Build(); } public void Dispose() { _isListening = false; _consumer?.Dispose(); } }