diff --git a/client_interface.go b/client_interface.go index 9b90eabb..eb46211c 100644 --- a/client_interface.go +++ b/client_interface.go @@ -263,4 +263,7 @@ type ClientInterface interface { UpdateScheduledSQL(project string, scheduledsql *ScheduledSQL) error GetScheduledSQL(project string, name string) (*ScheduledSQL, error) ListScheduledSQL(project, name, displayName string, offset, size int) ([]*ScheduledSQL, int, int, error) + GetScheduledSQLJobInstance(projectName, jobName, instanceId string, result bool) (instance *ScheduledSQLJobInstance, err error) + ModifyScheduledSQLJobInstanceState(projectName, jobName, instanceId string, state ScheduledSQLState) error + ListScheduledSQLJobInstances(projectName, jobName string, status *InstanceStatus) (instances []*ScheduledSQLJobInstance, total, count int64, err error) } diff --git a/client_scheduled_sql.go b/client_scheduled_sql.go index 5c192c31..053ddae5 100644 --- a/client_scheduled_sql.go +++ b/client_scheduled_sql.go @@ -2,6 +2,7 @@ package sls import ( "encoding/json" + "errors" "fmt" "io/ioutil" "net/url" @@ -12,6 +13,7 @@ type ResourcePool string type DataFormat string type JobType string type Status string +type ScheduledSQLState string const ( STANDARD SqlType = "standard" @@ -42,6 +44,12 @@ const ( DISABLED Status = "Disabled" ) +const ( + ScheduledSQL_RUNNING ScheduledSQLState = "RUNNING" + ScheduledSQL_FAILED ScheduledSQLState = "FAILED" + ScheduledSQL_SUCCEEDED ScheduledSQLState = "SUCCEEDED" +) + type ScheduledSQL struct { Name string `json:"name"` DisplayName string `json:"displayName"` @@ -206,3 +214,98 @@ func (c *Client) ListScheduledSQL(project, name, displayName string, offset, siz } return scheduledSqlList.Results, scheduledSqlList.Total, scheduledSqlList.Count, err } + +type ScheduledSQLJobInstance struct { + InstanceId string `json:"instanceId"` + JobName string `json:"jobName,omitempty"` + DisplayName string `json:"displayName,omitempty"` + Description string `json:"description,omitempty"` + JobScheduleId string `json:"jobScheduleId,omitempty"` + CreateTimeInMillis int64 `json:"createTimeInMillis"` + ScheduleTimeInMillis int64 `json:"scheduleTimeInMillis"` + UpdateTimeInMillis int64 `json:"updateTimeInMillis"` + State ScheduledSQLState `json:"state"` + ErrorCode string `json:"errorCode"` + ErrorMessage string `json:"errorMessage"` + Summary string `json:"summary,omitempty"` +} + +func (c *Client) GetScheduledSQLJobInstance(projectName, jobName, instanceId string, result bool) (*ScheduledSQLJobInstance, error) { + h := map[string]string{ + "x-log-bodyrawsize": "0", + "Content-Type": "application/json", + } + uri := fmt.Sprintf("/jobs/%s/jobinstances/%s?result=%t", jobName, instanceId, result) + r, err := c.request(projectName, "GET", uri, h, nil) + if err != nil { + return nil, err + } + defer r.Body.Close() + buf, _ := ioutil.ReadAll(r.Body) + instance := &ScheduledSQLJobInstance{} + if err = json.Unmarshal(buf, instance); err != nil { + err = NewClientError(err) + } + return instance, err +} + +func (c *Client) ModifyScheduledSQLJobInstanceState(projectName, jobName, instanceId string, state ScheduledSQLState) error { + if ScheduledSQL_RUNNING != state { + return NewClientError(errors.New(fmt.Sprintf("Invalid state: %s, state must be RUNNING.", state))) + } + h := map[string]string{ + "x-log-bodyrawsize": "0", + "Content-Type": "application/json", + } + + uri := fmt.Sprintf("/jobs/%s/jobinstances/%s?state=%s", jobName, instanceId, state) + r, err := c.request(projectName, "PUT", uri, h, nil) + if err != nil { + return err + } + r.Body.Close() + return nil +} + +type InstanceStatus struct { + FromTime int64 + ToTime int64 + Offset int64 + Size int64 + State ScheduledSQLState +} + +func (c *Client) ListScheduledSQLJobInstances(projectName, jobName string, status *InstanceStatus) (instances []*ScheduledSQLJobInstance, total, count int64, err error) { + h := map[string]string{ + "x-log-bodyrawsize": "0", + "Content-Type": "application/json", + } + v := url.Values{} + v.Add("jobType", "ScheduledSQL") + v.Add("start", fmt.Sprintf("%d", status.FromTime)) + v.Add("end", fmt.Sprintf("%d", status.ToTime)) + v.Add("offset", fmt.Sprintf("%d", status.Offset)) + v.Add("size", fmt.Sprintf("%d", status.Size)) + if status.State != "" { + v.Add("state", string(status.State)) + } + + uri := fmt.Sprintf("/jobs/%s/jobinstances?%s", jobName, v.Encode()) + r, err := c.request(projectName, "GET", uri, h, nil) + if err != nil { + return nil, 0, 0, err + } + defer r.Body.Close() + + type ScheduledSqlJobInstances struct { + Total int64 `json:"total"` + Count int64 `json:"count"` + Results []*ScheduledSQLJobInstance `json:"results"` + } + buf, _ := ioutil.ReadAll(r.Body) + jobInstances := &ScheduledSqlJobInstances{} + if err = json.Unmarshal(buf, jobInstances); err != nil { + err = NewClientError(err) + } + return jobInstances.Results, jobInstances.Total, jobInstances.Count, err +} diff --git a/client_scheduled_sql_test.go b/client_scheduled_sql_test.go index c02f3a5c..405b7317 100644 --- a/client_scheduled_sql_test.go +++ b/client_scheduled_sql_test.go @@ -2,94 +2,185 @@ package sls import ( "fmt" + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/suite" + "math/rand" "os" "testing" "time" ) -func TestCreate(t *testing.T) { - client := makeClient() - err := setUp(client) - if err != nil { - t.Fatalf("%v", err) - } - err = client.CreateScheduledSQL("test-scheduled-sql", getScheduleSQL("111")) - if err != nil { - t.Fatalf("%v", err) - } +func TestScheduledSQL(t *testing.T) { + suite.Run(t, new(ScheduledSQLTestSuite)) } -func TestDelete(t *testing.T) { - client := makeClient() - err := client.DeleteScheduledSQL("test-scheduled-sql", "test01") - if err != nil { - t.Fatalf("%v", err) - } +type ScheduledSQLTestSuite struct { + suite.Suite + endpoint string + accessKeyID string + accessKeySecret string + projectName string + sourceLogStore string + targetLogStoreName string + scheduledSQLName string + displayName string + client *Client } -func TestUpdate(t *testing.T) { - client := makeClient() - err := client.UpdateScheduledSQL("test-scheduled-sql", getScheduleSQL("222")) - if err != nil { - t.Fatalf("%v", err) +func (s *ScheduledSQLTestSuite) SetupTest() { + s.endpoint = os.Getenv("LOG_TEST_ENDPOINT") + s.accessKeyID = os.Getenv("LOG_TEST_ACCESS_KEY_ID") + s.accessKeySecret = os.Getenv("LOG_TEST_ACCESS_KEY_SECRET") + suffix := time.Now().Unix() + s.projectName = fmt.Sprintf("test-scheduled-sql-%d", suffix) + s.sourceLogStore = "test-source" + s.targetLogStoreName = "test-target" + s.scheduledSQLName = fmt.Sprintf("schedulesql-%d", suffix) + s.displayName = fmt.Sprintf("display-%d", suffix) + s.client = &Client{ + Endpoint: s.endpoint, + AccessKeyID: s.accessKeyID, + AccessKeySecret: s.accessKeySecret, } + s.setUp() } -func TestGet(t *testing.T) { - client := makeClient() - scheduledSQL, err := client.GetScheduledSQL("test-scheduled-sql", "test01") - if err != nil { - t.Fatalf("%v", err) - } - fmt.Printf("%v\n", scheduledSQL) +func (s *ScheduledSQLTestSuite) TearDownTest() { + err := s.client.DeleteProject(s.projectName) + s.Require().Nil(err) +} + +func (s *ScheduledSQLTestSuite) TestClient_CreateAndDeleteScheduledSQL() { + ce := s.client.CreateScheduledSQL(s.projectName, s.getScheduleSQL("111")) + s.Require().Nil(ce) + de := s.client.DeleteScheduledSQL(s.projectName, s.scheduledSQLName) + s.Require().Nil(de) +} + +func (s *ScheduledSQLTestSuite) TestClient_UpdateAndGetScheduledSQL() { + ce := s.client.CreateScheduledSQL(s.projectName, s.getScheduleSQL("111")) + s.Require().Nil(ce) + scheduledSQL, ge := s.client.GetScheduledSQL(s.projectName, s.scheduledSQLName) + s.Require().Nil(ge) + s.Require().Equal(s.scheduledSQLName, scheduledSQL.Name) + s.Require().Equal("111", scheduledSQL.Description) + ue := s.client.UpdateScheduledSQL(s.projectName, s.getScheduleSQL("222")) + s.Require().Nil(ue) + scheduledSQL2, ge2 := s.client.GetScheduledSQL(s.projectName, s.scheduledSQLName) + s.Require().Nil(ge2) + s.Require().Equal(s.scheduledSQLName, scheduledSQL2.Name) + s.Require().Equal("222", scheduledSQL2.Description) + de := s.client.DeleteScheduledSQL(s.projectName, s.scheduledSQLName) + s.Require().Nil(de) +} + +func (s *ScheduledSQLTestSuite) TestClient_ListScheduledSQL() { + ce := s.client.CreateScheduledSQL(s.projectName, s.getScheduleSQL("111")) + s.Require().Nil(ce) + scheduledsqls, total, count, le := s.client.ListScheduledSQL(s.projectName, "", "", 0, 10) + s.Require().Nil(le) + s.Require().Equal(1, len(scheduledsqls)) + s.Require().Equal(1, total) + s.Require().Equal(1, count) + de := s.client.DeleteScheduledSQL(s.projectName, s.scheduledSQLName) + s.Require().Nil(de) } -func TestList(t *testing.T) { - client := makeClient() - scheduledSQL, total, count, err := client.ListScheduledSQL("test-scheduled-sql", "", "", 0, 10) - if err != nil { - t.Fatalf("%v", err) +func (s *ScheduledSQLTestSuite) TestClient_ScheduledSQLInstances() { + prepareData(s) + ce := s.client.CreateScheduledSQL(s.projectName, s.getScheduleSQL("111")) + s.Require().Nil(ce) + time.Sleep(time.Minute * 5) + status := &InstanceStatus{ + FromTime: time.Now().Unix() - 600, + ToTime: time.Now().Unix() + 600, + Offset: 0, + Size: 3, + State: ScheduledSQL_SUCCEEDED, } - fmt.Printf("%v\n%d\n%d\n", scheduledSQL, total, count) + instances, total, count, le := s.client.ListScheduledSQLJobInstances(s.projectName, s.scheduledSQLName, status) + s.Require().Nil(le) + s.Require().Equal(3, len(instances)) + s.Require().Equal(int64(3), count) + s.Require().Equal(true, total > 3) + instance := instances[0] + jobInstance, ge := s.client.GetScheduledSQLJobInstance(s.projectName, s.scheduledSQLName, instance.InstanceId, true) + s.Require().Nil(ge) + s.Require().Equal(ScheduledSQL_SUCCEEDED, jobInstance.State) + me := s.client.ModifyScheduledSQLJobInstanceState(s.projectName, s.scheduledSQLName, instance.InstanceId, ScheduledSQL_RUNNING) + s.Require().Nil(me) + jobInstance2, ge2 := s.client.GetScheduledSQLJobInstance(s.projectName, s.scheduledSQLName, instance.InstanceId, true) + s.Require().Nil(ge2) + s.Require().Equal(true, ScheduledSQL_SUCCEEDED != jobInstance2.State) + time.Sleep(time.Second * 5) + de := s.client.DeleteScheduledSQL(s.projectName, s.scheduledSQLName) + s.Require().Nil(de) } -func makeClient() *Client { - return &Client{ - Endpoint: "pub-cn-hangzhou-staging-share.log.aliyuncs.com", - AccessKeyID: os.Getenv("ALICLOUD_ACCESS_KEY"), - AccessKeySecret: os.Getenv("ALICLOUD_SECRET_KEY"), +func prepareData(s *ScheduledSQLTestSuite) { + for loggroupIdx := 0; loggroupIdx < 10; loggroupIdx++ { + var logs []*Log + for logIdx := 0; logIdx < 100; logIdx++ { + var content []*LogContent + for colIdx := 0; colIdx < 10; colIdx++ { + if colIdx == 0 { + content = append(content, &LogContent{ + Key: proto.String(fmt.Sprintf("col_%d", colIdx)), + Value: proto.String(fmt.Sprintf("%d", rand.Intn(10000000))), + }) + } else { + content = append(content, &LogContent{ + Key: proto.String(fmt.Sprintf("col_%d", colIdx)), + Value: proto.String(fmt.Sprintf("logGroup idx: %d, log idx: %d, col idx: %d, value: %d", loggroupIdx, logIdx, colIdx, rand.Intn(10000000))), + }) + } + } + log := &Log{ + Time: proto.Uint32(uint32(time.Now().Unix())), + Contents: content, + } + logs = append(logs, log) + } + logGroup := &LogGroup{ + Topic: proto.String("test"), + Source: proto.String("10.238.222.116"), + Logs: logs, + } + err := s.client.PutLogs(s.projectName, s.sourceLogStore, logGroup) + s.Require().Nil(err) } + time.Sleep(time.Second * 5) } -func getScheduleSQL(des string) *ScheduledSQL { +func (s *ScheduledSQLTestSuite) getScheduleSQL(des string) *ScheduledSQL { return &ScheduledSQL{ - Name: "test01", - DisplayName: "dis001", + Name: s.scheduledSQLName, + DisplayName: s.displayName, Description: des, Status: ENABLED, Configuration: &ScheduledSQLConfiguration{ - SourceLogStore: "test-source", - DestProject: "test-schedulesql", - DestEndpoint: "cn-hangzhou-intranet.log.aliyuncs.com", - DestLogStore: "test-target", - Script: "*|SELECT COUNT(__value__)", + SourceLogStore: s.sourceLogStore, + DestProject: s.projectName, + DestEndpoint: s.endpoint, + DestLogStore: s.targetLogStoreName, + Script: "*|SELECT COUNT(col_0) as value_count", SqlType: SEARCH_QUERY, ResourcePool: DEFAULT, - RoleArn: os.Getenv("ROLE_ARN"), - DestRoleArn: os.Getenv("ROLE_ARN"), - FromTimeExpr: "@m-15m", + RoleArn: os.Getenv("LOG_TEST_ROLE_ARN"), + DestRoleArn: os.Getenv("LOG_TEST_ROLE_ARN"), + FromTimeExpr: "@m-1m", ToTimeExpr: "@m", MaxRunTimeInSeconds: 60, MaxRetries: 20, - FromTime: 1621828800, - ToTime: 1623311901, + FromTime: time.Now().Unix() - 300, + ToTime: time.Now().Unix() + 300, DataFormat: LOG_TO_LOG, Parameters: nil, }, Schedule: &Schedule{ Type: "FixedRate", - Interval: "15m", - Delay: 30, + Interval: "1m", + Delay: 10, DayOfWeek: 0, Hour: 0, }, @@ -99,42 +190,30 @@ func getScheduleSQL(des string) *ScheduledSQL { } } -func setUp(c *Client) error { - if ok, err := c.CheckProjectExist("test-scheduled-sql"); err != nil { - return err - } else if ok { - err := c.DeleteProject("test-scheduled-sql") - if err != nil { - return err - } - time.Sleep(time.Second * 30) - _, err = c.CreateProject("test-scheduled-sql", "test scheduled sql") - if err != nil { - return err - } else { - time.Sleep(time.Second * 60) - } - } - err1 := c.CreateLogStore("test-scheduled-sql", "test-source", 3, 2, true, 4) - if err1 != nil { - return err1 - } - err2 := c.CreateLogStore("test-scheduled-sql", "test-target", 3, 2, true, 4) - if err2 != nil { - return err2 - } - err3 := c.CreateIndex("test-scheduled-sql", "test-source", Index{ - Keys: map[string]IndexKey{"__labels__": { - Token: []string{",", " ", "'"}, - CaseSensitive: true, - Type: "text", - DocValue: true, - Chn: true, - }}, - Line: nil, +func (s *ScheduledSQLTestSuite) setUp() { + _, ce := s.client.CreateProject(s.projectName, "test scheduled sql") + s.Require().Nil(ce) + time.Sleep(time.Second * 60) + cle := s.client.CreateLogStore(s.projectName, s.sourceLogStore, 3, 2, true, 4) + s.Require().Nil(cle) + cle2 := s.client.CreateLogStore(s.projectName, s.targetLogStoreName, 3, 2, true, 4) + s.Require().Nil(cle2) + cie := s.client.CreateIndex(s.projectName, s.sourceLogStore, Index{ + Keys: map[string]IndexKey{ + "col_0": { + Token: []string{" "}, + DocValue: true, + CaseSensitive: false, + Type: "long", + }, + "col_1": { + Token: []string{",", ":", " "}, + DocValue: true, + CaseSensitive: false, + Type: "text", + }, + }, }) - if err3 != nil { - return err3 - } - return nil + s.Require().Nil(cie) + time.Sleep(time.Second * 60) } diff --git a/token_auto_update_client.go b/token_auto_update_client.go index 795803f0..c65e569e 100644 --- a/token_auto_update_client.go +++ b/token_auto_update_client.go @@ -1230,3 +1230,33 @@ func (c *TokenAutoUpdateClient) ListScheduledSQL(project, name, displayName stri } return } + +func (c *TokenAutoUpdateClient) GetScheduledSQLJobInstance(projectName, jobName, instanceId string, result bool) (instance *ScheduledSQLJobInstance, err error) { + for i := 0; i < c.maxTryTimes; i++ { + instance, err = c.logClient.GetScheduledSQLJobInstance(projectName, jobName, instanceId, result) + if !c.processError(err) { + return + } + } + return instance, err +} + +func (c *TokenAutoUpdateClient) ModifyScheduledSQLJobInstanceState(projectName, jobName, instanceId string, state ScheduledSQLState) (err error) { + for i := 0; i < c.maxTryTimes; i++ { + err = c.logClient.ModifyScheduledSQLJobInstanceState(projectName, jobName, instanceId, state) + if !c.processError(err) { + return + } + } + return err +} + +func (c *TokenAutoUpdateClient) ListScheduledSQLJobInstances(projectName, jobName string, status *InstanceStatus) (instances []*ScheduledSQLJobInstance, total, count int64, err error) { + for i := 0; i < c.maxTryTimes; i++ { + instances, total, count, err = c.logClient.ListScheduledSQLJobInstances(projectName, jobName, status) + if !c.processError(err) { + return + } + } + return instances, total, count, err +}