Commit 22159493 authored by qiuqunfeng's avatar qiuqunfeng
Browse files

Implement Kafka log consumer with Elasticsearch and database integration

- Add log consumer service to process WAF detection messages
- Implement message handling with JSON unmarshaling
- Save WAF detection logs to Elasticsearch
- Update WAF service attack number in database
- Modify ESStore to support bulk indexing with channel-based approach
- Add constants for Kafka topics and Elasticsearch indices
parent 6f4bd708
...@@ -58,6 +58,7 @@ require ( ...@@ -58,6 +58,7 @@ require (
github.com/olivere/elastic/v7 v7.0.32 // indirect github.com/olivere/elastic/v7 v7.0.32 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/segmentio/kafka-go v0.4.40 // indirect github.com/segmentio/kafka-go v0.4.40 // indirect
github.com/sony/sonyflake v1.0.0 // indirect
) )
require ( require (
......
...@@ -23,6 +23,7 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 ...@@ -23,6 +23,7 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
...@@ -185,6 +186,8 @@ github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3 ...@@ -185,6 +186,8 @@ github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/segmentio/kafka-go v0.4.40 h1:sszW7c0/uyv7+VcTW5trx2ZC7kMWDTxuR/6Zn8U1bm8= github.com/segmentio/kafka-go v0.4.40 h1:sszW7c0/uyv7+VcTW5trx2ZC7kMWDTxuR/6Zn8U1bm8=
github.com/segmentio/kafka-go v0.4.40/go.mod h1:naFEZc5MQKdeL3W6NkZIAn48Y6AazqjRFDhnXeg3h94= github.com/segmentio/kafka-go v0.4.40/go.mod h1:naFEZc5MQKdeL3W6NkZIAn48Y6AazqjRFDhnXeg3h94=
github.com/sony/sonyflake v1.0.0 h1:MpU6Ro7tfXwgn2l5eluf9xQvQJDROTBImNCfRXn/YeM=
github.com/sony/sonyflake v1.0.0/go.mod h1:Jv3cfhf/UFtolOTTRd3q4Nl6ENqM+KfyZ5PseKfZGF4=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
......
package model
type WafDetectionMessage struct {
WafDetectionMessageBasic
AttackedLog []WafDetectionAttackedLog `json:"attacked_log"`
CreatedAt int64 `json:"created_at"`
}
type WafDetectionMessageBasic struct {
ClusterKey string `json:"cluster_key"`
Namespace string `json:"namespace"`
ResKind string `json:"res_kind"`
ResName string `json:"res_name"`
ServiceID int64 `json:"service_id"`
AppName string `json:"app_name"`
}
type WafDetectionAttackedLog struct {
ID string `json:"id"`
RuleID int64 `json:"rule_id"`
RuleName string `json:"rule_name"`
Action string `json:"action"`
AttackIP string `json:"attack_ip"`
AttackLoad string `json:"attack_load"`
AttackTime int64 `json:"attack_time"`
AttackType string `json:"attack_type"`
AttackedApp string `json:"attacked_app"`
AttackedURL string `json:"attacked_url"`
ReqPkg string `json:"req_pkg"`
RspPkg string `json:"rsp_pkg,omitempty"`
RspContentType string `json:"rsp_content_type,omitempty"`
}
type WafDetection struct {
WafDetectionMessageBasic
WafDetectionAttackedLog
CreatedAt int64 `json:"created_at"`
}
...@@ -2,21 +2,37 @@ package service ...@@ -2,21 +2,37 @@ package service
import ( import (
"context" "context"
"encoding/json"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go"
"gitlab.com/tensorsecurity-rd/waf-console/internal/model"
"gitlab.com/tensorsecurity-rd/waf-console/internal/utils/id"
es "github.com/olivere/elastic/v7"
esStore "gitlab.com/tensorsecurity-rd/waf-console/internal/store"
"gorm.io/gorm" "gorm.io/gorm"
) )
const (
KafkaTopicIvanWafDetections = "ivan_waf_detections"
KafkaGroupIvanWafDetectionsPalace = "ivan_waf_detections_palace"
EsIndexWafDetections = "waf-detections*"
EsIndexWafDetectionsAlias = "waf-detections"
)
type LogConsumerService struct { type LogConsumerService struct {
consumer *kafka.Reader consumer *kafka.Reader
db *gorm.DB db *gorm.DB
esStore *esStore.ESStore
} }
func NewLogConsumerService(consumer *kafka.Reader, db *gorm.DB) *LogConsumerService { func NewLogConsumerService(consumer *kafka.Reader, db *gorm.DB, esStore *esStore.ESStore) *LogConsumerService {
return &LogConsumerService{ return &LogConsumerService{
consumer: consumer, consumer: consumer,
db: db, db: db,
esStore: esStore,
} }
} }
...@@ -34,4 +50,53 @@ func (s *LogConsumerService) Consume() { ...@@ -34,4 +50,53 @@ func (s *LogConsumerService) Consume() {
func (s *LogConsumerService) processMessage(m kafka.Message) { func (s *LogConsumerService) processMessage(m kafka.Message) {
log.Info().Msgf("processing message: %s", string(m.Value)) log.Info().Msgf("processing message: %s", string(m.Value))
s.Handle(context.Background(), m.Value)
}
func (s *LogConsumerService) Handle(ctx context.Context, message []byte) error {
WafDetectionMessage := model.WafDetectionMessage{}
err := json.Unmarshal(message, &WafDetectionMessage)
if err != nil {
log.Err(err).Str("message.Value", string(message)).Msg("unmarshal kafka message fails")
return err
}
bulkableRequests := make([]es.BulkableRequest, 0)
WafDetections := make([]model.WafDetection, len(WafDetectionMessage.AttackedLog))
unPassCount := 0
for i := range WafDetectionMessage.AttackedLog {
if WafDetectionMessage.AttackedLog[i].AttackIP == "" {
log.Err(err).Str("message.Value", string(message)).Msg("WafDetectionMessage.AttackedLog[i].AttackIP empty")
continue
}
if WafDetectionMessage.AttackedLog[i].Action != "pass" {
unPassCount++
}
WafDetections[i].WafDetectionMessageBasic = WafDetectionMessage.WafDetectionMessageBasic
WafDetections[i].WafDetectionAttackedLog = WafDetectionMessage.AttackedLog[i]
WafDetections[i].WafDetectionAttackedLog.ID = id.Str()
WafDetections[i].CreatedAt = WafDetectionMessage.CreatedAt
bulkIndexSignal := es.NewBulkIndexRequest().Index(EsIndexWafDetectionsAlias)
bulkableRequests = append(bulkableRequests, bulkIndexSignal.Id(WafDetections[i].WafDetectionAttackedLog.ID).Doc(WafDetections[i]))
}
s.esStore.Save(ctx, bulkableRequests)
wafService := model.WafService{}
err = s.db.WithContext(ctx).Where("id = ?", WafDetectionMessage.ServiceID).First(&wafService).Error
if err != nil {
log.Err(err).Int64("WafDetectionMessage.ServiceID", WafDetectionMessage.ServiceID).Msg("Find waf_service fails")
return err
}
attackNumber := int(wafService.AttackNum) + unPassCount
err = s.db.WithContext(ctx).Model(&model.WafService{}).Where("id = ?", WafDetectionMessage.ServiceID).Update("attack_number", attackNumber).Error
if err != nil {
log.Err(err).Int64("WafDetectionMessage.ServiceID", WafDetectionMessage.ServiceID).Int("attack_number", attackNumber).Msg("update waf_service attack_number fails")
return err
}
return nil
} }
...@@ -15,7 +15,7 @@ import ( ...@@ -15,7 +15,7 @@ import (
es "github.com/olivere/elastic/v7" es "github.com/olivere/elastic/v7"
) )
var BulkChan chan []es.BulkableRequest // var BulkChan chan []es.BulkableRequest
var esBatchSize int var esBatchSize int
type Config struct { type Config struct {
...@@ -55,6 +55,7 @@ func CreateEsClientFromEnv() (*es.Client, error) { ...@@ -55,6 +55,7 @@ func CreateEsClientFromEnv() (*es.Client, error) {
type ESStore struct { type ESStore struct {
esClient *es.Client esClient *es.Client
config Config config Config
BulkChan chan []es.BulkableRequest
} }
func readTplFile(filename string) (string, error) { func readTplFile(filename string) (string, error) {
...@@ -72,13 +73,21 @@ func readTplFile(filename string) (string, error) { ...@@ -72,13 +73,21 @@ func readTplFile(filename string) (string, error) {
return string(body), nil return string(body), nil
} }
func NewESStore(config Config, client *es.Client) *ESStore {
return &ESStore{
config: config,
esClient: client,
BulkChan: make(chan []es.BulkableRequest),
}
}
// Init 初始化ES
func (e *ESStore) Init() { func (e *ESStore) Init() {
ctx := context.Background() ctx := context.Background()
err := e.createIndex(ctx, "waf_detections_index", "test.json", "waf-detections-000001", "waf-detections") err := e.createIndex(ctx, "waf_detections_index", "test.json", "waf-detections-000001", "waf-detections")
if err != nil { if err != nil {
log.Error().Err(err).Msg("create index test failed") log.Error().Err(err).Msg("create index test failed")
} }
BulkChan = make(chan []es.BulkableRequest)
esConcurrency := e.config.ESConcurrency esConcurrency := e.config.ESConcurrency
esBatchSize = e.config.ESBatchSize esBatchSize = e.config.ESBatchSize
...@@ -86,7 +95,7 @@ func (e *ESStore) Init() { ...@@ -86,7 +95,7 @@ func (e *ESStore) Init() {
go func() { go func() {
bulkService := e.esClient.Bulk() bulkService := e.esClient.Bulk()
for { for {
bulkableRequests := <-BulkChan bulkableRequests := <-e.BulkChan
bulkService.Add(bulkableRequests...) bulkService.Add(bulkableRequests...)
err := doSave(ctx, e.config, *bulkService) err := doSave(ctx, e.config, *bulkService)
if err != nil { if err != nil {
...@@ -171,12 +180,12 @@ func doSave(ctx context.Context, config Config, bulkService es.BulkService) erro ...@@ -171,12 +180,12 @@ func doSave(ctx context.Context, config Config, bulkService es.BulkService) erro
} }
func Send(ctx context.Context, bulkableRequests []es.BulkableRequest) { func (e *ESStore) Save(ctx context.Context, bulkableRequests []es.BulkableRequest) {
batchRequests := make([]es.BulkableRequest, 0, esBatchSize) batchRequests := make([]es.BulkableRequest, 0, esBatchSize)
for i := 0; i < len(bulkableRequests); i++ { for i := 0; i < len(bulkableRequests); i++ {
batchRequests = append(batchRequests, bulkableRequests[i]) batchRequests = append(batchRequests, bulkableRequests[i])
if (i+1)%esBatchSize == 0 || i == len(bulkableRequests)-1 { if (i+1)%esBatchSize == 0 || i == len(bulkableRequests)-1 {
BulkChan <- batchRequests e.BulkChan <- batchRequests
batchRequests = make([]es.BulkableRequest, 0, esBatchSize) batchRequests = make([]es.BulkableRequest, 0, esBatchSize)
} }
} }
......
package id
import (
"log"
"math/rand"
"strconv"
"time"
"github.com/sony/sonyflake"
)
var sf *sonyflake.Sonyflake
const max = 1<<16 - 1
func init() {
// 有一定的几率会产生相同的machineID
// 比如两个引用这个包的实例,在同一个微秒被创建...
rand.Seed(time.Now().UnixMicro())
st := sonyflake.Settings{
StartTime: time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC),
MachineID: func() (uint16, error) {
return uint16(rand.Intn(max)), nil
},
CheckMachineID: nil,
}
sf = sonyflake.NewSonyflake(st)
if sf == nil {
log.Panicf("sonyflake init fails")
}
}
func Str() string {
ui64id, _ := sf.NextID()
return strconv.FormatUint(ui64id, 10)
}
func UInt64() uint64 {
ui64id, _ := sf.NextID()
return ui64id
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment