Skip to content

Commit

Permalink
feat: Add DataPlane https injestion for SecOps exporter (#1829)
Browse files Browse the repository at this point in the history
* Add DataPlane https injestion for secops exporter
  • Loading branch information
shazlehu authored and dpaasman00 committed Sep 17, 2024
1 parent 420a61b commit 117ba15
Show file tree
Hide file tree
Showing 10 changed files with 1,005 additions and 16 deletions.
31 changes: 31 additions & 0 deletions exporter/chronicleexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
const (
// noCompression is the no compression type.
noCompression = "none"
protocolHTTPS = "https"
protocolGRPC = "gRPC"
)

// Config defines configuration for the Chronicle exporter.
Expand Down Expand Up @@ -70,6 +72,19 @@ type Config struct {

// CollectAgentMetrics is a flag that determines whether or not to collect agent metrics.
CollectAgentMetrics bool `mapstructure:"collect_agent_metrics"`

// Protocol is the protocol that will be used to send logs to Chronicle.
// Either https or grpc.
Protocol string `mapstructure:"protocol"`

// Location is the location that will be used when the protocol is https.
Location string `mapstructure:"location"`

// Project is the project that will be used when the protocol is https.
Project string `mapstructure:"project"`

// Forwarder is the forwarder that will be used when the protocol is https.
Forwarder string `mapstructure:"forwarder"`
}

// Validate checks if the configuration is valid.
Expand Down Expand Up @@ -99,5 +114,21 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("endpoint should not contain a protocol: %s", cfg.Endpoint)
}

if cfg.Protocol != protocolHTTPS && cfg.Protocol != protocolGRPC {
return fmt.Errorf("invalid protocol: %s", cfg.Protocol)
}

if cfg.Protocol == protocolHTTPS {
if cfg.Location == "" {
return errors.New("location is required when protocol is https")
}
if cfg.Project == "" {
return errors.New("project is required when protocol is https")
}
if cfg.Forwarder == "" {
return errors.New("forwarder is required when protocol is https")
}
}

return nil
}
39 changes: 39 additions & 0 deletions exporter/chronicleexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func TestConfigValidate(t *testing.T) {
Creds: "creds_example",
LogType: "log_type_example",
Compression: noCompression,
Protocol: protocolGRPC,
},
expectedErr: "",
},
Expand All @@ -60,6 +61,7 @@ func TestConfigValidate(t *testing.T) {
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
Compression: noCompression,
Protocol: protocolGRPC,
},
expectedErr: "",
},
Expand All @@ -70,6 +72,7 @@ func TestConfigValidate(t *testing.T) {
LogType: "log_type_example",
RawLogField: `body["field"]`,
Compression: noCompression,
Protocol: protocolGRPC,
},
expectedErr: "",
},
Expand All @@ -82,6 +85,42 @@ func TestConfigValidate(t *testing.T) {
},
expectedErr: "invalid compression type",
},
{
desc: "Protocol is https and location is empty",
config: &Config{
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
Protocol: protocolHTTPS,
Compression: noCompression,
Forwarder: "forwarder_example",
Project: "project_example",
},
expectedErr: "location is required when protocol is https",
},
{
desc: "Protocol is https and forwarder is empty",
config: &Config{
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
Protocol: protocolHTTPS,
Compression: noCompression,
Project: "project_example",
Location: "location_example",
},
expectedErr: "forwarder is required when protocol is https",
},
{
desc: "Protocol is https and project is empty",
config: &Config{
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
Protocol: protocolHTTPS,
Compression: noCompression,
Location: "location_example",
Forwarder: "forwarder_example",
},
expectedErr: "project is required when protocol is https",
},
}

for _, tc := range testCases {
Expand Down
128 changes: 113 additions & 15 deletions exporter/chronicleexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
package chronicleexporter

import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"sync"
"time"
Expand All @@ -29,18 +33,23 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/oauth"
"google.golang.org/grpc/encoding/gzip"
grpcgzip "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
)

const scope = "https://www.googleapis.com/auth/malachite-ingestion"
const (
grpcScope = "https://www.googleapis.com/auth/malachite-ingestion"
httpScope = "https://www.googleapis.com/auth/cloud-platform"

const baseEndpoint = "malachiteingestion-pa.googleapis.com"
baseEndpoint = "malachiteingestion-pa.googleapis.com"
)

type chronicleExporter struct {
cfg *Config
Expand All @@ -53,6 +62,8 @@ type chronicleExporter struct {
wg sync.WaitGroup

cancel context.CancelFunc

httpClient *http.Client
}

func newExporter(cfg *Config, params exporter.Settings, collectorID, exporterID string) (*chronicleExporter, error) {
Expand All @@ -68,11 +79,6 @@ func newExporter(cfg *Config, params exporter.Settings, collectorID, exporterID
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
}

conn, err := grpc.DialContext(context.Background(), cfg.Endpoint+":443", opts...)
if err != nil {
return nil, fmt.Errorf("dial: %w", err)
}

customerID, err := uuid.Parse(cfg.CustomerID)
if err != nil {
return nil, fmt.Errorf("parse customer ID: %w", err)
Expand All @@ -93,23 +99,39 @@ func newExporter(cfg *Config, params exporter.Settings, collectorID, exporterID
cfg: cfg,
logger: params.Logger,
metrics: newExporterMetrics(uuidCID[:], customerID[:], exporterID, cfg.Namespace),
client: api.NewIngestionServiceV2Client(conn),
conn: conn,
marshaler: marshaller,
collectorID: collectorID,
exporterID: exporterID,
cancel: cancel,
}

if cfg.CollectAgentMetrics {
exp.wg.Add(1)
go exp.startHostMetricsCollection(ctx)
if cfg.Protocol == protocolHTTPS {
exp.httpClient = oauth2.NewClient(context.Background(), creds.TokenSource)
} else {
conn, err := grpc.NewClient(cfg.Endpoint+":443", opts...)
if err != nil {
return nil, fmt.Errorf("dial: %w", err)
}

exp.conn = conn
exp.client = api.NewIngestionServiceV2Client(conn)

if cfg.CollectAgentMetrics {
exp.wg.Add(1)
go exp.startHostMetricsCollection(ctx)
}
}

return exp, nil
}

func loadGoogleCredentials(cfg *Config) (*google.Credentials, error) {

scope := grpcScope
if cfg.Protocol == protocolHTTPS {
scope = httpScope
}

switch {
case cfg.Creds != "":
return google.CredentialsFromJSON(context.Background(), []byte(cfg.Creds), scope)
Expand Down Expand Up @@ -187,8 +209,8 @@ func (ce *chronicleExporter) uploadToChronicle(ctx context.Context, request *api
func (ce *chronicleExporter) buildOptions() []grpc.CallOption {
opts := make([]grpc.CallOption, 0)

if ce.cfg.Compression == gzip.Name {
opts = append(opts, grpc.UseCompressor(gzip.Name))
if ce.cfg.Compression == grpcgzip.Name {
opts = append(opts, grpc.UseCompressor(grpcgzip.Name))
}

return opts
Expand Down Expand Up @@ -228,3 +250,79 @@ func (ce *chronicleExporter) Shutdown(context.Context) error {
}
return nil
}

func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Logs) error {
payloads, err := ce.marshaler.MarshalRawLogsForHTTP(ctx, ld)
if err != nil {
return fmt.Errorf("marshal logs: %w", err)
}

for logType, payload := range payloads {
if err := ce.uploadToChronicleHTTP(ctx, payload, logType); err != nil {
return fmt.Errorf("upload to chronicle: %w", err)
}
}

return nil
}

// This uses the DataPlane URL for the request
// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID}/logTypes/{logtype}/logs:import
func buildEndpoint(cfg *Config, logType string) string {
// Location Endpoint Version Project Location Instance LogType
formatString := "https://%s-%s/%s/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import"
return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, "v1alpha", cfg.Project, cfg.Location, cfg.CustomerID, logType)
}

func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *api.ImportLogsRequest, logType string) error {

data, err := protojson.Marshal(logs)
if err != nil {
return fmt.Errorf("marshal protobuf logs to JSON: %w", err)
}

var body io.Reader

if ce.cfg.Compression == grpcgzip.Name {
var b bytes.Buffer
gz := gzip.NewWriter(&b)
if _, err := gz.Write(data); err != nil {
return fmt.Errorf("gzip write: %w", err)
}
if err := gz.Close(); err != nil {
return fmt.Errorf("gzip close: %w", err)
}
body = &b
} else {
body = bytes.NewBuffer(data)
}

request, err := http.NewRequestWithContext(ctx, "POST", buildEndpoint(ce.cfg, logType), body)
if err != nil {
return fmt.Errorf("create request: %w", err)
}

if ce.cfg.Compression == grpcgzip.Name {
request.Header.Set("Content-Encoding", "gzip")
}

request.Header.Set("Content-Type", "application/json")

resp, err := ce.httpClient.Do(request)
if err != nil {
return fmt.Errorf("send request to Chronicle: %w", err)
}
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
if err != nil {
ce.logger.Warn("Failed to read response body", zap.Error(err))
} else {
ce.logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody))
}
return fmt.Errorf("received non-OK response from Chronicle: %s", resp.Status)
}

return nil
}
7 changes: 6 additions & 1 deletion exporter/chronicleexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func NewFactory() exporter.Factory {
// createDefaultConfig creates the default configuration for the exporter.
func createDefaultConfig() component.Config {
return &Config{
Protocol: protocolGRPC,
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
Expand Down Expand Up @@ -72,11 +73,15 @@ func createLogsExporter(
return nil, err
}

pusher := exp.logsDataPusher
if chronicleCfg.Protocol == protocolHTTPS {
pusher = exp.logsHTTPDataPusher
}
return exporterhelper.NewLogsExporter(
ctx,
params,
chronicleCfg,
exp.logsDataPusher,
pusher,
exporterhelper.WithCapabilities(exp.Capabilities()),
exporterhelper.WithTimeout(chronicleCfg.TimeoutSettings),
exporterhelper.WithQueue(chronicleCfg.QueueSettings),
Expand Down
1 change: 1 addition & 0 deletions exporter/chronicleexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func Test_createDefaultConfig(t *testing.T) {
Endpoint: "malachiteingestion-pa.googleapis.com",
Compression: "none",
CollectAgentMetrics: true,
Protocol: "gRPC",
}

actual := createDefaultConfig()
Expand Down
Loading

0 comments on commit 117ba15

Please sign in to comment.