es.go 5.17 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
package es

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path"
	"time"

	json "github.com/json-iterator/go"
	"github.com/rs/zerolog/log"
14
	"gitlab.com/tensorsecurity-rd/waf-console/cmd/config"
15 16 17 18

	es "github.com/olivere/elastic/v7"
)

19
// var BulkChan chan []es.BulkableRequest
20 21 22 23 24 25 26 27 28 29
var esBatchSize int

type Config struct {
	ESBatchSize   int
	ESConcurrency int
	ESTimeout     time.Duration
}

func CreateEsClientFromEnv() (*es.Client, error) {
	var opts []es.ClientOptionFunc
30
	esURL := os.Getenv("ELASTIC_URL")
31 32 33 34 35
	if esURL == "" {
		return nil, fmt.Errorf("ES_URL is not set")
	}
	opts = append(opts, es.SetURL(esURL))

36
	esUsername := os.Getenv("ELASTIC_USERNAME")
37

38
	esPassword := os.Getenv("ELASTIC_PASSWORD")
39 40 41
	if esUsername != "" && esPassword != "" {
		opts = append(opts, es.SetBasicAuth(esUsername, esPassword))
	}
qiuqunfeng's avatar
debug  
qiuqunfeng committed
42
	opts = append(opts, es.SetTraceLog(&log.Logger))
43
	opts = append(opts, es.SetDecoder(&es.NumberDecoder{}))
44

45
	sniff := os.Getenv("ELASTIC_SNIFF")
46 47 48 49 50 51 52 53 54 55 56 57
	if sniff == "" {
		sniff = "false"
	}
	opts = append(opts, es.SetSniff(sniff == "true"))

	esClient, err := es.NewClient(opts...)
	if err != nil {
		return nil, err
	}
	return esClient, nil
}

58 59 60 61
func CreateEsClientFromConfig(config *config.ElasticsearchConfig) (*es.Client, error) {
	var opts []es.ClientOptionFunc

	if config.Url == "" {
62
		return nil, fmt.Errorf("es url is not set")
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
	}
	opts = append(opts, es.SetURL(config.Url))

	if config.Username != "" && config.Password != "" {
		opts = append(opts, es.SetBasicAuth(config.Username, config.Password))
	}
	opts = append(opts, es.SetTraceLog(&log.Logger))
	opts = append(opts, es.SetDecoder(&es.NumberDecoder{}))

	opts = append(opts, es.SetSniff(config.Sniff))

	esClient, err := es.NewClient(opts...)
	if err != nil {
		return nil, err
	}
	return esClient, nil
}

81 82 83
type ESStore struct {
	esClient *es.Client
	config   Config
84
	BulkChan chan []es.BulkableRequest
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
}

func readTplFile(filename string) (string, error) {
	file, err := os.OpenFile(path.Join(os.Getenv("CONFIG_DIR"), filename), os.O_RDONLY, 0500)
	if err != nil {
		return "", err
	}
	defer file.Close()

	body, err := io.ReadAll(file)
	if err != nil {
		return "", err
	}

	return string(body), nil
}

102 103 104 105 106 107 108 109 110
func NewESStore(config Config, client *es.Client) *ESStore {
	return &ESStore{
		config:   config,
		esClient: client,
		BulkChan: make(chan []es.BulkableRequest),
	}
}

// Init 初始化ES
111 112
func (e *ESStore) Init() {
	ctx := context.Background()
113
	err := e.createIndex(ctx, "waf_detections_index", "waf_detection_index_template.json", "waf-detections-000001", "waf-detections")
114 115 116 117 118 119 120 121 122 123
	if err != nil {
		log.Error().Err(err).Msg("create index test failed")
	}

	esConcurrency := e.config.ESConcurrency
	esBatchSize = e.config.ESBatchSize
	for i := 0; i < esConcurrency; i++ {
		go func() {
			bulkService := e.esClient.Bulk()
			for {
124
				bulkableRequests := <-e.BulkChan
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
				bulkService.Add(bulkableRequests...)
				err := doSave(ctx, e.config, *bulkService)
				if err != nil {
					bulkService = e.esClient.Bulk()
				} else {
					bulkService.Reset()
				}
			}
		}()
	}

	log.Info().Msg("palace es chan init finish")
}

// createIndex 创建索引
func (e *ESStore) createIndex(ctx context.Context, idxTplName, tplPath, initName, aliasName string) error {
	tpl, err := readTplFile(tplPath)
	if err != nil {
		return err
	}

	// create index template
	idxTplRes, err := e.esClient.IndexPutIndexTemplate(idxTplName).BodyString(tpl).Do(ctx)
	if err != nil {
		return err
	}
	if !idxTplRes.Acknowledged {
		return fmt.Errorf("failed: init %s index template", idxTplName)
	}

	// check and init index
	exist, err := e.esClient.IndexExists(aliasName).Do(ctx)
	if err != nil {
		return err
	}

	if !exist {
		log.Info().Str("initName", initName).Str("aliasName", aliasName).Msg("init event index")

		// create index
		createdRes, err := e.esClient.CreateIndex(initName).
			BodyString(fmt.Sprintf(`{"aliases": {"%s": {"is_write_index": "true"}}}`, aliasName)).
			Do(ctx)
		if err != nil {
			return err
		}

		if !createdRes.Acknowledged {
			return errors.New("failed: create " + initName)
		}
	} else {
		log.Info().Str("initName", initName).Str("aliasName", aliasName).Msg("index already exist")
	}

	return nil
}

func doSave(ctx context.Context, config Config, bulkService es.BulkService) error {
	start := time.Now()
	defer func() {
		delta := float64(time.Since(start).Milliseconds())
		log.Info().Msgf("es bulk insert cost: %.2f ms", delta)
	}()

	timeoutCtx, cancel := context.WithTimeout(ctx, config.ESTimeout)
	defer cancel()

	esResponse, err := bulkService.Do(timeoutCtx)
	if err != nil {
		log.Error().Err(err).Msg("es bulk insert error")
		return err
	}
	if esResponse.Errors {
		err = fmt.Errorf("es bulk item err")
		for _, item := range esResponse.Items {
			bi, _ := json.Marshal(item)
			log.Error().Err(err).Msgf("es bulk item: %s", string(bi))
		}
		return err
	}
	return nil

}

209
func (e *ESStore) Save(ctx context.Context, bulkableRequests []es.BulkableRequest) {
210 211 212 213
	batchRequests := make([]es.BulkableRequest, 0, esBatchSize)
	for i := 0; i < len(bulkableRequests); i++ {
		batchRequests = append(batchRequests, bulkableRequests[i])
		if (i+1)%esBatchSize == 0 || i == len(bulkableRequests)-1 {
214
			e.BulkChan <- batchRequests
215 216 217 218
			batchRequests = make([]es.BulkableRequest, 0, esBatchSize)
		}
	}
}