// log_consumer.go

package service

import (
	"context"
	"encoding/json"
	"errors"
	"io"

	es "github.com/olivere/elastic/v7"
	"github.com/rs/zerolog/log"
	"github.com/segmentio/kafka-go"
	"gorm.io/gorm"

	"gitlab.com/tensorsecurity-rd/waf-console/internal/model"
	esStore "gitlab.com/tensorsecurity-rd/waf-console/internal/store"
	"gitlab.com/tensorsecurity-rd/waf-console/internal/utils/id"
)

// Kafka and Elasticsearch identifiers related to WAF detection logs.
const (
	// KafkaTopicIvanWafDetections is, judging by its name, the Kafka topic that
	// carries WAF detection messages. It is not referenced in the code shown
	// here — confirm against the reader configuration.
	KafkaTopicIvanWafDetections       = "ivan_waf_detections"
	// KafkaGroupIvanWafDetectionsPalace is presumably the consumer group ID used
	// when reading that topic. It is not referenced in the code shown here.
	KafkaGroupIvanWafDetectionsPalace = "ivan_waf_detections_palace"

	// EsIndexWafDetections is a wildcard index pattern. It is not referenced in
	// the code shown here; presumably it is used for searches — TODO confirm.
	EsIndexWafDetections      = "waf-detections*"
	// EsIndexWafDetectionsAlias is the index name that Handle targets when it
	// builds bulk index requests for detection documents.
	EsIndexWafDetectionsAlias = "waf-detections"
)

type LogConsumerService struct {
	consumer *kafka.Reader
	db       *gorm.DB
28
	esStore  *esStore.ESStore
29 30
}

func NewLogConsumerService(consumer *kafka.Reader, db *gorm.DB, esStore *esStore.ESStore) *LogConsumerService {
32 33 34
	return &LogConsumerService{
		consumer: consumer,
		db:       db,
35
		esStore:  esStore,
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
	}
}

// Consume blocks, reading messages from the Kafka reader and handing each one
// to processMessage on its own goroutine.
//
// Other read errors are logged and the loop keeps going. The loop returns once
// the reader reports io.EOF, which kafka-go uses to signal that the reader has
// been closed: every later read would fail the same way, so retrying would
// only spin and flood the log.
func (s *LogConsumerService) Consume() {
	ctx := context.Background()
	for {
		m, err := s.consumer.ReadMessage(ctx)
		if err != nil {
			if errors.Is(err, io.EOF) {
				log.Info().Msg("kafka reader closed, stopping consumer")
				return
			}
			log.Error().Err(err).Msg("failed to read message")
			continue
		}
		log.Info().Msgf("message: %s", string(m.Value))
		// NOTE(review): one goroutine is started per message with no upper
		// bound and nothing waits for them; consider a bounded worker pool.
		go s.processMessage(m)
	}
}

func (s *LogConsumerService) processMessage(m kafka.Message) {
	log.Info().Msgf("processing message: %s", string(m.Value))
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
	s.Handle(context.Background(), m.Value)
}

// Handle decodes one WAF detection message, builds an Elasticsearch bulk index
// request for each of its attack-log entries, hands those requests to the
// store, and adds the number of entries whose Action is not "pass" to the
// attack counter of the service identified by the message's ServiceID.
//
// message is the raw JSON payload. Entries with an empty AttackIP are logged
// and skipped: they are neither indexed nor counted.
//
// Handle returns the JSON decoding error, or the database error from looking
// up or updating the service record. Every returned error has already been
// logged here.
func (s *LogConsumerService) Handle(ctx context.Context, message []byte) error {
	var msg model.WafDetectionMessage
	if err := json.Unmarshal(message, &msg); err != nil {
		log.Err(err).Str("message.Value", string(message)).Msg("unmarshal kafka message fails")
		return err
	}

	bulkableRequests := make([]es.BulkableRequest, 0, len(msg.AttackedLog))
	unPassCount := 0
	for i := range msg.AttackedLog {
		if msg.AttackedLog[i].AttackIP == "" {
			// There is no error value at this point; log.Err(nil) would emit
			// the event at info level, so the error level is requested directly.
			log.Error().Str("message.Value", string(message)).Msg("WafDetectionMessage.AttackedLog[i].AttackIP empty")
			continue
		}
		if msg.AttackedLog[i].Action != "pass" {
			unPassCount++
		}

		var detection model.WafDetection
		detection.WafDetectionMessageBasic = msg.WafDetectionMessageBasic
		detection.WafDetectionAttackedLog = msg.AttackedLog[i]
		// Each indexed document gets a freshly generated ID, which is also
		// used as the Elasticsearch document ID.
		detection.WafDetectionAttackedLog.ID = id.Str()
		detection.CreatedAt = msg.CreatedAt

		req := es.NewBulkIndexRequest().
			Index(EsIndexWafDetectionsAlias).
			Id(detection.WafDetectionAttackedLog.ID).
			Doc(detection)
		bulkableRequests = append(bulkableRequests, req)
	}

	// NOTE(review): any result of Save is ignored, as in the original; its
	// signature is not visible here. If it returns an error, it should be
	// checked. Save is also called when no request was built.
	s.esStore.Save(ctx, bulkableRequests)

	// The lookup is kept so that a missing service is still reported as an
	// error before any write is attempted.
	var wafService model.WafService
	if err := s.db.WithContext(ctx).Where("id = ?", msg.ServiceID).First(&wafService).Error; err != nil {
		log.Err(err).Int64("WafDetectionMessage.ServiceID", msg.ServiceID).Msg("Find waf_service fails")
		return err
	}

	// Increment inside the database instead of writing back a value computed
	// from the row read above: Handle runs on concurrent goroutines, and a
	// read-then-write would lose increments when two messages for the same
	// service overlap.
	// NOTE(review): assumes model.WafService.AttackNum is stored in the
	// attack_number column, as the previous read-then-write implied — confirm.
	err := s.db.WithContext(ctx).
		Model(&model.WafService{}).
		Where("id = ?", msg.ServiceID).
		Update("attack_number", gorm.Expr("attack_number + ?", unPassCount)).
		Error
	if err != nil {
		log.Err(err).Int64("WafDetectionMessage.ServiceID", msg.ServiceID).Int("attack_number_increment", unPassCount).Msg("update waf_service attack_number fails")
		return err
	}

	return nil
}