Skip to content

Commit

Permalink
[Storage] Changefeed tail log examples (Azure#14082)
Browse files Browse the repository at this point in the history
* Prepare Storage for release

* pr feedback.

* PR feedback.

* pr feedback.

* the test

* sample.
  • Loading branch information
kasobol-msft authored Aug 10, 2020
1 parent d52798c commit aeb9e80
Show file tree
Hide file tree
Showing 6 changed files with 2,118 additions and 3 deletions.
3 changes: 2 additions & 1 deletion sdk/storage/Azure.Storage.Blobs.ChangeFeed/samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ description: Samples for the Azure.Storage.Blobs.ChangeFeed client library

- Iterate through the entire Change Feed [synchronously](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/storage/Azure.Storage.Blobs.ChangeFeed/samples/Sample01a_HelloWorld.cs#L23) or [asynchronously](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/storage/Azure.Storage.Blobs.ChangeFeed/samples/Sample01b_HelloWorldAsync.cs#L24)
- Iterate through the Change Feed between a start and end date [synchronously](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/storage/Azure.Storage.Blobs.ChangeFeed/samples/Sample01a_HelloWorld.cs#L46) or [asynchronously](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/storage/Azure.Storage.Blobs.ChangeFeed/samples/Sample01b_HelloWorldAsync.cs#L49)
- Resuming a previous iteration of a Change Feed using the continuation token [synchronously](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/storage/Azure.Storage.Blobs.ChangeFeed/samples/Sample01a_HelloWorld.cs#L78) or [asynchronously](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/storage/Azure.Storage.Blobs.ChangeFeed/samples/Sample01b_HelloWorldAsync.cs#L83)
- Resuming a previous iteration of a Change Feed using the continuation token [synchronously](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/storage/Azure.Storage.Blobs.ChangeFeed/samples/Sample01a_HelloWorld.cs#L78) or [asynchronously](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/storage/Azure.Storage.Blobs.ChangeFeed/samples/Sample01b_HelloWorldAsync.cs#L83)
- Polling for events using the continuation token [synchronously](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/storage/Azure.Storage.Blobs.ChangeFeed/samples/Sample01a_HelloWorld.cs#L124) or [asynchronously](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/storage/Azure.Storage.Blobs.ChangeFeed/samples/Sample01b_HelloWorldAsync.cs#L134)
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using NUnit.Framework;

namespace Azure.Storage.Blobs.ChangeFeed.Samples
Expand Down Expand Up @@ -97,5 +96,53 @@ public void ChangeFeedResumeWithCursor()
changeFeedEvents.Add(changeFeedEvent);
}
}

/// <summary>
/// You can use the change feed cursor to periodically poll for new events.
/// </summary>
[Test]
public void ChangeFeedPollForEventsWithCursor()
{
// Get a connection string to our Azure Storage account.
string connectionString = ConnectionString;

// Get a new change feed client.
BlobChangeFeedClient changeFeedClient = new BlobChangeFeedClient(connectionString);
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

// Create the start time. The change feed client will round start time down to
// the nearest hour if you provide DateTimeOffsets
// with minutes and seconds.
DateTimeOffset startTime = DateTimeOffset.Now;

// Create polling interval.
TimeSpan pollingInterval = TimeSpan.FromMinutes(5);

// Get initial set of events.
IEnumerable<Page<BlobChangeFeedEvent>> pages = changeFeedClient.GetChanges(start: startTime).AsPages();

string continuationToken = null;
while (true)
{
foreach (Page<BlobChangeFeedEvent> page in pages)
{
foreach (BlobChangeFeedEvent changeFeedEvent in page.Values)
{
changeFeedEvents.Add(changeFeedEvent);
}

// Get the change feed continuation token. The continuation token is not required to get each page of events,
// it is intended to be saved and used to resume iterating at a later date.
// For the purpose of actively listening to events the continuation token from last page is used.
continuationToken = page.ContinuationToken;
}

// Wait before processing next batch of events.
Thread.Sleep(pollingInterval);

// Resume from last continuation token and fetch latest set of events.
pages = changeFeedClient.GetChanges(continuationToken).AsPages();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,55 @@ public async Task ChangeFeedResumeWithCursorAsync()
}
#endregion
}

/// <summary>
/// You can use the change feed cursor to periodically poll for new events.
/// </summary>
[Test]
public async Task ChangeFeedPollForEventsWithCursor()
{
// Get a connection string to our Azure Storage account.
string connectionString = ConnectionString;

// Get a new change feed client.
BlobChangeFeedClient changeFeedClient = new BlobChangeFeedClient(connectionString);
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

#region Snippet:SampleSnippetsChangeFeed_PollForEventsWithCursor
// Create the start time. The change feed client will round start time down to
// the nearest hour if you provide DateTimeOffsets
// with minutes and seconds.
DateTimeOffset startTime = DateTimeOffset.Now;

// Create polling interval.
TimeSpan pollingInterval = TimeSpan.FromMinutes(5);

// Get initial set of events.
IAsyncEnumerable<Page<BlobChangeFeedEvent>> pages = changeFeedClient.GetChangesAsync(start: startTime).AsPages();

string continuationToken = null;
while (true)
{
await foreach (Page<BlobChangeFeedEvent> page in pages)
{
foreach (BlobChangeFeedEvent changeFeedEvent in page.Values)
{
changeFeedEvents.Add(changeFeedEvent);
}

// Get the change feed continuation token. The continuation token is not required to get each page of events,
// it is intended to be saved and used to resume iterating at a later date.
// For the purpose of actively listening to events the continuation token from last page is used.
continuationToken = page.ContinuationToken;
}

// Wait before processing next batch of events.
await Task.Delay(pollingInterval);

// Resume from last continuation token and fetch latest set of events.
pages = changeFeedClient.GetChangesAsync(continuationToken).AsPages();
}
#endregion
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,83 @@ AsyncPageable<BlobChangeFeedEvent> blobChangeFeedAsyncPagable
}
}

/// <summary>
/// This test checks if tail of the change feed can be listened to.
/// To setup recording have an account where changes are generated quite frequently (i.e. every 1 minute).
/// This test runs long in recording mode as it waits multiple times for events.
/// </summary>
/// <returns></returns>
[Test]
[PlaybackOnly("Changefeed E2E tests require previously generated events")]
public async Task TestTailEvents()
{
// Uncomment when recording.
//DateTimeOffset startTime = DateTimeOffset.Now;

// Update and uncomment after recording.
DateTimeOffset startTime = new DateTimeOffset(2020, 8, 10, 16, 00, 00, TimeSpan.Zero);

TimeSpan pollInterval = Mode == RecordedTestMode.Playback ? TimeSpan.Zero : TimeSpan.FromMinutes(3);

BlobServiceClient service = GetServiceClient_SharedKey();
BlobChangeFeedClient blobChangeFeedClient = service.GetChangeFeedClient();
Page<BlobChangeFeedEvent> lastPage = null;
ISet<string> EventIdsPart1 = new HashSet<string>();
ISet<string> EventIdsPart2 = new HashSet<string>();
ISet<string> EventIdsPart3 = new HashSet<string>();

// Part 1
AsyncPageable<BlobChangeFeedEvent> blobChangeFeedAsyncPagable = blobChangeFeedClient.GetChangesAsync(start: startTime);
IAsyncEnumerable<Page<BlobChangeFeedEvent>> asyncEnumerable = blobChangeFeedAsyncPagable.AsPages();
await foreach (var page in asyncEnumerable)
{
lastPage = page;
foreach (var evt in page.Values)
{
EventIdsPart1.Add(evt.Id.ToString());
}
}

CollectionAssert.IsNotEmpty(EventIdsPart1);

await Task.Delay(pollInterval);

// Part 2
blobChangeFeedAsyncPagable = blobChangeFeedClient.GetChangesAsync(lastPage.ContinuationToken);
asyncEnumerable = blobChangeFeedAsyncPagable.AsPages();
await foreach (var page in asyncEnumerable)
{
lastPage = page;
foreach (var evt in page.Values)
{
EventIdsPart2.Add(evt.Id.ToString());
}
}

CollectionAssert.IsNotEmpty(EventIdsPart2);

await Task.Delay(pollInterval);

// Part 3
blobChangeFeedAsyncPagable = blobChangeFeedClient.GetChangesAsync(lastPage.ContinuationToken);
asyncEnumerable = blobChangeFeedAsyncPagable.AsPages();
await foreach (var page in asyncEnumerable)
{
lastPage = page;
foreach (var evt in page.Values)
{
EventIdsPart3.Add(evt.Id.ToString());
}
}

CollectionAssert.IsNotEmpty(EventIdsPart3);

// Assert events are not duplicated
CollectionAssert.IsEmpty(EventIdsPart1.Intersect(EventIdsPart2));
CollectionAssert.IsEmpty(EventIdsPart1.Intersect(EventIdsPart3));
CollectionAssert.IsEmpty(EventIdsPart2.Intersect(EventIdsPart3));
}

[Test]
[Ignore("For debugging larger Change Feeds locally")]
public async Task PageSizeTest()
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

0 comments on commit aeb9e80

Please sign in to comment.