summaryrefslogtreecommitdiff
path: root/src/recommendation/recommendation_server.py
diff options
context:
space:
mode:
authorSaumit <justsaumit@protonmail.com>2025-09-27 02:14:26 +0530
committerSaumit <justsaumit@protonmail.com>2025-09-27 02:14:26 +0530
commit82e03978b89938219958032efb1448cc76baa181 (patch)
tree626f3e54d52ecd49be0ed3bee30abacc0453d081 /src/recommendation/recommendation_server.py
Initial snapshot - OpenTelemetry demo 2.1.3 -f
Diffstat (limited to 'src/recommendation/recommendation_server.py')
-rw-r--r--src/recommendation/recommendation_server.py173
1 files changed, 173 insertions, 0 deletions
diff --git a/src/recommendation/recommendation_server.py b/src/recommendation/recommendation_server.py
new file mode 100644
index 0000000..df681bf
--- /dev/null
+++ b/src/recommendation/recommendation_server.py
@@ -0,0 +1,173 @@
+#!/usr/bin/python
+
+# Copyright The OpenTelemetry Authors
+# SPDX-License-Identifier: Apache-2.0
+
+
+# Python
+import os
+import random
+from concurrent import futures
+
+# Pip
+import grpc
+from opentelemetry import trace, metrics
+from opentelemetry._logs import set_logger_provider
+from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
+ OTLPLogExporter,
+)
+from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
+from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
+from opentelemetry.sdk.resources import Resource
+
+from openfeature import api
+from openfeature.contrib.provider.flagd import FlagdProvider
+
+from openfeature.contrib.hook.opentelemetry import TracingHook
+
+# Local
+import logging
+import demo_pb2
+import demo_pb2_grpc
+from grpc_health.v1 import health_pb2
+from grpc_health.v1 import health_pb2_grpc
+
+from metrics import (
+ init_metrics
+)
+
+cached_ids = []
+first_run = True
+
+class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer):
+ def ListRecommendations(self, request, context):
+ prod_list = get_product_list(request.product_ids)
+ span = trace.get_current_span()
+ span.set_attribute("app.products_recommended.count", len(prod_list))
+ logger.info(f"Receive ListRecommendations for product ids:{prod_list}")
+
+ # build and return response
+ response = demo_pb2.ListRecommendationsResponse()
+ response.product_ids.extend(prod_list)
+
+ # Collect metrics for this service
+ rec_svc_metrics["app_recommendations_counter"].add(len(prod_list), {'recommendation.type': 'catalog'})
+
+ return response
+
+ def Check(self, request, context):
+ return health_pb2.HealthCheckResponse(
+ status=health_pb2.HealthCheckResponse.SERVING)
+
+ def Watch(self, request, context):
+ return health_pb2.HealthCheckResponse(
+ status=health_pb2.HealthCheckResponse.UNIMPLEMENTED)
+
+
+def get_product_list(request_product_ids):
+ global first_run
+ global cached_ids
+ with tracer.start_as_current_span("get_product_list") as span:
+ max_responses = 5
+
+ # Formulate the list of characters to list of strings
+ request_product_ids_str = ''.join(request_product_ids)
+ request_product_ids = request_product_ids_str.split(',')
+
+ # Feature flag scenario - Cache Leak
+ if check_feature_flag("recommendationCacheFailure"):
+ span.set_attribute("app.recommendation.cache_enabled", True)
+ if random.random() < 0.5 or first_run:
+ first_run = False
+ span.set_attribute("app.cache_hit", False)
+ logger.info("get_product_list: cache miss")
+ cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty())
+ response_ids = [x.id for x in cat_response.products]
+ cached_ids = cached_ids + response_ids
+ cached_ids = cached_ids + cached_ids[:len(cached_ids) // 4]
+ product_ids = cached_ids
+ else:
+ span.set_attribute("app.cache_hit", True)
+ logger.info("get_product_list: cache hit")
+ product_ids = cached_ids
+ else:
+ span.set_attribute("app.recommendation.cache_enabled", False)
+ cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty())
+ product_ids = [x.id for x in cat_response.products]
+
+ span.set_attribute("app.products.count", len(product_ids))
+
+ # Create a filtered list of products excluding the products received as input
+ filtered_products = list(set(product_ids) - set(request_product_ids))
+ num_products = len(filtered_products)
+ span.set_attribute("app.filtered_products.count", num_products)
+ num_return = min(max_responses, num_products)
+
+ # Sample list of indicies to return
+ indices = random.sample(range(num_products), num_return)
+ # Fetch product ids from indices
+ prod_list = [filtered_products[i] for i in indices]
+
+ span.set_attribute("app.filtered_products.list", prod_list)
+
+ return prod_list
+
+
+def must_map_env(key: str):
+ value = os.environ.get(key)
+ if value is None:
+ raise Exception(f'{key} environment variable must be set')
+ return value
+
+
+def check_feature_flag(flag_name: str):
+ # Initialize OpenFeature
+ client = api.get_client()
+ return client.get_boolean_value("recommendationCacheFailure", False)
+
+
+if __name__ == "__main__":
+ service_name = must_map_env('OTEL_SERVICE_NAME')
+ api.set_provider(FlagdProvider(host=os.environ.get('FLAGD_HOST', 'flagd'), port=os.environ.get('FLAGD_PORT', 8013)))
+ api.add_hooks([TracingHook()])
+
+ # Initialize Traces and Metrics
+ tracer = trace.get_tracer_provider().get_tracer(service_name)
+ meter = metrics.get_meter_provider().get_meter(service_name)
+ rec_svc_metrics = init_metrics(meter)
+
+ # Initialize Logs
+ logger_provider = LoggerProvider(
+ resource=Resource.create(
+ {
+ 'service.name': service_name,
+ }
+ ),
+ )
+ set_logger_provider(logger_provider)
+ log_exporter = OTLPLogExporter(insecure=True)
+ logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
+ handler = LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider)
+
+ # Attach OTLP handler to logger
+ logger = logging.getLogger('main')
+ logger.addHandler(handler)
+
+ catalog_addr = must_map_env('PRODUCT_CATALOG_ADDR')
+ pc_channel = grpc.insecure_channel(catalog_addr)
+ product_catalog_stub = demo_pb2_grpc.ProductCatalogServiceStub(pc_channel)
+
+ # Create gRPC server
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+
+ # Add class to gRPC server
+ service = RecommendationService()
+ demo_pb2_grpc.add_RecommendationServiceServicer_to_server(service, server)
+ health_pb2_grpc.add_HealthServicer_to_server(service, server)
+
+ # Start server
+ port = must_map_env('RECOMMENDATION_PORT')
+ server.add_insecure_port(f'[::]:{port}')
+ server.start()
+ logger.info(f'Recommendation service started, listening on port {port}')
+ server.wait_for_termination()