package es import ( "context" "errors" "fmt" "io" "os" "path" "time" json "github.com/json-iterator/go" "github.com/rs/zerolog/log" es "github.com/olivere/elastic/v7" ) var BulkChan chan []es.BulkableRequest var esBatchSize int type Config struct { ESBatchSize int ESConcurrency int ESTimeout time.Duration } func CreateEsClientFromEnv() (*es.Client, error) { var opts []es.ClientOptionFunc esURL := os.Getenv("ES_URL") if esURL == "" { return nil, fmt.Errorf("ES_URL is not set") } opts = append(opts, es.SetURL(esURL)) esUsername := os.Getenv("ES_USERNAME") esPassword := os.Getenv("ES_PASSWORD") if esUsername != "" && esPassword != "" { opts = append(opts, es.SetBasicAuth(esUsername, esPassword)) } sniff := os.Getenv("ES_SNIFF") if sniff == "" { sniff = "false" } opts = append(opts, es.SetSniff(sniff == "true")) esClient, err := es.NewClient(opts...) if err != nil { return nil, err } return esClient, nil } type ESStore struct { esClient *es.Client config Config } func readTplFile(filename string) (string, error) { file, err := os.OpenFile(path.Join(os.Getenv("CONFIG_DIR"), filename), os.O_RDONLY, 0500) if err != nil { return "", err } defer file.Close() body, err := io.ReadAll(file) if err != nil { return "", err } return string(body), nil } func (e *ESStore) Init() { ctx := context.Background() err := e.createIndex(ctx, "waf_detections_index", "test.json", "waf-detections-000001", "waf-detections") if err != nil { log.Error().Err(err).Msg("create index test failed") } BulkChan = make(chan []es.BulkableRequest) esConcurrency := e.config.ESConcurrency esBatchSize = e.config.ESBatchSize for i := 0; i < esConcurrency; i++ { go func() { bulkService := e.esClient.Bulk() for { bulkableRequests := <-BulkChan bulkService.Add(bulkableRequests...) err := doSave(ctx, e.config, *bulkService) if err != nil { bulkService = e.esClient.Bulk() } else { bulkService.Reset() } } }() } log.Info().Msg("palace es chan init finish") } // createIndex 创建索引 func (e *ESStore) createIndex(ctx context.Context, idxTplName, tplPath, initName, aliasName string) error { tpl, err := readTplFile(tplPath) if err != nil { return err } // create index template idxTplRes, err := e.esClient.IndexPutIndexTemplate(idxTplName).BodyString(tpl).Do(ctx) if err != nil { return err } if !idxTplRes.Acknowledged { return fmt.Errorf("failed: init %s index template", idxTplName) } // check and init index exist, err := e.esClient.IndexExists(aliasName).Do(ctx) if err != nil { return err } if !exist { log.Info().Str("initName", initName).Str("aliasName", aliasName).Msg("init event index") // create index createdRes, err := e.esClient.CreateIndex(initName). BodyString(fmt.Sprintf(`{"aliases": {"%s": {"is_write_index": "true"}}}`, aliasName)). Do(ctx) if err != nil { return err } if !createdRes.Acknowledged { return errors.New("failed: create " + initName) } } else { log.Info().Str("initName", initName).Str("aliasName", aliasName).Msg("index already exist") } return nil } func doSave(ctx context.Context, config Config, bulkService es.BulkService) error { start := time.Now() defer func() { delta := float64(time.Since(start).Milliseconds()) log.Info().Msgf("es bulk insert cost: %.2f ms", delta) }() timeoutCtx, cancel := context.WithTimeout(ctx, config.ESTimeout) defer cancel() esResponse, err := bulkService.Do(timeoutCtx) if err != nil { log.Error().Err(err).Msg("es bulk insert error") return err } if esResponse.Errors { err = fmt.Errorf("es bulk item err") for _, item := range esResponse.Items { bi, _ := json.Marshal(item) log.Error().Err(err).Msgf("es bulk item: %s", string(bi)) } return err } return nil } func Send(ctx context.Context, bulkableRequests []es.BulkableRequest) { batchRequests := make([]es.BulkableRequest, 0, esBatchSize) for i := 0; i < len(bulkableRequests); i++ { batchRequests = append(batchRequests, bulkableRequests[i]) if (i+1)%esBatchSize == 0 || i == len(bulkableRequests)-1 { BulkChan <- batchRequests batchRequests = make([]es.BulkableRequest, 0, esBatchSize) } } }