summaryrefslogtreecommitdiff
path: root/src/accounting/Consumer.cs
diff options
context:
space:
mode:
authorSaumit <justsaumit@protonmail.com>2025-09-27 02:14:26 +0530
committerSaumit <justsaumit@protonmail.com>2025-09-27 02:14:26 +0530
commit82e03978b89938219958032efb1448cc76baa181 (patch)
tree626f3e54d52ecd49be0ed3bee30abacc0453d081 /src/accounting/Consumer.cs
Initial snapshot - OpenTelemetry demo 2.1.3 -f
Diffstat (limited to 'src/accounting/Consumer.cs')
-rw-r--r--src/accounting/Consumer.cs153
1 files changed, 153 insertions, 0 deletions
diff --git a/src/accounting/Consumer.cs b/src/accounting/Consumer.cs
new file mode 100644
index 0000000..89ede49
--- /dev/null
+++ b/src/accounting/Consumer.cs
@@ -0,0 +1,153 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+using Confluent.Kafka;
+using Microsoft.Extensions.Logging;
+using Oteldemo;
+using Microsoft.EntityFrameworkCore;
+using System.Diagnostics;
+
+namespace Accounting;
+
+internal class DBContext : DbContext
+{
+ public DbSet<OrderEntity> Orders { get; set; }
+ public DbSet<OrderItemEntity> CartItems { get; set; }
+ public DbSet<ShippingEntity> Shipping { get; set; }
+
+ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
+ {
+ var connectionString = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING");
+
+ optionsBuilder.UseNpgsql(connectionString).UseSnakeCaseNamingConvention();
+ }
+}
+
+
+internal class Consumer : IDisposable
+{
+ private const string TopicName = "orders";
+
+ private ILogger _logger;
+ private IConsumer<string, byte[]> _consumer;
+ private bool _isListening;
+ private DBContext? _dbContext;
+ private static readonly ActivitySource MyActivitySource = new("Accounting.Consumer");
+
+ public Consumer(ILogger<Consumer> logger)
+ {
+ _logger = logger;
+
+ var servers = Environment.GetEnvironmentVariable("KAFKA_ADDR")
+ ?? throw new ArgumentNullException("KAFKA_ADDR");
+
+ _consumer = BuildConsumer(servers);
+ _consumer.Subscribe(TopicName);
+
+ _logger.LogInformation($"Connecting to Kafka: {servers}");
+ _dbContext = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING") == null ? null : new DBContext();
+ }
+
+ public void StartListening()
+ {
+ _isListening = true;
+
+ try
+ {
+ while (_isListening)
+ {
+ try
+ {
+ using var activity = MyActivitySource.StartActivity("order-consumed", ActivityKind.Internal);
+ var consumeResult = _consumer.Consume();
+ ProcessMessage(consumeResult.Message);
+ }
+ catch (ConsumeException e)
+ {
+ _logger.LogError(e, "Consume error: {0}", e.Error.Reason);
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ _logger.LogInformation("Closing consumer");
+
+ _consumer.Close();
+ }
+ }
+
+ private void ProcessMessage(Message<string, byte[]> message)
+ {
+ try
+ {
+ var order = OrderResult.Parser.ParseFrom(message.Value);
+ Log.OrderReceivedMessage(_logger, order);
+
+ if (_dbContext == null)
+ {
+ return;
+ }
+
+ var orderEntity = new OrderEntity
+ {
+ Id = order.OrderId
+ };
+ _dbContext.Add(orderEntity);
+ foreach (var item in order.Items)
+ {
+ var orderItem = new OrderItemEntity
+ {
+ ItemCostCurrencyCode = item.Cost.CurrencyCode,
+ ItemCostUnits = item.Cost.Units,
+ ItemCostNanos = item.Cost.Nanos,
+ ProductId = item.Item.ProductId,
+ Quantity = item.Item.Quantity,
+ OrderId = order.OrderId
+ };
+
+ _dbContext.Add(orderItem);
+ }
+
+ var shipping = new ShippingEntity
+ {
+ ShippingTrackingId = order.ShippingTrackingId,
+ ShippingCostCurrencyCode = order.ShippingCost.CurrencyCode,
+ ShippingCostUnits = order.ShippingCost.Units,
+ ShippingCostNanos = order.ShippingCost.Nanos,
+ StreetAddress = order.ShippingAddress.StreetAddress,
+ City = order.ShippingAddress.City,
+ State = order.ShippingAddress.State,
+ Country = order.ShippingAddress.Country,
+ ZipCode = order.ShippingAddress.ZipCode,
+ OrderId = order.OrderId
+ };
+ _dbContext.Add(shipping);
+ _dbContext.SaveChanges();
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Order parsing failed:");
+ }
+ }
+
+ private IConsumer<string, byte[]> BuildConsumer(string servers)
+ {
+ var conf = new ConsumerConfig
+ {
+ GroupId = $"accounting",
+ BootstrapServers = servers,
+ // https://github.com/confluentinc/confluent-kafka-dotnet/tree/07de95ed647af80a0db39ce6a8891a630423b952#basic-consumer-example
+ AutoOffsetReset = AutoOffsetReset.Earliest,
+ EnableAutoCommit = true
+ };
+
+ return new ConsumerBuilder<string, byte[]>(conf)
+ .Build();
+ }
+
+ public void Dispose()
+ {
+ _isListening = false;
+ _consumer?.Dispose();
+ }
+}