// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package kafka import ( "fmt" "log/slog" "github.com/IBM/sarama" ) var ( Topic = "orders" ProtocolVersion = sarama.V3_0_0_0 ) type saramaLogger struct { logger *slog.Logger } func (l *saramaLogger) Printf(format string, v ...interface{}) { l.logger.Info(fmt.Sprintf(format, v...)) } func (l *saramaLogger) Println(v ...interface{}) { l.logger.Info(fmt.Sprint(v...)) } func (l *saramaLogger) Print(v ...interface{}) { l.logger.Info(fmt.Sprint(v...)) } func CreateKafkaProducer(brokers []string, logger *slog.Logger) (sarama.AsyncProducer, error) { // Set the logger for sarama to use. sarama.Logger = &saramaLogger{logger: logger} saramaConfig := sarama.NewConfig() saramaConfig.Producer.Return.Successes = true saramaConfig.Producer.Return.Errors = true // Sarama has an issue in a single broker kafka if the kafka broker is restarted. // This setting is to prevent that issue from manifesting itself, but may swallow failed messages. saramaConfig.Producer.RequiredAcks = sarama.NoResponse saramaConfig.Version = ProtocolVersion // So we can know the partition and offset of messages. saramaConfig.Producer.Return.Successes = true producer, err := sarama.NewAsyncProducer(brokers, saramaConfig) if err != nil { return nil, err } // We will log to STDOUT if we're not able to produce messages. go func() { for err := range producer.Errors() { logger.Error(fmt.Sprintf("Failed to write message: %+v", err)) } }() return producer, nil }