Skip to content

Commit

Permalink
Implemented Kafka Configurations
Browse files Browse the repository at this point in the history
- Added Support For Basic Kafka Configuration
- Updated Consumer, Group, Producer Service To Use This Configurations
  • Loading branch information
Raman5837 committed Aug 4, 2024
1 parent a681fc1 commit 54b9df3
Show file tree
Hide file tree
Showing 20 changed files with 504 additions and 91 deletions.
7 changes: 5 additions & 2 deletions app/handler/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handler
import (
"github.com/Raman5837/kafka.go/app/service"
"github.com/Raman5837/kafka.go/app/types"
"github.com/Raman5837/kafka.go/base/config"
"github.com/Raman5837/kafka.go/base/utils"
"github.com/gofiber/fiber/v2"
)
Expand All @@ -17,7 +18,8 @@ func AddNewConsumerHandler(context *fiber.Ctx) (exception error) {
return context.Status(fiber.StatusBadRequest).JSON(utils.HttpResponseFail(nil, "Invalid Payload!", exception))
}

service := service.NewConsumerService()
config := config.GetKafkaConfig()
service := service.NewConsumerService(config.Consumer)
response, queryErr := service.AddNewConsumer(&payload)

if queryErr != nil {
Expand All @@ -39,7 +41,8 @@ func ConsumeMessageHandler(context *fiber.Ctx) (exception error) {
return context.Status(fiber.StatusBadRequest).JSON(utils.HttpResponseFail(nil, "Invalid Payload!", exception))
}

service := service.NewConsumerService()
config := config.GetKafkaConfig()
service := service.NewConsumerService(config.Consumer)
response, queryErr := service.GetMessages(&payload)

if queryErr != nil {
Expand Down
4 changes: 3 additions & 1 deletion app/handler/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handler
import (
"github.com/Raman5837/kafka.go/app/service"
"github.com/Raman5837/kafka.go/app/types"
"github.com/Raman5837/kafka.go/base/config"
"github.com/Raman5837/kafka.go/base/utils"
"github.com/gofiber/fiber/v2"
)
Expand All @@ -17,7 +18,8 @@ func AddNewConsumerGroupHandler(context *fiber.Ctx) (exception error) {
return context.Status(fiber.StatusBadRequest).JSON(utils.HttpResponseFail(nil, "Invalid Payload!", exception))
}

service := service.NewConsumerGroupService()
config := config.GetKafkaConfig()
service := service.NewConsumerGroupService(config.ConsumerGroup)
response, queryErr := service.AddNewConsumerGroup(&payload)

if queryErr != nil {
Expand Down
5 changes: 4 additions & 1 deletion app/handler/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/Raman5837/kafka.go/app/service"
"github.com/Raman5837/kafka.go/app/types"
"github.com/Raman5837/kafka.go/app/utils"
"github.com/Raman5837/kafka.go/base/config"
base "github.com/Raman5837/kafka.go/base/utils"
"github.com/gofiber/fiber/v2"
)
Expand All @@ -18,8 +19,10 @@ func ProduceMessageHandler(context *fiber.Ctx) (exception error) {
return context.Status(fiber.StatusBadRequest).JSON(base.HttpResponseFail(nil, "Invalid Payload!", exception))
}

config := config.GetKafkaConfig()
assigner := utils.NewPartitionAssigner()
service := service.NewProducerService(assigner)
service := service.NewProducerService(assigner, config.Producer)

response, additionErr := service.AddNewMessage(&payload)

if additionErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion app/interface/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ import "github.com/Raman5837/kafka.go/app/types"

// PartitionAssigner Interface
type PartitionAssignerInterface interface {
Next(topicID uint64) (*types.GetPartition, error)
Next(topicID uint) (*types.GetPartition, error)
}
22 changes: 11 additions & 11 deletions app/model/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ type Consumer struct {
model.AbstractModel
ID uint `gorm:"primaryKey"`

ConsumerID uuid.UUID `gorm:"type:uuid;unique;not null"`
ConsumerId uuid.UUID `gorm:"type:uuid;unique;not null"`

GroupID uint64 `gorm:"not null"`
GroupID uint `gorm:"not null"`
Group ConsumerGroup `gorm:"foreignKey:GroupID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE"`
}

// Add Default Value For Column `ConsumerID`
// Add Default Value For Column `ConsumerId`
func (instance *Consumer) BeforeCreate(transaction *gorm.DB) error {
instance.ConsumerID = uuid.New()
instance.ConsumerId = uuid.New()
return nil
}

Expand All @@ -47,7 +47,7 @@ func (instance Consumer) TableName() string {
}

func (instance Consumer) String() string {
return fmt.Sprintf("Consumer: %s Of Group: %s", instance.ConsumerID, instance.Group.Name)
return fmt.Sprintf("Consumer: %s Of Group: %s", instance.ConsumerId, instance.Group.Name)
}

// Stores Current Offset Of The Consumer In The Associated Partition
Expand All @@ -56,10 +56,10 @@ type Offset struct {
ID uint `gorm:"primaryKey"`

Number uint64 `gorm:"not null"`
ConsumerID uint64 `gorm:"not null"`
ConsumerId uint `gorm:"not null"`
Consumer Consumer `gorm:"foreignKey:ConsumerID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE"`

PartitionID uint64 `gorm:"not null"`
PartitionId uint `gorm:"not null"`
Partition Partition `gorm:"foreignKey:PartitionID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE"`
}

Expand All @@ -68,18 +68,18 @@ func (instance Offset) TableName() string {
}

func (instance Offset) String() string {
return fmt.Sprintf("Offset Having Number: %d For Consumer: %d Of Partition: %d", instance.Number, instance.ConsumerID, instance.PartitionID)
return fmt.Sprintf("Offset Having Number: %d For Consumer: %d Of Partition: %d", instance.Number, instance.ConsumerId, instance.PartitionId)
}

// To Manage Consumer Rebalancing, Partitions Assignments And All
type ConsumerAssignment struct {
model.AbstractModel
ID uint `gorm:"primaryKey"`

ConsumerID uint64 `gorm:"not null"`
ConsumerId uint `gorm:"not null"`
Consumer Consumer `gorm:"foreignKey:ConsumerID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE"`

PartitionID uint64 `gorm:"not null"`
PartitionId uint `gorm:"not null"`
Partition Partition `gorm:"foreignKey:PartitionID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE"`
}

Expand All @@ -88,5 +88,5 @@ func (instance ConsumerAssignment) TableName() string {
}

func (instance ConsumerAssignment) String() string {
return fmt.Sprintf("Management Of Consumer: %d Of Group: %d And Partition: %d", instance.ConsumerID, instance.Consumer.GroupID, instance.PartitionID)
return fmt.Sprintf("Management Of Consumer: %d Of Group: %d And Partition: %d", instance.ConsumerId, instance.Consumer.GroupID, instance.PartitionId)
}
20 changes: 10 additions & 10 deletions app/model/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ func (instance Topic) String() string {
// Represents A Kafka Partition
type Partition struct {
model.AbstractModel
ID uint `gorm:"primaryKey"`
PartitionId uint64 `gorm:"not null"`
ID uint `gorm:"primaryKey"`
PartitionId uint `gorm:"not null"`

TopicID uint64 `gorm:"not null"`
Topic Topic `gorm:"foreignKey:TopicID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE"`
TopicId uint `gorm:"not null"`
Topic Topic `gorm:"foreignKey:TopicID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE"`
}

func (instance Partition) TableName() string {
Expand All @@ -47,7 +47,7 @@ type Message struct {
Offset int64 `gorm:"not null"`
Value interface{} `gorm:"type:json"`

PartitionID uint64 `gorm:"not null"`
PartitionId uint `gorm:"not null"`
Partition Partition `gorm:"foreignKey:PartitionID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE"`
}

Expand All @@ -56,18 +56,18 @@ func (instance Message) TableName() string {
}

func (instance Message) String() string {
return fmt.Sprintf("Message: %d --> Of Partition %d", instance.ID, instance.PartitionID)
return fmt.Sprintf("Message: %d --> Of Partition %d", instance.ID, instance.PartitionId)
}

// Stores The Last Assigned Partition To A Topic
type LastAssignedPartition struct {
model.AbstractModel
ID uint `gorm:"primaryKey"`

TopicID uint64 `gorm:"unique;not null"`
Topic Topic `gorm:"constraint:OnUpdate:CASCADE,OnDelete:CASCADE"`
TopicID uint `gorm:"unique;not null"`
Topic Topic `gorm:"constraint:OnUpdate:CASCADE,OnDelete:CASCADE"`

PartitionID uint64 `gorm:"not null"`
PartitionId uint `gorm:"not null"`
Partition Partition `gorm:"foreignKey:PartitionID;constraint:OnUpdate:CASCADE,OnDelete:SET NULL"`
}

Expand All @@ -76,5 +76,5 @@ func (instance LastAssignedPartition) TableName() string {
}

func (instance LastAssignedPartition) String() string {
return fmt.Sprintf("Topic: %s's Last Partition Was --> %d", instance.Topic.Name, instance.PartitionID)
return fmt.Sprintf("Topic: %s's Last Partition Was --> %d", instance.Topic.Name, instance.PartitionId)
}
55 changes: 50 additions & 5 deletions app/repository/assignment.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package repository

import (
"fmt"

"github.com/Raman5837/kafka.go/app/model"
"github.com/Raman5837/kafka.go/app/types"
"github.com/Raman5837/kafka.go/base/database"
Expand All @@ -19,7 +21,7 @@ func GetConsumerAssignment(groupId int, partitionId int) (Assignment *types.GetC
}

// Get All Assigned Consumers For Given Group Id
func GetAssignedConsumersOfAGroup(groupId uint64) (Assignment *[]types.GetConsumerAssignment, exception error) {
func GetAssignedConsumersOfAGroup(groupId uint) (Assignment *[]types.GetConsumerAssignment, exception error) {

DB := database.DBManager.SqliteDB
model := model.ConsumerAssignment{}
Expand All @@ -30,6 +32,49 @@ func GetAssignedConsumersOfAGroup(groupId uint64) (Assignment *[]types.GetConsum

}

// Get All Assigned Consumers For Given Slice Of Group Ids Or All For All Active Groups
func GetAllAssignedConsumers(groupIds *[]uint) (Assignment *[]types.GetConsumerAssignment, exception error) {

DB := database.DBManager.SqliteDB
consumerAssignment := model.ConsumerAssignment{}
responseInstance := &[]types.GetConsumerAssignment{}

query := fmt.Sprintf(`
SELECT
assignment.id AS id,
topic.id AS topic_id,
partition.id AS partition_id,
consumer.group_id AS group_id,
consumer.consumer_id AS consumer_id,
assignment.created_at AS created_at
FROM
%s AS assignment
JOIN %s AS consumer ON assignment.consumer_id = consumer.consumer_id
JOIN %s AS partition ON consumer.consumer_id = partition.id
JOIN %s AS topic ON partition.topic_id = topic.id
JOIN %s AS group ON consumer.group_id = group.id
WHERE
group.is_active = true
`,
consumerAssignment.TableName(),
consumerAssignment.Consumer.TableName(),
consumerAssignment.Partition.TableName(),
consumerAssignment.Partition.Topic.TableName(),
consumerAssignment.Consumer.Group.TableName(),
)

if groupIds != nil {
query += " AND consumer.group_id IN (?)"
}

queryResponse := DB.Raw(query, groupIds).Scan(responseInstance)

return responseInstance, queryResponse.Error

}

// Create A ConsumerAssignment For Given Consumer And Partition
func AssignPartitionToConsumer(instance model.ConsumerAssignment) (Assignment model.ConsumerAssignment, exception error) {

Expand All @@ -45,18 +90,18 @@ func DeleteConsumerAssignment(consumerId uint64) (exception error) {

model := model.ConsumerAssignment{}
DB := database.DBManager.SqliteDB
queryResponse := DB.Table(model.TableName()).Where("consumer_id = ?", consumerId).Delete(model)
queryResponse := DB.Table(model.TableName()).Where("consumer_id = ?", consumerId).Update("is_deleted", true)

return queryResponse.Error

}

// Delete ConsumerAssignment For Given Consumer Ids
func DeleteAllConsumerAssignment(consumerIds []uint64) (exception error) {
func DeleteAllConsumerAssignment(consumerIds []uint) (exception error) {

model := model.ConsumerAssignment{}
DB := database.DBManager.SqliteDB
queryResponse := DB.Table(model.TableName()).Where("consumer_id IN ?", consumerIds).Delete(model)
queryResponse := DB.Table(model.TableName()).Where("consumer_id IN ?", consumerIds).Update("is_deleted", true)

return queryResponse.Error

Expand All @@ -77,7 +122,7 @@ func GetLastAssignedPartition(topicId int, partitionId int) (LastAssignedPartiti
}

// Get LastAssignedPartition With Given Topic
func GetLastAssignedPartitionForTopic(topicId uint64) (LastAssignedPartition *types.GetLastAssignedPartition, exception error) {
func GetLastAssignedPartitionForTopic(topicId uint) (LastAssignedPartition *types.GetLastAssignedPartition, exception error) {

DB := database.DBManager.SqliteDB
model := model.LastAssignedPartition{TopicID: topicId}
Expand Down
28 changes: 26 additions & 2 deletions app/repository/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

// Get Consumer With Given PK And Group Id
func GetConsumer(ConsumerId uint64, groupId uint64) (Consumer *types.GetConsumer, exception error) {
func GetConsumer(ConsumerId uint, groupId uint) (Consumer *types.GetConsumer, exception error) {

model := model.Consumer{}
DB := database.DBManager.SqliteDB
Expand All @@ -18,8 +18,20 @@ func GetConsumer(ConsumerId uint64, groupId uint64) (Consumer *types.GetConsumer

}

// Get Consumer With Given Group Ids
func GetAllConsumersOfGroups(groupIds []uint) (Consumer *[]types.GetConsumer, exception error) {

model := model.Consumer{}
DB := database.DBManager.SqliteDB
responseInstance := &[]types.GetConsumer{}
queryResponse := DB.Table(model.TableName()).Where("group_id IN ?", groupIds).Find(responseInstance)

return responseInstance, queryResponse.Error

}

// Get Consumer With Given PK And Group Id
func GetConsumersOfAGroup(groupId uint64) (Consumer *[]types.GetConsumer, exception error) {
func GetConsumersOfAGroup(groupId uint) (Consumer *[]types.GetConsumer, exception error) {

model := model.Consumer{}
DB := database.DBManager.SqliteDB
Expand Down Expand Up @@ -52,6 +64,18 @@ func GetConsumerGroup(name string, isActive bool) (ConsumerGroup *types.GetConsu

}

// Get All Active Consumer Groups
func GetAllActiveConsumerGroup(isActive bool) (ConsumerGroup *[]types.GetConsumerGroup, exception error) {

model := model.ConsumerGroup{}
DB := database.DBManager.SqliteDB
responseInstance := &[]types.GetConsumerGroup{}
queryResponse := DB.Table(model.TableName()).Where("is_active = ?", isActive).Find(responseInstance)

return responseInstance, queryResponse.Error

}

// Add New ConsumerGroup
func AddNewConsumerGroup(instance *model.ConsumerGroup) (ConsumerGroup *model.ConsumerGroup, exception error) {

Expand Down
2 changes: 1 addition & 1 deletion app/repository/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

// Get Messages For Given Partition And Offset >= Given Offset Number With Given Limit
func GetMessages(partitionId uint64, offset uint64, limit int) (Messages *[]types.GetMessage, exception error) {
func GetMessages(partitionId uint, offset uint64, limit int) (Messages *[]types.GetMessage, exception error) {

model := model.Message{}
DB := database.DBManager.SqliteDB
Expand Down
4 changes: 2 additions & 2 deletions app/repository/offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

// Get Offset For Given Consumer And Partition
func GetConsumerOffset(consumerId uint64, partitionId uint64) (Offset *types.GetOffset, exception error) {
func GetConsumerOffset(consumerId uint, partitionId uint) (Offset *types.GetOffset, exception error) {

model := model.Offset{}
DB := database.DBManager.SqliteDB
Expand All @@ -20,7 +20,7 @@ func GetConsumerOffset(consumerId uint64, partitionId uint64) (Offset *types.Get
}

// Update Offset For Given Consumer And Partition
func UpdateOffset(consumerId uint64, partitionId uint64, newOffset uint64) (exception error) {
func UpdateOffset(consumerId uint, partitionId uint, newOffset uint64) (exception error) {

model := model.Offset{}
DB := database.DBManager.SqliteDB
Expand Down
4 changes: 2 additions & 2 deletions app/repository/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

// Get Partition With Given Topic Id And Partition Id
func GetPartition(topicId uint64, partitionId uint64) (Partition *types.GetPartition, exception error) {
func GetPartition(topicId uint, partitionId uint) (Partition *types.GetPartition, exception error) {

model := model.Partition{}
DB := database.DBManager.SqliteDB
Expand All @@ -19,7 +19,7 @@ func GetPartition(topicId uint64, partitionId uint64) (Partition *types.GetParti
}

// Get Partition With Given Topic Id
func GetPartitionByTopicId(topicId uint64) (Partition *[]types.GetPartition, exception error) {
func GetPartitionByTopicId(topicId uint) (Partition *[]types.GetPartition, exception error) {

model := model.Partition{}
DB := database.DBManager.SqliteDB
Expand Down
Loading

0 comments on commit 54b9df3

Please sign in to comment.