diff --git a/data/bigquery/v0/README.mdx b/data/bigquery/v0/README.mdx index 2225c451..a5ee12c3 100644 --- a/data/bigquery/v0/README.mdx +++ b/data/bigquery/v0/README.mdx @@ -9,6 +9,7 @@ The BigQuery component is a data component that allows users to insert data to B It can carry out the following tasks: - [Insert](#insert) +- [Read](#read) ## Release Stage @@ -41,3 +42,16 @@ Insert data to BigQuery. | Output | ID | Type | Description | | :--- | :--- | :--- | :--- | | Status | `status` | string | Status of the upload operation | + +### Read + +Read data from BigQuery. + +| Input | ID | Type | Description | +| :--- | :--- | :--- | :--- | +| Task ID (required) | `task` | string | `TASK_READ` | +| Filtering | `filtering` | string | The filter to be applied to the data, please start with where clause | + +| Output | ID | Type | Description | +| :--- | :--- | :--- | :--- | +| Data | `data` | array[object] | The data to be read from BigQuery | diff --git a/data/bigquery/v0/config/definition.json b/data/bigquery/v0/config/definition.json index dd5ca63f..62436095 100644 --- a/data/bigquery/v0/config/definition.json +++ b/data/bigquery/v0/config/definition.json @@ -1,6 +1,7 @@ { "availableTasks": [ - "TASK_INSERT" + "TASK_INSERT", + "TASK_READ" ], "custom": false, "documentationUrl": "https://www.instill.tech/docs/component/data/bigquery", diff --git a/data/bigquery/v0/config/tasks.json b/data/bigquery/v0/config/tasks.json index f77865d9..9e3ebc36 100644 --- a/data/bigquery/v0/config/tasks.json +++ b/data/bigquery/v0/config/tasks.json @@ -5,31 +5,16 @@ "instillUIOrder": 0, "properties": { "data": { - "additionalProperties": false, + "additionalProperties": true, "instillShortDescription": "The data to be inserted to BigQuery", "description": "The data to be inserted to BigQuery", "instillUIOrder": 0, - "patternProperties": { - "^[a-z_][-a-z_0-9]{0,31}$": { - "instillAcceptFormats": [ - "*" - ], - "instillUIOrder": 0, - "instillUpstreamTypes": [ - "reference", - "template" - 
], - "title": "Data" - } - }, "required": [], "title": "Data", "type": "object" } }, - "required": [ - "data" - ], + "required": [], "title": "Input", "type": "object" }, @@ -50,5 +35,44 @@ "title": "Output", "type": "object" } + }, + "TASK_READ": { + "instillShortDescription": "Read data from BigQuery.", + "input": { + "instillUIOrder": 0, + "properties": { + "filtering": { + "instillShortDescription": "The filter to be applied to the data", + "description": "The filter to be applied to the data, please start with where clause", + "instillUIOrder": 0, + "required": [], + "title": "Filtering", + "type": "string" + } + }, + "required": [], + "title": "Input", + "type": "object" + }, + "output": { + "instillUIOrder": 0, + "description": "The data to be read from BigQuery", + "properties": { + "data": { + "description": "The data to be read from BigQuery", + "instillUIOrder": 0, + "title": "Data", + "type": "array", + "items": { + "title": "Data item", + "type": "object", + "required": [] + } + } + }, + "required": ["data"], + "title": "Output", + "type": "object" + } } } diff --git a/data/bigquery/v0/main.go b/data/bigquery/v0/main.go index c0ce8a5b..29a054c7 100644 --- a/data/bigquery/v0/main.go +++ b/data/bigquery/v0/main.go @@ -9,7 +9,10 @@ import ( "sync" "cloud.google.com/go/bigquery" + pb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta" + "google.golang.org/api/iterator" "google.golang.org/api/option" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" "github.com/instill-ai/component/base" @@ -17,8 +20,11 @@ import ( const ( taskInsert = "TASK_INSERT" + taskRead = "TASK_READ" ) +var instillUpstreamTypes = []string{"value", "reference", "template"} + //go:embed config/definition.json var definitionJSON []byte @@ -102,6 +108,27 @@ func (e *execution) Execute(ctx context.Context, inputs []*structpb.Struct) ([]* return nil, err } output = &structpb.Struct{Fields: map[string]*structpb.Value{"status": {Kind: 
&structpb.Value_StringValue{StringValue: "success"}}}} + case taskRead: + + inputStruct := ReadInput{ + ProjectID: getProjectID(e.Setup), + DatasetID: getDatasetID(e.Setup), + TableName: getTableName(e.Setup), + Client: client, + } + err := base.ConvertFromStructpb(input, &inputStruct) + if err != nil { + return nil, err + } + outputStruct, err := readDataFromBigQuery(inputStruct) + if err != nil { + return nil, err + } + output, err = base.ConvertToStructpb(outputStruct) + if err != nil { + return nil, err + } + default: return nil, fmt.Errorf("unsupported task: %s", e.Task) } @@ -122,3 +149,147 @@ func (c *component) Test(sysVars map[string]any, setup *structpb.Struct) error { } return errors.New("project ID does not match") } + +type TableColumns struct { + TableName string + Columns []Column +} + +type Column struct { + Name string + Type string +} + +func (c *component) GetDefinition(sysVars map[string]any, compConfig *base.ComponentConfig) (*pb.ComponentDefinition, error) { + + ctx := context.Background() + oriDef, err := c.Component.GetDefinition(nil, nil) + if err != nil { + return nil, err + } + + if compConfig == nil { + return oriDef, nil + } + + def := proto.Clone(oriDef).(*pb.ComponentDefinition) + client, err := NewClient(compConfig.Setup["json-key"].(string), compConfig.Setup["project-id"].(string)) + if err != nil || client == nil { + return nil, fmt.Errorf("error creating BigQuery client: %v", err) + } + defer client.Close() + + myDataset := client.Dataset(compConfig.Setup["dataset-id"].(string)) + tables, err := constructTableColumns(myDataset, ctx, compConfig) + if err != nil { + return nil, err + } + + tableProperties, err := constructTableProperties(tables) + if err != nil { + return nil, err + } + + // TODO: chuang8511, remove table from definition.json and make it dynamic. + // It will be changed before 2024-06-26. 
+ tableProperty := tableProperties[0] + for _, sch := range def.Spec.ComponentSpecification.Fields["oneOf"].GetListValue().Values { + data := sch.GetStructValue().Fields["properties"].GetStructValue().Fields["input"].GetStructValue().Fields["properties"].GetStructValue().Fields["data"].GetStructValue() + if data != nil { + data.Fields["properties"] = structpb.NewStructValue(tableProperty) + } + } + + for _, dataSpec := range def.Spec.DataSpecifications { + dataInput := dataSpec.Input.Fields["properties"].GetStructValue().Fields["data"].GetStructValue() + if dataInput != nil { + dataInput.Fields["properties"] = structpb.NewStructValue(tableProperty) + } + dataOutput := dataSpec.Output.Fields["properties"].GetStructValue().Fields["data"].GetStructValue() + + if dataOutput != nil { + aPieceData := dataOutput.Fields["items"].GetStructValue() + if aPieceData != nil { + aPieceData.Fields["properties"] = structpb.NewStructValue(tableProperty) + } + + } + } + + return def, nil +} + +func constructTableColumns(myDataset *bigquery.Dataset, ctx context.Context, compConfig *base.ComponentConfig) ([]TableColumns, error) { + tableIT := myDataset.Tables(ctx) + tables := []TableColumns{} + for { + table, err := tableIT.Next() + if err == iterator.Done { + break + } + if err != nil { + return nil, err + } + tableName := table.TableID + tableDetail := myDataset.Table(tableName) + metadata, err := tableDetail.Metadata(ctx) + if err != nil { + return nil, err + } + schema := metadata.Schema + columns := []Column{} + for _, field := range schema { + columns = append(columns, Column{Name: field.Name, Type: string(field.Type)}) + } + + // TODO: chuang8511, remove table from definition.json and make it dynamic. + // It will be changed before 2024-06-26. 
+ if compConfig.Setup["table-name"].(string) == tableName { + tables = append(tables, TableColumns{TableName: tableName, Columns: columns}) + } + } + if len(tables) == 0 { + return nil, fmt.Errorf("table name is not found in the dataset") + } + return tables, nil +} + +func constructTableProperties(tables []TableColumns) ([]*structpb.Struct, error) { + tableProperties := make([]*structpb.Struct, len(tables)) + + for idx, table := range tables { + propertiesMap := make(map[string]interface{}) + for idx, column := range table.Columns { + propertiesMap[column.Name] = map[string]interface{}{ + "title": column.Name, + "instillUIOrder": idx, + "description": "Column " + column.Name + " of table " + table.TableName, + "instillFormat": getInstillAcceptFormat(column.Type), + "instillUpstreamTypes": instillUpstreamTypes, + "instillAcceptFormats": []string{getInstillAcceptFormat(column.Type)}, + "required": []string{}, + "type": getInstillAcceptFormat(column.Type), + } + } + propertyStructPB, err := base.ConvertToStructpb(propertiesMap) + if err != nil { + return nil, err + } + + tableProperties[idx] = propertyStructPB + } + return tableProperties, nil +} + +func getInstillAcceptFormat(tableType string) string { + switch tableType { + case "STRING": + return "string" + case "INTEGER": + return "integer" + case "BOOLEAN": + return "boolean" + default: + return "string" + } +} diff --git a/data/bigquery/v0/main_test.go b/data/bigquery/v0/main_test.go new file mode 100644 index 00000000..8393e7f6 --- /dev/null +++ b/data/bigquery/v0/main_test.go @@ -0,0 +1,3 @@ +// TODO: chuang8511, add test code +// It will be done before 2024-06-26. 
+package bigquery
\ No newline at end of file
diff --git a/data/bigquery/v0/read.go b/data/bigquery/v0/read.go
new file mode 100644
index 00000000..c33117c1
--- /dev/null
+++ b/data/bigquery/v0/read.go
@@ -0,0 +1,62 @@
+package bigquery
+
+import (
+	"context"
+	"fmt"
+
+	"cloud.google.com/go/bigquery"
+	"google.golang.org/api/iterator"
+)
+
+type ReadInput struct {
+	ProjectID string
+	DatasetID string
+	TableName string
+	Client    *bigquery.Client
+	Filtering string
+}
+
+type ReadOutput struct {
+	Data []map[string]any `json:"data"`
+}
+
+func queryBuilder(input ReadInput) string {
+	if input.Filtering == "" {
+		return fmt.Sprintf("SELECT * FROM `%s.%s.%s`", input.ProjectID, input.DatasetID, input.TableName)
+	}
+	return fmt.Sprintf("SELECT * FROM `%s.%s.%s` %s", input.ProjectID, input.DatasetID, input.TableName, input.Filtering)
+}
+
+func readDataFromBigQuery(input ReadInput) (ReadOutput, error) {
+
+	ctx := context.Background()
+	client := input.Client
+
+	sql := queryBuilder(input)
+	q := client.Query(sql)
+	it, err := q.Read(ctx)
+	if err != nil {
+		return ReadOutput{}, err
+	}
+	result := []map[string]any{}
+	for {
+		var values []bigquery.Value
+		err := it.Next(&values)
+
+		if err == iterator.Done {
+			break
+		}
+		// Next can fail with real errors (network, auth, query failure);
+		// without this check the loop would spin forever on a persistent error.
+		if err != nil {
+			return ReadOutput{}, err
+		}
+		data := map[string]any{}
+
+		for i, schema := range it.Schema {
+			data[schema.Name] = values[i]
+		}
+
+		result = append(result, data)
+	}
+
+	return ReadOutput{Data: result}, nil
+}