Skip to content

Commit

Permalink
Kafka Related Base Configuration
Browse files Browse the repository at this point in the history
- Added Base Config Setup
- Init It With The Main Application
- Using It In The Logic Wherever Required
  • Loading branch information
Raman5837 committed Aug 4, 2024
1 parent af16450 commit a681fc1
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 5 deletions.
7 changes: 4 additions & 3 deletions app/repository/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"github.com/Raman5837/kafka.go/base/database"
)

// Get Messages For Given Partition And Offset >= Given Offset Number
func GetMessages(partitionId uint64, offset uint64) (Messages *[]types.GetMessage, exception error) {
// Get Messages For Given Partition And Offset >= Given Offset Number With Given Limit
func GetMessages(partitionId uint64, offset uint64, limit int) (Messages *[]types.GetMessage, exception error) {

model := model.Message{}
DB := database.DBManager.SqliteDB
responseInstance := &[]types.GetMessage{}
queryResponse := DB.Table(model.TableName()).
Where("partition_id = ? AND offset >= ?", partitionId, offset).Order("offset ASC").Find(responseInstance)
Where("partition_id = ? AND offset >= ?", partitionId, offset).
Order("offset ASC").Limit(limit).Find(responseInstance)

return responseInstance, queryResponse.Error

Expand Down
7 changes: 5 additions & 2 deletions app/service/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/Raman5837/kafka.go/app/repository"
"github.com/Raman5837/kafka.go/app/types"
"github.com/Raman5837/kafka.go/app/utils"
"github.com/Raman5837/kafka.go/base/config"
)

// Consumer Service
Expand Down Expand Up @@ -53,15 +54,17 @@ func (service *ConsumerService) GetMessages(payload *types.GetMessageToConsumeRe
committedOffset, queryErr := service.GetCommittedOffset(payload.ConsumerId, payload.PartitionId)

nextOffset := func() uint64 {
// Start From Beginning If `committedOffset` Is nil Or Something Breaks While Fetching The Committed Offset.
// Let's Consume From Beginning If `committedOffset` Is nil Or Something Breaks While Fetching The Committed Offset.
if queryErr != nil || committedOffset == nil {
return 0
}
return committedOffset.Number
}()

baseConfig := config.GetKafkaConfig()

// Fetch New Message
messages, messageErr := repository.GetMessages(payload.PartitionId, nextOffset)
messages, messageErr := repository.GetMessages(payload.PartitionId, nextOffset, baseConfig.Consumer.FetchSize)
if messageErr != nil {
return nil, messageErr
}
Expand Down
112 changes: 112 additions & 0 deletions base/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package config

import (
"sync"
"time"
)

// Contains All Kafka Related Configurations
type KafkaConfiguration struct {
Producer ProducerConfig
Consumer ConsumerConfig
ConsumerGroup ConsumerGroupConfig

Polling PollingConfig
HeartBeat HeartBeatConfig
MessageRetention MessageRetentionConfig
}

// Configuration Related To Kafka Producers
type ProducerConfig struct {
BatchSize int // Number Of Messages To Batch
BufferMemory int64 // Buffer Memory Size In Bytes
Compression string // Compression Type (Eg. none, gzip)

LingerMS time.Duration // Time To Wait Before Sending A Batch Of Messages
FlushInterval time.Duration // Interval To Flush The Messages
}

// Configuration Related To Kafka Consumers
type ConsumerConfig struct {
FetchSize int // Number Of Messages To Fetch In One Request
BatchSize int // Number Of Messages To Process In A Batch

PollInterval time.Duration // Time Between Polling Requests
SessionTimeout time.Duration // Session Timeout For Consumers

AutoCommit bool // Whether To Auto Commit Offsets
FetchMinBytes int // Minimum Bytes Of Data To Fetch
FetchMaxWaitMS time.Duration // Maximum Wait Time To Fetch Data
HeartBeatInterval time.Duration // Time Between Heartbeat Requests
}

// Configuration Related To Kafka Consumer Groups
type ConsumerGroupConfig struct {
OffsetReset string // Offset Reset Policy (Eg. earliest, latest)
RebalanceInterval time.Duration // Time Interval To Triggers Rebalancing
}

// Configuration Related To Kafka Message Polling
type PollingConfig struct {
Interval time.Duration // Interval Between Polling Requests
}

// Configuration Related To Kafka HeartBeat
type HeartBeatConfig struct {
Interval time.Duration // Heartbeat Interval
}

// Configuration Related To Kafka Message Retention
type MessageRetentionConfig struct {
RetentionPeriod time.Duration // Retention Duration For Messages
}

var singleton sync.Once
var baseConfig KafkaConfiguration

// Initialize Base Kafka Configuration
func InitKafkaConfig() {

singleton.Do(
func() {
baseConfig = KafkaConfiguration{
Producer: ProducerConfig{
BatchSize: 100,
Compression: "none",
BufferMemory: 33554432,
FlushInterval: 5 * time.Second,
LingerMS: 500 * time.Millisecond,
},
Consumer: ConsumerConfig{
FetchMinBytes: 1,
FetchSize: 100,
BatchSize: 10,
AutoCommit: true,
SessionTimeout: 60 * time.Second,
HeartBeatInterval: 10 * time.Second,
PollInterval: 500 * time.Millisecond,
FetchMaxWaitMS: 500 * time.Millisecond,
},
ConsumerGroup: ConsumerGroupConfig{
OffsetReset: "earliest",
RebalanceInterval: 30 * time.Minute,
},
Polling: PollingConfig{
Interval: 100 * time.Millisecond,
},
HeartBeat: HeartBeatConfig{
Interval: 10 * time.Second,
},
MessageRetention: MessageRetentionConfig{
RetentionPeriod: 24 * time.Hour,
},
}
},
)

}

// Returns Base Kafka Configuration
func GetKafkaConfig() *KafkaConfiguration {
return &baseConfig
}
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"os"

"github.com/Raman5837/kafka.go/base/config"
"github.com/Raman5837/kafka.go/base/settings"
"github.com/Raman5837/kafka.go/base/utils"
"github.com/Raman5837/kafka.go/routes"
Expand All @@ -19,6 +20,9 @@ func main() {
// New Fiber App.
app := settings.NewFiberApp()

// Initialize Global Kafka Configurations
config.InitKafkaConfig()

// Registering All Routes
routes.RegisterAllRoutes(app)

Expand Down

0 comments on commit a681fc1

Please sign in to comment.