Skip to content

Commit

Permalink
add delete by expression method
Browse files Browse the repository at this point in the history
Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
  • Loading branch information
aoiasd committed Sep 6, 2023
1 parent 684ef2d commit 6637aec
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 0 deletions.
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ type Client interface {
FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, error)
// DeleteByPks deletes entries related to provided primary keys
DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error
// DeleteByExpr deletes entries match expression
DeleteByExpr(ctx context.Context, collName string, partitionName string, expr string) error
// Upsert column-based data of collection, returns id column values
Upsert(ctx context.Context, collName string, partitionName string, columns ...entity.Column) (entity.Column, error)
// Search search with bool expression
Expand Down
66 changes: 66 additions & 0 deletions client/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,72 @@ func TestGrpcDeleteByPks(t *testing.T) {
})
}

func TestGrpcDeleteByExpr(t *testing.T) {
ctx := context.Background()

c := testClient(ctx, t)
defer c.Close()

mockServer.SetInjection(MDescribeCollection, describeCollectionInjection(t, 1, testCollectionName, defaultSchema()))
defer mockServer.DelInjection(MDescribeCollection)

t.Run("normal delete by pks", func(t *testing.T) {
partName := "testPart"
mockServer.SetInjection(MHasPartition, hasPartitionInjection(t, testCollectionName, true, partName))
defer mockServer.DelInjection(MHasPartition)
mockServer.SetInjection(MDelete, func(_ context.Context, raw proto.Message) (proto.Message, error) {
req, ok := raw.(*milvuspb.DeleteRequest)
if !ok {
t.FailNow()
}
assert.Equal(t, testCollectionName, req.GetCollectionName())
assert.Equal(t, partName, req.GetPartitionName())

resp := &milvuspb.MutationResult{}
s, err := SuccessStatus()
resp.Status = s
return resp, err
})
defer mockServer.DelInjection(MDelete)

err := c.DeleteByExpr(ctx, testCollectionName, partName, "")
assert.NoError(t, err)
})

t.Run("Bad request deletes", func(t *testing.T) {
partName := "testPart"
mockServer.SetInjection(MHasPartition, hasPartitionInjection(t, testCollectionName, false, partName))
defer mockServer.DelInjection(MHasPartition)

// non-exist collection
err := c.DeleteByExpr(ctx, "non-exists-collection", "", "")
assert.Error(t, err)

// non-exist parition
err = c.DeleteByExpr(ctx, testCollectionName, "non-exists-part", "")
assert.Error(t, err)
})
t.Run("delete services fail", func(t *testing.T) {
mockServer.SetInjection(MDelete, func(_ context.Context, raw proto.Message) (proto.Message, error) {
resp := &milvuspb.MutationResult{}
return resp, errors.New("mockServer.d error")
})

err := c.DeleteByExpr(ctx, testCollectionName, "", "")
assert.Error(t, err)

mockServer.SetInjection(MDelete, func(_ context.Context, raw proto.Message) (proto.Message, error) {
resp := &milvuspb.MutationResult{}
resp.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
return resp, nil
})
err = c.DeleteByExpr(ctx, testCollectionName, "", "")
assert.Error(t, err)
})
}

type SearchSuite struct {
MockSuiteBase
sch *entity.Schema
Expand Down
38 changes: 38 additions & 0 deletions client/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,44 @@ func (c *GrpcClient) DeleteByPks(ctx context.Context, collName string, partition
return nil
}

// DeleteByExpr deletes entries match expression
func (c *GrpcClient) DeleteByExpr(ctx context.Context, collName string, partitionName string, expr string) error {
if c.Service == nil {
return ErrClientNotReady
}

// check collection name
if err := c.checkCollectionExists(ctx, collName); err != nil {
return err
}

// check partition name
if partitionName != "" {
err := c.checkPartitionExists(ctx, collName, partitionName)
if err != nil {
return err
}
}

req := &milvuspb.DeleteRequest{
DbName: "",
CollectionName: collName,
PartitionName: partitionName,
Expr: expr,
}

resp, err := c.Service.Delete(ctx, req)
if err != nil {
return err
}
err = handleRespStatus(resp.GetStatus())
if err != nil {
return err
}
MetaCache.setSessionTs(collName, resp.Timestamp)
return nil
}

// Upsert Index into collection with column-based format
// collName is the collection name
// partitionName is the partition to upsert, if not specified(empty), default partition will be used
Expand Down
8 changes: 8 additions & 0 deletions test/base/milvus_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,14 @@ func (mc *MilvusClient) DeleteByPks(ctx context.Context, collName string, partit
return err
}

// DeleteByExpr deletes entries match expression
func (mc *MilvusClient) DeleteByExpr(ctx context.Context, collName string, partitionName string, expr string) error {
preRequest("DeleteByPks", ctx, collName, partitionName, expr)
err := mc.mClient.DeleteByExpr(ctx, collName, partitionName, expr)
postResponse("DeleteByPks", err)
return err
}

// Search search from collection
func (mc *MilvusClient) Search(ctx context.Context, collName string, partitions []string, expr string,
outputFields []string, vectors []entity.Vector, vectorField string, metricType entity.MetricType, topK int, sp entity.SearchParam, opts ...client.SearchQueryOptionFunc,
Expand Down

0 comments on commit 6637aec

Please sign in to comment.