#!/usr/bin/python # Copyright The OpenTelemetry Authors # SPDX-License-Identifier: Apache-2.0 import json import os import random import uuid import logging from locust import HttpUser, task, between from locust_plugins.users.playwright import PlaywrightUser, pw, PageWithRetry, event from opentelemetry import context, baggage, trace from opentelemetry.context import Context from opentelemetry.metrics import set_meter_provider from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.instrumentation.jinja2 import Jinja2Instrumentor from opentelemetry.instrumentation.requests import RequestsInstrumentor from opentelemetry.instrumentation.system_metrics import SystemMetricsInstrumentor from opentelemetry.instrumentation.urllib3 import URLLib3Instrumentor from opentelemetry.instrumentation.logging import LoggingInstrumentor from opentelemetry._logs import set_logger_provider from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler from opentelemetry.sdk._logs.export import BatchLogRecordProcessor from opentelemetry.sdk.resources import Resource from openfeature import api from openfeature.contrib.provider.ofrep import OFREPProvider from openfeature.contrib.hook.opentelemetry import TracingHook from playwright.async_api import Route, Request # Configure tracer provider first (needed for trace context in logs) tracer_provider = TracerProvider() trace.set_tracer_provider(tracer_provider) tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(insecure=True))) # Configure logger provider with the same resource logger_provider = LoggerProvider() set_logger_provider(logger_provider) # Set up log exporter and processor log_exporter = OTLPLogExporter(insecure=True) logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter)) # Create logging handler that will include trace context handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider) # Configure root logger root_logger = logging.getLogger() root_logger.addHandler(handler) root_logger.setLevel(logging.INFO) # Configure metrics metric_exporter = OTLPMetricExporter(insecure=True) set_meter_provider(MeterProvider([PeriodicExportingMetricReader(metric_exporter)])) # Instrument logging to automatically inject trace context LoggingInstrumentor().instrument(set_logging_format=True) # Instrumenting manually to avoid error with locust gevent monkey Jinja2Instrumentor().instrument() RequestsInstrumentor().instrument() SystemMetricsInstrumentor().instrument() URLLib3Instrumentor().instrument() logging.info("Instrumentation complete - logs will now include trace context") # Initialize Flagd provider base_url = f"http://{os.environ.get('FLAGD_HOST', 'localhost')}:{os.environ.get('FLAGD_OFREP_PORT', 8016)}" api.set_provider(OFREPProvider(base_url=base_url)) api.add_hooks([TracingHook()]) def get_flagd_value(FlagName): # Initialize OpenFeature client = api.get_client() return client.get_integer_value(FlagName, 0) categories = [ "binoculars", "telescopes", "accessories", "assembly", "travel", "books", None, ] products = [ "0PUK6V6EV0", "1YMWWN1N4O", "2ZYFJ3GM2N", "66VCHSJNUP", "6E92ZMYYFZ", "9SIQT8TOJO", "L9ECAV7KIM", "LS4PSXUNUM", "OLJCESPC7Z", "HQTGWGPNH4", ] people_file = open('people.json') people = json.load(people_file) class WebsiteUser(HttpUser): wait_time = between(1, 10) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.tracer = trace.get_tracer(__name__) @task(1) def index(self): with self.tracer.start_as_current_span("user_index", context=Context()): logging.info("User accessing index page") self.client.get("/") @task(10) def browse_product(self): product = random.choice(products) with self.tracer.start_as_current_span("user_browse_product", context=Context(), attributes={"product.id": product}): logging.info(f"User browsing product: {product}") self.client.get("/api/products/" + product) @task(3) def get_recommendations(self): product = random.choice(products) with self.tracer.start_as_current_span("user_get_recommendations", context=Context(), attributes={"product.id": product}): logging.info(f"User getting recommendations for product: {product}") params = { "productIds": [product], } self.client.get("/api/recommendations", params=params) @task(3) def get_ads(self): category = random.choice(categories) with self.tracer.start_as_current_span("user_get_ads", context=Context(), attributes={"category": str(category)}): logging.info(f"User getting ads for category: {category}") params = { "contextKeys": [category], } self.client.get("/api/data/", params=params) @task(3) def view_cart(self): with self.tracer.start_as_current_span("user_view_cart", context=Context()): logging.info("User viewing cart") self.client.get("/api/cart") @task(2) def add_to_cart(self, user=""): if user == "": user = str(uuid.uuid1()) product = random.choice(products) quantity = random.choice([1, 2, 3, 4, 5, 10]) with self.tracer.start_as_current_span("user_add_to_cart", context=Context(), attributes={"user.id": user, "product.id": product, "quantity": quantity}): logging.info(f"User {user} adding {quantity} of product {product} to cart") self.client.get("/api/products/" + product) cart_item = { "item": { "productId": product, "quantity": quantity, }, "userId": user, } self.client.post("/api/cart", json=cart_item) @task(1) def checkout(self): user = str(uuid.uuid1()) with self.tracer.start_as_current_span("user_checkout_single", context=Context(), attributes={"user.id": user}): self.add_to_cart(user=user) checkout_person = random.choice(people) checkout_person["userId"] = user self.client.post("/api/checkout", json=checkout_person) logging.info(f"Checkout completed for user {user}") @task(1) def checkout_multi(self): user = str(uuid.uuid1()) item_count = random.choice([2, 3, 4]) with self.tracer.start_as_current_span("user_checkout_multi", context=Context(), attributes={"user.id": user, "item.count": item_count}): for i in range(item_count): self.add_to_cart(user=user) checkout_person = random.choice(people) checkout_person["userId"] = user self.client.post("/api/checkout", json=checkout_person) logging.info(f"Multi-item checkout completed for user {user}") @task(5) def flood_home(self): flood_count = get_flagd_value("loadGeneratorFloodHomepage") if flood_count > 0: with self.tracer.start_as_current_span("user_flood_home", context=Context(), attributes={"flood.count": flood_count}): logging.info(f"User flooding homepage {flood_count} times") for _ in range(0, flood_count): self.client.get("/") def on_start(self): with self.tracer.start_as_current_span("user_session_start", context=Context()): session_id = str(uuid.uuid4()) logging.info(f"Starting user session: {session_id}") ctx = baggage.set_baggage("session.id", session_id) ctx = baggage.set_baggage("synthetic_request", "true", context=ctx) context.attach(ctx) self.index() browser_traffic_enabled = os.environ.get("LOCUST_BROWSER_TRAFFIC_ENABLED", "").lower() in ("true", "yes", "on") if browser_traffic_enabled: class WebsiteBrowserUser(PlaywrightUser): headless = True # to use a headless browser, without a GUI def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.tracer = trace.get_tracer(__name__) @task @pw async def open_cart_page_and_change_currency(self, page: PageWithRetry): with self.tracer.start_as_current_span("browser_change_currency", context=Context()): try: page.on("console", lambda msg: print(msg.text)) await page.route('**/*', add_baggage_header) await page.goto("/cart", wait_until="domcontentloaded") await page.select_option('[name="currency_code"]', 'CHF') await page.wait_for_timeout(2000) # giving the browser time to export the traces logging.info("Currency changed to CHF") except Exception as e: logging.error(f"Error in change currency task: {str(e)}") @task @pw async def add_product_to_cart(self, page: PageWithRetry): with self.tracer.start_as_current_span("browser_add_to_cart", context=Context()): try: page.on("console", lambda msg: print(msg.text)) await page.route('**/*', add_baggage_header) await page.goto("/", wait_until="domcontentloaded") await page.click('p:has-text("Roof Binoculars")') await page.wait_for_load_state("domcontentloaded") await page.click('button:has-text("Add To Cart")') await page.wait_for_load_state("domcontentloaded") await page.wait_for_timeout(2000) # giving the browser time to export the traces logging.info("Product added to cart successfully") except Exception as e: logging.error(f"Error in add to cart task: {str(e)}") async def add_baggage_header(route: Route, request: Request): existing_baggage = request.headers.get('baggage', '') headers = { **request.headers, 'baggage': ', '.join(filter(None, (existing_baggage, 'synthetic_request=true'))) } await route.continue_(headers=headers)