Skip to content

Commit

Permalink
Add compression support
Browse files Browse the repository at this point in the history
  • Loading branch information
camdencheek committed Aug 11, 2020
1 parent 6ec38be commit dd0329c
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Google Cloud Output failure when sent a field of type uint16
- Added a default function to plugin templates
### Added
- Google Cloud Output option to enable gzip compression

## [0.9.7] - 2020-08-05
### Changed
Expand Down
1 change: 1 addition & 0 deletions docs/operators/google_cloud_output.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The `google_cloud_output` operator will send entries to Google Cloud Logging.
| `severity_field` | | A [field](/docs/types/field.md) for the severity on the log entry |
| `trace_field` | | A [field](/docs/types/field.md) for the trace on the log entry |
| `span_id_field` | | A [field](/docs/types/field.md) for the span_id on the log entry |
| `use_compression` | `false` | Whether to compress the log entry payloads with gzip before sending to Google Cloud |
| `timeout` | 10s | A [duration](/docs/types/duration.md) indicating how long to wait for the API to respond before timing out |

If both `credentials` and `credentials_file` are left empty, the agent will attempt to find
Expand Down
2 changes: 1 addition & 1 deletion operator/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewConfig() Config {
BundleByteThreshold: 4 * 1024 * 1024 * 1024, // 4MB
BundleByteLimit: 4 * 1024 * 1024 * 1024, // 4MB
BufferedByteLimit: 500 * 1024 * 1024 * 1024, // 500MB
HandlerLimit: 32,
HandlerLimit: 16,
Retry: NewRetryConfig(),
}
}
Expand Down
21 changes: 15 additions & 6 deletions operator/builtin/output/google_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
mrpb "google.golang.org/genproto/googleapis/api/monitoredres"
sev "google.golang.org/genproto/googleapis/logging/type"
logpb "google.golang.org/genproto/googleapis/logging/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
)

func init() {
Expand All @@ -30,9 +32,10 @@ func init() {

func NewGoogleCloudOutputConfig(operatorID string) *GoogleCloudOutputConfig {
return &GoogleCloudOutputConfig{
OutputConfig: helper.NewOutputConfig(operatorID, "google_cloud_output"),
BufferConfig: buffer.NewConfig(),
Timeout: operator.Duration{Duration: 10 * time.Second},
OutputConfig: helper.NewOutputConfig(operatorID, "google_cloud_output"),
BufferConfig: buffer.NewConfig(),
Timeout: operator.Duration{Duration: 30 * time.Second},
UseCompression: false,
}
}

Expand All @@ -48,6 +51,7 @@ type GoogleCloudOutputConfig struct {
TraceField *entry.Field `json:"trace_field,omitempty" yaml:"trace_field,omitempty"`
SpanIDField *entry.Field `json:"span_id_field,omitempty" yaml:"span_id_field,omitempty"`
Timeout operator.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
UseCompression bool `json:"use_compression,omitempty" yaml:"use_compression,omitempty"`
}

// Build will build a google cloud output operator.
Expand All @@ -72,6 +76,7 @@ func (c GoogleCloudOutputConfig) Build(buildContext operator.BuildContext) (oper
traceField: c.TraceField,
spanIDField: c.SpanIDField,
timeout: c.Timeout.Raw(),
useCompression: c.UseCompression,
}

newBuffer.SetHandler(googleCloudOutput)
Expand All @@ -88,9 +93,10 @@ type GoogleCloudOutput struct {
credentialsFile string
projectID string

logNameField *entry.Field
traceField *entry.Field
spanIDField *entry.Field
logNameField *entry.Field
traceField *entry.Field
spanIDField *entry.Field
useCompression bool

client *vkit.Client
timeout time.Duration
Expand Down Expand Up @@ -136,6 +142,9 @@ func (p *GoogleCloudOutput) Start() error {
options := make([]option.ClientOption, 0, 2)
options = append(options, option.WithCredentials(credentials))
options = append(options, option.WithUserAgent("CarbonLogAgent/"+version.GetVersion()))
if p.useCompression {
options = append(options, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))))
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

Expand Down

0 comments on commit dd0329c

Please sign in to comment.