Skip to content

Commit

Permalink
Tree removal (#9)
Browse files Browse the repository at this point in the history
* Refactor group IDs to set at the parse level

This ensures group IDs are static for each parsed node.  We also ensure
that we check for at least GroupID.Size() matching items from trees when
matching on incoming events.

* Make GroupIDs deterministic based off of the Evaluable Identifier

* Implement aggregate tree removal

* Add tree removal
  • Loading branch information
tonyhb authored Jan 6, 2024
1 parent fbd7922 commit 5bf5dca
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 87 deletions.
92 changes: 78 additions & 14 deletions expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"github.com/ohler55/ojg/jp"
)

var (
ErrEvaluableNotFound = fmt.Errorf("Evaluable instance not found in aggregator")
)

// errTreeUnimplemented is used while we develop the aggregate tree library when trees
// are not yet implemented.
var errTreeUnimplemented = fmt.Errorf("tree type unimplemented")
Expand Down Expand Up @@ -238,12 +242,9 @@ func (a *aggregator) Add(ctx context.Context, eval Evaluable) (bool, error) {
return false, err
}

// NOTE: When modifying, ensure that Remove() is updated. We should reconcile
// the core loops to use the same code.

aggregateable := true
for _, g := range parsed.RootGroups() {
ok, err := a.addGroup(ctx, g, parsed)
ok, err := a.iterGroup(ctx, g, parsed, a.addNode)
if err != nil {
return false, err
}
Expand All @@ -264,7 +265,51 @@ func (a *aggregator) Add(ctx context.Context, eval Evaluable) (bool, error) {
return aggregateable, nil
}

func (a *aggregator) addGroup(ctx context.Context, node *Node, parsed *ParsedExpression) (bool, error) {
func (a *aggregator) Remove(ctx context.Context, eval Evaluable) error {
// parse the expression using our tree parser.
parsed, err := a.parser.Parse(ctx, eval)
if err != nil {
return err
}

aggregateable := true
for _, g := range parsed.RootGroups() {
ok, err := a.iterGroup(ctx, g, parsed, a.removeNode)
if err == ErrExpressionPartNotFound {
return ErrEvaluableNotFound
}
if err != nil {
return err
}
if !ok && aggregateable {
// Find the index of the evaluable in constants and yank out.
idx := -1
for n, item := range a.constants {
if item.Evaluable.Identifier() == eval.Identifier() {
idx = n
break
}
}

if idx == -1 {
return ErrEvaluableNotFound
}

a.lock.Lock()
a.constants = append(a.constants[:idx], a.constants[idx+1:]...)
a.lock.Unlock()
aggregateable = false
}
}

if aggregateable {
atomic.AddInt32(&a.len, -1)
}

return nil
}

func (a *aggregator) iterGroup(ctx context.Context, node *Node, parsed *ParsedExpression, op nodeOp) (bool, error) {
if len(node.Ors) > 0 {
// If there are additional branches, don't bother to add this to the aggregate tree.
// Mark this as a non-exhaustive addition and skip immediately.
Expand Down Expand Up @@ -305,7 +350,7 @@ func (a *aggregator) addGroup(ctx context.Context, node *Node, parsed *ParsedExp
// ident/variable. Using the group ID, we can see if we've matched N necessary
// items from the same identifier. If so, the evaluation is true.
for _, n := range all {
err := a.addNode(ctx, n, parsed)
err := op(ctx, n, parsed)
if err == errTreeUnimplemented {
return false, nil
}
Expand All @@ -317,6 +362,9 @@ func (a *aggregator) addGroup(ctx context.Context, node *Node, parsed *ParsedExp
return true, nil
}

// nodeOp represents an op eg. addNode or removeNode
type nodeOp func(ctx context.Context, n *Node, parsed *ParsedExpression) error

func (a *aggregator) addNode(ctx context.Context, n *Node, parsed *ParsedExpression) error {
// Don't allow anything to update in parallel. This enrues that Add() can be called
// concurrently.
Expand Down Expand Up @@ -344,15 +392,31 @@ func (a *aggregator) addNode(ctx context.Context, n *Node, parsed *ParsedExpress
return errTreeUnimplemented
}

func (a *aggregator) Remove(ctx context.Context, eval Evaluable) error {
// parse the expression using our tree parser.
parsed, err := a.parser.Parse(ctx, eval)
_ = parsed
if err != nil {
return err
}
func (a *aggregator) removeNode(ctx context.Context, n *Node, parsed *ParsedExpression) error {
// Don't allow anything to update in parallel. This enrues that Add() can be called
// concurrently.
a.lock.Lock()
defer a.lock.Unlock()

return fmt.Errorf("not implemented")
// Each node is aggregateable, so add this to the map for fast filtering.
switch n.Predicate.TreeType() {
case TreeTypeART:
tree, ok := a.artIdents[n.Predicate.Ident]
if !ok {
tree = newArtTree()
}
err := tree.Remove(ctx, ExpressionPart{
GroupID: n.GroupID,
Predicate: *n.Predicate,
Parsed: parsed,
})
if err != nil {
return err
}
a.artIdents[n.Predicate.Ident] = tree
return nil
}
return errTreeUnimplemented
}

func isAggregateable(n *Node) bool {
Expand Down
Loading

0 comments on commit 5bf5dca

Please sign in to comment.