diff options
Diffstat (limited to 'src/checkout/kafka')
| -rw-r--r-- | src/checkout/kafka/producer.go | 61 |
1 files changed, 61 insertions, 0 deletions
diff --git a/src/checkout/kafka/producer.go b/src/checkout/kafka/producer.go new file mode 100644 index 0000000..3bd9cc6 --- /dev/null +++ b/src/checkout/kafka/producer.go @@ -0,0 +1,61 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +package kafka + +import ( + "fmt" + "log/slog" + + "github.com/IBM/sarama" +) + +var ( + Topic = "orders" + ProtocolVersion = sarama.V3_0_0_0 +) + +type saramaLogger struct { + logger *slog.Logger +} + +func (l *saramaLogger) Printf(format string, v ...interface{}) { + l.logger.Info(fmt.Sprintf(format, v...)) +} +func (l *saramaLogger) Println(v ...interface{}) { + l.logger.Info(fmt.Sprint(v...)) +} +func (l *saramaLogger) Print(v ...interface{}) { + l.logger.Info(fmt.Sprint(v...)) +} + +func CreateKafkaProducer(brokers []string, logger *slog.Logger) (sarama.AsyncProducer, error) { + // Set the logger for sarama to use. + sarama.Logger = &saramaLogger{logger: logger} + + saramaConfig := sarama.NewConfig() + saramaConfig.Producer.Return.Successes = true + saramaConfig.Producer.Return.Errors = true + + // Sarama has an issue in a single broker kafka if the kafka broker is restarted. + // This setting is to prevent that issue from manifesting itself, but may swallow failed messages. + saramaConfig.Producer.RequiredAcks = sarama.NoResponse + + saramaConfig.Version = ProtocolVersion + + // So we can know the partition and offset of messages. + saramaConfig.Producer.Return.Successes = true + + producer, err := sarama.NewAsyncProducer(brokers, saramaConfig) + if err != nil { + return nil, err + } + + // We will log to STDOUT if we're not able to produce messages. + go func() { + for err := range producer.Errors() { + logger.Error(fmt.Sprintf("Failed to write message: %+v", err)) + + } + }() + return producer, nil +} |
