Commit 5f22de38 authored by qiuqunfeng

Implement dynamic log level configuration and refactor Elasticsearch client creation

This update introduces dynamic log level configuration via the LOG_LEVEL environment variable, allowing finer control over logging verbosity. Elasticsearch client creation has been refactored to take its settings from the JSON configuration file, and the Kafka consumer is now configured the same way, improving the application's configurability. The previous env-var-driven LoadConfig flow has been replaced with a more structured approach, and unnecessary code has been removed to streamline the log consumer service.
parent c2e31c1b
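
For orientation, here is how the refactored pieces fit together after this commit. The function names are the ones introduced in the diff below; the surrounding scaffolding (db, esStore, the enclosing RunE body) is elided, so treat this as an illustrative sketch rather than the literal code:

	// Sketch: configuration is loaded once from JSON (plus env overrides),
	// then feeds both the Elasticsearch client and the Kafka consumer.
	cfg := config.LoadConfig() // config/config.json, or the file named by CONFIG_FILE
	esClient, err := es.CreateEsClientFromConfig(cfg.ElasticsearchConfig)
	if err != nil {
		panic(err)
	}
	logConsumer := service.NewLogConsumerService(db, esStore, cfg.KafkaConfig)
	go logConsumer.Consume()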
@@ -9,6 +9,7 @@ import (
 	"github.com/rs/zerolog/log"
 	"github.com/spf13/cobra"
 	"gitlab.com/tensorsecurity-rd/waf-console/api"
+	"gitlab.com/tensorsecurity-rd/waf-console/cmd/config"
 	"gitlab.com/tensorsecurity-rd/waf-console/internal/service"
 	es "gitlab.com/tensorsecurity-rd/waf-console/internal/store"
 	"gitlab.com/tensorsecurity-rd/waf-console/internal/utils"
@@ -22,7 +23,7 @@ func NewRootCommand() *cobra.Command {
 		Short: "Start waf-console service.",
 		Args:  cobra.ExactArgs(0),
 		RunE: func(cmd *cobra.Command, args []string) error {
-			config := LoadConfig()
+			config := config.LoadConfig()
 			debugMode := os.Getenv("DEBUG_MODE")
 			log.Info().Msgf("DEBUG_MODE: %s", debugMode)
 			if debugMode == "true" {
@@ -66,21 +67,10 @@ func NewRootCommand() *cobra.Command {
 				}
 			}
 			kubeClient := utils.NewKubeClient(regionConfig.ApiServer, caData, clientCertData, clientKeyData, regionConfig.Insecure)
-			// client := versioned.NewForConfigOrDie(&rest.Config{
-			// 	Host: regionConfig.ApiServer,
-			// 	TLSClientConfig: rest.TLSClientConfig{
-			// 		Insecure: regionConfig.Insecure,
-			// 		CAData:   caData,
-			// 		CertData: clientCertData,
-			// 		KeyData:  clientKeyData,
-			// 	},
-			// })
 			clusterClientManager.AddClient(regionConfig.RegionCode, kubeClient)
 		}
-		esClient, err := es.CreateEsClientFromEnv()
+		esClient, err := es.CreateEsClientFromConfig(config.ElasticsearchConfig)
 		if err != nil {
 			panic(err)
 		}
@@ -91,7 +81,7 @@ func NewRootCommand() *cobra.Command {
 			ESTimeout: 10 * time.Second,
 		}, esClient)
 		esStore.Init()
-		logConsumerService := service.NewLogConsumerService(db, esStore)
+		logConsumerService := service.NewLogConsumerService(db, esStore, config.KafkaConfig)
 		go logConsumerService.Consume()
 		return e.Run(":8080")
 	},
...
-package app
+package config

 import (
 	"encoding/json"
 	"os"
+	"strings"

 	"github.com/rs/zerolog/log"
 )
@@ -21,6 +22,8 @@ type Config struct {
 	Debug      bool   `json:"debug"`
 	GatewayUrl string `json:"gateway_url"`
 	SSOUrl     string `json:"sso_url"`
+	ElasticsearchConfig *ElasticsearchConfig `json:"elasticsearch_config"`
+	KafkaConfig         *KafkaConfig         `json:"kafka_config"`
 }

 type DBConfig struct {
@@ -41,6 +44,23 @@ type RegionConfig struct {
 	Insecure bool `json:"insecure"`
 }

+type ElasticsearchConfig struct {
+	Url      string `json:"url"`
+	Username string `json:"username"`
+	Password string `json:"password"`
+	Sniff    bool   `json:"sniff"`
+}
+
+type KafkaConfig struct {
+	Brokers    []string `json:"brokers"`
+	Topic      string   `json:"topic"`
+	Group      string   `json:"group"`
+	AuthMethod string   `json:"auth_method"`
+	Username   string   `json:"username"`
+	Password   string   `json:"password"`
+	ScramAlgo  string   `json:"scram_algo"`
+}
+
 func LoadConfig() *Config {
 	configFile := "config/config.json"
 	if envFile := os.Getenv("CONFIG_FILE"); envFile != "" {
@@ -54,18 +74,11 @@ func LoadConfig() *Config {
 	}
 	var config Config
-	// if err := yaml.Unmarshal(data, &config); err != nil {
-	// 	log.Err(err).Msg("Failed to parse config file")
-	// 	return nil
-	// }
 	if err := json.Unmarshal(data, &config); err != nil {
 		log.Err(err).Msg("Failed to parse config file")
 		return nil
 	}
-	log.Info().Msgf("config password: %s", config.DBConfig.Password)
 	password := os.Getenv("RDB_PASSWORD")
-	log.Info().Msgf("RDB_PASSWORD: %s", password)
-	log.Info().Msgf("config: %+v", config)
 	if password != "" {
 		config.DBConfig.Password = password
 	}
@@ -79,6 +92,51 @@ func LoadConfig() *Config {
 			Database: DB_NAME,
 		}
 	}
+	esURL := os.Getenv("ELASTIC_URL")
+	if esURL != "" {
+		config.ElasticsearchConfig.Url = esURL
+	}
+	esUsername := os.Getenv("ELASTIC_USERNAME")
+	if esUsername != "" {
+		config.ElasticsearchConfig.Username = esUsername
+	}
+	esPassword := os.Getenv("ELASTIC_PASSWORD")
+	if esPassword != "" {
+		config.ElasticsearchConfig.Password = esPassword
+	}
+	sniff := os.Getenv("ELASTIC_SNIFF")
+	if sniff != "" {
+		config.ElasticsearchConfig.Sniff = (sniff == "true")
+	}
+	kafkaBrokers := os.Getenv("KAFKA_BROKERS")
+	if kafkaBrokers != "" {
+		config.KafkaConfig.Brokers = strings.Split(kafkaBrokers, ",")
+	}
+	kafkaTopic := os.Getenv("KAFKA_TOPIC")
+	if kafkaTopic != "" {
+		config.KafkaConfig.Topic = kafkaTopic
+	}
+	kafkaGroup := os.Getenv("KAFKA_GROUP")
+	if kafkaGroup != "" {
+		config.KafkaConfig.Group = kafkaGroup
+	}
+	kafkaAuthMethod := os.Getenv("KAFKA_AUTH_METHOD")
+	if kafkaAuthMethod != "" {
+		config.KafkaConfig.AuthMethod = kafkaAuthMethod
+	}
+	kafkaUsername := os.Getenv("KAFKA_USERNAME")
+	if kafkaUsername != "" {
+		config.KafkaConfig.Username = kafkaUsername
+	}
+	kafkaPassword := os.Getenv("KAFKA_PASSWORD")
+	if kafkaPassword != "" {
+		config.KafkaConfig.Password = kafkaPassword
+	}
+	kafkaScramAlgo := os.Getenv("KAFKA_SCRAM_ALGO")
+	if kafkaScramAlgo != "" {
+		config.KafkaConfig.ScramAlgo = kafkaScramAlgo
+	}
 	return &config
 }
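
One caveat in the override block above: it dereferences config.ElasticsearchConfig and config.KafkaConfig unconditionally, so a config.json that omits either section would make LoadConfig panic on a nil pointer as soon as the matching env variable is set. A small guard before the overrides would avoid that — a sketch, not part of this commit:

	// Sketch (not in the commit): make sure nested sections exist before
	// applying the ELASTIC_*/KAFKA_* overrides, so a sparse config.json
	// cannot cause a nil-pointer panic.
	if config.ElasticsearchConfig == nil {
		config.ElasticsearchConfig = &ElasticsearchConfig{}
	}
	if config.KafkaConfig == nil {
		config.KafkaConfig = &KafkaConfig{}
	}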
@@ -14,7 +14,19 @@ const (
 )

 func main() {
+	logLevel := os.Getenv("LOG_LEVEL")
+	switch logLevel {
+	case "debug":
 		zerolog.SetGlobalLevel(zerolog.DebugLevel)
+	case "info":
+		zerolog.SetGlobalLevel(zerolog.InfoLevel)
+	case "warn":
+		zerolog.SetGlobalLevel(zerolog.WarnLevel)
+	case "error":
+		zerolog.SetGlobalLevel(zerolog.ErrorLevel)
+	default:
+		zerolog.SetGlobalLevel(zerolog.InfoLevel)
+	}
 	log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stdout})
 	log.Info().Msg("starting gateway")
...
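
zerolog also ships a level parser covering these same strings (plus trace, fatal, and panic), so the switch could be collapsed. An alternative shape — a sketch, not what the commit does:

	// Sketch using zerolog.ParseLevel; in current zerolog versions an empty
	// string parses to NoLevel with no error, so unset and invalid values
	// both fall back to info here.
	if lvl, err := zerolog.ParseLevel(os.Getenv("LOG_LEVEL")); err == nil && lvl != zerolog.NoLevel {
		zerolog.SetGlobalLevel(lvl)
	} else {
		zerolog.SetGlobalLevel(zerolog.InfoLevel)
	}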
@@ -15,5 +15,20 @@
 		"host": "tensorsec-mysql.tensorsec.svc.cluster.local",
 		"port": "3306",
 		"database": "ivan"
-	}
+	},
+	"elasticsearch_config": {
+		"url": "http://192.168.3.10:9200",
+		"username": "elastic",
+		"password": "Mysql-ha@123",
+		"sniff": true
+	},
+	"kafka_config": {
+		"brokers": ["192.168.3.10:9092"],
+		"topic": "ivan_waf_detections",
+		"group": "ivan_waf_detections_console",
+		"auth_method": "scram",
+		"username": "ivan",
+		"password": "Mysql-ha@123",
+		"scram_algo": "sha256"
+	}
 }
\ No newline at end of file
package service

import (
	"time"

	"github.com/rs/zerolog/log"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl"
	"github.com/segmentio/kafka-go/sasl/plain"
	"github.com/segmentio/kafka-go/sasl/scram"
	"gitlab.com/tensorsecurity-rd/waf-console/cmd/config"
)

// getSASLMechanism builds a SASL mechanism from the Kafka configuration.
// It returns (nil, false, nil) when no auth method is configured.
func getSASLMechanism(config *config.KafkaConfig) (sasl.Mechanism, bool, error) {
	authMethod := config.AuthMethod
	username := config.Username
	password := config.Password
	var mechanism sasl.Mechanism
	switch authMethod {
	case KafkaAuthPlain:
		if username == "" || password == "" {
			return nil, true, ErrMissingAuthInfo
		}
		mechanism = &plain.Mechanism{
			Username: username,
			Password: password,
		}
	case KafkaAuthScram:
		if username == "" || password == "" {
			return nil, true, ErrMissingAuthInfo
		}
		// Take the algorithm from the config (LoadConfig already applies the
		// KAFKA_SCRAM_ALGO override); unknown values fall back to SHA-512.
		algoKey := config.ScramAlgo
		algo := scram.SHA512
		algoTmp, exist := scramAlgo[algoKey]
		if exist {
			algo = algoTmp
		}
		var err error
		mechanism, err = scram.Mechanism(algo, username, password)
		if err != nil {
			return nil, true, err
		}
	}
	return mechanism, mechanism != nil, nil
}

// NewKafkaReader builds a kafka.Reader from the Kafka configuration, falling
// back to the default group and topic when they are not configured.
func NewKafkaReader(config *config.KafkaConfig) *kafka.Reader {
	mechanism, _, err := getSASLMechanism(config)
	if err != nil {
		log.Fatal().Err(err).Msg("failed to get SASL mechanism")
	}
	if len(config.Brokers) == 0 {
		log.Fatal().Msg("no Kafka brokers configured (kafka_config.brokers / KAFKA_BROKERS)")
	}
	if config.Group == "" {
		config.Group = KafkaGroupIvanWafDetectionsPalace
		log.Warn().Msg("no Kafka consumer group configured, using default group")
	}
	if config.Topic == "" {
		config.Topic = KafkaTopicIvanWafDetections
		log.Warn().Msg("no Kafka topic configured, using default topic")
	}
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers: config.Brokers,
		Topic:   config.Topic,
		GroupID: config.Group,
		Dialer: &kafka.Dialer{
			SASLMechanism: mechanism,
			Timeout:       10 * time.Second,
			DualStack:     true,
		},
	})
}
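
For reference, a minimal loop consuming from the reader this helper returns, using kafka-go's public API; the KafkaConfig literal and broker address are placeholders, not values from the repository:

	// Sketch: drive the reader returned by NewKafkaReader (placeholder values).
	cfg := &config.KafkaConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "ivan_waf_detections",
		Group:   "ivan_waf_detections_console",
	}
	reader := NewKafkaReader(cfg)
	defer reader.Close()
	for {
		// ReadMessage blocks and, with a GroupID set, commits the offset
		// after a successful read.
		msg, err := reader.ReadMessage(context.Background())
		if err != nil {
			log.Err(err).Msg("kafka read failed")
			break
		}
		log.Info().Str("key", string(msg.Key)).Int("bytes", len(msg.Value)).Msg("received message")
	}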
@@ -4,15 +4,11 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
-	"os"
-	"strings"
-	"time"

 	"github.com/rs/zerolog/log"
 	"github.com/segmentio/kafka-go"
-	"github.com/segmentio/kafka-go/sasl"
-	"github.com/segmentio/kafka-go/sasl/plain"
 	"github.com/segmentio/kafka-go/sasl/scram"
+	"gitlab.com/tensorsecurity-rd/waf-console/cmd/config"
 	"gitlab.com/tensorsecurity-rd/waf-console/internal/model"
 	"gitlab.com/tensorsecurity-rd/waf-console/internal/utils/id"
@@ -48,38 +44,38 @@ const (
 	KafkaAuthScram = "scram"
 )

-func getSASLMechanismByEnv() (sasl.Mechanism, bool, error) {
-	authMethod := os.Getenv(EnvKafkaAuthMethod)
-	username := os.Getenv(EnvKafkaAuthUsername)
-	password := os.Getenv(EnvKafkaAuthPassword)
-	var mechanism sasl.Mechanism
-	switch authMethod {
-	case KafkaAuthPlain:
-		if username == "" || password == "" {
-			return nil, true, ErrMissingAuthInfo
-		}
-		mechanism = &plain.Mechanism{
-			Username: username,
-			Password: password,
-		}
-	case KafkaAuthScram:
-		if username == "" || password == "" {
-			return nil, true, ErrMissingAuthInfo
-		}
-		algoKey := os.Getenv(EnvKafkaScramAlgo)
-		algo := scram.SHA512
-		algoTmp, exist := scramAlgo[algoKey]
-		if exist {
-			algo = algoTmp
-		}
-		var err error
-		mechanism, err = scram.Mechanism(algo, username, password)
-		if err != nil {
-			return nil, true, err
-		}
-	}
-	return mechanism, mechanism != nil, nil
-}
+// func getSASLMechanismByEnv() (sasl.Mechanism, bool, error) {
+// 	authMethod := os.Getenv(EnvKafkaAuthMethod)
+// 	username := os.Getenv(EnvKafkaAuthUsername)
+// 	password := os.Getenv(EnvKafkaAuthPassword)
+// 	var mechanism sasl.Mechanism
+// 	switch authMethod {
+// 	case KafkaAuthPlain:
+// 		if username == "" || password == "" {
+// 			return nil, true, ErrMissingAuthInfo
+// 		}
+// 		mechanism = &plain.Mechanism{
+// 			Username: username,
+// 			Password: password,
+// 		}
+// 	case KafkaAuthScram:
+// 		if username == "" || password == "" {
+// 			return nil, true, ErrMissingAuthInfo
+// 		}
+// 		algoKey := os.Getenv(EnvKafkaScramAlgo)
+// 		algo := scram.SHA512
+// 		algoTmp, exist := scramAlgo[algoKey]
+// 		if exist {
+// 			algo = algoTmp
+// 		}
+// 		var err error
+// 		mechanism, err = scram.Mechanism(algo, username, password)
+// 		if err != nil {
+// 			return nil, true, err
+// 		}
+// 	}
+// 	return mechanism, mechanism != nil, nil
+// }

 type LogConsumerService struct {
 	consumer *kafka.Reader
@@ -87,26 +83,27 @@ type LogConsumerService struct {
 	esStore  *esStore.ESStore
 }

-func NewLogConsumerService(db *gorm.DB, esStore *esStore.ESStore) *LogConsumerService {
-	mechanism, _, err := getSASLMechanismByEnv()
-	if err != nil {
-		log.Fatal().Err(err).Msg("failed to get SASL mechanism")
-	}
-	brokers := strings.Split(os.Getenv(EnvKafkaBrokers), ",")
-	if len(brokers) == 0 {
-		log.Fatal().Msg("no KAFKA_BROKERS env variable set")
-	}
-	consumer := kafka.NewReader(kafka.ReaderConfig{
-		Brokers: brokers,
-		GroupID: KafkaGroupIvanWafDetectionsPalace,
-		Topic:   KafkaTopicIvanWafDetections,
-		Dialer: &kafka.Dialer{
-			SASLMechanism: mechanism,
-			Timeout:       10 * time.Second,
-			DualStack:     true,
-		},
-	})
+func NewLogConsumerService(db *gorm.DB, esStore *esStore.ESStore, config *config.KafkaConfig) *LogConsumerService {
+	// mechanism, _, err := getSASLMechanismByEnv()
+	// if err != nil {
+	// 	log.Fatal().Err(err).Msg("failed to get SASL mechanism")
+	// }
+	// brokers := strings.Split(os.Getenv(EnvKafkaBrokers), ",")
+	// if len(brokers) == 0 {
+	// 	log.Fatal().Msg("no KAFKA_BROKERS env variable set")
+	// }
+	// consumer := kafka.NewReader(kafka.ReaderConfig{
+	// 	Brokers: brokers,
+	// 	GroupID: KafkaGroupIvanWafDetectionsPalace,
+	// 	Topic:   KafkaTopicIvanWafDetections,
+	// 	Dialer: &kafka.Dialer{
+	// 		SASLMechanism: mechanism,
+	// 		Timeout:       10 * time.Second,
+	// 		DualStack:     true,
+	// 	},
+	// })
+	consumer := NewKafkaReader(config)
 	return &LogConsumerService{
 		consumer: consumer,
...
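
The diff does not show Consume itself. For readers new to this service, the typical shape of such a loop — read a message, decode it, index it — might look like the following; model.Detection and esStore.Index are hypothetical names used only for illustration, not the repository's actual types or methods:

	// Hypothetical sketch of a consume loop; Detection and Index are
	// illustrative names, not taken from the repository.
	func (s *LogConsumerService) consumeSketch(ctx context.Context) {
		for {
			msg, err := s.consumer.ReadMessage(ctx)
			if err != nil {
				log.Err(err).Msg("failed to read kafka message")
				return
			}
			var d model.Detection // hypothetical message schema
			if err := json.Unmarshal(msg.Value, &d); err != nil {
				log.Err(err).Msg("failed to decode detection")
				continue
			}
			if err := s.esStore.Index(ctx, &d); err != nil { // hypothetical store method
				log.Err(err).Msg("failed to index detection")
			}
		}
	}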
@@ -11,6 +11,7 @@ import (
 	json "github.com/json-iterator/go"
 	"github.com/rs/zerolog/log"
+	"gitlab.com/tensorsecurity-rd/waf-console/cmd/config"

 	es "github.com/olivere/elastic/v7"
 )
@@ -54,6 +55,29 @@ func CreateEsClientFromEnv() (*es.Client, error) {
 	return esClient, nil
 }

+// CreateEsClientFromConfig builds an Elasticsearch client from the JSON
+// configuration instead of environment variables.
+func CreateEsClientFromConfig(config *config.ElasticsearchConfig) (*es.Client, error) {
+	var opts []es.ClientOptionFunc
+	if config.Url == "" {
+		return nil, fmt.Errorf("elasticsearch URL is not set")
+	}
+	opts = append(opts, es.SetURL(config.Url))
+	if config.Username != "" && config.Password != "" {
+		opts = append(opts, es.SetBasicAuth(config.Username, config.Password))
+	}
+	opts = append(opts, es.SetTraceLog(&log.Logger))
+	opts = append(opts, es.SetDecoder(&es.NumberDecoder{}))
+	opts = append(opts, es.SetSniff(config.Sniff))
+	esClient, err := es.NewClient(opts...)
+	if err != nil {
+		return nil, err
+	}
+	return esClient, nil
+}
+
 type ESStore struct {
 	esClient *es.Client
 	config   Config
...
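
A quick way to exercise the new constructor, with placeholder values and olivere/elastic's standard Ping health check; the store package is aliased es here from the caller's side, as in the root command above:

	// Sketch: build a client from an ElasticsearchConfig and ping the cluster
	// (all values are placeholders, not from the repository).
	cfg := &config.ElasticsearchConfig{
		Url:      "http://localhost:9200",
		Username: "elastic",
		Password: "changeme",
		Sniff:    false, // sniffing usually stays off behind k8s services or proxies
	}
	client, err := es.CreateEsClientFromConfig(cfg)
	if err != nil {
		log.Fatal().Err(err).Msg("failed to create ES client")
	}
	info, code, err := client.Ping(cfg.Url).Do(context.Background())
	if err != nil {
		log.Fatal().Err(err).Msg("elasticsearch ping failed")
	}
	log.Info().Int("status", code).Str("version", info.Version.Number).Msg("connected")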