Commit 79468cd7 authored by qiuqunfeng's avatar qiuqunfeng
Browse files

Add Elasticsearch-based WAF attack log and rule listing endpoints

- Integrate Elasticsearch client for querying WAF attack logs
- Add new API endpoints for listing attack logs and WAF rules
- Implement pagination and filtering for attack log retrieval
- Support language-based rule category display
- Update service and controller layers to support new functionality
parent f613d917
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"net/http" "net/http"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/olivere/elastic/v7"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"gitlab.com/tensorsecurity-rd/waf-console/internal/config" "gitlab.com/tensorsecurity-rd/waf-console/internal/config"
"gitlab.com/tensorsecurity-rd/waf-console/internal/middleware" "gitlab.com/tensorsecurity-rd/waf-console/internal/middleware"
...@@ -12,7 +13,7 @@ import ( ...@@ -12,7 +13,7 @@ import (
"gorm.io/gorm" "gorm.io/gorm"
) )
func SetRouters(db *gorm.DB, clusterClientManager *utils.ClusterClientManager, gatewayUrl string, ssoUrl string) *gin.Engine { func SetRouters(db *gorm.DB, clusterClientManager *utils.ClusterClientManager, gatewayUrl string, ssoUrl string, elasticClient *elastic.Client) *gin.Engine {
var engine *gin.Engine var engine *gin.Engine
if !config.Conf.Debug { if !config.Conf.Debug {
...@@ -64,7 +65,7 @@ func SetRouters(db *gorm.DB, clusterClientManager *utils.ClusterClientManager, g ...@@ -64,7 +65,7 @@ func SetRouters(db *gorm.DB, clusterClientManager *utils.ClusterClientManager, g
// }, // },
// // BearerToken: "1234567890", // // BearerToken: "1234567890",
// }) // })
SetWafRouter(engine, clusterClientManager, db, gatewayUrl) SetWafRouter(engine, clusterClientManager, db, gatewayUrl, elasticClient)
// 统一处理 404 // 统一处理 404
engine.NoRoute(func(c *gin.Context) { engine.NoRoute(func(c *gin.Context) {
......
...@@ -2,15 +2,16 @@ package api ...@@ -2,15 +2,16 @@ package api
import ( import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/olivere/elastic/v7"
"gitlab.com/tensorsecurity-rd/waf-console/internal/controller" "gitlab.com/tensorsecurity-rd/waf-console/internal/controller"
"gitlab.com/tensorsecurity-rd/waf-console/internal/utils" "gitlab.com/tensorsecurity-rd/waf-console/internal/utils"
"gorm.io/gorm" "gorm.io/gorm"
) )
func SetWafRouter(e *gin.Engine, clusterClientManager *utils.ClusterClientManager, db *gorm.DB, gatewayUrl string) { func SetWafRouter(e *gin.Engine, clusterClientManager *utils.ClusterClientManager, db *gorm.DB, gatewayUrl string, elasticClient *elastic.Client) {
v1 := e.Group("v1/api/waf") v1 := e.Group("v1/api/waf")
wafController := controller.NewWafController(clusterClientManager, db, gatewayUrl) wafController := controller.NewWafController(clusterClientManager, db, gatewayUrl, elasticClient)
v1.GET("", wafController.Waf) v1.GET("", wafController.Waf)
v1.POST("/", wafController.CreateWaf) v1.POST("/", wafController.CreateWaf)
v1.PUT("mode", wafController.UpdateMode) v1.PUT("mode", wafController.UpdateMode)
...@@ -21,4 +22,9 @@ func SetWafRouter(e *gin.Engine, clusterClientManager *utils.ClusterClientManage ...@@ -21,4 +22,9 @@ func SetWafRouter(e *gin.Engine, clusterClientManager *utils.ClusterClientManage
v1.DELETE("listener/:region_code/:namespace/:gateway_name/:port", wafController.DeleteListenerWaf) v1.DELETE("listener/:region_code/:namespace/:gateway_name/:port", wafController.DeleteListenerWaf)
v1.DELETE("gateway/:region_code/:namespace/:gateway_name", wafController.DeleteGatewayWaf) v1.DELETE("gateway/:region_code/:namespace/:gateway_name", wafController.DeleteGatewayWaf)
v1.POST("debug/savecatagory", wafController.SaveRuleCategoryToDB) v1.POST("debug/savecatagory", wafController.SaveRuleCategoryToDB)
v2 := e.Group("api/v2/containerSec/waf")
v2.GET("/attack/log/list", wafController.ListAttackLogs)
v2.GET("/rules", wafController.ListRules)
v2.PUT("/rules", wafController.UpdateRule)
} }
...@@ -84,7 +84,7 @@ func NewRootCommand() *cobra.Command { ...@@ -84,7 +84,7 @@ func NewRootCommand() *cobra.Command {
if err != nil { if err != nil {
panic(err) panic(err)
} }
e := api.SetRouters(db, clusterClientManager, config.GatewayUrl, config.SSOUrl) e := api.SetRouters(db, clusterClientManager, config.GatewayUrl, config.SSOUrl, esClient)
esStore := es.NewESStore(es.Config{ esStore := es.NewESStore(es.Config{
ESBatchSize: 100, ESBatchSize: 100,
ESConcurrency: 10, ESConcurrency: 10,
......
...@@ -3,10 +3,12 @@ package controller ...@@ -3,10 +3,12 @@ package controller
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"strconv" "strconv"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/olivere/elastic/v7"
"gitlab.com/tensorsecurity-rd/waf-console/internal/service" "gitlab.com/tensorsecurity-rd/waf-console/internal/service"
"gitlab.com/tensorsecurity-rd/waf-console/internal/utils" "gitlab.com/tensorsecurity-rd/waf-console/internal/utils"
"gorm.io/gorm" "gorm.io/gorm"
...@@ -16,9 +18,9 @@ type WafController struct { ...@@ -16,9 +18,9 @@ type WafController struct {
service service.Service service service.Service
} }
func NewWafController(clusterClientManager *utils.ClusterClientManager, db *gorm.DB, gatewayUrl string) *WafController { func NewWafController(clusterClientManager *utils.ClusterClientManager, db *gorm.DB, gatewayUrl string, elasticClient *elastic.Client) *WafController {
return &WafController{ return &WafController{
service: service.NewWafService(clusterClientManager, db, gatewayUrl), service: service.NewWafService(clusterClientManager, db, gatewayUrl, elasticClient),
} }
} }
...@@ -135,6 +137,27 @@ func (c *WafController) UpdateRule(ctx *gin.Context) { ...@@ -135,6 +137,27 @@ func (c *WafController) UpdateRule(ctx *gin.Context) {
utils.AssembleResponse(ctx, nil, nil) utils.AssembleResponse(ctx, nil, nil)
} }
func (c *WafController) ListRules(ctx *gin.Context) {
ctx1, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
regionCode := ctx.Query("region_code")
namespace := ctx.Query("namespace")
gatewayName := ctx.Query("gateway_name")
language := ctx.Request.Header.Get("Accept-Language")
name := ctx.Query("name")
if language == "" {
language = "zh"
}
rules, err := c.service.ListRules(ctx1, regionCode, namespace, gatewayName, language, name)
if err != nil {
utils.AssembleResponse(ctx, nil, err)
return
}
utils.AssembleResponse(ctx, rules, nil)
}
func (c *WafController) SaveRuleCategoryToDB(ctx *gin.Context) { func (c *WafController) SaveRuleCategoryToDB(ctx *gin.Context) {
ctx1, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx1, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
...@@ -249,3 +272,72 @@ func (c *WafController) DeleteGatewayWaf(ctx *gin.Context) { ...@@ -249,3 +272,72 @@ func (c *WafController) DeleteGatewayWaf(ctx *gin.Context) {
} }
utils.AssembleResponse(ctx, nil, nil) utils.AssembleResponse(ctx, nil, nil)
} }
func (c *WafController) ListAttackLogs(ctx *gin.Context) {
var filter service.AttackLogFilter
id := ctx.Query("serviceId")
filter.ServiceId, _ = strconv.ParseInt(id, 10, 64)
filter.Limit, filter.Offset, _ = getLimitAndOffset(ctx)
filter.Cluster = ctx.Query("cluster")
filter.AttackUrl = ctx.Query("attackAddr")
filter.AttackIp = ctx.Query("attackIp")
filter.AttackType = ctx.Query("attackType")
filter.AttackApp = ctx.Query("attackApp")
filter.Token = ctx.Query("token")
filter.Action = ctx.Query("action")
stime := ctx.Query("startTime")
filter.StartTime, _ = strconv.ParseInt(stime, 10, 64)
etime := ctx.Query("endTime")
filter.EndTime, _ = strconv.ParseInt(etime, 10, 64)
ctx1, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
logs, token, err := c.service.ListAttackLogs(ctx1, &filter)
if err != nil {
utils.AssembleResponse(ctx, nil, err)
return
}
type resp struct {
Items []service.AttackLog `json:"items"`
Token string `json:"token"`
}
utils.AssembleResponse(ctx, resp{
Items: logs,
Token: token,
}, nil)
}
// getLimitAndOffset extracts pagination parameters from the context
// Returns limit, offset, and error
func getLimitAndOffset(ctx *gin.Context) (int, int, error) {
limit := 10 // default limit
offset := 0 // default offset
// Get limit from query parameters
if limitStr := ctx.Query("limit"); limitStr != "" {
parsedLimit, err := strconv.Atoi(limitStr)
if err != nil {
return 0, 0, fmt.Errorf("invalid limit parameter: %v", err)
}
if parsedLimit > 0 {
limit = parsedLimit
}
}
// Get offset from query parameters
if offsetStr := ctx.Query("offset"); offsetStr != "" {
parsedOffset, err := strconv.Atoi(offsetStr)
if err != nil {
return 0, 0, fmt.Errorf("invalid offset parameter: %v", err)
}
if parsedOffset >= 0 {
offset = parsedOffset
}
}
return limit, offset, nil
}
...@@ -15,4 +15,6 @@ type Service interface { ...@@ -15,4 +15,6 @@ type Service interface {
DeleteGatewayWaf(ctx context.Context, req *GatewateInfo) error DeleteGatewayWaf(ctx context.Context, req *GatewateInfo) error
DeleteListenerWaf(ctx context.Context, req *DeleteListenerReq) error DeleteListenerWaf(ctx context.Context, req *DeleteListenerReq) error
EnableListenerWafs(ctx context.Context, req *EnableListenerWafsReq) error EnableListenerWafs(ctx context.Context, req *EnableListenerWafsReq) error
ListAttackLogs(ctx context.Context, req *AttackLogFilter) ([]AttackLog, string, error)
ListRules(ctx context.Context, regionCode, namespace, gatewayName, language, name string) ([]RuleGroupResp, error)
} }
...@@ -274,6 +274,13 @@ type WafRuleCategory struct { ...@@ -274,6 +274,13 @@ type WafRuleCategory struct {
Rules []WafRule `json:"rules"` Rules []WafRule `json:"rules"`
} }
type RuleGroupResp struct {
CategoryID string `json:"category_id"`
Category string `json:"category"`
Description string `json:"description"`
Status int `json:"status"`
}
type GatewayListener struct { type GatewayListener struct {
GatewayName string `json:"gateway_name"` GatewayName string `json:"gateway_name"`
Namespace string `json:"namespace"` Namespace string `json:"namespace"`
...@@ -318,3 +325,29 @@ type GatewayListenerResponseList struct { ...@@ -318,3 +325,29 @@ type GatewayListenerResponseList struct {
GatewayResponseBase GatewayResponseBase
Data []GatewayRespListenerData `json:"data"` Data []GatewayRespListenerData `json:"data"`
} }
type AttackLog struct {
Uuid string `json:"uuid"`
AttackIp string `json:"attack_ip"`
AttackedAddr string `json:"attacked_addr"`
AttackType string `json:"attack_type"`
AttackTime int64 `json:"attack_time"`
AttackedApp string `json:"attacked_app"`
ClusterKey string `json:"cluster_key"`
Action string `json:"action"`
}
type AttackLogFilter struct {
Offset int `json:"offset"`
Limit int `json:"limit"`
ServiceId int64 `json:"service_id"`
Cluster string `json:"cluster"`
AttackUrl string `json:"attack_url"`
AttackIp string `json:"attack_ip"`
AttackType string `json:"attack_type"`
AttackApp string `json:"attack_app"`
Action string `json:"action"`
Token string `json:"token"`
StartTime int64 `json:"start_time"`
EndTime int64 `json:"end_time"`
}
...@@ -14,6 +14,8 @@ import ( ...@@ -14,6 +14,8 @@ import (
"strings" "strings"
"sync" "sync"
jsoniter "github.com/json-iterator/go"
"github.com/olivere/elastic/v7"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"gitlab.com/tensorsecurity-rd/waf-console/internal/model" "gitlab.com/tensorsecurity-rd/waf-console/internal/model"
"gitlab.com/tensorsecurity-rd/waf-console/internal/utils" "gitlab.com/tensorsecurity-rd/waf-console/internal/utils"
...@@ -27,10 +29,11 @@ type wafService struct { ...@@ -27,10 +29,11 @@ type wafService struct {
clusterClientManager *utils.ClusterClientManager clusterClientManager *utils.ClusterClientManager
db *gorm.DB db *gorm.DB
gatewayUrl string gatewayUrl string
elasticClient *elastic.Client
} }
func NewWafService(clusterClientManager *utils.ClusterClientManager, db *gorm.DB, gatewayUrl string) Service { func NewWafService(clusterClientManager *utils.ClusterClientManager, db *gorm.DB, gatewayUrl string, elasticClient *elastic.Client) Service {
return &wafService{clusterClientManager: clusterClientManager, db: db, gatewayUrl: gatewayUrl} return &wafService{clusterClientManager: clusterClientManager, db: db, gatewayUrl: gatewayUrl, elasticClient: elasticClient}
} }
func (s *wafService) GetWaf(ctx context.Context, regionCode, namespace, gatewayName string) (*WafService, error) { func (s *wafService) GetWaf(ctx context.Context, regionCode, namespace, gatewayName string) (*WafService, error) {
...@@ -199,7 +202,7 @@ func (s *wafService) CreateWaf(ctx context.Context, req *CreateWafReq) (*WafServ ...@@ -199,7 +202,7 @@ func (s *wafService) CreateWaf(ctx context.Context, req *CreateWafReq) (*WafServ
}, },
Spec: v1alpha1.ServiceSpec{ Spec: v1alpha1.ServiceSpec{
HostNames: req.Host, HostNames: req.Host,
ServiceName: req.GatewayName, ServiceName: req.ListenerName,
Port: req.Port, Port: req.Port,
Workload: v1alpha1.WorkloadRef{ Workload: v1alpha1.WorkloadRef{
Kind: req.GatewayName, Kind: req.GatewayName,
...@@ -312,14 +315,14 @@ func (s *wafService) UpdateMode(ctx context.Context, req *UpdateModeReq) (*WafSe ...@@ -312,14 +315,14 @@ func (s *wafService) UpdateMode(ctx context.Context, req *UpdateModeReq) (*WafSe
}, nil }, nil
} }
func (s *wafService) GetRuleCategories(ctx context.Context) ([]WafRuleCategory, error) { // func (s *wafService) GetRuleCategories(ctx context.Context) ([]WafRuleCategory, error) {
var categories []WafRuleCategory // var categories []WafRuleCategory
err := s.db.Table("waf_rule_categories").Find(&categories).Error // err := s.db.Table("waf_rule_categories").Find(&categories).Error
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
return categories, nil // return categories, nil
} // }
func (s *wafService) GetRules(ctx context.Context, categoryID string) ([]WafRule, error) { func (s *wafService) GetRules(ctx context.Context, categoryID string) ([]WafRule, error) {
var rules []WafRule var rules []WafRule
...@@ -730,3 +733,166 @@ func (s *wafService) EnableListenerWafs(ctx context.Context, req *EnableListener ...@@ -730,3 +733,166 @@ func (s *wafService) EnableListenerWafs(ctx context.Context, req *EnableListener
} }
return nil return nil
} }
func generateListToken(sort []interface{}) (string, error) {
var token []string
for _, v := range sort {
switch v.(type) {
case string:
token = append(token, v.(string))
case float64:
token = append(token, strconv.FormatInt(int64(v.(float64)), 10))
default:
// json-iterator 和 encoding/json 的Decode,返回的都是 encoding/json 的 Number
t, ok := jsoniter.CastJsonNumber(v)
if !ok {
return "", fmt.Errorf("unsupported sort field. value: %v, type: %T", v, v)
}
token = append(token, t)
}
}
return strings.Join(token, ","), nil
}
func (s *wafService) ListAttackLogs(ctx context.Context, req *AttackLogFilter) ([]AttackLog, string, error) {
boolQuery := elastic.NewBoolQuery()
if req.ServiceId != 0 {
boolQuery.Must(elastic.NewTermQuery("service_id", req.ServiceId))
}
if req.Cluster != "" {
boolQuery.Filter(elastic.NewTermQuery("cluster_key", req.Cluster))
}
if req.AttackUrl != "" {
boolQuery.Filter(elastic.NewMatchPhraseQuery("attacked_url", req.AttackUrl).Slop(0))
}
if req.AttackIp != "" {
boolQuery.Filter(elastic.NewMatchPhraseQuery("attack_ip", req.AttackIp).Slop(0))
}
if req.AttackApp != "" {
boolQuery.Filter(elastic.NewMatchPhraseQuery("attacked_app", req.AttackApp).Slop(0))
}
if req.AttackType != "" {
attackTypes := strings.Split(req.AttackType, ",")
var terms []any
for _, attackType := range attackTypes {
terms = append(terms, attackType)
}
boolQuery.Filter(elastic.NewTermsQuery("attack_type", terms...))
}
if req.Action != "" {
actions := strings.Split(req.Action, ",")
var terms []interface{}
for _, action := range actions {
terms = append(terms, action)
}
boolQuery.Filter(elastic.NewTermsQuery("action", terms...))
} else {
boolQuery.Filter(elastic.NewBoolQuery().MustNot(elastic.NewTermQuery("action", "pass")))
}
hasStart := req.StartTime >= 0
hasEnd := req.EndTime >= 0
if hasStart || hasEnd {
rangeQuery := elastic.NewRangeQuery("attack_time")
if hasStart {
rangeQuery.Gte(req.StartTime)
}
if hasEnd {
rangeQuery.Lte(req.EndTime)
}
boolQuery.Filter(rangeQuery)
}
src, _ := boolQuery.Source()
log.Debug().Interface("src", src.(map[string]interface{})).Msg("find waf detections src")
ss := s.elasticClient.Search("waf-detections*")
if req.Token != "" {
for _, t := range strings.Split(req.Token, ",") {
ss.SearchAfter(t)
}
}
res, err := ss.Query(boolQuery).Size(req.Limit).
SortBy(elastic.NewFieldSort("attack_time").Order(false),
elastic.NewFieldSort("id.digit").Order(false)).
Do(ctx)
if err != nil {
return nil, "", fmt.Errorf("failed to search waf detections: %v", err)
}
list := make([]model.WafDetection, len(res.Hits.Hits))
endIdx := len(res.Hits.Hits) - 1
pageToken := ""
for i, hit := range res.Hits.Hits {
wafDetection := model.WafDetection{}
if err = json.Unmarshal(hit.Source, &wafDetection); err != nil {
return nil, "", fmt.Errorf("failed to unmarshal waf detection: %v", err)
}
list[i] = wafDetection
if i == endIdx {
pageToken, err = generateListToken(hit.Sort)
if err != nil {
return nil, "", fmt.Errorf("failed to generate list token: %v", err)
}
}
}
attackLogs := make([]AttackLog, len(list))
for i, wafDetection := range list {
attackLogs[i] = AttackLog{
AttackTime: wafDetection.AttackTime,
AttackIp: wafDetection.AttackIP,
AttackedApp: wafDetection.AttackedApp,
AttackType: wafDetection.AttackType,
Action: wafDetection.Action,
ClusterKey: wafDetection.ClusterKey,
AttackedAddr: wafDetection.AttackedURL,
}
}
return attackLogs, pageToken, nil
}
func (s *wafService) ListRules(ctx context.Context, regionCode, namespace, gatewayName, language, name string) ([]RuleGroupResp, error) {
ruleCategories := []model.WafRuleCategory{}
err := s.db.Model(&model.WafRuleCategory{}).Find(&ruleCategories).Error
if err != nil {
return nil, fmt.Errorf("failed to get waf service: %v", err)
}
ruleGroupResp := []RuleGroupResp{}
wafService := &model.WafService{}
err = s.db.Model(&model.WafService{}).Where("gateway_name = ? and namespace = ? and region_code = ?", gatewayName, namespace, regionCode).First(wafService).Error
if err != nil {
return nil, fmt.Errorf("failed to get waf service: %v", err)
}
if wafService.RuleCategoryStatus.Status == 1 {
for _, category := range ruleCategories {
for _, categoryID := range wafService.RuleCategoryStatus.CategoryID {
if category.CategoryID == categoryID {
category.Status = 1
}
}
if language == "en" {
ruleGroupResp = append(ruleGroupResp, RuleGroupResp{
CategoryID: category.CategoryID,
Status: category.Status,
Category: category.CategoryEN,
Description: category.DescriptionEN,
})
} else {
ruleGroupResp = append(ruleGroupResp, RuleGroupResp{
CategoryID: category.CategoryID,
Status: category.Status,
Category: category.CategoryZH,
Description: category.DescriptionZH,
})
}
}
}
return ruleGroupResp, nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment