Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sql data stream #7

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4a624f4
sql grammar/lexer/parser/listener
Jun 16, 2022
a5e794a
Deleted unnecessary testing staff
Jun 16, 2022
b2ff6da
some update
antonchirikalov Jul 1, 2022
5e5bbf5
Visitor init
antonchirikalov Jul 26, 2022
8828c3b
visitor update/tests
antonchirikalov Jul 27, 2022
19235e5
update recursive conditions/tests
antonchirikalov Jul 28, 2022
fb90a14
update recursive conditions/tests
antonchirikalov Jul 28, 2022
a4eac08
SQL Streaming update
antonchirikalov Jul 29, 2022
ceddc46
Delete config.yaml
antonchirikalov Jul 28, 2022
14d9723
small fix
antonchirikalov Jul 29, 2022
13b9d40
Intermediate updates
antonchirikalov Aug 4, 2022
21c68d6
refactor/tumbling visitor/tests updated
antonchirikalov Aug 5, 2022
085bb87
update tumbling window/ tests
antonchirikalov Aug 8, 2022
9315703
aggregations/tests
antonchirikalov Aug 8, 2022
d7d57fa
update
antonchirikalov Aug 9, 2022
255357c
grammar update/bug fixed/tests added
antonchirikalov Aug 11, 2022
e1cb108
function support added
antonchirikalov Aug 15, 2022
21c7e1a
go.mod
antonchirikalov Aug 16, 2022
f3c779d
Sql streaming processor
antonchirikalov Aug 16, 2022
118ce30
nested queries support
antonchirikalov Aug 23, 2022
d69acc5
one-pass optimization/tests/fixes
antonchirikalov Aug 25, 2022
1b8d272
support for nested fields in select list
antonchirikalov Sep 2, 2022
3a36c46
nested field support update
antonchirikalov Sep 5, 2022
3cf5473
part function support
antonchirikalov Sep 5, 2022
66ef074
recursive func/nested support/fixes/tests
antonchirikalov Sep 7, 2022
a89b12e
function support refactor/improvement
antonchirikalov Sep 7, 2022
df3eab2
groupby refactoring/fixes intermediate
antonchirikalov Sep 15, 2022
e432afd
groupBe temp refactor
antonchirikalov Sep 22, 2022
d9b9bfa
refactor window tumbling/fixes
antonchirikalov Sep 23, 2022
6e29441
aggregation fixes/tests
antonchirikalov Sep 29, 2022
7239c2d
Some updates/fixes/tests for tumbling windows
antonchirikalov Sep 30, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ local/
*.dylib
bin/
dist/

test/
# Emacs
*~
\#*\#
Expand All @@ -34,3 +34,4 @@ integration-coverage.html
# Wix
*.wixobj
*.wixpdb
/config.yaml
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib
go 1.17

require (
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20220816024939-bc8df83d7b9d
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/alibabacloudlogserviceexporter v0.55.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awscloudwatchlogsexporter v0.55.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter v0.55.0
Expand Down Expand Up @@ -153,7 +154,9 @@ require (
github.com/prometheus/prometheus v0.36.2
github.com/stretchr/testify v1.8.0
go.opentelemetry.io/collector v0.56.0
go.opentelemetry.io/collector/pdata v0.56.0
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.21.0
golang.org/x/sys v0.0.0-20220624220833-87e55d714810
)

Expand Down Expand Up @@ -531,7 +534,6 @@ require (
go.mongodb.org/atlas v0.16.0 // indirect
go.mongodb.org/mongo-driver v1.9.1 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/collector/pdata v0.56.0 // indirect
go.opentelemetry.io/collector/semconv v0.56.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.33.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.33.0 // indirect
Expand All @@ -544,7 +546,6 @@ require (
go.opentelemetry.io/otel/trace v1.8.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122 // indirect
golang.org/x/exp v0.0.0-20200908183739-ae8ad444f925 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions internal/components/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opsrampotlpexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/batchmemlimitprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/scrubbingprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/stream_processor"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/loggingexporter"
"go.opentelemetry.io/collector/exporter/otlpexporter"
Expand Down Expand Up @@ -351,6 +352,7 @@ func Components() (component.Factories, error) {
transformprocessor.NewFactory(),
scrubbingprocessor.NewFactory(),
batchmemlimitprocessor.NewFactory(),
stream_processor.NewFactory(),
}
factories.Processors, err = component.MakeProcessorFactoryMap(processors...)
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions pkg/stanza/operator/helper/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package helper // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"context"
"fmt"

"go.uber.org/zap"

Expand Down Expand Up @@ -88,6 +89,26 @@ func (i *InputOperator) NewEntry(value interface{}) (*entry.Entry, error) {
return entry, nil
}

func (i *InputOperator) NewEntryWithAttr(value interface{}) (*entry.Entry, error) {
entry := entry.New()

converted, ok := value.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("value type is not map")
}
entry.Attributes = converted

if err := i.Attribute(entry); err != nil {
return nil, errors.Wrap(err, "add attributes to entry")
}

if err := i.Identify(entry); err != nil {
return nil, errors.Wrap(err, "add resource keys to entry")
}

return entry, nil
}

// CanProcess will always return false for an input operator.
func (i *InputOperator) CanProcess() bool {
return false
Expand Down
14 changes: 13 additions & 1 deletion pkg/stanza/operator/input/windows/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package windows // import "github.com/open-telemetry/opentelemetry-collector-con
import (
"context"
"fmt"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"sync"
"time"

Expand All @@ -39,6 +40,7 @@ type Config struct {
Channel string `mapstructure:"channel" json:"channel" yaml:"channel"`
MaxReads int `mapstructure:"max_reads,omitempty" json:"max_reads,omitempty" yaml:"max_reads,omitempty"`
StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"`
BodyToAttr bool `mapstructure:"body_to_attr" json:"body_to_attr" yaml:"body_to_attr"`
PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
}

Expand Down Expand Up @@ -68,6 +70,7 @@ func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
maxReads: c.MaxReads,
startAt: c.StartAt,
pollInterval: c.PollInterval,
bodyToAttr: c.BodyToAttr,
}, nil
}

Expand All @@ -92,6 +95,7 @@ type Input struct {
channel string
maxReads int
startAt string
bodyToAttr bool
pollInterval helper.Duration
persister operator.Persister
cancel context.CancelFunc
Expand Down Expand Up @@ -223,7 +227,15 @@ func (e *Input) processEvent(ctx context.Context, event Event) {
// sendEvent will send EventXML as an entry to the operator's output.
func (e *Input) sendEvent(ctx context.Context, eventXML EventXML) {
body := eventXML.parseBody()
entry, err := e.NewEntry(body)
var entry *entry.Entry
var err error

if e.bodyToAttr {
entry, err = e.NewEntryWithAttr(body)
} else {
entry, err = e.NewEntry(body)
}

if err != nil {
e.Errorf("Failed to create entry: %s", err)
return
Expand Down
2 changes: 1 addition & 1 deletion processor/batchmemlimitprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"go.opentelemetry.io/collector/config"
)

// Config defines configuration for Resource processor.
// Config defines configuration for batch memory limit processor.
type Config struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
adapter.BaseConfig `mapstructure:",squash"`
Expand Down
2 changes: 1 addition & 1 deletion processor/scrubbingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type MaskingSettings struct {
Placeholder string `mapstructure:"placeholder"`
}

// Config defines configuration for Resource processor.
// Config defines configuration for scrubbing processor.
type Config struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
adapter.BaseConfig `mapstructure:",squash"`
Expand Down
194 changes: 194 additions & 0 deletions processor/stream_processor/Sql.g4
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
grammar Sql;


//java -jar antlr-4.10.1-complete.jar -Dlanguage=Go -o parser Sql.g4 -no-listener -visitor

sqlQuery
: selectQuery EOF;

selectQuery
: K_SELECT resultColumns (whereStatement)? EOQ #selectSimple
| K_SELECT aggregationColumns windowTumbling (whereStatement)? EOQ #selectTumbling
| K_SELECT groupByAggregationColumns windowTumbling (whereStatement)? groupBy EOQ #selectTumblingGroupBy
;

windowTumbling
: K_WINDOW_TUMBLING NUMERIC_LITERAL
;


resultColumns
: column (COMMA column)* # selectColumns
| STAR # selectStar
;


aggregationColumns
: aggregationColumn (COMMA aggregationColumn)* #selectAggregations
;

groupByAggregationColumns
: (column | aggregationColumn) (COMMA (aggregationColumn | column))* #selectGroupByAggregations
;


aggregationColumn
: (K_MIN | K_MAX | K_COUNT | K_AVG | K_SUM) L_BRACKET ( IDENTIFIER | IDENTIFIER DOT IDENTIFIER) R_BRACKET alias?
;

column
: (IDENTIFIER | IDENTIFIER DOT IDENTIFIER) alias? #identifierColumn
| function alias? #functionColumn
;



alias
: K_AS IDENTIFIER
;

function
: functionName L_BRACKET ( IDENTIFIER | IDENTIFIER DOT IDENTIFIER) (COMMA literalValue)* R_BRACKET #simpleFunction
| functionName L_BRACKET ( IDENTIFIER | IDENTIFIER DOT IDENTIFIER) (COMMA literalValue)* R_BRACKET #simpleFunction
| functionName L_BRACKET function (COMMA literalValue)* R_BRACKET #recursiveFunction
;

functionName
: IDENTIFIER
;


whereStatement
: K_WHERE expr #whereStmt
;

expr
: simpleExpr #simpleCondition
| (compoundExpr | simpleExpr) ( K_AND | K_OR ) (compoundExpr | simpleExpr) #compoundRecursiveCondition
// IDENTIFIER (K_IS_NULL | K_IS_NOT_NULL) #nullCondition
| expr (K_AND| K_OR) expr #simpleRecursiveCondition
| compoundExpr #simpleCompoundCondition
;

simpleExpr
: IDENTIFIER comparisonOperator literalValue #simpleExpression
| IDENTIFIER DOT IDENTIFIER comparisonOperator literalValue #nestedExpression
;

compoundExpr
: L_BRACKET expr R_BRACKET #compoundExpression
;

comparisonOperator
: K_EQUAL | K_GREATER | K_LESS | K_LESS_EQUAL | K_GREATER_EQUAL | K_NOT_EQUAL | K_LIKE |K_NOT_LIKE | K_IN | K_IS | K_NOT_IN
;


literalValue
: NUMERIC_LITERAL
| BOOLEAN_LITERAL
| STRING_LITERAL
;


groupBy
: K_GROUP_BY column
;


SPACE
: [ \u000B\t\r\n] -> channel(HIDDEN)
;

WS : [ \t]+ -> skip ;

COMMA : ',' ;
L_BRACKET : '(' ;
R_BRACKET : ')' ;
DOT : '.';

EOQ: ';';

K_SELECT : S E L E C T;
K_WHERE : W H E R E;
K_WINDOW_TUMBLING : W I N D O W SPACE T U M B L I N G;
K_GROUP_BY : G R O U P SPACE B Y;
K_AND : A N D;
K_AS : A S;
K_OR : O R;
K_IS : I S;
K_LIKE : L I K E;
K_NOT_LIKE : N O T SPACE L I K E;
K_EQUAL : '=';
K_GREATER : '>';
K_LESS : '<';
K_LESS_EQUAL : (K_LESS K_EQUAL);
K_GREATER_EQUAL : (K_GREATER K_EQUAL);
K_NOT_EQUAL : ('!' K_EQUAL);
K_NULL : N U L L;
K_IS_NULL : (K_IS SPACE K_NULL);
K_IS_NOT_NULL : (K_IS SPACE K_NOT SPACE K_NULL);
K_NOT : N O T;
K_NOT_IN : (K_NOT SPACE I N);
K_IN : I N;
K_COUNT : C O U N T;
K_SUM : S U M;
K_MIN : M I N;
K_MAX : M A X;
K_AVG : A V G;
K_TRUE : T R U E;
K_FALSE : F A L S E;



IDENTIFIER
: '"' (~'"' | '""')* '"'
| '`' (~'`' | '``')* '`'
| '[' ~']'* ']'
| [a-zA-Z_] [a-zA-Z_0-9]*
;


NUMERIC_LITERAL
: DIGIT+ ( '.' DIGIT* )? ( E [-+]? DIGIT+ )?
| '.' DIGIT+ ( E [-+]? DIGIT+ )?
;

STRING_LITERAL
: '\'' ( ~'\'' | '\'\'' )* '\''
;


BOOLEAN_LITERAL
: (K_TRUE | K_FALSE)
;

STAR : '*';

fragment DIGIT : [0-9];
fragment A : [aA];
fragment B : [bB];
fragment C : [cC];
fragment D : [dD];
fragment E : [eE];
fragment F : [fF];
fragment G : [gG];
fragment H : [hH];
fragment I : [iI];
fragment J : [jJ];
fragment K : [kK];
fragment L : [lL];
fragment M : [mM];
fragment N : [nN];
fragment O : [oO];
fragment P : [pP];
fragment Q : [qQ];
fragment R : [rR];
fragment S : [sS];
fragment T : [tT];
fragment U : [uU];
fragment V : [vV];
fragment W : [wW];
fragment X : [xX];
fragment Y : [yY];
fragment Z : [zZ];
Loading