Skip to content

Commit

Permalink
Patch references to UC schemas to capture dependencies automatically (#…
Browse files Browse the repository at this point in the history
…1989)

## Changes
Fixes #1977.  

This PR modifies the bundle configuration to capture the dependency that
a UC Volume or a DLT pipeline might have on a UC schema at deployment
time. It does so by replacing the schema name with a reference of the
form `${resources.schemas.foo.name}`.

For example:
The following UC Volume definition depends on the UC schema with the
name `schema_name`. This mutator converts this configuration

from:
```
resources:
  volumes:
    bar:
      catalog_name: catalog_name
      name: volume_name
      schema_name: schema_name

  schemas:
    foo:
      catalog_name: catalog_name
      name: schema_name
```

to:

```
resources:
  volumes:
    bar:
      catalog_name: catalog_name
      name: volume_name
      schema_name: ${resources.schemas.foo.name}`

  schemas:
    foo:
      catalog_name: catalog_name
      name: schema_name
```


## Tests
Unit tests and manually.
  • Loading branch information
shreyas-goenka authored Jan 16, 2025
1 parent fa87f22 commit f2bba63
Show file tree
Hide file tree
Showing 3 changed files with 379 additions and 0 deletions.
100 changes: 100 additions & 0 deletions bundle/config/mutator/capture_schema_dependency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package mutator

import (
"context"
"fmt"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/libs/diag"
)

type captureSchemaDependency struct{}

// If a user defines a UC schema in the bundle, they can refer to it in DLT pipelines
// or UC Volumes using the `${resources.schemas.<schema_key>.name}` syntax. Using this
// syntax allows TF to capture the deploy time dependency this DLT pipeline or UC Volume
// has on the schema and deploy changes to the schema before deploying the pipeline or volume.
//
// This mutator translates any implicit schema references in DLT pipelines or UC Volumes
// to the explicit syntax.
func CaptureSchemaDependency() bundle.Mutator {
return &captureSchemaDependency{}
}

func (m *captureSchemaDependency) Name() string {
return "CaptureSchemaDependency"
}

func schemaNameRef(key string) string {
return fmt.Sprintf("${resources.schemas.%s.name}", key)
}

func findSchema(b *bundle.Bundle, catalogName, schemaName string) (string, *resources.Schema) {
if catalogName == "" || schemaName == "" {
return "", nil
}

for k, s := range b.Config.Resources.Schemas {
if s != nil && s.CreateSchema != nil && s.CatalogName == catalogName && s.Name == schemaName {
return k, s
}
}
return "", nil
}

func resolveVolume(v *resources.Volume, b *bundle.Bundle) {
if v == nil || v.CreateVolumeRequestContent == nil {
return
}
schemaK, schema := findSchema(b, v.CatalogName, v.SchemaName)
if schema == nil {
return
}

v.SchemaName = schemaNameRef(schemaK)
}

func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) {
if p == nil || p.PipelineSpec == nil {
return
}
if p.Schema == "" {
return
}
schemaK, schema := findSchema(b, p.Catalog, p.Schema)
if schema == nil {
return
}

p.Schema = schemaNameRef(schemaK)
}

func resolvePipelineTarget(p *resources.Pipeline, b *bundle.Bundle) {
if p == nil || p.PipelineSpec == nil {
return
}
if p.Target == "" {
return
}
schemaK, schema := findSchema(b, p.Catalog, p.Target)
if schema == nil {
return
}
p.Target = schemaNameRef(schemaK)
}

func (m *captureSchemaDependency) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
for _, p := range b.Config.Resources.Pipelines {
// "schema" and "target" have the same semantics in the DLT API but are mutually
// exclusive i.e. only one can be set at a time. If schema is set, the pipeline
// is in direct publishing mode and can write tables to multiple schemas
// (vs target which is limited to a single schema).
resolvePipelineTarget(p, b)
resolvePipelineSchema(p, b)
}
for _, v := range b.Config.Resources.Volumes {
resolveVolume(v, b)
}
return nil
}
277 changes: 277 additions & 0 deletions bundle/config/mutator/capture_schema_dependency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
package mutator

import (
"context"
"testing"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCaptureSchemaDependencyForVolume(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"schema1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "foobar",
},
},
"schema2": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog2",
Name: "foobar",
},
},
"schema3": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "barfoo",
},
},
"nilschema": nil,
"emptyschema": {},
},
Volumes: map[string]*resources.Volume{
"volume1": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog1",
SchemaName: "foobar",
},
},
"volume2": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog2",
SchemaName: "foobar",
},
},
"volume3": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog1",
SchemaName: "barfoo",
},
},
"volume4": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalogX",
SchemaName: "foobar",
},
},
"volume5": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog1",
SchemaName: "schemaX",
},
},
"nilVolume": nil,
"emptyVolume": {},
},
},
},
}

d := bundle.Apply(context.Background(), b, CaptureSchemaDependency())
require.Nil(t, d)

assert.Equal(t, "${resources.schemas.schema1.name}", b.Config.Resources.Volumes["volume1"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "${resources.schemas.schema2.name}", b.Config.Resources.Volumes["volume2"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "${resources.schemas.schema3.name}", b.Config.Resources.Volumes["volume3"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "foobar", b.Config.Resources.Volumes["volume4"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "schemaX", b.Config.Resources.Volumes["volume5"].CreateVolumeRequestContent.SchemaName)

assert.Nil(t, b.Config.Resources.Volumes["nilVolume"])
assert.Nil(t, b.Config.Resources.Volumes["emptyVolume"].CreateVolumeRequestContent)
}

func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"schema1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "foobar",
},
},
"schema2": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog2",
Name: "foobar",
},
},
"schema3": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "barfoo",
},
},
"nilschema": nil,
"emptyschema": {},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Schema: "foobar",
},
},
"pipeline2": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog2",
Schema: "foobar",
},
},
"pipeline3": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Schema: "barfoo",
},
},
"pipeline4": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalogX",
Schema: "foobar",
},
},
"pipeline5": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Schema: "schemaX",
},
},
"pipeline6": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Schema: "foobar",
},
},
"pipeline7": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Schema: "",
Name: "whatever",
},
},
"nilPipeline": nil,
"emptyPipeline": {},
},
},
},
}

d := bundle.Apply(context.Background(), b, CaptureSchemaDependency())
require.Nil(t, d)

assert.Equal(t, "${resources.schemas.schema1.name}", b.Config.Resources.Pipelines["pipeline1"].Schema)
assert.Equal(t, "${resources.schemas.schema2.name}", b.Config.Resources.Pipelines["pipeline2"].Schema)
assert.Equal(t, "${resources.schemas.schema3.name}", b.Config.Resources.Pipelines["pipeline3"].Schema)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline4"].Schema)
assert.Equal(t, "schemaX", b.Config.Resources.Pipelines["pipeline5"].Schema)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline6"].Schema)
assert.Equal(t, "", b.Config.Resources.Pipelines["pipeline7"].Schema)

assert.Nil(t, b.Config.Resources.Pipelines["nilPipeline"])
assert.Nil(t, b.Config.Resources.Pipelines["emptyPipeline"].PipelineSpec)

for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} {
assert.Empty(t, b.Config.Resources.Pipelines[k].Target)
}
}

func TestCaptureSchemaDependencyForPipelinesWithSchema(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"schema1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "foobar",
},
},
"schema2": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog2",
Name: "foobar",
},
},
"schema3": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "barfoo",
},
},
"nilschema": nil,
"emptyschema": {},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Target: "foobar",
},
},
"pipeline2": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog2",
Target: "foobar",
},
},
"pipeline3": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Target: "barfoo",
},
},
"pipeline4": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalogX",
Target: "foobar",
},
},
"pipeline5": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Target: "schemaX",
},
},
"pipeline6": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Target: "foobar",
},
},
"pipeline7": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Target: "",
Name: "whatever",
},
},
},
},
},
}

d := bundle.Apply(context.Background(), b, CaptureSchemaDependency())
require.Nil(t, d)
assert.Equal(t, "${resources.schemas.schema1.name}", b.Config.Resources.Pipelines["pipeline1"].Target)
assert.Equal(t, "${resources.schemas.schema2.name}", b.Config.Resources.Pipelines["pipeline2"].Target)
assert.Equal(t, "${resources.schemas.schema3.name}", b.Config.Resources.Pipelines["pipeline3"].Target)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline4"].Target)
assert.Equal(t, "schemaX", b.Config.Resources.Pipelines["pipeline5"].Target)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline6"].Target)
assert.Equal(t, "", b.Config.Resources.Pipelines["pipeline7"].Target)

for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} {
assert.Empty(t, b.Config.Resources.Pipelines[k].Schema)
}
}
2 changes: 2 additions & 0 deletions bundle/phases/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func Initialize() bundle.Mutator {
mutator.MergePipelineClusters(),
mutator.MergeApps(),

mutator.CaptureSchemaDependency(),

// Provide permission config errors & warnings after initializing all variables
permissions.PermissionDiagnostics(),
mutator.SetRunAs(),
Expand Down

0 comments on commit f2bba63

Please sign in to comment.