client-with-encryption.py
import asyncio
import logging
import sys
import socket
from pathlib import Path
from cryptography.x509.oid import ExtendedKeyUsageOID
sys.path.insert(0, "..")
from asyncua import Client
from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256
from asyncua.crypto.cert_gen import setup_self_signed_certificate
from asyncua.crypto.validator import CertificateValidator, CertificateValidatorOptions
from asyncua.crypto.truststore import TrustStore
from asyncua import ua
# INFO-level logging for the whole process, so library log records at INFO and
# above are printed while the example runs.
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)

# True: the server certificate is checked against the certificates loaded into
# a TrustStore. False: the validator is built without any trust store.
USE_TRUST_STORE = True

# Which of the numbered example key/certificate pairs this client uses.
cert_idx = 4

# The client certificate and private key live in "certificates/" next to this
# script, independent of the current working directory.
# NOTE(review): the private key is a file inside the source tree; keep it out
# of version control and check who can read it.
cert_base = Path(__file__).parent
cert = cert_base / "certificates" / f"peer-certificate-example-{cert_idx}.der"
private_key = cert_base / "certificates" / f"peer-private-key-example-{cert_idx}.pem"
async def task(loop):
    """Connect to the local OPC UA server over a secured channel and touch one variable.

    Steps: prepare the client's self-signed certificate, configure the
    Basic256Sha256 security policy, install a validator for the server
    certificate, then read, write (42) and re-read ``0:MyObject/0:MyVariable``.

    Args:
        loop: Event loop passed in by the caller; this function does not use it.

    Only ``ua.UaError`` is caught (and logged); any other exception propagates.
    """
    hostname = socket.gethostname()
    app_uri = f"urn:{hostname}:foobar:myselfsignedclient"
    endpoint = "opc.tcp://127.0.0.1:4840/freeopcua/server/"

    # Subject fields for the client's self-signed certificate.
    subject_fields = {
        "countryName": "CN",
        "stateOrProvinceName": "AState",
        "localityName": "Foo",
        "organizationName": "Bar Ltd",
    }
    # Presumably creates the key/certificate files when they are missing —
    # confirm in asyncua.crypto.cert_gen. The certificate is requested with the
    # CLIENT_AUTH extended key usage only, and carries the same application URI
    # that is set on the client below.
    await setup_self_signed_certificate(
        private_key,
        cert,
        app_uri,
        hostname,
        [ExtendedKeyUsageOID.CLIENT_AUTH],
        subject_fields,
    )

    client = Client(url=endpoint)
    client.application_uri = app_uri

    # No ``mode`` is passed, so the library's default message security mode
    # applies — verify it is the one you intend.
    # NOTE(review): the server certificate path is relative to the current
    # working directory, unlike ``cert``/``private_key`` which are anchored to
    # the script's directory; which file is loaded depends on where the script
    # is started from.
    await client.set_security(
        SecurityPolicyBasic256Sha256,
        certificate=str(cert),
        private_key=str(private_key),
        server_certificate="certificate-example.der",
    )

    peer_server = CertificateValidatorOptions.PEER_SERVER
    if not USE_TRUST_STORE:
        # No trust store on this branch: the validator has no set of trusted
        # certificates to compare the server certificate against.
        client.certificate_validator = CertificateValidator(
            CertificateValidatorOptions.EXT_VALIDATION | peer_server
        )
    else:
        # NOTE(review): the trusted-certificate directory is also relative to
        # the working directory — whoever controls the start directory controls
        # which certificates are trusted. The second argument is an empty list;
        # presumably these are the CRL locations, in which case no revocation
        # lists are loaded — confirm against TrustStore's signature.
        trusted_dir = Path("examples", "certificates", "trusted", "certs")
        store = TrustStore([trusted_dir], [])
        await store.load()
        client.certificate_validator = CertificateValidator(
            CertificateValidatorOptions.TRUSTED_VALIDATION | peer_server, store
        )

    try:
        # ``async with`` connects on entry and disconnects on exit.
        async with client:
            variable = await client.nodes.objects.get_child(["0:MyObject", "0:MyVariable"])
            print(await variable.get_value())
            await variable.set_value(42)
            print(await variable.get_value())
    except ua.UaError as err:
        # Failures are logged only; the caller is not told the session failed.
        _logger.error(err)
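def main():
    """Run the example client to completion on a dedicated event loop.

    The loop is created explicitly, because relying on
    ``asyncio.get_event_loop()`` to create one implicitly is deprecated.
    It is closed in a ``finally`` so it is released even when ``task`` raises.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # asyncio debug mode: reports slow callbacks and never-awaited coroutines.
        loop.set_debug(True)
        loop.run_until_complete(task(loop))
    finally:
        loop.close()


# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    main()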