package service import ( "context" "encoding/json" "errors" "os" "strings" "time" "github.com/rs/zerolog/log" "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl" "github.com/segmentio/kafka-go/sasl/plain" "github.com/segmentio/kafka-go/sasl/scram" "gitlab.com/tensorsecurity-rd/waf-console/internal/model" "gitlab.com/tensorsecurity-rd/waf-console/internal/utils/id" es "github.com/olivere/elastic/v7" esStore "gitlab.com/tensorsecurity-rd/waf-console/internal/store" "gorm.io/gorm" ) const ( KafkaTopicIvanWafDetections = "ivan_waf_detections" KafkaGroupIvanWafDetectionsPalace = "ivan_waf_detections_palace" EsIndexWafDetections = "waf-detections*" EsIndexWafDetectionsAlias = "waf-detections" ) var scramAlgo = map[string]scram.Algorithm{ "SHA512": scram.SHA512, "SHA256": scram.SHA256, } var ErrMissingBrokers = errors.New("no KAFKA_BROKERS env variable set") var ErrMissingAuthInfo = errors.New("no username or passwrod set when auth is enabled") var ErrUnsupportedAuthMethod = errors.New("unsupported auth method") const ( EnvKafkaAuthMethod = "KAFKA_AUTH_METHOD" EnvKafkaAuthUsername = "KAFKA_AUTH_USERNAME" EnvKafkaAuthPassword = "KAFKA_AUTH_PASSWORD" EnvKafkaScramAlgo = "KAFKA_SCRAM_ALGO" EnvKafkaBrokers = "KAFKA_BROKERS" KafkaAuthPlain = "plain" KafkaAuthScram = "scram" ) func getSASLMechanismByEnv() (sasl.Mechanism, bool, error) { authMethod := os.Getenv(EnvKafkaAuthMethod) username := os.Getenv(EnvKafkaAuthUsername) password := os.Getenv(EnvKafkaAuthPassword) var mechanism sasl.Mechanism switch authMethod { case KafkaAuthPlain: if username == "" || password == "" { return nil, true, ErrMissingAuthInfo } mechanism = &plain.Mechanism{ Username: username, Password: password, } case KafkaAuthScram: if username == "" || password == "" { return nil, true, ErrMissingAuthInfo } algoKey := os.Getenv(EnvKafkaScramAlgo) algo := scram.SHA512 algoTmp, exist := scramAlgo[algoKey] if exist { algo = algoTmp } var err error mechanism, err = scram.Mechanism(algo, username, password) if err != nil { return nil, true, err } } return mechanism, mechanism != nil, nil } type LogConsumerService struct { consumer *kafka.Reader db *gorm.DB esStore *esStore.ESStore } func NewLogConsumerService(db *gorm.DB, esStore *esStore.ESStore) *LogConsumerService { mechanism, _, err := getSASLMechanismByEnv() if err != nil { log.Fatal().Err(err).Msg("failed to get SASL mechanism") } brokers := strings.Split(os.Getenv(EnvKafkaBrokers), ",") if len(brokers) == 0 { log.Fatal().Msg("no KAFKA_BROKERS env variable set") } consumer := kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, GroupID: KafkaGroupIvanWafDetectionsPalace, Topic: KafkaTopicIvanWafDetections, Dialer: &kafka.Dialer{ SASLMechanism: mechanism, Timeout: 10 * time.Second, DualStack: true, }, }) return &LogConsumerService{ consumer: consumer, db: db, esStore: esStore, } } func (s *LogConsumerService) Consume() { for { m, err := s.consumer.ReadMessage(context.Background()) if err != nil { log.Error().Err(err).Msg("failed to read message") continue } log.Info().Msgf("message: %s", string(m.Value)) go s.processMessage(m) } } func (s *LogConsumerService) processMessage(m kafka.Message) { log.Info().Msgf("processing message: %s", string(m.Value)) s.Handle(context.Background(), m.Value) } func (s *LogConsumerService) Handle(ctx context.Context, message []byte) error { WafDetectionMessage := model.WafDetectionMessage{} err := json.Unmarshal(message, &WafDetectionMessage) if err != nil { log.Err(err).Str("message.Value", string(message)).Msg("unmarshal kafka message fails") return err } bulkableRequests := make([]es.BulkableRequest, 0) WafDetections := make([]model.WafDetection, len(WafDetectionMessage.AttackedLog)) unPassCount := 0 for i := range WafDetectionMessage.AttackedLog { if WafDetectionMessage.AttackedLog[i].AttackIP == "" { log.Err(err).Str("message.Value", string(message)).Msg("WafDetectionMessage.AttackedLog[i].AttackIP empty") continue } if WafDetectionMessage.AttackedLog[i].Action != "pass" { unPassCount++ } WafDetections[i].WafDetectionMessageBasic = WafDetectionMessage.WafDetectionMessageBasic WafDetections[i].WafDetectionAttackedLog = WafDetectionMessage.AttackedLog[i] WafDetections[i].WafDetectionAttackedLog.ID = id.Str() WafDetections[i].CreatedAt = WafDetectionMessage.CreatedAt bulkIndexSignal := es.NewBulkIndexRequest().Index(EsIndexWafDetectionsAlias) bulkableRequests = append(bulkableRequests, bulkIndexSignal.Id(WafDetections[i].WafDetectionAttackedLog.ID).Doc(WafDetections[i])) } s.esStore.Save(ctx, bulkableRequests) wafService := model.WafService{} err = s.db.WithContext(ctx).Where("id = ?", WafDetectionMessage.ServiceID).First(&wafService).Error if err != nil { log.Err(err).Int64("WafDetectionMessage.ServiceID", WafDetectionMessage.ServiceID).Msg("Find waf_service fails") return err } attackNumber := int(wafService.AttackNum) + unPassCount err = s.db.WithContext(ctx).Model(&model.WafService{}).Where("id = ?", WafDetectionMessage.ServiceID).Update("attack_number", attackNumber).Error if err != nil { log.Err(err).Int64("WafDetectionMessage.ServiceID", WafDetectionMessage.ServiceID).Int("attack_number", attackNumber).Msg("update waf_service attack_number fails") return err } return nil }