summaryrefslogtreecommitdiff
path: root/src/product-catalog/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/product-catalog/main.go')
-rw-r--r--src/product-catalog/main.go401
1 files changed, 401 insertions, 0 deletions
diff --git a/src/product-catalog/main.go b/src/product-catalog/main.go
new file mode 100644
index 0000000..fa5d7f0
--- /dev/null
+++ b/src/product-catalog/main.go
@@ -0,0 +1,401 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+package main
+
+//go:generate go install google.golang.org/protobuf/cmd/protoc-gen-go
+//go:generate go install google.golang.org/grpc/cmd/protoc-gen-go-grpc
+//go:generate protoc --go_out=./ --go-grpc_out=./ --proto_path=../../pb ../../pb/demo.proto
+
+import (
+ "context"
+ "fmt"
+ "io/fs"
+ "log/slog"
+ "net"
+ "os"
+ "os/signal"
+ "strconv"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "go.opentelemetry.io/contrib/bridges/otelslog"
+ "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
+ "go.opentelemetry.io/contrib/instrumentation/runtime"
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/attribute"
+ otelcodes "go.opentelemetry.io/otel/codes"
+ "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
+ "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
+ "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
+ "go.opentelemetry.io/otel/log/global"
+ "go.opentelemetry.io/otel/propagation"
+ sdklog "go.opentelemetry.io/otel/sdk/log"
+ sdkmetric "go.opentelemetry.io/otel/sdk/metric"
+ sdkresource "go.opentelemetry.io/otel/sdk/resource"
+ sdktrace "go.opentelemetry.io/otel/sdk/trace"
+ "go.opentelemetry.io/otel/trace"
+
+ otelhooks "github.com/open-feature/go-sdk-contrib/hooks/open-telemetry/pkg"
+ flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg"
+ "github.com/open-feature/go-sdk/openfeature"
+ pb "github.com/opentelemetry/opentelemetry-demo/src/product-catalog/genproto/oteldemo"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/health"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
+ "google.golang.org/grpc/reflection"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/encoding/protojson"
+)
+
+var (
+ logger *slog.Logger
+ catalog []*pb.Product
+ resource *sdkresource.Resource
+ initResourcesOnce sync.Once
+)
+
+const DEFAULT_RELOAD_INTERVAL = 10
+
+func init() {
+ logger = otelslog.NewLogger("product-catalog")
+
+ loadProductCatalog()
+}
+
+func initResource() *sdkresource.Resource {
+ initResourcesOnce.Do(func() {
+ extraResources, _ := sdkresource.New(
+ context.Background(),
+ sdkresource.WithOS(),
+ sdkresource.WithProcess(),
+ sdkresource.WithContainer(),
+ sdkresource.WithHost(),
+ )
+ resource, _ = sdkresource.Merge(
+ sdkresource.Default(),
+ extraResources,
+ )
+ })
+ return resource
+}
+
+func initTracerProvider() *sdktrace.TracerProvider {
+ ctx := context.Background()
+
+ exporter, err := otlptracegrpc.New(ctx)
+ if err != nil {
+ logger.Error(fmt.Sprintf("OTLP Trace gRPC Creation: %v", err))
+
+ }
+ tp := sdktrace.NewTracerProvider(
+ sdktrace.WithBatcher(exporter),
+ sdktrace.WithResource(initResource()),
+ )
+ otel.SetTracerProvider(tp)
+ otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
+ return tp
+}
+
+func initMeterProvider() *sdkmetric.MeterProvider {
+ ctx := context.Background()
+
+ exporter, err := otlpmetricgrpc.New(ctx)
+ if err != nil {
+ logger.Error(fmt.Sprintf("new otlp metric grpc exporter failed: %v", err))
+ }
+
+ mp := sdkmetric.NewMeterProvider(
+ sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter)),
+ sdkmetric.WithResource(initResource()),
+ )
+ otel.SetMeterProvider(mp)
+ return mp
+}
+
+func initLoggerProvider() *sdklog.LoggerProvider {
+ ctx := context.Background()
+
+ logExporter, err := otlploggrpc.New(ctx)
+ if err != nil {
+ return nil
+ }
+
+ loggerProvider := sdklog.NewLoggerProvider(
+ sdklog.WithProcessor(sdklog.NewBatchProcessor(logExporter)),
+ )
+ global.SetLoggerProvider(loggerProvider)
+
+ return loggerProvider
+}
+
+func main() {
+ lp := initLoggerProvider()
+ defer func() {
+ if err := lp.Shutdown(context.Background()); err != nil {
+ logger.Error(fmt.Sprintf("Logger Provider Shutdown: %v", err))
+ }
+ logger.Info("Shutdown logger provider")
+ }()
+
+ tp := initTracerProvider()
+ defer func() {
+ if err := tp.Shutdown(context.Background()); err != nil {
+ logger.Error(fmt.Sprintf("Tracer Provider Shutdown: %v", err))
+ }
+ logger.Info("Shutdown tracer provider")
+ }()
+
+ mp := initMeterProvider()
+ defer func() {
+ if err := mp.Shutdown(context.Background()); err != nil {
+ logger.Error(fmt.Sprintf("Error shutting down meter provider: %v", err))
+ }
+ logger.Info("Shutdown meter provider")
+ }()
+ openfeature.AddHooks(otelhooks.NewTracesHook())
+ provider, err := flagd.NewProvider()
+ if err != nil {
+ logger.Error(err.Error())
+ }
+ err = openfeature.SetProvider(provider)
+ if err != nil {
+ logger.Error(err.Error())
+ }
+
+ err = runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second))
+ if err != nil {
+ logger.Error(err.Error())
+ }
+
+ svc := &productCatalog{}
+ var port string
+ mustMapEnv(&port, "PRODUCT_CATALOG_PORT")
+
+ logger.Info(fmt.Sprintf("Product Catalog gRPC server started on port: %s", port))
+
+ ln, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
+ if err != nil {
+ logger.Error(fmt.Sprintf("TCP Listen: %v", err))
+ }
+
+ srv := grpc.NewServer(
+ grpc.StatsHandler(otelgrpc.NewServerHandler()),
+ )
+
+ reflection.Register(srv)
+
+ pb.RegisterProductCatalogServiceServer(srv, svc)
+
+ healthcheck := health.NewServer()
+ healthpb.RegisterHealthServer(srv, healthcheck)
+
+ ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)
+ defer cancel()
+
+ go func() {
+ if err := srv.Serve(ln); err != nil {
+ logger.Error(fmt.Sprintf("Failed to serve gRPC server, err: %v", err))
+ }
+ }()
+
+ <-ctx.Done()
+
+ srv.GracefulStop()
+ logger.Info("Product Catalog gRPC server stopped")
+}
+
+type productCatalog struct {
+ pb.UnimplementedProductCatalogServiceServer
+}
+
+func loadProductCatalog() {
+ logger.Info("Loading Product Catalog...")
+ var err error
+ catalog, err = readProductFiles()
+ if err != nil {
+ logger.Error(fmt.Sprintf("Error reading product files: %v\n", err))
+ os.Exit(1)
+ }
+
+ // Default reload interval is 10 seconds
+ interval := DEFAULT_RELOAD_INTERVAL
+ si := os.Getenv("PRODUCT_CATALOG_RELOAD_INTERVAL")
+ if si != "" {
+ interval, _ = strconv.Atoi(si)
+ if interval <= 0 {
+ interval = DEFAULT_RELOAD_INTERVAL
+ }
+ }
+ logger.Info(fmt.Sprintf("Product Catalog reload interval: %d", interval))
+
+ ticker := time.NewTicker(time.Duration(interval) * time.Second)
+
+ go func() {
+ for {
+ select {
+ case <-ticker.C:
+ logger.Info("Reloading Product Catalog...")
+ catalog, err = readProductFiles()
+ if err != nil {
+ logger.Error(fmt.Sprintf("Error reading product files: %v", err))
+ continue
+ }
+ }
+ }
+ }()
+}
+
+func readProductFiles() ([]*pb.Product, error) {
+
+ // find all .json files in the products directory
+ entries, err := os.ReadDir("./products")
+ if err != nil {
+ return nil, err
+ }
+
+ jsonFiles := make([]fs.FileInfo, 0, len(entries))
+ for _, entry := range entries {
+ if strings.HasSuffix(entry.Name(), ".json") {
+ info, err := entry.Info()
+ if err != nil {
+ return nil, err
+ }
+ jsonFiles = append(jsonFiles, info)
+ }
+ }
+
+ // read the contents of each .json file and unmarshal into a ListProductsResponse
+ // then append the products to the catalog
+ var products []*pb.Product
+ for _, f := range jsonFiles {
+ jsonData, err := os.ReadFile("./products/" + f.Name())
+ if err != nil {
+ return nil, err
+ }
+
+ var res pb.ListProductsResponse
+ if err := protojson.Unmarshal(jsonData, &res); err != nil {
+ return nil, err
+ }
+
+ products = append(products, res.Products...)
+ }
+
+ logger.LogAttrs(
+ context.Background(),
+ slog.LevelInfo,
+ fmt.Sprintf("Loaded %d products\n", len(products)),
+ slog.Int("products", len(products)),
+ )
+
+ return products, nil
+}
+
+func mustMapEnv(target *string, key string) {
+ value, present := os.LookupEnv(key)
+ if !present {
+ logger.Error(fmt.Sprintf("Environment Variable Not Set: %q", key))
+ }
+ *target = value
+}
+
+func (p *productCatalog) Check(ctx context.Context, req *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
+ return &healthpb.HealthCheckResponse{Status: healthpb.HealthCheckResponse_SERVING}, nil
+}
+
+func (p *productCatalog) Watch(req *healthpb.HealthCheckRequest, ws healthpb.Health_WatchServer) error {
+ return status.Errorf(codes.Unimplemented, "health check via Watch not implemented")
+}
+
+func (p *productCatalog) ListProducts(ctx context.Context, req *pb.Empty) (*pb.ListProductsResponse, error) {
+ span := trace.SpanFromContext(ctx)
+
+ span.SetAttributes(
+ attribute.Int("app.products.count", len(catalog)),
+ )
+ return &pb.ListProductsResponse{Products: catalog}, nil
+}
+
+func (p *productCatalog) GetProduct(ctx context.Context, req *pb.GetProductRequest) (*pb.Product, error) {
+ span := trace.SpanFromContext(ctx)
+ span.SetAttributes(
+ attribute.String("app.product.id", req.Id),
+ )
+
+ // GetProduct will fail on a specific product when feature flag is enabled
+ if p.checkProductFailure(ctx, req.Id) {
+ msg := "Error: Product Catalog Fail Feature Flag Enabled"
+ span.SetStatus(otelcodes.Error, msg)
+ span.AddEvent(msg)
+ return nil, status.Errorf(codes.Internal, msg)
+ }
+
+ var found *pb.Product
+ for _, product := range catalog {
+ if req.Id == product.Id {
+ found = product
+ break
+ }
+ }
+
+ if found == nil {
+ msg := fmt.Sprintf("Product Not Found: %s", req.Id)
+ span.SetStatus(otelcodes.Error, msg)
+ span.AddEvent(msg)
+ return nil, status.Errorf(codes.NotFound, msg)
+ }
+
+ span.AddEvent("Product Found")
+ span.SetAttributes(
+ attribute.String("app.product.id", req.Id),
+ attribute.String("app.product.name", found.Name),
+ )
+
+ logger.LogAttrs(
+ ctx,
+ slog.LevelInfo, "Product Found",
+ slog.String("app.product.name", found.Name),
+ slog.String("app.product.id", req.Id),
+ )
+
+ return found, nil
+}
+
+func (p *productCatalog) SearchProducts(ctx context.Context, req *pb.SearchProductsRequest) (*pb.SearchProductsResponse, error) {
+ span := trace.SpanFromContext(ctx)
+
+ var result []*pb.Product
+ for _, product := range catalog {
+ if strings.Contains(strings.ToLower(product.Name), strings.ToLower(req.Query)) ||
+ strings.Contains(strings.ToLower(product.Description), strings.ToLower(req.Query)) {
+ result = append(result, product)
+ }
+ }
+ span.SetAttributes(
+ attribute.Int("app.products_search.count", len(result)),
+ )
+ return &pb.SearchProductsResponse{Results: result}, nil
+}
+
+func (p *productCatalog) checkProductFailure(ctx context.Context, id string) bool {
+ if id != "OLJCESPC7Z" {
+ return false
+ }
+
+ client := openfeature.NewClient("productCatalog")
+ failureEnabled, _ := client.BooleanValue(
+ ctx, "productCatalogFailure", false, openfeature.EvaluationContext{},
+ )
+ return failureEnabled
+}
+
+func createClient(ctx context.Context, svcAddr string) (*grpc.ClientConn, error) {
+ return grpc.DialContext(ctx, svcAddr,
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
+ )
+}