es.go 4.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
package es

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path"
	"time"

	json "github.com/json-iterator/go"
	"github.com/rs/zerolog/log"

	es "github.com/olivere/elastic/v7"
)

18
// var BulkChan chan []es.BulkableRequest
19 20 21 22 23 24 25 26 27 28
// esBatchSize is the package-wide bulk batch size. (*ESStore).Init assigns it
// from Config.ESBatchSize; because it is package-level, it is shared by every
// ESStore in the process and the most recent Init call wins.
var esBatchSize int

// Config holds the tuning knobs for bulk writes to Elasticsearch.
type Config struct {
	// ESBatchSize is the maximum number of requests Save places in one batch.
	ESBatchSize   int
	// ESConcurrency is the number of bulk worker goroutines started by Init.
	ESConcurrency int
	// ESTimeout bounds each bulk request issued by doSave.
	ESTimeout     time.Duration
}

func CreateEsClientFromEnv() (*es.Client, error) {
	var opts []es.ClientOptionFunc
29
	esURL := os.Getenv("ELASTIC_URL")
30 31 32 33 34
	if esURL == "" {
		return nil, fmt.Errorf("ES_URL is not set")
	}
	opts = append(opts, es.SetURL(esURL))

35
	esUsername := os.Getenv("ELASTIC_USERNAME")
36

37
	esPassword := os.Getenv("ELASTIC_PASSWORD")
38 39 40
	if esUsername != "" && esPassword != "" {
		opts = append(opts, es.SetBasicAuth(esUsername, esPassword))
	}
qiuqunfeng's avatar
debug  
qiuqunfeng committed
41
	opts = append(opts, es.SetTraceLog(&log.Logger))
42
	opts = append(opts, es.SetDecoder(&es.NumberDecoder{}))
43

44
	sniff := os.Getenv("ELASTIC_SNIFF")
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
	if sniff == "" {
		sniff = "false"
	}
	opts = append(opts, es.SetSniff(sniff == "true"))

	esClient, err := es.NewClient(opts...)
	if err != nil {
		return nil, err
	}
	return esClient, nil
}

type ESStore struct {
	esClient *es.Client
	config   Config
60
	BulkChan chan []es.BulkableRequest
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
}

// readTplFile returns the full contents of filename as a string. The file is
// looked up inside the directory named by the CONFIG_DIR environment
// variable; when CONFIG_DIR is empty, filename is used as given.
func readTplFile(filename string) (string, error) {
	tplPath := path.Join(os.Getenv("CONFIG_DIR"), filename)

	f, openErr := os.Open(tplPath)
	if openErr != nil {
		return "", openErr
	}
	defer f.Close()

	raw, readErr := io.ReadAll(f)
	if readErr != nil {
		return "", readErr
	}
	return string(raw), nil
}

78 79 80 81 82 83 84 85 86
// NewESStore returns an ESStore that uses client and config, with a freshly
// created unbuffered BulkChan. It starts no goroutines; Init starts the
// workers that receive from BulkChan.
func NewESStore(config Config, client *es.Client) *ESStore {
	store := new(ESStore)
	store.esClient = client
	store.config = config
	store.BulkChan = make(chan []es.BulkableRequest)
	return store
}

// Init 初始化ES
87 88
func (e *ESStore) Init() {
	ctx := context.Background()
89
	err := e.createIndex(ctx, "waf_detections_index", "waf_detection_index_template.json", "waf-detections-000001", "waf-detections")
90 91 92 93 94 95 96 97 98 99
	if err != nil {
		log.Error().Err(err).Msg("create index test failed")
	}

	esConcurrency := e.config.ESConcurrency
	esBatchSize = e.config.ESBatchSize
	for i := 0; i < esConcurrency; i++ {
		go func() {
			bulkService := e.esClient.Bulk()
			for {
100
				bulkableRequests := <-e.BulkChan
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
				bulkService.Add(bulkableRequests...)
				err := doSave(ctx, e.config, *bulkService)
				if err != nil {
					bulkService = e.esClient.Bulk()
				} else {
					bulkService.Reset()
				}
			}
		}()
	}

	log.Info().Msg("palace es chan init finish")
}

// createIndex installs an index template and makes sure the write alias
// exists.
//
// The template body is read from tplPath via readTplFile and stored under
// idxTplName. If aliasName does not exist yet, the index initName is created
// with aliasName attached as its write alias; otherwise nothing is created.
// An error is returned when any call fails or is not acknowledged.
func (e *ESStore) createIndex(ctx context.Context, idxTplName, tplPath, initName, aliasName string) error {
	tplBody, err := readTplFile(tplPath)
	if err != nil {
		return err
	}

	// Install (or overwrite) the index template.
	putRes, err := e.esClient.IndexPutIndexTemplate(idxTplName).BodyString(tplBody).Do(ctx)
	switch {
	case err != nil:
		return err
	case !putRes.Acknowledged:
		return fmt.Errorf("failed: init %s index template", idxTplName)
	}

	// Nothing more to do when the alias is already present.
	found, err := e.esClient.IndexExists(aliasName).Do(ctx)
	if err != nil {
		return err
	}
	if found {
		log.Info().Str("initName", initName).Str("aliasName", aliasName).Msg("index already exist")
		return nil
	}

	log.Info().Str("initName", initName).Str("aliasName", aliasName).Msg("init event index")

	// Create the first index and attach the alias as its write index.
	aliasBody := fmt.Sprintf(`{"aliases": {"%s": {"is_write_index": "true"}}}`, aliasName)
	createRes, err := e.esClient.CreateIndex(initName).BodyString(aliasBody).Do(ctx)
	if err != nil {
		return err
	}
	if !createRes.Acknowledged {
		return errors.New("failed: create " + initName)
	}
	return nil
}

// doSave executes the requests queued on bulkService as one bulk call and
// logs how long the call took.
//
// When config.ESTimeout is positive the call is bounded by that timeout; a
// non-positive value adds no deadline beyond ctx (previously it produced an
// already-expired context, failing every call).
//
// It returns the transport error from the bulk call, or a generic
// "es bulk item err" when the response reports item-level errors. In the
// latter case only the failed items are logged, one line each.
func doSave(ctx context.Context, config Config, bulkService es.BulkService) error {
	start := time.Now()
	defer func() {
		delta := float64(time.Since(start).Milliseconds())
		log.Info().Msgf("es bulk insert cost: %.2f ms", delta)
	}()

	if config.ESTimeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, config.ESTimeout)
		defer cancel()
	}

	esResponse, err := bulkService.Do(ctx)
	if err != nil {
		log.Error().Err(err).Msg("es bulk insert error")
		return err
	}
	if !esResponse.Errors {
		return nil
	}

	err = errors.New("es bulk item err")
	// Log only the items that failed; successful items in the same response
	// are not errors and would drown out the real failures.
	for _, item := range esResponse.Failed() {
		bi, marshalErr := json.Marshal(item)
		if marshalErr != nil {
			log.Error().Err(marshalErr).Msg("es bulk item marshal error")
			continue
		}
		log.Error().Err(err).Msgf("es bulk item: %s", string(bi))
	}
	return err
}

185
func (e *ESStore) Save(ctx context.Context, bulkableRequests []es.BulkableRequest) {
186 187 188 189
	batchRequests := make([]es.BulkableRequest, 0, esBatchSize)
	for i := 0; i < len(bulkableRequests); i++ {
		batchRequests = append(batchRequests, bulkableRequests[i])
		if (i+1)%esBatchSize == 0 || i == len(bulkableRequests)-1 {
190
			e.BulkChan <- batchRequests
191 192 193 194
			batchRequests = make([]es.BulkableRequest, 0, esBatchSize)
		}
	}
}