summaryrefslogtreecommitdiff
path: root/src/accounting
diff options
context:
space:
mode:
Diffstat (limited to 'src/accounting')
-rw-r--r--src/accounting/.dockerignore3
-rw-r--r--src/accounting/.gitignore1
-rw-r--r--src/accounting/Accounting.csproj34
-rw-r--r--src/accounting/Accounting.sln25
-rw-r--r--src/accounting/Consumer.cs153
-rw-r--r--src/accounting/Directory.Build.props11
-rw-r--r--src/accounting/Dockerfile36
-rw-r--r--src/accounting/Entities.cs59
-rw-r--r--src/accounting/Helpers.cs34
-rw-r--r--src/accounting/Log.cs16
-rw-r--r--src/accounting/Program.cs24
-rw-r--r--src/accounting/README.md33
12 files changed, 429 insertions, 0 deletions
diff --git a/src/accounting/.dockerignore b/src/accounting/.dockerignore
new file mode 100644
index 0000000..f3c4ae4
--- /dev/null
+++ b/src/accounting/.dockerignore
@@ -0,0 +1,3 @@
+bin
+obj
+build \ No newline at end of file
diff --git a/src/accounting/.gitignore b/src/accounting/.gitignore
new file mode 100644
index 0000000..a297e63
--- /dev/null
+++ b/src/accounting/.gitignore
@@ -0,0 +1 @@
+proto \ No newline at end of file
diff --git a/src/accounting/Accounting.csproj b/src/accounting/Accounting.csproj
new file mode 100644
index 0000000..65a2bcc
--- /dev/null
+++ b/src/accounting/Accounting.csproj
@@ -0,0 +1,34 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <PropertyGroup>
+ <OutputType>Exe</OutputType>
+ <TargetFramework>net8.0</TargetFramework>
+ <ImplicitUsings>enable</ImplicitUsings>
+ <Nullable>enable</Nullable>
+ <DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
+ </PropertyGroup>
+
+ <ItemGroup>
+ <PackageReference Include="Confluent.Kafka" Version="2.11.0" />
+ <PackageReference Include="EFCore.NamingConventions" Version="9.0.0" />
+ <PackageReference Include="Google.Protobuf" Version="3.31.1" />
+ <PackageReference Include="Grpc.Tools" Version="2.68.1">
+ <PrivateAssets>all</PrivateAssets>
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+ </PackageReference>
+ <PackageReference Include="Microsoft.Extensions.Logging" Version="9.0.7" />
+ <PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.22.1" />
+ <PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="9.0.4" />
+ <PackageReference Include="OpenTelemetry.AutoInstrumentation" Version="1.12.0" />
+ </ItemGroup>
+
+ <ItemGroup>
+ <!-- GrpcServices is 'none' so that we do not need to depend on the grpc nuget package, and we only need protobuf support. -->
+ <Protobuf Include="src\protos\demo.proto" GrpcServices="none" />
+ </ItemGroup>
+
+ <ItemGroup>
+ <Folder Include="src\protos\" />
+ </ItemGroup>
+
+</Project>
diff --git a/src/accounting/Accounting.sln b/src/accounting/Accounting.sln
new file mode 100644
index 0000000..819a398
--- /dev/null
+++ b/src/accounting/Accounting.sln
@@ -0,0 +1,25 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 17
+VisualStudioVersion = 17.9.34701.34
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Accounting", "Accounting.csproj", "{C66C35E2-DF04-4DCF-8F6A-87B6D6433FF6}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {C66C35E2-DF04-4DCF-8F6A-87B6D6433FF6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {C66C35E2-DF04-4DCF-8F6A-87B6D6433FF6}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {C66C35E2-DF04-4DCF-8F6A-87B6D6433FF6}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {C66C35E2-DF04-4DCF-8F6A-87B6D6433FF6}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ SolutionGuid = {6340CDDC-E917-4532-A056-5526E0A7BDDA}
+ EndGlobalSection
+EndGlobal
diff --git a/src/accounting/Consumer.cs b/src/accounting/Consumer.cs
new file mode 100644
index 0000000..89ede49
--- /dev/null
+++ b/src/accounting/Consumer.cs
@@ -0,0 +1,153 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+using Confluent.Kafka;
+using Microsoft.Extensions.Logging;
+using Oteldemo;
+using Microsoft.EntityFrameworkCore;
+using System.Diagnostics;
+
+namespace Accounting;
+
+internal class DBContext : DbContext
+{
+ public DbSet<OrderEntity> Orders { get; set; }
+ public DbSet<OrderItemEntity> CartItems { get; set; }
+ public DbSet<ShippingEntity> Shipping { get; set; }
+
+ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
+ {
+ var connectionString = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING");
+
+ optionsBuilder.UseNpgsql(connectionString).UseSnakeCaseNamingConvention();
+ }
+}
+
+
+internal class Consumer : IDisposable
+{
+ private const string TopicName = "orders";
+
+ private ILogger _logger;
+ private IConsumer<string, byte[]> _consumer;
+ private bool _isListening;
+ private DBContext? _dbContext;
+ private static readonly ActivitySource MyActivitySource = new("Accounting.Consumer");
+
+ public Consumer(ILogger<Consumer> logger)
+ {
+ _logger = logger;
+
+ var servers = Environment.GetEnvironmentVariable("KAFKA_ADDR")
+ ?? throw new ArgumentNullException("KAFKA_ADDR");
+
+ _consumer = BuildConsumer(servers);
+ _consumer.Subscribe(TopicName);
+
+ _logger.LogInformation($"Connecting to Kafka: {servers}");
+ _dbContext = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING") == null ? null : new DBContext();
+ }
+
+ public void StartListening()
+ {
+ _isListening = true;
+
+ try
+ {
+ while (_isListening)
+ {
+ try
+ {
+ using var activity = MyActivitySource.StartActivity("order-consumed", ActivityKind.Internal);
+ var consumeResult = _consumer.Consume();
+ ProcessMessage(consumeResult.Message);
+ }
+ catch (ConsumeException e)
+ {
+ _logger.LogError(e, "Consume error: {0}", e.Error.Reason);
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ _logger.LogInformation("Closing consumer");
+
+ _consumer.Close();
+ }
+ }
+
+ private void ProcessMessage(Message<string, byte[]> message)
+ {
+ try
+ {
+ var order = OrderResult.Parser.ParseFrom(message.Value);
+ Log.OrderReceivedMessage(_logger, order);
+
+ if (_dbContext == null)
+ {
+ return;
+ }
+
+ var orderEntity = new OrderEntity
+ {
+ Id = order.OrderId
+ };
+ _dbContext.Add(orderEntity);
+ foreach (var item in order.Items)
+ {
+ var orderItem = new OrderItemEntity
+ {
+ ItemCostCurrencyCode = item.Cost.CurrencyCode,
+ ItemCostUnits = item.Cost.Units,
+ ItemCostNanos = item.Cost.Nanos,
+ ProductId = item.Item.ProductId,
+ Quantity = item.Item.Quantity,
+ OrderId = order.OrderId
+ };
+
+ _dbContext.Add(orderItem);
+ }
+
+ var shipping = new ShippingEntity
+ {
+ ShippingTrackingId = order.ShippingTrackingId,
+ ShippingCostCurrencyCode = order.ShippingCost.CurrencyCode,
+ ShippingCostUnits = order.ShippingCost.Units,
+ ShippingCostNanos = order.ShippingCost.Nanos,
+ StreetAddress = order.ShippingAddress.StreetAddress,
+ City = order.ShippingAddress.City,
+ State = order.ShippingAddress.State,
+ Country = order.ShippingAddress.Country,
+ ZipCode = order.ShippingAddress.ZipCode,
+ OrderId = order.OrderId
+ };
+ _dbContext.Add(shipping);
+ _dbContext.SaveChanges();
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Order parsing failed:");
+ }
+ }
+
+ private IConsumer<string, byte[]> BuildConsumer(string servers)
+ {
+ var conf = new ConsumerConfig
+ {
+ GroupId = $"accounting",
+ BootstrapServers = servers,
+ // https://github.com/confluentinc/confluent-kafka-dotnet/tree/07de95ed647af80a0db39ce6a8891a630423b952#basic-consumer-example
+ AutoOffsetReset = AutoOffsetReset.Earliest,
+ EnableAutoCommit = true
+ };
+
+ return new ConsumerBuilder<string, byte[]>(conf)
+ .Build();
+ }
+
+ public void Dispose()
+ {
+ _isListening = false;
+ _consumer?.Dispose();
+ }
+}
diff --git a/src/accounting/Directory.Build.props b/src/accounting/Directory.Build.props
new file mode 100644
index 0000000..bfd1cab
--- /dev/null
+++ b/src/accounting/Directory.Build.props
@@ -0,0 +1,11 @@
+<Project>
+ <PropertyGroup>
+ <NuGetAudit>true</NuGetAudit>
+ <NuGetAuditMode>all</NuGetAuditMode>
+ <NuGetAuditLevel>low</NuGetAuditLevel>
+ </PropertyGroup>
+
+ <PropertyGroup Condition="'$(Configuration)'=='Release'">
+ <TreatWarningsAsErrors>true</TreatWarningsAsErrors>
+ </PropertyGroup>
+</Project>
diff --git a/src/accounting/Dockerfile b/src/accounting/Dockerfile
new file mode 100644
index 0000000..9509829
--- /dev/null
+++ b/src/accounting/Dockerfile
@@ -0,0 +1,36 @@
+# Copyright The OpenTelemetry Authors
+# SPDX-License-Identifier: Apache-2.0
+
+FROM --platform=${BUILDPLATFORM} mcr.microsoft.com/dotnet/sdk:8.0 AS builder
+ARG TARGETARCH
+ARG BUILD_CONFIGURATION=Release
+WORKDIR /src
+COPY ["/src/accounting/", "Accounting/"]
+COPY ["/pb/demo.proto", "Accounting/src/protos/demo.proto"]
+RUN dotnet restore "./Accounting/Accounting.csproj" -r linux-$TARGETARCH
+WORKDIR "/src/Accounting"
+
+RUN dotnet build "./Accounting.csproj" -r linux-$TARGETARCH -c $BUILD_CONFIGURATION -o /app/build
+
+# -----------------------------------------------------------------------------
+
+FROM builder AS publish
+ARG TARGETARCH
+ARG BUILD_CONFIGURATION=Release
+RUN dotnet publish "./Accounting.csproj" -r linux-$TARGETARCH -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false
+
+# -----------------------------------------------------------------------------
+
+FROM mcr.microsoft.com/dotnet/aspnet:8.0
+USER app
+WORKDIR /app
+COPY --from=publish /app/publish .
+
+USER root
+RUN mkdir -p "/var/log/opentelemetry/dotnet"
+RUN chown app "/var/log/opentelemetry/dotnet"
+RUN chown app "/app/instrument.sh"
+USER app
+ENV OTEL_DOTNET_AUTO_TRACES_ADDITIONAL_SOURCES=Accounting.Consumer
+
+ENTRYPOINT ["./instrument.sh", "dotnet", "Accounting.dll"]
diff --git a/src/accounting/Entities.cs b/src/accounting/Entities.cs
new file mode 100644
index 0000000..849bd1f
--- /dev/null
+++ b/src/accounting/Entities.cs
@@ -0,0 +1,59 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+using Microsoft.EntityFrameworkCore;
+using System.ComponentModel.DataAnnotations.Schema;
+
+namespace Accounting;
+
+[Table("shipping")]
+[PrimaryKey(nameof(ShippingTrackingId))]
+internal class ShippingEntity
+{
+
+ public required string ShippingTrackingId { get; set; }
+
+ public required string ShippingCostCurrencyCode { get; set; }
+
+ public required long ShippingCostUnits { get; set; }
+
+ public required int ShippingCostNanos { get; set; }
+
+ public required string StreetAddress { get; set; }
+
+ public required string City { get; set; }
+
+ public required string State { get; set; }
+
+ public required string Country { get; set; }
+
+ public required string ZipCode { get; set; }
+
+ public required string OrderId { get; set; }
+}
+
+[Table("orderitem")]
+[PrimaryKey(nameof(ProductId), nameof(OrderId))]
+internal class OrderItemEntity
+{
+ public required string ItemCostCurrencyCode { get; set; }
+
+ public required long ItemCostUnits { get; set; }
+
+ public required int ItemCostNanos { get; set; }
+
+ public required string ProductId { get; set; }
+
+ public required int Quantity { get; set; }
+
+ public required string OrderId { get; set; }
+}
+
+[Table("order")]
+[PrimaryKey(nameof(Id))]
+internal class OrderEntity
+{
+ [Column("order_id")]
+ public required string Id { get; set; }
+
+}
diff --git a/src/accounting/Helpers.cs b/src/accounting/Helpers.cs
new file mode 100644
index 0000000..d167aae
--- /dev/null
+++ b/src/accounting/Helpers.cs
@@ -0,0 +1,34 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+using System.Collections;
+
+namespace Accounting
+{
+ internal static class Helpers
+ {
+ private static List<string> RelevantPrefixes = ["DOTNET_", "CORECLR_", "OTEL_", "KAFKA_"];
+
+ public static IEnumerable<DictionaryEntry> FilterRelevant(this IDictionary envs)
+ {
+ foreach (DictionaryEntry env in envs)
+ {
+ foreach (var prefix in RelevantPrefixes)
+ {
+ if (env.Key.ToString()?.StartsWith(prefix, StringComparison.InvariantCultureIgnoreCase) ?? false)
+ {
+ yield return env;
+ }
+ }
+ }
+ }
+
+ public static void OutputInOrder(this IEnumerable<DictionaryEntry> envs)
+ {
+ foreach (var env in envs.OrderBy(x => x.Key))
+ {
+ Console.WriteLine(env);
+ }
+ }
+ }
+}
diff --git a/src/accounting/Log.cs b/src/accounting/Log.cs
new file mode 100644
index 0000000..e18af0e
--- /dev/null
+++ b/src/accounting/Log.cs
@@ -0,0 +1,16 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+using Microsoft.Extensions.Logging;
+using Oteldemo;
+
+namespace Accounting
+{
+ internal static partial class Log
+ {
+ [LoggerMessage(
+ Level = LogLevel.Information,
+ Message = "Order details: {@OrderResult}.")]
+ public static partial void OrderReceivedMessage(ILogger logger, OrderResult orderResult);
+ }
+}
diff --git a/src/accounting/Program.cs b/src/accounting/Program.cs
new file mode 100644
index 0000000..54a5c44
--- /dev/null
+++ b/src/accounting/Program.cs
@@ -0,0 +1,24 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+using Accounting;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+
+Console.WriteLine("Accounting service started");
+
+Environment.GetEnvironmentVariables()
+ .FilterRelevant()
+ .OutputInOrder();
+
+var host = Host.CreateDefaultBuilder(args)
+ .ConfigureServices(services =>
+ {
+ services.AddSingleton<Consumer>();
+ })
+ .Build();
+
+var consumer = host.Services.GetRequiredService<Consumer>();
+consumer.StartListening();
+
+host.Run();
diff --git a/src/accounting/README.md b/src/accounting/README.md
new file mode 100644
index 0000000..cfaa25b
--- /dev/null
+++ b/src/accounting/README.md
@@ -0,0 +1,33 @@
+# Accounting Service
+
+This service consumes new orders from a Kafka topic.
+
+## Local Build
+
+To build the service binary, navigate to the root directory of the project and run:
+
+```sh
+make generate-protobuf
+```
+
+Navigate back to `src/accounting` and execute:
+
+```sh
+dotnet build
+```
+
+## Docker Build
+
+From the root directory, run:
+
+```sh
+docker compose build accounting
+```
+
+## Bump dependencies
+
+To bump all dependencies run in Package manager:
+
+```sh
+Update-Package -ProjectName Accounting
+```