Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apiv2(ticdc): delete changefeed #8020

Merged
merged 12 commits into from
Jan 10, 2023
1 change: 1 addition & 0 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
changefeedGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
changefeedGroup.POST("", api.createChangefeed)
changefeedGroup.PUT("/:changefeed_id", api.updateChangefeed)
changefeedGroup.DELETE("/:changefeed_id", api.deleteChangefeed)
changefeedGroup.GET("/:changefeed_id/meta_info", api.getChangeFeedMetaInfo)
changefeedGroup.POST("/:changefeed_id/resume", api.resumeChangefeed)

Expand Down
53 changes: 53 additions & 0 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
Expand Down Expand Up @@ -296,6 +297,58 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
c.JSON(http.StatusOK, toAPIModel(newCfInfo, true))
}

// deleteChangefeed handles delete changefeed request,
func (h *OpenAPIV2) deleteChangefeed(c *gin.Context) {
ctx := c.Request.Context()
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}
_, err := h.capture.StatusProvider().GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
if cerror.ErrChangeFeedNotExists.Equal(err) {
c.Status(http.StatusNoContent)
return
}
_ = c.Error(err)
return
}

job := model.AdminJob{
CfID: changefeedID,
Type: model.AdminRemove,
}

if err := api.HandleOwnerJob(ctx, h.capture, job); err != nil {
_ = c.Error(err)
return
}

// Owner needs at least two ticks to remove a changefeed,
// we need to wait for it.
err = retry.Do(ctx, func() error {
_, err := h.capture.StatusProvider().GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
if strings.Contains(err.Error(), "ErrChangeFeedNotExists") {
return nil
}
return err
}
return cerror.ErrChangeFeedDeletionUnfinished.GenWithStackByArgs(changefeedID)
},
retry.WithMaxTries(100), // max retry duration is 1 minute
retry.WithBackoffBaseDelay(600), // default owner tick interval is 200ms
retry.WithIsRetryableErr(cerror.IsRetryableError))

if err != nil {
_ = c.Error(err)
return
}
c.Status(http.StatusNoContent)
}

// getChangeFeedMetaInfo returns the metaInfo of a changefeed
func (h *OpenAPIV2) getChangeFeedMetaInfo(c *gin.Context) {
ctx := c.Request.Context()
Expand Down
87 changes: 87 additions & 0 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,3 +660,90 @@ func TestResumeChangefeed(t *testing.T) {
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
}

func TestDeleteChangefeed(t *testing.T) {
remove := testCase{url: "/api/v2/changefeeds/%s", method: "DELETE"}
helpers := NewMockAPIV2Helpers(gomock.NewController(t))
cp := mock_capture.NewMockCapture(gomock.NewController(t))
owner := mock_owner.NewMockOwner(gomock.NewController(t))
apiV2 := NewOpenAPIV2ForTest(cp, helpers)
router := newRouter(apiV2)

pdClient := &mockPDClient{}
etcdClient := mock_etcd.NewMockCDCEtcdClient(gomock.NewController(t))
mockUpManager := upstream.NewManager4Test(pdClient)
statusProvider := mock_owner.NewMockStatusProvider(gomock.NewController(t))

etcdClient.EXPECT().
GetEnsureGCServiceID(gomock.Any()).
Return(etcd.GcServiceIDForTest()).AnyTimes()
cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes()
cp.EXPECT().GetEtcdClient().Return(etcdClient, nil).AnyTimes()
cp.EXPECT().GetUpstreamManager().Return(mockUpManager, nil).AnyTimes()
cp.EXPECT().IsReady().Return(true).AnyTimes()
cp.EXPECT().IsOwner().Return(true).AnyTimes()
cp.EXPECT().GetOwner().Return(owner, nil).AnyTimes()
owner.EXPECT().EnqueueJob(gomock.Any(), gomock.Any()).
Do(func(adminJob model.AdminJob, done chan<- error) {
require.EqualValues(t, changeFeedID, adminJob.CfID)
require.EqualValues(t, model.AdminRemove, adminJob.Type)
close(done)
}).AnyTimes()

// case 1: invalid changefeed id
w := httptest.NewRecorder()
invalidID := "@^Invalid"
req, _ := http.NewRequestWithContext(context.Background(),
remove.method, fmt.Sprintf(remove.url, invalidID), nil)
router.ServeHTTP(w, req)
respErr := model.HTTPError{}
err := json.NewDecoder(w.Body).Decode(&respErr)
require.Nil(t, err)
require.Contains(t, respErr.Code, "ErrAPIInvalidParam")

// case 2: changefeed not exists
validID := changeFeedID.ID
statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return(
nil, cerrors.ErrChangeFeedNotExists.GenWithStackByArgs(validID))
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), remove.method,
fmt.Sprintf(remove.url, validID), nil)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusNoContent, w.Code)

// case 3: query changefeed error
statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return(
nil, cerrors.ErrChangefeedUpdateRefused.GenWithStackByArgs(validID))
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), remove.method,
fmt.Sprintf(remove.url, validID), nil)
router.ServeHTTP(w, req)
respErr = model.HTTPError{}
err = json.NewDecoder(w.Body).Decode(&respErr)
require.Nil(t, err)
require.Contains(t, respErr.Code, "ErrChangefeedUpdateRefused")

// case 4: remove changefeed
statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return(
&model.ChangeFeedStatus{}, nil)
statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return(
nil, cerrors.ErrChangeFeedNotExists.GenWithStackByArgs(validID))
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), remove.method,
fmt.Sprintf(remove.url, validID), nil)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusNoContent, w.Code)

// case 5: remove changefeed failed
statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).AnyTimes().Return(
&model.ChangeFeedStatus{}, nil)
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), remove.method,
fmt.Sprintf(remove.url, validID), nil)
router.ServeHTTP(w, req)
respErr = model.HTTPError{}
err = json.NewDecoder(w.Body).Decode(&respErr)
require.Nil(t, err)
require.Contains(t, respErr.Code, "ErrReachMaxTry")
require.Equal(t, http.StatusInternalServerError, w.Code)
}
9 changes: 0 additions & 9 deletions pkg/api/v1/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type ChangefeedsGetter interface {
type ChangefeedInterface interface {
Get(ctx context.Context, name string) (*model.ChangefeedDetail, error)
List(ctx context.Context, state string) (*[]model.ChangefeedCommonInfo, error)
Delete(ctx context.Context, name string) error
Pause(ctx context.Context, name string) error
Resume(ctx context.Context, name string) error
}
Expand Down Expand Up @@ -88,11 +87,3 @@ func (c *changefeeds) Resume(ctx context.Context, name string) error {
WithURI(u).
Do(ctx).Error()
}

// Delete delete the changefeed
func (c *changefeeds) Delete(ctx context.Context, name string) error {
u := fmt.Sprintf("changefeeds/%s", name)
return c.client.Delete().
WithURI(u).
Do(ctx).Error()
}
12 changes: 12 additions & 0 deletions pkg/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type ChangefeedInterface interface {
name string) (*v2.ChangeFeedInfo, error)
// Resume resumes a changefeed with given config
Resume(ctx context.Context, cfg *v2.ResumeChangefeedConfig, name string) error
// Delete deletes a changefeed by name
Delete(ctx context.Context, name string) error
}

// changefeeds implements ChangefeedInterface
Expand Down Expand Up @@ -112,3 +114,13 @@ func (c *changefeeds) Resume(ctx context.Context,
WithBody(cfg).
Do(ctx).Error()
}

// Delete a changefeed
func (c *changefeeds) Delete(ctx context.Context,
name string,
) error {
u := fmt.Sprintf("changefeeds/%s", name)
return c.client.Delete().
WithURI(u).
Do(ctx).Error()
}
18 changes: 16 additions & 2 deletions pkg/api/v2/mock/changefeed_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 6 additions & 12 deletions pkg/api/v2/mock/tso_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 8 additions & 16 deletions pkg/api/v2/mock/unsafe_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading