Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace HAResources with TieredLimits #169

Merged
merged 4 commits into from
Mar 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 36 additions & 14 deletions v2/account_claims.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func (n *NatsLimits) IsUnlimited() bool {
}

type JetStreamLimits struct {
HAResources int64 `json:"ha_resources"` // Max number of bytes high availability resources (streams & consumer). (0 means disabled). no omitempty on purpose
MemoryStorage int64 `json:"mem_storage,omitempty"` // Max number of bytes stored in memory across all streams. (0 means disabled)
DiskStorage int64 `json:"disk_storage,omitempty"` // Max number of bytes stored on disk across all streams. (0 means disabled)
Streams int64 `json:"streams,omitempty"` // Max number of streams
Expand All @@ -61,37 +60,58 @@ type JetStreamLimits struct {

// IsUnlimited returns true if all limits are unlimited
func (j *JetStreamLimits) IsUnlimited() bool {
return *j == JetStreamLimits{NoLimit, NoLimit, NoLimit, NoLimit, NoLimit, false}
return *j == JetStreamLimits{NoLimit, NoLimit, NoLimit, NoLimit, false}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JetStreamLimits will be mapped to R factor in the future, right?

Copy link
Contributor Author

@matthiashanel matthiashanel Mar 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for now we will be indexing with tier names "R1" and "R3".
However, that indexing gives us the flexibility to change that going forward.
You can see a sample of this in the attached unit test

}

// IsJSEnabled returns true if JS is enabled by either disk or memory storage being enabled
func (j *JetStreamLimits) IsJSEnabled() bool {
if j == nil {
return false
}
return j.MemoryStorage != 0 || j.DiskStorage != 0
}
type JetStreamTieredLimits map[string]JetStreamLimits

// OperatorLimits are used to limit access by an account
type OperatorLimits struct {
NatsLimits
AccountLimits
JetStreamLimits
JetStreamTieredLimits `json:"tiered_limits,omitempty"`
}

// IsEmpty returns true if all of the limits are 0/false.
// IsJSEnabled returns if this account claim has JS enabled either through a tier or the non tiered limits.
func (o *OperatorLimits) IsJSEnabled() bool {
if len(o.JetStreamTieredLimits) > 0 {
for _, l := range o.JetStreamTieredLimits {
if l.MemoryStorage != 0 || l.DiskStorage != 0 {
return true
}
}
return false
}
l := o.JetStreamLimits
return l.MemoryStorage != 0 || l.DiskStorage != 0
}

// IsEmpty returns true if all limits are 0/false/empty.
func (o *OperatorLimits) IsEmpty() bool {
return *o == OperatorLimits{}
return o.NatsLimits == NatsLimits{} &&
o.AccountLimits == AccountLimits{} &&
o.JetStreamLimits == JetStreamLimits{} &&
len(o.JetStreamTieredLimits) == 0
}

// IsUnlimited returns true if all limits are unlimited
func (o *OperatorLimits) IsUnlimited() bool {
return o.AccountLimits.IsUnlimited() && o.NatsLimits.IsUnlimited() && o.JetStreamLimits.IsUnlimited()
return o.AccountLimits.IsUnlimited() && o.NatsLimits.IsUnlimited() &&
o.JetStreamLimits.IsUnlimited() && len(o.JetStreamTieredLimits) == 0
}

// Validate checks that the operator limits contain valid values
func (o *OperatorLimits) Validate(_ *ValidationResults) {
func (o *OperatorLimits) Validate(vr *ValidationResults) {
// negative values mean unlimited, so all numbers are valid
if len(o.JetStreamTieredLimits) > 0 {
if (o.JetStreamLimits != JetStreamLimits{}) {
vr.AddError("JetStream Limits and tiered JetStream Limits are mutually exclusive")
}
if _, ok := o.JetStreamTieredLimits[""]; ok {
vr.AddError(`Tiered JetStream Limits can nont contain a blank "" tier name`)
}
}
}

// Mapping for publishes
Expand Down Expand Up @@ -198,7 +218,9 @@ func NewAccountClaims(subject string) *AccountClaims {
c.Limits = OperatorLimits{
NatsLimits{NoLimit, NoLimit, NoLimit},
AccountLimits{NoLimit, NoLimit, true, NoLimit, NoLimit},
JetStreamLimits{0, 0, 0, 0, 0, false}}
JetStreamLimits{0, 0, 0, 0, false},
JetStreamTieredLimits{},
}
c.Subject = subject
c.Mappings = Mapping{}
return c
Expand Down
44 changes: 38 additions & 6 deletions v2/account_claims_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,9 @@ func TestJetstreamLimits(t *testing.T) {
acc1.Limits.JetStreamLimits.MemoryStorage != 0 ||
acc1.Limits.JetStreamLimits.Consumer != 0 ||
acc1.Limits.JetStreamLimits.Streams != 0 ||
acc1.Limits.JetStreamLimits.HAResources != 0 ||
acc1.Limits.JetStreamLimits.MaxBytesRequired != false {
t.Fatalf("Expected unlimited operator limits")
}
acc1.Limits.HAResources = 1
acc1.Limits.Consumer = 1
acc1.Limits.Streams = 2
acc1.Limits.MemoryStorage = 3
Expand All @@ -341,24 +339,58 @@ func TestJetstreamLimits(t *testing.T) {
}
}

func TestTieredLimits(t *testing.T) {
akp := createAccountNKey(t)
apk := publicKey(akp, t)
acc1 := NewAccountClaims(apk)
l := JetStreamLimits{
MemoryStorage: 1024,
DiskStorage: 1024,
Streams: 1,
Consumer: 1,
MaxBytesRequired: true,
}
acc1.Limits.JetStreamTieredLimits["R1"] = l
l.Streams = 2 // minor change so both tiers differ
acc1.Limits.JetStreamTieredLimits["R3"] = l

vr := CreateValidationResults()
acc1.Validate(vr)
if !vr.IsEmpty() {
t.Fatal("valid account should have validation issues")
}

if token, err := acc1.Encode(akp); err != nil {
t.Fatal("valid account should have no validation issues")
} else if acc2, err := DecodeAccountClaims(token); err != nil {
t.Fatal("valid account should have no validation issues")
} else if acc1.Limits.JetStreamTieredLimits["R1"] != acc2.Limits.JetStreamTieredLimits["R1"] {
t.Fatal("tier R1 should have same limits")
} else if acc1.Limits.JetStreamTieredLimits["R3"] != acc2.Limits.JetStreamTieredLimits["R3"] {
t.Fatal("tier R2 should have same limits")
}
// test tiers and js limits being mutual exclusive
acc1.Limits.JetStreamLimits = l
acc1.Validate(vr)
if vr.IsEmpty() {
t.Fatal("valid account should have validation issues")
}
}

func TestJetstreamLimitsDeEnCode(t *testing.T) {
// token (generated without this change) with js disabled
c1, err := DecodeAccountClaims(`eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiI3V0NYQkZKS0lIN1Y0SkdPR0FVTEtTS05aQUxVTUZMVExZWVgyNTdDSzZORk5OWTdGRVdBIiwiaWF0IjoxNjQ0Mjc5NDY3LCJpc3MiOiJPQk5UUVJFSEVJUFJFVE1BVlBWUVVDSUdFUktHWkIzRVJBVjVTNUdNM0lPRVFOSFJFQkpPVUFSRiIsIm5hbWUiOiJ0ZXN0Iiwic3ViIjoiQUM2SFFJMlVBTVVQREVQN1dTWVFZV1JDTEVZQkxZVlNQTDZBSExDVVBKVEdVMzJUNEtRQktZU0ciLCJuYXRzIjp7ImxpbWl0cyI6eyJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xLCJpbXBvcnRzIjotMSwiZXhwb3J0cyI6LTEsIndpbGRjYXJkcyI6dHJ1ZSwiY29ubiI6LTEsImxlYWYiOi0xfSwiZGVmYXVsdF9wZXJtaXNzaW9ucyI6eyJwdWIiOnt9LCJzdWIiOnt9fSwidHlwZSI6ImFjY291bnQiLCJ2ZXJzaW9uIjoyfX0.73vDu9osNeLKwQ-g1Uu1fAtszMNz_QRZXRJLp0kzZh-0eMBmt0nb2mMwBwg-fufJs1FqYe9WbWKeXAYStnVnBg`)
if err != nil {
t.Fatal(err)
} else if c1.Limits.IsJSEnabled() {
t.Fatal("JetStream expected to be disabled")
} else if c1.Limits.JetStreamLimits.HAResources != 0 {
t.Fatal("expected value for HAResources is 0")
}
// token (generated without this change) with js enabled
c2, err := DecodeAccountClaims(`eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJPVFFXVEQyVkFMWkRQQTZYU1hLS09GRVFZR1VaVFBDNEtKV1BYMlAyWU1XMjVTMzRTVjNRIiwiaWF0IjoxNjQ0Mjc5MzM0LCJpc3MiOiJPQk5UUVJFSEVJUFJFVE1BVlBWUVVDSUdFUktHWkIzRVJBVjVTNUdNM0lPRVFOSFJFQkpPVUFSRiIsIm5hbWUiOiJ0ZXN0Iiwic3ViIjoiQUM2SFFJMlVBTVVQREVQN1dTWVFZV1JDTEVZQkxZVlNQTDZBSExDVVBKVEdVMzJUNEtRQktZU0ciLCJuYXRzIjp7ImxpbWl0cyI6eyJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xLCJpbXBvcnRzIjotMSwiZXhwb3J0cyI6LTEsIndpbGRjYXJkcyI6dHJ1ZSwiY29ubiI6LTEsImxlYWYiOi0xLCJkaXNrX3N0b3JhZ2UiOjEwMDAwMDB9LCJkZWZhdWx0X3Blcm1pc3Npb25zIjp7InB1YiI6e30sInN1YiI6e319LCJ0eXBlIjoiYWNjb3VudCIsInZlcnNpb24iOjJ9fQ.Xt5azhxOkC7nywz9Q8xVtzX8lZIqdOhpfGyQI30aNdd-nbVGX2O13OOfouIaTLyajZiS4bcJFXa29q6QCFRUDA`)
if err != nil {
t.Fatal(err)
} else if !c2.Limits.IsJSEnabled() {
t.Fatal("JetStream expected to be enabled")
} else if c2.Limits.JetStreamLimits.HAResources != NoLimit {
t.Fatal("expected value for HAResources is NoLimit")
}
}

Expand Down
7 changes: 3 additions & 4 deletions v2/decoder_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,12 @@ func loadAccount(data []byte, version int) (*AccountClaims, error) {
return v1a.Migrate()
case 2:
var v2a AccountClaims
v2a.Limits.HAResources = NoLimit
v2a.SigningKeys = make(SigningKeys)
if err := json.Unmarshal(data, &v2a); err != nil {
return nil, err
}
if !v2a.Limits.IsJSEnabled() {
v2a.Limits.HAResources = 0
if len(v2a.Limits.JetStreamTieredLimits) > 0 {
v2a.Limits.JetStreamLimits = JetStreamLimits{}
}
return &v2a, nil
default:
Expand Down Expand Up @@ -77,7 +76,7 @@ func (oa v1AccountClaims) migrateV1() (*AccountClaims, error) {
a.Account.Exports = oa.v1NatsAccount.Exports
a.Account.Limits.AccountLimits = oa.v1NatsAccount.Limits.AccountLimits
a.Account.Limits.NatsLimits = oa.v1NatsAccount.Limits.NatsLimits
a.Account.Limits.JetStreamLimits = JetStreamLimits{0, 0, 0, 0, 0, false}
a.Account.Limits.JetStreamLimits = JetStreamLimits{}
a.Account.SigningKeys = make(SigningKeys)
for _, v := range oa.SigningKeys {
a.Account.SigningKeys.Add(v)
Expand Down