summaryrefslogtreecommitdiff
path: root/src/ad/src
diff options
context:
space:
mode:
Diffstat (limited to 'src/ad/src')
-rw-r--r--src/ad/src/main/java/oteldemo/AdService.java328
-rw-r--r--src/ad/src/main/java/oteldemo/problempattern/CPULoad.java116
-rw-r--r--src/ad/src/main/java/oteldemo/problempattern/GarbageCollectionTrigger.java80
-rw-r--r--src/ad/src/main/java/oteldemo/problempattern/MemoryUtils.java65
-rw-r--r--src/ad/src/main/resources/log4j2.xml18
5 files changed, 607 insertions, 0 deletions
diff --git a/src/ad/src/main/java/oteldemo/AdService.java b/src/ad/src/main/java/oteldemo/AdService.java
new file mode 100644
index 0000000..d7f2688
--- /dev/null
+++ b/src/ad/src/main/java/oteldemo/AdService.java
@@ -0,0 +1,328 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package oteldemo;
+
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Iterables;
+import io.grpc.*;
+import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
+import io.grpc.protobuf.services.*;
+import io.grpc.stub.StreamObserver;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.baggage.Baggage;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.instrumentation.annotations.SpanAttribute;
+import io.opentelemetry.instrumentation.annotations.WithSpan;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import oteldemo.Demo.Ad;
+import oteldemo.Demo.AdRequest;
+import oteldemo.Demo.AdResponse;
+import oteldemo.problempattern.GarbageCollectionTrigger;
+import oteldemo.problempattern.CPULoad;
+import dev.openfeature.contrib.providers.flagd.FlagdOptions;
+import dev.openfeature.contrib.providers.flagd.FlagdProvider;
+import dev.openfeature.sdk.Client;
+import dev.openfeature.sdk.EvaluationContext;
+import dev.openfeature.sdk.MutableContext;
+import dev.openfeature.sdk.OpenFeatureAPI;
+import java.util.UUID;
+
+
+public final class AdService {
+
+ private static final Logger logger = LogManager.getLogger(AdService.class);
+
+ @SuppressWarnings("FieldCanBeLocal")
+ private static final int MAX_ADS_TO_SERVE = 2;
+
+ private Server server;
+ private HealthStatusManager healthMgr;
+
+ private static final AdService service = new AdService();
+ private static final Tracer tracer = GlobalOpenTelemetry.getTracer("ad");
+ private static final Meter meter = GlobalOpenTelemetry.getMeter("ad");
+
+ private static final LongCounter adRequestsCounter =
+ meter
+ .counterBuilder("app.ads.ad_requests")
+ .setDescription("Counts ad requests by request and response type")
+ .build();
+
+ private static final AttributeKey<String> adRequestTypeKey =
+ AttributeKey.stringKey("app.ads.ad_request_type");
+ private static final AttributeKey<String> adResponseTypeKey =
+ AttributeKey.stringKey("app.ads.ad_response_type");
+
+ private void start() throws IOException {
+ int port =
+ Integer.parseInt(
+ Optional.ofNullable(System.getenv("AD_PORT"))
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "environment vars: AD_PORT must not be null")));
+ healthMgr = new HealthStatusManager();
+
+ // Create a flagd instance with OpenTelemetry
+ FlagdOptions options =
+ FlagdOptions.builder()
+ .withGlobalTelemetry(true)
+ .build();
+
+ FlagdProvider flagdProvider = new FlagdProvider(options);
+ // Set flagd as the OpenFeature Provider
+ OpenFeatureAPI.getInstance().setProvider(flagdProvider);
+
+ server =
+ ServerBuilder.forPort(port)
+ .addService(new AdServiceImpl())
+ .addService(healthMgr.getHealthService())
+ .build()
+ .start();
+ logger.info("Ad service started, listening on " + port);
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(
+ () -> {
+ // Use stderr here since the logger may have been reset by its JVM shutdown hook.
+ System.err.println(
+ "*** shutting down gRPC ads server since JVM is shutting down");
+ AdService.this.stop();
+ System.err.println("*** server shut down");
+ }));
+ healthMgr.setStatus("", ServingStatus.SERVING);
+ }
+
+ private void stop() {
+ if (server != null) {
+ healthMgr.clearStatus("");
+ server.shutdown();
+ }
+ }
+
+ private enum AdRequestType {
+ TARGETED,
+ NOT_TARGETED
+ }
+
+ private enum AdResponseType {
+ TARGETED,
+ RANDOM
+ }
+
+ private static class AdServiceImpl extends oteldemo.AdServiceGrpc.AdServiceImplBase {
+
+ private static final String AD_FAILURE = "adFailure";
+ private static final String AD_MANUAL_GC_FEATURE_FLAG = "adManualGc";
+ private static final String AD_HIGH_CPU_FEATURE_FLAG = "adHighCpu";
+ private static final Client ffClient = OpenFeatureAPI.getInstance().getClient();
+
+ private AdServiceImpl() {}
+
+ /**
+ * Retrieves ads based on context provided in the request {@code AdRequest}.
+ *
+ * @param req the request containing context.
+ * @param responseObserver the stream observer which gets notified with the value of {@code
+ * AdResponse}
+ */
+ @Override
+ public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
+ AdService service = AdService.getInstance();
+
+ // get the current span in context
+ Span span = Span.current();
+ try {
+ List<Ad> allAds = new ArrayList<>();
+ AdRequestType adRequestType;
+ AdResponseType adResponseType;
+
+ Baggage baggage = Baggage.fromContextOrNull(Context.current());
+ MutableContext evaluationContext = new MutableContext();
+ if (baggage != null) {
+ final String sessionId = baggage.getEntryValue("session.id");
+ span.setAttribute("session.id", sessionId);
+ evaluationContext.setTargetingKey(sessionId);
+ evaluationContext.add("session", sessionId);
+ } else {
+ logger.info("no baggage found in context");
+ }
+
+ CPULoad cpuload = CPULoad.getInstance();
+ cpuload.execute(ffClient.getBooleanValue(AD_HIGH_CPU_FEATURE_FLAG, false, evaluationContext));
+
+ span.setAttribute("app.ads.contextKeys", req.getContextKeysList().toString());
+ span.setAttribute("app.ads.contextKeys.count", req.getContextKeysCount());
+ if (req.getContextKeysCount() > 0) {
+ logger.info("Targeted ad request received for " + req.getContextKeysList());
+ for (int i = 0; i < req.getContextKeysCount(); i++) {
+ Collection<Ad> ads = service.getAdsByCategory(req.getContextKeys(i));
+ allAds.addAll(ads);
+ }
+ adRequestType = AdRequestType.TARGETED;
+ adResponseType = AdResponseType.TARGETED;
+ } else {
+ logger.info("Non-targeted ad request received, preparing random response.");
+ allAds = service.getRandomAds();
+ adRequestType = AdRequestType.NOT_TARGETED;
+ adResponseType = AdResponseType.RANDOM;
+ }
+ if (allAds.isEmpty()) {
+ // Serve random ads.
+ allAds = service.getRandomAds();
+ adResponseType = AdResponseType.RANDOM;
+ }
+ span.setAttribute("app.ads.count", allAds.size());
+ span.setAttribute("app.ads.ad_request_type", adRequestType.name());
+ span.setAttribute("app.ads.ad_response_type", adResponseType.name());
+
+ adRequestsCounter.add(
+ 1,
+ Attributes.of(
+ adRequestTypeKey, adRequestType.name(), adResponseTypeKey, adResponseType.name()));
+
+ // Throw 1/10 of the time to simulate a failure when the feature flag is enabled
+ if (ffClient.getBooleanValue(AD_FAILURE, false, evaluationContext) && random.nextInt(10) == 0) {
+ throw new StatusRuntimeException(Status.UNAVAILABLE);
+ }
+
+ if (ffClient.getBooleanValue(AD_MANUAL_GC_FEATURE_FLAG, false, evaluationContext)) {
+ logger.warn("Feature Flag " + AD_MANUAL_GC_FEATURE_FLAG + " enabled, performing a manual gc now");
+ GarbageCollectionTrigger gct = new GarbageCollectionTrigger();
+ gct.doExecute();
+ }
+
+ AdResponse reply = AdResponse.newBuilder().addAllAds(allAds).build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ } catch (StatusRuntimeException e) {
+ span.addEvent(
+ "Error", Attributes.of(AttributeKey.stringKey("exception.message"), e.getMessage()));
+ span.setStatus(StatusCode.ERROR);
+ logger.log(Level.WARN, "GetAds Failed with status {}", e.getStatus());
+ responseObserver.onError(e);
+ }
+ }
+ }
+
+ private static final ImmutableListMultimap<String, Ad> adsMap = createAdsMap();
+
+ @WithSpan("getAdsByCategory")
+ private Collection<Ad> getAdsByCategory(@SpanAttribute("app.ads.category") String category) {
+ Collection<Ad> ads = adsMap.get(category);
+ Span.current().setAttribute("app.ads.count", ads.size());
+ return ads;
+ }
+
+ private static final Random random = new Random();
+
+ private List<Ad> getRandomAds() {
+
+ List<Ad> ads = new ArrayList<>(MAX_ADS_TO_SERVE);
+
+ // create and start a new span manually
+ Span span = tracer.spanBuilder("getRandomAds").startSpan();
+
+ // put the span into context, so if any child span is started the parent will be set properly
+ try (Scope ignored = span.makeCurrent()) {
+
+ Collection<Ad> allAds = adsMap.values();
+ for (int i = 0; i < MAX_ADS_TO_SERVE; i++) {
+ ads.add(Iterables.get(allAds, random.nextInt(allAds.size())));
+ }
+ span.setAttribute("app.ads.count", ads.size());
+
+ } finally {
+ span.end();
+ }
+
+ return ads;
+ }
+
+ private static AdService getInstance() {
+ return service;
+ }
+
+ /** Await termination on the main thread since the grpc library uses daemon threads. */
+ private void blockUntilShutdown() throws InterruptedException {
+ if (server != null) {
+ server.awaitTermination();
+ }
+ }
+
+ private static ImmutableListMultimap<String, Ad> createAdsMap() {
+ Ad binoculars =
+ Ad.newBuilder()
+ .setRedirectUrl("/product/2ZYFJ3GM2N")
+ .setText("Roof Binoculars for sale. 50% off.")
+ .build();
+ Ad explorerTelescope =
+ Ad.newBuilder()
+ .setRedirectUrl("/product/66VCHSJNUP")
+ .setText("Starsense Explorer Refractor Telescope for sale. 20% off.")
+ .build();
+ Ad colorImager =
+ Ad.newBuilder()
+ .setRedirectUrl("/product/0PUK6V6EV0")
+ .setText("Solar System Color Imager for sale. 30% off.")
+ .build();
+ Ad opticalTube =
+ Ad.newBuilder()
+ .setRedirectUrl("/product/9SIQT8TOJO")
+ .setText("Optical Tube Assembly for sale. 10% off.")
+ .build();
+ Ad travelTelescope =
+ Ad.newBuilder()
+ .setRedirectUrl("/product/1YMWWN1N4O")
+ .setText(
+ "Eclipsmart Travel Refractor Telescope for sale. Buy one, get second kit for free")
+ .build();
+ Ad solarFilter =
+ Ad.newBuilder()
+ .setRedirectUrl("/product/6E92ZMYYFZ")
+ .setText("Solar Filter for sale. Buy two, get third one for free")
+ .build();
+ Ad cleaningKit =
+ Ad.newBuilder()
+ .setRedirectUrl("/product/L9ECAV7KIM")
+ .setText("Lens Cleaning Kit for sale. Buy one, get second one for free")
+ .build();
+ return ImmutableListMultimap.<String, Ad>builder()
+ .putAll("binoculars", binoculars)
+ .putAll("telescopes", explorerTelescope)
+ .putAll("accessories", colorImager, solarFilter, cleaningKit)
+ .putAll("assembly", opticalTube)
+ .putAll("travel", travelTelescope)
+ // Keep the books category free of ads to ensure the random code branch is tested
+ .build();
+ }
+
+ /** Main launches the server from the command line. */
+ public static void main(String[] args) throws IOException, InterruptedException {
+ // Start the RPC server. You shouldn't see any output from gRPC before this.
+ logger.info("Ad service starting.");
+ final AdService service = AdService.getInstance();
+ service.start();
+ service.blockUntilShutdown();
+ }
+}
diff --git a/src/ad/src/main/java/oteldemo/problempattern/CPULoad.java b/src/ad/src/main/java/oteldemo/problempattern/CPULoad.java
new file mode 100644
index 0000000..178f773
--- /dev/null
+++ b/src/ad/src/main/java/oteldemo/problempattern/CPULoad.java
@@ -0,0 +1,116 @@
+/*
+* Copyright The OpenTelemetry Authors
+* SPDX-License-Identifier: Apache-2.0
+*/
+package oteldemo.problempattern;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import io.grpc.ManagedChannelBuilder;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * This class is designed to simulate a high CPU load scenario.
+ * It contains methods to start and stop a specified number of worker threads designed to
+ * perform CPU-intensive calculations.
+ */
+public class CPULoad {
+ private static final Logger logger = LogManager.getLogger(CPULoad.class.getName());
+ private static final int THREAD_COUNT = 4;
+ private boolean running = false;
+ private final List<Logarithmizer> runningWorkers = new ArrayList<>();
+
+ private static CPULoad instance;
+
+ /**
+ * Singleton pattern to get the instance of CPULoad.
+ * @return The singleton instance of CPULoad.
+ */
+ public static CPULoad getInstance() {
+ if (instance == null) {
+ instance = new CPULoad();
+ }
+ return instance;
+ }
+
+ /**
+ * Starts or stops the CPU load generation based on the input parameter.
+ * If enabled, it launches worker threads. If disabled, it stops any running threads.
+ *
+ * @param enabled Flag to start (true) or stop (false) the CPU load simulation.
+ */
+ public void execute(Boolean enabled) {
+ if (enabled) {
+ logger.info("High CPU-Load problempattern enabled");
+ if (!running) {
+ spawnLoadWorkers(THREAD_COUNT);
+ running = true;
+ }
+ } else {
+ running = false;
+ stopWorkers();
+ }
+ }
+
+ /**
+ * Creates and starts a specified number of Logarithmizer threads to simulate CPU load.
+ *
+ * @param threadCount The number of threads to be started.
+ */
+ private void spawnLoadWorkers(int threadCount) {
+ synchronized(runningWorkers) {
+ for (int i = 0; i < threadCount; i++) {
+ Logarithmizer logarithmizer = new Logarithmizer();
+ Thread thread = new Thread(logarithmizer);
+ thread.setDaemon(true);
+ thread.start();
+ runningWorkers.add(logarithmizer);
+ }
+ }
+ }
+
+ /**
+ * Signals all running Logarithmizer threads to stop and clears the list of running workers.
+ */
+ private void stopWorkers() {
+ synchronized(runningWorkers) {
+ for (Logarithmizer logarithmizer : runningWorkers) {
+ logarithmizer.setShouldRun(false);
+ }
+ runningWorkers.clear();
+ }
+ }
+
+ /**
+ * Inner class representing a worker focused on calculating logarithms to consume CPU resources.
+ */
+ private static class Logarithmizer implements Runnable {
+
+ private volatile boolean shouldRun = true;
+
+ /**
+ * Continuously calculates the logarithm of the current system time until
+ * requested to stop.
+ */
+ @Override
+ public void run() {
+ while (shouldRun) {
+ Math.log(System.currentTimeMillis());
+ }
+ }
+
+ /**
+ * Sets the shouldRun flag to control whether this Logarithmizer should continue
+ * to run.
+ *
+ * @param shouldRun A boolean flag to continue (true) or stop (false) the logarithm computation.
+ */
+ public void setShouldRun(boolean shouldRun) {
+ this.shouldRun = shouldRun;
+ }
+ }
+}
diff --git a/src/ad/src/main/java/oteldemo/problempattern/GarbageCollectionTrigger.java b/src/ad/src/main/java/oteldemo/problempattern/GarbageCollectionTrigger.java
new file mode 100644
index 0000000..aa72bc1
--- /dev/null
+++ b/src/ad/src/main/java/oteldemo/problempattern/GarbageCollectionTrigger.java
@@ -0,0 +1,80 @@
+/*
+* Copyright The OpenTelemetry Authors
+* SPDX-License-Identifier: Apache-2.0
+*/
+
+package oteldemo.problempattern;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * The GarbageCollectionTrigger class is responsible for triggering manual garbage collection
+* at specified intervals to simulate memory pressure and measure the impact on performance.
+*/
+public class GarbageCollectionTrigger {
+ private static final Logger logger = LogManager.getLogger(GarbageCollectionTrigger.class.getName());
+
+ private final long gc_delay;
+ private final int finalize_delay;
+ private final int maxObjects;
+
+ private long lastGC = 0;
+
+ private final MemoryUtils memUtils;
+
+ /**
+ * Constructs a new GarbageCollectionTrigger with default values.
+ */
+ public GarbageCollectionTrigger() {
+ memUtils = new MemoryUtils(ManagementFactory.getMemoryMXBean());
+ gc_delay = TimeUnit.SECONDS.toMillis(10);
+ finalize_delay = 500;
+ maxObjects = 500000;
+ }
+
+ /**
+ * Triggers manual garbage collection at specified intervals and measures the impact on performance.
+ * It creates Entry objects to fill up memory and initiates garbage collection.
+ */
+ public void doExecute() {
+ if (System.currentTimeMillis() - lastGC > gc_delay) {
+ logger.info("Triggering a manual garbage collection, next one in " + (gc_delay/1000) + " seconds.");
+ // clear old data, we want to clear old Entry objects, because their finalization is expensive
+ System.gc();
+
+ long total = 0;
+ for (int i = 0; i < 10; i++) {
+ while (memUtils.getHeapUsage() < 0.9 && memUtils.getObjectPendingFinalizationCount() < maxObjects) {
+ new Entry();
+ }
+ long start = System.currentTimeMillis();
+ System.gc();
+ total += System.currentTimeMillis() - start;
+ }
+ logger.info("The artificially triggered GCs took: " + total + " ms");
+ lastGC = System.currentTimeMillis();
+ }
+
+ }
+
+ /**
+ * The Entry class represents objects created for the purpose of triggering garbage collection.
+ */
+ private class Entry {
+ /**
+ * Overrides the finalize method to introduce a delay, simulating finalization during garbage collection.
+ *
+ * @throws Throwable If an exception occurs during finalization.
+ */
+ @SuppressWarnings("removal")
+ @Override
+ protected void finalize() throws Throwable {
+ TimeUnit.MILLISECONDS.sleep(finalize_delay);
+ super.finalize();
+ }
+ }
+}
diff --git a/src/ad/src/main/java/oteldemo/problempattern/MemoryUtils.java b/src/ad/src/main/java/oteldemo/problempattern/MemoryUtils.java
new file mode 100644
index 0000000..6b31414
--- /dev/null
+++ b/src/ad/src/main/java/oteldemo/problempattern/MemoryUtils.java
@@ -0,0 +1,65 @@
+/*
+* Copyright The OpenTelemetry Authors
+* SPDX-License-Identifier: Apache-2.0
+*/
+
+package oteldemo.problempattern;
+
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+
+/**
+ * This class provides JVM heap related utility methods.
+*/
+public class MemoryUtils {
+
+ private static final Logger logger = LogManager.getLogger(MemoryUtils.class.getName());
+
+ private static final long NO_HEAP_LIMIT = -1;
+
+ private final MemoryMXBean memoryBean;
+
+ /**
+ * @param memoryBean defines which {@link MemoryMXBean} is to use
+ */
+ public MemoryUtils(MemoryMXBean memoryBean) {
+ this.memoryBean = memoryBean;
+ }
+
+
+ /**
+ * @return The current heap usage as a decimal number between 0.0 and 1.0.
+ * That is, if the returned value is 0.85, 85% of the max heap is used.
+ *
+ * If no max heap is set, the method returns -1.0.
+ */
+ public double getHeapUsage() {
+ MemoryUsage heapProps = memoryBean.getHeapMemoryUsage();
+ long heapUsed = heapProps.getUsed();
+ long heapMax = heapProps.getMax();
+
+ if (heapMax == NO_HEAP_LIMIT) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("No maximum heap is set");
+ }
+ return NO_HEAP_LIMIT;
+ }
+
+
+ double heapUsage = (double) heapUsed / heapMax;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Current heap usage is {0} percent" + (heapUsage * 100));
+ }
+ return heapUsage;
+ }
+
+ /**
+ * see {@link MemoryMXBean#getObjectPendingFinalizationCount()}
+ */
+ public int getObjectPendingFinalizationCount() {
+ return memoryBean.getObjectPendingFinalizationCount();
+ }
+}
diff --git a/src/ad/src/main/resources/log4j2.xml b/src/ad/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..db5cb39
--- /dev/null
+++ b/src/ad/src/main/resources/log4j2.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright The OpenTelemetry Authors
+SPDX-License-Identifier: Apache-2.0
+-->
+
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="STDOUT" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} - %logger{36} - %msg trace_id=%X{trace_id} span_id=%X{span_id} trace_flags=%X{trace_flags} %n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="INFO">
+ <AppenderRef ref="STDOUT"/>
+ </Root>
+ </Loggers>
+</Configuration>