From 65d3d8c7d0d3956351cc020571c56e0afd22122e Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Mon, 10 Jun 2024 19:11:41 +0100 Subject: [PATCH 01/15] feat: add read task in bigquery component --- data/bigquery/v0/config/definition.json | 5 +- data/bigquery/v0/config/tasks.json | 48 ++++++++++++ data/bigquery/v0/main.go | 23 ++++++ data/bigquery/v0/read.go | 99 +++++++++++++++++++++++++ 4 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 data/bigquery/v0/read.go diff --git a/data/bigquery/v0/config/definition.json b/data/bigquery/v0/config/definition.json index dd5ca63f..a6554d4a 100644 --- a/data/bigquery/v0/config/definition.json +++ b/data/bigquery/v0/config/definition.json @@ -1,6 +1,7 @@ { - "availableTasks": [ - "TASK_INSERT" + "available_tasks": [ + "TASK_INSERT", + "TASK_READ" ], "custom": false, "documentationUrl": "https://www.instill.tech/docs/component/data/bigquery", diff --git a/data/bigquery/v0/config/tasks.json b/data/bigquery/v0/config/tasks.json index f77865d9..67d6baf2 100644 --- a/data/bigquery/v0/config/tasks.json +++ b/data/bigquery/v0/config/tasks.json @@ -50,5 +50,53 @@ "title": "Output", "type": "object" } + }, + "TASK_READ": { + "instillShortDescription": "Read data from BigQuery.", + "input": { + "instillUIOrder": 0, + "properties": { + "data": { + "additionalProperties": false, + "instillShortDescription": "The data to be inserted to BigQuery", + "description": "The data to be inserted to BigQuery", + "instillUIOrder": 0, + "patternProperties": { + "^[a-z_][-a-z_0-9]{0,31}$": { + "instillAcceptFormats": [ + "*" + ], + "instillUIOrder": 0, + "instillUpstreamTypes": [ + "reference", + "template" + ], + "title": "Data" + } + }, + "required": [], + "title": "Data", + "type": "object" + } + }, + "required": [], + "title": "Input", + "type": "object" + }, + "output": { + "instillUIOrder": 0, + "properties": { + "status": { + "description": "Status of the upload operation", + "instillFormat": "string", + "instillUIOrder": 0, + "title": "Status", + "type": "string" + } + }, + "required": [], + "title": "Output", + "type": "object" + } } } diff --git a/data/bigquery/v0/main.go b/data/bigquery/v0/main.go index c0ce8a5b..d9661e01 100644 --- a/data/bigquery/v0/main.go +++ b/data/bigquery/v0/main.go @@ -17,6 +17,7 @@ import ( const ( taskInsert = "TASK_INSERT" + taskRead = "TASK_READ" ) //go:embed config/definition.json @@ -102,6 +103,28 @@ func (e *execution) Execute(ctx context.Context, inputs []*structpb.Struct) ([]* return nil, err } output = &structpb.Struct{Fields: map[string]*structpb.Value{"status": {Kind: &structpb.Value_StringValue{StringValue: "success"}}}} + case taskRead: + + inputStruct := ReadInput{ + ProjectID: getProjectID(e.Setup), + DatasetID: getDatasetID(e.Setup), + TableName: getTableName(e.Setup), + Client: client, + } + fmt.Println("inputStruct", inputStruct) + err := base.ConvertFromStructpb(input, &inputStruct) + if err != nil { + return nil, err + } + outputStruct, err := readDataFromBigQuery(inputStruct) + if err != nil { + return nil, err + } + output, err = base.ConvertToStructpb(outputStruct) + if err != nil { + return nil, err + } + default: return nil, fmt.Errorf("unsupported task: %s", e.Task) } diff --git a/data/bigquery/v0/read.go b/data/bigquery/v0/read.go new file mode 100644 index 00000000..706f4909 --- /dev/null +++ b/data/bigquery/v0/read.go @@ -0,0 +1,99 @@ +package bigquery + +import ( + "context" + + "cloud.google.com/go/bigquery" + "google.golang.org/api/iterator" +) + +type ReadInput struct { + ProjectID string + DatasetID string + TableName string + Client *bigquery.Client + // If SelectColumns is empty, all columns will be selected + SelectColumns []string + QueryParameter map[string]any +} + +type ReadOutput struct { + Data []map[string]any +} + +func queryBuilder(input ReadInput) string { + sql := "SELECT " + + if len(input.SelectColumns) == 0 { + sql = sql + "*" + } + + for idx, column := range input.SelectColumns { + sql += column + if idx < len(input.SelectColumns)-1 { + sql += ", " + } + } + + sql += " FROM " + input.ProjectID + "." + input.DatasetID + "." + input.TableName + + keys := make([]string, len(input.QueryParameter)) + for k := range input.QueryParameter { + keys = append(keys, k) + } + + if len(input.QueryParameter) > 0 { + sql += " WHERE " + for idx, key := range keys { + sql += key + " = @" + key + if idx < len(keys)-1 { + sql += " AND " + } + } + } + + return sql +} + +func readDataFromBigQuery(input ReadInput) (ReadOutput, error) { + + ctx := context.Background() + client := input.Client + + sql := queryBuilder(input) + q := client.Query(sql) + var queryParameter []bigquery.QueryParameter + for key, value := range input.QueryParameter { + queryParameter = append(queryParameter, bigquery.QueryParameter{Name: key, Value: value}) + } + + q.Parameters = queryParameter + it, err := q.Read(ctx) + if err != nil { + return ReadOutput{}, err + } + + result := []map[string]any{} + for { + var values []bigquery.Value + err := it.Next(&values) + + if err == nil { + data := map[string]any{} + for idx, value := range values { + data[input.SelectColumns[idx]] = value + } + result = append(result, data) + } + + if err == iterator.Done { + break + } + if err != nil { + return ReadOutput{}, err + } + + } + + return ReadOutput{Data: result}, nil +} From edf37519a1d08f4555daf1622e59bdd28198a57c Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Tue, 11 Jun 2024 20:18:23 +0100 Subject: [PATCH 02/15] feat: add functions to init dynamic schema --- data/bigquery/v0/config/tasks.json | 59 ++++-------- data/bigquery/v0/main.go | 142 ++++++++++++++++++++++++++++- data/bigquery/v0/read.go | 62 +++---------- 3 files changed, 169 insertions(+), 94 deletions(-) diff --git a/data/bigquery/v0/config/tasks.json b/data/bigquery/v0/config/tasks.json index 67d6baf2..f9024026 100644 --- a/data/bigquery/v0/config/tasks.json +++ b/data/bigquery/v0/config/tasks.json @@ -5,31 +5,16 @@ "instillUIOrder": 0, "properties": { "data": { - "additionalProperties": false, + "additionalProperties": true, "instillShortDescription": "The data to be inserted to BigQuery", "description": "The data to be inserted to BigQuery", "instillUIOrder": 0, - "patternProperties": { - "^[a-z_][-a-z_0-9]{0,31}$": { - "instillAcceptFormats": [ - "*" - ], - "instillUIOrder": 0, - "instillUpstreamTypes": [ - "reference", - "template" - ], - "title": "Data" - } - }, "required": [], "title": "Data", "type": "object" } }, - "required": [ - "data" - ], + "required": [], "title": "Input", "type": "object" }, @@ -56,27 +41,13 @@ "input": { "instillUIOrder": 0, "properties": { - "data": { - "additionalProperties": false, - "instillShortDescription": "The data to be inserted to BigQuery", - "description": "The data to be inserted to BigQuery", + "filtering": { + "instillShortDescription": "The filter to be applied to the data", + "description": "The filter to be applied to the data", "instillUIOrder": 0, - "patternProperties": { - "^[a-z_][-a-z_0-9]{0,31}$": { - "instillAcceptFormats": [ - "*" - ], - "instillUIOrder": 0, - "instillUpstreamTypes": [ - "reference", - "template" - ], - "title": "Data" - } - }, "required": [], - "title": "Data", - "type": "object" + "title": "Filtering", + "type": "string" } }, "required": [], @@ -86,12 +57,18 @@ "output": { "instillUIOrder": 0, "properties": { - "status": { - "description": "Status of the upload operation", - "instillFormat": "string", + "data": { + "instillShortDescription": "The data to be read from BigQuery", + "description": "The data to be read from BigQuery", "instillUIOrder": 0, - "title": "Status", - "type": "string" + "items": { + "title": "Data item", + "type": "object", + "required": [] + }, + "required": [], + "title": "Data", + "type": "array" } }, "required": [], diff --git a/data/bigquery/v0/main.go b/data/bigquery/v0/main.go index d9661e01..9b7e4864 100644 --- a/data/bigquery/v0/main.go +++ b/data/bigquery/v0/main.go @@ -9,7 +9,10 @@ import ( "sync" "cloud.google.com/go/bigquery" + pb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta" + "google.golang.org/api/iterator" "google.golang.org/api/option" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" "github.com/instill-ai/component/base" @@ -20,6 +23,8 @@ const ( taskRead = "TASK_READ" ) +var instillUpstreamTypes = []string{"value", "reference", "template"} + //go:embed config/definition.json var definitionJSON []byte @@ -111,7 +116,6 @@ func (e *execution) Execute(ctx context.Context, inputs []*structpb.Struct) ([]* TableName: getTableName(e.Setup), Client: client, } - fmt.Println("inputStruct", inputStruct) err := base.ConvertFromStructpb(input, &inputStruct) if err != nil { return nil, err @@ -120,7 +124,7 @@ func (e *execution) Execute(ctx context.Context, inputs []*structpb.Struct) ([]* if err != nil { return nil, err } - output, err = base.ConvertToStructpb(outputStruct) + output, err = base.ConvertToStructpb(outputStruct) if err != nil { return nil, err } @@ -145,3 +149,137 @@ func (c *component) Test(sysVars map[string]any, setup *structpb.Struct) error { } return errors.New("project ID does not match") } + +type TableColumns struct { + TableName string + Columns []Column +} + +type Column struct { + Name string + Type string +} + +func (c *component) GetDefinition(sysVars map[string]any, compConfig *base.ComponentConfig) (*pb.ComponentDefinition, error) { + + ctx := context.Background() + oriDef, err := c.Component.GetDefinition(nil, nil) + if err != nil { + return nil, err + } + + if sysVars == nil && compConfig == nil { + return oriDef, nil + } + + def := proto.Clone(oriDef).(*pb.ComponentDefinition) + client, err := NewClient(compConfig.Setup["json_key"].(string), compConfig.Setup["project_id"].(string)) + if err != nil || client == nil { + return nil, fmt.Errorf("error creating BigQuery client: %v", err) + } + defer client.Close() + + myDataset := client.Dataset(compConfig.Setup["dataset_id"].(string)) + tables, err := constructTableColumns(myDataset, ctx, compConfig) + if err != nil { + return nil, err + } + + tableProperties, err := constructTableProperties(tables) + if err != nil { + return nil, err + } + + // TODO: chuang8511, remove table from definition.json and make it dynamic. + tableProperty := tableProperties[0] + for _, sch := range def.Spec.ComponentSpecification.Fields["oneOf"].GetListValue().Values { + data := sch.GetStructValue().Fields["properties"].GetStructValue().Fields["input"].GetStructValue().Fields["properties"].GetStructValue().Fields["data"].GetStructValue() + if data != nil { + data.Fields["properties"] = structpb.NewStructValue(tableProperty) + } + } + + for _, dataSpec := range def.Spec.DataSpecifications { + dataInput := dataSpec.Input.Fields["properties"].GetStructValue().Fields["data"].GetStructValue() + if dataInput != nil { + dataInput.Fields["properties"] = structpb.NewStructValue(tableProperty) + } + dataOutput := dataSpec.Output.Fields["properties"].GetStructValue().Fields["data"].GetStructValue() + + if dataOutput != nil { + aPieceData := dataOutput.Fields["items"].GetStructValue() + if aPieceData != nil { + aPieceData.Fields["properties"] = structpb.NewStructValue(tableProperty) + } + + } + } + + return def, nil +} + +func constructTableColumns(myDataset *bigquery.Dataset, ctx context.Context, compConfig *base.ComponentConfig) ([]TableColumns, error) { + tableIT := myDataset.Tables(ctx) + tables := []TableColumns{} + for { + table, err := tableIT.Next() + if err == iterator.Done { + break + } + if err != nil { + return nil, err + } + tableName := table.TableID + tableDetail := myDataset.Table(tableName) + metadata, err := tableDetail.Metadata(ctx) + if err != nil { + return nil, err + } + schema := metadata.Schema + columns := []Column{} + for _, field := range schema { + columns = append(columns, Column{Name: field.Name, Type: string(field.Type)}) + } + + // TODO: chuang8511, remove table from definition.json and make it dynamic. + if compConfig.Setup["table_name"].(string) == tableName { + tables = append(tables, TableColumns{TableName: tableName, Columns: columns}) + } + } + return tables, nil +} + +func constructTableProperties(tables []TableColumns) ([]*structpb.Struct, error) { + tableProperties := make([]*structpb.Struct, len(tables)) + + for idx, table := range tables { + propertiesMap := make(map[string]interface{}) + for _, column := range table.Columns { + propertiesMap[column.Name] = map[string]interface{}{ + "instillAcceptFormats": []string{getInstillAcceptFormat(column.Type)}, + "instillUpstreamTypes": instillUpstreamTypes, + "type": getInstillAcceptFormat(column.Type), + } + } + propertyStructPB, err := base.ConvertToStructpb(propertiesMap) + if err != nil { + return nil, err + } + + tableProperties[idx] = propertyStructPB + } + return tableProperties, nil +} + +func getInstillAcceptFormat(tableType string) string { + switch tableType { + case "STRING": + return "string" + case "INTEGER": + return "integer" + case "BOOLEAN": + return "boolean" + default: + return "string" + } +} diff --git a/data/bigquery/v0/read.go b/data/bigquery/v0/read.go index 706f4909..cfbc86e7 100644 --- a/data/bigquery/v0/read.go +++ b/data/bigquery/v0/read.go @@ -2,6 +2,7 @@ package bigquery import ( "context" + "fmt" "cloud.google.com/go/bigquery" "google.golang.org/api/iterator" @@ -12,9 +13,7 @@ type ReadInput struct { DatasetID string TableName string Client *bigquery.Client - // If SelectColumns is empty, all columns will be selected - SelectColumns []string - QueryParameter map[string]any + Filtering string } type ReadOutput struct { @@ -22,37 +21,10 @@ type ReadOutput struct { } func queryBuilder(input ReadInput) string { - sql := "SELECT " - - if len(input.SelectColumns) == 0 { - sql = sql + "*" + if input.Filtering == "" { + return fmt.Sprintf("SELECT * FROM `%s.%s.%s`", input.ProjectID, input.DatasetID, input.TableName) } - - for idx, column := range input.SelectColumns { - sql += column - if idx < len(input.SelectColumns)-1 { - sql += ", " - } - } - - sql += " FROM " + input.ProjectID + "." + input.DatasetID + "." + input.TableName - - keys := make([]string, len(input.QueryParameter)) - for k := range input.QueryParameter { - keys = append(keys, k) - } - - if len(input.QueryParameter) > 0 { - sql += " WHERE " - for idx, key := range keys { - sql += key + " = @" + key - if idx < len(keys)-1 { - sql += " AND " - } - } - } - - return sql + return fmt.Sprintf("SELECT * FROM `%s.%s.%s` %s", input.ProjectID, input.DatasetID, input.TableName, input.Filtering) } func readDataFromBigQuery(input ReadInput) (ReadOutput, error) { @@ -62,37 +34,25 @@ func readDataFromBigQuery(input ReadInput) (ReadOutput, error) { sql := queryBuilder(input) q := client.Query(sql) - var queryParameter []bigquery.QueryParameter - for key, value := range input.QueryParameter { - queryParameter = append(queryParameter, bigquery.QueryParameter{Name: key, Value: value}) - } - - q.Parameters = queryParameter it, err := q.Read(ctx) if err != nil { return ReadOutput{}, err } - result := []map[string]any{} for { var values []bigquery.Value err := it.Next(&values) - if err == nil { - data := map[string]any{} - for idx, value := range values { - data[input.SelectColumns[idx]] = value - } - result = append(result, data) - } - if err == iterator.Done { break } - if err != nil { - return ReadOutput{}, err + data := map[string]any{} + + for i, schema := range it.Schema { + data[schema.Name] = values[i] } - + + result = append(result, data) } return ReadOutput{Data: result}, nil From 497abd259e9ef13ae66492b631311b610e4971d5 Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Wed, 12 Jun 2024 15:46:50 +0100 Subject: [PATCH 03/15] feat: finish read task implementation in bigquery component --- data/bigquery/v0/config/tasks.json | 11 +++++------ data/bigquery/v0/main.go | 11 ++++++++--- data/bigquery/v0/read.go | 2 +- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/data/bigquery/v0/config/tasks.json b/data/bigquery/v0/config/tasks.json index f9024026..0e62b077 100644 --- a/data/bigquery/v0/config/tasks.json +++ b/data/bigquery/v0/config/tasks.json @@ -56,22 +56,21 @@ }, "output": { "instillUIOrder": 0, + "description": "The data to be read from BigQuery", "properties": { "data": { - "instillShortDescription": "The data to be read from BigQuery", "description": "The data to be read from BigQuery", "instillUIOrder": 0, + "title": "Data", + "type": "array", "items": { "title": "Data item", "type": "object", "required": [] - }, - "required": [], - "title": "Data", - "type": "array" + } } }, - "required": [], + "required": ["data"], "title": "Output", "type": "object" } diff --git a/data/bigquery/v0/main.go b/data/bigquery/v0/main.go index 9b7e4864..637bba8d 100644 --- a/data/bigquery/v0/main.go +++ b/data/bigquery/v0/main.go @@ -124,7 +124,7 @@ func (e *execution) Execute(ctx context.Context, inputs []*structpb.Struct) ([]* if err != nil { return nil, err } - output, err = base.ConvertToStructpb(outputStruct) + output, err = base.ConvertToStructpb(outputStruct) if err != nil { return nil, err } @@ -191,6 +191,7 @@ func (c *component) GetDefinition(sysVars map[string]any, compConfig *base.Compo } // TODO: chuang8511, remove table from definition.json and make it dynamic. + // It will be change before 2024-06-26. tableProperty := tableProperties[0] for _, sch := range def.Spec.ComponentSpecification.Fields["oneOf"].GetListValue().Values { data := sch.GetStructValue().Fields["properties"].GetStructValue().Fields["input"].GetStructValue().Fields["properties"].GetStructValue().Fields["data"].GetStructValue() @@ -254,10 +255,14 @@ func constructTableProperties(tables []TableColumns) ([]*structpb.Struct, error) for idx, table := range tables { propertiesMap := make(map[string]interface{}) - for _, column := range table.Columns { + for idx, column := range table.Columns { propertiesMap[column.Name] = map[string]interface{}{ - "instillAcceptFormats": []string{getInstillAcceptFormat(column.Type)}, + "title": column.Name, + "instillUIOrder": idx, + "description": "Column " + column.Name + " of table " + table.TableName, + "instillFormat": getInstillAcceptFormat(column.Type), "instillUpstreamTypes": instillUpstreamTypes, + "required": []string{}, "type": getInstillAcceptFormat(column.Type), } } diff --git a/data/bigquery/v0/read.go b/data/bigquery/v0/read.go index cfbc86e7..c33117c1 100644 --- a/data/bigquery/v0/read.go +++ b/data/bigquery/v0/read.go @@ -17,7 +17,7 @@ type ReadInput struct { } type ReadOutput struct { - Data []map[string]any + Data []map[string]any `json:"data"` } func queryBuilder(input ReadInput) string { From 9bd550cba050db8d97b74ecc439cec527d8f0754 Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Wed, 12 Jun 2024 18:05:43 +0100 Subject: [PATCH 04/15] chore: add todo for future development --- data/bigquery/v0/main.go | 2 +- data/bigquery/v0/main_test.go | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) create mode 100644 data/bigquery/v0/main_test.go diff --git a/data/bigquery/v0/main.go b/data/bigquery/v0/main.go index 637bba8d..2e6a765b 100644 --- a/data/bigquery/v0/main.go +++ b/data/bigquery/v0/main.go @@ -191,7 +191,7 @@ func (c *component) GetDefinition(sysVars map[string]any, compConfig *base.Compo } // TODO: chuang8511, remove table from definition.json and make it dynamic. - // It will be change before 2024-06-26. + // It will be changed before 2024-06-26. tableProperty := tableProperties[0] for _, sch := range def.Spec.ComponentSpecification.Fields["oneOf"].GetListValue().Values { data := sch.GetStructValue().Fields["properties"].GetStructValue().Fields["input"].GetStructValue().Fields["properties"].GetStructValue().Fields["data"].GetStructValue() diff --git a/data/bigquery/v0/main_test.go b/data/bigquery/v0/main_test.go new file mode 100644 index 00000000..8393e7f6 --- /dev/null +++ b/data/bigquery/v0/main_test.go @@ -0,0 +1,3 @@ +// TODO: chuang8511, add test code +// It will be done before 2024-06-26. +package bigquery \ No newline at end of file From a89daf6705eca6b561591d360cae3edde4b56f75 Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Thu, 13 Jun 2024 09:42:31 +0100 Subject: [PATCH 05/15] chore: add description in document --- data/bigquery/v0/README.mdx | 16 +++++++++++++++- data/bigquery/v0/config/tasks.json | 2 +- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/data/bigquery/v0/README.mdx b/data/bigquery/v0/README.mdx index 2225c451..ab4cde17 100644 --- a/data/bigquery/v0/README.mdx +++ b/data/bigquery/v0/README.mdx @@ -9,6 +9,7 @@ The BigQuery component is a data component that allows users to insert data to B It can carry out the following tasks: - [Insert](#insert) +- [Read](#read) ## Release Stage @@ -36,8 +37,21 @@ Insert data to BigQuery. | Input | ID | Type | Description | | :--- | :--- | :--- | :--- | | Task ID (required) | `task` | string | `TASK_INSERT` | -| Data (required) | `data` | object | The data to be inserted to BigQuery | +| Data | `data` | object | The data to be inserted to BigQuery | | Output | ID | Type | Description | | :--- | :--- | :--- | :--- | | Status | `status` | string | Status of the upload operation | + +### Read + +Read data from BigQuery. + +| Input | ID | Type | Description | +| :--- | :--- | :--- | :--- | +| Task ID (required) | `task` | string | `TASK_READ` | +| Filtering | `filtering` | string | The filter to be applied to the data, please start with where clause | + +| Output | ID | Type | Description | +| :--- | :--- | :--- | :--- | +| Data | `data` | array[object] | The data to be read from BigQuery | diff --git a/data/bigquery/v0/config/tasks.json b/data/bigquery/v0/config/tasks.json index 0e62b077..9e3ebc36 100644 --- a/data/bigquery/v0/config/tasks.json +++ b/data/bigquery/v0/config/tasks.json @@ -43,7 +43,7 @@ "properties": { "filtering": { "instillShortDescription": "The filter to be applied to the data", - "description": "The filter to be applied to the data", + "description": "The filter to be applied to the data, please start with where clause", "instillUIOrder": 0, "required": [], "title": "Filtering", From 6ec3ef9178c4681a1972354ec8e3e69a8486c534 Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Thu, 13 Jun 2024 17:46:06 +0100 Subject: [PATCH 06/15] feat: dynamically make kebab-case --- base/formats.go | 38 ++++++++++++++++++++++++++++++++++++++ base/formats_test.go | 29 +++++++++++++++++++++++++++++ data/bigquery/v0/main.go | 4 +++- 3 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 base/formats_test.go diff --git a/base/formats.go b/base/formats.go index 9612987c..9c70ed38 100644 --- a/base/formats.go +++ b/base/formats.go @@ -2,7 +2,9 @@ package base import ( "encoding/base64" + "regexp" "strings" + "unicode" "github.com/gabriel-vasile/mimetype" "github.com/santhosh-tekuri/jsonschema/v5" @@ -197,3 +199,39 @@ func TrimBase64Mime(b64 string) string { splitB64 := strings.Split(b64, ",") return splitB64[len(splitB64)-1] } + +type InstillDynamicFormatTransformer struct{} + +func (InstillDynamicFormatTransformer) ConvertToKebab(str string) string { + if strings.Index(str, "_") >= 0 { + re := regexp.MustCompile(`_`) + return strings.ToLower(re.ReplaceAllString(str, "-")) + } else if containsCapital(str) { + return camelToKebab(str) + } + return str +} + +func containsCapital(s string) bool { + for _, r := range s { + if unicode.IsUpper(r) { + return true + } + } + return false +} + +func camelToKebab(s string) string { + var result strings.Builder + for i, r := range s { + if unicode.IsUpper(r) { + if i != 0 { + result.WriteRune('-') + } + result.WriteRune(unicode.ToLower(r)) + } else { + result.WriteRune(r) + } + } + return result.String() +} diff --git a/base/formats_test.go b/base/formats_test.go new file mode 100644 index 00000000..b768bae1 --- /dev/null +++ b/base/formats_test.go @@ -0,0 +1,29 @@ +package base + +import ( + "testing" + + "github.com/frankban/quicktest" +) + +func TestConvertToKebab(t *testing.T) { + c := quicktest.New(t) + + transformer := InstillDynamicFormatTransformer{} + + tests := []struct { + input string + expected string + }{ + {"hello_world", "hello-world"}, + {"HelloWorld", "hello-world"}, + {"helloWorld", "hello-world"}, + {"HELLO_WORLD", "hello-world"}, + {"", ""}, + } + + for _, tt := range tests { + got := transformer.ConvertToKebab(tt.input) + c.Assert(got, quicktest.Equals, tt.expected) + } +} diff --git a/data/bigquery/v0/main.go b/data/bigquery/v0/main.go index 2e6a765b..4b9745d5 100644 --- a/data/bigquery/v0/main.go +++ b/data/bigquery/v0/main.go @@ -252,11 +252,13 @@ func constructTableColumns(myDataset *bigquery.Dataset, ctx context.Context, com func constructTableProperties(tables []TableColumns) ([]*structpb.Struct, error) { tableProperties := make([]*structpb.Struct, len(tables)) + transformer := base.InstillDynamicFormatTransformer{} for idx, table := range tables { propertiesMap := make(map[string]interface{}) for idx, column := range table.Columns { - propertiesMap[column.Name] = map[string]interface{}{ + kebabCaseColumnName := transformer.ConvertToKebab(column.Name) + propertiesMap[kebabCaseColumnName] = map[string]interface{}{ "title": column.Name, "instillUIOrder": idx, "description": "Column " + column.Name + " of table " + table.TableName, From 9592d81e19d64f7f58db743adafcaf7f15624423 Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Thu, 13 Jun 2024 17:52:22 +0100 Subject: [PATCH 07/15] feat: handle no table found case --- data/bigquery/v0/main.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/data/bigquery/v0/main.go b/data/bigquery/v0/main.go index 4b9745d5..3b0bb662 100644 --- a/data/bigquery/v0/main.go +++ b/data/bigquery/v0/main.go @@ -247,6 +247,9 @@ func constructTableColumns(myDataset *bigquery.Dataset, ctx context.Context, com tables = append(tables, TableColumns{TableName: tableName, Columns: columns}) } } + if len(tables) == 0 { + return nil, fmt.Errorf("table name is not found in the dataset") + } return tables, nil } From cb2743b9c8fac11798ec205aaa9faeb386b84aee Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Thu, 13 Jun 2024 17:55:35 +0100 Subject: [PATCH 08/15] fix: fix lint error --- base/formats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/base/formats.go b/base/formats.go index 9c70ed38..e4dae699 100644 --- a/base/formats.go +++ b/base/formats.go @@ -203,7 +203,7 @@ func TrimBase64Mime(b64 string) string { type InstillDynamicFormatTransformer struct{} func (InstillDynamicFormatTransformer) ConvertToKebab(str string) string { - if strings.Index(str, "_") >= 0 { + if strings.Contains(str, "_") { re := regexp.MustCompile(`_`) return strings.ToLower(re.ReplaceAllString(str, "-")) } else if containsCapital(str) { From 9783aa132d5870ce6ced9e2ae02de7c83a3dbccd Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Thu, 13 Jun 2024 18:18:34 +0100 Subject: [PATCH 09/15] fix: convert snake to kebab when searching input --- data/bigquery/v0/insert.go | 5 ++++- data/bigquery/v0/main.go | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/data/bigquery/v0/insert.go b/data/bigquery/v0/insert.go index d333fed2..a868a181 100644 --- a/data/bigquery/v0/insert.go +++ b/data/bigquery/v0/insert.go @@ -5,6 +5,7 @@ import ( "fmt" "cloud.google.com/go/bigquery" + "github.com/instill-ai/component/base" "google.golang.org/protobuf/types/known/structpb" ) @@ -31,8 +32,10 @@ func insertDataToBigQuery(projectID, datasetID, tableName string, valueSaver Dat func getDataSaver(input *structpb.Struct, schema bigquery.Schema) (DataSaver, error) { inputObj := input.GetFields()["data"].GetStructValue() dataMap := map[string]bigquery.Value{} + transformer := base.InstillDynamicFormatTransformer{} for _, sc := range schema { - dataMap[sc.Name] = inputObj.GetFields()[sc.Name].AsInterface() + kebabName := transformer.ConvertToKebab(sc.Name) + dataMap[sc.Name] = inputObj.GetFields()[kebabName].AsInterface() } return DataSaver{Schema: schema, DataMap: dataMap}, nil } diff --git a/data/bigquery/v0/main.go b/data/bigquery/v0/main.go index 3b0bb662..2e913d60 100644 --- a/data/bigquery/v0/main.go +++ b/data/bigquery/v0/main.go @@ -99,6 +99,7 @@ func (e *execution) Execute(ctx context.Context, inputs []*structpb.Struct) ([]* if err != nil { return nil, err } + fmt.Println("input", input) valueSaver, err := getDataSaver(input, metaData.Schema) if err != nil { return nil, err From 91cf866271760b7d54c94351e68ce209dfc2a3f0 Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Thu, 13 Jun 2024 18:22:21 +0100 Subject: [PATCH 10/15] chore: add comment --- data/bigquery/v0/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/data/bigquery/v0/main.go b/data/bigquery/v0/main.go index 2e913d60..8c02984a 100644 --- a/data/bigquery/v0/main.go +++ b/data/bigquery/v0/main.go @@ -244,6 +244,7 @@ func constructTableColumns(myDataset *bigquery.Dataset, ctx context.Context, com } // TODO: chuang8511, remove table from definition.json and make it dynamic. + // It will be changed before 2024-06-26. if compConfig.Setup["table_name"].(string) == tableName { tables = append(tables, TableColumns{TableName: tableName, Columns: columns}) } From acf8541e8bacd5620e43346ec02e58fcfb31b47b Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Mon, 17 Jun 2024 16:56:42 +0100 Subject: [PATCH 11/15] fix: revert bigquery schema customization --- base/formats.go | 38 -------------------------------------- base/formats_test.go | 29 ----------------------------- data/bigquery/v0/insert.go | 5 +---- data/bigquery/v0/main.go | 4 +--- 4 files changed, 2 insertions(+), 74 deletions(-) delete mode 100644 base/formats_test.go diff --git a/base/formats.go b/base/formats.go index e4dae699..9612987c 100644 --- a/base/formats.go +++ b/base/formats.go @@ -2,9 +2,7 @@ package base import ( "encoding/base64" - "regexp" "strings" - "unicode" "github.com/gabriel-vasile/mimetype" "github.com/santhosh-tekuri/jsonschema/v5" @@ -199,39 +197,3 @@ func TrimBase64Mime(b64 string) string { splitB64 := strings.Split(b64, ",") return splitB64[len(splitB64)-1] } - -type InstillDynamicFormatTransformer struct{} - -func (InstillDynamicFormatTransformer) ConvertToKebab(str string) string { - if strings.Contains(str, "_") { - re := regexp.MustCompile(`_`) - return strings.ToLower(re.ReplaceAllString(str, "-")) - } else if containsCapital(str) { - return camelToKebab(str) - } - return str -} - -func containsCapital(s string) bool { - for _, r := range s { - if unicode.IsUpper(r) { - return true - } - } - return false -} - -func camelToKebab(s string) string { - var result strings.Builder - for i, r := range s { - if unicode.IsUpper(r) { - if i != 0 { - result.WriteRune('-') - } - result.WriteRune(unicode.ToLower(r)) - } else { - result.WriteRune(r) - } - } - return result.String() -} diff --git a/base/formats_test.go b/base/formats_test.go deleted file mode 100644 index b768bae1..00000000 --- a/base/formats_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package base - -import ( - "testing" - - "github.com/frankban/quicktest" -) - -func TestConvertToKebab(t *testing.T) { - c := quicktest.New(t) - - transformer := InstillDynamicFormatTransformer{} - - tests := []struct { - input string - expected string - }{ - {"hello_world", "hello-world"}, - {"HelloWorld", "hello-world"}, - {"helloWorld", "hello-world"}, - {"HELLO_WORLD", "hello-world"}, - {"", ""}, - } - - for _, tt := range tests { - got := transformer.ConvertToKebab(tt.input) - c.Assert(got, quicktest.Equals, tt.expected) - } -} diff --git a/data/bigquery/v0/insert.go b/data/bigquery/v0/insert.go index a868a181..d333fed2 100644 --- a/data/bigquery/v0/insert.go +++ b/data/bigquery/v0/insert.go @@ -5,7 +5,6 @@ import ( "fmt" "cloud.google.com/go/bigquery" - "github.com/instill-ai/component/base" "google.golang.org/protobuf/types/known/structpb" ) @@ -32,10 +31,8 @@ func insertDataToBigQuery(projectID, datasetID, tableName string, valueSaver Dat func getDataSaver(input *structpb.Struct, schema bigquery.Schema) (DataSaver, error) { inputObj := input.GetFields()["data"].GetStructValue() dataMap := map[string]bigquery.Value{} - transformer := base.InstillDynamicFormatTransformer{} for _, sc := range schema { - kebabName := transformer.ConvertToKebab(sc.Name) - dataMap[sc.Name] = inputObj.GetFields()[kebabName].AsInterface() + dataMap[sc.Name] = inputObj.GetFields()[sc.Name].AsInterface() } return DataSaver{Schema: schema, DataMap: dataMap}, nil } diff --git a/data/bigquery/v0/main.go b/data/bigquery/v0/main.go index 8c02984a..dfcae78c 100644 --- a/data/bigquery/v0/main.go +++ b/data/bigquery/v0/main.go @@ -257,13 +257,11 @@ func constructTableColumns(myDataset *bigquery.Dataset, ctx context.Context, com func constructTableProperties(tables []TableColumns) ([]*structpb.Struct, error) { tableProperties := make([]*structpb.Struct, len(tables)) - transformer := base.InstillDynamicFormatTransformer{} for idx, table := range tables { propertiesMap := make(map[string]interface{}) for idx, column := range table.Columns { - kebabCaseColumnName := transformer.ConvertToKebab(column.Name) - propertiesMap[kebabCaseColumnName] = map[string]interface{}{ + propertiesMap[column.Name] = map[string]interface{}{ "title": column.Name, "instillUIOrder": idx, "description": "Column " + column.Name + " of table " + table.TableName, From db4a90b533a0044898f18d056232b605a5ba1db2 Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Mon, 17 Jun 2024 17:16:54 +0100 Subject: [PATCH 12/15] fix: fix conflict error --- data/bigquery/v0/config/definition.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/bigquery/v0/config/definition.json b/data/bigquery/v0/config/definition.json index a6554d4a..62436095 100644 --- a/data/bigquery/v0/config/definition.json +++ b/data/bigquery/v0/config/definition.json @@ -1,5 +1,5 @@ { - "available_tasks": [ + "availableTasks": [ "TASK_INSERT", "TASK_READ" ], From 7f1503a86c8239c9d699c84e554199ac3233f504 Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Mon, 17 Jun 2024 17:24:45 +0100 Subject: [PATCH 13/15] chore: take out the unnecessary parts --- data/bigquery/v0/config/tasks.json | 4 +++- data/bigquery/v0/main.go | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/data/bigquery/v0/config/tasks.json b/data/bigquery/v0/config/tasks.json index 9e3ebc36..abaf3a69 100644 --- a/data/bigquery/v0/config/tasks.json +++ b/data/bigquery/v0/config/tasks.json @@ -14,7 +14,9 @@ "type": "object" } }, - "required": [], + "required": [ + "data" + ], "title": "Input", "type": "object" }, diff --git a/data/bigquery/v0/main.go b/data/bigquery/v0/main.go index dfcae78c..cee0ed39 100644 --- a/data/bigquery/v0/main.go +++ b/data/bigquery/v0/main.go @@ -99,7 +99,6 @@ func (e *execution) Execute(ctx context.Context, inputs []*structpb.Struct) ([]* if err != nil { return nil, err } - fmt.Println("input", input) valueSaver, err := getDataSaver(input, metaData.Schema) if err != nil { return nil, err From 71fa8fe1ee19d2b582fe173d8f29401ac7ed01ba Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Mon, 17 Jun 2024 17:25:42 +0100 Subject: [PATCH 14/15] chore: update document --- data/bigquery/v0/README.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/bigquery/v0/README.mdx b/data/bigquery/v0/README.mdx index ab4cde17..a5ee12c3 100644 --- a/data/bigquery/v0/README.mdx +++ b/data/bigquery/v0/README.mdx @@ -37,7 +37,7 @@ Insert data to BigQuery. | Input | ID | Type | Description | | :--- | :--- | :--- | :--- | | Task ID (required) | `task` | string | `TASK_INSERT` | -| Data | `data` | object | The data to be inserted to BigQuery | +| Data (required) | `data` | object | The data to be inserted to BigQuery | | Output | ID | Type | Description | | :--- | :--- | :--- | :--- | From 3f82148cca9d3657fcfe70b8c63540fb3fe38844 Mon Sep 17 00:00:00 2001 From: chuang8511 Date: Wed, 19 Jun 2024 12:47:04 +0100 Subject: [PATCH 15/15] fix: fix bug after recipe revamp --- data/bigquery/v0/config/tasks.json | 4 +--- data/bigquery/v0/main.go | 9 +++++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/data/bigquery/v0/config/tasks.json b/data/bigquery/v0/config/tasks.json index abaf3a69..9e3ebc36 100644 --- a/data/bigquery/v0/config/tasks.json +++ b/data/bigquery/v0/config/tasks.json @@ -14,9 +14,7 @@ "type": "object" } }, - "required": [ - "data" - ], + "required": [], "title": "Input", "type": "object" }, diff --git a/data/bigquery/v0/main.go b/data/bigquery/v0/main.go index cee0ed39..29a054c7 100644 --- a/data/bigquery/v0/main.go +++ b/data/bigquery/v0/main.go @@ -168,18 +168,18 @@ func (c *component) GetDefinition(sysVars map[string]any, compConfig *base.Compo return nil, err } - if sysVars == nil && compConfig == nil { + if compConfig == nil { return oriDef, nil } def := proto.Clone(oriDef).(*pb.ComponentDefinition) - client, err := NewClient(compConfig.Setup["json_key"].(string), compConfig.Setup["project_id"].(string)) + client, err := NewClient(compConfig.Setup["json-key"].(string), compConfig.Setup["project-id"].(string)) if err != nil || client == nil { return nil, fmt.Errorf("error creating BigQuery client: %v", err) } defer client.Close() - myDataset := client.Dataset(compConfig.Setup["dataset_id"].(string)) + myDataset := client.Dataset(compConfig.Setup["dataset-id"].(string)) tables, err := constructTableColumns(myDataset, ctx, compConfig) if err != nil { return nil, err @@ -244,7 +244,7 @@ func constructTableColumns(myDataset *bigquery.Dataset, ctx context.Context, com // TODO: chuang8511, remove table from definition.json and make it dynamic. // It will be changed before 2024-06-26. - if compConfig.Setup["table_name"].(string) == tableName { + if compConfig.Setup["table-name"].(string) == tableName { tables = append(tables, TableColumns{TableName: tableName, Columns: columns}) } } @@ -266,6 +266,7 @@ func constructTableProperties(tables []TableColumns) ([]*structpb.Struct, error) "description": "Column " + column.Name + " of table " + table.TableName, "instillFormat": getInstillAcceptFormat(column.Type), "instillUpstreamTypes": instillUpstreamTypes, + "instillAcceptFormats": []string{getInstillAcceptFormat(column.Type)}, "required": []string{}, "type": getInstillAcceptFormat(column.Type), }