Skip to content

Commit

Permalink
Add OCI Registry Collector CLI
Browse files Browse the repository at this point in the history
Signed-off-by: robert-cronin <robert.owen.cronin@gmail.com>
  • Loading branch information
robert-cronin committed Oct 29, 2024
1 parent 4f629c2 commit 7f7e751
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 33 deletions.
110 changes: 110 additions & 0 deletions cmd/guaccollect/cmd/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cmd
import (
"context"
"fmt"
"net"
"os"
"time"

Expand Down Expand Up @@ -46,6 +47,19 @@ type ociOptions struct {
publishToQueue bool
}

type ociRegistryOptions struct {
// datasource for the collector
dataSource datasource.CollectSource
// address for pubsub connection
pubsubAddr string
// address for blob store
blobAddr string
// run as poll collector
poll bool
// enable/disable message publish to queue
publishToQueue bool
}

var ociCmd = &cobra.Command{
Use: "image [flags] image_path1 image_path2...",
Short: "takes images to download sbom and attestation stored in OCI to add to GUAC graph utilizing Nats pubsub and blob store",
Expand Down Expand Up @@ -98,6 +112,43 @@ you have access to read and write to the respective blob store.`,
},
}

var ociRegistryCmd = &cobra.Command{
Use: "registry [flags] registry",
Short: "takes an OCI registry with catalog capability and downloads sbom and attestation stored in OCI to add to GUAC graph",
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
ctx := logging.WithLogger(context.Background())
logger := logging.FromContext(ctx)

opts, err := validateOCIRegistryFlags(
viper.GetString("pubsub-addr"),
viper.GetString("blob-addr"),
viper.GetString("csub-addr"),
viper.GetBool("csub-tls"),
viper.GetBool("csub-tls-skip-verify"),
viper.GetBool("use-csub"),
viper.GetBool("service-poll"),
viper.GetBool("publish-to-queue"),
args)
if err != nil {
fmt.Printf("unable to validate flags: %v\n", err)
_ = cmd.Help()
os.Exit(1)
}

// Register collector
// We probably want a much longer poll interval for registry collectors as the _catalog
// endpoint can be expensive to hit and likely won't change often.
ociRegistryCollector := oci.NewOCIRegistryCollector(ctx, opts.dataSource, opts.poll, 30*time.Minute)
err = collector.RegisterDocumentCollector(ociRegistryCollector, oci.OCIRegistryCollector)
if err != nil {
logger.Errorf("unable to register oci collector: %v", err)
}

initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr, opts.publishToQueue)
},
}

func validateOCIFlags(
pubsubAddr,
blobAddr,
Expand Down Expand Up @@ -154,6 +205,65 @@ func validateOCIFlags(
return opts, nil
}

func validateOCIRegistryFlags(
pubsubAddr,
blobAddr,
csubAddr string,
csubTls,
csubTlsSkipVerify,
useCsub,
poll,
pubToQueue bool,
args []string,
) (ociRegistryOptions, error) {
var opts ociRegistryOptions
opts.pubsubAddr = pubsubAddr
opts.blobAddr = blobAddr
opts.poll = poll
opts.publishToQueue = pubToQueue

if useCsub {
csubOpts, err := csubclient.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
if err != nil {
return opts, fmt.Errorf("unable to validate csub client flags: %w", err)
}
c, err := csubclient.NewClient(csubOpts)
if err != nil {
return opts, err
}
opts.dataSource, err = csubsource.NewCsubDatasource(c, 10*time.Second)
return opts, err
}

// else direct CLI call, no polling
if len(args) < 1 {
return opts, fmt.Errorf("expected positional argument(s) for registr(y|ies)")
}

sources := []datasource.Source{}
for _, arg := range args {
// Min check to validate registry by resolving hostname
_, err := net.LookupHost(arg)
if err != nil {
return opts, fmt.Errorf("registry parsing error. require format registry:port")
}
sources = append(sources, datasource.Source{
Value: arg,
})
}

var err error
opts.dataSource, err = inmemsource.NewInmemDataSources(&datasource.DataSources{
OciDataSources: sources,
})
if err != nil {
return opts, err
}

return opts, nil
}

func init() {
rootCmd.AddCommand(ociCmd)
rootCmd.AddCommand(ociRegistryCmd)
}
4 changes: 4 additions & 0 deletions cmd/guacone/cmd/collectsub_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ var getAllFilters = []*collectsub.CollectEntryFilter{
Type: collectsub.CollectDataType_DATATYPE_GITHUB_RELEASE,
Glob: "*",
},
{
Type: collectsub.CollectDataType_DATATYPE_OCI_REGISTRY,
Glob: "*",
},
}

/*
Expand Down
125 changes: 125 additions & 0 deletions cmd/guacone/cmd/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cmd
import (
"context"
"fmt"
"net"
"net/http"
"os"
"time"
Expand Down Expand Up @@ -45,6 +46,15 @@ type ociOptions struct {
queryLicenseOnIngestion bool
}

type ociRegistryOptions struct {
graphqlEndpoint string
headerFile string
dataSource datasource.CollectSource
csubClientOptions csub_client.CsubClientOptions
queryVulnOnIngestion bool
queryLicenseOnIngestion bool
}

var ociCmd = &cobra.Command{
Use: "image [flags] image_path1 image_path2...",
Short: "takes images to download sbom and attestation stored in OCI to add to GUAC graph, this command talks directly to the graphQL endpoint",
Expand Down Expand Up @@ -120,6 +130,81 @@ var ociCmd = &cobra.Command{
},
}

var ociRegistryCmd = &cobra.Command{
Use: "registry [flags] registry",
Short: "takes an OCI registry with catalog capability and downloads sbom and attestation stored in OCI to add to GUAC graph",
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
opts, err := validateOCIRegistryFlags(
viper.GetString("gql-addr"),
viper.GetString("header-file"),
viper.GetString("csub-addr"),
viper.GetBool("csub-tls"),
viper.GetBool("csub-tls-skip-verify"),
viper.GetBool("add-vuln-on-ingest"),
viper.GetBool("add-license-on-ingest"),
args)
if err != nil {
fmt.Printf("unable to validate flags: %v\n", err)
_ = cmd.Help()
os.Exit(1)
}

ctx := logging.WithLogger(context.Background())
logger := logging.FromContext(ctx)
transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport)

// Register collector
ociRegistryCollector := oci.NewOCIRegistryCollector(ctx, opts.dataSource, false, 30*time.Second)
err = collector.RegisterDocumentCollector(ociRegistryCollector, oci.OCIRegistryCollector)
if err != nil {
logger.Errorf("unable to register oci collector: %v", err)
}

// initialize collectsub client
csubClient, err := csub_client.NewClient(opts.csubClientOptions)
if err != nil {
logger.Infof("collectsub client initialization failed, this ingestion will not pull in any additional data through the collectsub service: %v", err)
csubClient = nil
} else {
defer csubClient.Close()
}

totalNum := 0
gotErr := false
// Set emit function to go through the entire pipeline
emit := func(d *processor.Document) error {
totalNum += 1
_, err := ingestor.Ingest(ctx, d, opts.graphqlEndpoint, transport, csubClient, opts.queryVulnOnIngestion, opts.queryLicenseOnIngestion)

if err != nil {
gotErr = true
return fmt.Errorf("unable to ingest document: %w", err)
}
return nil
}

// Collect
errHandler := func(err error) bool {
if err == nil {
logger.Info("collector ended gracefully")
return true
}
logger.Errorf("collector ended with error: %v", err)
return false
}
if err := collector.Collect(ctx, emit, errHandler); err != nil {
logger.Fatal(err)
}

if gotErr {
logger.Fatalf("completed ingestion with errors")
} else {
logger.Infof("completed ingesting %v documents", totalNum)
}
},
}

func validateOCIFlags(gqlEndpoint, headerFile, csubAddr string, csubTls, csubTlsSkipVerify bool,
queryVulnIngestion bool, queryLicenseIngestion bool, args []string) (ociOptions, error) {
var opts ociOptions
Expand Down Expand Up @@ -157,6 +242,46 @@ func validateOCIFlags(gqlEndpoint, headerFile, csubAddr string, csubTls, csubTls
return opts, nil
}

func validateOCIRegistryFlags(gqlEndpoint, headerFile, csubAddr string, csubTls, csubTlsSkipVerify bool,
queryVulnIngestion bool, queryLicenseIngestion bool, args []string) (ociRegistryOptions, error) {
var opts ociRegistryOptions
opts.graphqlEndpoint = gqlEndpoint
opts.headerFile = headerFile
opts.queryVulnOnIngestion = queryVulnIngestion
opts.queryLicenseOnIngestion = queryLicenseIngestion

csubOpts, err := csub_client.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
if err != nil {
return opts, fmt.Errorf("unable to validate csub client flags: %w", err)
}
opts.csubClientOptions = csubOpts

if len(args) < 1 {
return opts, fmt.Errorf("expected positional argument(s) for registr(y|ies)")
}
sources := []datasource.Source{}
for _, arg := range args {
// Min check to validate registry by resolving hostname
_, err := net.LookupHost(arg)
if err != nil {
return opts, fmt.Errorf("registry parsing error. require format registry:port")
}
sources = append(sources, datasource.Source{
Value: arg,
})
}

opts.dataSource, err = inmemsource.NewInmemDataSources(&datasource.DataSources{
OciRegistryDataSources: sources,
})
if err != nil {
return opts, err
}

return opts, nil
}

func init() {
collectCmd.AddCommand(ociCmd)
collectCmd.AddCommand(ociRegistryCmd)
}
70 changes: 37 additions & 33 deletions pkg/collectsub/collectsub/collectsub.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7f7e751

Please sign in to comment.