summaryrefslogtreecommitdiff
path: root/src/checkout/main.go
diff options
context:
space:
mode:
authorSaumit <justsaumit@protonmail.com>2025-09-27 02:14:26 +0530
committerSaumit <justsaumit@protonmail.com>2025-09-27 02:14:26 +0530
commit82e03978b89938219958032efb1448cc76baa181 (patch)
tree626f3e54d52ecd49be0ed3bee30abacc0453d081 /src/checkout/main.go
Initial snapshot - OpenTelemetry demo 2.1.3 -f
Diffstat (limited to 'src/checkout/main.go')
-rw-r--r--src/checkout/main.go729
1 files changed, 729 insertions, 0 deletions
diff --git a/src/checkout/main.go b/src/checkout/main.go
new file mode 100644
index 0000000..f200865
--- /dev/null
+++ b/src/checkout/main.go
@@ -0,0 +1,729 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+package main
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "log/slog"
+ "net"
+ "net/http"
+ "os"
+ "os/signal"
+ "strconv"
+ "sync"
+ "syscall"
+ "time"
+
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/log/global"
+ semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
+ "go.opentelemetry.io/otel/trace"
+
+ "github.com/IBM/sarama"
+ "github.com/google/uuid"
+ otelhooks "github.com/open-feature/go-sdk-contrib/hooks/open-telemetry/pkg"
+ flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg"
+ "github.com/open-feature/go-sdk/openfeature"
+
+ "go.opentelemetry.io/contrib/bridges/otelslog"
+ "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
+ "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
+ "go.opentelemetry.io/contrib/instrumentation/runtime"
+ "go.opentelemetry.io/otel"
+ otelcodes "go.opentelemetry.io/otel/codes"
+ "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
+ "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
+ "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
+ "go.opentelemetry.io/otel/propagation"
+
+ sdklog "go.opentelemetry.io/otel/sdk/log"
+ sdkmetric "go.opentelemetry.io/otel/sdk/metric"
+ sdkresource "go.opentelemetry.io/otel/sdk/resource"
+ sdktrace "go.opentelemetry.io/otel/sdk/trace"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/health"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
+
+ pb "github.com/open-telemetry/opentelemetry-demo/src/checkout/genproto/oteldemo"
+ "github.com/open-telemetry/opentelemetry-demo/src/checkout/kafka"
+ "github.com/open-telemetry/opentelemetry-demo/src/checkout/money"
+)
+
+//go:generate go install google.golang.org/protobuf/cmd/protoc-gen-go
+//go:generate go install google.golang.org/grpc/cmd/protoc-gen-go-grpc
+//go:generate protoc --go_out=./ --go-grpc_out=./ --proto_path=../../pb ../../pb/demo.proto
+
+var logger *slog.Logger
+var tracer trace.Tracer
+var resource *sdkresource.Resource
+var initResourcesOnce sync.Once
+
+func initResource() *sdkresource.Resource {
+ initResourcesOnce.Do(func() {
+ extraResources, _ := sdkresource.New(
+ context.Background(),
+ sdkresource.WithOS(),
+ sdkresource.WithProcess(),
+ sdkresource.WithContainer(),
+ sdkresource.WithHost(),
+ )
+ resource, _ = sdkresource.Merge(
+ sdkresource.Default(),
+ extraResources,
+ )
+ })
+ return resource
+}
+
+func initTracerProvider() *sdktrace.TracerProvider {
+ ctx := context.Background()
+
+ exporter, err := otlptracegrpc.New(ctx)
+ if err != nil {
+ logger.Error(fmt.Sprintf("new otlp trace grpc exporter failed: %v", err))
+ }
+ tp := sdktrace.NewTracerProvider(
+ sdktrace.WithBatcher(exporter),
+ sdktrace.WithResource(initResource()),
+ )
+ otel.SetTracerProvider(tp)
+ otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
+ return tp
+}
+
+func initMeterProvider() *sdkmetric.MeterProvider {
+ ctx := context.Background()
+
+ exporter, err := otlpmetricgrpc.New(ctx)
+ if err != nil {
+ logger.Error(fmt.Sprintf("new otlp metric grpc exporter failed: %v", err))
+ }
+
+ mp := sdkmetric.NewMeterProvider(
+ sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter)),
+ sdkmetric.WithResource(initResource()),
+ )
+ otel.SetMeterProvider(mp)
+ return mp
+}
+
+func initLoggerProvider() *sdklog.LoggerProvider {
+ ctx := context.Background()
+
+ logExporter, err := otlploggrpc.New(ctx)
+ if err != nil {
+ return nil
+ }
+
+ loggerProvider := sdklog.NewLoggerProvider(
+ sdklog.WithProcessor(sdklog.NewBatchProcessor(logExporter)),
+ )
+ global.SetLoggerProvider(loggerProvider)
+
+ return loggerProvider
+}
+
+type checkout struct {
+ productCatalogSvcAddr string
+ cartSvcAddr string
+ currencySvcAddr string
+ shippingSvcAddr string
+ emailSvcAddr string
+ paymentSvcAddr string
+ kafkaBrokerSvcAddr string
+ pb.UnimplementedCheckoutServiceServer
+ KafkaProducerClient sarama.AsyncProducer
+ shippingSvcClient pb.ShippingServiceClient
+ productCatalogSvcClient pb.ProductCatalogServiceClient
+ cartSvcClient pb.CartServiceClient
+ currencySvcClient pb.CurrencyServiceClient
+ emailSvcClient pb.EmailServiceClient
+ paymentSvcClient pb.PaymentServiceClient
+}
+
+func main() {
+ var port string
+ mustMapEnv(&port, "CHECKOUT_PORT")
+
+ tp := initTracerProvider()
+ defer func() {
+ if err := tp.Shutdown(context.Background()); err != nil {
+ logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
+ }
+ }()
+
+ mp := initMeterProvider()
+ defer func() {
+ if err := mp.Shutdown(context.Background()); err != nil {
+ logger.Error(fmt.Sprintf("Error shutting down meter provider: %v", err))
+ }
+ }()
+
+ lp := initLoggerProvider()
+ defer func() {
+ if err := lp.Shutdown(context.Background()); err != nil {
+ logger.Error(fmt.Sprintf("Error shutting down logger provider: %v", err))
+ }
+ }()
+
+ // this *must* be called after the logger provider is initialized
+ // otherwise the Sarama producer in kafka/producer.go will not be
+ // able to log properly
+ logger = otelslog.NewLogger("checkout")
+ slog.SetDefault(logger)
+
+ err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second))
+ if err != nil {
+ logger.Error((err.Error()))
+ }
+
+ provider, err := flagd.NewProvider()
+ if err != nil {
+ logger.Error(fmt.Sprintf("Error creating flagd provider: %v", err))
+ }
+
+ openfeature.SetProvider(provider)
+ openfeature.AddHooks(otelhooks.NewTracesHook())
+
+ tracer = tp.Tracer("checkout")
+
+ svc := new(checkout)
+
+ mustMapEnv(&svc.shippingSvcAddr, "SHIPPING_ADDR")
+ c := mustCreateClient(svc.shippingSvcAddr)
+ svc.shippingSvcClient = pb.NewShippingServiceClient(c)
+ defer c.Close()
+
+ mustMapEnv(&svc.productCatalogSvcAddr, "PRODUCT_CATALOG_ADDR")
+ c = mustCreateClient(svc.productCatalogSvcAddr)
+ svc.productCatalogSvcClient = pb.NewProductCatalogServiceClient(c)
+ defer c.Close()
+
+ mustMapEnv(&svc.cartSvcAddr, "CART_ADDR")
+ c = mustCreateClient(svc.cartSvcAddr)
+ svc.cartSvcClient = pb.NewCartServiceClient(c)
+ defer c.Close()
+
+ mustMapEnv(&svc.currencySvcAddr, "CURRENCY_ADDR")
+ c = mustCreateClient(svc.currencySvcAddr)
+ svc.currencySvcClient = pb.NewCurrencyServiceClient(c)
+ defer c.Close()
+
+ mustMapEnv(&svc.emailSvcAddr, "EMAIL_ADDR")
+ c = mustCreateClient(svc.emailSvcAddr)
+ svc.emailSvcClient = pb.NewEmailServiceClient(c)
+ defer c.Close()
+
+ mustMapEnv(&svc.paymentSvcAddr, "PAYMENT_ADDR")
+ c = mustCreateClient(svc.paymentSvcAddr)
+ svc.paymentSvcClient = pb.NewPaymentServiceClient(c)
+ defer c.Close()
+
+ svc.kafkaBrokerSvcAddr = os.Getenv("KAFKA_ADDR")
+
+ if svc.kafkaBrokerSvcAddr != "" {
+ svc.KafkaProducerClient, err = kafka.CreateKafkaProducer([]string{svc.kafkaBrokerSvcAddr}, logger)
+ if err != nil {
+ logger.Error(err.Error())
+ }
+ }
+
+ logger.Info(fmt.Sprintf("service config: %+v", svc))
+
+ lis, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
+ if err != nil {
+ logger.Error(err.Error())
+ }
+
+ var srv = grpc.NewServer(
+ grpc.StatsHandler(otelgrpc.NewServerHandler()),
+ )
+ pb.RegisterCheckoutServiceServer(srv, svc)
+
+ healthcheck := health.NewServer()
+ healthpb.RegisterHealthServer(srv, healthcheck)
+ logger.Info(fmt.Sprintf("starting to listen on tcp: %q", lis.Addr().String()))
+ err = srv.Serve(lis)
+ logger.Error(err.Error())
+
+ ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)
+ defer cancel()
+
+ go func() {
+ if err := srv.Serve(lis); err != nil {
+ logger.Error(err.Error())
+ }
+ }()
+
+ <-ctx.Done()
+
+ srv.GracefulStop()
+ logger.Info("Checkout gRPC server stopped")
+}
+
+func mustMapEnv(target *string, envKey string) {
+ v := os.Getenv(envKey)
+ if v == "" {
+ panic(fmt.Sprintf("environment variable %q not set", envKey))
+ }
+ *target = v
+}
+
+func (cs *checkout) Check(ctx context.Context, req *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
+ return &healthpb.HealthCheckResponse{Status: healthpb.HealthCheckResponse_SERVING}, nil
+}
+
+func (cs *checkout) Watch(req *healthpb.HealthCheckRequest, ws healthpb.Health_WatchServer) error {
+ return status.Errorf(codes.Unimplemented, "health check via Watch not implemented")
+}
+
+func (cs *checkout) PlaceOrder(ctx context.Context, req *pb.PlaceOrderRequest) (*pb.PlaceOrderResponse, error) {
+ span := trace.SpanFromContext(ctx)
+ span.SetAttributes(
+ attribute.String("app.user.id", req.UserId),
+ attribute.String("app.user.currency", req.UserCurrency),
+ )
+ logger.LogAttrs(
+ ctx,
+ slog.LevelInfo, "[PlaceOrder]",
+ slog.String("user_id", req.UserId),
+ slog.String("user_currency", req.UserCurrency),
+ )
+
+ var err error
+ defer func() {
+ if err != nil {
+ span.AddEvent("error", trace.WithAttributes(semconv.ExceptionMessageKey.String(err.Error())))
+ }
+ }()
+
+ orderID, err := uuid.NewUUID()
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "failed to generate order uuid")
+ }
+
+ prep, err := cs.prepareOrderItemsAndShippingQuoteFromCart(ctx, req.UserId, req.UserCurrency, req.Address)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, err.Error())
+ }
+ span.AddEvent("prepared")
+
+ total := &pb.Money{CurrencyCode: req.UserCurrency,
+ Units: 0,
+ Nanos: 0}
+ total = money.Must(money.Sum(total, prep.shippingCostLocalized))
+ for _, it := range prep.orderItems {
+ multPrice := money.MultiplySlow(it.Cost, uint32(it.GetItem().GetQuantity()))
+ total = money.Must(money.Sum(total, multPrice))
+ }
+
+ txID, err := cs.chargeCard(ctx, total, req.CreditCard)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "failed to charge card: %+v", err)
+ }
+
+ span.AddEvent("charged",
+ trace.WithAttributes(attribute.String("app.payment.transaction.id", txID)))
+ logger.LogAttrs(
+ ctx,
+ slog.LevelInfo, "payment went through",
+ slog.String("transaction_id", txID),
+ )
+
+ shippingTrackingID, err := cs.shipOrder(ctx, req.Address, prep.cartItems)
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "shipping error: %+v", err)
+ }
+ shippingTrackingAttribute := attribute.String("app.shipping.tracking.id", shippingTrackingID)
+ span.AddEvent("shipped", trace.WithAttributes(shippingTrackingAttribute))
+
+ _ = cs.emptyUserCart(ctx, req.UserId)
+
+ orderResult := &pb.OrderResult{
+ OrderId: orderID.String(),
+ ShippingTrackingId: shippingTrackingID,
+ ShippingCost: prep.shippingCostLocalized,
+ ShippingAddress: req.Address,
+ Items: prep.orderItems,
+ }
+
+ shippingCostFloat, _ := strconv.ParseFloat(fmt.Sprintf("%d.%02d", prep.shippingCostLocalized.GetUnits(), prep.shippingCostLocalized.GetNanos()/1000000000), 64)
+ totalPriceFloat, _ := strconv.ParseFloat(fmt.Sprintf("%d.%02d", total.GetUnits(), total.GetNanos()/1000000000), 64)
+
+ span.SetAttributes(
+ attribute.String("app.order.id", orderID.String()),
+ attribute.Float64("app.shipping.amount", shippingCostFloat),
+ attribute.Float64("app.order.amount", totalPriceFloat),
+ attribute.Int("app.order.items.count", len(prep.orderItems)),
+ shippingTrackingAttribute,
+ )
+ logger.LogAttrs(
+ ctx,
+ slog.LevelInfo, "order placed",
+ slog.String("app.order.id", orderID.String()),
+ slog.Float64("app.shipping.amount", shippingCostFloat),
+ slog.Float64("app.order.amount", totalPriceFloat),
+ slog.Int("app.order.items.count", len(prep.orderItems)),
+ slog.String("app.shipping.tracking.id", shippingTrackingID),
+ )
+
+ if err := cs.sendOrderConfirmation(ctx, req.Email, orderResult); err != nil {
+ logger.Warn(fmt.Sprintf("failed to send order confirmation to %q: %+v", req.Email, err))
+ } else {
+ logger.Info(fmt.Sprintf("order confirmation email sent to %q", req.Email))
+ }
+
+ // send to kafka only if kafka broker address is set
+ if cs.kafkaBrokerSvcAddr != "" {
+ logger.Info("sending to postProcessor")
+ cs.sendToPostProcessor(ctx, orderResult)
+ }
+
+ resp := &pb.PlaceOrderResponse{Order: orderResult}
+ return resp, nil
+}
+
+type orderPrep struct {
+ orderItems []*pb.OrderItem
+ cartItems []*pb.CartItem
+ shippingCostLocalized *pb.Money
+}
+
+func (cs *checkout) prepareOrderItemsAndShippingQuoteFromCart(ctx context.Context, userID, userCurrency string, address *pb.Address) (orderPrep, error) {
+
+ ctx, span := tracer.Start(ctx, "prepareOrderItemsAndShippingQuoteFromCart")
+ defer span.End()
+
+ var out orderPrep
+ cartItems, err := cs.getUserCart(ctx, userID)
+ if err != nil {
+ return out, fmt.Errorf("cart failure: %+v", err)
+ }
+ orderItems, err := cs.prepOrderItems(ctx, cartItems, userCurrency)
+ if err != nil {
+ return out, fmt.Errorf("failed to prepare order: %+v", err)
+ }
+ shippingUSD, err := cs.quoteShipping(ctx, address, cartItems)
+ if err != nil {
+ return out, fmt.Errorf("shipping quote failure: %+v", err)
+ }
+ shippingPrice, err := cs.convertCurrency(ctx, shippingUSD, userCurrency)
+ if err != nil {
+ return out, fmt.Errorf("failed to convert shipping cost to currency: %+v", err)
+ }
+
+ out.shippingCostLocalized = shippingPrice
+ out.cartItems = cartItems
+ out.orderItems = orderItems
+
+ var totalCart int32
+ for _, ci := range cartItems {
+ totalCart += ci.Quantity
+ }
+ shippingCostFloat, _ := strconv.ParseFloat(fmt.Sprintf("%d.%02d", shippingPrice.GetUnits(), shippingPrice.GetNanos()/1000000000), 64)
+
+ span.SetAttributes(
+ attribute.Float64("app.shipping.amount", shippingCostFloat),
+ attribute.Int("app.cart.items.count", int(totalCart)),
+ attribute.Int("app.order.items.count", len(orderItems)),
+ )
+ return out, nil
+}
+
+func mustCreateClient(svcAddr string) *grpc.ClientConn {
+ c, err := grpc.NewClient(svcAddr,
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
+ )
+ if err != nil {
+ logger.Error(fmt.Sprintf("could not connect to %s service, err: %+v", svcAddr, err))
+ }
+
+ return c
+}
+
+func (cs *checkout) quoteShipping(ctx context.Context, address *pb.Address, items []*pb.CartItem) (*pb.Money, error) {
+ quotePayload, err := json.Marshal(map[string]interface{}{
+ "address": address,
+ "items": items,
+ })
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal ship order request: %+v", err)
+ }
+
+ resp, err := otelhttp.Post(ctx, cs.shippingSvcAddr+"/get-quote", "application/json", bytes.NewBuffer(quotePayload))
+ if err != nil {
+ return nil, fmt.Errorf("failed POST to shipping service: %+v", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("failed POST to email service: expected 200, got %d", resp.StatusCode)
+ }
+
+ shippingQuoteBytes, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read shipping quote response: %+v", err)
+ }
+
+ var quoteResp struct {
+ CostUsd *pb.Money `json:"cost_usd"`
+ }
+ if err := json.Unmarshal(shippingQuoteBytes, &quoteResp); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal shipping quote: %+v", err)
+ }
+ if quoteResp.CostUsd == nil {
+ return nil, fmt.Errorf("shipping quote missing cost_usd field")
+ }
+
+ return quoteResp.CostUsd, nil
+}
+
+func (cs *checkout) getUserCart(ctx context.Context, userID string) ([]*pb.CartItem, error) {
+ cart, err := cs.cartSvcClient.GetCart(ctx, &pb.GetCartRequest{UserId: userID})
+ if err != nil {
+ return nil, fmt.Errorf("failed to get user cart during checkout: %+v", err)
+ }
+ return cart.GetItems(), nil
+}
+
+func (cs *checkout) emptyUserCart(ctx context.Context, userID string) error {
+ if _, err := cs.cartSvcClient.EmptyCart(ctx, &pb.EmptyCartRequest{UserId: userID}); err != nil {
+ return fmt.Errorf("failed to empty user cart during checkout: %+v", err)
+ }
+ return nil
+}
+
+func (cs *checkout) prepOrderItems(ctx context.Context, items []*pb.CartItem, userCurrency string) ([]*pb.OrderItem, error) {
+ out := make([]*pb.OrderItem, len(items))
+
+ for i, item := range items {
+ product, err := cs.productCatalogSvcClient.GetProduct(ctx, &pb.GetProductRequest{Id: item.GetProductId()})
+ if err != nil {
+ return nil, fmt.Errorf("failed to get product #%q", item.GetProductId())
+ }
+ price, err := cs.convertCurrency(ctx, product.GetPriceUsd(), userCurrency)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert price of %q to %s", item.GetProductId(), userCurrency)
+ }
+ out[i] = &pb.OrderItem{
+ Item: item,
+ Cost: price}
+ }
+ return out, nil
+}
+
+func (cs *checkout) convertCurrency(ctx context.Context, from *pb.Money, toCurrency string) (*pb.Money, error) {
+ result, err := cs.currencySvcClient.Convert(ctx, &pb.CurrencyConversionRequest{
+ From: from,
+ ToCode: toCurrency})
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert currency: %+v", err)
+ }
+ return result, err
+}
+
+func (cs *checkout) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) {
+ paymentService := cs.paymentSvcClient
+ if cs.isFeatureFlagEnabled(ctx, "paymentUnreachable") {
+ badAddress := "badAddress:50051"
+ c := mustCreateClient(badAddress)
+ paymentService = pb.NewPaymentServiceClient(c)
+ }
+
+ paymentResp, err := paymentService.Charge(ctx, &pb.ChargeRequest{
+ Amount: amount,
+ CreditCard: paymentInfo})
+ if err != nil {
+ return "", fmt.Errorf("could not charge the card: %+v", err)
+ }
+ return paymentResp.GetTransactionId(), nil
+}
+
+func (cs *checkout) sendOrderConfirmation(ctx context.Context, email string, order *pb.OrderResult) error {
+ emailPayload, err := json.Marshal(map[string]interface{}{
+ "email": email,
+ "order": order,
+ })
+ if err != nil {
+ return fmt.Errorf("failed to marshal order to JSON: %+v", err)
+ }
+
+ resp, err := otelhttp.Post(ctx, cs.emailSvcAddr+"/send_order_confirmation", "application/json", bytes.NewBuffer(emailPayload))
+ if err != nil {
+ return fmt.Errorf("failed POST to email service: %+v", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("failed POST to email service: expected 200, got %d", resp.StatusCode)
+ }
+
+ return err
+}
+
+func (cs *checkout) shipOrder(ctx context.Context, address *pb.Address, items []*pb.CartItem) (string, error) {
+ shipPayload, err := json.Marshal(map[string]interface{}{
+ "address": address,
+ "items": items,
+ })
+ if err != nil {
+ return "", fmt.Errorf("failed to marshal ship order request: %+v", err)
+ }
+
+ resp, err := otelhttp.Post(ctx, cs.shippingSvcAddr+"/ship-order", "application/json", bytes.NewBuffer(shipPayload))
+ if err != nil {
+ return "", fmt.Errorf("failed POST to shipping service: %+v", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return "", fmt.Errorf("failed POST to email service: expected 200, got %d", resp.StatusCode)
+ }
+
+ trackingRespBytes, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return "", fmt.Errorf("failed to read ship order response: %+v", err)
+ }
+
+ var shipResp struct {
+ TrackingID string `json:"tracking_id"`
+ }
+ if err := json.Unmarshal(trackingRespBytes, &shipResp); err != nil {
+ return "", fmt.Errorf("failed to unmarshal ship order response: %+v", err)
+ }
+ if shipResp.TrackingID == "" {
+ return "", fmt.Errorf("ship order response missing tracking_id field")
+ }
+
+ return shipResp.TrackingID, nil
+}
+
+func (cs *checkout) sendToPostProcessor(ctx context.Context, result *pb.OrderResult) {
+ message, err := proto.Marshal(result)
+ if err != nil {
+ logger.Error(fmt.Sprintf("Failed to marshal message to protobuf: %+v", err))
+ return
+ }
+
+ msg := sarama.ProducerMessage{
+ Topic: kafka.Topic,
+ Value: sarama.ByteEncoder(message),
+ }
+
+ // Inject tracing info into message
+ span := createProducerSpan(ctx, &msg)
+ defer span.End()
+
+ // Send message and handle response
+ startTime := time.Now()
+ select {
+ case cs.KafkaProducerClient.Input() <- &msg:
+ select {
+ case successMsg := <-cs.KafkaProducerClient.Successes():
+ span.SetAttributes(
+ attribute.Bool("messaging.kafka.producer.success", true),
+ attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
+ attribute.KeyValue(semconv.MessagingKafkaMessageOffset(int(successMsg.Offset))),
+ )
+ logger.Info(fmt.Sprintf("Successful to write message. offset: %v, duration: %v", successMsg.Offset, time.Since(startTime)))
+ case errMsg := <-cs.KafkaProducerClient.Errors():
+ span.SetAttributes(
+ attribute.Bool("messaging.kafka.producer.success", false),
+ attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
+ )
+ span.SetStatus(otelcodes.Error, errMsg.Err.Error())
+ logger.Error(fmt.Sprintf("Failed to write message: %v", errMsg.Err))
+ case <-ctx.Done():
+ span.SetAttributes(
+ attribute.Bool("messaging.kafka.producer.success", false),
+ attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
+ )
+ span.SetStatus(otelcodes.Error, "Context cancelled: "+ctx.Err().Error())
+ logger.Warn(fmt.Sprintf("Context canceled before success message received: %v", ctx.Err()))
+ }
+ case <-ctx.Done():
+ span.SetAttributes(
+ attribute.Bool("messaging.kafka.producer.success", false),
+ attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
+ )
+ span.SetStatus(otelcodes.Error, "Failed to send: "+ctx.Err().Error())
+ logger.Error(fmt.Sprintf("Failed to send message to Kafka within context deadline: %v", ctx.Err()))
+ return
+ }
+
+ ffValue := cs.getIntFeatureFlag(ctx, "kafkaQueueProblems")
+ if ffValue > 0 {
+ logger.Info("Warning: FeatureFlag 'kafkaQueueProblems' is activated, overloading queue now.")
+ for i := 0; i < ffValue; i++ {
+ go func(i int) {
+ cs.KafkaProducerClient.Input() <- &msg
+ _ = <-cs.KafkaProducerClient.Successes()
+ }(i)
+ }
+ logger.Info(fmt.Sprintf("Done with #%d messages for overload simulation.", ffValue))
+ }
+}
+
+func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.Span {
+ spanContext, span := tracer.Start(
+ ctx,
+ fmt.Sprintf("%s publish", msg.Topic),
+ trace.WithSpanKind(trace.SpanKindProducer),
+ trace.WithAttributes(
+ semconv.PeerService("kafka"),
+ semconv.NetworkTransportTCP,
+ semconv.MessagingSystemKafka,
+ semconv.MessagingDestinationName(msg.Topic),
+ semconv.MessagingOperationPublish,
+ semconv.MessagingKafkaDestinationPartition(int(msg.Partition)),
+ ),
+ )
+
+ carrier := propagation.MapCarrier{}
+ propagator := otel.GetTextMapPropagator()
+ propagator.Inject(spanContext, carrier)
+
+ for key, value := range carrier {
+ msg.Headers = append(msg.Headers, sarama.RecordHeader{Key: []byte(key), Value: []byte(value)})
+ }
+
+ return span
+}
+
+func (cs *checkout) isFeatureFlagEnabled(ctx context.Context, featureFlagName string) bool {
+ client := openfeature.NewClient("checkout")
+
+ // Default value is set to false, but you could also make this a parameter.
+ featureEnabled, _ := client.BooleanValue(
+ ctx,
+ featureFlagName,
+ false,
+ openfeature.EvaluationContext{},
+ )
+
+ return featureEnabled
+}
+
+func (cs *checkout) getIntFeatureFlag(ctx context.Context, featureFlagName string) int {
+ client := openfeature.NewClient("checkout")
+
+ // Default value is set to 0, but you could also make this a parameter.
+ featureFlagValue, _ := client.IntValue(
+ ctx,
+ featureFlagName,
+ 0,
+ openfeature.EvaluationContext{},
+ )
+
+ return int(featureFlagValue)
+}