Skip to content

Commit

Permalink
Merge pull request #38 from meshtastic/wip
Browse files Browse the repository at this point in the history
MQTT Client Proxy handling and v2.1.18 protos
  • Loading branch information
thebentern authored Jul 9, 2023
2 parents cad7745 + 40b531f commit 802905e
Show file tree
Hide file tree
Showing 15 changed files with 1,406 additions and 270 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"dotnet.defaultSolution": "Meshtastic.sln"
}
3 changes: 3 additions & 0 deletions Meshtastic.Cli/.vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"dotnet.defaultSolution": "Meshtastic.Cli.sln"
}
106 changes: 106 additions & 0 deletions Meshtastic.Cli/CommandHandlers/MqttProxyCommandHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using Meshtastic.Data;
using Meshtastic.Data.MessageFactories;
using MQTTnet;
using Meshtastic.Protobufs;
using MQTTnet.Client;
using Microsoft.Extensions.Logging;

namespace Meshtastic.Cli.CommandHandlers;

public class MqttProxyCommandHandler : DeviceCommandHandler
{
public MqttProxyCommandHandler(DeviceConnectionContext context, CommandContext commandContext) : base(context, commandContext) { }

public async Task<DeviceStateContainer> Handle()
{
var wantConfig = new ToRadioMessageFactory().CreateWantConfigMessage();
var container = await Connection.WriteToRadio(wantConfig, CompleteOnConfigReceived);
Connection.Disconnect();
return container;
}

public override async Task OnCompleted(FromRadio packet, DeviceStateContainer container)
{
// connect to mqtt server with mqttnet
var factory = new MqttFactory();
using var mqttClient = factory.CreateMqttClient();
MqttClientOptions options = GetMqttClientOptions(container);
await mqttClient.ConnectAsync(options, CancellationToken.None);

var root = String.IsNullOrWhiteSpace(container.LocalModuleConfig.Mqtt.Root) ? "msh" : container.LocalModuleConfig.Mqtt.Root;
var prefix = $"{root}/{container.Metadata.FirmwareVersion.First()}";
var subscriptionTopic = $"{prefix}/#";

Logger.LogInformation($"Subscribing to topic: {subscriptionTopic}");
await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder()
.WithTopic(subscriptionTopic)
.Build());

mqttClient.ApplicationMessageReceivedAsync += async e =>
{
if (e.ApplicationMessage.Topic.StartsWith($"{prefix}/stat/"))
return;

Logger.LogInformation($"Received MQTT from host on topic: {e.ApplicationMessage.Topic}");

// Get bytes from utf8 string
var toRadio = new ToRadioMessageFactory()
.CreateMqttClientProxyMessage(e.ApplicationMessage.Topic, e.ApplicationMessage.PayloadSegment.ToArray(), e.ApplicationMessage.Retain);
Logger.LogDebug(toRadio.ToString());
await Connection.WriteToRadio(toRadio);
};

await Connection.ReadFromRadio(async (fromRadio, container) =>
{
if (fromRadio?.PayloadVariantCase == FromRadio.PayloadVariantOneofCase.MqttClientProxyMessage &&
fromRadio.MqttClientProxyMessage is not null)
{
var message = fromRadio.MqttClientProxyMessage;
Logger.LogInformation($"Received MQTT message from device to proxy on topic: {message.Topic}");
if (message.PayloadVariantCase == MqttClientProxyMessage.PayloadVariantOneofCase.Data)
{
Logger.LogDebug(ServiceEnvelope.Parser.ParseFrom(message.Data).ToString());
await mqttClient.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(message.Topic)
.WithPayload(message.Data.ToByteArray())
.WithRetainFlag(message.Retained)
.Build());
}
else if (message.PayloadVariantCase == MqttClientProxyMessage.PayloadVariantOneofCase.Text)
{
Logger.LogDebug(message.Text);
await mqttClient.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(message.Topic)
.WithPayload(message.Text)
.WithRetainFlag(message.Retained)
.Build());
}
}
return false;
});
}

private MqttClientOptions GetMqttClientOptions(DeviceStateContainer container)
{
var builder = new MqttClientOptionsBuilder()
.WithClientId(container.GetDeviceNodeInfo()?.User?.Id ?? container.MyNodeInfo.MyNodeNum.ToString());

var address = container.LocalModuleConfig.Mqtt.Address;
var host = address.Split(':').FirstOrDefault() ?? container.LocalModuleConfig.Mqtt.Address;
var port = address.Contains(":") ? address.Split(':').LastOrDefault() : null;

if (container.LocalModuleConfig.Mqtt.TlsEnabled)
{
builder = builder.WithTls()
.WithTcpServer(host, Int32.Parse(port ?? "8883"));
}
else {
builder = builder.WithTcpServer(host, Int32.Parse(port ?? "1883"));
}

if (container.LocalModuleConfig.Mqtt.Username is not null)
builder = builder.WithCredentials(container.LocalModuleConfig.Mqtt.Username, container.LocalModuleConfig.Mqtt.Password);

return builder.Build();
}
}
20 changes: 20 additions & 0 deletions Meshtastic.Cli/Commands/MqttProxyCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Meshtastic.Cli.Binders;
using Meshtastic.Cli.CommandHandlers;
using Meshtastic.Cli.Enums;
using Microsoft.Extensions.Logging;

namespace Meshtastic.Cli.Commands;
public class MqttProxyCommand : Command
{
public MqttProxyCommand(string name, string description, Option<string> port, Option<string> host,
Option<OutputFormat> output, Option<LogLevel> log) : base(name, description)
{
this.SetHandler(async (context, commandContext) =>
{
var handler = new MqttProxyCommandHandler(context, commandContext);
await handler.Handle();
},
new DeviceConnectionBinder(port, host),
new CommandContextBinder(log, output, new Option<uint?>("dest") { }, new Option<bool>("select-dest") { }));
}
}
1 change: 1 addition & 0 deletions Meshtastic.Cli/Meshtastic.Cli.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
<PackageReference Include="Microsoft.Extensions.Logging" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="7.0.0" />
<PackageReference Include="MQTTnet" Version="4.2.1.781" />
<PackageReference Include="QRCoder" Version="1.4.3" />
<PackageReference Include="SimpleExec" Version="11.0.0" />
<PackageReference Include="Spectre.Console" Version="0.46.0" />
Expand Down
25 changes: 25 additions & 0 deletions Meshtastic.Cli/Meshtastic.Cli.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.5.001.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Meshtastic.Cli", "Meshtastic.Cli.csproj", "{A3FB32D2-954E-4D1D-B789-1E14AA5B2CB6}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{A3FB32D2-954E-4D1D-B789-1E14AA5B2CB6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A3FB32D2-954E-4D1D-B789-1E14AA5B2CB6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A3FB32D2-954E-4D1D-B789-1E14AA5B2CB6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A3FB32D2-954E-4D1D-B789-1E14AA5B2CB6}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {CF056994-0472-486E-8D07-0D470536C40D}
EndGlobalSection
EndGlobal
1 change: 1 addition & 0 deletions Meshtastic.Cli/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
root.AddCommand(new UpdateCommand("update", "Update the firmware of the serial connected device", port, host, output, log));
root.AddCommand(new ExportCommand("export", "Export the profile of the connected device as yaml", port, host, output, log));
root.AddCommand(new ImportCommand("import", "Import the profile export from a yaml file and set the connected device", port, host, output, log));
root.AddCommand(new MqttProxyCommand("mqtt-proxy", "Proxy to the MQTT server referenced in the MQTT module config of the connected device", port, host, output, log));

var parser = new CommandLineBuilder(root)
.UseExceptionHandler((ex, context) =>
Expand Down
4 changes: 4 additions & 0 deletions Meshtastic.Cli/Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@
"import": {
"commandName": "Project",
"commandLineArgs": "import"
},
"mqtt": {
"commandName": "Project",
"commandLineArgs": "mqtt-proxy"
}
}
}
12 changes: 12 additions & 0 deletions Meshtastic/Data/MessageFactories/ToRadioMessageFactory.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Google.Protobuf;
using Meshtastic.Protobufs;
using static Meshtastic.Protobufs.XModem.Types;

Expand Down Expand Up @@ -29,4 +30,15 @@ public ToRadio CreateXmodemPacketMessage(Control control = XModem.Types.Control.
Control = control
}
};

public ToRadio CreateMqttClientProxyMessage(string topic, byte[] payload, bool retain = false) =>
new()
{
MqttClientProxyMessage = new MqttClientProxyMessage()
{
Topic = topic,
Data = ByteString.CopyFrom(payload),
Retained = retain,
}
};
}
2 changes: 1 addition & 1 deletion Meshtastic/Generated/Deviceonly.cs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ public bool DidGpsReset {
private readonly pbc::RepeatedField<global::Meshtastic.Protobufs.NodeInfoLite> nodeDbLite_ = new pbc::RepeatedField<global::Meshtastic.Protobufs.NodeInfoLite>();
/// <summary>
///
/// New lite version of NodeDB to decrease
/// New lite version of NodeDB to decrease memory footprint
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
Expand Down
Loading

0 comments on commit 802905e

Please sign in to comment.