es.go 4.16 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
package es

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path"
	"time"

	json "github.com/json-iterator/go"
	"github.com/rs/zerolog/log"

	es "github.com/olivere/elastic/v7"
)

// Package-level state shared between (*ESStore).Init and Send.
var (
	// BulkChan carries batches of bulk requests from Send to the worker
	// goroutines started by (*ESStore).Init. It is nil until Init assigns it.
	BulkChan chan []es.BulkableRequest

	// esBatchSize is the batch size used by Send; Init copies it from
	// Config.ESBatchSize.
	esBatchSize int
)

// Config holds the settings that control how bulk requests are written to
// Elasticsearch.
type Config struct {
	// ESBatchSize is copied into esBatchSize by Init and used by Send as the
	// number of requests per batch.
	ESBatchSize   int
	// ESConcurrency is the number of bulk worker goroutines requested from Init.
	ESConcurrency int
	// ESTimeout is the timeout doSave applies to each bulk call.
	ESTimeout     time.Duration
}

// CreateEsClientFromEnv builds an Elasticsearch client from environment
// variables:
//
//   - ES_URL (required): passed to the client as its URL.
//   - ES_USERNAME / ES_PASSWORD (optional): basic-auth credentials, applied
//     only when both are non-empty.
//   - ES_SNIFF (optional): sniffing is enabled only when the value is exactly
//     "true"; any other value, or no value, disables it.
//
// It returns an error when ES_URL is empty or when the client cannot be
// created.
func CreateEsClientFromEnv() (*es.Client, error) {
	endpoint := os.Getenv("ES_URL")
	if endpoint == "" {
		return nil, fmt.Errorf("ES_URL is not set")
	}
	options := []es.ClientOptionFunc{es.SetURL(endpoint)}

	// Credentials are all-or-nothing: a lone username or password is ignored.
	if user, pass := os.Getenv("ES_USERNAME"), os.Getenv("ES_PASSWORD"); user != "" && pass != "" {
		options = append(options, es.SetBasicAuth(user, pass))
	}

	sniffEnabled := os.Getenv("ES_SNIFF") == "true"
	options = append(options, es.SetSniff(sniffEnabled))

	client, err := es.NewClient(options...)
	if err != nil {
		return nil, err
	}
	return client, nil
}

// ESStore pairs an Elasticsearch client with the bulk-write configuration
// used by Init and the worker goroutines it starts.
type ESStore struct {
	// esClient is used for index setup and for creating bulk services.
	esClient *es.Client
	// config supplies the batch size, worker count and bulk timeout.
	config   Config
}

// readTplFile returns the full contents of filename as a string. The name is
// resolved relative to the directory given by the CONFIG_DIR environment
// variable. Errors from opening or reading the file are returned unchanged,
// together with an empty string.
func readTplFile(filename string) (string, error) {
	fullPath := path.Join(os.Getenv("CONFIG_DIR"), filename)

	// os.Open opens the file read-only.
	f, err := os.Open(fullPath)
	if err != nil {
		return "", err
	}
	defer f.Close()

	data, err := io.ReadAll(f)
	if err != nil {
		return "", err
	}
	return string(data), nil
}

// Init prepares the store for bulk writes.
//
// It first tries to set up the "waf_detections_index" template and the
// initial "waf-detections-000001" index behind the "waf-detections" alias; a
// failure there is logged and Init carries on. It then creates the
// package-level BulkChan, publishes the configured batch size to esBatchSize,
// and starts the bulk worker goroutines.
//
// Each worker receives batches from the channel and submits them through
// doSave. At least one worker is always started: with no workers, every Send
// would block forever on the unbuffered channel. Workers exit when the
// channel is closed.
func (e *ESStore) Init() {
	ctx := context.Background()
	if err := e.createIndex(ctx, "waf_detections_index", "test.json", "waf-detections-000001", "waf-detections"); err != nil {
		// Index setup is best effort; the workers are started regardless.
		log.Error().Err(err).Msg("create index test failed")
	}

	// Workers hold the channel value itself rather than re-reading the
	// package variable on every iteration.
	batches := make(chan []es.BulkableRequest)
	BulkChan = batches
	esBatchSize = e.config.ESBatchSize

	workers := e.config.ESConcurrency
	if workers < 1 {
		log.Warn().Int("es_concurrency", workers).Msg("es concurrency is not positive, starting a single bulk worker")
		workers = 1
	}

	for i := 0; i < workers; i++ {
		go func() {
			bulkService := e.esClient.Bulk()
			// Ranging over the channel ends the worker once the channel is
			// closed instead of looping over zero-value receives.
			for batch := range batches {
				bulkService.Add(batch...)
				// doSave works on a copy of the service, so this one is
				// cleared explicitly after every attempt.
				if err := doSave(ctx, e.config, *bulkService); err != nil {
					// doSave has already logged the failure; start over with
					// a fresh service and drop the failed batch.
					bulkService = e.esClient.Bulk()
					continue
				}
				bulkService.Reset()
			}
		}()
	}

	log.Info().Msg("palace es chan init finish")
}

// createIndex installs an index template and makes sure the initial index
// behind an alias exists.
//
// The template body is loaded from tplPath via readTplFile and stored under
// idxTplName. If nothing named aliasName exists yet, the index initName is
// created with aliasName attached as its write alias; otherwise only an info
// message is logged.
//
// An error is returned when the template file cannot be read, when an
// Elasticsearch call fails, or when a response is not acknowledged.
func (e *ESStore) createIndex(ctx context.Context, idxTplName, tplPath, initName, aliasName string) error {
	body, err := readTplFile(tplPath)
	if err != nil {
		return err
	}

	// Register the index template.
	putTemplate := e.esClient.IndexPutIndexTemplate(idxTplName).BodyString(body)
	tplResp, err := putTemplate.Do(ctx)
	if err != nil {
		return err
	}
	if !tplResp.Acknowledged {
		return fmt.Errorf("failed: init %s index template", idxTplName)
	}

	// Nothing more to do when the alias name already resolves.
	found, err := e.esClient.IndexExists(aliasName).Do(ctx)
	if err != nil {
		return err
	}
	if found {
		log.Info().Str("initName", initName).Str("aliasName", aliasName).Msg("index already exist")
		return nil
	}

	log.Info().Str("initName", initName).Str("aliasName", aliasName).Msg("init event index")

	// Create the first index and attach the alias as its write index.
	aliasBody := fmt.Sprintf(`{"aliases": {"%s": {"is_write_index": "true"}}}`, aliasName)
	createResp, err := e.esClient.CreateIndex(initName).BodyString(aliasBody).Do(ctx)
	if err != nil {
		return err
	}
	if !createResp.Acknowledged {
		return errors.New("failed: create " + initName)
	}
	return nil
}

// doSave executes the given bulk request and reports whether it succeeded.
//
// The call is bounded by config.ESTimeout, applied on top of ctx. A
// non-positive ESTimeout yields a context that is already expired. The
// elapsed time is logged at info level on every return path.
//
// It returns the transport error from the bulk call, or a generic
// "es bulk item err" when the response reports item-level errors. In the
// latter case each failed item is logged at error level; items that succeeded
// are not logged. Failures are logged here, so callers do not need to log the
// returned error again.
func doSave(ctx context.Context, config Config, bulkService es.BulkService) error {
	start := time.Now()
	defer func() {
		// Divide as floats: Duration.Milliseconds truncates to whole
		// milliseconds, which would make the two decimals always ".00".
		elapsedMs := float64(time.Since(start)) / float64(time.Millisecond)
		log.Info().Msgf("es bulk insert cost: %.2f ms", elapsedMs)
	}()

	timeoutCtx, cancel := context.WithTimeout(ctx, config.ESTimeout)
	defer cancel()

	esResponse, err := bulkService.Do(timeoutCtx)
	if err != nil {
		log.Error().Err(err).Msg("es bulk insert error")
		return err
	}
	if !esResponse.Errors {
		return nil
	}

	err = fmt.Errorf("es bulk item err")
	for _, item := range esResponse.Items {
		// Each item maps the action name to its result; report only the
		// items whose result carries a non-2xx status.
		failed := false
		for _, result := range item {
			if result != nil && (result.Status < 200 || result.Status > 299) {
				failed = true
				break
			}
		}
		if !failed {
			continue
		}

		bi, marshalErr := json.Marshal(item)
		if marshalErr != nil {
			log.Error().Err(marshalErr).Msg("es bulk item could not be marshalled")
			continue
		}
		log.Error().Err(err).Msgf("es bulk item: %s", string(bi))
	}
	return err
}

// Send splits bulkableRequests into consecutive batches of at most
// esBatchSize requests and hands each batch, in order, to BulkChan.
//
// Every batch is a fresh slice, so the workers never share a backing array
// with the caller. When esBatchSize is not positive (for example because Init
// has not run or the configured batch size is zero), all requests are sent as
// one batch instead of panicking. An empty input sends nothing.
//
// Each send blocks until a receiver takes the batch. ctx is accepted for API
// compatibility but is not consulted. BulkChan must have been created by
// Init, because a send on a nil channel blocks forever.
func Send(ctx context.Context, bulkableRequests []es.BulkableRequest) {
	total := len(bulkableRequests)

	batchSize := esBatchSize
	if batchSize < 1 {
		// A zero size would panic in the modulo arithmetic and a negative one
		// in make; fall back to a single batch.
		batchSize = total
	}

	for start := 0; start < total; {
		// Computed from the remaining count so a very large batch size
		// cannot overflow start+batchSize.
		end := total
		if batchSize < total-start {
			end = start + batchSize
		}

		batch := make([]es.BulkableRequest, end-start)
		copy(batch, bulkableRequests[start:end])
		BulkChan <- batch

		start = end
	}
}