-
-
Notifications
You must be signed in to change notification settings - Fork 297
/
Copy pathPulsarContainer.cs
121 lines (104 loc) · 4.72 KB
/
PulsarContainer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
namespace Testcontainers.Pulsar;
/// <inheritdoc cref="DockerContainer" />
[PublicAPI]
public sealed class PulsarContainer : DockerContainer
{
    private readonly PulsarConfiguration _configuration;

    /// <summary>
    /// Initializes a new instance of the <see cref="PulsarContainer" /> class.
    /// </summary>
    /// <param name="configuration">The container configuration.</param>
    public PulsarContainer(PulsarConfiguration configuration)
        : base(configuration)
    {
        _configuration = configuration;
    }

    /// <summary>
    /// Gets the Pulsar broker address.
    /// </summary>
    /// <returns>
    /// A <c>pulsar://</c> URI string built from the container hostname and the
    /// host port mapped to <see cref="PulsarBuilder.PulsarBrokerDataPort" />.
    /// </returns>
    public string GetBrokerAddress()
    {
        return new UriBuilder("pulsar", Hostname, GetMappedPublicPort(PulsarBuilder.PulsarBrokerDataPort)).ToString();
    }

    /// <summary>
    /// Gets the Pulsar web service address.
    /// </summary>
    /// <returns>
    /// An <c>http://</c> URI string built from the container hostname and the
    /// host port mapped to <see cref="PulsarBuilder.PulsarWebServicePort" />.
    /// </returns>
    public string GetServiceAddress()
    {
        return new UriBuilder(Uri.UriSchemeHttp, Hostname, GetMappedPublicPort(PulsarBuilder.PulsarWebServicePort)).ToString();
    }

    /// <summary>
    /// Creates an authentication token by running <c>bin/pulsar tokens create</c> inside the container.
    /// </summary>
    /// <param name="expiryTime">
    /// The time after the authentication token expires. Pass
    /// <see cref="Timeout.InfiniteTimeSpan" /> to omit the <c>--expiry-time</c> argument.
    /// </param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>
    /// A task whose result is the standard output of the token command, returned
    /// as-is (it is not trimmed).
    /// </returns>
    /// <exception cref="ArgumentException">
    /// Authentication is explicitly disabled in the configuration, or the token
    /// command returned a non-zero exit code.
    /// </exception>
    public async Task<string> CreateAuthenticationTokenAsync(TimeSpan expiryTime, CancellationToken ct = default)
    {
        // Only an explicit "false" is rejected; an unset (null) value falls through.
        if (_configuration.AuthenticationEnabled is false)
        {
            throw new ArgumentException("Failed to create token. Authentication is not enabled.");
        }

        var command = new List<string>
        {
            "bin/pulsar",
            "tokens",
            "create",
            "--secret-key",
            PulsarBuilder.SecretKeyFilePath,
            "--subject",
            PulsarBuilder.Username
        };

        if (!Timeout.InfiniteTimeSpan.Equals(expiryTime))
        {
            int secondsToMilliseconds;

            // Image tags are identifiers, not linguistic text: compare ordinally.
            var tag = _configuration.Image.Tag;

            if (tag.StartsWith("3.2", StringComparison.Ordinal) || tag.StartsWith("latest", StringComparison.Ordinal))
            {
                Logger.LogWarning("The 'apachepulsar/pulsar:3.2.?' image contains a regression. The expiry time is converted to the wrong unit of time: https://github.com/apache/pulsar/issues/22811.");

                // Compensate for the regression by scaling the value by 1000.
                secondsToMilliseconds = 1000;
            }
            else
            {
                secondsToMilliseconds = 1;
            }

            // Format with the invariant culture so that a fractional value never
            // picks up a culture-specific decimal separator (e.g. "1,5s").
            var expiryValue = secondsToMilliseconds * expiryTime.TotalSeconds;

            command.Add("--expiry-time");
            command.Add(expiryValue.ToString(System.Globalization.CultureInfo.InvariantCulture) + "s");
        }

        var tokensResult = await ExecAsync(command, ct)
            .ConfigureAwait(false);

        if (tokensResult.ExitCode != 0)
        {
            throw new ArgumentException($"Failed to create token. Command returned a non-zero exit code: {tokensResult.Stderr}.");
        }

        return tokensResult.Stdout;
    }

    /// <summary>
    /// Copies the Pulsar startup script to the container.
    /// </summary>
    /// <remarks>
    /// The script always ends with <c>bin/pulsar standalone</c>. When authentication
    /// is enabled it first creates the secret key, exports the client authentication
    /// parameters and applies the environment configuration. When the functions
    /// worker is explicitly disabled, <c>--no-functions-worker</c> and
    /// <c>--no-stream-storage</c> are appended.
    /// </remarks>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>A task that completes when the startup script has been copied.</returns>
    internal Task CopyStartupScriptAsync(CancellationToken ct = default)
    {
        // Force LF line endings regardless of the host OS; the script runs under bash.
        using var startupScript = new StringWriter { NewLine = "\n" };

        startupScript.WriteLine("#!/bin/bash");

        if (_configuration.AuthenticationEnabled is true)
        {
            startupScript.WriteLine($"bin/pulsar tokens create-secret-key --output {PulsarBuilder.SecretKeyFilePath}");
            startupScript.WriteLine("export brokerClientAuthenticationParameters=token:$(bin/pulsar tokens create --secret-key $PULSAR_PREFIX_tokenSecretKey --subject $superUserRoles)");
            startupScript.WriteLine("export CLIENT_PREFIX_authParams=$brokerClientAuthenticationParameters");
            startupScript.WriteLine("bin/apply-config-from-env.py conf/standalone.conf");
            startupScript.WriteLine("bin/apply-config-from-env-with-prefix.py CLIENT_PREFIX_ conf/client.conf");
        }

        startupScript.Write("bin/pulsar standalone");

        if (_configuration.FunctionsWorkerEnabled is false)
        {
            startupScript.Write(" --no-functions-worker");
            startupScript.Write(" --no-stream-storage");
        }

        // Encoding.Default varies by runtime and platform; UTF-8 keeps the script
        // bytes deterministic. The content is materialized here, before the writer
        // is disposed on return.
        var scriptContent = Encoding.UTF8.GetBytes(startupScript.ToString());

        return CopyAsync(scriptContent, PulsarBuilder.StartupScriptFilePath, Unix.FileMode755, ct);
    }
}