package service

import (
	"os"
	"time"

	"github.com/rs/zerolog/log"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl"
	"github.com/segmentio/kafka-go/sasl/plain"
	"github.com/segmentio/kafka-go/sasl/scram"
	"gitlab.com/tensorsecurity-rd/waf-console/internal/config"
)

// getSASLMechanism builds the SASL mechanism selected by config.AuthMethod.
//
// Results:
//   - the mechanism, or nil when AuthMethod is neither KafkaAuthPlain nor
//     KafkaAuthScram (or when an error is returned);
//   - a boolean that is true whenever the PLAIN or SCRAM branch was taken,
//     including when that branch failed, and false otherwise;
//   - ErrMissingAuthInfo when PLAIN or SCRAM is selected but the username or
//     password is empty, or the error reported by scram.Mechanism.
//
// For SCRAM the hash algorithm is looked up in scramAlgo using the value of
// the environment variable named by EnvKafkaScramAlgo; SHA-512 is used when
// the lookup finds nothing.
func getSASLMechanism(config *config.KafkaConfig) (sasl.Mechanism, bool, error) {
	username, password := config.Username, config.Password

	switch config.AuthMethod {
	case KafkaAuthPlain:
		if username == "" || password == "" {
			return nil, true, ErrMissingAuthInfo
		}
		return &plain.Mechanism{Username: username, Password: password}, true, nil

	case KafkaAuthScram:
		if username == "" || password == "" {
			return nil, true, ErrMissingAuthInfo
		}

		algo := scram.SHA512
		algoName := os.Getenv(EnvKafkaScramAlgo)
		if selected, ok := scramAlgo[algoName]; ok {
			algo = selected
		} else if algoName != "" {
			// A value was supplied but is not a known algorithm. Keep the
			// SHA-512 fallback, but make the misconfiguration visible rather
			// than letting it surface later as an authentication failure.
			log.Warn().
				Str("algo", algoName).
				Msg("unrecognized SCRAM algorithm, falling back to SHA-512")
		}

		mechanism, err := scram.Mechanism(algo, username, password)
		if err != nil {
			return nil, true, err
		}
		return mechanism, mechanism != nil, nil
	}

	// Any other auth method: no SASL mechanism is configured.
	return nil, false, nil
}

// NewKafkaReader creates a consumer-group kafka.Reader from config.
//
// It logs at fatal level (zerolog's Fatal exits the process) when the SASL
// mechanism cannot be built or when config.Brokers is empty. When
// config.Group or config.Topic is empty, the corresponding default
// (KafkaGroupIvanWafDetectionsPalace / KafkaTopicIvanWafDetections) is
// written back into config, so the caller's struct is modified, and a
// warning is logged.
//
// The reader's dialer uses the SASL mechanism from getSASLMechanism (nil
// means no SASL), a 10-second timeout, and dual-stack dialing.
func NewKafkaReader(config *config.KafkaConfig) *kafka.Reader {
	// The boolean result is not needed here: a nil mechanism on the dialer
	// already means that no SASL authentication is performed.
	mechanism, _, err := getSASLMechanism(config)
	if err != nil {
		log.Fatal().Err(err).Msg("failed to get SASL mechanism")
	}

	if len(config.Brokers) == 0 {
		log.Fatal().Msg("no KAFKA_BROKERS env variable set")
	}

	if config.Group == "" {
		config.Group = KafkaGroupIvanWafDetectionsPalace
		log.Warn().Msg("no KAFKA_GROUP env variable set, using default group")
	}

	if config.Topic == "" {
		config.Topic = KafkaTopicIvanWafDetections
		log.Warn().Msg("no KAFKA_TOPIC env variable set, using default topic")
	}

	dialer := &kafka.Dialer{
		SASLMechanism: mechanism,
		Timeout:       10 * time.Second,
		DualStack:     true,
	}

	return kafka.NewReader(kafka.ReaderConfig{
		Brokers: config.Brokers,
		Topic:   config.Topic,
		GroupID: config.Group,
		Dialer:  dialer,
	})
}