Skip to content

Commit

Permalink
Add Logs upload method request handler (#956)
Browse files Browse the repository at this point in the history
* Blob uploader hookup

* Add JsonConstructor attribute

* Add tests

* Add tests
  • Loading branch information
varunpuranik authored Mar 19, 2019
1 parent fe8457f commit e064a59
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Requests
{
using Microsoft.Azure.Devices.Edge.Agent.Core.Logs;
using Microsoft.Azure.Devices.Edge.Util;
using Newtonsoft.Json;

public class LogsUploadRequest
{
public LogsUploadRequest(string id, LogsContentEncoding encoding, LogsContentType contentType, string sasUrl)
{
this.Id = Preconditions.CheckNonWhiteSpace(id, nameof(id));
this.Encoding = encoding;
this.ContentType = contentType;
this.SasUrl = sasUrl;
}

[JsonConstructor]
LogsUploadRequest(string id, LogsContentEncoding? encoding, LogsContentType? contentType, string sasUrl)
: this(id, encoding.HasValue ? encoding.Value : LogsContentEncoding.None, contentType.HasValue ? contentType.Value : LogsContentType.Json, sasUrl)
{
}

public string Id { get; }
public LogsContentEncoding Encoding { get; }
public LogsContentType ContentType { get; }
public string SasUrl { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Requests
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Agent.Core.Logs;
using Microsoft.Azure.Devices.Edge.Util;

public class LogsUploadRequestHandler : RequestHandlerBase<LogsUploadRequest, object>
{
readonly ILogsUploader logsUploader;
readonly ILogsProvider logsProvider;

public LogsUploadRequestHandler(ILogsUploader logsUploader, ILogsProvider logsProvider)
{
this.logsProvider = Preconditions.CheckNotNull(logsProvider, nameof(logsProvider));
this.logsUploader = Preconditions.CheckNotNull(logsUploader, nameof(logsUploader));
}

public override string RequestName => "UploadLogs";

protected override async Task<Option<object>> HandleRequestInternal(Option<LogsUploadRequest> payloadOption)
{
LogsUploadRequest payload = payloadOption.Expect(() => new ArgumentException("Request payload not found"));
var moduleLogOptions = new ModuleLogOptions(payload.Id, payload.Encoding, payload.ContentType);
byte[] logBytes = await this.logsProvider.GetLogs(moduleLogOptions, CancellationToken.None);
await this.logsUploader.Upload(payload.SasUrl, payload.Id, logBytes, payload.Encoding, payload.ContentType);
return Option.None<object>();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ public class AzureBlobLogsUploader : ILogsUploader
readonly string deviceId;
readonly IAzureBlobUploader azureBlobUploader;

public AzureBlobLogsUploader(string iotHubName, string deviceId)
: this(iotHubName, deviceId, new AzureBlobUploader())
{
}

public AzureBlobLogsUploader(string iotHubName, string deviceId, IAzureBlobUploader azureBlobUploader)
{
this.iotHubName = Preconditions.CheckNonWhiteSpace(iotHubName, nameof(iotHubName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,25 @@ public static async Task<int> MainAsync(IConfiguration configuration)
Option<string> productInfo = versionInfo != VersionInfo.Empty ? Option.Some(versionInfo.ToString()) : Option.None<string>();
Option<UpstreamProtocol> upstreamProtocol = configuration.GetValue<string>(Constants.UpstreamProtocolKey).ToUpstreamProtocol();
Option<IWebProxy> proxy = Proxy.Parse(configuration.GetValue<string>("https_proxy"), logger);
string iothubHostname = null;
string deviceId = null;
switch (mode.ToLowerInvariant())
{
case Constants.DockerMode:
var dockerUri = new Uri(configuration.GetValue<string>("DockerUri"));
string deviceConnectionString = configuration.GetValue<string>("DeviceConnectionString");
IotHubConnectionStringBuilder connectionStringParser = IotHubConnectionStringBuilder.Create(deviceConnectionString);
deviceId = connectionStringParser.DeviceId;
iothubHostname = connectionStringParser.HostName;
builder.RegisterModule(new AgentModule(maxRestartCount, intensiveCareTime, coolOffTimeUnitInSeconds, usePersistentStorage, storagePath));
builder.RegisterModule(new DockerModule(deviceConnectionString, edgeDeviceHostName, dockerUri, dockerAuthConfig, upstreamProtocol, proxy, productInfo));
break;

case Constants.IotedgedMode:
string managementUri = configuration.GetValue<string>(Constants.EdgeletManagementUriVariableName);
string workloadUri = configuration.GetValue<string>(Constants.EdgeletWorkloadUriVariableName);
string iothubHostname = configuration.GetValue<string>(Constants.IotHubHostnameVariableName);
string deviceId = configuration.GetValue<string>(Constants.DeviceIdVariableName);
iothubHostname = configuration.GetValue<string>(Constants.IotHubHostnameVariableName);
deviceId = configuration.GetValue<string>(Constants.DeviceIdVariableName);
string moduleId = configuration.GetValue(Constants.ModuleIdVariableName, Constants.EdgeAgentModuleIdentityName);
string moduleGenerationId = configuration.GetValue<string>(Constants.EdgeletModuleGenerationIdVariableName);
string apiVersion = configuration.GetValue<string>(Constants.EdgeletApiVersionVariableName);
Expand All @@ -130,7 +135,7 @@ public static async Task<int> MainAsync(IConfiguration configuration)
switch (configSourceConfig.ToLowerInvariant())
{
case "twin":
builder.RegisterModule(new TwinConfigSourceModule(backupConfigFilePath, configuration, versionInfo, TimeSpan.FromSeconds(configRefreshFrequencySecs)));
builder.RegisterModule(new TwinConfigSourceModule(iothubHostname, deviceId, backupConfigFilePath, configuration, versionInfo, TimeSpan.FromSeconds(configRefreshFrequencySecs)));
break;

case "local":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ await c.Resolve<Task<ICommandFactory>>(),
var environmentProvider = c.Resolve<Task<IEnvironmentProvider>>();
var planner = c.Resolve<Task<IPlanner>>();
var planRunner = c.Resolve<IPlanRunner>();
var reporter = c.Resolve<IReporter>();
var reporter = c.Resolve<Task<IReporter>>();
var moduleIdentityLifecycleManager = c.Resolve<IModuleIdentityLifecycleManager>();
var deploymentConfigInfoSerde = c.Resolve<ISerde<DeploymentConfigInfo>>();
var deploymentConfigInfoStore = c.Resolve<IEntityStore<string, string>>();
Expand All @@ -254,7 +254,7 @@ await c.Resolve<Task<ICommandFactory>>(),
await configSource,
await planner,
planRunner,
reporter,
await reporter,
moduleIdentityLifecycleManager,
await environmentProvider,
deploymentConfigInfoStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ protected override void Load(ContainerBuilder builder)
.As<Task<ICommandFactory>>()
.SingleInstance();

// IModuleRuntimeInfoProvider
builder.Register(c => new RuntimeInfoProvider<DockerReportedConfig>(c.Resolve<IModuleManager>()))
.As<IRuntimeInfoProvider>()
// Task<IRuntimeInfoProvider>
builder.Register(c => Task.FromResult(new RuntimeInfoProvider<DockerReportedConfig>(c.Resolve<IModuleManager>()) as IRuntimeInfoProvider))
.As<Task<IRuntimeInfoProvider>>()
.SingleInstance();

// Task<IEnvironmentProvider>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ protected override void Load(ContainerBuilder builder)
// Task<IReporter>
// TODO: When using a file backed config source we need to figure out
// how reporting will work.
builder.Register(c => NullReporter.Instance as IReporter)
.As<IReporter>()
builder.Register(c => Task.FromResult(NullReporter.Instance as IReporter))
.As<Task<IReporter>>()
.SingleInstance();

base.Load(builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Service.Modules
using Autofac;
using Microsoft.Azure.Devices.Edge.Agent.Core;
using Microsoft.Azure.Devices.Edge.Agent.Core.ConfigSources;
using Microsoft.Azure.Devices.Edge.Agent.Core.Logs;
using Microsoft.Azure.Devices.Edge.Agent.Core.Requests;
using Microsoft.Azure.Devices.Edge.Agent.Core.Serde;
using Microsoft.Azure.Devices.Edge.Agent.Docker;
using Microsoft.Azure.Devices.Edge.Agent.IoTHub;
using Microsoft.Azure.Devices.Edge.Agent.IoTHub.Blob;
using Microsoft.Azure.Devices.Edge.Agent.IoTHub.ConfigSources;
using Microsoft.Azure.Devices.Edge.Agent.IoTHub.Reporters;
using Microsoft.Azure.Devices.Edge.Util;
Expand All @@ -23,13 +25,19 @@ public class TwinConfigSourceModule : Module
readonly IConfiguration configuration;
readonly VersionInfo versionInfo;
readonly TimeSpan configRefreshFrequency;
readonly string deviceId;
readonly string iotHubHostName;

public TwinConfigSourceModule(
string iotHubHostname,
string deviceId,
string backupConfigFilePath,
IConfiguration config,
VersionInfo versionInfo,
TimeSpan configRefreshFrequency)
{
this.iotHubHostName = Preconditions.CheckNonWhiteSpace(iotHubHostname, nameof(iotHubHostname));
this.deviceId = Preconditions.CheckNonWhiteSpace(deviceId, nameof(deviceId));
this.backupConfigFilePath = Preconditions.CheckNonWhiteSpace(backupConfigFilePath, nameof(backupConfigFilePath));
this.configuration = Preconditions.CheckNotNull(config, nameof(config));
this.versionInfo = Preconditions.CheckNotNull(versionInfo, nameof(versionInfo));
Expand All @@ -38,87 +46,110 @@ public TwinConfigSourceModule(

protected override void Load(ContainerBuilder builder)
{
// IRequestManager
// ILogsUploader
builder.Register(c => new AzureBlobLogsUploader(this.iotHubHostName, this.deviceId))
.As<ILogsUploader>()
.SingleInstance();

// Task<ILogsProvider>
builder.Register(
c =>
{
var requestHandlers = new List<IRequestHandler>
{
new PingRequestHandler()
};
return new RequestManager(requestHandlers);
})
.As<IRequestManager>()
async c =>
{
var logsProcessor = new LogsProcessor(new LogMessageParser(this.iotHubHostName, this.deviceId));
IRuntimeInfoProvider runtimeInfoProvider = await c.Resolve<Task<IRuntimeInfoProvider>>();
return new LogsProvider(runtimeInfoProvider, logsProcessor) as ILogsProvider;
})
.As<Task<ILogsProvider>>()
.SingleInstance();

// IEdgeAgentConnection
// Task<IRequestManager>
builder.Register(
c =>
async c =>
{
var logsUploader = c.Resolve<ILogsUploader>();
ILogsProvider logsProvider = await c.Resolve<Task<ILogsProvider>>();
var requestHandlers = new List<IRequestHandler>
{
var requestManager = c.Resolve<IRequestManager>();
var serde = c.Resolve<ISerde<DeploymentConfig>>();
var deviceClientprovider = c.Resolve<IModuleClientProvider>();
IEdgeAgentConnection edgeAgentConnection = new EdgeAgentConnection(deviceClientprovider, serde, requestManager, this.configRefreshFrequency);
return edgeAgentConnection;
})
.As<IEdgeAgentConnection>()
new PingRequestHandler(),
new LogsUploadRequestHandler(logsUploader, logsProvider)
};
return new RequestManager(requestHandlers) as IRequestManager;
})
.As<Task<IRequestManager>>()
.SingleInstance();

// Task<IEdgeAgentConnection>
builder.Register(
async c =>
{
var serde = c.Resolve<ISerde<DeploymentConfig>>();
var deviceClientprovider = c.Resolve<IModuleClientProvider>();
IRequestManager requestManager = await c.Resolve<Task<IRequestManager>>();
IEdgeAgentConnection edgeAgentConnection = new EdgeAgentConnection(deviceClientprovider, serde, requestManager, this.configRefreshFrequency);
return edgeAgentConnection;
})
.As<Task<IEdgeAgentConnection>>()
.SingleInstance();

// Task<IConfigSource>
builder.Register(
async c =>
{
var serde = c.Resolve<ISerde<DeploymentConfigInfo>>();
var edgeAgentConnection = c.Resolve<IEdgeAgentConnection>();
IEncryptionProvider encryptionProvider = await c.Resolve<Task<IEncryptionProvider>>();
var twinConfigSource = new TwinConfigSource(edgeAgentConnection, this.configuration);
IConfigSource backupConfigSource = new FileBackupConfigSource(this.backupConfigFilePath, twinConfigSource, serde, encryptionProvider);
return backupConfigSource;
})
async c =>
{
var serde = c.Resolve<ISerde<DeploymentConfigInfo>>();
var edgeAgentConnectionTask = c.Resolve<Task<IEdgeAgentConnection>>();
IEncryptionProvider encryptionProvider = await c.Resolve<Task<IEncryptionProvider>>();
IEdgeAgentConnection edgeAgentConnection = await edgeAgentConnectionTask;
var twinConfigSource = new TwinConfigSource(edgeAgentConnection, this.configuration);
IConfigSource backupConfigSource = new FileBackupConfigSource(this.backupConfigFilePath, twinConfigSource, serde, encryptionProvider);
return backupConfigSource;
})
.As<Task<IConfigSource>>()
.SingleInstance();

// IReporter
// Task<IReporter>
builder.Register(
c =>
async c =>
{
var runtimeInfoDeserializerTypes = new Dictionary<string, Type>
{
[DockerType] = typeof(DockerReportedRuntimeInfo),
[Constants.Unknown] = typeof(UnknownRuntimeInfo)
};

var edgeAgentDeserializerTypes = new Dictionary<string, Type>
{
var runtimeInfoDeserializerTypes = new Dictionary<string, Type>
{
[DockerType] = typeof(DockerReportedRuntimeInfo),
[Constants.Unknown] = typeof(UnknownRuntimeInfo)
};
[DockerType] = typeof(EdgeAgentDockerRuntimeModule),
[Constants.Unknown] = typeof(UnknownEdgeAgentModule)
};

var edgeAgentDeserializerTypes = new Dictionary<string, Type>
{
[DockerType] = typeof(EdgeAgentDockerRuntimeModule),
[Constants.Unknown] = typeof(UnknownEdgeAgentModule)
};
var edgeHubDeserializerTypes = new Dictionary<string, Type>
{
[DockerType] = typeof(EdgeHubDockerRuntimeModule),
[Constants.Unknown] = typeof(UnknownEdgeHubModule)
};

var edgeHubDeserializerTypes = new Dictionary<string, Type>
{
[DockerType] = typeof(EdgeHubDockerRuntimeModule),
[Constants.Unknown] = typeof(UnknownEdgeHubModule)
};
var moduleDeserializerTypes = new Dictionary<string, Type>
{
[DockerType] = typeof(DockerRuntimeModule)
};

var moduleDeserializerTypes = new Dictionary<string, Type>
{
[DockerType] = typeof(DockerRuntimeModule)
};
var deserializerTypesMap = new Dictionary<Type, IDictionary<string, Type>>
{
{ typeof(IRuntimeInfo), runtimeInfoDeserializerTypes },
{ typeof(IEdgeAgentModule), edgeAgentDeserializerTypes },
{ typeof(IEdgeHubModule), edgeHubDeserializerTypes },
{ typeof(IModule), moduleDeserializerTypes }
};

var deserializerTypesMap = new Dictionary<Type, IDictionary<string, Type>>
{
{ typeof(IRuntimeInfo), runtimeInfoDeserializerTypes },
{ typeof(IEdgeAgentModule), edgeAgentDeserializerTypes },
{ typeof(IEdgeHubModule), edgeHubDeserializerTypes },
{ typeof(IModule), moduleDeserializerTypes }
};
var edgeAgentConnectionTask = c.Resolve<Task<IEdgeAgentConnection>>();
IEdgeAgentConnection edgeAgentConnection = await edgeAgentConnectionTask;

return new IoTHubReporter(
c.Resolve<IEdgeAgentConnection>(),
new TypeSpecificSerDe<AgentState>(deserializerTypesMap),
this.versionInfo) as IReporter;
})
.As<IReporter>()
return new IoTHubReporter(
edgeAgentConnection,
new TypeSpecificSerDe<AgentState>(deserializerTypesMap),
this.versionInfo) as IReporter;
})
.As<Task<IReporter>>()
.SingleInstance();

base.Load(builder);
Expand Down
Loading

0 comments on commit e064a59

Please sign in to comment.