Skip to content

Commit

Permalink
Merge branch 'main' into mysql-conn-test
Browse files Browse the repository at this point in the history
  • Loading branch information
terzioglub authored Jan 28, 2025
2 parents e3eff51 + 6a70def commit 155fef7
Show file tree
Hide file tree
Showing 40 changed files with 1,663 additions and 636 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docs-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Setup Node.js
uses: actions/setup-node@v3
with:
node-version: '16'
node-version: '18'


- name: Install dependencies
Expand Down
6 changes: 3 additions & 3 deletions cmd/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func Format(isDebug *bool) *cli.Command {
changedAssetpaths = append(changedAssetpaths, assetPath)
}
} else {
asset, err := DefaultPipelineBuilder.CreateAssetFromFile(assetPath)
asset, err := DefaultPipelineBuilder.CreateAssetFromFile(assetPath, nil)
if err != nil {
logger.Debugf("failed to process path '%s': %v", assetPath, err)
return
Expand Down Expand Up @@ -165,7 +165,7 @@ func Format(isDebug *bool) *cli.Command {
}

func formatAsset(path string) (*pipeline.Asset, error) {
asset, err := DefaultPipelineBuilder.CreateAssetFromFile(path)
asset, err := DefaultPipelineBuilder.CreateAssetFromFile(path, nil)
if err != nil {
return nil, errors2.Wrap(err, "failed to build the asset")
}
Expand All @@ -184,7 +184,7 @@ func shouldFileChange(path string) (bool, error) {
normalizedOriginalContent := normalizeLineEndings(originalContent)

// Create the asset
asset, err := DefaultPipelineBuilder.CreateAssetFromFile(path)
asset, err := DefaultPipelineBuilder.CreateAssetFromFile(path, nil)
if err != nil {
return false, errors2.Wrap(err, "failed to build the asset")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func PatchAsset() *cli.Command {
return cli.Exit("", 1)
}

asset, err := DefaultPipelineBuilder.CreateAssetFromFile(assetPath)
asset, err := DefaultPipelineBuilder.CreateAssetFromFile(assetPath, nil)
if err != nil {
printErrorJSON(errors2.Wrap(err, "failed to create asset from the given path"))
return cli.Exit("", 1)
Expand Down
10 changes: 5 additions & 5 deletions cmd/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func Render() *cli.Command {
return cli.Exit("", 1)
}

asset, err := DefaultPipelineBuilder.CreateAssetFromFile(inputPath)
asset, err := DefaultPipelineBuilder.CreateAssetFromFile(inputPath, pl)
if err != nil {
printError(err, c.String("output"), "Failed to read the asset definition file:")
return cli.Exit("", 1)
Expand Down Expand Up @@ -177,7 +177,7 @@ func Render() *cli.Command {
output: c.String("output"),
}

return r.Run(inputPath)
return r.Run(inputPath, pl)
},
Before: telemetry.BeforeCommand,
After: telemetry.AfterCommand,
Expand All @@ -193,7 +193,7 @@ type queryMaterializer interface {
}

type taskCreator interface {
CreateAssetFromFile(path string) (*pipeline.Asset, error)
CreateAssetFromFile(path string, foundPipeline *pipeline.Pipeline) (*pipeline.Asset, error)
}

type RenderCommand struct {
Expand All @@ -205,15 +205,15 @@ type RenderCommand struct {
writer io.Writer
}

func (r *RenderCommand) Run(taskPath string) error {
func (r *RenderCommand) Run(taskPath string, foundPipeline *pipeline.Pipeline) error {
defer RecoverFromPanic()

if taskPath == "" {
r.printErrorOrJsonf("Please give an asset path to render: bruin render <path to the asset file>)\n")
return cli.Exit("", 1)
}

task, err := r.builder.CreateAssetFromFile(taskPath)
task, err := r.builder.CreateAssetFromFile(taskPath, foundPipeline)
if err != nil {
r.printErrorOrJsonf("Failed to build asset: %v\n", err.Error())
return cli.Exit("", 1)
Expand Down
18 changes: 9 additions & 9 deletions cmd/render_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ type mockBuilder struct {
mock.Mock
}

func (m *mockBuilder) CreateAssetFromFile(path string) (*pipeline.Asset, error) {
called := m.Called(path)
func (m *mockBuilder) CreateAssetFromFile(path string, foundPipeline *pipeline.Pipeline) (*pipeline.Asset, error) {
called := m.Called(path, foundPipeline)
if called.Get(0) == nil {
return nil, called.Error(1)
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestRenderCommand_Run(t *testing.T) {
taskPath: "/path/to/asset",
},
setup: func(f *fields) {
f.builder.On("CreateAssetFromFile", "/path/to/asset").
f.builder.On("CreateAssetFromFile", "/path/to/asset", mock.Anything).
Return(nil, assert.AnError)
},
wantErr: assert.Error,
Expand All @@ -111,7 +111,7 @@ func TestRenderCommand_Run(t *testing.T) {
taskPath: "/path/to/asset",
},
setup: func(f *fields) {
f.builder.On("CreateAssetFromFile", "/path/to/asset").
f.builder.On("CreateAssetFromFile", "/path/to/asset", mock.Anything).
Return(nil, nil)
},
wantErr: assert.Error,
Expand All @@ -122,7 +122,7 @@ func TestRenderCommand_Run(t *testing.T) {
taskPath: "/path/to/asset",
},
setup: func(f *fields) {
f.builder.On("CreateAssetFromFile", "/path/to/asset").
f.builder.On("CreateAssetFromFile", "/path/to/asset", mock.Anything).
Return(bqAsset, nil)

f.extractor.On("ExtractQueriesFromString", bqAsset.ExecutableFile.Content).
Expand All @@ -136,7 +136,7 @@ func TestRenderCommand_Run(t *testing.T) {
taskPath: "/path/to/asset",
},
setup: func(f *fields) {
f.builder.On("CreateAssetFromFile", "/path/to/asset").
f.builder.On("CreateAssetFromFile", "/path/to/asset", mock.Anything).
Return(bqAsset, nil)

f.extractor.On("ExtractQueriesFromString", bqAsset.ExecutableFile.Content).
Expand All @@ -153,7 +153,7 @@ func TestRenderCommand_Run(t *testing.T) {
taskPath: "/path/to/asset",
},
setup: func(f *fields) {
f.builder.On("CreateAssetFromFile", "/path/to/asset").
f.builder.On("CreateAssetFromFile", "/path/to/asset", mock.Anything).
Return(bqAsset, nil)

f.extractor.On("ExtractQueriesFromString", bqAsset.ExecutableFile.Content).
Expand All @@ -173,7 +173,7 @@ func TestRenderCommand_Run(t *testing.T) {
taskPath: "/path/to/asset",
},
setup: func(f *fields) {
f.builder.On("CreateAssetFromFile", "/path/to/asset").
f.builder.On("CreateAssetFromFile", "/path/to/asset", mock.Anything).
Return(nonBqAsset, nil)

f.extractor.On("ExtractQueriesFromString", nonBqAsset.ExecutableFile.Content).
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestRenderCommand_Run(t *testing.T) {
writer: f.writer,
}

tt.wantErr(t, render.Run(tt.args.taskPath))
tt.wantErr(t, render.Run(tt.args.taskPath, nil))
f.extractor.AssertExpectations(t)
f.bqMaterializer.AssertExpectations(t)
f.builder.AssertExpectations(t)
Expand Down
41 changes: 24 additions & 17 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func Run(isDebug *bool) *cli.Command {

var task *pipeline.Asset
if pipelineInfo.RunningForAnAsset {
task, err = DefaultPipelineBuilder.CreateAssetFromFile(inputPath)
task, err = DefaultPipelineBuilder.CreateAssetFromFile(inputPath, pipelineInfo.Pipeline)
if err != nil {
errorPrinter.Printf("Failed to build asset: %v\n", err)
return cli.Exit("", 1)
Expand Down Expand Up @@ -400,23 +400,8 @@ func GetPipeline(inputPath string, runConfig *scheduler.RunConfig, logger *zap.S

var task *pipeline.Asset
runDownstreamTasks := false
if runningForAnAsset {
task, err = DefaultPipelineBuilder.CreateAssetFromFile(inputPath)
if err != nil {
errorPrinter.Printf("Failed to build asset: %v. Are you sure you used the correct path?\n", err.Error())
return &PipelineInfo{
RunningForAnAsset: runningForAnAsset,
RunDownstreamTasks: runDownstreamTasks,
}, err
}
if task == nil {
errorPrinter.Printf("The given file path doesn't seem to be a Bruin task definition: '%s'\n", inputPath)
return &PipelineInfo{
RunningForAnAsset: runningForAnAsset,
RunDownstreamTasks: runDownstreamTasks,
}, err
}

if runningForAnAsset {
pipelinePath, err = path.GetPipelineRootFromTask(inputPath, pipelineDefinitionFiles)
if err != nil {
errorPrinter.Printf("Failed to find the pipeline this task belongs to: '%s'\n", inputPath)
Expand Down Expand Up @@ -453,6 +438,28 @@ func GetPipeline(inputPath string, runConfig *scheduler.RunConfig, logger *zap.S
}, err
}

if runningForAnAsset {
task, err = DefaultPipelineBuilder.CreateAssetFromFile(inputPath, foundPipeline)
if err != nil {
errorPrinter.Printf("Failed to build asset: %v. Are you sure you used the correct path?\n", err.Error())
return &PipelineInfo{
RunningForAnAsset: runningForAnAsset,
RunDownstreamTasks: runDownstreamTasks,
Pipeline: foundPipeline,
Config: cm,
}, err
}
if task == nil {
errorPrinter.Printf("The given file path doesn't seem to be a Bruin task definition: '%s'\n", inputPath)
return &PipelineInfo{
RunningForAnAsset: runningForAnAsset,
RunDownstreamTasks: runDownstreamTasks,
Pipeline: foundPipeline,
Config: cm,
}, err
}
}

return &PipelineInfo{
Pipeline: foundPipeline,
RunningForAnAsset: runningForAnAsset,
Expand Down
17 changes: 17 additions & 0 deletions docs/getting-started/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,22 @@ When you run a pipeline, Bruin will find this file in the repo root, parse the c
Default connections are top-level defaults that reduces repetition by stating what connections to use on types of assets.
For instance, a pipeline might have SQL queries that run on Google BigQuery or Snowflake, and based on the type of an asset Bruin picks the appropriate connection.

## Defaults

Defaults allow you to set baseline values for a pipeline, which apply to all assets within it. If an asset has its own specified value, it will take precedence over the default. Otherwise, the default value will be used. below is an example of a default:

```yaml
name: bruin-init
schedule: daily
default:
type: ingestr
parameters:
source_connection: chess-default
destination: duckdb
secrets:
- key: KEY1
inject_as: INJECTED1
```

## Sensors
Sensors are a special type of assets that are used to wait on certain external signals. Sensors are useful to wait on external signals such as a table being created in an external database, or a file being uploaded to S3. A common usecase for sensors is when there are datasets/files/tables that are created by a separate process and you need to wait for them to be created before running your assets.
16 changes: 15 additions & 1 deletion integration-tests/.bruin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ environments:
generic:
- name: KEY1
value: value1

env-run-with-filters:
connections:
duckdb:
Expand Down Expand Up @@ -98,6 +97,21 @@ environments:
generic:
- name: KEY1
value: value1
env-run-default-option:
connections:
duckdb:
- name: "duckdb-env-run-default-option"
path: "duckdb-files/env-run-default-option.db"
chess:
- name: "chess-run-default-option"
players:
- "MagnusCarlsen"
- "Hikaru"
generic:
- name: KEY1
value: value1
- name: KEY2
value: value2
env-run-malformed-sql:
connections:
duckdb:
Expand Down
Loading

0 comments on commit 155fef7

Please sign in to comment.