From 1ecff8ac7612d785bf172a414d0768dd4df9c084 Mon Sep 17 00:00:00 2001 From: ChunHao <64747455+chuang8511@users.noreply.github.com> Date: Tue, 14 May 2024 11:21:07 +0100 Subject: [PATCH] feat: implement Slack component (#120) Because - We'd like to introduce a new component connector to Slack. This commit - Implements first stage of Slack component, including - Read task to retrieve the messages from Slack. - Write task to send the message to Slack. --- go.mod | 10 +- go.sum | 16 ++ pkg/connector/main.go | 2 + pkg/connector/slack/v0/README.mdx | 58 +++++ pkg/connector/slack/v0/apiFunctions.go | 194 ++++++++++++++ pkg/connector/slack/v0/client.go | 15 ++ pkg/connector/slack/v0/config/definition.json | 46 ++++ pkg/connector/slack/v0/config/tasks.json | 239 ++++++++++++++++++ pkg/connector/slack/v0/connector_test.go | 169 +++++++++++++ pkg/connector/slack/v0/main.go | 103 ++++++++ pkg/connector/slack/v0/mockSlack.go | 71 ++++++ pkg/connector/slack/v0/structs.go | 38 +++ pkg/connector/slack/v0/taskFunctions.go | 114 +++++++++ 13 files changed, 1071 insertions(+), 4 deletions(-) create mode 100644 pkg/connector/slack/v0/README.mdx create mode 100644 pkg/connector/slack/v0/apiFunctions.go create mode 100644 pkg/connector/slack/v0/client.go create mode 100644 pkg/connector/slack/v0/config/definition.json create mode 100644 pkg/connector/slack/v0/config/tasks.json create mode 100644 pkg/connector/slack/v0/connector_test.go create mode 100644 pkg/connector/slack/v0/main.go create mode 100644 pkg/connector/slack/v0/mockSlack.go create mode 100644 pkg/connector/slack/v0/structs.go create mode 100644 pkg/connector/slack/v0/taskFunctions.go diff --git a/go.mod b/go.mod index 6b723da9..499a7835 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/stretchr/testify v1.8.4 go.uber.org/zap v1.24.0 golang.org/x/image v0.15.0 - golang.org/x/text v0.14.0 + golang.org/x/text v0.15.0 google.golang.org/api v0.149.0 google.golang.org/grpc v1.61.1 google.golang.org/protobuf v1.33.0 @@ -73,6 +73,7 @@ require ( github.com/google/uuid v1.4.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect + github.com/gorilla/websocket v1.5.1 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect github.com/itchyny/timefmt-go v0.1.5 // indirect github.com/jaytaylor/html2text v0.0.0-20200412013138-3577fbdbcff7 // indirect @@ -95,6 +96,7 @@ require ( github.com/rivo/uniseg v0.4.4 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/saintfish/chardet v0.0.0-20120816061221-3af4cd4741ca // indirect + github.com/slack-go/slack v0.12.5 // indirect github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf // indirect github.com/temoto/robotstxt v1.1.1 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect @@ -105,12 +107,12 @@ require ( go.opentelemetry.io/otel/trace v1.24.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.6.0 // indirect - golang.org/x/crypto v0.21.0 // indirect + golang.org/x/crypto v0.23.0 // indirect golang.org/x/mod v0.10.0 // indirect - golang.org/x/net v0.23.0 // indirect + golang.org/x/net v0.25.0 // indirect golang.org/x/oauth2 v0.15.0 // indirect golang.org/x/sync v0.5.0 // indirect - golang.org/x/sys v0.18.0 // indirect + golang.org/x/sys v0.20.0 // indirect golang.org/x/tools v0.9.1 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/go.sum b/go.sum index 0bd53af1..b3f6761e 100644 --- a/go.sum +++ b/go.sum @@ -96,6 +96,7 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-resty/resty/v2 v2.0.0/go.mod h1:dZGr0i9PLlaaTD4H/hoZIDjQ+r6xq8mgbRzHZf7f2J8= github.com/go-resty/resty/v2 v2.12.0 h1:rsVL8P90LFvkUYq/V5BTVe203WfRIU4gvcf+yfzJzGA= github.com/go-resty/resty/v2 v2.12.0/go.mod h1:o0yGPrkS3lOe1+eFajk6kBW8ScXzwU3hD69/gt2yB/0= +github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk= @@ -141,6 +142,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -155,6 +157,9 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfF github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF/w5E9CNxSwbpD6No= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0/go.mod h1:qmOFXW2epJhM0qSnUUYpldc7gVz2KMQwJ/QYCDIa7XU= github.com/h2non/filetype v1.1.3 h1:FKkx9QbD7HR/zjK1Ia5XiBsq9zdLi5Kf3zGyFTAFkGg= @@ -247,12 +252,15 @@ github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAm github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/simplereach/timeutils v1.2.0/go.mod h1:VVbQDfN/FHRZa1LSqcwo4kNZ62OOyqLLGQKYB3pB0Q8= +github.com/slack-go/slack v0.12.5 h1:ddZ6uz6XVaB+3MTDhoW04gG+Vc/M/X1ctC+wssy2cqs= +github.com/slack-go/slack v0.12.5/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw= github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf h1:pvbZ0lM0XWPBqUKqFU8cmavspvIl9nulOYwdy6IFRRo= github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf/go.mod h1:RJID2RhlZKId02nZ62WenDCkgHFerpIOmW0iT7GKmXM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -299,6 +307,8 @@ golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw= golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= @@ -338,6 +348,8 @@ golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.15.0 h1:s8pnnxNVzjWyrvYdFUQq5llS1PX2zhPXmccZv99h7uQ= golang.org/x/oauth2 v0.15.0/go.mod h1:q48ptWNTY5XWf+JNten23lcvHpLJ0ZSxF5ttTHKVCAM= @@ -365,6 +377,8 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -383,6 +397,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/pkg/connector/main.go b/pkg/connector/main.go index b71b0fe1..66de6f92 100644 --- a/pkg/connector/main.go +++ b/pkg/connector/main.go @@ -21,6 +21,7 @@ import ( "github.com/instill-ai/component/pkg/connector/pinecone/v0" "github.com/instill-ai/component/pkg/connector/redis/v0" "github.com/instill-ai/component/pkg/connector/restapi/v0" + "github.com/instill-ai/component/pkg/connector/slack/v0" "github.com/instill-ai/component/pkg/connector/stabilityai/v0" "github.com/instill-ai/component/pkg/connector/website/v0" @@ -96,6 +97,7 @@ func Init( conStore.Import(redis.Init(baseConn)) conStore.Import(restapi.Init(baseConn)) conStore.Import(website.Init(baseConn)) + conStore.Import(slack.Init(baseConn)) }) diff --git a/pkg/connector/slack/v0/README.mdx b/pkg/connector/slack/v0/README.mdx new file mode 100644 index 00000000..8cfae1a9 --- /dev/null +++ b/pkg/connector/slack/v0/README.mdx @@ -0,0 +1,58 @@ +--- +title: "Slack" +lang: "en-US" +draft: false +description: "Learn about how to set up a VDP Slack connector https://github.com/instill-ai/instill-core" +--- + +The Slack component is a application connector that allows users to get and send message on Slack. +It can carry out the following tasks: + +- [Read Message](#read-message) +- [Send Message](#send-message) + +## Release Stage + +`Alpha` + +## Configuration + +The component configuration is defined and maintained [here](https://github.com/instill-ai/component/blob/main/pkg/connector/slack/v0/config/definition.json). + +## Connection + +| Field | Field ID | Type | Note | +| :--- | :--- | :--- | :--- | +| token | `token` | string | Fill your token | + +## Supported Tasks + +### Read Message + +Get the latest message since specific date + +| Input | ID | Type | Description | +| :--- | :--- | :--- | :--- | +| Task ID (required) | `task` | string | `TASK_READ_MESSAGE` | +| Channel Name (required) | `channel_name` | string | A channel name display in Slack | +| Start to read date | `start_to_read_date` | string | earliest date in all read messages | +| Public channel | `is_public_channel` | boolean | Whether all members can read the channel | + +| Output | ID | Type | Description | +| :--- | :--- | :--- | :--- | +| Conversations | `conversations` | array[object] | An array of conversations with thread messages | + +### Send Message + +send message to a specific channel + +| Input | ID | Type | Description | +| :--- | :--- | :--- | :--- | +| Task ID (required) | `task` | string | `TASK_WRITE_MESSAGE` | +| Channel Name (required) | `channel_name` | string | A channel name display in Slack | +| Message (required) | `message` | string | message to be sent to the target channel | +| Public channel | `is_public_channel` | boolean | Whether all members can read the channel | + +| Output | ID | Type | Description | +| :--- | :--- | :--- | :--- | +| Result | `result` | string | result for sending message | diff --git a/pkg/connector/slack/v0/apiFunctions.go b/pkg/connector/slack/v0/apiFunctions.go new file mode 100644 index 00000000..89717171 --- /dev/null +++ b/pkg/connector/slack/v0/apiFunctions.go @@ -0,0 +1,194 @@ +package slack + +import ( + "fmt" + "strconv" + "time" + + "github.com/slack-go/slack" +) + +func loopChannelListAPI(e *execution, isPublic bool, channelName string) (string, error) { + var apiParams slack.GetConversationsParameters + setChannelType(&apiParams, isPublic) + + var targetChannelID string + for { + + slackChannels, nextCur, err := e.client.GetConversations(&apiParams) + if err != nil { + return "", err + } + + targetChannelID := getChannelID(channelName, slackChannels) + + if targetChannelID != "" { + break + } + + if targetChannelID == "" && nextCur == "" { + err := fmt.Errorf("there is no match name in slack channel [%v]", channelName) + return "", err + } + + apiParams.Cursor = nextCur + + } + + return targetChannelID, nil +} + +// Todo: make it multiple options +func setChannelType(params *slack.GetConversationsParameters, isPublicChannel bool) { + if !isPublicChannel { + params.Types = append(params.Types, "private_channel") + } else { + params.Types = append(params.Types, "public_channel") + } +} + +func getChannelID(channelName string, channels []slack.Channel) (channelID string) { + for _, slackChannel := range channels { + if channelName == slackChannel.Name { + return slackChannel.ID + } + } + return "" +} + +func getConversationHistory(e *execution, channelID string, nextCur string) (*slack.GetConversationHistoryResponse, error) { + apiHistoryParams := slack.GetConversationHistoryParameters{ + ChannelID: channelID, + Cursor: nextCur, + } + + historiesResp, err := e.client.GetConversationHistory(&apiHistoryParams) + if err != nil { + return nil, err + } + if !historiesResp.Ok { + err := fmt.Errorf("slack api error: %v", historiesResp.Error) + return nil, err + } + + return historiesResp, nil +} + +func getConversationReply(e *execution, channelID string, ts string) ([]slack.Message, error) { + apiParams := slack.GetConversationRepliesParameters{ + ChannelID: channelID, + Timestamp: ts, + } + msgs, _, nextCur, err := e.client.GetConversationReplies(&apiParams) + + if err != nil { + return nil, err + } + + if nextCur == "" { + return msgs, nil + } + + allMsgs := msgs + + for nextCur != "" { + apiParams.Cursor = nextCur + msgs, _, nextCur, err = e.client.GetConversationReplies(&apiParams) + if err != nil { + return nil, err + } + allMsgs = append(allMsgs, msgs...) + } + + return allMsgs, nil +} + +func setAPIRespToReadTaskResp(apiResp []slack.Message, readTaskResp *ReadTaskResp, startReadDateString string) error { + + for _, msg := range apiResp { + formatedDateString, err := transformTSToDate(msg.Timestamp, time.DateOnly) + if err != nil { + return err + } + + startReadDate, err := time.Parse("2006-01-02", startReadDateString) + if err != nil { + return err + } + + formatedDate, err := time.Parse("2006-01-02", formatedDateString) + if err != nil { + return err + } + + if startReadDate.After(formatedDate) { + continue + } + + conversation := Conversation{ + UserID: msg.User, + Message: msg.Text, + StartDate: formatedDateString, + LastDate: formatedDateString, + ReplyCount: msg.ReplyCount, + TS: msg.Timestamp, + } + conversation.ThreadReplyMessage = []ThreadReplyMessage{} + readTaskResp.Conversations = append(readTaskResp.Conversations, conversation) + } + return nil +} + +func setRepliedToConversation(resp *ReadTaskResp, replies []slack.Message, idx int) error { + c := resp.Conversations[idx] + lastDay, err := time.Parse("2006-01-02", c.LastDate) + if err != nil { + return err + } + for _, msg := range replies { + + if c.TS == msg.Timestamp { + continue + } + + formatedDateTime, err := transformTSToDate(msg.Timestamp, time.RFC3339) + if err != nil { + return err + } + reply := ThreadReplyMessage{ + UserID: msg.User, + DateTime: formatedDateTime, + Message: msg.Text, + } + + foramtedDate, err := transformTSToDate(msg.Timestamp, time.DateOnly) + if err != nil { + return err + } + + replyDate, err := time.Parse("2006-01-02", foramtedDate) + if err != nil { + return err + } + + if replyDate.After(lastDay) { + replyDateString := replyDate.Format("2006-01-02") + resp.Conversations[idx].LastDate = replyDateString + } + resp.Conversations[idx].ThreadReplyMessage = append(resp.Conversations[idx].ThreadReplyMessage, reply) + } + return nil +} + +func transformTSToDate(ts string, format string) (string, error) { + + tsFloat, err := strconv.ParseFloat(ts, 64) + if err != nil { + return "", err + } + + timestamp := time.Unix(int64(tsFloat), int64((tsFloat-float64(int64(tsFloat)))*1e9)) + + formatedTS := timestamp.Format(format) + return formatedTS, nil +} diff --git a/pkg/connector/slack/v0/client.go b/pkg/connector/slack/v0/client.go new file mode 100644 index 00000000..c19211ff --- /dev/null +++ b/pkg/connector/slack/v0/client.go @@ -0,0 +1,15 @@ +package slack + +import ( + "github.com/slack-go/slack" + "google.golang.org/protobuf/types/known/structpb" +) + +func newClient(config *structpb.Struct) *slack.Client { + return slack.New(getToken(config)) +} + +// Need to confirm where the map is +func getToken(config *structpb.Struct) string { + return config.GetFields()["token"].GetStringValue() +} diff --git a/pkg/connector/slack/v0/config/definition.json b/pkg/connector/slack/v0/config/definition.json new file mode 100644 index 00000000..5cbc8e7a --- /dev/null +++ b/pkg/connector/slack/v0/config/definition.json @@ -0,0 +1,46 @@ +{ + "available_tasks": [ + "TASK_READ_MESSAGE", + "TASK_WRITE_MESSAGE" + ], + "custom": false, + "documentation_url": "", + "icon": "assets/xxx.svg", + "icon_url": "", + "id": "slack", + "public": true, + "spec": { + "connection_specification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": false, + "properties": { + "token": { + "description": "Fill your token", + "instillUpstreamTypes": [ + "reference" + ], + "instillAcceptFormats": [ + "string" + ], + "instillCredentialField": false, + "instillUIOrder": 0, + "title": "token", + "type": "string" + } + }, + "required": [], + "title": "Slack Connection", + "type": "object" + } + }, + "title": "Slack", + "description": "Get and send message on Slack", + "tombstone": false, + "type": "CONNECTOR_TYPE_APPLICATION", + "uid": "1e9f469e-da5e-46eb-8a89-23466627e3b5", + "vendor": "Slack", + "vendor_attributes": {}, + "version": "0.1.0", + "source_url": "https://github.com/instill-ai/component/blob/main/pkg/connector/slack/v0", + "release_stage": "RELEASE_STAGE_ALPHA" + } \ No newline at end of file diff --git a/pkg/connector/slack/v0/config/tasks.json b/pkg/connector/slack/v0/config/tasks.json new file mode 100644 index 00000000..bbfbc932 --- /dev/null +++ b/pkg/connector/slack/v0/config/tasks.json @@ -0,0 +1,239 @@ +{ + "TASK_READ_MESSAGE": { + "instillShortDescription": "Get the latest message since specific date", + "input": { + "description": "Please input the channel name and the date that we want to start to read", + "instillUIOrder": 0, + "properties": { + "channel_name": { + "description": "A channel name display in Slack", + "instillAcceptFormats": [ + "string" + ], + "instillUIMultiline": true, + "instillUIOrder": 0, + "instillUpstreamTypes": [ + "value", + "reference", + "template" + ], + "title": "Channel Name", + "type": "string" + }, + "start_to_read_date": { + "description": "earliest date in all read messages", + "instillAcceptFormats": [ + "string" + ], + "instillUIMultiline": true, + "instillUIOrder": 1, + "instillUpstreamTypes": [ + "value", + "reference", + "template" + ], + "title": "Start to read date", + "type": "string" + }, + "is_public_channel": { + "description": "Whether all members can read the channel", + "instillAcceptFormats": [ + "boolean" + ], + "instillUIMultiline": true, + "instillUIOrder": 2, + "instillUpstreamTypes": [ + "value", + "reference", + "template" + ], + "title": "Public channel", + "type": "boolean" + } + }, + "required": [ + "channel_name" + ], + "title": "Input", + "type": "object" + }, + "output": { + "description": "All messages in Slack channel", + "instillUIOrder": 0, + "properties": { + "conversations": { + "description": "An array of conversations with thread messages", + "instillUIOrder": 0, + "title": "Conversations", + "type": "array", + "items": { + "title": "conversation details", + "type": "object", + "properties": { + "user_id": { + "description": "unique id from Slack", + "instillFormat": "string", + "instillUIOrder": 1, + "title": "User UID", + "type": "string" + }, + "message": { + "description": "message to start a conversation", + "instillFormat": "string", + "instillUIOrder": 2, + "title": "Start Conversation Message", + "type": "string" + }, + "start_date": { + "description": "when a conversation starts", + "instillFormat": "string", + "instillUIOrder": 3, + "required": [], + "title": "Start Date", + "type": "string" + }, + "last_date": { + "description": "Date of the last message", + "instillFormat": "string", + "instillUIOrder": 4, + "required": [], + "title": "Last Date", + "type": "string" + }, + "thread_reply_messages": { + "description": "replies in a conversation", + "instillFormat": "array", + "instillUIOrder": 5, + "title": "Replied messages", + "type": "array", + "items": { + "title": "relied details", + "type": "object", + "properties": { + "user_id": { + "description": "unique id from Slack", + "instillFormat": "string", + "instillUIOrder": 1, + "title": "User UID", + "type": "string" + }, + "datetime": { + "description": "replied datetime", + "instillFormat": "string", + "instillUIOrder": 2, + "title": "Replied Time", + "type": "string" + }, + "message": { + "description": "message to reply a conversation", + "instillFormat": "string", + "instillUIOrder": 3, + "title": "Replied Message", + "type": "string" + } + }, + "required": [ + "user_id", + "datetime", + "message" + ] + } + } + }, + "required": [ + "user_id", + "message", + "start_date" + ] + } + } + }, + "required": [ + "conversations" + ], + "title": "Output", + "type": "object" + } + }, + "TASK_WRITE_MESSAGE": { + "instillShortDescription": "send message to a specific channel", + "title": "Send Message", + "input": { + "description": "Input", + "instillUIOrder": 0, + "properties": { + "channel_name": { + "description": "A channel name display in Slack", + "instillAcceptFormats": [ + "string" + ], + "instillUIMultiline": true, + "instillUIOrder": 0, + "instillUpstreamTypes": [ + "value", + "reference", + "template" + ], + "title": "Channel Name", + "type": "string" + }, + "message": { + "description": "message to be sent to the target channel", + "instillAcceptFormats": [ + "string" + ], + "instillUIMultiline": true, + "instillUIOrder": 1, + "instillUpstreamTypes": [ + "value", + "reference", + "template" + ], + "title": "Message", + "type": "string" + }, + "is_public_channel": { + "description": "Whether all members can read the channel", + "instillAcceptFormats": [ + "boolean" + ], + "instillUIMultiline": true, + "instillUIOrder": 2, + "instillUpstreamTypes": [ + "value", + "reference", + "template" + ], + "title": "Public channel", + "type": "boolean" + } + }, + "required": [ + "channel_name", + "message" + ], + "title": "Input", + "type": "object" + }, + "output": { + "description": "The greeting sentence", + "instillUIOrder": 0, + "properties": { + "result": { + "description": "result for sending message", + "instillEditOnNodeFields": [], + "instillUIOrder": 0, + "required": [], + "title": "Result", + "type": "string", + "instillFormat": "string" + } + }, + "required": [ + "result" + ], + "title": "Output", + "type": "object" + } + } + } \ No newline at end of file diff --git a/pkg/connector/slack/v0/connector_test.go b/pkg/connector/slack/v0/connector_test.go new file mode 100644 index 00000000..a29c87ee --- /dev/null +++ b/pkg/connector/slack/v0/connector_test.go @@ -0,0 +1,169 @@ +package slack + +import ( + "context" + "encoding/json" + "testing" + "time" + + qt "github.com/frankban/quicktest" + "github.com/instill-ai/component/pkg/base" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/structpb" +) + +const ( + apiKey = "testkey" +) + +func TestConnector_ExecuteWriteTask(t *testing.T) { + c := qt.New(t) + ctx := context.Background() + bc := base.Connector{Logger: zap.NewNop()} + connector := Init(bc) + + testcases := []struct { + name string + input UserInputWriteTask + wantResp WriteTaskResp + wantErr string + }{ + { + name: "ok to write", + input: UserInputWriteTask{ + ChannelName: "test_channel", + Message: "I am unit test", + IsPublicChannel: true, + }, + wantResp: WriteTaskResp{ + Result: "succeed", + }, + }, + { + name: "fail to write", + input: UserInputWriteTask{ + ChannelName: "test_channel_1", + Message: "I am unit test", + IsPublicChannel: true, + }, + wantErr: `there is no match name in slack channel \[test_channel_1\]`, + }, + } + + for _, tc := range testcases { + c.Run(tc.name, func(c *qt.C) { + + connection, err := structpb.NewStruct(map[string]any{ + "api_key": apiKey, + }) + c.Assert(err, qt.IsNil) + + // It will increase the modification range if we change the input of CreateExecution. + // So, we replaced it with the code below to cover the test for taskFunctions.go + e := &execution{ + ConnectorExecution: base.ConnectorExecution{Connector: connector, SystemVariables: nil, Connection: connection, Task: taskWriteMessage}, + client: &MockSlackClient{}, + } + e.execute = e.sendMessage + exec := &base.ExecutionWrapper{Execution: e} + + pbIn, err := base.ConvertToStructpb(tc.input) + c.Assert(err, qt.IsNil) + + got, err := exec.Execution.Execute(ctx, []*structpb.Struct{pbIn}) + + if tc.wantErr != "" { + c.Assert(err, qt.ErrorMatches, tc.wantErr) + return + } + + wantJSON, err := json.Marshal(tc.wantResp) + c.Assert(err, qt.IsNil) + c.Check(wantJSON, qt.JSONEquals, got[0].AsMap()) + }) + } +} + +func TestConnector_ExecuteReadTask(t *testing.T) { + + c := qt.New(t) + ctx := context.Background() + bc := base.Connector{Logger: zap.NewNop()} + connector := Init(bc) + + mockDateTime, _ := transformTSToDate("1715159449.399879", time.RFC3339) + testcases := []struct { + name string + input UserInputReadTask + wantResp ReadTaskResp + wantErr string + }{ + { + name: "ok to read", + input: UserInputReadTask{ + ChannelName: "test_channel", + }, + wantResp: ReadTaskResp{ + Conversations: []Conversation{ + { + UserID: "user123", + Message: "Hello, world!", + StartDate: "2024-05-08", + LastDate: "2024-05-08", + TS: "1715159446.644219", + ReplyCount: 1, + ThreadReplyMessage: []ThreadReplyMessage{ + { + UserID: "user456", + Message: "Hello, how are you", + DateTime: mockDateTime, + }, + }, + }, + }, + }, + }, + { + name: "fail to read", + input: UserInputReadTask{ + ChannelName: "test_channel_1", + }, + wantErr: `there is no match name in slack channel \[test_channel_1\]`, + }, + } + + for _, tc := range testcases { + c.Run(tc.name, func(c *qt.C) { + connection, err := structpb.NewStruct(map[string]any{ + "api_key": apiKey, + }) + c.Assert(err, qt.IsNil) + + // It will increase the modification range if we change the input of CreateExecution. + // So, we replaced it with the code below to cover the test for taskFunctions.go + e := &execution{ + ConnectorExecution: base.ConnectorExecution{Connector: connector, SystemVariables: nil, Connection: connection, Task: taskReadMessage}, + client: &MockSlackClient{}, + } + e.execute = e.readMessage + exec := &base.ExecutionWrapper{Execution: e} + + pbIn, err := base.ConvertToStructpb(tc.input) + c.Assert(err, qt.IsNil) + + got, err := exec.Execution.Execute(ctx, []*structpb.Struct{pbIn}) + + if tc.wantErr != "" { + c.Assert(err, qt.ErrorMatches, tc.wantErr) + return + } + + wantJSON, err := json.Marshal(tc.wantResp) + c.Assert(err, qt.IsNil) + c.Check(wantJSON, qt.JSONEquals, got[0].AsMap()) + + }) + + } + +} diff --git a/pkg/connector/slack/v0/main.go b/pkg/connector/slack/v0/main.go new file mode 100644 index 00000000..a2085210 --- /dev/null +++ b/pkg/connector/slack/v0/main.go @@ -0,0 +1,103 @@ +//go:generate compogen readme --connector ./config ./README.mdx +package slack + +import ( + "context" + _ "embed" + "fmt" + "sync" + + "google.golang.org/protobuf/types/known/structpb" + + "github.com/instill-ai/component/pkg/base" + "github.com/instill-ai/x/errmsg" + "github.com/slack-go/slack" +) + +const ( + taskWriteMessage = "TASK_WRITE_MESSAGE" + taskReadMessage = "TASK_READ_MESSAGE" +) + +var ( + //go:embed config/definition.json + definitionJSON []byte + //go:embed config/tasks.json + tasksJSON []byte + + once sync.Once + con *connector +) + +type connector struct { + base.Connector +} + +type execution struct { + base.ConnectorExecution + + execute func(*structpb.Struct) (*structpb.Struct, error) + client SlackClient +} + +type SlackClient interface { + GetConversations(params *slack.GetConversationsParameters) ([]slack.Channel, string, error) + PostMessage(channelID string, options ...slack.MsgOption) (string, string, error) + GetConversationHistory(params *slack.GetConversationHistoryParameters) (*slack.GetConversationHistoryResponse, error) + GetConversationReplies(params *slack.GetConversationRepliesParameters) ([]slack.Message, bool, string, error) +} + +// Init returns an implementation of IConnector that interacts with Slack. +func Init(bc base.Connector) *connector { + once.Do(func() { + con = &connector{Connector: bc} + err := con.LoadConnectorDefinition(definitionJSON, tasksJSON, nil) + if err != nil { + panic(err) + } + }) + + return con +} + +func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) { + e := &execution{ + ConnectorExecution: base.ConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task}, + client: newClient(connection), + } + + switch task { + case taskWriteMessage: + e.execute = e.sendMessage + case taskReadMessage: + e.execute = e.readMessage + default: + return nil, errmsg.AddMessage( + fmt.Errorf("not supported task: %s", task), + fmt.Sprintf("%s task is not supported.", task), + ) + } + + return &base.ExecutionWrapper{Execution: e}, nil +} + +// Execute performs calls the Slack API to execute a task. +func (e *execution) Execute(_ context.Context, inputs []*structpb.Struct) ([]*structpb.Struct, error) { + outputs := make([]*structpb.Struct, len(inputs)) + + for i, input := range inputs { + output, err := e.execute(input) + if err != nil { + return nil, err + } + + outputs[i] = output + } + + return outputs, nil +} + +func (c connector) Test(sysVars map[string]any, connection *structpb.Struct) error { + + return nil +} diff --git a/pkg/connector/slack/v0/mockSlack.go b/pkg/connector/slack/v0/mockSlack.go new file mode 100644 index 00000000..c48a7a32 --- /dev/null +++ b/pkg/connector/slack/v0/mockSlack.go @@ -0,0 +1,71 @@ +package slack + +import "github.com/slack-go/slack" + +type MockSlackClient struct{} + +func (m *MockSlackClient) GetConversations(params *slack.GetConversationsParameters) ([]slack.Channel, string, error) { + + var channels []slack.Channel + nextCursor := "" + fakeChannel := slack.Channel{ + GroupConversation: slack.GroupConversation{ + Conversation: slack.Conversation{ + ID: "G0AKFJBEU", + }, + Name: "test_channel", + }, + } + channels = append(channels, fakeChannel) + + return channels, nextCursor, nil +} + +func (m *MockSlackClient) PostMessage(channelID string, options ...slack.MsgOption) (string, string, error) { + + return "", "", nil +} + +func (m *MockSlackClient) GetConversationHistory(params *slack.GetConversationHistoryParameters) (*slack.GetConversationHistoryResponse, error) { + + fakeResp := slack.GetConversationHistoryResponse{ + SlackResponse: slack.SlackResponse{ + Ok: true, + }, + Messages: []slack.Message{ + { + Msg: slack.Msg{ + Timestamp: "1715159446.644219", + User: "user123", + Text: "Hello, world!", + ReplyCount: 1, + }, + }, + }, + } + + return &fakeResp, nil +} + +func (m *MockSlackClient) GetConversationReplies(params *slack.GetConversationRepliesParameters) ([]slack.Message, bool, string, error) { + + fakeMessages := []slack.Message{ + { + Msg: slack.Msg{ + Timestamp: "1715159446.644219", + User: "user123", + Text: "Hello, world!", + }, + }, + { + Msg: slack.Msg{ + Timestamp: "1715159449.399879", + User: "user456", + Text: "Hello, how are you", + }, + }, + } + hasMore := false + nextCursor := "" + return fakeMessages, hasMore, nextCursor, nil +} diff --git a/pkg/connector/slack/v0/structs.go b/pkg/connector/slack/v0/structs.go new file mode 100644 index 00000000..14deee28 --- /dev/null +++ b/pkg/connector/slack/v0/structs.go @@ -0,0 +1,38 @@ +package slack + +type UserInputWriteTask struct { + ChannelName string `json:"channel_name"` + Message string `json:"message"` + IsPublicChannel bool `json:"is_public_channel"` +} + +type WriteTaskResp struct { + Result string `json:"result"` +} + +// TODO: Read Task +type UserInputReadTask struct { + ChannelName string `json:"channel_name"` + StartToReadDate string `json:"start_to_read_date"` + IsPublicChannel bool `json:"is_public_channel"` +} + +type ReadTaskResp struct { + Conversations []Conversation `json:"conversations"` +} + +type ThreadReplyMessage struct { + UserID string `json:"user_id"` + DateTime string `json:"datetime"` + Message string `json:"message"` +} + +type Conversation struct { + UserID string `json:"user_id"` + Message string `json:"message"` + StartDate string `json:"start_date"` + LastDate string `json:"last_date"` + TS string `json:"ts"` + ReplyCount int `json:"reply_count"` + ThreadReplyMessage []ThreadReplyMessage `json:"thread_reply_messages"` +} diff --git a/pkg/connector/slack/v0/taskFunctions.go b/pkg/connector/slack/v0/taskFunctions.go new file mode 100644 index 00000000..de4b967a --- /dev/null +++ b/pkg/connector/slack/v0/taskFunctions.go @@ -0,0 +1,114 @@ +package slack + +import ( + "fmt" + "sync" + "time" + + "github.com/instill-ai/component/pkg/base" + "github.com/slack-go/slack" + "google.golang.org/protobuf/types/known/structpb" +) + +func (e *execution) readMessage(in *structpb.Struct) (*structpb.Struct, error) { + + params := UserInputReadTask{} + + if err := base.ConvertFromStructpb(in, ¶ms); err != nil { + return nil, err + } + + targetChannelID, err := loopChannelListAPI(e, params.IsPublicChannel, params.ChannelName) + + if err != nil { + return nil, err + } + + resp, err := getConversationHistory(e, targetChannelID, "") + if err != nil { + return nil, err + } + + // TODO: discussed if only collect X days ago as default. + if params.StartToReadDate == "" { + currentTime := time.Now() + sevenDaysAgo := currentTime.AddDate(0, 0, -7) + sevenDaysAgoString := sevenDaysAgo.Format("2006-01-02") + params.StartToReadDate = sevenDaysAgoString + } + + var readTaskResp ReadTaskResp + err = setAPIRespToReadTaskResp(resp.Messages, &readTaskResp, params.StartToReadDate) + if err != nil { + return nil, err + } + + // TODO: fetch historyAPI first if there are more conversations. + // if resp.ResponseMetaData.NextCursor != "" { + + // } + + var mu sync.Mutex + var wg sync.WaitGroup + + for i, conversation := range readTaskResp.Conversations { + if conversation.ReplyCount > 0 { + wg.Add(1) + go func(readTaskResp *ReadTaskResp, idx int) { + defer wg.Done() + replies, _ := getConversationReply(e, targetChannelID, readTaskResp.Conversations[idx].TS) + // TODO: to be discussed about this error handdling + // fail? or not fail? + // if err != nil { + // } + + // TODO: fetch further replies if there are + + mu.Lock() + err := setRepliedToConversation(readTaskResp, replies, idx) + // TODO: think a better way to pass lint, maybe use channel + if err != nil { + fmt.Println("error when set the output: ", err) + } + mu.Unlock() + + }(&readTaskResp, i) + } + } + wg.Wait() + + out, err := base.ConvertToStructpb(readTaskResp) + if err != nil { + return nil, err + } + + return out, nil +} + +func (e *execution) sendMessage(in *structpb.Struct) (*structpb.Struct, error) { + params := UserInputWriteTask{} + + if err := base.ConvertFromStructpb(in, ¶ms); err != nil { + return nil, err + } + + targetChannelID, err := loopChannelListAPI(e, params.IsPublicChannel, params.ChannelName) + if err != nil { + return nil, err + } + + _, _, err = e.client.PostMessage(targetChannelID, slack.MsgOptionText(params.Message, false)) + + if err != nil { + return nil, err + } + + out, err := base.ConvertToStructpb(WriteTaskResp{ + Result: "succeed", + }) + if err != nil { + return nil, err + } + + return out, nil +}