log_consumer.go 756 Bytes
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
package service

import (
	"context"

	"github.com/rs/zerolog/log"
	"github.com/segmentio/kafka-go"
	"gorm.io/gorm"
)

type LogConsumerService struct {
	consumer *kafka.Reader
	db       *gorm.DB
}

func NewLogConsumerService(consumer *kafka.Reader, db *gorm.DB) *LogConsumerService {
	return &LogConsumerService{
		consumer: consumer,
		db:       db,
	}
}

func (s *LogConsumerService) Consume() {
	for {
		m, err := s.consumer.ReadMessage(context.Background())
		if err != nil {
			log.Error().Err(err).Msg("failed to read message")
			continue
		}
		log.Info().Msgf("message: %s", string(m.Value))
		go s.processMessage(m)
	}
}

func (s *LogConsumerService) processMessage(m kafka.Message) {
	log.Info().Msgf("processing message: %s", string(m.Value))
}