Commit 859c282c authored by qiuqunfeng's avatar qiuqunfeng
Browse files

Enhance LogConsumerService to include region name in log processing, updating...

Enhance LogConsumerService to include region name in log processing, updating configuration structure and initialization to support new regionName parameter.
parent b33fd4c1
...@@ -46,7 +46,7 @@ func NewRootCommand() *cobra.Command { ...@@ -46,7 +46,7 @@ func NewRootCommand() *cobra.Command {
ESTimeout: 10 * time.Second, ESTimeout: 10 * time.Second,
}, esClient) }, esClient)
esStore.Init() esStore.Init()
logConsumerService := service.NewLogConsumerService(nil, esStore, config.KafkaConfig) logConsumerService := service.NewLogConsumerService(nil, esStore, config.KafkaConfig, config.RegionConfig.RegionName)
go logConsumerService.Consume() go logConsumerService.Consume()
return e.Run(":8080") return e.Run(":8080")
}, },
......
...@@ -10,11 +10,12 @@ import ( ...@@ -10,11 +10,12 @@ import (
type RegionConfig struct { type RegionConfig struct {
RegionCode string `json:"region_code"` RegionCode string `json:"region_code"`
RegionName string `json:"region_name"`
ApiServer string `json:"api_server"` ApiServer string `json:"api_server"`
} }
type Config struct { type Config struct {
RegionConfigs []RegionConfig `json:"region_configs"` RegionConfig RegionConfig `json:"region_config"`
Debug bool `json:"debug"` Debug bool `json:"debug"`
SSOUrl string `json:"sso_url"` SSOUrl string `json:"sso_url"`
ElasticsearchConfig *config.ElasticsearchConfig `json:"elasticsearch_config"` ElasticsearchConfig *config.ElasticsearchConfig `json:"elasticsearch_config"`
......
...@@ -115,9 +115,10 @@ type LogConsumerService struct { ...@@ -115,9 +115,10 @@ type LogConsumerService struct {
consumer *kafka.Reader consumer *kafka.Reader
db *gorm.DB db *gorm.DB
esStore *esStore.ESStore esStore *esStore.ESStore
regionName string
} }
func NewLogConsumerService(db *gorm.DB, esStore *esStore.ESStore, config *config.KafkaConfig) *LogConsumerService { func NewLogConsumerService(db *gorm.DB, esStore *esStore.ESStore, config *config.KafkaConfig, regionName string) *LogConsumerService {
// mechanism, _, err := getSASLMechanismByEnv() // mechanism, _, err := getSASLMechanismByEnv()
// if err != nil { // if err != nil {
// log.Fatal().Err(err).Msg("failed to get SASL mechanism") // log.Fatal().Err(err).Msg("failed to get SASL mechanism")
...@@ -143,6 +144,7 @@ func NewLogConsumerService(db *gorm.DB, esStore *esStore.ESStore, config *config ...@@ -143,6 +144,7 @@ func NewLogConsumerService(db *gorm.DB, esStore *esStore.ESStore, config *config
consumer: consumer, consumer: consumer,
db: db, db: db,
esStore: esStore, esStore: esStore,
regionName: regionName,
} }
} }
...@@ -199,7 +201,7 @@ func (s *LogConsumerService) genWafDetectionSignal(wafDetectionMessage model.Waf ...@@ -199,7 +201,7 @@ func (s *LogConsumerService) genWafDetectionSignal(wafDetectionMessage model.Waf
"cluster": { "cluster": {
Kind: "cluster", Kind: "cluster",
ID: wafDetectionMessage.WafDetectionMessageBasic.ClusterKey, ID: wafDetectionMessage.WafDetectionMessageBasic.ClusterKey,
Name: wafDetectionMessage.WafDetectionMessageBasic.ClusterKey, Name: s.regionName,
}, },
"namespace": { "namespace": {
Kind: "namespace", Kind: "namespace",
...@@ -209,7 +211,7 @@ func (s *LogConsumerService) genWafDetectionSignal(wafDetectionMessage model.Waf ...@@ -209,7 +211,7 @@ func (s *LogConsumerService) genWafDetectionSignal(wafDetectionMessage model.Waf
"resource": { "resource": {
Kind: "resource", Kind: "resource",
ID: "", ID: "",
Name: fmt.Sprintf("%s(%s)", wafDetectionMessage.WafDetectionMessageBasic.ResName, wafDetectionMessage.WafDetectionMessageBasic.ResKind), Name: fmt.Sprintf("%s(%s)", wafDetectionMessage.WafDetectionMessageBasic.ResName, "Deployment"),
}, },
}, },
Severity: serverityFromAttackAction(attackedLog.Action), Severity: serverityFromAttackAction(attackedLog.Action),
...@@ -263,7 +265,7 @@ func (s *LogConsumerService) genWafDetectionEvent(wafDetectionMessage model.WafD ...@@ -263,7 +265,7 @@ func (s *LogConsumerService) genWafDetectionEvent(wafDetectionMessage model.WafD
{ {
Kind: "cluster", Kind: "cluster",
ID: wafDetectionMessage.WafDetectionMessageBasic.ClusterKey, ID: wafDetectionMessage.WafDetectionMessageBasic.ClusterKey,
Name: wafDetectionMessage.WafDetectionMessageBasic.ClusterKey, Name: s.regionName,
}, },
}, },
"namespace": { "namespace": {
...@@ -296,7 +298,7 @@ func (s *LogConsumerService) genWafDetectionEvent(wafDetectionMessage model.WafD ...@@ -296,7 +298,7 @@ func (s *LogConsumerService) genWafDetectionEvent(wafDetectionMessage model.WafD
"resource": { "resource": {
Kind: "resource", Kind: "resource",
ID: "", ID: "",
Name: fmt.Sprintf("%s(%s)", wafDetectionMessage.WafDetectionMessageBasic.ResName, wafDetectionMessage.WafDetectionMessageBasic.ResKind), Name: fmt.Sprintf("%s(%s)", wafDetectionMessage.WafDetectionMessageBasic.ResName, "Deployment"),
}, },
}, },
}, },
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment