kafka.go 1.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
package service

import (
	"os"
	"time"

	"github.com/rs/zerolog/log"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl"
	"github.com/segmentio/kafka-go/sasl/plain"
	"github.com/segmentio/kafka-go/sasl/scram"
12
	"gitlab.com/tensorsecurity-rd/waf-console/internal/config"
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
)

// getSASLMechanism builds the SASL mechanism selected by config.AuthMethod.
//
// For KafkaAuthPlain and KafkaAuthScram both config.Username and
// config.Password must be non-empty; otherwise ErrMissingAuthInfo is returned.
// For SCRAM the hash algorithm is looked up in scramAlgo under the value of
// the EnvKafkaScramAlgo environment variable, and scram.SHA512 is used when
// that lookup misses. Any other auth method produces a nil mechanism and a
// nil error.
//
// The boolean result is true whenever a mechanism is returned and also on
// every error path; it is false only when no case of the switch matched.
func getSASLMechanism(config *config.KafkaConfig) (sasl.Mechanism, bool, error) {
	user, pass := config.Username, config.Password
	credsMissing := user == "" || pass == ""

	switch config.AuthMethod {
	case KafkaAuthPlain:
		if credsMissing {
			return nil, true, ErrMissingAuthInfo
		}
		return &plain.Mechanism{Username: user, Password: pass}, true, nil

	case KafkaAuthScram:
		if credsMissing {
			return nil, true, ErrMissingAuthInfo
		}
		// Start from the default and override only on a successful lookup.
		hash := scram.SHA512
		if picked, ok := scramAlgo[os.Getenv(EnvKafkaScramAlgo)]; ok {
			hash = picked
		}
		m, err := scram.Mechanism(hash, user, pass)
		if err != nil {
			return nil, true, err
		}
		return m, m != nil, nil
	}

	// Unrecognised (or empty) auth method: no SASL mechanism.
	return nil, false, nil
}

// NewKafkaReader creates a kafka.Reader from config.
//
// The SASL mechanism comes from getSASLMechanism; its boolean result is not
// used here. A mechanism error or an empty config.Brokers is reported through
// log.Fatal (zerolog's fatal level, which ends the process). An empty
// config.Group or config.Topic is overwritten in place on the caller's config
// with KafkaGroupIvanWafDetectionsPalace or KafkaTopicIvanWafDetections
// respectively, and a warning is logged.
//
// The reader's dialer uses a 10 second timeout with DualStack enabled.
func NewKafkaReader(config *config.KafkaConfig) *kafka.Reader {
	saslMech, _, saslErr := getSASLMechanism(config)
	if saslErr != nil {
		log.Fatal().Err(saslErr).Msg("failed to get SASL mechanism")
	}

	if len(config.Brokers) == 0 {
		log.Fatal().Msg("no KAFKA_BROKERS env variable set")
	}

	// Fall back to the built-in group/topic names when none were configured.
	if config.Group == "" {
		config.Group = KafkaGroupIvanWafDetectionsPalace
		log.Warn().Msg("no KAFKA_GROUP env variable set, using default group")
	}
	if config.Topic == "" {
		config.Topic = KafkaTopicIvanWafDetections
		log.Warn().Msg("no KAFKA_TOPIC env variable set, using default topic")
	}

	dialer := &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		SASLMechanism: saslMech,
	}
	readerCfg := kafka.ReaderConfig{
		Brokers: config.Brokers,
		GroupID: config.Group,
		Topic:   config.Topic,
		Dialer:  dialer,
	}
	return kafka.NewReader(readerCfg)
}