Merge branch 'main' into docs-default
y-bruin authored Feb 3, 2025
2 parents 437cd18 + 271ead7 commit 80eb032
Showing 18 changed files with 543 additions and 42 deletions.
16 changes: 16 additions & 0 deletions docs/assets/python.md
@@ -77,6 +77,22 @@ image: python:3.11
print('hello world')
```

## Environment Variables
Bruin injects a set of environment variables into every Python asset by default.

The following environment variables are available in every Python asset execution; the sketch after this list shows one way to read them:

* `BRUIN_START_DATE`: The start date of the pipeline run in `YYYY-MM-DD` format (e.g. `2024-01-15`)
* `BRUIN_START_DATETIME`: The start date and time of the pipeline run in `YYYY-MM-DDThh:mm:ss` format (e.g. `2024-01-15T13:45:30`)
* `BRUIN_START_TIMESTAMP`: The start timestamp of the pipeline run in RFC3339 format with a timezone offset (e.g. `2024-01-15T13:45:30.000000Z`)
* `BRUIN_END_DATE`: The end date of the pipeline run in `YYYY-MM-DD` format (e.g. `2024-01-15`)
* `BRUIN_END_DATETIME`: The end date and time of the pipeline run in `YYYY-MM-DDThh:mm:ss` format (e.g. `2024-01-15T13:45:30`)
* `BRUIN_END_TIMESTAMP`: The end timestamp of the pipeline run in RFC3339 format with a timezone offset (e.g. `2024-01-15T13:45:30.000000Z`)
* `BRUIN_RUN_ID`: The unique identifier for the pipeline run
* `BRUIN_PIPELINE`: The name of the pipeline being executed
* `BRUIN_FULL_REFRESH`: Set to `1` when the pipeline is running with the `--full-refresh` flag, empty otherwise
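
As an illustration, here is a minimal sketch of a Python asset reading these variables; the parsing below is one reasonable approach, not the only one:

```python
import os
from datetime import date, datetime

# Run window boundaries provided by Bruin.
start_date = date.fromisoformat(os.environ["BRUIN_START_DATE"])
end_datetime = datetime.fromisoformat(os.environ["BRUIN_END_DATETIME"])

# Run metadata.
run_id = os.environ.get("BRUIN_RUN_ID", "")
pipeline = os.environ.get("BRUIN_PIPELINE", "")

# BRUIN_FULL_REFRESH is "1" on --full-refresh runs and empty otherwise.
full_refresh = os.environ.get("BRUIN_FULL_REFRESH") == "1"

if full_refresh:
    print(f"{pipeline}: full refresh, run {run_id}")
else:
    print(f"{pipeline}: incremental window {start_date} -> {end_datetime}")
```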



## Materialization - Beta

2 changes: 1 addition & 1 deletion docs/assets/seed.md
@@ -1,7 +1,7 @@
# Seed Assets
Seeds are CSV files containing data prepared outside of your pipeline that will be loaded into your data platform. Bruin supports seed assets natively: drop a CSV file into your pipeline, and Bruin ensures the data is loaded into the destination platform accurately.

-You can define seed assets in a file ending with `.yaml`:
+You can define seed assets in a file ending with `.asset.yaml`:
```yaml
name: dashboard.hello
type: duckdb.seed
3 changes: 2 additions & 1 deletion docs/platforms/clickhouse.md
@@ -14,7 +14,8 @@ connections:
host: "some-clickhouse-host.somedomain.com"
port: 9000
database: "dev" #Optional for other assets, uneffective when using ClickHouse as an ingestr destination, as ingestr takes the database name from the asset file.
http_port: 8123 #Only specify if you are using clickhouse as ingestr destination, by default it is 8123
http_port: 8443 #Only specify if you are using clickhouse as ingestr destination, by default it is 8443
secure: 1 #Only specify if you are using clickhouse as ingestr destination, by default, it is set to 1 (secure). Use 0 for a non-secure connection and 1 for a secure connection.
```
## Ingestr Assets:
After adding the connection to `bruin.yml`, you need to create an [asset configuration](/assets/ingestr#asset-structure) file to ingest data into ClickHouse. This file defines the data flow from the source to the destination. Create a YAML file (e.g., `stripe_ingestion.yml`) inside the assets folder and add the following content:
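A representative sketch of such a file — the connection name `stripe-default` and table `charge` are hypothetical examples; see the linked asset structure docs for the authoritative fields:

```yaml
name: raw.stripe_charges
type: ingestr

parameters:
  source_connection: stripe-default
  source_table: 'charge'
  destination: clickhouse
```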
6 changes: 5 additions & 1 deletion integration-tests/expected_connections_schema.json
@@ -200,6 +200,9 @@
},
"http_port": {
"type": "integer"
},
"secure": {
"type": "integer"
}
},
"additionalProperties": false,
@@ -211,7 +214,8 @@
"host",
"port",
"database",
"http_port"
"http_port",
"secure"
]
},
"Connections": {
22 changes: 18 additions & 4 deletions pkg/clickhouse/config.go
@@ -2,6 +2,8 @@ package clickhouse

import (
"fmt"
"net/url"
"strconv"

click_house "github.com/ClickHouse/clickhouse-go/v2"
)
@@ -13,6 +15,7 @@ type Config struct {
Port int
Database string
HTTPPort int
Secure *int
}

func (c *Config) ToClickHouseOptions() *click_house.Options {
@@ -28,10 +31,21 @@ func (c *Config) ToClickHouseOptions() *click_house.Options {
}

func (c *Config) GetIngestrURI() string {
+uri := url.URL{
+Scheme: "clickhouse",
+User: url.UserPassword(c.Username, c.Password),
+Host: fmt.Sprintf("%s:%d", c.Host, c.Port),
+}
+query := url.Values{}
+
if c.HTTPPort != 0 {
-//nolint:nosprintfhostport
-return fmt.Sprintf("clickhouse://%s:%s@%s:%d?http_port=%d", c.Username, c.Password, c.Host, c.Port, c.HTTPPort)
+query.Set("http_port", strconv.Itoa(c.HTTPPort))
}
+if c.Secure != nil {
+query.Set("secure", strconv.Itoa(*c.Secure))
+}
-//nolint:nosprintfhostport
-return fmt.Sprintf("clickhouse://%s:%s@%s:%d", c.Username, c.Password, c.Host, c.Port)
+
+uri.RawQuery = query.Encode()
+
+return uri.String()
}
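With a hypothetical config — username `user`, password `pass`, host `ch.example.com`, port 9000, `http_port: 8443`, `secure: 1` — the rewritten `GetIngestrURI` produces a URI of this shape (note that `url.Values.Encode` sorts the query keys):

```
clickhouse://user:pass@ch.example.com:9000?http_port=8443&secure=1
```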
1 change: 1 addition & 0 deletions pkg/config/connections.go
@@ -348,6 +348,7 @@ type ClickHouseConnection struct {
Port int `yaml:"port" json:"port" mapstructure:"port"`
Database string `yaml:"database" json:"database" mapstructure:"database"`
HTTPPort int `yaml:"http_port" json:"http_port" mapstructure:"http_port"`
Secure *int `yaml:"secure" json:"secure" mapstructure:"secure"`
}

func (c ClickHouseConnection) GetName() string {
4 changes: 4 additions & 0 deletions pkg/config/manager_test.go
@@ -25,6 +25,8 @@ func TestLoadFromFile(t *testing.T) {
servicefile = "/path/to/service_account.json"
}

clickhouseSecureValue := 0

devEnv := Environment{
Connections: &Connections{
GoogleCloudPlatform: []GoogleCloudPlatformConnection{
@@ -224,6 +226,7 @@
Path: duckPath,
},
},

ClickHouse: []ClickHouseConnection{
{
Name: "conn-clickhouse",
Expand All @@ -233,6 +236,7 @@ func TestLoadFromFile(t *testing.T) {
Password: "clickhousepass",
Database: "clickhousedb",
HTTPPort: 8124,
Secure: &clickhouseSecureValue,
},
},
Hubspot: []HubspotConnection{
1 change: 1 addition & 0 deletions pkg/config/testdata/simple.yml
@@ -222,6 +222,7 @@ environments:
password: "clickhousepass"
database: "clickhousedb"
http_port: 8124
secure: 0
gcs:
- name: "gcs-1"
service_account_file: "/path/to/service_account.json"
1 change: 1 addition & 0 deletions pkg/config/testdata/simple_win.yml
@@ -223,6 +223,7 @@ environments:
password: "clickhousepass"
database: "clickhousedb"
http_port: 8124
secure: 0
gcs:
- name: "gcs-1"
service_account_file: "/path/to/service_account.json"
1 change: 1 addition & 0 deletions pkg/connection/connection.go
@@ -1644,6 +1644,7 @@ func (m *Manager) AddClickHouseConnectionFromConfig(connection *config.ClickHous
Password: connection.Password,
Database: connection.Database,
HTTPPort: connection.HTTPPort,
Secure: connection.Secure,
})
if err != nil {
return err
20 changes: 9 additions & 11 deletions pkg/jinja/jinja.go
@@ -36,17 +36,15 @@ func NewRenderer(context Context) *Renderer {

func PythonEnvVariables(startDate, endDate *time.Time, pipelineName, runID string, fullRefresh bool) map[string]string {
vars := map[string]string{
"BRUIN_START_DATE": startDate.Format("2006-01-02"),
"BRUIN_START_DATE_NODASH": startDate.Format("20060102"),
"BRUIN_START_DATETIME": startDate.Format("2006-01-02T15:04:05"),
"BRUIN_START_TIMESTAMP": startDate.Format("2006-01-02T15:04:05.000000Z07:00"),
"BRUIN_END_DATE": endDate.Format("2006-01-02"),
"BRUIN_END_DATE_NODASH": endDate.Format("20060102"),
"BRUIN_END_DATETIME": endDate.Format("2006-01-02T15:04:05"),
"BRUIN_END_TIMESTAMP": endDate.Format("2006-01-02T15:04:05.000000Z07:00"),
"BRUIN_RUN_ID": runID,
"BRUIN_PIPELINE": pipelineName,
"BRUIN_FULL_REFRESH": "",
"BRUIN_START_DATE": startDate.Format("2006-01-02"),
"BRUIN_START_DATETIME": startDate.Format("2006-01-02T15:04:05"),
"BRUIN_START_TIMESTAMP": startDate.Format("2006-01-02T15:04:05.000000Z07:00"),
"BRUIN_END_DATE": endDate.Format("2006-01-02"),
"BRUIN_END_DATETIME": endDate.Format("2006-01-02T15:04:05"),
"BRUIN_END_TIMESTAMP": endDate.Format("2006-01-02T15:04:05.000000Z07:00"),
"BRUIN_RUN_ID": runID,
"BRUIN_PIPELINE": pipelineName,
"BRUIN_FULL_REFRESH": "",
}

if fullRefresh {
19 changes: 14 additions & 5 deletions pkg/pipeline/lineage.go
@@ -190,10 +190,11 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset
if upstream.Table == asset.Name {
continue
}

upstreamAsset := foundPipeline.GetAssetByName(upstream.Table)
if upstreamAsset == nil {
if err := p.addColumnToAsset(asset, lineageCol.Name, nil, &Column{
-Name: upstream.Column,
+Name: lineageCol.Name,
Type: lineageCol.Type,
Checks: []ColumnCheck{},
Upstreams: []*UpstreamColumn{
@@ -211,7 +212,7 @@
upstreamCol := upstreamAsset.GetColumnWithName(upstream.Column)
if upstreamCol == nil {
upstreamCol = &Column{
-Name: upstream.Column,
+Name: lineageCol.Name,
Type: lineageCol.Type,
Checks: []ColumnCheck{},
Upstreams: []*UpstreamColumn{
@@ -232,14 +233,22 @@

// addColumnToAsset adds a new column to the asset based on upstream information.
func (p *LineageExtractor) addColumnToAsset(asset *Asset, colName string, upstreamAsset *Asset, upstreamCol *Column) error {
-if asset == nil || upstreamCol == nil || colName == "" {
+if asset == nil || colName == "" {
return errors.New("invalid arguments: all parameters must be non-nil and colName must not be empty")
}

if colName == "*" {
return nil
}

+if upstreamAsset == nil {
+existingCol := asset.GetColumnWithName(strings.ToLower(upstreamCol.Name))
+if existingCol == nil {
+asset.Columns = append(asset.Columns, *upstreamCol)
+return nil
+}
+return nil
+}
existingCol := asset.GetColumnWithName(colName)
if existingCol != nil {
if len(existingCol.Description) == 0 {
@@ -254,6 +263,7 @@
newUpstream := UpstreamColumn{
Column: upstreamCol.Name,
}

if upstreamAsset != nil {
newUpstream.Table = upstreamAsset.Name
}
@@ -293,8 +303,7 @@ func (p *LineageExtractor) addColumnToAsset(asset *Asset, colName string, upstre
// upstreamExists checks if a given upstream already exists in the list.
func upstreamExists(upstreams []*UpstreamColumn, newUpstream UpstreamColumn) bool {
for _, existingUpstream := range upstreams {
-if strings.EqualFold(existingUpstream.Column, newUpstream.Column) &&
-strings.EqualFold(existingUpstream.Table, newUpstream.Table) {
+if strings.EqualFold(existingUpstream.Column, newUpstream.Column) {
return true
}
}
74 changes: 73 additions & 1 deletion pkg/pipeline/lineage_test.go
@@ -593,7 +593,7 @@ func testJoinsAndComplexQueries(t *testing.T) {
},
{
Name: "total_amount",
Type: "int64",
Type: "float64",
Description: "Total order amount",
Upstreams: []*UpstreamColumn{
{Column: "quantity", Table: "orders"},
@@ -694,6 +694,78 @@

func testAdvancedSQLFeatures(t *testing.T) {
tests := []TestCase{
{
name: "snowflake column name with as",
pipeline: &Pipeline{
Assets: []*Asset{
{
Name: "sales_summary",
Type: "bq.sql",
ExecutableFile: ExecutableFile{
Content: `
SELECT
order_date as order,
COUNT(DISTINCT customer_id) as unique_customers,
SUM(amount) as total_sales,
AVG(amount) as avg_sale,
NOW() as report_generated_at
FROM raw_sales
GROUP BY DATE_TRUNC(order_date, MONTH)
`,
},
Upstreams: []Upstream{{Value: "raw_sales"}},
},
{
Name: "raw_sales",
Type: "bq.sql",
ExecutableFile: ExecutableFile{
Content: "SELECT * FROM data_sales",
},
Columns: []Column{
{Name: "order_date", Type: "timestamp", Description: "Order timestamp"},
{Name: "customer_id", Type: "int64", Description: "Customer identifier"},
{Name: "amount", Type: "float64", Description: "Sale amount"},
},
},
},
},
after: &Pipeline{
Assets: []*Asset{
{
Name: "sales_summary",
ExecutableFile: ExecutableFile{
Content: `
SELECT
order_date as order,
COUNT(DISTINCT customer_id) as unique_customers,
SUM(amount) as total_sales,
AVG(amount) as avg_sale,
NOW() as report_generated_at
FROM raw_sales
GROUP BY DATE_TRUNC(order_date, MONTH)
`,
},
Columns: []Column{
{Name: "order", Type: "timestamp", Description: "Order timestamp", Upstreams: []*UpstreamColumn{{Column: "order_date", Table: "raw_sales"}}},
{Name: "unique_customers", Type: "int64", Description: "Customer identifier", Upstreams: []*UpstreamColumn{{Column: "customer_id", Table: "raw_sales"}}},
{Name: "total_sales", Type: "float64", Description: "Sale amount", Upstreams: []*UpstreamColumn{{Column: "amount", Table: "raw_sales"}}},
{Name: "avg_sale", Type: "float64", Description: "Sale amount", Upstreams: []*UpstreamColumn{{Column: "amount", Table: "raw_sales"}}},
{Name: "report_generated_at", Type: "UNKNOWN", Description: "Report generated at", Upstreams: []*UpstreamColumn{{}}},
},
Upstreams: []Upstream{{Value: "raw_sales"}},
},
{
Name: "raw_sales",
Columns: []Column{
{Name: "order_date", Type: "timestamp", Description: "Order timestamp"},
{Name: "customer_id", Type: "int64", Description: "Customer identifier"},
{Name: "amount", Type: "float64", Description: "Sale amount"},
},
},
},
},
want: nil,
},
{
name: "advanced SQL functions and aggregations",
pipeline: &Pipeline{
2 changes: 1 addition & 1 deletion pkg/python/uv.go
@@ -35,7 +35,7 @@ var AvailablePythonVersions = map[string]bool{
const (
UvVersion = "0.5.0"
pythonVersionForIngestr = "3.11"
ingestrVersion = "0.13.0"
ingestrVersion = "0.13.2"
)

// UvChecker handles checking and installing the uv package manager.
8 changes: 6 additions & 2 deletions pkg/snowflake/db.go
@@ -329,7 +329,7 @@ func (db *DB) PushColumnDescriptions(ctx context.Context, asset *pipeline.Asset)
if col.Description != "" && existingComments[col.Name] != col.Description {
query := fmt.Sprintf(
`ALTER TABLE %s.%s.%s MODIFY COLUMN %s COMMENT '%s'`,
-db.config.Database, schemaName, tableName, col.Name, col.Description,
+db.config.Database, schemaName, tableName, col.Name, escapeSQLString(col.Description),
)
updateQueries = append(updateQueries, query)
}
@@ -344,7 +344,7 @@ func (db *DB) PushColumnDescriptions(ctx context.Context, asset *pipeline.Asset)
if asset.Description != "" {
updateTableQuery := fmt.Sprintf(
`COMMENT ON TABLE %s.%s.%s IS '%s'`,
-db.config.Database, schemaName, tableName, asset.Description,
+db.config.Database, schemaName, tableName, escapeSQLString(asset.Description),
)
if err := db.RunQueryWithoutResult(ctx, &query.Query{Query: updateTableQuery}); err != nil {
return errors.Wrap(err, "failed to update table description")
@@ -353,3 +353,7 @@

return nil
}

func escapeSQLString(s string) string {
return strings.ReplaceAll(s, "'", "''") // Escape single quotes for SQL safety
}
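
Doubling single quotes is the standard way to embed a quote inside a single-quoted SQL literal: a description like `customer's total`, for example, is emitted as `'customer''s total'`, which Snowflake reads back as the original text.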