Skip to content

Commit

Permalink
Simplify hosted handlers setup
Browse files Browse the repository at this point in the history
Fix queue task scheduler
Increase queue retry attempts from 10 to 20
Fix queue names
Rename curl script
Code docs
  • Loading branch information
dluc committed Jul 25, 2023
1 parent 0fc3fb7 commit 9ff36ea
Show file tree
Hide file tree
Showing 22 changed files with 154 additions and 194 deletions.
2 changes: 1 addition & 1 deletion NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ as a web service.
* `lib-python`: reusable libraries for python webservices and handlers.
* `tools`: command line tools, e.g. scripts to start RabbitMQ locally.
* `samples`: samples showing how to upload files, how to use encoder/retrieval, etc.
* `upload-one-file.sh`: Bash command line tool to upload one document to the
* `upload-file.sh`: Bash command line tool to upload one document to the
memory web service.
* `FileImportExamples`: C# examples showing how to upload multiple files to the
memory web service, using different approaches:
Expand Down
3 changes: 2 additions & 1 deletion SemanticMemory.sln
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "clients", "clients", "{B0F7
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "curl", "curl", "{7761EC04-9E17-44EE-A6D8-9FB97F14AF61}"
ProjectSection(SolutionItems) = preProject
clients\curl\upload-one-file.sh = clients\curl\upload-one-file.sh
clients\curl\README.md = clients\curl\README.md
clients\curl\upload-file.sh = clients\curl\upload-file.sh
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{6335B02C-9964-4B39-9795-C5F5F0392515}"
Expand Down
13 changes: 13 additions & 0 deletions clients/curl/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Simple client for command line uploads to Semantic Memory.

Instructions:

```bash
./upload-file.sh -h
```

Example:

```bash
./upload-file.sh -f test.pdf -s http://127.0.0.1:9001/upload -u curlUser -c curlDataCollection -i curlExample01
```
Binary file added clients/curl/test.pdf
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Help for Bash script
Usage:
./upload-one-file.sh -f <file path> -u <id> -c <list> -i <id> -s <url>
./upload-file.sh -f <file path> -u <id> -c <list> -i <id> -s <url>
-f file path Path to the document to upload.
-u userId User ID.
Expand All @@ -25,7 +25,7 @@ Usage:
Example:
./upload-one-file.sh -f myFile.pdf -u me -c "notes meetings" -i "bash test" -s http://127.0.0.1:9001/upload
./upload-file.sh -f myFile.pdf -u me -c "notes meetings" -i "bash test" -s http://127.0.0.1:9001/upload
For more information visit https://github.com/microsoft/semantic-memory
Expand Down Expand Up @@ -123,7 +123,7 @@ for x in $COLLECTIONS; do
done

# Send HTTP request using curl
set -x
#set -x
curl -v \
-F 'file1=@"'"${FILENAME}"'"' \
-F 'user="'"${USER_ID}"'"' \
Expand Down
7 changes: 5 additions & 2 deletions clients/dotnet/MemoryPipelineClient/MemoryPipelineClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.SemanticKernel.SemanticMemory.Core.Configuration;
using Microsoft.SemanticKernel.SemanticMemory.Core.AppBuilders;
using Microsoft.SemanticKernel.SemanticMemory.Core.Handlers;
using Microsoft.SemanticKernel.SemanticMemory.Core.Pipeline;
using Microsoft.SemanticKernel.SemanticMemory.Core20;
Expand Down Expand Up @@ -47,7 +47,10 @@ private async Task ImportFilesInternalAsync(string[] files, ImportFileOptions op
}

// TODO: .Then("index")
pipeline.Then("extract").Then("partition").Build();
pipeline
.Then("extract")
.Then("partition")
.Build();

// Execute pipeline
await orchestrator.RunPipelineAsync(pipeline).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.SemanticKernel.SemanticMemory.Core.Configuration;
using Microsoft.SemanticKernel.SemanticMemory.Core.AppBuilders;
using Microsoft.SemanticKernel.SemanticMemory.Core.ContentStorage;
using Microsoft.SemanticKernel.SemanticMemory.Core.Handlers;
using Microsoft.SemanticKernel.SemanticMemory.Core.Pipeline;
Expand Down
2 changes: 0 additions & 2 deletions clients/samples/FileImportExamples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@
if (examplesToRun.Contains(3))
{
Console.WriteLine("============================");
Console.WriteLine("Press Enter to continue...");
Console.ReadLine();
Example3_CustomInProcessPipeline.RunAsync().Wait();
Console.WriteLine("============================");
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
using System;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.SemanticKernel.SemanticMemory.Core.Configuration;

namespace Microsoft.SemanticKernel.SemanticMemory.Core.Configuration;
namespace Microsoft.SemanticKernel.SemanticMemory.Core.AppBuilders;

public static class AppBuilder
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
using Microsoft.SemanticKernel.SemanticMemory.Core.Pipeline;
using Microsoft.SemanticKernel.SemanticMemory.Core.Pipeline.Queue;

namespace Microsoft.SemanticKernel.SemanticMemory.Core;
namespace Microsoft.SemanticKernel.SemanticMemory.Core.AppBuilders;

#pragma warning disable CA1724 // The name conflicts with MSExtensions
public static class DependencyInjection
Expand Down Expand Up @@ -184,7 +184,13 @@ public static void UseDistributedPipeline(this IServiceCollection services, SKMe
}
}

public static void UseDefaultHandler<THandler>(this IServiceCollection services, string stepName) where THandler : class
/// <summary>
/// Register the handler as a DI service, passing the step name to ctor
/// </summary>
/// <param name="services">DI service collection</param>
/// <param name="stepName">Pipeline step name</param>
/// <typeparam name="THandler">Handler class</typeparam>
public static void UseHandler<THandler>(this IServiceCollection services, string stepName) where THandler : class
{
services.AddTransient<THandler>(serviceProvider => ActivatorUtilities.CreateInstance<THandler>(serviceProvider, stepName));
}
Expand Down
48 changes: 48 additions & 0 deletions lib/dotnet/Core/AppBuilders/HandlerAsAHostedService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.SemanticKernel.SemanticMemory.Core.Pipeline;

namespace Microsoft.SemanticKernel.SemanticMemory.Core.AppBuilders;

/// <summary>
/// Wrapper of handler classes, allowing to run handlers as services hosted by IHost
/// </summary>
/// <typeparam name="T">Handler class</typeparam>
public class HandlerAsAHostedService<T> : IHostedService where T : IPipelineStepHandler
{
private readonly T _handler;
private readonly IPipelineOrchestrator _orchestrator;
private readonly string _stepName;
private readonly ILogger<HandlerAsAHostedService<T>> _log;

public HandlerAsAHostedService(
string stepName,
IPipelineOrchestrator orchestrator,
T handler,
ILogger<HandlerAsAHostedService<T>>? log = null)
{
this._stepName = stepName;
this._orchestrator = orchestrator;
this._handler = handler;

this._log = log ?? NullLogger<HandlerAsAHostedService<T>>.Instance;
this._log.LogInformation("Handler as service created: {0}", stepName);
}

public Task StartAsync(CancellationToken cancellationToken)
{
this._log.LogInformation("Handler service started: {0}", this._stepName);
return this._orchestrator.AddHandlerAsync(this._handler, cancellationToken);
}

public Task StopAsync(CancellationToken cancellationToken)
{
this._log.LogInformation("Stopping handler service: {0}", this._stepName);
return this._orchestrator.StopAllPipelinesAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
using Microsoft.SemanticKernel.SemanticMemory.Core.Configuration;
using Microsoft.SemanticKernel.SemanticMemory.Core.Pipeline;

namespace Microsoft.SemanticKernel.SemanticMemory.Core;
namespace Microsoft.SemanticKernel.SemanticMemory.Core.AppBuilders;

public static class HostedHandlersBuilder
{
Expand All @@ -34,16 +34,16 @@ public static HostApplicationBuilder CreateApplicationBuilder(string[]? args = n
return builder;
}

public static void AddHandler<THostedService>(this HostApplicationBuilder builder, string stepName) where THostedService : class, IHostedService, IPipelineStepHandler
/// <summary>
/// Register the handler as a hosted service, passing the step name to the handler ctor
/// </summary>
/// <param name="builder">Application builder</param>
/// <param name="stepName">Pipeline step name</param>
/// <typeparam name="THandler">Handler class</typeparam>
public static void UseHandlerAsHostedService<THandler>(this HostApplicationBuilder builder, string stepName) where THandler : class, IPipelineStepHandler
{
// Register the handler as a hosted service, assigned to a specific pipeline step
builder.Services.AddHostedService<THostedService>(serviceProvider => ActivatorUtilities.CreateInstance<THostedService>(serviceProvider, stepName));
}

public static IHost Build<THostedService>(string stepName, string[]? args = null) where THostedService : class, IHostedService, IPipelineStepHandler
{
var builder = CreateApplicationBuilder(args);
AddHandler<THostedService>(builder, stepName);
return builder.Build();
builder.Services.UseHandler<THandler>(stepName);
builder.Services.AddHostedService<HandlerAsAHostedService<THandler>>(serviceProvider
=> ActivatorUtilities.CreateInstance<HandlerAsAHostedService<THandler>>(serviceProvider, stepName));
}
}
23 changes: 22 additions & 1 deletion lib/dotnet/Core/Handlers/GenerateEmbeddingsHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,48 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.SemanticKernel.SemanticMemory.Core.AppBuilders;
using Microsoft.SemanticKernel.SemanticMemory.Core.Configuration;
using Microsoft.SemanticKernel.SemanticMemory.Core.Pipeline;

namespace Microsoft.SemanticKernel.SemanticMemory.Core.Handlers;

/// <summary>
/// Memory ingestion pipeline handler responsible for generating text embedding and saving them to the content storage.
/// </summary>
public class GenerateEmbeddingsHandler : IPipelineStepHandler
{
private readonly IPipelineOrchestrator _orchestrator;
private readonly Dictionary<string, object> _embeddingGenerators;
private readonly ILogger<GenerateEmbeddingsHandler> _log;

/// <summary>
/// Note: stepName and other params are injected with DI, <see cref="DependencyInjection.UseHandler{THandler}"/>
/// </summary>
/// <param name="stepName">Pipeline step for which the handler will be invoked</param>
/// <param name="configuration">Application settings</param>
/// <param name="orchestrator">Current orchestrator used by the pipeline, giving access to content and other helps.</param>
/// <param name="log">Application logger</param>
public GenerateEmbeddingsHandler(
string stepName,
SKMemoryConfig configuration,
IPipelineOrchestrator orchestrator,
ILogger<GenerateEmbeddingsHandler>? log = null)
{
this.StepName = stepName;
this._orchestrator = orchestrator;
this._log = log ?? NullLogger<GenerateEmbeddingsHandler>.Instance;

// Setup the active embedding generators config, strongly typed
this._embeddingGenerators = configuration.GetHandlerConfig<EmbeddingGenerationConfig>(stepName, "EmbeddingGeneration").GetActiveGeneratorsTypedConfig(log);

this._log.LogInformation("Handler ready: {0}", stepName);
}

/// <inheritdoc />
public string StepName { get; }

/// <inheritdoc />
public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync(
DataPipeline pipeline, CancellationToken cancellationToken)
{
Expand All @@ -41,7 +62,7 @@ public GenerateEmbeddingsHandler(
{
case MimeTypes.PlainText:
case MimeTypes.MarkDown:
// calc embeddings
// TODO: calc embeddings
break;

default:
Expand Down
12 changes: 12 additions & 0 deletions lib/dotnet/Core/Handlers/TextExtractionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,27 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.SemanticKernel.SemanticMemory.Core.AppBuilders;
using Microsoft.SemanticKernel.SemanticMemory.Core.Pipeline;
using Microsoft.SemanticKernel.Services.DataFormats.Office;
using Microsoft.SemanticKernel.Services.DataFormats.Pdf;

namespace Microsoft.SemanticKernel.SemanticMemory.Core.Handlers;

/// <summary>
/// Memory ingestion pipeline handler responsible for extracting text from files and saving it to content storage.
/// </summary>
public class TextExtractionHandler : IPipelineStepHandler
{
private readonly IPipelineOrchestrator _orchestrator;
private readonly ILogger<TextExtractionHandler> _log;

/// <summary>
/// Note: stepName and other params are injected with DI, <see cref="DependencyInjection.UseHandler{THandler}"/>
/// </summary>
/// <param name="stepName">Pipeline step for which the handler will be invoked</param>
/// <param name="orchestrator">Current orchestrator used by the pipeline, giving access to content and other helps.</param>
/// <param name="log">Application logger</param>
public TextExtractionHandler(
string stepName,
IPipelineOrchestrator orchestrator,
Expand All @@ -24,6 +34,8 @@ public TextExtractionHandler(
this.StepName = stepName;
this._orchestrator = orchestrator;
this._log = log ?? NullLogger<TextExtractionHandler>.Instance;

this._log.LogInformation("Handler ready: {0}", stepName);
}

/// <inheritdoc />
Expand Down
9 changes: 9 additions & 0 deletions lib/dotnet/Core/Handlers/TextPartitioningHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.SemanticKernel.SemanticMemory.Core.AppBuilders;
using Microsoft.SemanticKernel.SemanticMemory.Core.Pipeline;
using Microsoft.SemanticKernel.Text;

Expand All @@ -19,6 +20,12 @@ public class TextPartitioningHandler : IPipelineStepHandler
private readonly IPipelineOrchestrator _orchestrator;
private readonly ILogger<TextPartitioningHandler> _log;

/// <summary>
/// Note: stepName and other params are injected with DI, <see cref="DependencyInjection.UseHandler{THandler}"/>
/// </summary>
/// <param name="stepName">Pipeline step for which the handler will be invoked</param>
/// <param name="orchestrator">Current orchestrator used by the pipeline, giving access to content and other helps.</param>
/// <param name="log">Application logger</param>
public TextPartitioningHandler(
string stepName,
IPipelineOrchestrator orchestrator,
Expand All @@ -27,6 +34,8 @@ public TextPartitioningHandler(
this.StepName = stepName;
this._orchestrator = orchestrator;
this._log = log ?? NullLogger<TextPartitioningHandler>.Instance;

this._log.LogInformation("Handler ready: {0}", stepName);
}

public string StepName { get; }
Expand Down
15 changes: 10 additions & 5 deletions lib/dotnet/Core/Pipeline/Queue/AzureQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private sealed class MessageEventArgs : EventArgs
private const int FetchLockSeconds = 300;

// How many times to dequeue a messages and process before moving it to a poison queue
private const int MaxRetryBeforePoisonQueue = 10;
private const int MaxRetryBeforePoisonQueue = 20;

// Suffix used for the poison queues
private const string PoisonQueueSuffix = "-poison";
Expand Down Expand Up @@ -99,6 +99,7 @@ public AzureQueue(Func<string, QueueClient> clientBuilder, ILogger<AzureQueue>?
/// <inherit />
public async Task<IQueue> ConnectToQueueAsync(string queueName, QueueOptions options = default, CancellationToken cancellationToken = default)
{
queueName = CleanQueueName(queueName);
this._log.LogTrace("Connecting to queue name: {0}", queueName);

if (string.IsNullOrEmpty(queueName))
Expand All @@ -113,10 +114,8 @@ public async Task<IQueue> ConnectToQueueAsync(string queueName, QueueOptions opt
throw new InvalidOperationException($"The queue is already connected to `{this._queueName}`");
}

#pragma warning disable CA1308 // Use uppercase
// Note: 3..63 chars, only lowercase letters, numbers and hyphens. No hyphens at start/end and no consecutive hyphens.
this._queueName = queueName.ToLowerInvariant();
#pragma warning restore CA2254
this._queueName = queueName;
this._log.LogDebug("Queue name: {0}", this._queueName);

this._queue = this._clientBuilder(this._queueName);
Expand Down Expand Up @@ -289,6 +288,7 @@ private void DispatchMessages(object? sender, ElapsedEventArgs ev)
{
try
{
this._log.LogTrace("Message content: {0}", message.MessageText);
await this.Received(this, new MessageEventArgs { Message = message }).ConfigureAwait(false);
}
#pragma warning disable CA1031 // Must catch all to log and keep the process alive
Expand All @@ -301,7 +301,7 @@ private void DispatchMessages(object? sender, ElapsedEventArgs ev)
state: null,
cancellationToken: this._cancellation.Token,
creationOptions: TaskCreationOptions.RunContinuationsAsynchronously,
scheduler: TaskScheduler.FromCurrentSynchronizationContext()
scheduler: TaskScheduler.Current
);
}

Expand Down Expand Up @@ -343,4 +343,9 @@ private static string ToJson(object data, bool indented = false)
{
return JsonSerializer.Serialize(data, new JsonSerializerOptions { WriteIndented = indented });
}

private static string CleanQueueName(string? name)
{
return name?.ToLowerInvariant().Replace('_', '-').Replace(' ', '-').Replace('.', '-') ?? string.Empty;
}
}
Loading

0 comments on commit 9ff36ea

Please sign in to comment.