package service import ( "context" "encoding/json" "errors" "fmt" "time" "github.com/rs/zerolog/log" "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl/scram" "gitlab.com/tensorsecurity-rd/waf-console/internal/config" "gitlab.com/tensorsecurity-rd/waf-console/internal/model" "gitlab.com/tensorsecurity-rd/waf-console/internal/utils/id" es "github.com/olivere/elastic/v7" esStore "gitlab.com/tensorsecurity-rd/waf-console/internal/store" "gorm.io/gorm" ) const ( KafkaTopicIvanWafDetections = "ivan_waf_detections" KafkaGroupIvanWafDetectionsPalace = "ivan_waf_detections_palace" EsIndexWafDetections = "waf-detections*" EsIndexWafDetectionsAlias = "waf-detections" ESIndexEvents = "events" ESIndexSignals = "signals" ) var scramAlgo = map[string]scram.Algorithm{ "SHA512": scram.SHA512, "SHA256": scram.SHA256, } var ErrMissingBrokers = errors.New("no KAFKA_BROKERS env variable set") var ErrMissingAuthInfo = errors.New("no username or passwrod set when auth is enabled") var ErrUnsupportedAuthMethod = errors.New("unsupported auth method") const ( EnvKafkaAuthMethod = "KAFKA_AUTH_METHOD" EnvKafkaAuthUsername = "KAFKA_AUTH_USERNAME" EnvKafkaAuthPassword = "KAFKA_AUTH_PASSWORD" EnvKafkaScramAlgo = "KAFKA_SCRAM_ALGO" EnvKafkaBrokers = "KAFKA_BROKERS" KafkaAuthPlain = "plain" KafkaAuthScram = "scram" ) type AttackClassDesp struct { En string Zh string } var AttackClassDespMap = map[string]AttackClassDesp{ "RCE_OS": {En: "Remote Command Execution", Zh: "远程代码执行"}, "SQLI": {En: "SQL Injection", Zh: "SQL注入"}, "XSS": {En: "Cross-Site Scripting", Zh: "跨站脚本攻击"}, "AOIC": {En: "Access of Internal Components", Zh: "内部组件访问"}, "DT": {En: "Directory Traversal", Zh: "路径穿越"}, "DL": {En: "Data Leakage", Zh: "数据泄露"}, "SCD": {En: "Source Code Disclosure", Zh: "源码泄露"}, "RCE_PHP": {En: "Php remote code execution", Zh: "PHP远程代码执行"}, "RCE_JAVA": {En: "Java remote code execution", Zh: "JAVA远程代码执行"}, "LFI": {En: "Local file include", Zh: "本地文件包含"}, "RFI": {En: "Remote file include", Zh: "远程文件包含"}, "UR": {En: "Url Redirect", Zh: "URL重定向(CVE)"}, "DOS": {En: "DOS", Zh: "DOS攻击"}, "UFL": {En: "Unauthorized File Upload", Zh: "未授权文件上传"}, "GR": {En: "General Rule", Zh: "一般文件规则"}, "SS": {En: "Site Scanning/Probing", Zh: "网站扫描/探测"}, "SSRF": {En: "Server-side request forgery", Zh: "跨站请求伪造"}, "FAPPV": {En: "Famous application vulnerable", Zh: "针对知名应用的针对性规则"}, "Other": {En: "Other", Zh: "其它"}, "black": {En: "blacklist", Zh: "黑名单"}, "white": {En: "whitelist", Zh: "白名单"}, "force-white": {En: "strong whitelist", Zh: "强白名单"}, } // func getSASLMechanismByEnv() (sasl.Mechanism, bool, error) { // authMethod := os.Getenv(EnvKafkaAuthMethod) // username := os.Getenv(EnvKafkaAuthUsername) // password := os.Getenv(EnvKafkaAuthPassword) // var mechanism sasl.Mechanism // switch authMethod { // case KafkaAuthPlain: // if username == "" || password == "" { // return nil, true, ErrMissingAuthInfo // } // mechanism = &plain.Mechanism{ // Username: username, // Password: password, // } // case KafkaAuthScram: // if username == "" || password == "" { // return nil, true, ErrMissingAuthInfo // } // algoKey := os.Getenv(EnvKafkaScramAlgo) // algo := scram.SHA512 // algoTmp, exist := scramAlgo[algoKey] // if exist { // algo = algoTmp // } // var err error // mechanism, err = scram.Mechanism(algo, username, password) // if err != nil { // return nil, true, err // } // } // return mechanism, mechanism != nil, nil // } type LogConsumerService struct { consumer *kafka.Reader db *gorm.DB esStore *esStore.ESStore } func NewLogConsumerService(db *gorm.DB, esStore *esStore.ESStore, config *config.KafkaConfig) *LogConsumerService { // mechanism, _, err := getSASLMechanismByEnv() // if err != nil { // log.Fatal().Err(err).Msg("failed to get SASL mechanism") // } // brokers := strings.Split(os.Getenv(EnvKafkaBrokers), ",") // if len(brokers) == 0 { // log.Fatal().Msg("no KAFKA_BROKERS env variable set") // } // consumer := kafka.NewReader(kafka.ReaderConfig{ // Brokers: brokers, // GroupID: KafkaGroupIvanWafDetectionsPalace, // Topic: KafkaTopicIvanWafDetections, // Dialer: &kafka.Dialer{ // SASLMechanism: mechanism, // Timeout: 10 * time.Second, // DualStack: true, // }, // }) consumer := NewKafkaReader(config) return &LogConsumerService{ consumer: consumer, db: db, esStore: esStore, } } func (s *LogConsumerService) Consume() { log.Info().Msg("start consume kafka message") for { m, err := s.consumer.ReadMessage(context.Background()) if err != nil { log.Error().Err(err).Msg("failed to read message") continue } log.Info().Msgf("message: %s", string(m.Value)) go s.processMessage(m) } } func (s *LogConsumerService) processMessage(m kafka.Message) { log.Info().Msgf("processing message: %s", string(m.Value)) s.Handle(context.Background(), m.Value) } func (s *LogConsumerService) genWafDetection(wafDetectionMessage model.WafDetectionMessage, attackedLog model.WafDetectionAttackedLog) (model.WafDetection, error) { if attackedLog.AttackIP == "" { return model.WafDetection{}, errors.New("attack_ip is empty") } wafDetection := model.WafDetection{ WafDetectionMessageBasic: wafDetectionMessage.WafDetectionMessageBasic, WafDetectionAttackedLog: attackedLog, CreatedAt: wafDetectionMessage.CreatedAt, } wafDetection.WafDetectionAttackedLog.ID = id.Str() return wafDetection, nil } func (s *LogConsumerService) genWafDetectionSignal(wafDetectionMessage model.WafDetectionMessage, attackedLog model.WafDetectionAttackedLog, eventID string) (model.Signal, error) { signal := model.Signal{ ID: id.Str(), RuleKey: &model.RuleKey{Name: attackedLog.RuleName, Category: "WAF"}, Scope: &map[string]model.Scope{ "cluster": { Kind: "cluster", ID: wafDetectionMessage.WafDetectionMessageBasic.ClusterKey, Name: wafDetectionMessage.WafDetectionMessageBasic.ClusterKey, }, "namespace": { Kind: "namespace", ID: wafDetectionMessage.WafDetectionMessageBasic.Namespace, Name: wafDetectionMessage.WafDetectionMessageBasic.Namespace, }, "resource": { Kind: "resource", ID: "", Name: fmt.Sprintf("%s(%s)", wafDetectionMessage.WafDetectionMessageBasic.ResName, wafDetectionMessage.WafDetectionMessageBasic.ResKind), }, }, Severity: 6, Tags: []string{"waf"}, EventIDs: []string{eventID}, Context: map[string]interface{}{ "attack_ip": attackedLog.AttackIP, "attack_time": attackedLog.AttackTime, "attack_url": attackedLog.AttackedURL, "attack_app": attackedLog.AttackedApp, "attack_load": attackedLog.AttackLoad, "rule_name": attackedLog.RuleName, "action": attackedLog.Action, "waf_body": map[string]interface{}{ "type": "code", "request": map[string]interface{}{ "action": attackedLog.Action, "req_pkg": attackedLog.ReqPkg, }, "response": map[string]interface{}{ "content_type": attackedLog.RspContentType, "res_pkg": attackedLog.RspPkg, }, }, }, CreatedAt: attackedLog.AttackTime, IsWhitelistFilter: false, } return signal, nil } func (s *LogConsumerService) genWafDetectionEvent(wafDetectionMessage model.WafDetectionMessage, attackedLog model.WafDetectionAttackedLog) (model.Event, error) { // attackClass := AttackClassDespMap[attackedLog.AttackType] // attackClassDesp := attackClass.Zh // if lang == "en" { // attackClassDesp = attackClass.En // } event := model.Event{ ID: id.Str(), Type: "ruleScope", Description: attackedLog.AttackType, RuleKeys: []model.RuleKey{ { Version1: 0, Name: attackedLog.AttackType, Category: "WAF", }, }, Scopes: map[string][]model.Scope{ "cluster": { { Kind: "cluster", ID: wafDetectionMessage.WafDetectionMessageBasic.ClusterKey, Name: wafDetectionMessage.WafDetectionMessageBasic.ClusterKey, }, }, "namespace": { { Kind: "namespace", ID: wafDetectionMessage.WafDetectionMessageBasic.Namespace, Name: wafDetectionMessage.WafDetectionMessageBasic.Namespace, }, }, "resource": { { Kind: "resource", ID: "", Name: fmt.Sprintf("%s(%s)", wafDetectionMessage.WafDetectionMessageBasic.ResName, wafDetectionMessage.WafDetectionMessageBasic.ResKind), }, }, }, Resources: []map[string]model.Scope{ { "cluster": { Kind: "cluster", ID: wafDetectionMessage.WafDetectionMessageBasic.ClusterKey, Name: wafDetectionMessage.WafDetectionMessageBasic.ClusterKey, }, "namespace": { Kind: "namespace", ID: wafDetectionMessage.WafDetectionMessageBasic.Namespace, Name: wafDetectionMessage.WafDetectionMessageBasic.Namespace, }, "resource": { Kind: "resource", ID: "", Name: fmt.Sprintf("%s(%s)", wafDetectionMessage.WafDetectionMessageBasic.ResName, wafDetectionMessage.WafDetectionMessageBasic.ResKind), }, }, }, Relation: model.Relation{ Type: "Discovery", }, CreatedAt: attackedLog.AttackTime, UpdatedAt: attackedLog.AttackTime, Severity: 6, Timestamp: time.Now(), Context: map[string]interface{}{ "attack_ip": attackedLog.AttackIP, "attack_time": attackedLog.AttackTime, "attack_url": attackedLog.AttackedURL, "attack_app": attackedLog.AttackedApp, "attack_load": attackedLog.AttackLoad, "rule_name": attackedLog.RuleName, "action": attackedLog.Action, "waf_body": map[string]interface{}{ "type": "code", "request": map[string]interface{}{ "action": attackedLog.Action, "req_pkg": attackedLog.ReqPkg, }, "response": map[string]interface{}{ "content_type": attackedLog.RspContentType, "res_pkg": attackedLog.RspPkg, }, }, }, SignalsCount: map[int]int{ 6: 1, }, } return event, nil } func (s *LogConsumerService) Handle(ctx context.Context, message []byte) error { WafDetectionMessage := model.WafDetectionMessage{} err := json.Unmarshal(message, &WafDetectionMessage) if err != nil { log.Err(err).Str("message.Value", string(message)).Msg("unmarshal kafka message fails") return err } bulkableRequests := make([]es.BulkableRequest, 0) // WafDetections := make([]model.WafDetection, len(WafDetectionMessage.AttackedLog)) unPassCount := 0 for i := range WafDetectionMessage.AttackedLog { if WafDetectionMessage.AttackedLog[i].AttackIP == "" { log.Err(err).Str("message.Value", string(message)).Msg("WafDetectionMessage.AttackedLog[i].AttackIP empty") continue } if WafDetectionMessage.AttackedLog[i].Action != "pass" { unPassCount++ } // WafDetections[i].WafDetectionMessageBasic = WafDetectionMessage.WafDetectionMessageBasic // WafDetections[i].WafDetectionAttackedLog = WafDetectionMessage.AttackedLog[i] // WafDetections[i].WafDetectionAttackedLog.ID = id.Str() // WafDetections[i].CreatedAt = WafDetectionMessage.CreatedAt wafDetection, err := s.genWafDetection(WafDetectionMessage, WafDetectionMessage.AttackedLog[i]) if err != nil { log.Err(err).Str("message.Value", string(message)).Msg("gen waf detection fails") continue } bulkIndexWaflog := es.NewBulkIndexRequest().Index(EsIndexWafDetectionsAlias) bulkableRequests = append(bulkableRequests, bulkIndexWaflog.Id(wafDetection.WafDetectionAttackedLog.ID).Doc(wafDetection)) event, err := s.genWafDetectionEvent(WafDetectionMessage, WafDetectionMessage.AttackedLog[i]) if err != nil { log.Err(err).Str("message.Value", string(message)).Msg("gen waf detection event fails") continue } log.Info().Msgf("waf event: %+v", event) bulkIndexEvent := es.NewBulkIndexRequest().Index(ESIndexEvents) bulkableRequests = append(bulkableRequests, bulkIndexEvent.Id(event.ID).Doc(event)) signal, err := s.genWafDetectionSignal(WafDetectionMessage, WafDetectionMessage.AttackedLog[i], event.ID) if err != nil { log.Err(err).Str("message.Value", string(message)).Msg("gen waf detection signal fails") continue } bulkIndexSignal := es.NewBulkIndexRequest().Index(ESIndexSignals) bulkableRequests = append(bulkableRequests, bulkIndexSignal.Id(signal.ID).Doc(signal)) } s.esStore.Save(ctx, bulkableRequests) // err = s.db.WithContext(ctx).Model(&model.WafService{}).Where("id = ?", WafDetectionMessage.ServiceID).Update("attack_num", gorm.Expr("attack_num + ?", 1)).Error // if err != nil { // log.Err(err).Int64("WafDetectionMessage.ServiceID", WafDetectionMessage.ServiceID).Msg("update waf_service attack_number fails") // return err // } return nil }