From 82e03978b89938219958032efb1448cc76baa181 Mon Sep 17 00:00:00 2001 From: Saumit Date: Sat, 27 Sep 2025 02:14:26 +0530 Subject: Initial snapshot - OpenTelemetry demo 2.1.3 -f --- src/accounting/.dockerignore | 3 + src/accounting/.gitignore | 1 + src/accounting/Accounting.csproj | 34 ++++++++ src/accounting/Accounting.sln | 25 ++++++ src/accounting/Consumer.cs | 153 +++++++++++++++++++++++++++++++++++ src/accounting/Directory.Build.props | 11 +++ src/accounting/Dockerfile | 36 +++++++++ src/accounting/Entities.cs | 59 ++++++++++++++ src/accounting/Helpers.cs | 34 ++++++++ src/accounting/Log.cs | 16 ++++ src/accounting/Program.cs | 24 ++++++ src/accounting/README.md | 33 ++++++++ 12 files changed, 429 insertions(+) create mode 100644 src/accounting/.dockerignore create mode 100644 src/accounting/.gitignore create mode 100644 src/accounting/Accounting.csproj create mode 100644 src/accounting/Accounting.sln create mode 100644 src/accounting/Consumer.cs create mode 100644 src/accounting/Directory.Build.props create mode 100644 src/accounting/Dockerfile create mode 100644 src/accounting/Entities.cs create mode 100644 src/accounting/Helpers.cs create mode 100644 src/accounting/Log.cs create mode 100644 src/accounting/Program.cs create mode 100644 src/accounting/README.md (limited to 'src/accounting') diff --git a/src/accounting/.dockerignore b/src/accounting/.dockerignore new file mode 100644 index 0000000..f3c4ae4 --- /dev/null +++ b/src/accounting/.dockerignore @@ -0,0 +1,3 @@ +bin +obj +build \ No newline at end of file diff --git a/src/accounting/.gitignore b/src/accounting/.gitignore new file mode 100644 index 0000000..a297e63 --- /dev/null +++ b/src/accounting/.gitignore @@ -0,0 +1 @@ +proto \ No newline at end of file diff --git a/src/accounting/Accounting.csproj b/src/accounting/Accounting.csproj new file mode 100644 index 0000000..65a2bcc --- /dev/null +++ b/src/accounting/Accounting.csproj @@ -0,0 +1,34 @@ + + + + Exe + net8.0 + enable + enable + Linux + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + + + + + + diff --git a/src/accounting/Accounting.sln b/src/accounting/Accounting.sln new file mode 100644 index 0000000..819a398 --- /dev/null +++ b/src/accounting/Accounting.sln @@ -0,0 +1,25 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.9.34701.34 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Accounting", "Accounting.csproj", "{C66C35E2-DF04-4DCF-8F6A-87B6D6433FF6}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {C66C35E2-DF04-4DCF-8F6A-87B6D6433FF6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C66C35E2-DF04-4DCF-8F6A-87B6D6433FF6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C66C35E2-DF04-4DCF-8F6A-87B6D6433FF6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C66C35E2-DF04-4DCF-8F6A-87B6D6433FF6}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {6340CDDC-E917-4532-A056-5526E0A7BDDA} + EndGlobalSection +EndGlobal diff --git a/src/accounting/Consumer.cs b/src/accounting/Consumer.cs new file mode 100644 index 0000000..89ede49 --- /dev/null +++ b/src/accounting/Consumer.cs @@ -0,0 +1,153 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Confluent.Kafka; +using Microsoft.Extensions.Logging; +using Oteldemo; +using Microsoft.EntityFrameworkCore; +using System.Diagnostics; + +namespace Accounting; + +internal class DBContext : DbContext +{ + public DbSet Orders { get; set; } + public DbSet CartItems { get; set; } + public DbSet Shipping { get; set; } + + protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) + { + var connectionString = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING"); + + optionsBuilder.UseNpgsql(connectionString).UseSnakeCaseNamingConvention(); + } +} + + +internal class Consumer : IDisposable +{ + private const string TopicName = "orders"; + + private ILogger _logger; + private IConsumer _consumer; + private bool _isListening; + private DBContext? _dbContext; + private static readonly ActivitySource MyActivitySource = new("Accounting.Consumer"); + + public Consumer(ILogger logger) + { + _logger = logger; + + var servers = Environment.GetEnvironmentVariable("KAFKA_ADDR") + ?? throw new ArgumentNullException("KAFKA_ADDR"); + + _consumer = BuildConsumer(servers); + _consumer.Subscribe(TopicName); + + _logger.LogInformation($"Connecting to Kafka: {servers}"); + _dbContext = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING") == null ? null : new DBContext(); + } + + public void StartListening() + { + _isListening = true; + + try + { + while (_isListening) + { + try + { + using var activity = MyActivitySource.StartActivity("order-consumed", ActivityKind.Internal); + var consumeResult = _consumer.Consume(); + ProcessMessage(consumeResult.Message); + } + catch (ConsumeException e) + { + _logger.LogError(e, "Consume error: {0}", e.Error.Reason); + } + } + } + catch (OperationCanceledException) + { + _logger.LogInformation("Closing consumer"); + + _consumer.Close(); + } + } + + private void ProcessMessage(Message message) + { + try + { + var order = OrderResult.Parser.ParseFrom(message.Value); + Log.OrderReceivedMessage(_logger, order); + + if (_dbContext == null) + { + return; + } + + var orderEntity = new OrderEntity + { + Id = order.OrderId + }; + _dbContext.Add(orderEntity); + foreach (var item in order.Items) + { + var orderItem = new OrderItemEntity + { + ItemCostCurrencyCode = item.Cost.CurrencyCode, + ItemCostUnits = item.Cost.Units, + ItemCostNanos = item.Cost.Nanos, + ProductId = item.Item.ProductId, + Quantity = item.Item.Quantity, + OrderId = order.OrderId + }; + + _dbContext.Add(orderItem); + } + + var shipping = new ShippingEntity + { + ShippingTrackingId = order.ShippingTrackingId, + ShippingCostCurrencyCode = order.ShippingCost.CurrencyCode, + ShippingCostUnits = order.ShippingCost.Units, + ShippingCostNanos = order.ShippingCost.Nanos, + StreetAddress = order.ShippingAddress.StreetAddress, + City = order.ShippingAddress.City, + State = order.ShippingAddress.State, + Country = order.ShippingAddress.Country, + ZipCode = order.ShippingAddress.ZipCode, + OrderId = order.OrderId + }; + _dbContext.Add(shipping); + _dbContext.SaveChanges(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Order parsing failed:"); + } + } + + private IConsumer BuildConsumer(string servers) + { + var conf = new ConsumerConfig + { + GroupId = $"accounting", + BootstrapServers = servers, + // https://github.com/confluentinc/confluent-kafka-dotnet/tree/07de95ed647af80a0db39ce6a8891a630423b952#basic-consumer-example + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit = true + }; + + return new ConsumerBuilder(conf) + .Build(); + } + + public void Dispose() + { + _isListening = false; + _consumer?.Dispose(); + } +} diff --git a/src/accounting/Directory.Build.props b/src/accounting/Directory.Build.props new file mode 100644 index 0000000..bfd1cab --- /dev/null +++ b/src/accounting/Directory.Build.props @@ -0,0 +1,11 @@ + + + true + all + low + + + + true + + diff --git a/src/accounting/Dockerfile b/src/accounting/Dockerfile new file mode 100644 index 0000000..9509829 --- /dev/null +++ b/src/accounting/Dockerfile @@ -0,0 +1,36 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +FROM --platform=${BUILDPLATFORM} mcr.microsoft.com/dotnet/sdk:8.0 AS builder +ARG TARGETARCH +ARG BUILD_CONFIGURATION=Release +WORKDIR /src +COPY ["/src/accounting/", "Accounting/"] +COPY ["/pb/demo.proto", "Accounting/src/protos/demo.proto"] +RUN dotnet restore "./Accounting/Accounting.csproj" -r linux-$TARGETARCH +WORKDIR "/src/Accounting" + +RUN dotnet build "./Accounting.csproj" -r linux-$TARGETARCH -c $BUILD_CONFIGURATION -o /app/build + +# ----------------------------------------------------------------------------- + +FROM builder AS publish +ARG TARGETARCH +ARG BUILD_CONFIGURATION=Release +RUN dotnet publish "./Accounting.csproj" -r linux-$TARGETARCH -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false + +# ----------------------------------------------------------------------------- + +FROM mcr.microsoft.com/dotnet/aspnet:8.0 +USER app +WORKDIR /app +COPY --from=publish /app/publish . + +USER root +RUN mkdir -p "/var/log/opentelemetry/dotnet" +RUN chown app "/var/log/opentelemetry/dotnet" +RUN chown app "/app/instrument.sh" +USER app +ENV OTEL_DOTNET_AUTO_TRACES_ADDITIONAL_SOURCES=Accounting.Consumer + +ENTRYPOINT ["./instrument.sh", "dotnet", "Accounting.dll"] diff --git a/src/accounting/Entities.cs b/src/accounting/Entities.cs new file mode 100644 index 0000000..849bd1f --- /dev/null +++ b/src/accounting/Entities.cs @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Microsoft.EntityFrameworkCore; +using System.ComponentModel.DataAnnotations.Schema; + +namespace Accounting; + +[Table("shipping")] +[PrimaryKey(nameof(ShippingTrackingId))] +internal class ShippingEntity +{ + + public required string ShippingTrackingId { get; set; } + + public required string ShippingCostCurrencyCode { get; set; } + + public required long ShippingCostUnits { get; set; } + + public required int ShippingCostNanos { get; set; } + + public required string StreetAddress { get; set; } + + public required string City { get; set; } + + public required string State { get; set; } + + public required string Country { get; set; } + + public required string ZipCode { get; set; } + + public required string OrderId { get; set; } +} + +[Table("orderitem")] +[PrimaryKey(nameof(ProductId), nameof(OrderId))] +internal class OrderItemEntity +{ + public required string ItemCostCurrencyCode { get; set; } + + public required long ItemCostUnits { get; set; } + + public required int ItemCostNanos { get; set; } + + public required string ProductId { get; set; } + + public required int Quantity { get; set; } + + public required string OrderId { get; set; } +} + +[Table("order")] +[PrimaryKey(nameof(Id))] +internal class OrderEntity +{ + [Column("order_id")] + public required string Id { get; set; } + +} diff --git a/src/accounting/Helpers.cs b/src/accounting/Helpers.cs new file mode 100644 index 0000000..d167aae --- /dev/null +++ b/src/accounting/Helpers.cs @@ -0,0 +1,34 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Collections; + +namespace Accounting +{ + internal static class Helpers + { + private static List RelevantPrefixes = ["DOTNET_", "CORECLR_", "OTEL_", "KAFKA_"]; + + public static IEnumerable FilterRelevant(this IDictionary envs) + { + foreach (DictionaryEntry env in envs) + { + foreach (var prefix in RelevantPrefixes) + { + if (env.Key.ToString()?.StartsWith(prefix, StringComparison.InvariantCultureIgnoreCase) ?? false) + { + yield return env; + } + } + } + } + + public static void OutputInOrder(this IEnumerable envs) + { + foreach (var env in envs.OrderBy(x => x.Key)) + { + Console.WriteLine(env); + } + } + } +} diff --git a/src/accounting/Log.cs b/src/accounting/Log.cs new file mode 100644 index 0000000..e18af0e --- /dev/null +++ b/src/accounting/Log.cs @@ -0,0 +1,16 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Microsoft.Extensions.Logging; +using Oteldemo; + +namespace Accounting +{ + internal static partial class Log + { + [LoggerMessage( + Level = LogLevel.Information, + Message = "Order details: {@OrderResult}.")] + public static partial void OrderReceivedMessage(ILogger logger, OrderResult orderResult); + } +} diff --git a/src/accounting/Program.cs b/src/accounting/Program.cs new file mode 100644 index 0000000..54a5c44 --- /dev/null +++ b/src/accounting/Program.cs @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Accounting; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +Console.WriteLine("Accounting service started"); + +Environment.GetEnvironmentVariables() + .FilterRelevant() + .OutputInOrder(); + +var host = Host.CreateDefaultBuilder(args) + .ConfigureServices(services => + { + services.AddSingleton(); + }) + .Build(); + +var consumer = host.Services.GetRequiredService(); +consumer.StartListening(); + +host.Run(); diff --git a/src/accounting/README.md b/src/accounting/README.md new file mode 100644 index 0000000..cfaa25b --- /dev/null +++ b/src/accounting/README.md @@ -0,0 +1,33 @@ +# Accounting Service + +This service consumes new orders from a Kafka topic. + +## Local Build + +To build the service binary, navigate to the root directory of the project and run: + +```sh +make generate-protobuf +``` + +Navigate back to `src/accounting` and execute: + +```sh +dotnet build +``` + +## Docker Build + +From the root directory, run: + +```sh +docker compose build accounting +``` + +## Bump dependencies + +To bump all dependencies run in Package manager: + +```sh +Update-Package -ProjectName Accounting +``` -- cgit v1.2.3