package es import ( "context" "errors" "fmt" "io" "os" "path" "time" json "github.com/json-iterator/go" "github.com/rs/zerolog/log" "gitlab.com/tensorsecurity-rd/waf-console/internal/config" es "github.com/olivere/elastic/v7" ) // var BulkChan chan []es.BulkableRequest var esBatchSize int type Config struct { ESBatchSize int ESConcurrency int ESTimeout time.Duration } func CreateEsClientFromEnv() (*es.Client, error) { var opts []es.ClientOptionFunc esURL := os.Getenv("ELASTIC_URL") if esURL == "" { return nil, fmt.Errorf("ES_URL is not set") } opts = append(opts, es.SetURL(esURL)) esUsername := os.Getenv("ELASTIC_USERNAME") esPassword := os.Getenv("ELASTIC_PASSWORD") if esUsername != "" && esPassword != "" { opts = append(opts, es.SetBasicAuth(esUsername, esPassword)) } opts = append(opts, es.SetTraceLog(&log.Logger)) opts = append(opts, es.SetDecoder(&es.NumberDecoder{})) sniff := os.Getenv("ELASTIC_SNIFF") if sniff == "" { sniff = "false" } opts = append(opts, es.SetSniff(sniff == "true")) esClient, err := es.NewClient(opts...) if err != nil { return nil, err } return esClient, nil } func CreateEsClientFromConfig(config *config.ElasticsearchConfig) (*es.Client, error) { var opts []es.ClientOptionFunc if config.Url == "" { return nil, fmt.Errorf("es url is not set") } opts = append(opts, es.SetURL(config.Url)) if config.Username != "" && config.Password != "" { opts = append(opts, es.SetBasicAuth(config.Username, config.Password)) } opts = append(opts, es.SetTraceLog(&log.Logger)) opts = append(opts, es.SetDecoder(&es.NumberDecoder{})) opts = append(opts, es.SetSniff(config.Sniff)) esClient, err := es.NewClient(opts...) if err != nil { return nil, err } return esClient, nil } type ESStore struct { esClient *es.Client config Config BulkChan chan []es.BulkableRequest } func readTplFile(filename string) (string, error) { file, err := os.OpenFile(path.Join(os.Getenv("CONFIG_DIR"), filename), os.O_RDONLY, 0500) if err != nil { return "", err } defer file.Close() body, err := io.ReadAll(file) if err != nil { return "", err } return string(body), nil } func NewESStore(config Config, client *es.Client) *ESStore { return &ESStore{ config: config, esClient: client, BulkChan: make(chan []es.BulkableRequest), } } // Init 初始化ES func (e *ESStore) Init() { ctx := context.Background() err := e.createIndex(ctx, "waf_detections_index", "waf_detection_index_template.json", "waf-detections-000001", "waf-detections") if err != nil { log.Error().Err(err).Msg("create index test failed") } esConcurrency := e.config.ESConcurrency esBatchSize = e.config.ESBatchSize for i := 0; i < esConcurrency; i++ { go func() { bulkService := e.esClient.Bulk() for { bulkableRequests := <-e.BulkChan bulkService.Add(bulkableRequests...) err := doSave(ctx, e.config, *bulkService) if err != nil { bulkService = e.esClient.Bulk() } else { bulkService.Reset() } } }() } log.Info().Msg("palace es chan init finish") } // createIndex 创建索引 func (e *ESStore) createIndex(ctx context.Context, idxTplName, tplPath, initName, aliasName string) error { tpl, err := readTplFile(tplPath) if err != nil { return err } // create index template idxTplRes, err := e.esClient.IndexPutIndexTemplate(idxTplName).BodyString(tpl).Do(ctx) if err != nil { return err } if !idxTplRes.Acknowledged { return fmt.Errorf("failed: init %s index template", idxTplName) } // check and init index exist, err := e.esClient.IndexExists(aliasName).Do(ctx) if err != nil { return err } if !exist { log.Info().Str("initName", initName).Str("aliasName", aliasName).Msg("init event index") // create index createdRes, err := e.esClient.CreateIndex(initName). BodyString(fmt.Sprintf(`{"aliases": {"%s": {"is_write_index": "true"}}}`, aliasName)). Do(ctx) if err != nil { return err } if !createdRes.Acknowledged { return errors.New("failed: create " + initName) } } else { log.Info().Str("initName", initName).Str("aliasName", aliasName).Msg("index already exist") } return nil } func doSave(ctx context.Context, config Config, bulkService es.BulkService) error { start := time.Now() defer func() { delta := float64(time.Since(start).Milliseconds()) log.Info().Msgf("es bulk insert cost: %.2f ms", delta) }() timeoutCtx, cancel := context.WithTimeout(ctx, config.ESTimeout) defer cancel() esResponse, err := bulkService.Do(timeoutCtx) if err != nil { log.Error().Err(err).Msg("es bulk insert error") return err } if esResponse.Errors { err = fmt.Errorf("es bulk item err") for _, item := range esResponse.Items { bi, _ := json.Marshal(item) log.Error().Err(err).Msgf("es bulk item: %s", string(bi)) } return err } return nil } func (e *ESStore) Save(ctx context.Context, bulkableRequests []es.BulkableRequest) { batchRequests := make([]es.BulkableRequest, 0, esBatchSize) for i := 0; i < len(bulkableRequests); i++ { batchRequests = append(batchRequests, bulkableRequests[i]) if (i+1)%esBatchSize == 0 || i == len(bulkableRequests)-1 { e.BulkChan <- batchRequests batchRequests = make([]es.BulkableRequest, 0, esBatchSize) } } }