summaryrefslogtreecommitdiff
path: root/src/checkout/kafka/producer.go
diff options
context:
space:
mode:
authorSaumit <justsaumit@protonmail.com>2025-09-27 02:14:26 +0530
committerSaumit <justsaumit@protonmail.com>2025-09-27 02:14:26 +0530
commit82e03978b89938219958032efb1448cc76baa181 (patch)
tree626f3e54d52ecd49be0ed3bee30abacc0453d081 /src/checkout/kafka/producer.go
Initial snapshot - OpenTelemetry demo 2.1.3 -f
Diffstat (limited to 'src/checkout/kafka/producer.go')
-rw-r--r--src/checkout/kafka/producer.go61
1 files changed, 61 insertions, 0 deletions
diff --git a/src/checkout/kafka/producer.go b/src/checkout/kafka/producer.go
new file mode 100644
index 0000000..3bd9cc6
--- /dev/null
+++ b/src/checkout/kafka/producer.go
@@ -0,0 +1,61 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+package kafka
+
+import (
+ "fmt"
+ "log/slog"
+
+ "github.com/IBM/sarama"
+)
+
+var (
+ Topic = "orders"
+ ProtocolVersion = sarama.V3_0_0_0
+)
+
+type saramaLogger struct {
+ logger *slog.Logger
+}
+
+func (l *saramaLogger) Printf(format string, v ...interface{}) {
+ l.logger.Info(fmt.Sprintf(format, v...))
+}
+func (l *saramaLogger) Println(v ...interface{}) {
+ l.logger.Info(fmt.Sprint(v...))
+}
+func (l *saramaLogger) Print(v ...interface{}) {
+ l.logger.Info(fmt.Sprint(v...))
+}
+
+func CreateKafkaProducer(brokers []string, logger *slog.Logger) (sarama.AsyncProducer, error) {
+ // Set the logger for sarama to use.
+ sarama.Logger = &saramaLogger{logger: logger}
+
+ saramaConfig := sarama.NewConfig()
+ saramaConfig.Producer.Return.Successes = true
+ saramaConfig.Producer.Return.Errors = true
+
+ // Sarama has an issue in a single broker kafka if the kafka broker is restarted.
+ // This setting is to prevent that issue from manifesting itself, but may swallow failed messages.
+ saramaConfig.Producer.RequiredAcks = sarama.NoResponse
+
+ saramaConfig.Version = ProtocolVersion
+
+ // So we can know the partition and offset of messages.
+ saramaConfig.Producer.Return.Successes = true
+
+ producer, err := sarama.NewAsyncProducer(brokers, saramaConfig)
+ if err != nil {
+ return nil, err
+ }
+
+ // We will log to STDOUT if we're not able to produce messages.
+ go func() {
+ for err := range producer.Errors() {
+ logger.Error(fmt.Sprintf("Failed to write message: %+v", err))
+
+ }
+ }()
+ return producer, nil
+}