From 61e42afcc81bfe8828f8e35cacee6291faf6015d Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 5 Jan 2023 16:47:28 +0800 Subject: [PATCH 1/8] delete changefeed api v2 --- cdc/api/v2/api.go | 1 + cdc/api/v2/changefeed.go | 53 ++++++++++++ cdc/api/v2/changefeed_test.go | 83 +++++++++++++++++++ pkg/api/v2/changefeed.go | 12 +++ pkg/cmd/cli/cli_changefeed_remove.go | 19 +++-- .../http_api_tls/util/test_case.py | 22 +++++ 6 files changed, 184 insertions(+), 6 deletions(-) diff --git a/cdc/api/v2/api.go b/cdc/api/v2/api.go index 53f05f5b097..661ad290435 100644 --- a/cdc/api/v2/api.go +++ b/cdc/api/v2/api.go @@ -48,6 +48,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) { changefeedGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture)) changefeedGroup.POST("", api.createChangefeed) changefeedGroup.PUT("/:changefeed_id", api.updateChangefeed) + changefeedGroup.DELETE("/:changefeed_id", api.deleteChangefeed) changefeedGroup.GET("/:changefeed_id/meta_info", api.getChangeFeedMetaInfo) changefeedGroup.POST("/:changefeed_id/resume", api.resumeChangefeed) diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 456d77c8995..fdbc919ad6c 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/txnutil/gc" "github.com/pingcap/tiflow/pkg/upstream" @@ -297,6 +298,58 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) { c.JSON(http.StatusOK, toAPIModel(newCfInfo, true)) } +// deleteChangefeed handles delete changefeed request, +func (h *OpenAPIV2) deleteChangefeed(c *gin.Context) { + ctx := c.Request.Context() + changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID)) + if err := model.ValidateChangefeedID(changefeedID.ID); err != nil { + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", + changefeedID.ID)) + return + } + _, err := h.capture.StatusProvider().GetChangeFeedStatus(ctx, changefeedID) + if err != nil { + if cerror.ErrChangeFeedNotExists.Equal(err) { + c.Status(http.StatusNoContent) + return + } + _ = c.Error(err) + return + } + + job := model.AdminJob{ + CfID: changefeedID, + Type: model.AdminRemove, + } + + if err := api.HandleOwnerJob(ctx, h.capture, job); err != nil { + _ = c.Error(err) + return + } + + // Owner needs at least two ticks to remove a changefeed, + // we need to wait for it. + err = retry.Do(ctx, func() error { + _, err := h.capture.StatusProvider().GetChangeFeedStatus(ctx, changefeedID) + if err != nil { + if strings.Contains(err.Error(), "ErrChangeFeedNotExists") { + return nil + } + return err + } + return cerror.ErrChangeFeedDeletionUnfinished.GenWithStackByArgs(changefeedID) + }, + retry.WithMaxTries(100), // max retry duration is 1 minute + retry.WithBackoffBaseDelay(600), // default owner tick interval is 200ms + retry.WithIsRetryableErr(cerror.IsRetryableError)) + + if err != nil { + _ = c.Error(err) + return + } + c.Status(http.StatusNoContent) +} + // getChangeFeedMetaInfo returns the metaInfo of a changefeed func (h *OpenAPIV2) getChangeFeedMetaInfo(c *gin.Context) { ctx := c.Request.Context() diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 1c5b4bdcbb7..9e788f315ea 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -660,3 +660,86 @@ func TestResumeChangefeed(t *testing.T) { router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) } + +func TestDeleteChangefeed(t *testing.T) { + remove := testCase{url: "/api/v2/changefeeds/%s", method: "DELETE"} + helpers := NewMockAPIV2Helpers(gomock.NewController(t)) + cp := mock_capture.NewMockCapture(gomock.NewController(t)) + owner := mock_owner.NewMockOwner(gomock.NewController(t)) + apiV2 := NewOpenAPIV2ForTest(cp, helpers) + router := newRouter(apiV2) + + pdClient := &mockPDClient{} + etcdClient := mock_etcd.NewMockCDCEtcdClient(gomock.NewController(t)) + mockUpManager := upstream.NewManager4Test(pdClient) + statusProvider := mock_owner.NewMockStatusProvider(gomock.NewController(t)) + + etcdClient.EXPECT(). + GetEnsureGCServiceID(gomock.Any()). + Return(etcd.GcServiceIDForTest()).AnyTimes() + cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes() + cp.EXPECT().GetEtcdClient().Return(etcdClient, nil).AnyTimes() + cp.EXPECT().GetUpstreamManager().Return(mockUpManager, nil).AnyTimes() + cp.EXPECT().IsReady().Return(true).AnyTimes() + cp.EXPECT().IsOwner().Return(true).AnyTimes() + cp.EXPECT().GetOwner().Return(owner, nil).AnyTimes() + owner.EXPECT().EnqueueJob(gomock.Any(), gomock.Any()). + Do(func(adminJob model.AdminJob, done chan<- error) { + require.EqualValues(t, changeFeedID, adminJob.CfID) + require.EqualValues(t, model.AdminRemove, adminJob.Type) + close(done) + }).AnyTimes() + + // case 1: invalid changefeed id + w := httptest.NewRecorder() + invalidID := "@^Invalid" + req, _ := http.NewRequestWithContext(context.Background(), + remove.method, fmt.Sprintf(remove.url, invalidID), nil) + router.ServeHTTP(w, req) + respErr := model.HTTPError{} + err := json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Code, "ErrAPIInvalidParam") + + // case 2: changefeed not exists + validID := changeFeedID.ID + statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return( + nil, cerrors.ErrChangeFeedNotExists.GenWithStackByArgs(validID)) + w = httptest.NewRecorder() + req, _ = http.NewRequestWithContext(context.Background(), remove.method, + fmt.Sprintf(remove.url, validID), nil) + router.ServeHTTP(w, req) + require.Equal(t, http.StatusNoContent, w.Code) + + // case 3: query changefeed error + statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return( + nil, cerrors.ErrChangefeedUpdateRefused.GenWithStackByArgs(validID)) + w = httptest.NewRecorder() + req, _ = http.NewRequestWithContext(context.Background(), remove.method, + fmt.Sprintf(remove.url, validID), nil) + router.ServeHTTP(w, req) + respErr = model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Code, "ErrChangefeedUpdateRefused") + + // case 4: remove changefeed + statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return( + &model.ChangeFeedStatus{}, nil) + statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return( + nil, cerrors.ErrChangeFeedNotExists.GenWithStackByArgs(validID)) + w = httptest.NewRecorder() + req, _ = http.NewRequestWithContext(context.Background(), remove.method, + fmt.Sprintf(remove.url, validID), nil) + router.ServeHTTP(w, req) + require.Equal(t, http.StatusNoContent, w.Code) + + // case 5: remove changefeed failed + statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).AnyTimes().Return( + &model.ChangeFeedStatus{}, nil) + w = httptest.NewRecorder() + req, _ = http.NewRequestWithContext(context.Background(), remove.method, + fmt.Sprintf(remove.url, validID), nil) + router.ServeHTTP(w, req) + require.Equal(t, http.StatusInternalServerError, w.Code) +} diff --git a/pkg/api/v2/changefeed.go b/pkg/api/v2/changefeed.go index 8bc941289fc..225aafa18c5 100644 --- a/pkg/api/v2/changefeed.go +++ b/pkg/api/v2/changefeed.go @@ -40,6 +40,8 @@ type ChangefeedInterface interface { name string) (*v2.ChangeFeedInfo, error) // Resume resumes a changefeed with given config Resume(ctx context.Context, cfg *v2.ResumeChangefeedConfig, name string) error + // Delete deletes a changefeed by name + Delete(ctx context.Context, name string) error } // changefeeds implements ChangefeedInterface @@ -112,3 +114,13 @@ func (c *changefeeds) Resume(ctx context.Context, WithBody(cfg). Do(ctx).Error() } + +// Delete a changefeed +func (c *changefeeds) Delete(ctx context.Context, + name string, +) error { + u := fmt.Sprintf("changefeeds/%s", name) + return c.client.Delete(). + WithURI(u). + Do(ctx).Error() +} diff --git a/pkg/cmd/cli/cli_changefeed_remove.go b/pkg/cmd/cli/cli_changefeed_remove.go index faf95d7188f..b0a458f0937 100644 --- a/pkg/cmd/cli/cli_changefeed_remove.go +++ b/pkg/cmd/cli/cli_changefeed_remove.go @@ -17,6 +17,7 @@ import ( "strings" apiv1client "github.com/pingcap/tiflow/pkg/api/v1" + apiv2client "github.com/pingcap/tiflow/pkg/api/v2" "github.com/pingcap/tiflow/pkg/cmd/context" "github.com/pingcap/tiflow/pkg/cmd/factory" "github.com/pingcap/tiflow/pkg/cmd/util" @@ -26,7 +27,8 @@ import ( // removeChangefeedOptions defines flags for the `cli changefeed remove` command. type removeChangefeedOptions struct { - apiClient apiv1client.APIV1Interface + apiV1Client apiv1client.APIV1Interface + apiV2Client apiv2client.APIV2Interface changefeedID string } @@ -44,12 +46,17 @@ func (o *removeChangefeedOptions) addFlags(cmd *cobra.Command) { // complete adapts from the command line args to the data and client required. func (o *removeChangefeedOptions) complete(f factory.Factory) error { - apiClient, err := f.APIV1Client() + v1Client, err := f.APIV1Client() + if err != nil { + return err + } + v2Client, err := f.APIV2Client() if err != nil { return err } - o.apiClient = apiClient + o.apiV1Client = v1Client + o.apiV2Client = v2Client return nil } @@ -57,7 +64,7 @@ func (o *removeChangefeedOptions) complete(f factory.Factory) error { func (o *removeChangefeedOptions) run(cmd *cobra.Command) error { ctx := context.GetDefaultContext() - changefeedDetail, err := o.apiClient.Changefeeds().Get(ctx, o.changefeedID) + changefeedDetail, err := o.apiV1Client.Changefeeds().Get(ctx, o.changefeedID) if err != nil { if strings.Contains(err.Error(), "ErrChangeFeedNotExists") { cmd.Printf("Changefeed not found.\nID: %s\n", o.changefeedID) @@ -71,14 +78,14 @@ func (o *removeChangefeedOptions) run(cmd *cobra.Command) error { checkpointTs := changefeedDetail.CheckpointTSO sinkURI := changefeedDetail.SinkURI - err = o.apiClient.Changefeeds().Delete(ctx, o.changefeedID) + err = o.apiV2Client.Changefeeds().Delete(ctx, o.changefeedID) if err != nil { cmd.Printf("Changefeed remove failed.\nID: %s\nError: %s\n", o.changefeedID, err.Error()) return err } - _, err = o.apiClient.Changefeeds().Get(ctx, o.changefeedID) + _, err = o.apiV1Client.Changefeeds().Get(ctx, o.changefeedID) // Should never happen here. This checking is for defending. // The reason is that changefeed query to owner is invoked in the subsequent owner // Tick and in that Tick, the in-memory data structure and the metadata stored in diff --git a/tests/integration_tests/http_api_tls/util/test_case.py b/tests/integration_tests/http_api_tls/util/test_case.py index 98282bb58b3..69be227997b 100644 --- a/tests/integration_tests/http_api_tls/util/test_case.py +++ b/tests/integration_tests/http_api_tls/util/test_case.py @@ -463,6 +463,28 @@ def unsafe_apis(): assert resp.status_code != rq.codes.not_found print("pass test: resolve lock") +def delete_changefeed_v2(): + # remove changefeed + url = BASE_URL0_V2+"/changefeeds/changefeed-test4" + resp = rq.delete(url, cert=CERT, verify=VERIFY) + assert resp.status_code == rq.codes.no_content + + # check if remove changefeed success + url = BASE_URL0+"/changefeeds/changefeed-test4" + for i in range(RETRY_TIME): + resp = rq.get(url, cert=CERT, verify=VERIFY) + if resp.status_code == rq.codes.bad_request: + break + time.sleep(1) + assert resp.status_code == rq.codes.bad_request + assert resp.json()["error_code"] == "CDC:ErrChangeFeedNotExists" + + # test remove changefeed not exists + url = BASE_URL0+"/changefeeds/changefeed-not-exists" + resp = rq.delete(url, cert=CERT, verify=VERIFY) + assert (resp.status_code == rq.codes.no_content) + + print("pass test: remove changefeed") # util functions define belows From 44abbe1c483c0d6662d57f02ac285ce603b25c66 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Fri, 6 Jan 2023 11:14:55 +0800 Subject: [PATCH 2/8] generate mock --- pkg/api/v2/mock/changefeed_mock.go | 18 ++++++++++++++++-- pkg/api/v2/mock/tso_mock.go | 18 ++++++------------ pkg/api/v2/mock/unsafe_mock.go | 24 ++++++++---------------- scripts/generate-mock.sh | 3 +++ 4 files changed, 33 insertions(+), 30 deletions(-) diff --git a/pkg/api/v2/mock/changefeed_mock.go b/pkg/api/v2/mock/changefeed_mock.go index e5e894ba678..b3452b515aa 100644 --- a/pkg/api/v2/mock/changefeed_mock.go +++ b/pkg/api/v2/mock/changefeed_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: changefeed.go +// Source: pkg/api/v2/changefeed.go -// Package mock_v2 is a generated GoMock package. +// Package mock is a generated GoMock package. package mock import ( @@ -88,6 +88,20 @@ func (mr *MockChangefeedInterfaceMockRecorder) Create(ctx, cfg interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockChangefeedInterface)(nil).Create), ctx, cfg) } +// Delete mocks base method. +func (m *MockChangefeedInterface) Delete(ctx context.Context, name string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Delete", ctx, name) + ret0, _ := ret[0].(error) + return ret0 +} + +// Delete indicates an expected call of Delete. +func (mr *MockChangefeedInterfaceMockRecorder) Delete(ctx, name interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockChangefeedInterface)(nil).Delete), ctx, name) +} + // GetInfo mocks base method. func (m *MockChangefeedInterface) GetInfo(ctx context.Context, name string) (*v2.ChangeFeedInfo, error) { m.ctrl.T.Helper() diff --git a/pkg/api/v2/mock/tso_mock.go b/pkg/api/v2/mock/tso_mock.go index 36b969c167a..2f78654df70 100644 --- a/pkg/api/v2/mock/tso_mock.go +++ b/pkg/api/v2/mock/tso_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: tso.go +// Source: pkg/api/v2/tso.go -// Package mock_v2 is a generated GoMock package. +// Package mock is a generated GoMock package. package mock import ( @@ -47,8 +47,7 @@ func (m *MockTsoGetter) Tso() v20.TsoInterface { // Tso indicates an expected call of Tso. func (mr *MockTsoGetterMockRecorder) Tso() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tso", - reflect.TypeOf((*MockTsoGetter)(nil).Tso)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tso", reflect.TypeOf((*MockTsoGetter)(nil).Tso)) } // MockTsoInterface is a mock of TsoInterface interface. @@ -75,9 +74,7 @@ func (m *MockTsoInterface) EXPECT() *MockTsoInterfaceMockRecorder { } // Query mocks base method. -func (m *MockTsoInterface) Query(ctx context.Context, - config *v2.UpstreamConfig, -) (*v2.Tso, error) { +func (m *MockTsoInterface) Query(ctx context.Context, config *v2.UpstreamConfig) (*v2.Tso, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Query", ctx, config) ret0, _ := ret[0].(*v2.Tso) @@ -86,10 +83,7 @@ func (m *MockTsoInterface) Query(ctx context.Context, } // Query indicates an expected call of Query. -func (mr *MockTsoInterfaceMockRecorder) Query(ctx, - config interface{}, -) *gomock.Call { +func (mr *MockTsoInterfaceMockRecorder) Query(ctx, config interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Query", - reflect.TypeOf((*MockTsoInterface)(nil).Query), ctx, config) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Query", reflect.TypeOf((*MockTsoInterface)(nil).Query), ctx, config) } diff --git a/pkg/api/v2/mock/unsafe_mock.go b/pkg/api/v2/mock/unsafe_mock.go index 56493473e68..0001c09bd0e 100644 --- a/pkg/api/v2/mock/unsafe_mock.go +++ b/pkg/api/v2/mock/unsafe_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: unsafe.go +// Source: pkg/api/v2/unsafe.go -// Package mock_v2 is a generated GoMock package. +// Package mock is a generated GoMock package. package mock import ( @@ -47,8 +47,7 @@ func (m *MockUnsafeGetter) Unsafe() v20.UnsafeInterface { // Unsafe indicates an expected call of Unsafe. func (mr *MockUnsafeGetterMockRecorder) Unsafe() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unsafe", - reflect.TypeOf((*MockUnsafeGetter)(nil).Unsafe)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unsafe", reflect.TypeOf((*MockUnsafeGetter)(nil).Unsafe)) } // MockUnsafeInterface is a mock of UnsafeInterface interface. @@ -75,9 +74,7 @@ func (m *MockUnsafeInterface) EXPECT() *MockUnsafeInterfaceMockRecorder { } // DeleteServiceGcSafePoint mocks base method. -func (m *MockUnsafeInterface) DeleteServiceGcSafePoint(ctx context.Context, - config *v2.UpstreamConfig, -) error { +func (m *MockUnsafeInterface) DeleteServiceGcSafePoint(ctx context.Context, config *v2.UpstreamConfig) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteServiceGcSafePoint", ctx, config) ret0, _ := ret[0].(error) @@ -85,12 +82,9 @@ func (m *MockUnsafeInterface) DeleteServiceGcSafePoint(ctx context.Context, } // DeleteServiceGcSafePoint indicates an expected call of DeleteServiceGcSafePoint. -func (mr *MockUnsafeInterfaceMockRecorder) DeleteServiceGcSafePoint(ctx, - config interface{}, -) *gomock.Call { +func (mr *MockUnsafeInterfaceMockRecorder) DeleteServiceGcSafePoint(ctx, config interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteServiceGcSafePoint", - reflect.TypeOf((*MockUnsafeInterface)(nil).DeleteServiceGcSafePoint), ctx, config) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteServiceGcSafePoint", reflect.TypeOf((*MockUnsafeInterface)(nil).DeleteServiceGcSafePoint), ctx, config) } // Metadata mocks base method. @@ -105,8 +99,7 @@ func (m *MockUnsafeInterface) Metadata(ctx context.Context) (*[]v2.EtcdData, err // Metadata indicates an expected call of Metadata. func (mr *MockUnsafeInterfaceMockRecorder) Metadata(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Metadata", - reflect.TypeOf((*MockUnsafeInterface)(nil).Metadata), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Metadata", reflect.TypeOf((*MockUnsafeInterface)(nil).Metadata), ctx) } // ResolveLock mocks base method. @@ -120,6 +113,5 @@ func (m *MockUnsafeInterface) ResolveLock(ctx context.Context, req *v2.ResolveLo // ResolveLock indicates an expected call of ResolveLock. func (mr *MockUnsafeInterfaceMockRecorder) ResolveLock(ctx, req interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResolveLock", - reflect.TypeOf((*MockUnsafeInterface)(nil).ResolveLock), ctx, req) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResolveLock", reflect.TypeOf((*MockUnsafeInterface)(nil).ResolveLock), ctx, req) } diff --git a/scripts/generate-mock.sh b/scripts/generate-mock.sh index 0e46f81ba67..e2f740d9f7e 100755 --- a/scripts/generate-mock.sh +++ b/scripts/generate-mock.sh @@ -32,6 +32,9 @@ fi "$MOCKGEN" -source cdc/capture/capture.go -destination cdc/capture/mock/capture_mock.go "$MOCKGEN" -source pkg/cmd/factory/factory.go -destination pkg/cmd/factory/mock/factory_mock.go -package mock_factory "$MOCKGEN" -source cdc/processor/sourcemanager/engine/engine.go -destination cdc/processor/sourcemanager/engine/mock/engine_mock.go +"$MOCKGEN" -source pkg/api/v2/changefeed.go -destination pkg/api/v2/mock/changefeed_mock.go -package mock +"$MOCKGEN" -source pkg/api/v2/tso.go -destination pkg/api/v2/mock/tso_mock.go -package mock +"$MOCKGEN" -source pkg/api/v2/unsafe.go -destination pkg/api/v2/mock/unsafe_mock.go -package mock # DM mock "$MOCKGEN" -package pbmock -destination dm/pbmock/dmmaster.go github.com/pingcap/tiflow/dm/pb MasterClient,MasterServer From 947ff4a6745eef1386ad43610c5a986e951ab90d Mon Sep 17 00:00:00 2001 From: sdojjy Date: Fri, 6 Jan 2023 14:03:40 +0800 Subject: [PATCH 3/8] fix ut --- pkg/cmd/cli/cli_changefeed_remove_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/cmd/cli/cli_changefeed_remove_test.go b/pkg/cmd/cli/cli_changefeed_remove_test.go index 3bd11b30a73..58767aaebb3 100644 --- a/pkg/cmd/cli/cli_changefeed_remove_test.go +++ b/pkg/cmd/cli/cli_changefeed_remove_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/api/v1/mock" + mock_v2 "github.com/pingcap/tiflow/pkg/api/v2/mock" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/stretchr/testify/require" ) @@ -29,12 +30,13 @@ func TestChangefeedRemoveCli(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() cf := mock.NewMockChangefeedInterface(ctrl) - f := &mockFactory{changefeeds: cf} + cfv2 := mock_v2.NewMockChangefeedInterface(ctrl) + f := &mockFactory{changefeeds: cf, changefeedsv2: cfv2} cmd := newCmdRemoveChangefeed(f) cf.EXPECT().Get(gomock.Any(), "abc").Return(&model.ChangefeedDetail{}, nil) - cf.EXPECT().Delete(gomock.Any(), "abc").Return(nil) + cfv2.EXPECT().Delete(gomock.Any(), "abc").Return(nil) cf.EXPECT().Get(gomock.Any(), "abc").Return(nil, cerror.ErrChangeFeedNotExists.GenWithStackByArgs("abc")) os.Args = []string{"remove", "--changefeed-id=abc"} From 014d47f59af4be785cbabed8bc08784f081274b8 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Fri, 6 Jan 2023 14:53:54 +0800 Subject: [PATCH 4/8] fix ut --- cdc/api/v2/changefeed_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 9e788f315ea..2d161ce423b 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -741,5 +741,9 @@ func TestDeleteChangefeed(t *testing.T) { req, _ = http.NewRequestWithContext(context.Background(), remove.method, fmt.Sprintf(remove.url, validID), nil) router.ServeHTTP(w, req) + respErr = model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Code, "ErrReachMaxTry") require.Equal(t, http.StatusInternalServerError, w.Code) } From 2e9456904998ad1655c2a36f2b1bbb45e313edc0 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Mon, 9 Jan 2023 11:16:34 +0800 Subject: [PATCH 5/8] add intergration test --- tests/integration_tests/http_api_tls/run.sh | 1 + tests/integration_tests/http_api_tls/util/test_case.py | 1 + 2 files changed, 2 insertions(+) diff --git a/tests/integration_tests/http_api_tls/run.sh b/tests/integration_tests/http_api_tls/run.sh index cea454e3974..1b127dff726 100644 --- a/tests/integration_tests/http_api_tls/run.sh +++ b/tests/integration_tests/http_api_tls/run.sh @@ -120,6 +120,7 @@ function run() { "get_tso" "verify_table" "create_changefeed_v2" + "delete_changefeed_v2" "unsafe_apis" ) diff --git a/tests/integration_tests/http_api_tls/util/test_case.py b/tests/integration_tests/http_api_tls/util/test_case.py index 69be227997b..c6899b99595 100644 --- a/tests/integration_tests/http_api_tls/util/test_case.py +++ b/tests/integration_tests/http_api_tls/util/test_case.py @@ -525,6 +525,7 @@ def compose_tso(ps, ls): "get_tso": get_tso, "verify_table": verify_table, "create_changefeed_v2": create_changefeed_v2, + "delete_changefeed_v2": delete_changefeed_v2, "unsafe_apis": unsafe_apis } From d9fcf18819467189ae9994f93c975af342406370 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Mon, 9 Jan 2023 11:37:38 +0800 Subject: [PATCH 6/8] fix integration test --- tests/integration_tests/http_api_tls/run.sh | 1 + tests/integration_tests/http_api_tls/util/test_case.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/integration_tests/http_api_tls/run.sh b/tests/integration_tests/http_api_tls/run.sh index 1b127dff726..e16d4c69463 100644 --- a/tests/integration_tests/http_api_tls/run.sh +++ b/tests/integration_tests/http_api_tls/run.sh @@ -68,6 +68,7 @@ function run() { ensure $MAX_RETRIES check_changefeed_state "https://${TLS_PD_HOST}:${TLS_PD_PORT}" "changefeed-test1" "normal" "null" ${TLS_DIR} ensure $MAX_RETRIES check_changefeed_state "https://${TLS_PD_HOST}:${TLS_PD_PORT}" "changefeed-test2" "normal" "null" ${TLS_DIR} ensure $MAX_RETRIES check_changefeed_state "https://${TLS_PD_HOST}:${TLS_PD_PORT}" "changefeed-test3" "normal" "null" ${TLS_DIR} + ensure $MAX_RETRIES check_changefeed_state "https://${TLS_PD_HOST}:${TLS_PD_PORT}" "changefeed-test4" "normal" "null" ${TLS_DIR} # test processor query with no attached tables #TODO: comment this test temporary diff --git a/tests/integration_tests/http_api_tls/util/test_case.py b/tests/integration_tests/http_api_tls/util/test_case.py index c6899b99595..271c4ce374e 100644 --- a/tests/integration_tests/http_api_tls/util/test_case.py +++ b/tests/integration_tests/http_api_tls/util/test_case.py @@ -21,7 +21,7 @@ def create_changefeed(sink_uri): url = BASE_URL1+"/changefeeds" # create changefeed - for i in range(1, 4): + for i in range(1, 5): data = { "changefeed_id": "changefeed-test"+str(i), "sink_uri": "blackhole://", @@ -484,7 +484,7 @@ def delete_changefeed_v2(): resp = rq.delete(url, cert=CERT, verify=VERIFY) assert (resp.status_code == rq.codes.no_content) - print("pass test: remove changefeed") + print("pass test: remove changefeed v2") # util functions define belows From 7bafb077df05a3a9f695e1778a6463e279bbd382 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Mon, 9 Jan 2023 12:23:30 +0800 Subject: [PATCH 7/8] fix integration test --- tests/integration_tests/http_api_tls/util/test_case.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration_tests/http_api_tls/util/test_case.py b/tests/integration_tests/http_api_tls/util/test_case.py index 271c4ce374e..34a7076ae7a 100644 --- a/tests/integration_tests/http_api_tls/util/test_case.py +++ b/tests/integration_tests/http_api_tls/util/test_case.py @@ -480,7 +480,7 @@ def delete_changefeed_v2(): assert resp.json()["error_code"] == "CDC:ErrChangeFeedNotExists" # test remove changefeed not exists - url = BASE_URL0+"/changefeeds/changefeed-not-exists" + url = BASE_URL0_V2+"/changefeeds/changefeed-not-exists" resp = rq.delete(url, cert=CERT, verify=VERIFY) assert (resp.status_code == rq.codes.no_content) From 177af6985f239786f0a55707b816d9b905e941ce Mon Sep 17 00:00:00 2001 From: sdojjy Date: Mon, 9 Jan 2023 14:22:27 +0800 Subject: [PATCH 8/8] fix integration test --- pkg/api/v1/changefeed.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/pkg/api/v1/changefeed.go b/pkg/api/v1/changefeed.go index 5b5eca0721e..10fd9c29f0b 100644 --- a/pkg/api/v1/changefeed.go +++ b/pkg/api/v1/changefeed.go @@ -31,7 +31,6 @@ type ChangefeedsGetter interface { type ChangefeedInterface interface { Get(ctx context.Context, name string) (*model.ChangefeedDetail, error) List(ctx context.Context, state string) (*[]model.ChangefeedCommonInfo, error) - Delete(ctx context.Context, name string) error Pause(ctx context.Context, name string) error Resume(ctx context.Context, name string) error } @@ -88,11 +87,3 @@ func (c *changefeeds) Resume(ctx context.Context, name string) error { WithURI(u). Do(ctx).Error() } - -// Delete delete the changefeed -func (c *changefeeds) Delete(ctx context.Context, name string) error { - u := fmt.Sprintf("changefeeds/%s", name) - return c.client.Delete(). - WithURI(u). - Do(ctx).Error() -}