-
Notifications
You must be signed in to change notification settings - Fork 49
/
Copy pathconnector.go
143 lines (115 loc) · 4.48 KB
/
connector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Copyright © 2022 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:generate mockgen -destination=mock/connector.go -package=mock -mock_names=Source=Source,Destination=Destination . Source,Destination
//go:generate stringer -type=Type -trimprefix Type
package connector
import (
"context"
"time"
"github.com/conduitio/conduit/pkg/record"
)
// Type defines the connector type.
type Type int

// ProvisionType defines the provisioning type.
type ProvisionType int

// Connector types. Numbering starts at 1, so the zero value of Type is
// neither TypeSource nor TypeDestination.
const (
	TypeSource Type = iota + 1
	TypeDestination
)

// Provisioning types. Numbering starts at 0, so ProvisionTypeAPI is the zero
// value of ProvisionType.
const (
	ProvisionTypeAPI ProvisionType = iota
	ProvisionTypeConfig
)
// Connector is the set of methods embedded by both Source and Destination.
type Connector interface {
	// ID returns the ID of the connector.
	ID() string
	// Type returns the connector type.
	Type() Type

	// Config returns the connector's Config.
	Config() Config
	// SetConfig sets the connector's Config.
	SetConfig(cfg Config)

	// ProvisionedBy returns the provisioning type of the connector.
	ProvisionedBy() ProvisionType
	// CreatedAt returns the connector's created-at timestamp.
	CreatedAt() time.Time
	// UpdatedAt returns the connector's updated-at timestamp.
	UpdatedAt() time.Time
	// SetUpdatedAt sets the connector's updated-at timestamp.
	SetUpdatedAt(t time.Time)

	// IsRunning reports whether the connector is running and ready to accept
	// calls to Read or Write (which one depends on the connector type).
	IsRunning() bool

	// Validate checks whether the connector is set up correctly.
	// NOTE(review): how settings relates to Config().Settings is not visible
	// in this file; confirm against implementations.
	Validate(ctx context.Context, settings map[string]string) error

	// Errors returns a channel used to signal the node that the connector
	// hit an error while processing something asynchronously (for example
	// while persisting state).
	Errors() <-chan error

	// Open starts the plugin process and calls the Open method on the
	// plugin. Once Open succeeds the connector counts as running (IsRunning
	// returns true) and can be stopped again with Teardown. Calling Open on a
	// connector that is already running returns an error.
	Open(ctx context.Context) error

	// Teardown calls the Teardown method on the plugin and stops the plugin
	// process. Once Teardown succeeds the connector counts as stopped
	// (IsRunning returns false) and can be opened again with Open. Calling
	// Teardown on a connector that is already stopped returns an error.
	Teardown(ctx context.Context) error
}
// Source is a connector that can read records from a source.
type Source interface {
	Connector

	// State returns the source's SourceState.
	State() SourceState
	// SetState sets the source's SourceState.
	SetState(state SourceState)

	// Read reads data from the data source and returns a record.
	Read(ctx context.Context) (record.Record, error)

	// Ack signals to the source that a message has been processed
	// successfully and can be acknowledged. The position argument presumably
	// identifies that message; confirm against implementations.
	Ack(ctx context.Context, pos record.Position) error

	// Stop signals to the source that it should stop producing records. After
	// this call, Read keeps producing records until the record with the last
	// position has been read (Conduit might already have received that
	// record). The returned position is presumably that last position;
	// confirm against implementations.
	Stop(ctx context.Context) (record.Position, error)
}
// Destination is a connector that can write records to a destination.
type Destination interface {
	Connector

	// State returns the destination's DestinationState.
	State() DestinationState
	// SetState sets the destination's DestinationState.
	SetState(state DestinationState)

	// Write hands a record to the connector and returns nil once the record
	// has been received. A nil result does not guarantee that the record was
	// processed and written to the 3rd party system: it might have been
	// cached and written later. Use Ack to learn whether a record was
	// actually processed or whether processing it failed.
	Write(ctx context.Context, rec record.Record) error

	// Ack blocks until an acknowledgment for a processed record arrives and
	// returns the position of that record. If the record was not processed
	// successfully, Ack returns the position together with an error.
	Ack(ctx context.Context) (record.Position, error)

	// Stop signals to the destination that no more records will be produced
	// after the record with the given last position.
	Stop(ctx context.Context, last record.Position) error
}
// Config collects common data stored for a connector.
type Config struct {
	// Name is the connector's name.
	Name string
	// Settings holds string key/value settings for the connector.
	Settings map[string]string
	// Plugin presumably identifies the plugin backing the connector; the
	// exact format is not visible in this file.
	Plugin string
	// PipelineID is presumably the ID of the pipeline this connector belongs
	// to; confirm against callers.
	PipelineID string
	// ProcessorIDs is presumably the list of IDs of processors attached to
	// this connector; confirm against callers.
	ProcessorIDs []string
}
// SourceState is the state type returned by Source.State and accepted by
// Source.SetState.
type SourceState struct {
	// Position is a single record position.
	// NOTE(review): presumably the position the source resumes from; confirm
	// against the code that persists this state.
	Position record.Position
}
// DestinationState is the state type returned by Destination.State and
// accepted by Destination.SetState.
type DestinationState struct {
	// Positions maps string keys to record positions.
	// NOTE(review): what the keys identify is not visible in this file;
	// confirm against the code that fills this map.
	Positions map[string]record.Position
}