Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade dependency aws-sdk-v2 #143

Merged
merged 5 commits into from
Dec 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ The components in this library allow you to efficiently deaggregate protocol buf

### Language Specific Implementations

AWS Lambda supports Java, Node.js and Python as programming languages. We have included support for those languages so that you can create and process UserRecords via standalone modules. Documentation is provided for each language:
AWS Lambda supports Java, Node.js, Python and Go as programming languages. We have included support for those languages so that you can create and process UserRecords via standalone modules. Documentation is provided for each language:

| Language | Location |
:--- | :---
| Java | [java](java/) |
| Node.js Javascript | [node.js](node/) |
| Python | [python](python/) |
| Go | [go](go/) |

----

Expand Down
23 changes: 23 additions & 0 deletions go/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Go Kinesis Deaggregation Modules

The Kinesis Deaggregation Modules for Go provide the ability to do in-memory deaggregation of standard Kinesis user records using the [Kinesis Aggregated Record Format](https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md) to allow for more efficient transmission of records.

There are 2 versions based upon the AWS SDK version you are using:

| SDK | Project |
| --- | ------- |
|Version 0 | [v0](.) |
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the first version referred to as v0 or v1? I'd assume v1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when importing it it is github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f so I left it as v0

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. so versioning it based on THIS repo vs the version of AWS SDK it's being consumed from

|Version 2 | [v2](v2) |

## Installation

Version 0 depends on `github.com/aws/aws-sdk-go`, in order to install
```
go get github.com/awslabs/kinesis-aggregation/go
```

Version 2 depends on `github.com/aws/aws-sdk-go-v2`, in order to install

```
go get github.com/awslabs/kinesis-aggregation/go/v2
```
94 changes: 94 additions & 0 deletions go/v2/deaggregator/deaggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package deaggregator

import (
"crypto/md5"
"fmt"

"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
"github.com/golang/protobuf/proto"

rec "github.com/awslabs/kinesis-aggregation/go/v2/records"
)

// Magic File Header for a KPL Aggregated Record
var KplMagicHeader = fmt.Sprintf("%q", []byte("\xf3\x89\x9a\xc2"))

const (
KplMagicLen = 4 // Length of magic header for KPL Aggregate Record checking.
DigestSize = 16 // MD5 Message size for protobuf.
)

// DeaggregateRecords takes an array of Kinesis records and expands any Protobuf
// records within that array, returning an array of all records
func DeaggregateRecords(records []types.Record) ([]types.Record, error) {
var isAggregated bool
allRecords := make([]types.Record, 0)
for _, record := range records {
isAggregated = true

var dataMagic string
var decodedDataNoMagic []byte
// Check if record is long enough to have magic file header
if len(record.Data) >= KplMagicLen {
dataMagic = fmt.Sprintf("%q", record.Data[:KplMagicLen])
decodedDataNoMagic = record.Data[KplMagicLen:]
} else {
isAggregated = false
}

// Check if record has KPL Aggregate Record Magic Header and data length
// is correct size
if KplMagicHeader != dataMagic || len(decodedDataNoMagic) <= DigestSize {
isAggregated = false
}

if isAggregated {
messageDigest := fmt.Sprintf("%x", decodedDataNoMagic[len(decodedDataNoMagic)-DigestSize:])
messageData := decodedDataNoMagic[:len(decodedDataNoMagic)-DigestSize]

calculatedDigest := fmt.Sprintf("%x", md5.Sum(messageData))

// Check protobuf MD5 hash matches MD5 sum of record
if messageDigest != calculatedDigest {
isAggregated = false
} else {
aggRecord := &rec.AggregatedRecord{}
err := proto.Unmarshal(messageData, aggRecord)

if err != nil {
return nil, err
}

partitionKeys := aggRecord.PartitionKeyTable

for _, aggrec := range aggRecord.Records {
newRecord := createUserRecord(partitionKeys, aggrec, record)
allRecords = append(allRecords, newRecord)
}
}
}

if !isAggregated {
allRecords = append(allRecords, record)
}
}

return allRecords, nil
}

// createUserRecord takes in the partitionKeys of the aggregated record, the individual
// deaggregated record, and the original aggregated record builds a kinesis.Record and
// returns it
func createUserRecord(partitionKeys []string, aggRec *rec.Record, record types.Record) types.Record {
partitionKey := partitionKeys[*aggRec.PartitionKeyIndex]

return types.Record{
ApproximateArrivalTimestamp: record.ApproximateArrivalTimestamp,
Data: aggRec.Data,
EncryptionType: record.EncryptionType,
PartitionKey: &partitionKey,
SequenceNumber: record.SequenceNumber,
}
}
202 changes: 202 additions & 0 deletions go/v2/deaggregator/deaggregator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package deaggregator_test

import (
"crypto/md5"
"fmt"
"math/rand"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"

deagg "github.com/awslabs/kinesis-aggregation/go/v2/deaggregator"
rec "github.com/awslabs/kinesis-aggregation/go/v2/records"
)

// Generate an aggregate record in the correct AWS-specified format
// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
func generateAggregateRecord(numRecords int) []byte {

aggr := &rec.AggregatedRecord{}
// Start with the magic header
aggRecord := []byte("\xf3\x89\x9a\xc2")
partKeyTable := make([]string, 0)

// Create proto record with numRecords length
for i := 0; i < numRecords; i++ {
var partKey uint64
var hashKey uint64
partKey = uint64(i)
hashKey = uint64(i) * uint64(10)
r := &rec.Record{
PartitionKeyIndex: &partKey,
ExplicitHashKeyIndex: &hashKey,
Data: []byte("Some test data string"),
Tags: make([]*rec.Tag, 0),
}

aggr.Records = append(aggr.Records, r)
partKeyVal := "test" + fmt.Sprint(i)
partKeyTable = append(partKeyTable, partKeyVal)
}

aggr.PartitionKeyTable = partKeyTable
// Marshal to protobuf record, create md5 sum from proto record
// and append both to aggRecord with magic header
data, _ := proto.Marshal(aggr)
md5Hash := md5.Sum(data)
aggRecord = append(aggRecord, data...)
aggRecord = append(aggRecord, md5Hash[:]...)
return aggRecord
}

// Generate a generic kinesis.Record using whatever []byte
// is passed in as the data (can be normal []byte or proto record)
func generateKinesisRecord(data []byte) types.Record {
currentTime := time.Now()
encryptionType := types.EncryptionTypeNone
partitionKey := "1234"
sequenceNumber := "21269319989900637946712965403778482371"
return types.Record{
ApproximateArrivalTimestamp: &currentTime,
Data: data,
EncryptionType: encryptionType,
PartitionKey: &partitionKey,
SequenceNumber: &sequenceNumber,
}
}

// This tests to make sure that the data is at least larger than the length
// of the magic header to do some array slicing with index out of bounds
func TestSmallLengthReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) {
var err error
var kr types.Record

krs := make([]types.Record, 0, 1)

smallByte := []byte("No")
kr = generateKinesisRecord(smallByte)
krs = append(krs, kr)
dars, err := deagg.DeaggregateRecords(krs)
if err != nil {
panic(err)
}

// Small byte test, since this is not a deaggregated record, should return 1
// record in the array.
assert.Equal(t, 1, len(dars), "Small Byte test should return length of 1.")
}

// This function tests to make sure that the data starts with the correct magic header
// according to KPL aggregate documentation.
func TestNonMatchingMagicHeaderReturnsSingleRecord(t *testing.T) {
var err error
var kr types.Record

krs := make([]types.Record, 0, 1)

min := 1
max := 10
n := rand.Intn(max-min) + min
aggData := generateAggregateRecord(n)
mismatchAggData := aggData[1:]
kr = generateKinesisRecord(mismatchAggData)

krs = append(krs, kr)

dars, err := deagg.DeaggregateRecords(krs)
if err != nil {
panic(err)
}

// A byte record with a magic header that does not match 0xF3 0x89 0x9A 0xC2
// should return a single record.
assert.Equal(t, 1, len(dars), "Mismatch magic header test should return length of 1.")
}

// This function tests that the DeaggregateRecords function returns the correct number of
// deaggregated records from a single aggregated record.
func TestVariableLengthRecordsReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) {
var err error
var kr types.Record

krs := make([]types.Record, 0, 1)

min := 1
max := 10
n := rand.Intn(max-min) + min
aggData := generateAggregateRecord(n)
kr = generateKinesisRecord(aggData)
krs = append(krs, kr)

dars, err := deagg.DeaggregateRecords(krs)
if err != nil {
panic(err)
}

// Variable Length Aggregate Record test has aggregaterd records and should return
// n length.
assertMsg := fmt.Sprintf("Variable Length Aggregate Record should return length %v.", len(dars))
assert.Equal(t, n, len(dars), assertMsg)
}

// This function tests the length of the message after magic file header. If length is less than
// the digest size (16 bytes), it is not an aggregated record.
func TestRecordAfterMagicHeaderWithLengthLessThanDigestSizeReturnsSingleRecord(t *testing.T) {
var err error
var kr types.Record

krs := make([]types.Record, 0, 1)

min := 1
max := 10
n := rand.Intn(max-min) + min
aggData := generateAggregateRecord(n)
// Change size of proto message to 15
reducedAggData := aggData[:19]
kr = generateKinesisRecord(reducedAggData)

krs = append(krs, kr)

dars, err := deagg.DeaggregateRecords(krs)
if err != nil {
panic(err)
}

// A byte record with length less than 16 after the magic header should return
// a single record from DeaggregateRecords
assert.Equal(t, 1, len(dars), "Digest size test should return length of 1.")
}

// This function tests the MD5 Sum at the end of the record by comparing MD5 sum
// at end of proto record with MD5 Sum of Proto message. If they do not match,
// it is not an aggregated record.
func TestRecordWithMismatchMd5SumReturnsSingleRecord(t *testing.T) {
var err error
var kr types.Record

krs := make([]types.Record, 0, 1)

min := 1
max := 10
n := rand.Intn(max-min) + min
aggData := generateAggregateRecord(n)
// Remove last byte from array to mismatch the MD5 sums
mismatchAggData := aggData[:len(aggData)-1]
kr = generateKinesisRecord(mismatchAggData)

krs = append(krs, kr)

dars, err := deagg.DeaggregateRecords(krs)
if err != nil {
panic(err)
}

// A byte record with an MD5 sum that does not match with the md5.Sum(record)
// will be marked as a non-aggregate record and return a single record
assert.Equal(t, 1, len(dars), "Mismatch md5 sum test should return length of 1.")
}
9 changes: 9 additions & 0 deletions go/v2/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module github.com/awslabs/kinesis-aggregation/go/v2

go 1.15

require (
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0
github.com/golang/protobuf v1.5.2
github.com/stretchr/testify v1.7.0
)
32 changes: 32 additions & 0 deletions go/v2/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
github.com/aws/aws-sdk-go-v2 v1.9.0 h1:+S+dSqQCN3MSU5vJRu1HqHrq00cJn6heIMU7X9hcsoo=
github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0 h1:hb+NupVMUzINGUCfDs2+YqMkWKu47dBIQHpulM0XWh4=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI=
github.com/aws/smithy-go v1.8.0 h1:AEwwwXQZtUwP5Mz506FeXXrKBe0jA8gVM+1gEcSRooc=
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading