package service

import (
	"context"
	"encoding/json"
	"errors"
	"io"

	es "github.com/olivere/elastic/v7"
	"github.com/rs/zerolog/log"
	"github.com/segmentio/kafka-go"
	"gorm.io/gorm"

	"gitlab.com/tensorsecurity-rd/waf-console/internal/model"
	esStore "gitlab.com/tensorsecurity-rd/waf-console/internal/store"
	"gitlab.com/tensorsecurity-rd/waf-console/internal/utils/id"
)

// Kafka and Elasticsearch identifiers used for WAF detection logs.
const (
	// KafkaTopicIvanWafDetections is the Kafka topic name for WAF detections.
	KafkaTopicIvanWafDetections = "ivan_waf_detections"
	// KafkaGroupIvanWafDetectionsPalace is presumably the consumer group id
	// used when reading that topic; it is not referenced in this file.
	KafkaGroupIvanWafDetectionsPalace = "ivan_waf_detections_palace"
	// EsIndexWafDetections is a wildcard pattern over the detection indices;
	// it is not referenced in this file.
	EsIndexWafDetections = "waf-detections*"
	// EsIndexWafDetectionsAlias is the index name Handle writes documents to.
	EsIndexWafDetectionsAlias = "waf-detections"
)

// LogConsumerService reads WAF detection messages from Kafka, indexes them
// through the ES store and keeps the per-service attack counter in the
// database up to date.
type LogConsumerService struct {
	consumer *kafka.Reader
	db       *gorm.DB
	esStore  *esStore.ESStore
}

// NewLogConsumerService wires a LogConsumerService from its three
// dependencies. None of them is validated here.
func NewLogConsumerService(consumer *kafka.Reader, db *gorm.DB, esStore *esStore.ESStore) *LogConsumerService {
	return &LogConsumerService{
		consumer: consumer,
		db:       db,
		esStore:  esStore,
	}
}

// Consume blocks, reading messages from the Kafka reader and handing each one
// to its own goroutine for processing. It returns once the reader has been
// closed; any other read error is logged and the loop keeps going.
//
// NOTE(review): one goroutine is started per message with no upper bound and
// nothing waits for them on shutdown — consider a bounded worker pool.
func (s *LogConsumerService) Consume() {
	for {
		m, err := s.consumer.ReadMessage(context.Background())
		if err != nil {
			// kafka-go reports a closed reader as io.EOF. Retrying after that
			// can never succeed, so stop instead of spinning on the error.
			if errors.Is(err, io.EOF) {
				log.Info().Msg("kafka reader closed, stopping consumer")
				return
			}
			log.Error().Err(err).Msg("failed to read message")
			continue
		}
		log.Info().Msgf("message: %s", string(m.Value))
		go s.processMessage(m)
	}
}

// processMessage logs the payload of m and runs it through Handle with a
// background context.
func (s *LogConsumerService) processMessage(m kafka.Message) {
	log.Info().Msgf("processing message: %s", string(m.Value))
	// Handle logs every failure it returns, so the error is deliberately
	// discarded here rather than logged a second time.
	_ = s.Handle(context.Background(), m.Value)
}

// Handle decodes one JSON-encoded WAF detection message, submits an index
// request per attacked-log entry to the ES store, and adds the number of
// entries whose action is not "pass" to the owning service's attack_number.
//
// Entries with an empty AttackIP are skipped (and logged as warnings); they
// are neither indexed nor counted.
//
// It returns the JSON decoding error, the error from looking up the service
// row, or the error from updating the counter. Each of these is also logged.
func (s *LogConsumerService) Handle(ctx context.Context, message []byte) error {
	msg := model.WafDetectionMessage{}
	if err := json.Unmarshal(message, &msg); err != nil {
		log.Err(err).Str("message.Value", string(message)).Msg("unmarshal kafka message fails")
		return err
	}

	bulkableRequests := make([]es.BulkableRequest, 0, len(msg.AttackedLog))
	unPassCount := 0
	for i := range msg.AttackedLog {
		if msg.AttackedLog[i].AttackIP == "" {
			// There is no error value at this point, so log a plain warning
			// instead of attaching a nil error.
			log.Warn().Str("message.Value", string(message)).Msg("WafDetectionMessage.AttackedLog[i].AttackIP empty")
			continue
		}
		if msg.AttackedLog[i].Action != "pass" {
			unPassCount++
		}

		detection := model.WafDetection{}
		detection.WafDetectionMessageBasic = msg.WafDetectionMessageBasic
		detection.WafDetectionAttackedLog = msg.AttackedLog[i]
		// The generated id is stored in the document and reused as the ES
		// document id.
		detection.WafDetectionAttackedLog.ID = id.Str()
		detection.CreatedAt = msg.CreatedAt

		req := es.NewBulkIndexRequest().
			Index(EsIndexWafDetectionsAlias).
			Id(detection.WafDetectionAttackedLog.ID).
			Doc(detection)
		bulkableRequests = append(bulkableRequests, req)
	}

	// Nothing to index when every entry was skipped or the message carried
	// no entries, so avoid submitting an empty bulk.
	// NOTE(review): Save's signature is not visible here; if it returns an
	// error it is not being checked — confirm and handle.
	if len(bulkableRequests) > 0 {
		s.esStore.Save(ctx, bulkableRequests)
	}

	// Look the service up first so that an unknown ServiceID is still
	// reported as an error to the caller.
	wafService := model.WafService{}
	if err := s.db.WithContext(ctx).Where("id = ?", msg.ServiceID).First(&wafService).Error; err != nil {
		log.Err(err).Int64("WafDetectionMessage.ServiceID", msg.ServiceID).Msg("Find waf_service fails")
		return err
	}

	// Increment inside the database instead of writing back a value computed
	// from the row read above: messages are handled concurrently, and a
	// read-modify-write in Go would lose increments when two handlers touch
	// the same service at once.
	err := s.db.WithContext(ctx).
		Model(&model.WafService{}).
		Where("id = ?", msg.ServiceID).
		Update("attack_number", gorm.Expr("COALESCE(attack_number, 0) + ?", unPassCount)).
		Error
	if err != nil {
		log.Err(err).Int64("WafDetectionMessage.ServiceID", msg.ServiceID).Int("increment", unPassCount).Msg("update waf_service attack_number fails")
		return err
	}
	return nil
}