// Package service contains LogConsumerService, which reads messages from a
// Kafka reader and hands each one off for processing.
package service

import (
	"context"
	"errors"
	"io"

	"github.com/rs/zerolog/log"
	"github.com/segmentio/kafka-go"
	"gorm.io/gorm"
)

// LogConsumerService reads messages from a Kafka reader and dispatches them
// for processing. It also holds a database handle; none of the code visible
// in this file uses that handle.
type LogConsumerService struct {
	consumer *kafka.Reader
	db       *gorm.DB
}

// NewLogConsumerService returns a LogConsumerService that reads from consumer
// and keeps db as its database handle. Neither argument is validated.
func NewLogConsumerService(consumer *kafka.Reader, db *gorm.DB) *LogConsumerService {
	return &LogConsumerService{
		consumer: consumer,
		db:       db,
	}
}

// Consume reads messages from the Kafka reader in a loop, logs each one, and
// hands it to processMessage on a new goroutine. It blocks the caller and
// returns only once the reader reports io.EOF, which kafka-go uses to signal
// that the reader has been closed. Any other read error is logged and the
// loop carries on with the next read.
//
// Consume does not wait for the goroutines it starts, and it places no limit
// on how many are in flight.
func (s *LogConsumerService) Consume() {
	for {
		m, err := s.consumer.ReadMessage(context.Background())
		if err != nil {
			// A closed reader returns io.EOF on every subsequent call, so
			// retrying would spin forever; stop instead.
			if errors.Is(err, io.EOF) {
				log.Info().Msg("kafka reader closed, stopping consumer")
				return
			}
			log.Error().Err(err).Msg("failed to read message")
			continue
		}

		log.Info().Msgf("message: %s", string(m.Value))
		go s.processMessage(m)
	}
}

// processMessage handles a single Kafka message. At present it only logs the
// message value.
func (s *LogConsumerService) processMessage(m kafka.Message) {
	log.Info().Msgf("processing message: %s", string(m.Value))
}