package service

import (
	"context"
	"encoding/json"
	"errors"
	"io"

	es "github.com/olivere/elastic/v7"
	"github.com/rs/zerolog/log"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/scram"
	"gorm.io/gorm"

	"gitlab.com/tensorsecurity-rd/waf-console/cmd/config"
	"gitlab.com/tensorsecurity-rd/waf-console/internal/model"
	esStore "gitlab.com/tensorsecurity-rd/waf-console/internal/store"
	"gitlab.com/tensorsecurity-rd/waf-console/internal/utils/id"
)

// Kafka identifiers for the WAF detection stream.
const (
	// KafkaTopicIvanWafDetections is the Kafka topic name for WAF detections.
	KafkaTopicIvanWafDetections = "ivan_waf_detections"
	// KafkaGroupIvanWafDetectionsPalace is the Kafka consumer group ID.
	KafkaGroupIvanWafDetectionsPalace = "ivan_waf_detections_palace"
)

// Elasticsearch index names for WAF detection documents.
const (
	// EsIndexWafDetections is the index pattern (note the trailing "*").
	EsIndexWafDetections = "waf-detections*"
	// EsIndexWafDetectionsAlias is the name Handle indexes documents into.
	EsIndexWafDetectionsAlias = "waf-detections"
)

// scramAlgo maps a SCRAM algorithm name to the matching kafka-go algorithm.
var scramAlgo = map[string]scram.Algorithm{
	"SHA256": scram.SHA256,
	"SHA512": scram.SHA512,
}

// Sentinel errors describing invalid Kafka connection settings.
var (
	// ErrMissingBrokers reports that no Kafka broker list was provided.
	ErrMissingBrokers = errors.New("no KAFKA_BROKERS env variable set")
	// ErrMissingAuthInfo reports that authentication is enabled but the
	// username or password is empty.
	ErrMissingAuthInfo = errors.New("no username or password set when auth is enabled")
	// ErrUnsupportedAuthMethod reports an unrecognised authentication method.
	ErrUnsupportedAuthMethod = errors.New("unsupported auth method")
)

// Names of the environment variables that carry Kafka connection settings.
const (
	EnvKafkaAuthMethod   = "KAFKA_AUTH_METHOD"
	EnvKafkaAuthUsername = "KAFKA_AUTH_USERNAME"
	EnvKafkaAuthPassword = "KAFKA_AUTH_PASSWORD"
	EnvKafkaScramAlgo    = "KAFKA_SCRAM_ALGO"
	EnvKafkaBrokers      = "KAFKA_BROKERS"
)

// Recognised Kafka authentication method names.
const (
	KafkaAuthPlain = "plain"
	KafkaAuthScram = "scram"
)

// func getSASLMechanismByEnv() (sasl.Mechanism, bool, error) {
// 	authMethod := os.Getenv(EnvKafkaAuthMethod)
// 	username := os.Getenv(EnvKafkaAuthUsername)
// 	password := os.Getenv(EnvKafkaAuthPassword)
// 	var mechanism sasl.Mechanism
// 	switch authMethod {
// 	case KafkaAuthPlain:
// 		if username == "" || password == "" {
// 			return nil, true, ErrMissingAuthInfo
// 		}
// 		mechanism = &plain.Mechanism{
// 			Username: username,
// 			Password: password,
// 		}
// 	case KafkaAuthScram:
// 		if username == "" || password == "" {
// 			return nil, true, ErrMissingAuthInfo
// 		}
// 		algoKey := os.Getenv(EnvKafkaScramAlgo)
// 		algo := scram.SHA512
// 		algoTmp, exist := scramAlgo[algoKey]
// 		if exist {
// 			algo = algoTmp
// 		}
// 		var err error
// 		mechanism, err = scram.Mechanism(algo, username, password)
// 		if err != nil {
// 			return nil, true, err
// 		}
// 	}
// 	return mechanism, mechanism != nil, nil
// }

type LogConsumerService struct {
	consumer *kafka.Reader
	db       *gorm.DB
83
	esStore  *esStore.ESStore
84 85
}

func NewLogConsumerService(db *gorm.DB, esStore *esStore.ESStore, config *config.KafkaConfig) *LogConsumerService {
	// mechanism, _, err := getSASLMechanismByEnv()
	// if err != nil {
	// 	log.Fatal().Err(err).Msg("failed to get SASL mechanism")
	// }
	// brokers := strings.Split(os.Getenv(EnvKafkaBrokers), ",")
	// if len(brokers) == 0 {
	// 	log.Fatal().Msg("no KAFKA_BROKERS env variable set")
	// }
95

96 97 98 99 100 101 102 103 104 105 106
	// consumer := kafka.NewReader(kafka.ReaderConfig{
	// 	Brokers: brokers,
	// 	GroupID: KafkaGroupIvanWafDetectionsPalace,
	// 	Topic:   KafkaTopicIvanWafDetections,
	// 	Dialer: &kafka.Dialer{
	// 		SASLMechanism: mechanism,
	// 		Timeout:       10 * time.Second,
	// 		DualStack:     true,
	// 	},
	// })
	consumer := NewKafkaReader(config)
107

108 109 110
	return &LogConsumerService{
		consumer: consumer,
		db:       db,
111
		esStore:  esStore,
112 113 114 115
	}
}

func (s *LogConsumerService) Consume() {
116
	log.Info().Msg("start consume kafka message")
117 118 119 120 121 122 123 124 125 126 127 128 129
	for {
		m, err := s.consumer.ReadMessage(context.Background())
		if err != nil {
			log.Error().Err(err).Msg("failed to read message")
			continue
		}
		log.Info().Msgf("message: %s", string(m.Value))
		go s.processMessage(m)
	}
}

func (s *LogConsumerService) processMessage(m kafka.Message) {
	log.Info().Msgf("processing message: %s", string(m.Value))
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
	s.Handle(context.Background(), m.Value)
}

func (s *LogConsumerService) Handle(ctx context.Context, message []byte) error {
	WafDetectionMessage := model.WafDetectionMessage{}
	err := json.Unmarshal(message, &WafDetectionMessage)
	if err != nil {
		log.Err(err).Str("message.Value", string(message)).Msg("unmarshal kafka message fails")
		return err
	}

	bulkableRequests := make([]es.BulkableRequest, 0)
	WafDetections := make([]model.WafDetection, len(WafDetectionMessage.AttackedLog))

	unPassCount := 0
	for i := range WafDetectionMessage.AttackedLog {
		if WafDetectionMessage.AttackedLog[i].AttackIP == "" {
			log.Err(err).Str("message.Value", string(message)).Msg("WafDetectionMessage.AttackedLog[i].AttackIP empty")
			continue
		}
		if WafDetectionMessage.AttackedLog[i].Action != "pass" {
			unPassCount++
		}
		WafDetections[i].WafDetectionMessageBasic = WafDetectionMessage.WafDetectionMessageBasic
		WafDetections[i].WafDetectionAttackedLog = WafDetectionMessage.AttackedLog[i]
		WafDetections[i].WafDetectionAttackedLog.ID = id.Str()
		WafDetections[i].CreatedAt = WafDetectionMessage.CreatedAt

		bulkIndexSignal := es.NewBulkIndexRequest().Index(EsIndexWafDetectionsAlias)
		bulkableRequests = append(bulkableRequests, bulkIndexSignal.Id(WafDetections[i].WafDetectionAttackedLog.ID).Doc(WafDetections[i]))
	}

	s.esStore.Save(ctx, bulkableRequests)

164
	err = s.db.WithContext(ctx).Model(&model.WafService{}).Where("id = ?", WafDetectionMessage.ServiceID).Update("attack_num", gorm.Expr("attack_num + ?", 1)).Error
165
	if err != nil {
166
		log.Err(err).Int64("WafDetectionMessage.ServiceID", WafDetectionMessage.ServiceID).Msg("update waf_service attack_number fails")
167 168 169 170
		return err
	}

	return nil
171
}