Skip to content

Commit

Permalink
[close #205] support host as sink url (#206)
Browse files Browse the repository at this point in the history
* fix hostname error

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix comment

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>
  • Loading branch information
zeminzhou authored Aug 22, 2022
1 parent 1d8847b commit ff3a1f0
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 34 deletions.
52 changes: 19 additions & 33 deletions cdc/cdc/sink/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,37 +31,23 @@ func TestValidateSink(t *testing.T) {
replicateConfig := config.GetDefaultReplicaConfig()
opts := make(map[string]string)

// test sink uri right
sinkURI := "tikv://127.0.0.1:3306/"
err := Validate(ctx, sinkURI, replicateConfig, opts)
require.Nil(t, err)

sinkURI = "tikv://127.0.0.1:3306/?concurrency=4"
err = Validate(ctx, sinkURI, replicateConfig, opts)
require.Nil(t, err)

sinkURI = "tikv://127.0.0.1:3306,127.0.0.1:3307/?concurrency=4"
err = Validate(ctx, sinkURI, replicateConfig, opts)
require.Nil(t, err)

sinkURI = "blackhole://"
err = Validate(ctx, sinkURI, replicateConfig, opts)
require.Nil(t, err)

// test sink uri wrong
sinkURI = "tikv://http://127.0.0.1:3306/"
err = Validate(ctx, sinkURI, replicateConfig, opts)
require.NotNil(t, err)

sinkURI = "tikv://127.0.0.1:3306a/"
err = Validate(ctx, sinkURI, replicateConfig, opts)
require.NotNil(t, err)

sinkURI = "tikv://a127.0.0.1:3306/"
err = Validate(ctx, sinkURI, replicateConfig, opts)
require.NotNil(t, err)

sinkURI = "tikv://127.0.0.1:3306, tikv://127.0.0.1:3307/"
err = Validate(ctx, sinkURI, replicateConfig, opts)
require.NotNil(t, err)
testCases := []struct {
sinkURI string
expected bool
}{
{"tikv://127.0.0.1:3306/", true},
{"tikv://127.0.0.1:3306/?concurrency=4", true},
{"blackhole://", true},
{"tikv://127.0.0.1:3306,127.0.0.1:3307/", true},
{"tikv://hostname:3306", true},
{"tikv://http://127.0.0.1:3306/", false},
{"tikv://127.0.0.1:3306a/", false},
{"tikv://127.0.0.1:3306, tikv://127.0.0.1:3307/", false},
{"tikv://hostname:3306x", false},
}

for _, tc := range testCases {
err := Validate(ctx, tc.sinkURI, replicateConfig, opts)
require.Equal(t, tc.expected, err == nil)
}
}
2 changes: 1 addition & 1 deletion cdc/cdc/sink/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func parseTiKVUri(sinkURI *url.URL, opts map[string]string) (*tikvconfig.Config,
if len(pdAddr) > 0 {
for i, addr := range pdAddr {
host, port, err := net.SplitHostPort(addr)
if err != nil || !validator.IsIP(host) || !validator.IsPort(port) {
if err != nil || !validator.IsHost(host) || !validator.IsPort(port) {
err = fmt.Errorf("Invalid pd addr: %v, err: %v", addr, err)
return nil, nil, cerror.WrapError(cerror.ErrTiKVInvalidConfig, err)
}
Expand Down

0 comments on commit ff3a1f0

Please sign in to comment.