From 82e03978b89938219958032efb1448cc76baa181 Mon Sep 17 00:00:00 2001 From: Saumit Date: Sat, 27 Sep 2025 02:14:26 +0530 Subject: Initial snapshot - OpenTelemetry demo 2.1.3 -f --- src/load-generator/locustfile.py | 266 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 266 insertions(+) create mode 100644 src/load-generator/locustfile.py (limited to 'src/load-generator/locustfile.py') diff --git a/src/load-generator/locustfile.py b/src/load-generator/locustfile.py new file mode 100644 index 0000000..c1ec95b --- /dev/null +++ b/src/load-generator/locustfile.py @@ -0,0 +1,266 @@ +#!/usr/bin/python + +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +import json +import os +import random +import uuid +import logging + +from locust import HttpUser, task, between +from locust_plugins.users.playwright import PlaywrightUser, pw, PageWithRetry, event + +from opentelemetry import context, baggage, trace +from opentelemetry.context import Context +from opentelemetry.metrics import set_meter_provider +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.instrumentation.jinja2 import Jinja2Instrumentor +from opentelemetry.instrumentation.requests import RequestsInstrumentor +from opentelemetry.instrumentation.system_metrics import SystemMetricsInstrumentor +from opentelemetry.instrumentation.urllib3 import URLLib3Instrumentor +from opentelemetry.instrumentation.logging import LoggingInstrumentor +from opentelemetry._logs import set_logger_provider +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter +from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry.sdk.resources import Resource + +from openfeature import api +from openfeature.contrib.provider.ofrep import OFREPProvider +from openfeature.contrib.hook.opentelemetry import TracingHook + +from playwright.async_api import Route, Request + +# Configure tracer provider first (needed for trace context in logs) +tracer_provider = TracerProvider() +trace.set_tracer_provider(tracer_provider) +tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(insecure=True))) + +# Configure logger provider with the same resource +logger_provider = LoggerProvider() +set_logger_provider(logger_provider) + +# Set up log exporter and processor +log_exporter = OTLPLogExporter(insecure=True) +logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter)) + +# Create logging handler that will include trace context +handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider) + +# Configure root logger +root_logger = logging.getLogger() +root_logger.addHandler(handler) +root_logger.setLevel(logging.INFO) + +# Configure metrics +metric_exporter = OTLPMetricExporter(insecure=True) +set_meter_provider(MeterProvider([PeriodicExportingMetricReader(metric_exporter)])) + +# Instrument logging to automatically inject trace context +LoggingInstrumentor().instrument(set_logging_format=True) + +# Instrumenting manually to avoid error with locust gevent monkey +Jinja2Instrumentor().instrument() +RequestsInstrumentor().instrument() +SystemMetricsInstrumentor().instrument() +URLLib3Instrumentor().instrument() + +logging.info("Instrumentation complete - logs will now include trace context") + +# Initialize Flagd provider +base_url = f"http://{os.environ.get('FLAGD_HOST', 'localhost')}:{os.environ.get('FLAGD_OFREP_PORT', 8016)}" +api.set_provider(OFREPProvider(base_url=base_url)) +api.add_hooks([TracingHook()]) + +def get_flagd_value(FlagName): + # Initialize OpenFeature + client = api.get_client() + return client.get_integer_value(FlagName, 0) + +categories = [ + "binoculars", + "telescopes", + "accessories", + "assembly", + "travel", + "books", + None, +] + +products = [ + "0PUK6V6EV0", + "1YMWWN1N4O", + "2ZYFJ3GM2N", + "66VCHSJNUP", + "6E92ZMYYFZ", + "9SIQT8TOJO", + "L9ECAV7KIM", + "LS4PSXUNUM", + "OLJCESPC7Z", + "HQTGWGPNH4", +] + +people_file = open('people.json') +people = json.load(people_file) + +class WebsiteUser(HttpUser): + wait_time = between(1, 10) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.tracer = trace.get_tracer(__name__) + + @task(1) + def index(self): + with self.tracer.start_as_current_span("user_index", context=Context()): + logging.info("User accessing index page") + self.client.get("/") + + @task(10) + def browse_product(self): + product = random.choice(products) + with self.tracer.start_as_current_span("user_browse_product", context=Context(), attributes={"product.id": product}): + logging.info(f"User browsing product: {product}") + self.client.get("/api/products/" + product) + + @task(3) + def get_recommendations(self): + product = random.choice(products) + with self.tracer.start_as_current_span("user_get_recommendations", context=Context(), attributes={"product.id": product}): + logging.info(f"User getting recommendations for product: {product}") + params = { + "productIds": [product], + } + self.client.get("/api/recommendations", params=params) + + @task(3) + def get_ads(self): + category = random.choice(categories) + with self.tracer.start_as_current_span("user_get_ads", context=Context(), attributes={"category": str(category)}): + logging.info(f"User getting ads for category: {category}") + params = { + "contextKeys": [category], + } + self.client.get("/api/data/", params=params) + + @task(3) + def view_cart(self): + with self.tracer.start_as_current_span("user_view_cart", context=Context()): + logging.info("User viewing cart") + self.client.get("/api/cart") + + @task(2) + def add_to_cart(self, user=""): + if user == "": + user = str(uuid.uuid1()) + product = random.choice(products) + quantity = random.choice([1, 2, 3, 4, 5, 10]) + with self.tracer.start_as_current_span("user_add_to_cart", context=Context(), attributes={"user.id": user, "product.id": product, "quantity": quantity}): + logging.info(f"User {user} adding {quantity} of product {product} to cart") + self.client.get("/api/products/" + product) + cart_item = { + "item": { + "productId": product, + "quantity": quantity, + }, + "userId": user, + } + self.client.post("/api/cart", json=cart_item) + + @task(1) + def checkout(self): + user = str(uuid.uuid1()) + with self.tracer.start_as_current_span("user_checkout_single", context=Context(), attributes={"user.id": user}): + self.add_to_cart(user=user) + checkout_person = random.choice(people) + checkout_person["userId"] = user + self.client.post("/api/checkout", json=checkout_person) + logging.info(f"Checkout completed for user {user}") + + @task(1) + def checkout_multi(self): + user = str(uuid.uuid1()) + item_count = random.choice([2, 3, 4]) + with self.tracer.start_as_current_span("user_checkout_multi", context=Context(), + attributes={"user.id": user, "item.count": item_count}): + for i in range(item_count): + self.add_to_cart(user=user) + checkout_person = random.choice(people) + checkout_person["userId"] = user + self.client.post("/api/checkout", json=checkout_person) + logging.info(f"Multi-item checkout completed for user {user}") + + @task(5) + def flood_home(self): + flood_count = get_flagd_value("loadGeneratorFloodHomepage") + if flood_count > 0: + with self.tracer.start_as_current_span("user_flood_home", context=Context(), attributes={"flood.count": flood_count}): + logging.info(f"User flooding homepage {flood_count} times") + for _ in range(0, flood_count): + self.client.get("/") + + def on_start(self): + with self.tracer.start_as_current_span("user_session_start", context=Context()): + session_id = str(uuid.uuid4()) + logging.info(f"Starting user session: {session_id}") + ctx = baggage.set_baggage("session.id", session_id) + ctx = baggage.set_baggage("synthetic_request", "true", context=ctx) + context.attach(ctx) + self.index() + + +browser_traffic_enabled = os.environ.get("LOCUST_BROWSER_TRAFFIC_ENABLED", "").lower() in ("true", "yes", "on") + +if browser_traffic_enabled: + class WebsiteBrowserUser(PlaywrightUser): + headless = True # to use a headless browser, without a GUI + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.tracer = trace.get_tracer(__name__) + + @task + @pw + async def open_cart_page_and_change_currency(self, page: PageWithRetry): + with self.tracer.start_as_current_span("browser_change_currency", context=Context()): + try: + page.on("console", lambda msg: print(msg.text)) + await page.route('**/*', add_baggage_header) + await page.goto("/cart", wait_until="domcontentloaded") + await page.select_option('[name="currency_code"]', 'CHF') + await page.wait_for_timeout(2000) # giving the browser time to export the traces + logging.info("Currency changed to CHF") + except Exception as e: + logging.error(f"Error in change currency task: {str(e)}") + + @task + @pw + async def add_product_to_cart(self, page: PageWithRetry): + with self.tracer.start_as_current_span("browser_add_to_cart", context=Context()): + try: + page.on("console", lambda msg: print(msg.text)) + await page.route('**/*', add_baggage_header) + await page.goto("/", wait_until="domcontentloaded") + await page.click('p:has-text("Roof Binoculars")') + await page.wait_for_load_state("domcontentloaded") + await page.click('button:has-text("Add To Cart")') + await page.wait_for_load_state("domcontentloaded") + await page.wait_for_timeout(2000) # giving the browser time to export the traces + logging.info("Product added to cart successfully") + except Exception as e: + logging.error(f"Error in add to cart task: {str(e)}") + +async def add_baggage_header(route: Route, request: Request): + existing_baggage = request.headers.get('baggage', '') + headers = { + **request.headers, + 'baggage': ', '.join(filter(None, (existing_baggage, 'synthetic_request=true'))) + } + await route.continue_(headers=headers) -- cgit v1.2.3