Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.opcua_listener): OPC UA Event subscriptions #11786

Merged
merged 7 commits into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ require (
github.com/google/go-github/v32 v32.1.0
github.com/google/licensecheck v0.3.1
github.com/google/uuid v1.3.0
github.com/gopcua/opcua v0.3.3
github.com/gopcua/opcua v0.3.7
github.com/gophercloud/gophercloud v1.0.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1283,8 +1283,8 @@ github.com/googleapis/go-type-adapters v1.0.0 h1:9XdMn+d/G57qq1s8dNc5IesGCXHf6V2
github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4=
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
github.com/gookit/color v1.3.6/go.mod h1:R3ogXq2B9rTbXoSHJ1HyUVAZ3poOJHpd9nQmyGZsfvQ=
github.com/gopcua/opcua v0.3.3 h1:tu1t/mx9fJybry1KYljqDdzxmik+BZk6410LJ4QzM2E=
github.com/gopcua/opcua v0.3.3/go.mod h1:n/qSWDVB/KSPIG4vYhBSbs5zdYAW3yOcDCRrWd1BZo0=
github.com/gopcua/opcua v0.3.7 h1:iGjLW3D+ztnjtZQPKsJ0nwibHyDw1m11NfqOU8KSFQ8=
github.com/gopcua/opcua v0.3.7/go.mod h1:n/qSWDVB/KSPIG4vYhBSbs5zdYAW3yOcDCRrWd1BZo0=
github.com/gophercloud/gophercloud v0.16.0/go.mod h1:wRtmUelyIIv3CSSDI47aUwbs075O6i+LY+pXsKCBsb4=
github.com/gophercloud/gophercloud v1.0.0 h1:9nTGx0jizmHxDobe4mck89FyQHVyA3CaXLIUSGJjP9k=
github.com/gophercloud/gophercloud v1.0.0/go.mod h1:Q8fZtyi5zZxPS/j9aj3sSxtvj41AdQMDwyo1myduD5c=
Expand Down
214 changes: 214 additions & 0 deletions plugins/common/opcua/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package opcua

import (
"context"
"fmt"
"github.com/gopcua/opcua"
"github.com/gopcua/opcua/ua"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"net/url"
"strconv"
"time"
)

type OpcUAWorkarounds struct {
AdditionalValidStatusCodes []string `toml:"additional_valid_status_codes"`
}

type OpcUAClientConfig struct {
Endpoint string `toml:"endpoint"`
SecurityPolicy string `toml:"security_policy"`
SecurityMode string `toml:"security_mode"`
Certificate string `toml:"certificate"`
PrivateKey string `toml:"private_key"`
Username string `toml:"username"`
Password string `toml:"password"`
AuthMethod string `toml:"auth_method"`
ConnectTimeout config.Duration `toml:"connect_timeout"`
RequestTimeout config.Duration `toml:"request_timeout"`

Workarounds OpcUAWorkarounds `toml:"workarounds"`
}

func (o *OpcUAClientConfig) Validate() error {
return o.validateEndpoint()
}

func (o *OpcUAClientConfig) validateEndpoint() error {
if o.Endpoint == "" {
return fmt.Errorf("endpoint url is empty")
}

_, err := url.Parse(o.Endpoint)
if err != nil {
return fmt.Errorf("endpoint url is invalid")
}

switch o.SecurityPolicy {
case "None", "Basic128Rsa15", "Basic256", "Basic256Sha256", "auto":
default:
return fmt.Errorf("invalid security type '%s' in '%s'", o.SecurityPolicy, o.Endpoint)
}

switch o.SecurityMode {
case "None", "Sign", "SignAndEncrypt", "auto":
default:
return fmt.Errorf("invalid security type '%s' in '%s'", o.SecurityMode, o.Endpoint)
}
return nil
}

func (o *OpcUAClientConfig) CreateClient(log telegraf.Logger) (*OpcUAClient, error) {
err := o.Validate()
if err != nil {
return nil, err
}

c := &OpcUAClient{
Config: o,
Log: log,
}
c.Log.Debug("Initialising OpcUAClient")
c.State = Disconnected

err = c.setupWorkarounds()
return c, err
}

// ConnectionState used for constants
type ConnectionState int

const (
// Disconnected constant State 0
Disconnected ConnectionState = iota
// Connecting constant State 1
Connecting
// Connected constant State 2
Connected
)

type OpcUAClient struct {
Config *OpcUAClientConfig
Log telegraf.Logger

State ConnectionState
Client *opcua.Client

opts []opcua.Option
codes []ua.StatusCode
}

func (o *OpcUAClient) Init() error {
return o.setupOptions()
}

// / setupOptions read the endpoints from the specified server and setup all authentication
func (o *OpcUAClient) setupOptions() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Config.ConnectTimeout))
defer cancel()
// Get a list of the endpoints for our target server
endpoints, err := opcua.GetEndpoints(ctx, o.Config.Endpoint)
if err != nil {
return err
}

if o.Config.Certificate == "" && o.Config.PrivateKey == "" {
if o.Config.SecurityPolicy != "None" || o.Config.SecurityMode != "None" {
o.Log.Debug("Generating self-signed certificate")
cert, privateKey, err := generateCert("urn:telegraf:gopcua:client", 2048,
o.Config.Certificate, o.Config.PrivateKey, 365*24*time.Hour)
if err != nil {
return err
}

o.Config.Certificate = cert
o.Config.PrivateKey = privateKey
}
}

o.Log.Debug("Configuring OPC UA connection options")
o.opts, err = o.generateClientOpts(endpoints)

return err
}

func (o *OpcUAClient) setupWorkarounds() error {
o.codes = []ua.StatusCode{ua.StatusOK}
for _, c := range o.Config.Workarounds.AdditionalValidStatusCodes {
val, err := strconv.ParseInt(c, 0, 32) // setting 32 bits to allow for safe conversion
if err != nil {
return err
}
o.codes = append(o.codes, ua.StatusCode(uint32(val)))
}

return nil
}

func (o *OpcUAClient) StatusCodeOK(code ua.StatusCode) bool {
for _, val := range o.codes {
if val == code {
return true
}
}
return false
}

// Connect to an OPC UA device
func (o *OpcUAClient) Connect() error {
o.Log.Debug("Connecting OPC UA Client to server")
u, err := url.Parse(o.Config.Endpoint)
if err != nil {
return err
}

switch u.Scheme {
case "opc.tcp":
o.State = Connecting

if o.Client != nil {
o.Log.Warnf("Closing connection due to Connect called while already instantiated", u)
if err := o.Client.Close(); err != nil {
// Only log the error but to not bail-out here as this prevents
// reconnections for multiple parties (see e.g. #9523).
o.Log.Errorf("Closing connection failed: %v", err)
}
}

o.Client = opcua.NewClient(o.Config.Endpoint, o.opts...)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Config.ConnectTimeout))
defer cancel()
if err := o.Client.Connect(ctx); err != nil {
o.State = Disconnected
return fmt.Errorf("error in Client Connection: %s", err)
}

o.State = Connected
o.Log.Debug("Connected to OPC UA Server")

default:
return fmt.Errorf("unsupported scheme %q in endpoint. Expected opc.tcp", u.Scheme)
}
return nil
}

func (o *OpcUAClient) Disconnect(ctx context.Context) error {
o.Log.Debug("Disconnecting from OPC UA Server")
u, err := url.Parse(o.Config.Endpoint)
if err != nil {
return err
}

switch u.Scheme {
case "opc.tcp":
o.State = Disconnected
// We can't do anything about failing to close a connection
//nolint:errcheck,revive
err := o.Client.CloseWithContext(ctx)
o.Client = nil
return err
default:
return fmt.Errorf("invalid controller")
}
}
31 changes: 31 additions & 0 deletions plugins/common/opcua/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package opcua

import (
"github.com/gopcua/opcua/ua"
"github.com/stretchr/testify/require"
"testing"
)

func TestSetupWorkarounds(t *testing.T) {
o := OpcUAClient{
Config: &OpcUAClientConfig{
Workarounds: OpcUAWorkarounds{
AdditionalValidStatusCodes: []string{"0xC0", "0x00AA0000"},
},
},
}

err := o.setupWorkarounds()
require.NoError(t, err)

require.Len(t, o.codes, 3)
require.Equal(t, o.codes[0], ua.StatusCode(0))
require.Equal(t, o.codes[1], ua.StatusCode(192))
require.Equal(t, o.codes[2], ua.StatusCode(11141120))
}

func TestCheckStatusCode(t *testing.T) {
var o OpcUAClient
o.codes = []ua.StatusCode{ua.StatusCode(0), ua.StatusCode(192), ua.StatusCode(11141120)}
require.Equal(t, o.StatusCodeOK(ua.StatusCode(192)), true)
}
Loading