From eebcf1d4d4fcf1670fbb0bad353f60b3182bd75e Mon Sep 17 00:00:00 2001 From: Ben Hartshorne Date: Fri, 3 Mar 2017 11:27:50 -0800 Subject: [PATCH] initial release --- LICENSE | 201 +++++++++++++++++++++++++++++++++++++++ NOTICE | 13 +++ README.md | 5 + avgsamplerate.go | 138 +++++++++++++++++++++++++++ avgsamplerate_test.go | 166 ++++++++++++++++++++++++++++++++ avgsamplewithmin.go | 156 ++++++++++++++++++++++++++++++ avgsamplewithmin_test.go | 184 +++++++++++++++++++++++++++++++++++ doc.go | 25 +++++ dynsampler.go | 11 +++ onlyonce.go | 67 +++++++++++++ onlyonce_test.go | 64 +++++++++++++ perkeythroughput.go | 91 ++++++++++++++++++ perkeythroughput_test.go | 183 +++++++++++++++++++++++++++++++++++ static.go | 28 ++++++ static_test.go | 21 ++++ totalthroughput.go | 100 +++++++++++++++++++ totalthroughput_test.go | 167 ++++++++++++++++++++++++++++++++ 17 files changed, 1620 insertions(+) create mode 100644 LICENSE create mode 100644 NOTICE create mode 100644 avgsamplerate.go create mode 100644 avgsamplerate_test.go create mode 100644 avgsamplewithmin.go create mode 100644 avgsamplewithmin_test.go create mode 100644 doc.go create mode 100644 dynsampler.go create mode 100644 onlyonce.go create mode 100644 onlyonce_test.go create mode 100644 perkeythroughput.go create mode 100644 perkeythroughput_test.go create mode 100644 static.go create mode 100644 static_test.go create mode 100644 totalthroughput.go create mode 100644 totalthroughput_test.go diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..8dada3e --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. 
For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/NOTICE b/NOTICE new file mode 100644 index 0000000..9b25a98 --- /dev/null +++ b/NOTICE @@ -0,0 +1,13 @@ +Copyright (c) 2016-Present Honeycomb, Hound Technology, Inc. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
diff --git a/README.md b/README.md
index 5e2d0d4..728576c 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,7 @@
 # dynsampler-go
+Dynsampler is a golang library for doing dynamic sampling of traffic before sending it on to Honeycomb (or another analytics system).
+
+For full documentation, look at the [dynsampler godoc](https://godoc.org/github.com/honeycombio/dynsampler-go).
+
+
diff --git a/avgsamplerate.go b/avgsamplerate.go
new file mode 100644
index 0000000..1a03788
--- /dev/null
+++ b/avgsamplerate.go
@@ -0,0 +1,138 @@
+package dynsampler
+
+import (
+	"math"
+	"sort"
+	"sync"
+	"time"
+)
+
+// AvgSampleRate implements Sampler and attempts to average a given sample rate,
+// weighting rare traffic and frequent traffic differently so as to end up with
+// the correct average. This method breaks down when total traffic is low
+// because it will be excessively sampled.
+//
+// Keys that occur only once within ClearFrequencySec will always have a sample
+// rate of 1. Keys that occur more frequently will be sampled on a logarithmic
+// curve. In other words, every key will be represented at least once per
+// ClearFrequencySec and more frequent keys will have their sample rate
+// increased proportionally to wind up with the goal sample rate.
+type AvgSampleRate struct {
+	// ClearFrequencySec is how often the counters reset in seconds; default 30
+	ClearFrequencySec int
+
+	// GoalSampleRate is the average sample rate we're aiming for, across all
+	// events. Default 10
+	GoalSampleRate int
+
+	savedSampleRates map[string]int
+	currentCounts    map[string]int
+
+	lock sync.Mutex
+}
+
+func (a *AvgSampleRate) Start() error {
+	// apply defaults
+	if a.ClearFrequencySec == 0 {
+		a.ClearFrequencySec = 30
+	}
+	if a.GoalSampleRate == 0 {
+		a.GoalSampleRate = 10
+	}
+
+	// initialize internal variables
+	a.savedSampleRates = make(map[string]int)
+	a.currentCounts = make(map[string]int)
+
+	// spin up calculator
+	go func() {
+		ticker := time.NewTicker(time.Second * time.Duration(a.ClearFrequencySec))
+		for range ticker.C {
+			a.updateMaps()
+		}
+	}()
+	return nil
+}
+
+// updateMaps calculates a new saved rate map based on the contents of the
+// counter map
+func (a *AvgSampleRate) updateMaps() {
+	// make a local copy of the sample counters for calculation
+	a.lock.Lock()
+	tmpCounts := a.currentCounts
+	a.currentCounts = make(map[string]int)
+	a.lock.Unlock()
+	// short circuit if no traffic
+	numKeys := len(tmpCounts)
+	if numKeys == 0 {
+		// no traffic the last 30s. clear the result map
+		a.savedSampleRates = make(map[string]int)
+		return
+	}
+
+	// Goal events to send this interval is the total count of received events
+	// divided by the desired average sample rate
+	var sumEvents int
+	for _, count := range tmpCounts {
+		sumEvents += count
+	}
+	goalCount := float64(sumEvents) / float64(a.GoalSampleRate)
+	// goalRatio is the goalCount divided by the sum of all the log values - it
+	// determines what percentage of the total event space belongs to each key
+	var logSum float64
+	for _, count := range tmpCounts {
+		logSum += math.Log10(float64(count))
+	}
+	goalRatio := goalCount / logSum
+
+	// must go through the keys in a fixed order to prevent rounding from changing
+	// results
+	keys := make([]string, len(tmpCounts))
+	var i int
+	for k := range tmpCounts {
+		keys[i] = k
+		i++
+	}
+	sort.Strings(keys)
+
+	// goal number of events per key is goalRatio * key count, but never less than
+	// one.
If a key falls below its goal, it gets a sample rate of 1 and the + // extra available events get passed on down the line. + newSavedSampleRates := make(map[string]int) + keysRemaining := len(tmpCounts) + var extra float64 + for _, key := range keys { + count := float64(tmpCounts[key]) + // take the max of 1 or my log10 share of the total + goalForKey := math.Max(1, math.Log10(count)*goalRatio) + // take this key's share of the extra and pass the rest along + extraForKey := extra / float64(keysRemaining) + goalForKey += extraForKey + extra -= extraForKey + keysRemaining-- + if count <= goalForKey { + // there are fewer samples than the allotted number for this key. set + // sample rate to 1 and redistribute the unused slots for future keys + newSavedSampleRates[key] = 1 + extra += goalForKey - count + } else { + // there are more samples than the allotted number. Sample this key enough + // to knock it under the limit (aka round up) + newSavedSampleRates[key] = int(math.Ceil(count / goalForKey)) + extra += goalForKey - (count / float64(newSavedSampleRates[key])) + } + } + a.savedSampleRates = newSavedSampleRates +} + +// GetSampleRate takes a key and returns the appropriate sample rate for that +// key +func (a *AvgSampleRate) GetSampleRate(key string) int { + a.lock.Lock() + defer a.lock.Unlock() + a.currentCounts[key]++ + if rate, found := a.savedSampleRates[key]; found { + return rate + } + return 1 +} diff --git a/avgsamplerate_test.go b/avgsamplerate_test.go new file mode 100644 index 0000000..97da16c --- /dev/null +++ b/avgsamplerate_test.go @@ -0,0 +1,166 @@ +package dynsampler + +import ( + "fmt" + "testing" + + "github.com/honeycombio/hound/test" +) + +func TestAvgSampleUpdateMaps(t *testing.T) { + a := &AvgSampleRate{ + GoalSampleRate: 20, + } + tsts := []struct { + inputSampleCount map[string]int + expectedSavedSampleRates map[string]int + }{ + { + map[string]int{ + "one": 1, + "two": 1, + "three": 2, + "four": 5, + "five": 8, + "six": 15, + "seven": 45, + "eight": 612, + "nine": 2000, + "ten": 10000, + }, + map[string]int{ + "one": 1, + "two": 1, + "three": 1, + "four": 1, + "five": 1, + "six": 1, + "seven": 1, + "eight": 6, + "nine": 14, + "ten": 47, + }, + }, + { + map[string]int{ + "one": 1, + "two": 1, + "three": 2, + "four": 5, + "five": 8, + "six": 15, + "seven": 45, + "eight": 50, + "nine": 60, + }, + map[string]int{ + "one": 1, + "two": 1, + "three": 2, + "four": 5, + "five": 8, + "six": 11, + "seven": 24, + "eight": 26, + "nine": 30, + }, + }, + { + map[string]int{ + "one": 1, + "two": 1, + "three": 2, + "four": 5, + "five": 7, + }, + map[string]int{ + "one": 1, + "two": 1, + "three": 2, + "four": 5, + "five": 7, + }, + }, + { + map[string]int{ + "one": 1000, + "two": 1000, + "three": 2000, + "four": 5000, + "five": 7000, + }, + map[string]int{ + "one": 7, + "two": 7, + "three": 13, + "four": 29, + "five": 39, + }, + }, + { + map[string]int{ + "one": 6000, + "two": 6000, + "three": 6000, + "four": 6000, + "five": 6000, + }, + map[string]int{ + "one": 20, + "two": 20, + "three": 20, + "four": 20, + "five": 20, + }, + }, + { + map[string]int{ + "one": 12000, + }, + map[string]int{ + "one": 20, + }, + }, + { + map[string]int{}, + map[string]int{}, + }, + } + for i, tst := range tsts { + a.currentCounts = tst.inputSampleCount + a.updateMaps() + test.Equals(t, len(a.currentCounts), 0) + test.Equals(t, a.savedSampleRates, tst.expectedSavedSampleRates, fmt.Sprintf("test %d failed", i)) + } +} + +func TestAvgSampleRateGetSampleRate(t *testing.T) { + a := &AvgSampleRate{} + 
a.currentCounts = map[string]int{
+		"one": 5,
+		"two": 8,
+	}
+	a.savedSampleRates = map[string]int{
+		"one":   10,
+		"two":   1,
+		"three": 5,
+	}
+	tsts := []struct {
+		inputKey                   string
+		expectedSampleRate         int
+		expectedCurrentCountForKey int
+	}{
+		{"one", 10, 6},
+		{"two", 1, 9},
+		{"two", 1, 10},
+		{"three", 5, 1}, // key missing from current counts
+		{"three", 5, 2},
+		{"four", 1, 1}, // key missing from current and saved counts
+		{"four", 1, 2},
+	}
+	for _, tst := range tsts {
+		rate := a.GetSampleRate(tst.inputKey)
+		test.Equals(t, rate, tst.expectedSampleRate)
+		test.Equals(t, a.currentCounts[tst.inputKey], tst.expectedCurrentCountForKey)
+	}
+}
diff --git a/avgsamplewithmin.go b/avgsamplewithmin.go
new file mode 100644
index 0000000..c5bc4ac
--- /dev/null
+++ b/avgsamplewithmin.go
@@ -0,0 +1,156 @@
+package dynsampler
+
+import (
+	"math"
+	"sort"
+	"sync"
+	"time"
+)
+
+// AvgSampleWithMin implements Sampler and attempts to average a given sample
+// rate, with a minimum number of events per second (i.e. it will reduce
+// sampling if it would end up sending fewer than the minimum number of events).
+// This method attempts to get the best of the normal average sample rate
+// method, without the failings it shows on the low end of total traffic
+// throughput.
+//
+// Keys that occur only once within ClearFrequencySec will always have a sample
+// rate of 1. Keys that occur more frequently will be sampled on a logarithmic
+// curve. In other words, every key will be represented at least once per
+// ClearFrequencySec and more frequent keys will have their sample rate
+// increased proportionally to wind up with the goal sample rate.
+type AvgSampleWithMin struct {
+	// ClearFrequencySec is how often the counters reset in seconds; default 30
+	ClearFrequencySec int
+
+	// GoalSampleRate is the average sample rate we're aiming for, across all
+	// events. Default 10
+	GoalSampleRate int
+
+	// MinEventsPerSec - when the total number of events drops below this
+	// threshold, sampling will cease. default 50
+	MinEventsPerSec int
+
+	savedSampleRates map[string]int
+	currentCounts    map[string]int
+
+	lock sync.Mutex
+}
+
+func (a *AvgSampleWithMin) Start() error {
+	// apply defaults
+	if a.ClearFrequencySec == 0 {
+		a.ClearFrequencySec = 30
+	}
+	if a.GoalSampleRate == 0 {
+		a.GoalSampleRate = 10
+	}
+	if a.MinEventsPerSec == 0 {
+		a.MinEventsPerSec = 50
+	}
+
+	// initialize internal variables
+	a.savedSampleRates = make(map[string]int)
+	a.currentCounts = make(map[string]int)
+
+	// spin up calculator
+	go func() {
+		ticker := time.NewTicker(time.Second * time.Duration(a.ClearFrequencySec))
+		for range ticker.C {
+			a.updateMaps()
+		}
+	}()
+	return nil
+}
+
+// updateMaps calculates a new saved rate map based on the contents of the
+// counter map
+func (a *AvgSampleWithMin) updateMaps() {
+	// make a local copy of the sample counters for calculation
+	a.lock.Lock()
+	tmpCounts := a.currentCounts
+	a.currentCounts = make(map[string]int)
+	a.lock.Unlock()
+	newSavedSampleRates := make(map[string]int)
+	// short circuit if no traffic
+	numKeys := len(tmpCounts)
+	if numKeys == 0 {
+		// no traffic the last 30s.
clear the result map + a.savedSampleRates = newSavedSampleRates + return + } + + // Goal events to send this interval is the total count of received events + // divided by the desired average sample rate + var sumEvents int + for _, count := range tmpCounts { + sumEvents += count + } + goalCount := float64(sumEvents) / float64(a.GoalSampleRate) + // check to see if we fall below the minimum + if sumEvents < a.MinEventsPerSec*a.ClearFrequencySec { + // we still need to go through each key to set sample rates individually + for k := range tmpCounts { + newSavedSampleRates[k] = 1 + } + a.savedSampleRates = newSavedSampleRates + return + } + // goalRatio is the goalCount divided by the sum of all the log values - it + // determines what percentage of the total event space belongs to each key + var logSum float64 + for _, count := range tmpCounts { + logSum += math.Log10(float64(count)) + } + goalRatio := goalCount / logSum + + // must go through the keys in a fixed order to prevent rounding from changing + // results + keys := make([]string, len(tmpCounts)) + var i int + for k := range tmpCounts { + keys[i] = k + i++ + } + sort.Strings(keys) + + // goal number of events per key is goalRatio * key count, but never less than + // one. If a key falls below its goal, it gets a sample rate of 1 and the + // extra available events get passed on down the line. + keysRemaining := len(tmpCounts) + var extra float64 + for _, key := range keys { + count := float64(tmpCounts[key]) + // take the max of 1 or my log10 share of the total + goalForKey := math.Max(1, math.Log10(count)*goalRatio) + // take this key's share of the extra and pass the rest along + extraForKey := extra / float64(keysRemaining) + goalForKey += extraForKey + extra -= extraForKey + keysRemaining-- + if count <= goalForKey { + // there are fewer samples than the allotted number for this key. set + // sample rate to 1 and redistribute the unused slots for future keys + newSavedSampleRates[key] = 1 + extra += goalForKey - count + } else { + // there are more samples than the allotted number. 
Sample this key enough + // to knock it under the limit (aka round up) + newSavedSampleRates[key] = int(math.Ceil(count / goalForKey)) + extra += goalForKey - (count / float64(newSavedSampleRates[key])) + } + } + a.savedSampleRates = newSavedSampleRates +} + +// GetSampleRate takes a key and returns the appropriate sample rate for that +// key +func (a *AvgSampleWithMin) GetSampleRate(key string) int { + a.lock.Lock() + defer a.lock.Unlock() + a.currentCounts[key]++ + if rate, found := a.savedSampleRates[key]; found { + return rate + } + return 1 +} diff --git a/avgsamplewithmin_test.go b/avgsamplewithmin_test.go new file mode 100644 index 0000000..aad8cb5 --- /dev/null +++ b/avgsamplewithmin_test.go @@ -0,0 +1,184 @@ +package dynsampler + +import ( + "fmt" + "testing" + + "github.com/honeycombio/hound/test" +) + +func TestAvgSampleWithMinUpdateMaps(t *testing.T) { + a := &AvgSampleWithMin{ + GoalSampleRate: 20, + MinEventsPerSec: 50, + ClearFrequencySec: 30, + } + tsts := []struct { + inputSampleCount map[string]int + expectedSavedSampleRates map[string]int + }{ + { + map[string]int{ + "one": 1, + "two": 1, + "three": 2, + "four": 5, + "five": 8, + "six": 15, + "seven": 45, + "eight": 612, + "nine": 2000, + "ten": 10000, + }, + map[string]int{ + "one": 1, + "two": 1, + "three": 1, + "four": 1, + "five": 1, + "six": 1, + "seven": 1, + "eight": 6, + "nine": 14, + "ten": 47, + }, + }, + { + map[string]int{ + "one": 1, + "two": 1, + "three": 2, + "four": 5, + "five": 8, + "six": 15, + "seven": 45, + "eight": 50, + "nine": 60, + }, + map[string]int{ + "one": 1, + "two": 1, + "three": 1, + "four": 1, + "five": 1, + "six": 1, + "seven": 1, + "eight": 1, + "nine": 1, + }, + }, + { + map[string]int{ + "one": 1, + "two": 1, + "three": 2, + "four": 5, + "five": 7, + }, + map[string]int{ + "one": 1, + "two": 1, + "three": 1, + "four": 1, + "five": 1, + }, + }, + { + map[string]int{ + "one": 1, + }, + map[string]int{ + "one": 1, + }, + }, + { + map[string]int{ + "one": 8, + }, + map[string]int{ + "one": 1, + }, + }, + { + map[string]int{ + "one": 12000, + }, + map[string]int{ + "one": 20, + }, + }, + { + map[string]int{ + "one": 1000, + "two": 1000, + "three": 2000, + "four": 5000, + "five": 7000, + }, + map[string]int{ + "one": 7, + "two": 7, + "three": 13, + "four": 29, + "five": 39, + }, + }, + { + map[string]int{ + "one": 6000, + "two": 6000, + "three": 6000, + "four": 6000, + "five": 6000, + }, + map[string]int{ + "one": 20, + "two": 20, + "three": 20, + "four": 20, + "five": 20, + }, + }, + { + map[string]int{}, + map[string]int{}, + }, + } + for i, tst := range tsts { + a.currentCounts = tst.inputSampleCount + a.updateMaps() + test.Equals(t, len(a.currentCounts), 0) + test.Equals(t, a.savedSampleRates, tst.expectedSavedSampleRates, fmt.Sprintf("test %d failed", i)) + } +} + +func TestAvgSampleWithMinGetSampleRate(t *testing.T) { + a := &AvgSampleWithMin{} + a.currentCounts = map[string]int{ + "one": 5, + "two": 8, + } + a.savedSampleRates = map[string]int{ + "one": 10, + "two": 1, + "three": 5, + } + tsts := []struct { + inputKey string + expectedSampleRate int + expectedCurrentCountForKey int + }{ + {"one", 10, 6}, + {"two", 1, 9}, + {"two", 1, 10}, + {"three", 5, 1}, // key missing from current counts + {"three", 5, 2}, + {"four", 1, 1}, // key missing from current and saved counts + {"four", 1, 2}, + } + for _, tst := range tsts { + rate := a.GetSampleRate(tst.inputKey) + test.Equals(t, rate, tst.expectedSampleRate) + test.Equals(t, a.currentCounts[tst.inputKey], 
tst.expectedCurrentCountForKey)
+	}
+}
diff --git a/doc.go b/doc.go
new file mode 100644
index 0000000..062d17d
--- /dev/null
+++ b/doc.go
@@ -0,0 +1,25 @@
+/*
+Package dynsampler contains several sampling algorithms to help you select a representative set of events instead of a full stream.
+
+This package is intended to help sample a stream of tracking events, where events are typically created in response to a stream of traffic (for the purposes of logging or debugging). In general, sampling is used to reduce the total volume of events necessary to represent the stream of traffic in a meaningful way.
+
+For the purposes of these examples, the "traffic" will be a set of HTTP requests being handled by a server, and "event" will be a blob of metadata about a given HTTP request that might be useful to keep track of later. A "sample rate" of 100 means that for every 100 requests, we capture a single event and indicate that it represents 100 similar requests.
+
+Use
+
+Use the `Sampler` interface in your code. Each different sampling algorithm implements the Sampler interface.
+
+The following guidelines can help you choose a sampler. Depending on the shape of your traffic, one may serve better than another, or you may need to write a new one! Please consider contributing it back to this package if you do.
+
+* If your system has a completely homogeneous stream of requests: use `Static` with only the default set.
+
+* If your system has a steady stream of requests and a well-known low cardinality partition key (e.g. http status): use `Static` and override sample rates on a per-key basis (e.g. if you want to sample `HTTP 200/OK` events at a different rate from `HTTP 503/Server Error`).
+
+* If your logging system has a strict cap on the rate it can receive events: use `TotalThroughput`, which will calculate sample rates based on keeping *the entire system's* representative event throughput right around (or under) a particular cap.
+
+* If your system has a rough cap on the rate it can receive events and your partitioned keyspace is fairly steady, use `PerKeyThroughput`, which will calculate sample rates based on keeping the event throughput roughly constant *per key/partition* (e.g. per user id).
+
+* The best choice for a system with a large key space and a large disparity between the highest volume and lowest volume keys is `AvgSampleWithMin` - it will increase the sample rate of higher volume traffic proportionally to the logarithm of the specific key's volume. If total traffic falls below a configured minimum, it stops sampling to avoid any sampling when the traffic is too low to warrant it.
+
+*/
+package dynsampler
diff --git a/dynsampler.go b/dynsampler.go
new file mode 100644
index 0000000..e437bc0
--- /dev/null
+++ b/dynsampler.go
@@ -0,0 +1,11 @@
+package dynsampler
+
+// Sampler is the interface to samplers using different methods to determine
+// sample rate. You should instantiate one of the actual samplers in this
+// package, depending on the sample method you'd like to use. Each sampling
+// method has its own set of struct variables you should set before Start()ing
+// the sampler.
+type Sampler interface {
+	Start() error
+	GetSampleRate(string) int
+}
diff --git a/onlyonce.go b/onlyonce.go
new file mode 100644
index 0000000..0c16ef4
--- /dev/null
+++ b/onlyonce.go
@@ -0,0 +1,67 @@
+package dynsampler
+
+import (
+	"sync"
+	"time"
+)
+
+// OnlyOnce implements Sampler and returns a sample rate of 1 the first time a
+// key is seen and 1,000,000,000 every subsequent time.
Essentially, this means +// that every key will be reported the first time it's seen during each +// ClearFrequencySec and never again. Set ClearFrequencySec to -1 to report +// each key only once for the life of the process. +// +// (Note that it's not guaranteed that each key will be reported exactly once, +// just that the first seen event will be reported and subsequent events are +// unlikely to be reported. It is probable that an additional event will be +// reported for every billion times the key appears.) +// +// This emulates what you might expect from something catching stack traces - +// the first one is important but every subsequent one just repeats the same +// information. +type OnlyOnce struct { + // ClearFrequencySec is how often the counters reset in seconds; default 30 + ClearFrequencySec int + + seen map[string]bool + lock sync.Mutex +} + +// Start initializes the static dynsampler +func (o *OnlyOnce) Start() error { + // + if o.ClearFrequencySec == -1 { + return nil + } + if o.ClearFrequencySec == 0 { + o.ClearFrequencySec = 30 + } + o.seen = make(map[string]bool) + + // spin up calculator + go func() { + ticker := time.NewTicker(time.Second * time.Duration(o.ClearFrequencySec)) + for range ticker.C { + o.updateMaps() + } + }() + return nil +} + +func (o *OnlyOnce) updateMaps() { + o.lock.Lock() + defer o.lock.Unlock() + o.seen = make(map[string]bool) +} + +// GetSampleRate takes a key and returns the appropriate sample rate for that +// key +func (o *OnlyOnce) GetSampleRate(key string) int { + o.lock.Lock() + defer o.lock.Unlock() + if _, found := o.seen[key]; found { + return 1000000000 + } + o.seen[key] = true + return 1 +} diff --git a/onlyonce_test.go b/onlyonce_test.go new file mode 100644 index 0000000..2e5ec48 --- /dev/null +++ b/onlyonce_test.go @@ -0,0 +1,64 @@ +package dynsampler + +import ( + "fmt" + "testing" + + "github.com/honeycombio/hound/test" +) + +func TestOnlyOnceUpdateMaps(t *testing.T) { + o := &OnlyOnce{ + ClearFrequencySec: 30, + } + tsts := []struct { + inputSeen map[string]bool + expectedSeen map[string]bool + }{ + { + map[string]bool{ + "one": true, + "two": true, + "three": true, + }, + map[string]bool{}, + }, + { + map[string]bool{}, + map[string]bool{}, + }, + } + for i, tst := range tsts { + o.seen = tst.inputSeen + o.updateMaps() + test.Equals(t, o.seen, tst.expectedSeen, fmt.Sprintf("test %d failed", i)) + } +} + +func TestOnlyOnceGetSampleRate(t *testing.T) { + o := &OnlyOnce{} + o.seen = map[string]bool{ + "one": true, + "two": true, + } + tsts := []struct { + inputKey string + expectedSampleRate int + expectedCurrentCountForKeyBefore bool + expectedCurrentCountForKeyAfter bool + }{ + {"one", 1000000000, true, true}, + {"two", 1000000000, true, true}, + {"two", 1000000000, true, true}, + {"three", 1, false, true}, // key missing from seen + {"three", 1000000000, true, true}, + {"four", 1, false, true}, // key missing from seen + {"four", 1000000000, true, true}, + } + for _, tst := range tsts { + test.Equals(t, o.seen[tst.inputKey], tst.expectedCurrentCountForKeyBefore) + rate := o.GetSampleRate(tst.inputKey) + test.Equals(t, rate, tst.expectedSampleRate) + test.Equals(t, o.seen[tst.inputKey], tst.expectedCurrentCountForKeyAfter) + } +} diff --git a/perkeythroughput.go b/perkeythroughput.go new file mode 100644 index 0000000..35898c2 --- /dev/null +++ b/perkeythroughput.go @@ -0,0 +1,91 @@ +package dynsampler + +import ( + "math" + "sync" + "time" +) + +// PerKeyThroughput implements Sampler and attempts to meet a goal of a fixed 
+// number of events per key per second sent to Honeycomb. +// +// This method is to guarantee that at most a certain number of events per key +// get transmitted, no matter how many keys you have or how much traffic comes +// through. In other words, if capturing a minimum amount of traffic per key is +// important but beyond that doesn't matter much, this is the best method. +type PerKeyThroughput struct { + // ClearFrequency is how often the counters reset in seconds; default 30 + ClearFrequencySec int + + // PerKeyThroughputPerSec is the target number of events to send per second + // per key. Sample rates are generated on a per key basis to squash the + // throughput down to match the goal throughput. default 10 + PerKeyThroughputPerSec int + + savedSampleRates map[string]int + currentCounts map[string]int + + lock sync.Mutex +} + +func (p *PerKeyThroughput) Start() error { + // apply defaults + if p.ClearFrequencySec == 0 { + p.ClearFrequencySec = 30 + } + if p.PerKeyThroughputPerSec == 0 { + p.PerKeyThroughputPerSec = 10 + } + + // initialize internal variables + p.savedSampleRates = make(map[string]int) + p.currentCounts = make(map[string]int) + + // spin up calculator + go func() { + ticker := time.NewTicker(time.Second * time.Duration(p.ClearFrequencySec)) + for range ticker.C { + p.updateMaps() + } + }() + return nil +} + +// updateMaps calculates a new saved rate map based on the contents of the +// counter map +func (p *PerKeyThroughput) updateMaps() { + // make a local copy of the sample counters for calculation + p.lock.Lock() + tmpCounts := p.currentCounts + p.currentCounts = make(map[string]int) + p.lock.Unlock() + // short circuit if no traffic + numKeys := len(tmpCounts) + if numKeys == 0 { + // no traffic the last 30s. clear the result map + p.savedSampleRates = make(map[string]int) + return + } + actualPerKeyRate := p.PerKeyThroughputPerSec * p.ClearFrequencySec + // for each key, calculate sample rate by dividing counted events by the + // desired number of events + newSavedSampleRates := make(map[string]int) + for k, v := range tmpCounts { + rate := int(math.Max(1, (float64(v) / float64(actualPerKeyRate)))) + newSavedSampleRates[k] = rate + } + // save newly calculated sample rates + p.savedSampleRates = newSavedSampleRates +} + +// GetSampleRate takes a key and returns the appropriate sample rate for that +// key +func (p *PerKeyThroughput) GetSampleRate(key string) int { + p.lock.Lock() + defer p.lock.Unlock() + p.currentCounts[key]++ + if rate, found := p.savedSampleRates[key]; found { + return rate + } + return 1 +} diff --git a/perkeythroughput_test.go b/perkeythroughput_test.go new file mode 100644 index 0000000..597aa8e --- /dev/null +++ b/perkeythroughput_test.go @@ -0,0 +1,183 @@ +package dynsampler + +import ( + "fmt" + "testing" + + "github.com/honeycombio/hound/test" +) + +func TestPerKeyThroughputUpdateMaps(t *testing.T) { + p := &PerKeyThroughput{ + ClearFrequencySec: 30, + PerKeyThroughputPerSec: 5, + } + tsts := []struct { + inputCount map[string]int + expectedSavedSampleRates map[string]int + }{ + { + map[string]int{ + "one": 1, + "two": 1, + "three": 2, + "four": 5, + "five": 8, + "six": 15, + "seven": 45, + "eight": 612, + "nine": 2000, + "ten": 10000, + }, + map[string]int{ + "one": 1, + "two": 1, + "three": 1, + "four": 1, + "five": 1, + "six": 1, + "seven": 1, + "eight": 4, + "nine": 13, + "ten": 66, + }, + }, + { + map[string]int{ + "one": 1, + "two": 1, + "three": 2, + "four": 5, + "five": 8, + "six": 15, + "seven": 45, + "eight": 50, + 
"nine": 60, + }, + map[string]int{ + "one": 1, + "two": 1, + "three": 1, + "four": 1, + "five": 1, + "six": 1, + "seven": 1, + "eight": 1, + "nine": 1, + }, + }, + { + map[string]int{ + "one": 1, + "two": 1, + "three": 2, + "four": 5, + "five": 7, + }, + map[string]int{ + "one": 1, + "two": 1, + "three": 1, + "four": 1, + "five": 1, + }, + }, + { + map[string]int{ + "one": 1000, + "two": 1000, + "three": 2000, + "four": 5000, + "five": 7000, + }, + map[string]int{ + "one": 6, + "two": 6, + "three": 13, + "four": 33, + "five": 46, + }, + }, + { + map[string]int{ + "one": 1000, + "two": 1000, + "three": 2000, + "four": 5000, + "five": 70000, + }, + map[string]int{ + "one": 6, + "two": 6, + "three": 13, + "four": 33, + "five": 466, + }, + }, + { + map[string]int{ + "one": 6000, + "two": 6000, + "three": 6000, + "four": 6000, + "five": 6000, + }, + map[string]int{ + "one": 40, + "two": 40, + "three": 40, + "four": 40, + "five": 40, + }, + }, + { + map[string]int{ + "one": 12000, + }, + map[string]int{ + "one": 80, + }, + }, + { + map[string]int{}, + map[string]int{}, + }, + } + for i, tst := range tsts { + p.currentCounts = tst.inputCount + p.updateMaps() + test.Equals(t, len(p.currentCounts), 0) + test.Equals(t, p.savedSampleRates, tst.expectedSavedSampleRates, fmt.Sprintf("test %d failed", i)) + } +} + +func TestPerKeyThroughputGetSampleRate(t *testing.T) { + p := &PerKeyThroughput{} + p.currentCounts = map[string]int{ + "one": 5, + "two": 8, + } + p.savedSampleRates = map[string]int{ + "one": 10, + "two": 1, + "three": 5, + } + tsts := []struct { + inputKey string + expectedSampleRate int + expectedCurrentCountForKey int + }{ + {"one", 10, 6}, + {"two", 1, 9}, + {"two", 1, 10}, + {"three", 5, 1}, // key missing from current counts + {"three", 5, 2}, + {"four", 1, 1}, // key missing from current and saved counts + {"four", 1, 2}, + } + for _, tst := range tsts { + rate := p.GetSampleRate(tst.inputKey) + test.Equals(t, rate, tst.expectedSampleRate) + test.Equals(t, p.currentCounts[tst.inputKey], tst.expectedCurrentCountForKey) + } +} diff --git a/static.go b/static.go new file mode 100644 index 0000000..c2f3275 --- /dev/null +++ b/static.go @@ -0,0 +1,28 @@ +package dynsampler + +// Static implements Sampler with a static mapping for sample rates. This is +// useful if you have a known set of keys that you want to sample at specific +// rates and apply a default to everything else. 
+type Static struct {
+	// Rates is the set of sample rates to use
+	Rates map[string]int
+	// Default is the value to use if the key is not whitelisted in Rates
+	Default int
+}
+
+// Start initializes the static dynsampler
+func (s *Static) Start() error {
+	if s.Default == 0 {
+		s.Default = 1
+	}
+	return nil
+}
+
+// GetSampleRate takes a key and returns the appropriate sample rate for that
+// key
+func (s *Static) GetSampleRate(key string) int {
+	if rate, found := s.Rates[key]; found {
+		return rate
+	}
+	return s.Default
+}
diff --git a/static_test.go b/static_test.go
new file mode 100644
index 0000000..cb756ab
--- /dev/null
+++ b/static_test.go
@@ -0,0 +1,21 @@
+package dynsampler
+
+import (
+	"testing"
+
+	"github.com/honeycombio/hound/test"
+)
+
+func TestStaticGetSampleRate(t *testing.T) {
+	s := &Static{
+		Rates: map[string]int{
+			"one": 5,
+			"two": 10,
+		},
+		Default: 3,
+	}
+	test.Equals(t, s.GetSampleRate("one"), 5)
+	test.Equals(t, s.GetSampleRate("two"), 10)
+	test.Equals(t, s.GetSampleRate("three"), 3)
+
+}
diff --git a/totalthroughput.go b/totalthroughput.go
new file mode 100644
index 0000000..effcf53
--- /dev/null
+++ b/totalthroughput.go
@@ -0,0 +1,100 @@
+package dynsampler
+
+import (
+	"math"
+	"sync"
+	"time"
+)
+
+// TotalThroughput implements Sampler and attempts to meet a goal of a fixed
+// number of events per second sent to Honeycomb.
+//
+// If your key space is sharded across different servers, this is a good method
+// for making sure each server sends roughly the same volume of content to
+// Honeycomb. It performs poorly when the active keyspace is very large.
+//
+// GoalThroughputPerSec * ClearFrequencySec defines the upper limit of the number
+// of keys that can be reported and stay under the goal, but with that many
+// keys, you'll only get one event per key per ClearFrequencySec, which is very
+// coarse. You should aim for at least 1 event per key per sec to 1 event per
+// key per 10sec to get reasonable data. In other words, the number of active
+// keys should be less than 10*GoalThroughputPerSec.
+type TotalThroughput struct {
+	// ClearFrequencySec is how often the counters reset in seconds; default 30
+	ClearFrequencySec int
+
+	// GoalThroughputPerSec is the target number of events to send per second.
+	// Sample rates are generated to squash the total throughput down to match the
+	// goal throughput. Actual throughput may exceed goal throughput. default 100
+	GoalThroughputPerSec int
+
+	savedSampleRates map[string]int
+	currentCounts    map[string]int
+
+	lock sync.Mutex
+}
+
+func (t *TotalThroughput) Start() error {
+	// apply defaults
+	if t.ClearFrequencySec == 0 {
+		t.ClearFrequencySec = 30
+	}
+	if t.GoalThroughputPerSec == 0 {
+		t.GoalThroughputPerSec = 100
+	}
+
+	// initialize internal variables
+	t.savedSampleRates = make(map[string]int)
+	t.currentCounts = make(map[string]int)
+
+	// spin up calculator
+	go func() {
+		ticker := time.NewTicker(time.Second * time.Duration(t.ClearFrequencySec))
+		for range ticker.C {
+			t.updateMaps()
+		}
+	}()
+	return nil
+}
+
+// updateMaps calculates a new saved rate map based on the contents of the
+// counter map
+func (t *TotalThroughput) updateMaps() {
+	// make a local copy of the sample counters for calculation
+	t.lock.Lock()
+	tmpCounts := t.currentCounts
+	t.currentCounts = make(map[string]int)
+	t.lock.Unlock()
+	// short circuit if no traffic
+	numKeys := len(tmpCounts)
+	if numKeys == 0 {
+		// no traffic the last 30s.
clear the result map + t.savedSampleRates = make(map[string]int) + return + } + // figure out our target throughput per key over ClearFrequencySec + totalGoalThroughput := t.GoalThroughputPerSec * t.ClearFrequencySec + // floor the throughput but min should be 1 event per bucket per time period + throughputPerKey := int(math.Max(1, float64(totalGoalThroughput)/float64(numKeys))) + // for each key, calculate sample rate by dividing counted events by the + // desired number of events + newSavedSampleRates := make(map[string]int) + for k, v := range tmpCounts { + rate := int(math.Max(1, (float64(v) / float64(throughputPerKey)))) + newSavedSampleRates[k] = rate + } + // save newly calculated sample rates + t.savedSampleRates = newSavedSampleRates +} + +// GetSampleRate takes a key and returns the appropriate sample rate for that +// key +func (t *TotalThroughput) GetSampleRate(key string) int { + t.lock.Lock() + defer t.lock.Unlock() + t.currentCounts[key]++ + if rate, found := t.savedSampleRates[key]; found { + return rate + } + return 1 +} diff --git a/totalthroughput_test.go b/totalthroughput_test.go new file mode 100644 index 0000000..9bb0e26 --- /dev/null +++ b/totalthroughput_test.go @@ -0,0 +1,167 @@ +package dynsampler + +import ( + "fmt" + "testing" + + "github.com/honeycombio/hound/test" +) + +func TestTotalThroughputUpdateMaps(t *testing.T) { + s := &TotalThroughput{ + ClearFrequencySec: 30, + GoalThroughputPerSec: 20, + } + tsts := []struct { + inputSampleCount map[string]int + expectedSavedSampleRates map[string]int + }{ + { + map[string]int{ + "one": 1, + "two": 1, + "three": 2, + "four": 5, + "five": 8, + "six": 15, + "seven": 45, + "eight": 612, + "nine": 2000, + "ten": 10000, + }, + map[string]int{ + "one": 1, + "two": 1, + "three": 1, + "four": 1, + "five": 1, + "six": 1, + "seven": 1, + "eight": 10, + "nine": 33, + "ten": 166, + }, + }, + { + map[string]int{ + "one": 1, + "two": 1, + "three": 2, + "four": 5, + "five": 8, + "six": 15, + "seven": 45, + "eight": 50, + "nine": 60, + }, + map[string]int{ + "one": 1, + "two": 1, + "three": 1, + "four": 1, + "five": 1, + "six": 1, + "seven": 1, + "eight": 1, + "nine": 1, + }, + }, + { + map[string]int{ + "one": 1, + "two": 1, + "three": 2, + "four": 5, + "five": 7, + }, + map[string]int{ + "one": 1, + "two": 1, + "three": 1, + "four": 1, + "five": 1, + }, + }, + { + map[string]int{ + "one": 1000, + "two": 1000, + "three": 2000, + "four": 5000, + "five": 7000, + }, + map[string]int{ + "one": 8, + "two": 8, + "three": 16, + "four": 41, + "five": 58, + }, + }, + { + map[string]int{ + "one": 6000, + "two": 6000, + "three": 6000, + "four": 6000, + "five": 6000, + }, + map[string]int{ + "one": 50, + "two": 50, + "three": 50, + "four": 50, + "five": 50, + }, + }, + { + map[string]int{ + "one": 12000, + }, + map[string]int{ + "one": 20, + }, + }, + { + map[string]int{}, + map[string]int{}, + }, + } + for i, tst := range tsts { + s.currentCounts = tst.inputSampleCount + s.updateMaps() + test.Equals(t, len(s.currentCounts), 0) + test.Equals(t, s.savedSampleRates, tst.expectedSavedSampleRates, fmt.Sprintf("test %d failed", i)) + } +} + +func TestTotalThroughputGetSampleRate(t *testing.T) { + s := &TotalThroughput{} + s.currentCounts = map[string]int{ + "one": 5, + "two": 8, + } + s.savedSampleRates = map[string]int{ + "one": 10, + "two": 1, + "three": 5, + } + tsts := []struct { + inputKey string + expectedSampleRate int + expectedCurrentCountForKey int + }{ + {"one", 10, 6}, + {"two", 1, 9}, + {"two", 1, 10}, + {"three", 5, 1}, // key 
missing from current counts + {"three", 5, 2}, + {"four", 1, 1}, // key missing from current and saved counts + {"four", 1, 2}, + } + for _, tst := range tsts { + rate := s.GetSampleRate(tst.inputKey) + test.Equals(t, rate, tst.expectedSampleRate) + test.Equals(t, s.currentCounts[tst.inputKey], tst.expectedCurrentCountForKey) + } +}
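
The samplers above all share the small Sampler interface defined in dynsampler.go, so wiring any of them into an application looks the same. What follows is a minimal usage sketch rather than part of the patch: it assumes the import path named in the README (github.com/honeycombio/dynsampler-go) and uses only identifiers the patch defines (AvgSampleRate, GoalSampleRate, ClearFrequencySec, Start, GetSampleRate). The random keep/drop check at the end is one common way a caller might apply a sample rate; it is an assumption, not something this package provides.

package main

import (
	"fmt"
	"log"
	"math/rand"

	dynsampler "github.com/honeycombio/dynsampler-go"
)

func main() {
	// Pick a sampler and set its tunables before calling Start(); the field
	// names and defaults here come from avgsamplerate.go in the patch above.
	sampler := &dynsampler.AvgSampleRate{
		GoalSampleRate:    10, // aim for an average sample rate of 10 across all keys
		ClearFrequencySec: 30, // recalculate per-key rates every 30 seconds
	}
	if err := sampler.Start(); err != nil {
		log.Fatal(err)
	}

	// For each event, build a partition key (an HTTP status code is used here
	// purely as an illustration) and ask the sampler which rate applies to it.
	key := "200"
	rate := sampler.GetSampleRate(key)

	// Applying the rate is left to the caller; keeping roughly one event out of
	// every `rate` is one common approach (an assumption, not part of the package).
	if rand.Intn(rate) == 0 {
		fmt.Printf("keep event for key %q, annotated with sample rate %d\n", key, rate)
	}
}

Any other sampler in the patch (Static, TotalThroughput, PerKeyThroughput, OnlyOnce, AvgSampleWithMin) can be swapped into the same spot, since each one satisfies the Sampler interface.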