Commit 6f4bd708 authored by qiuqunfeng's avatar qiuqunfeng
Browse files

Add name field to ListenerWaf struct and update dependencies

- Added `Name` field to `ListenerWaf` struct in `internal/service/types.go`
- Updated Go module dependencies, adding Elasticsearch client library
- Removed `CreateListener` method from `waf.go` service
parent fac776fe
......@@ -55,6 +55,7 @@ require (
require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/olivere/elastic/v7 v7.0.32 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/segmentio/kafka-go v0.4.40 // indirect
)
......
......@@ -231,6 +231,7 @@ type EnableGatewayWafReq struct {
type ListenerWaf struct {
Hosts []string `json:"hosts"`
Port int `json:"port"`
Name string `json:"name"`
}
type EnableListenerWafsReq struct {
......
......@@ -391,36 +391,6 @@ func (s *wafService) SaveRuleCategoryToDB(ctx context.Context) error {
return nil
}
func (s *wafService) CreateListener(ctx context.Context, req *CreateListenerReq) (*GatewayListener, error) {
listener := &model.GatewayListener{}
err := s.db.Model(&model.GatewayListener{}).Where("gateway_name = ? AND namespace = ? AND region_code = ?", req.GatewayName, req.Namespace, req.RegionCode).First(listener).Error
if err != nil {
if err == gorm.ErrRecordNotFound {
listener = &model.GatewayListener{
GatewayName: req.GatewayName,
Namespace: req.Namespace,
RegionCode: req.RegionCode,
Port: req.Port,
Enable: true,
}
err = s.db.Model(&model.GatewayListener{}).Create(listener).Error
if err != nil {
return nil, err
}
} else {
return nil, err
}
}
return &GatewayListener{
GatewayName: listener.GatewayName,
Namespace: listener.Namespace,
RegionCode: listener.RegionCode,
Port: listener.Port,
Enable: listener.Enable,
}, nil
}
func (s *wafService) DeleteListener(ctx context.Context, req *DeleteListenerReq) error {
listener := &model.GatewayListener{}
err := s.db.Model(&model.GatewayListener{}).Where("gateway_name = ? AND namespace = ? AND region_code = ?", req.GatewayName, req.Namespace, req.RegionCode).First(listener).Error
......
package es
import (
"context"
"errors"
"fmt"
"io"
"os"
"path"
"time"
json "github.com/json-iterator/go"
"github.com/rs/zerolog/log"
es "github.com/olivere/elastic/v7"
)
var BulkChan chan []es.BulkableRequest
var esBatchSize int
type Config struct {
ESBatchSize int
ESConcurrency int
ESTimeout time.Duration
}
func CreateEsClientFromEnv() (*es.Client, error) {
var opts []es.ClientOptionFunc
esURL := os.Getenv("ES_URL")
if esURL == "" {
return nil, fmt.Errorf("ES_URL is not set")
}
opts = append(opts, es.SetURL(esURL))
esUsername := os.Getenv("ES_USERNAME")
esPassword := os.Getenv("ES_PASSWORD")
if esUsername != "" && esPassword != "" {
opts = append(opts, es.SetBasicAuth(esUsername, esPassword))
}
sniff := os.Getenv("ES_SNIFF")
if sniff == "" {
sniff = "false"
}
opts = append(opts, es.SetSniff(sniff == "true"))
esClient, err := es.NewClient(opts...)
if err != nil {
return nil, err
}
return esClient, nil
}
type ESStore struct {
esClient *es.Client
config Config
}
func readTplFile(filename string) (string, error) {
file, err := os.OpenFile(path.Join(os.Getenv("CONFIG_DIR"), filename), os.O_RDONLY, 0500)
if err != nil {
return "", err
}
defer file.Close()
body, err := io.ReadAll(file)
if err != nil {
return "", err
}
return string(body), nil
}
func (e *ESStore) Init() {
ctx := context.Background()
err := e.createIndex(ctx, "waf_detections_index", "test.json", "waf-detections-000001", "waf-detections")
if err != nil {
log.Error().Err(err).Msg("create index test failed")
}
BulkChan = make(chan []es.BulkableRequest)
esConcurrency := e.config.ESConcurrency
esBatchSize = e.config.ESBatchSize
for i := 0; i < esConcurrency; i++ {
go func() {
bulkService := e.esClient.Bulk()
for {
bulkableRequests := <-BulkChan
bulkService.Add(bulkableRequests...)
err := doSave(ctx, e.config, *bulkService)
if err != nil {
bulkService = e.esClient.Bulk()
} else {
bulkService.Reset()
}
}
}()
}
log.Info().Msg("palace es chan init finish")
}
// createIndex 创建索引
func (e *ESStore) createIndex(ctx context.Context, idxTplName, tplPath, initName, aliasName string) error {
tpl, err := readTplFile(tplPath)
if err != nil {
return err
}
// create index template
idxTplRes, err := e.esClient.IndexPutIndexTemplate(idxTplName).BodyString(tpl).Do(ctx)
if err != nil {
return err
}
if !idxTplRes.Acknowledged {
return fmt.Errorf("failed: init %s index template", idxTplName)
}
// check and init index
exist, err := e.esClient.IndexExists(aliasName).Do(ctx)
if err != nil {
return err
}
if !exist {
log.Info().Str("initName", initName).Str("aliasName", aliasName).Msg("init event index")
// create index
createdRes, err := e.esClient.CreateIndex(initName).
BodyString(fmt.Sprintf(`{"aliases": {"%s": {"is_write_index": "true"}}}`, aliasName)).
Do(ctx)
if err != nil {
return err
}
if !createdRes.Acknowledged {
return errors.New("failed: create " + initName)
}
} else {
log.Info().Str("initName", initName).Str("aliasName", aliasName).Msg("index already exist")
}
return nil
}
func doSave(ctx context.Context, config Config, bulkService es.BulkService) error {
start := time.Now()
defer func() {
delta := float64(time.Since(start).Milliseconds())
log.Info().Msgf("es bulk insert cost: %.2f ms", delta)
}()
timeoutCtx, cancel := context.WithTimeout(ctx, config.ESTimeout)
defer cancel()
esResponse, err := bulkService.Do(timeoutCtx)
if err != nil {
log.Error().Err(err).Msg("es bulk insert error")
return err
}
if esResponse.Errors {
err = fmt.Errorf("es bulk item err")
for _, item := range esResponse.Items {
bi, _ := json.Marshal(item)
log.Error().Err(err).Msgf("es bulk item: %s", string(bi))
}
return err
}
return nil
}
func Send(ctx context.Context, bulkableRequests []es.BulkableRequest) {
batchRequests := make([]es.BulkableRequest, 0, esBatchSize)
for i := 0; i < len(bulkableRequests); i++ {
batchRequests = append(batchRequests, bulkableRequests[i])
if (i+1)%esBatchSize == 0 || i == len(bulkableRequests)-1 {
BulkChan <- batchRequests
batchRequests = make([]es.BulkableRequest, 0, esBatchSize)
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment