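// Package service contains LogConsumerService, which consumes WAF detection
// messages from Kafka, indexes them in Elasticsearch and updates the
// per-service attack counter in the database.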
package service

import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"os"
	"strings"
	"time"

	es "github.com/olivere/elastic/v7"
	"github.com/rs/zerolog/log"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl"
	"github.com/segmentio/kafka-go/sasl/plain"
	"github.com/segmentio/kafka-go/sasl/scram"
	"gorm.io/gorm"

	"gitlab.com/tensorsecurity-rd/waf-console/internal/model"
	esStore "gitlab.com/tensorsecurity-rd/waf-console/internal/store"
	"gitlab.com/tensorsecurity-rd/waf-console/internal/utils/id"
)

24 25 26 27 28 29 30 31
const (
	// EsIndexWafDetections is the WAF detection index name with a trailing
	// wildcard.
	EsIndexWafDetections = "waf-detections*"
	// EsIndexWafDetectionsAlias is the index name that bulk index requests
	// for WAF detections are addressed to.
	EsIndexWafDetectionsAlias = "waf-detections"

	// KafkaTopicIvanWafDetections is the Kafka topic the consumer reads
	// WAF detection messages from.
	KafkaTopicIvanWafDetections = "ivan_waf_detections"
	// KafkaGroupIvanWafDetectionsPalace is the Kafka consumer group ID
	// used when reading that topic.
	KafkaGroupIvanWafDetectionsPalace = "ivan_waf_detections_palace"
)

32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
// scramAlgo translates the value of the KAFKA_SCRAM_ALGO environment variable
// into the matching SCRAM hash algorithm. Keys are matched exactly
// (case-sensitive).
var scramAlgo = map[string]scram.Algorithm{
	"SHA256": scram.SHA256,
	"SHA512": scram.SHA512,
}

// Sentinel errors describing invalid Kafka connection settings.
var (
	// ErrMissingBrokers reports that the KAFKA_BROKERS environment variable
	// is not set.
	ErrMissingBrokers = errors.New("no KAFKA_BROKERS env variable set")
	// ErrMissingAuthInfo reports that an auth method was selected but the
	// username or the password is empty.
	ErrMissingAuthInfo = errors.New("no username or password set when auth is enabled")
	// ErrUnsupportedAuthMethod reports an auth method that is not supported.
	ErrUnsupportedAuthMethod = errors.New("unsupported auth method")
)

// Names of the environment variables that configure the Kafka connection.
const (
	// EnvKafkaBrokers holds the comma-separated list of broker addresses.
	EnvKafkaBrokers = "KAFKA_BROKERS"
	// EnvKafkaAuthMethod selects the SASL auth method (see KafkaAuthPlain
	// and KafkaAuthScram).
	EnvKafkaAuthMethod = "KAFKA_AUTH_METHOD"
	// EnvKafkaAuthUsername holds the SASL username.
	EnvKafkaAuthUsername = "KAFKA_AUTH_USERNAME"
	// EnvKafkaAuthPassword holds the SASL password.
	EnvKafkaAuthPassword = "KAFKA_AUTH_PASSWORD"
	// EnvKafkaScramAlgo selects the SCRAM hash algorithm.
	EnvKafkaScramAlgo = "KAFKA_SCRAM_ALGO"
)

// Values recognised in the KAFKA_AUTH_METHOD environment variable.
const (
	// KafkaAuthPlain selects SASL/PLAIN authentication.
	KafkaAuthPlain = "plain"
	// KafkaAuthScram selects SASL/SCRAM authentication.
	KafkaAuthScram = "scram"
)

// getSASLMechanismByEnv builds the Kafka SASL mechanism described by the
// KAFKA_AUTH_* environment variables.
//
// It returns the mechanism, a flag telling whether authentication was
// requested, and an error:
//   - KAFKA_AUTH_METHOD unset or empty: (nil, false, nil), no authentication.
//   - "plain" or "scram" with an empty username or password:
//     (nil, true, ErrMissingAuthInfo).
//   - any other value: (nil, true, ErrUnsupportedAuthMethod), so that a
//     misspelled method does not silently fall back to an unauthenticated
//     connection.
//
// For "scram" the hash algorithm is taken from KAFKA_SCRAM_ALGO; an unset or
// unrecognised value falls back to SHA512.
func getSASLMechanismByEnv() (sasl.Mechanism, bool, error) {
	authMethod := os.Getenv(EnvKafkaAuthMethod)
	switch authMethod {
	case "":
		return nil, false, nil
	case KafkaAuthPlain, KafkaAuthScram:
		// Supported; credentials are validated below.
	default:
		return nil, true, ErrUnsupportedAuthMethod
	}

	username := os.Getenv(EnvKafkaAuthUsername)
	password := os.Getenv(EnvKafkaAuthPassword)
	if username == "" || password == "" {
		return nil, true, ErrMissingAuthInfo
	}

	if authMethod == KafkaAuthPlain {
		return &plain.Mechanism{
			Username: username,
			Password: password,
		}, true, nil
	}

	algo, ok := scramAlgo[os.Getenv(EnvKafkaScramAlgo)]
	if !ok {
		algo = scram.SHA512
	}
	mechanism, err := scram.Mechanism(algo, username, password)
	if err != nil {
		return nil, true, err
	}
	return mechanism, true, nil
}

84 85 86
type LogConsumerService struct {
	consumer *kafka.Reader
	db       *gorm.DB
87
	esStore  *esStore.ESStore
88 89
}

90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
func NewLogConsumerService(db *gorm.DB, esStore *esStore.ESStore) *LogConsumerService {
	mechanism, _, err := getSASLMechanismByEnv()
	if err != nil {
		log.Fatal().Err(err).Msg("failed to get SASL mechanism")
	}
	brokers := strings.Split(os.Getenv(EnvKafkaBrokers), ",")
	if len(brokers) == 0 {
		log.Fatal().Msg("no KAFKA_BROKERS env variable set")
	}

	consumer := kafka.NewReader(kafka.ReaderConfig{
		Brokers: brokers,
		GroupID: KafkaGroupIvanWafDetectionsPalace,
		Topic:   KafkaTopicIvanWafDetections,
		Dialer: &kafka.Dialer{
			SASLMechanism: mechanism,
			Timeout:       10 * time.Second,
			DualStack:     true,
		},
	})

111 112 113
	return &LogConsumerService{
		consumer: consumer,
		db:       db,
114
		esStore:  esStore,
115 116 117 118
	}
}

func (s *LogConsumerService) Consume() {
119
	log.Info().Msg("start consume kafka message")
120 121 122 123 124 125 126 127 128 129 130 131 132
	for {
		m, err := s.consumer.ReadMessage(context.Background())
		if err != nil {
			log.Error().Err(err).Msg("failed to read message")
			continue
		}
		log.Info().Msgf("message: %s", string(m.Value))
		go s.processMessage(m)
	}
}

func (s *LogConsumerService) processMessage(m kafka.Message) {
	log.Info().Msgf("processing message: %s", string(m.Value))
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
	s.Handle(context.Background(), m.Value)
}

// Handle processes one raw WAF detection message.
//
// The JSON payload is decoded into a model.WafDetectionMessage. Every entry
// of AttackedLog that has a non-empty AttackIP becomes one document (with a
// freshly generated ID from id.Str) in a bulk index request against
// EsIndexWafDetectionsAlias; entries without an AttackIP are logged and
// skipped. Afterwards the attack_number column of the waf_service row
// identified by ServiceID is increased by the number of kept entries whose
// Action is not "pass".
//
// It returns the JSON decoding error, the lookup error when the waf_service
// row cannot be loaded, or the error of the counter update. Each of these is
// logged before being returned.
func (s *LogConsumerService) Handle(ctx context.Context, message []byte) error {
	var msg model.WafDetectionMessage
	if err := json.Unmarshal(message, &msg); err != nil {
		log.Err(err).Str("message.Value", string(message)).Msg("unmarshal kafka message fails")
		return err
	}

	bulkableRequests := make([]es.BulkableRequest, 0, len(msg.AttackedLog))
	unPassCount := 0
	for i := range msg.AttackedLog {
		if msg.AttackedLog[i].AttackIP == "" {
			// There is no error value at this point; log the skip as a
			// warning rather than through log.Err with a nil error, which
			// zerolog emits at info level.
			log.Warn().Str("message.Value", string(message)).Msg("WafDetectionMessage.AttackedLog[i].AttackIP empty")
			continue
		}
		if msg.AttackedLog[i].Action != "pass" {
			unPassCount++
		}

		detection := model.WafDetection{}
		detection.WafDetectionMessageBasic = msg.WafDetectionMessageBasic
		detection.WafDetectionAttackedLog = msg.AttackedLog[i]
		detection.WafDetectionAttackedLog.ID = id.Str()
		detection.CreatedAt = msg.CreatedAt

		req := es.NewBulkIndexRequest().
			Index(EsIndexWafDetectionsAlias).
			Id(detection.WafDetectionAttackedLog.ID).
			Doc(detection)
		bulkableRequests = append(bulkableRequests, req)
	}

	// NOTE(review): any result returned by Save is not inspected here, so an
	// indexing failure does not stop the counter update — confirm this is
	// intended.
	s.esStore.Save(ctx, bulkableRequests)

	// Load the service first so that an unknown ServiceID is still reported
	// as a lookup error.
	wafService := model.WafService{}
	if err := s.db.WithContext(ctx).Where("id = ?", msg.ServiceID).First(&wafService).Error; err != nil {
		log.Err(err).Int64("WafDetectionMessage.ServiceID", msg.ServiceID).Msg("Find waf_service fails")
		return err
	}

	// Handle is run concurrently (one goroutine per Kafka message), so the
	// counter is incremented inside the database instead of writing back a
	// value computed from the row read above; that read-modify-write loses
	// increments when two messages for the same service overlap.
	// NOTE(review): assumes WafService.AttackNum is stored in the
	// attack_number column — confirm against the model definition.
	err := s.db.WithContext(ctx).
		Model(&model.WafService{}).
		Where("id = ?", msg.ServiceID).
		Update("attack_number", gorm.Expr("COALESCE(attack_number, 0) + ?", unPassCount)).Error
	if err != nil {
		log.Err(err).Int64("WafDetectionMessage.ServiceID", msg.ServiceID).Int("attack_increment", unPassCount).Msg("update waf_service attack_number fails")
		return err
	}

	return nil
}