Skip to content

Commit

Permalink
Impression per toggle implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
sanzmauro committed Dec 11, 2024
1 parent 1178dcf commit 271390f
Show file tree
Hide file tree
Showing 9 changed files with 382 additions and 103 deletions.
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@ module github.com/splitio/go-client/v6
go 1.18

require (
github.com/splitio/go-split-commons/v6 v6.0.0
github.com/splitio/go-split-commons/v6 v6.0.3-0.20241211200100-36a7e1a3eeb9
github.com/splitio/go-toolkit/v5 v5.4.0
)

require (
github.com/bits-and-blooms/bitset v1.3.1 // indirect
github.com/bits-and-blooms/bloom/v3 v3.3.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/redis/go-redis/v9 v9.0.4 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/stretchr/testify v1.9.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/sync v0.3.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
14 changes: 12 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,24 @@ github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.0.4 h1:FC82T+CHJ/Q/PdyLW++GeCO+Ol59Y4T7R4jbgjvktgc=
github.com/redis/go-redis/v9 v9.0.4/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
github.com/splitio/go-split-commons/v6 v6.0.0 h1:qenr5qbXafjvM832C64CVpjtlShuQiWCwtR5I2h4ogM=
github.com/splitio/go-split-commons/v6 v6.0.0/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc=
github.com/splitio/go-split-commons/v6 v6.0.2 h1:uvrNjyGCOHUjxVTB1pDUA+UB20WypoxpeLkgkgX5v+U=
github.com/splitio/go-split-commons/v6 v6.0.2/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY=
github.com/splitio/go-split-commons/v6 v6.0.3-0.20241211200100-36a7e1a3eeb9 h1:H5ALS64HcbNiEQyT8fLKPck9qEMpSjwM9DYeG0bvvTg=
github.com/splitio/go-split-commons/v6 v6.0.3-0.20241211200100-36a7e1a3eeb9/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY=
github.com/splitio/go-toolkit/v5 v5.4.0 h1:g5WFpRhQomnXCmvfsNOWV4s5AuUrWIZ+amM68G8NBKM=
github.com/splitio/go-toolkit/v5 v5.4.0/go.mod h1:xYhUvV1gga9/1029Wbp5pjnR6Cy8nvBpjw99wAbsMko=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg=
github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
Expand Down
38 changes: 25 additions & 13 deletions splitio/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type SplitClient struct {
validator inputValidation
factory *SplitFactory
impressionListener *impressionlistener.WrapperImpressionListener
impressionManager provisional.ImpressionManager
impressionManager *provisional.ImpressionManagerImpl
initTelemetry storage.TelemetryConfigProducer
evaluationTelemetry storage.TelemetryEvaluationProducer
runtimeTelemetry storage.TelemetryRuntimeProducer
Expand All @@ -62,9 +62,10 @@ func (c *SplitClient) getEvaluationResult(matchingKey string, bucketingKey *stri
c.logger.Warning(fmt.Sprintf("%s: the SDK is not ready, results may be incorrect for feature flag %s. Make sure to wait for SDK readiness before using this method", operation, featureFlag))
c.initTelemetry.RecordNonReadyUsage()
return &evaluator.Result{
Treatment: evaluator.Control,
Label: impressionlabels.ClientNotReady,
Config: nil,
Treatment: evaluator.Control,
Label: impressionlabels.ClientNotReady,
Config: nil,
TrackImpression: true,
}
}

Expand All @@ -82,9 +83,10 @@ func (c *SplitClient) getEvaluationsResult(matchingKey string, bucketingKey *str
}
for _, featureFlag := range featureFlags {
result.Evaluations[featureFlag] = evaluator.Result{
Treatment: evaluator.Control,
Label: impressionlabels.ClientNotReady,
Config: nil,
Treatment: evaluator.Control,
Label: impressionlabels.ClientNotReady,
Config: nil,
TrackImpression: true,
}
}
return result
Expand Down Expand Up @@ -114,14 +116,16 @@ func (c *SplitClient) createImpression(featureFlag string, bucketingKey *string,
}

// storeData stores impression, runs listener and stores metrics
func (c *SplitClient) storeData(impressions []dtos.Impression, attributes map[string]interface{}, metricsLabel string, evaluationTime time.Duration) {
func (c *SplitClient) storeData(impressions []dtos.ImpressionDecorated, attributes map[string]interface{}, metricsLabel string, evaluationTime time.Duration) {
// Store impression
if c.impressions != nil {
forLog, forListener := c.impressionManager.ProcessImpressions(impressions)
listenerEnabled := c.impressionListener != nil

forLog, forListener := c.impressionManager.Process(impressions, listenerEnabled)
c.impressions.LogImpressions(forLog)

// Custom Impression Listener
if c.impressionListener != nil {
if listenerEnabled {
c.impressionListener.SendDataToClient(forListener, attributes)
}
} else {
Expand Down Expand Up @@ -177,7 +181,12 @@ func (c *SplitClient) doTreatmentCall(key interface{}, featureFlag string, attri
}

c.storeData(
[]dtos.Impression{c.createImpression(featureFlag, bucketingKey, evaluationResult.Label, matchingKey, evaluationResult.Treatment, evaluationResult.SplitChangeNumber)},
[]dtos.ImpressionDecorated{
{
Impression: c.createImpression(featureFlag, bucketingKey, evaluationResult.Label, matchingKey, evaluationResult.Treatment, evaluationResult.SplitChangeNumber),
Track: evaluationResult.TrackImpression,
},
},
attributes,
metricsLabel,
evaluationResult.EvaluationTime,
Expand Down Expand Up @@ -218,7 +227,7 @@ func (c *SplitClient) generateControlTreatments(featureFlagNames []string, opera
}

func (c *SplitClient) processResult(result evaluator.Results, operation string, bucketingKey *string, matchingKey string, attributes map[string]interface{}, metricsLabel string) (t map[string]TreatmentResult) {
var bulkImpressions []dtos.Impression
var bulkImpressions []dtos.ImpressionDecorated
treatments := make(map[string]TreatmentResult)
for feature, evaluation := range result.Evaluations {
if !c.validator.IsSplitFound(evaluation.Label, feature, operation) {
Expand All @@ -227,7 +236,10 @@ func (c *SplitClient) processResult(result evaluator.Results, operation string,
Config: nil,
}
} else {
bulkImpressions = append(bulkImpressions, c.createImpression(feature, bucketingKey, evaluation.Label, matchingKey, evaluation.Treatment, evaluation.SplitChangeNumber))
bulkImpressions = append(bulkImpressions, dtos.ImpressionDecorated{
Impression: c.createImpression(feature, bucketingKey, evaluation.Label, matchingKey, evaluation.Treatment, evaluation.SplitChangeNumber),
Track: evaluation.TrackImpression,
})

treatments[feature] = TreatmentResult{
Treatment: evaluation.Treatment,
Expand Down
2 changes: 1 addition & 1 deletion splitio/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2357,7 +2357,7 @@ func TestTelemetryMemory(t *testing.T) {
t.Error("It should queue one impression")
}
if dataInPost.ImpressionsDeduped != 1 {
t.Error("It should dedupe one impression")
t.Error("It should dedupe one impression. ", dataInPost.ImpressionsDeduped)
}
if dataInPost.EventsQueued != 1 {
t.Error("It should queue one event")
Expand Down
91 changes: 5 additions & 86 deletions splitio/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/splitio/go-client/v6/splitio"
"github.com/splitio/go-client/v6/splitio/conf"
impressionlistener "github.com/splitio/go-client/v6/splitio/impressionListener"
"github.com/splitio/go-client/v6/splitio/impressions"

config "github.com/splitio/go-split-commons/v6/conf"
"github.com/splitio/go-split-commons/v6/dtos"
Expand All @@ -26,16 +27,13 @@ import (
"github.com/splitio/go-split-commons/v6/service/api/specs"
"github.com/splitio/go-split-commons/v6/service/local"
"github.com/splitio/go-split-commons/v6/storage"
"github.com/splitio/go-split-commons/v6/storage/filter"
"github.com/splitio/go-split-commons/v6/storage/inmemory"
"github.com/splitio/go-split-commons/v6/storage/inmemory/mutexmap"
"github.com/splitio/go-split-commons/v6/storage/inmemory/mutexqueue"
"github.com/splitio/go-split-commons/v6/storage/mocks"
"github.com/splitio/go-split-commons/v6/storage/redis"
"github.com/splitio/go-split-commons/v6/synchronizer"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/event"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/impression"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/impressionscount"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/segment"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/split"
"github.com/splitio/go-split-commons/v6/tasks"
Expand All @@ -51,17 +49,6 @@ const (
sdkInitializationFailed = -1
)

const (
bfExpectedElemenets = 10000000
bfFalsePositiveProbability = 0.01
bfCleaningPeriod = 86400 // 24 hours
uniqueKeysPeriodTaskInMemory = 900 // 15 min
uniqueKeysPeriodTaskRedis = 300 // 5 min
impressionsCountPeriodTaskInMemory = 1800 // 30 min
impressionsCountPeriodTaskRedis = 300 // 5 min
impressionsBulkSizeRedis = 100
)

type sdkStorages struct {
splits storage.SplitStorageConsumer
segments storage.SegmentStorageConsumer
Expand Down Expand Up @@ -89,7 +76,7 @@ type SplitFactory struct {
logger logging.LoggerInterface
syncManager synchronizer.Manager
telemetrySync telemetry.TelemetrySynchronizer // To execute SynchronizeInit
impressionManager provisional.ImpressionManager
impressionManager *provisional.ImpressionManagerImpl
}

// Client returns the split client instantiated by the factory
Expand Down Expand Up @@ -319,7 +306,7 @@ func setupInMemoryFactory(
}
splitTasks := synchronizer.SplitTasks{
SplitSyncTask: tasks.NewFetchSplitsTask(workers.SplitUpdater, cfg.TaskPeriods.SplitSync, logger),
SegmentSyncTask: tasks.NewFetchSegmentsTask(workers.SegmentUpdater, cfg.TaskPeriods.SegmentSync, advanced.SegmentWorkers, advanced.SegmentQueueSize, logger),
SegmentSyncTask: tasks.NewFetchSegmentsTask(workers.SegmentUpdater, cfg.TaskPeriods.SegmentSync, advanced.SegmentWorkers, advanced.SegmentQueueSize, logger, dummyHC),
EventSyncTask: tasks.NewRecordEventsTask(workers.EventRecorder, advanced.EventsBulkSize, cfg.TaskPeriods.EventsSync, logger),
TelemetrySyncTask: tasks.NewRecordTelemetryTask(workers.TelemetryRecorder, cfg.TaskPeriods.TelemetrySync, logger),
}
Expand All @@ -339,7 +326,7 @@ func setupInMemoryFactory(
cfg.ImpressionsMode = config.ImpressionsModeOptimized
}

impressionManager, err := buildImpressionManager(cfg, advanced, logger, true, &splitTasks, &workers, storages, metadata, splitAPI)
impressionManager, err := impressions.BuildInMemoryManager(cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, storages.runtimeTelemetry, storages.impressionsConsumer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -433,7 +420,7 @@ func setupRedisFactory(apikey string, cfg *conf.SplitSdkConfig, logger logging.L
cfg.ImpressionsMode = config.ImpressionsModeDebug
}

impressionManager, err := buildImpressionManager(cfg, advanced, logger, false, &splitTasks, &workers, storages, metadata, nil)
impressionManager, err := impressions.BuildRedisManager(cfg, logger, &splitTasks, storages.initTelemetry, storages.impressionsCount, storages.runtimeTelemetry)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -593,74 +580,6 @@ func newFactory(apikey string, cfg conf.SplitSdkConfig, logger logging.LoggerInt
return splitFactory, nil
}

func buildImpressionManager(
cfg *conf.SplitSdkConfig,
advanced config.AdvancedConfig,
logger logging.LoggerInterface,
inMemory bool,
splitTasks *synchronizer.SplitTasks,
workers *synchronizer.Workers,
storages sdkStorages,
metadata dtos.Metadata,
splitAPI *api.SplitAPI,
) (provisional.ImpressionManager, error) {
listenerEnabled := cfg.Advanced.ImpressionListener != nil
switch cfg.ImpressionsMode {
case config.ImpressionsModeNone:
impressionsCounter := strategy.NewImpressionsCounter()
filter := filter.NewBloomFilter(bfExpectedElemenets, bfFalsePositiveProbability)
uniqueKeysTracker := strategy.NewUniqueKeysTracker(filter)

if inMemory {
workers.ImpressionsCountRecorder = impressionscount.NewRecorderSingle(impressionsCounter, splitAPI.ImpressionRecorder, metadata, logger, storages.runtimeTelemetry)
splitTasks.ImpressionsCountSyncTask = tasks.NewRecordImpressionsCountTask(workers.ImpressionsCountRecorder, logger, impressionsCountPeriodTaskInMemory)
splitTasks.UniqueKeysTask = tasks.NewRecordUniqueKeysTask(workers.TelemetryRecorder, uniqueKeysTracker, uniqueKeysPeriodTaskInMemory, logger)
} else {
telemetryRecorder := telemetry.NewSynchronizerRedis(storages.initTelemetry, logger)
impressionsCountRecorder := impressionscount.NewRecorderRedis(impressionsCounter, storages.impressionsCount, logger)
splitTasks.ImpressionsCountSyncTask = tasks.NewRecordImpressionsCountTask(impressionsCountRecorder, logger, impressionsCountPeriodTaskRedis)
splitTasks.UniqueKeysTask = tasks.NewRecordUniqueKeysTask(telemetryRecorder, uniqueKeysTracker, uniqueKeysPeriodTaskRedis, logger)
}

splitTasks.CleanFilterTask = tasks.NewCleanFilterTask(filter, logger, bfCleaningPeriod)
impressionsStrategy := strategy.NewNoneImpl(impressionsCounter, uniqueKeysTracker, listenerEnabled)

return provisional.NewImpressionManager(impressionsStrategy), nil
case config.ImpressionsModeDebug:
if inMemory {
workers.ImpressionRecorder = impression.NewRecorderSingle(storages.impressionsConsumer, splitAPI.ImpressionRecorder, logger, metadata, cfg.ImpressionsMode, storages.runtimeTelemetry)
splitTasks.ImpressionSyncTask = tasks.NewRecordImpressionsTask(workers.ImpressionRecorder, cfg.TaskPeriods.ImpressionSync, logger, advanced.ImpressionsBulkSize)
}
impressionObserver, err := strategy.NewImpressionObserver(500)
if err != nil {
return nil, err
}
impressionsStrategy := strategy.NewDebugImpl(impressionObserver, listenerEnabled)

return provisional.NewImpressionManager(impressionsStrategy), nil
default:
impressionsCounter := strategy.NewImpressionsCounter()

if inMemory {
workers.ImpressionsCountRecorder = impressionscount.NewRecorderSingle(impressionsCounter, splitAPI.ImpressionRecorder, metadata, logger, storages.runtimeTelemetry)
workers.ImpressionRecorder = impression.NewRecorderSingle(storages.impressionsConsumer, splitAPI.ImpressionRecorder, logger, metadata, cfg.ImpressionsMode, storages.runtimeTelemetry)
splitTasks.ImpressionsCountSyncTask = tasks.NewRecordImpressionsCountTask(workers.ImpressionsCountRecorder, logger, impressionsCountPeriodTaskInMemory)
splitTasks.ImpressionSyncTask = tasks.NewRecordImpressionsTask(workers.ImpressionRecorder, cfg.TaskPeriods.ImpressionSync, logger, advanced.ImpressionsBulkSize)
} else {
workers.ImpressionsCountRecorder = impressionscount.NewRecorderRedis(impressionsCounter, storages.impressionsCount, logger)
splitTasks.ImpressionsCountSyncTask = tasks.NewRecordImpressionsCountTask(workers.ImpressionsCountRecorder, logger, impressionsCountPeriodTaskRedis)
}

impressionObserver, err := strategy.NewImpressionObserver(500)
if err != nil {
return nil, err
}
impressionsStrategy := strategy.NewOptimizedImpl(impressionObserver, impressionsCounter, storages.runtimeTelemetry, listenerEnabled)

return provisional.NewImpressionManager(impressionsStrategy), nil
}
}

func printWarnings(logger logging.LoggerInterface, errs []error) {
if len(errs) != 0 {
for _, err := range errs {
Expand Down
2 changes: 2 additions & 0 deletions splitio/client/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type SplitView struct {
Configs map[string]string `json:"configs"`
DefaultTreatment string `json:"defaultTreatment"`
Sets []string `json:"sets"`
TrackImpressions bool `json:"trackImpressions"`
}

func newSplitView(splitDto *dtos.SplitDTO) *SplitView {
Expand All @@ -49,6 +50,7 @@ func newSplitView(splitDto *dtos.SplitDTO) *SplitView {
Configs: splitDto.Configurations,
DefaultTreatment: splitDto.DefaultTreatment,
Sets: sets,
TrackImpressions: splitDto.TrackImpressions == nil || *splitDto.TrackImpressions,
}
}

Expand Down
Loading

0 comments on commit 271390f

Please sign in to comment.