Skip to content
This repository has been archived by the owner on Jan 9, 2025. It is now read-only.

feat: add read task in bigquery component #156

Merged
merged 15 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions data/bigquery/v0/README.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The BigQuery component is a data component that allows users to insert data to B
It can carry out the following tasks:

- [Insert](#insert)
- [Read](#read)

## Release Stage

Expand Down Expand Up @@ -41,3 +42,16 @@ Insert data to BigQuery.
| Output | ID | Type | Description |
| :--- | :--- | :--- | :--- |
| Status | `status` | string | Status of the upload operation |

### Read

Read data from BigQuery.

| Input | ID | Type | Description |
| :--- | :--- | :--- | :--- |
| Task ID (required) | `task` | string | `TASK_READ` |
| Filtering | `filtering` | string | The filter to be applied to the data; it must begin with a WHERE clause (e.g. `WHERE id > 10`) |

| Output | ID | Type | Description |
| :--- | :--- | :--- | :--- |
| Data | `data` | array[object] | The data to be read from BigQuery |
3 changes: 2 additions & 1 deletion data/bigquery/v0/config/definition.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"availableTasks": [
"TASK_INSERT"
"TASK_INSERT",
"TASK_READ"
],
"custom": false,
"documentationUrl": "https://www.instill.tech/docs/component/data/bigquery",
Expand Down
58 changes: 41 additions & 17 deletions data/bigquery/v0/config/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,16 @@
"instillUIOrder": 0,
"properties": {
"data": {
"additionalProperties": false,
"additionalProperties": true,
"instillShortDescription": "The data to be inserted to BigQuery",
"description": "The data to be inserted to BigQuery",
"instillUIOrder": 0,
"patternProperties": {
"^[a-z_][-a-z_0-9]{0,31}$": {
"instillAcceptFormats": [
"*"
],
"instillUIOrder": 0,
"instillUpstreamTypes": [
"reference",
"template"
],
"title": "Data"
}
},
"required": [],
"title": "Data",
"type": "object"
}
},
"required": [
"data"
],
"required": [],
"title": "Input",
"type": "object"
},
Expand All @@ -50,5 +35,44 @@
"title": "Output",
"type": "object"
}
},
"TASK_READ": {
"instillShortDescription": "Read data from BigQuery.",
"input": {
"instillUIOrder": 0,
"properties": {
"filtering": {
"instillShortDescription": "The filter to be applied to the data",
"description": "The filter to be applied to the data; it must begin with a WHERE clause (e.g. WHERE id > 10)",
"instillUIOrder": 0,
"required": [],
"title": "Filtering",
"type": "string"
}
},
"required": [],
"title": "Input",
"type": "object"
},
"output": {
"instillUIOrder": 0,
"description": "The data to be read from BigQuery",
"properties": {
"data": {
"description": "The data to be read from BigQuery",
"instillUIOrder": 0,
"title": "Data",
"type": "array",
"items": {
"title": "Data item",
"type": "object",
"required": []
}
}
},
"required": ["data"],
"title": "Output",
"type": "object"
}
}
}
171 changes: 171 additions & 0 deletions data/bigquery/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,22 @@ import (
"sync"

"cloud.google.com/go/bigquery"
pb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

"github.com/instill-ai/component/base"
)

// Task IDs supported by the BigQuery component (see config/definition.json).
const (
	taskInsert = "TASK_INSERT" // insert rows into the configured table
	taskRead   = "TASK_READ"   // read rows from the configured table
)

// instillUpstreamTypes lists the upstream value kinds accepted by the
// dynamically generated data properties in the component definition.
var instillUpstreamTypes = []string{"value", "reference", "template"}

//go:embed config/definition.json
var definitionJSON []byte

Expand Down Expand Up @@ -102,6 +108,27 @@ func (e *execution) Execute(ctx context.Context, inputs []*structpb.Struct) ([]*
return nil, err
}
output = &structpb.Struct{Fields: map[string]*structpb.Value{"status": {Kind: &structpb.Value_StringValue{StringValue: "success"}}}}
case taskRead:

inputStruct := ReadInput{
ProjectID: getProjectID(e.Setup),
DatasetID: getDatasetID(e.Setup),
TableName: getTableName(e.Setup),
Client: client,
}
err := base.ConvertFromStructpb(input, &inputStruct)
if err != nil {
return nil, err
}
outputStruct, err := readDataFromBigQuery(inputStruct)
if err != nil {
return nil, err
}
output, err = base.ConvertToStructpb(outputStruct)
if err != nil {
return nil, err
}

default:
return nil, fmt.Errorf("unsupported task: %s", e.Task)
}
Expand All @@ -122,3 +149,147 @@ func (c *component) Test(sysVars map[string]any, setup *structpb.Struct) error {
}
return errors.New("project ID does not match")
}

// TableColumns pairs a BigQuery table name with its column schema.
type TableColumns struct {
	TableName string
	Columns   []Column
}

// Column describes a single BigQuery column: its name and its BigQuery
// field type as a string (e.g. "STRING", "INTEGER").
type Column struct {
	Name string
	Type string
}

// GetDefinition returns the component definition with the `data` property
// schemas rebuilt dynamically from the configured BigQuery table's columns.
// When compConfig is nil the static definition is returned unchanged.
func (c *component) GetDefinition(sysVars map[string]any, compConfig *base.ComponentConfig) (*pb.ComponentDefinition, error) {

	ctx := context.Background()
	oriDef, err := c.Component.GetDefinition(nil, nil)
	if err != nil {
		return nil, err
	}

	if compConfig == nil {
		return oriDef, nil
	}

	// Read the required setup fields with comma-ok assertions so a missing
	// or mistyped value yields an error instead of a runtime panic.
	jsonKey, ok := compConfig.Setup["json-key"].(string)
	if !ok {
		return nil, fmt.Errorf("setup field json-key must be a string")
	}
	projectID, ok := compConfig.Setup["project-id"].(string)
	if !ok {
		return nil, fmt.Errorf("setup field project-id must be a string")
	}
	datasetID, ok := compConfig.Setup["dataset-id"].(string)
	if !ok {
		return nil, fmt.Errorf("setup field dataset-id must be a string")
	}

	def := proto.Clone(oriDef).(*pb.ComponentDefinition)
	client, err := NewClient(jsonKey, projectID)
	if err != nil {
		return nil, fmt.Errorf("creating BigQuery client: %w", err)
	}
	if client == nil {
		return nil, fmt.Errorf("error creating BigQuery client: client is nil")
	}
	defer client.Close()

	myDataset := client.Dataset(datasetID)
	tables, err := constructTableColumns(myDataset, ctx, compConfig)
	if err != nil {
		return nil, err
	}

	tableProperties, err := constructTableProperties(tables)
	if err != nil {
		return nil, err
	}

	// TODO: chuang8511, remove table from definition.json and make it dynamic.
	// It will be changed before 2024-06-26.
	tableProperty := tableProperties[0]

	// Inject the column schema into the component spec's `data` input.
	for _, sch := range def.Spec.ComponentSpecification.Fields["oneOf"].GetListValue().Values {
		data := sch.GetStructValue().Fields["properties"].GetStructValue().Fields["input"].GetStructValue().Fields["properties"].GetStructValue().Fields["data"].GetStructValue()
		if data != nil {
			data.Fields["properties"] = structpb.NewStructValue(tableProperty)
		}
	}

	// Mirror the same schema into the data specifications: the insert task's
	// `data` input object and the read task's `data` output array items.
	for _, dataSpec := range def.Spec.DataSpecifications {
		dataInput := dataSpec.Input.Fields["properties"].GetStructValue().Fields["data"].GetStructValue()
		if dataInput != nil {
			dataInput.Fields["properties"] = structpb.NewStructValue(tableProperty)
		}
		dataOutput := dataSpec.Output.Fields["properties"].GetStructValue().Fields["data"].GetStructValue()

		if dataOutput != nil {
			aPieceData := dataOutput.Fields["items"].GetStructValue()
			if aPieceData != nil {
				aPieceData.Fields["properties"] = structpb.NewStructValue(tableProperty)
			}

		}
	}

	return def, nil
}

func constructTableColumns(myDataset *bigquery.Dataset, ctx context.Context, compConfig *base.ComponentConfig) ([]TableColumns, error) {
tableIT := myDataset.Tables(ctx)
tables := []TableColumns{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we only fetch the table we want to use?

If we still want to fetch multiple tables, we can use a map with table_name as the key.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@donch1989
I was thinking to take the table name from setting parts to input.
Then, the users can choose the table / tables they want to extract with smart hint or drag-and-drop menu.
And, we also can adjust the columns without saving pipeline again if users want to change table.

What do you think about that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, sounds good

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have created another ticket and leave TODO memo here.
I will deal with it in the near future.

for {
table, err := tableIT.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
tableName := table.TableID
tableDetail := myDataset.Table(tableName)
metadata, err := tableDetail.Metadata(ctx)
if err != nil {
return nil, err
}
schema := metadata.Schema
columns := []Column{}
for _, field := range schema {
columns = append(columns, Column{Name: field.Name, Type: string(field.Type)})
}

// TODO: chuang8511, remove table from definition.json and make it dynamic.
// It will be changed before 2024-06-26.
if compConfig.Setup["table-name"].(string) == tableName {
tables = append(tables, TableColumns{TableName: tableName, Columns: columns})
}
}
if len(tables) == 0 {
return nil, fmt.Errorf("table name is not found in the dataset")
}
return tables, nil
}

// constructTableProperties converts each table's column schema into a
// JSON-schema-like property struct used in the component definition.
func constructTableProperties(tables []TableColumns) ([]*structpb.Struct, error) {
	tableProperties := make([]*structpb.Struct, len(tables))

	// Use distinct loop-variable names: the original inner loop shadowed the
	// outer `idx`, which is an easy source of future bugs.
	for tableIdx, table := range tables {
		propertiesMap := make(map[string]interface{}, len(table.Columns))
		for colIdx, column := range table.Columns {
			// Hoist the format lookup — it is used by three fields below.
			format := getInstillAcceptFormat(column.Type)
			propertiesMap[column.Name] = map[string]interface{}{
				"title":                column.Name,
				"instillUIOrder":       colIdx,
				"description":          "Column " + column.Name + " of table " + table.TableName,
				"instillFormat":        format,
				"instillUpstreamTypes": instillUpstreamTypes,
				"instillAcceptFormats": []string{format},
				"required":             []string{},
				"type":                 format,
			}
		}
		propertyStructPB, err := base.ConvertToStructpb(propertiesMap)
		if err != nil {
			return nil, err
		}

		tableProperties[tableIdx] = propertyStructPB
	}
	return tableProperties, nil
}

// getInstillAcceptFormat maps a BigQuery column type name to the
// corresponding Instill/JSON-schema type. Unrecognized types (TIMESTAMP,
// DATE, BYTES, ...) fall back to "string".
func getInstillAcceptFormat(tableType string) string {
	switch tableType {
	case "STRING":
		return "string"
	case "INTEGER":
		return "integer"
	case "FLOAT", "NUMERIC", "BIGNUMERIC":
		// Floating-point and decimal columns map to the JSON "number" type;
		// the original mapped them to "string", mislabeling numeric columns.
		return "number"
	case "BOOLEAN":
		return "boolean"
	default:
		return "string"
	}
}
3 changes: 3 additions & 0 deletions data/bigquery/v0/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// TODO: chuang8511, add test code
// It will be done before 2024-06-26.
package bigquery
59 changes: 59 additions & 0 deletions data/bigquery/v0/read.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package bigquery

import (
"context"
"fmt"

"cloud.google.com/go/bigquery"
"google.golang.org/api/iterator"
)

// ReadInput bundles everything needed to run a read query: the fully
// qualified table (project/dataset/table), an authenticated client, and an
// optional filter appended to the generated SELECT statement.
type ReadInput struct {
	ProjectID string
	DatasetID string
	TableName string
	Client    *bigquery.Client
	// Filtering is appended to the query verbatim (see queryBuilder); it is
	// expected to start with a WHERE clause.
	Filtering string
}

// ReadOutput holds the rows returned by a read query, one map per row keyed
// by column name.
type ReadOutput struct {
	Data []map[string]any `json:"data"`
}

// queryBuilder assembles the SELECT statement for the configured table and
// appends the user-supplied filter (expected to begin with WHERE) when set.
//
// NOTE(security): Filtering is interpolated into the SQL verbatim — do not
// pass untrusted input here; consider parameterized queries instead.
func queryBuilder(input ReadInput) string {
	stmt := fmt.Sprintf("SELECT * FROM `%s.%s.%s`", input.ProjectID, input.DatasetID, input.TableName)
	if input.Filtering == "" {
		return stmt
	}
	return stmt + " " + input.Filtering
}

// readDataFromBigQuery executes the SELECT built from input against BigQuery
// and returns all rows, each as a map keyed by column name.
func readDataFromBigQuery(input ReadInput) (ReadOutput, error) {

	ctx := context.Background()
	client := input.Client

	sql := queryBuilder(input)
	q := client.Query(sql)
	it, err := q.Read(ctx)
	if err != nil {
		return ReadOutput{}, err
	}
	result := []map[string]any{}
	for {
		var values []bigquery.Value
		err := it.Next(&values)
		if err == iterator.Done {
			break
		}
		// Bug fix: the original ignored non-Done iteration errors, which
		// could spin forever or index into an empty values slice.
		if err != nil {
			return ReadOutput{}, err
		}

		data := map[string]any{}
		for i, schema := range it.Schema {
			data[schema.Name] = values[i]
		}

		result = append(result, data)
	}

	return ReadOutput{Data: result}, nil
}
Loading