Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into named-parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
elexisvenator committed Jan 2, 2025
2 parents 0ecf9ca + ecdcf75 commit 3b4a83a
Show file tree
Hide file tree
Showing 24 changed files with 747 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<Project>
<PropertyGroup>
<Version>7.34.1</Version>
<Version>7.35.0</Version>
<LangVersion>12.0</LangVersion>
<Authors>Jeremy D. Miller;Babu Annamalai;Oskar Dudycz;Joona-Pekka Kokko</Authors>
<PackageIconUrl>https://martendb.io/logo.png</PackageIconUrl>
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ And finally, use Oakton as the command line parser and executor by replacing `Ap
// as the last line of your Program.cs file
return await app.RunOaktonCommands(args);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/samples/MinimalAPI/Program.cs#L51-L57' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_webapplication_2' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/samples/MinimalAPI/Program.cs#L55-L61' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_webapplication_2' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Once the _Marten.CommandLine_ Nuget is installed and Oakton is handling your command line parsing, you should be able to see the Marten commands by typing `dotnet run -- help` in the command line terminal of your choice at the root of your project:
Expand Down
37 changes: 37 additions & 0 deletions docs/events/projections/async-daemon.md
Original file line number Diff line number Diff line change
Expand Up @@ -528,3 +528,40 @@ var latest = await session.QueryForNonStaleData<Trip>(5.Seconds())

Do note that this can time out if the projection just can't catch up to the latest event sequence in time. You may need to
be both cautious with using this in general, and also cautious especially with the timeout setting.

## Migrating a Projection from Inline to Async <Badge type="tip" text="7.35" />

::: warning
This will only work correctly *if* you have system downtime before migrating the new version of the code with this option
enabled. This feature cannot support a "blue/green" deployment model. Marten needs to system to be at rest before it starts
up the projection asynchronously or there's a chance you may "skip" events in the projection.
:::

During the course of a system's lifetime, you may find that you want to change a projection that's currently running
with a lifecycle of `Inline` to running asynchronously instead. If you need to do this *and* there is no structural change
to the projection that would require a projection rebuild, you can direct Marten to start that projection at the highest
sequence number assigned by the system (not the high water mark, but the event sequence number which may be higher).

To do so, use this option when registering the projection:

<!-- snippet: sample_using_subscribe_as_inline_to_async -->
<a id='snippet-sample_using_subscribe_as_inline_to_async'></a>
```cs
opts
.Projections
.Snapshot<SimpleAggregate>(SnapshotLifecycle.Async, o =>
{
// This option tells Marten to start the async projection at the highest
// event sequence assigned as the processing floor if there is no previous
// async daemon progress for this projection
o.SubscribeAsInlineToAsync();
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/converting_projection_from_inline_to_async.cs#L31-L43' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_subscribe_as_inline_to_async' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Just to be clear, when Marten's async daemon starts a projection with this starting option:

1. If there is no previously recorded progression, Marten will start processing this projection with the highest assigned event sequence
in the database as the floor and record that value as the current progress
2. If there is a previously recorded progression, Marten will start processing this projection at the recorded sequence as normal
54 changes: 54 additions & 0 deletions docs/events/projections/multi-stream-projections.md
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,60 @@ public class UserFeatureTogglesProjection: MultiStreamProjection<UserFeatureTogg
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Projections/MultiStreamProjections/CustomGroupers/custom_grouper_with_document_session.cs#L15-L74' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_view-projection-custom-grouper-with-querysession' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

### Resolving Missing Shared Identifiers with Custom Grouper and Flat Table Projection

When follow-up events lack a shared identifier like `ProjectId`, you can combine a **custom `IAggregateGrouper`** with a **flat table projection** to handle grouping efficiently. The flat table projection manages the mapping between `UserId` and `ProjectId` in the database, enabling the custom grouper to resolve the `ProjectId` dynamically.

#### Step 1: Define the Flat Table Projection

Use a `FlatTableProjection` to store the mapping between `UserId` and `ProjectId`:

::: tip
Note that you should register this projection as inline, such that you are sure it's available when projecting starts.
:::

```cs
public class UserProjectFlatTableProjection : FlatTableProjection
{
public UserProjectFlatTableProjection() : base("user_project_mapping", SchemaNameSource.EventSchema)
{
Table.AddColumn<Guid>("user_id").AsPrimaryKey();
Table.AddColumn<Guid>("project_id").NotNull();

TeardownDataOnRebuild = true;

Project<UserJoinedProject>(map =>
{
map.Map(x => x.UserId);
map.Map(x => x.ProjectId);
});
}
}
```

#### Step 2: Use the Mapping in a Custom Grouper

The custom grouper resolves the `ProjectId` dynamically based on the `UserId` stored in the flat table:

```cs
public class ProjectEventGrouper : IAggregateGrouper<Guid>
{
public async Task Group(IQuerySession session, IEnumerable<IEvent> events, ITenantSliceGroup<Guid> grouping)
{
foreach (var @event in userEvents)
{
if(@event.Data is TaskCompleted taskCompleted)
{
var mapping = await session.Query<UserJoinedProject>()
.Where(mapping => mapping.UserId == @event.StreamId)
.SingleAsync();
grouping.AddEvent(mapping.ProjectId, @event);
}
}
}
}
```

## View Projection with Custom Slicer

::: tip
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System.Threading.Tasks;
using Marten.Testing.Documents;
using Marten.Testing.Harness;
using Xunit;

namespace CoreTests.Bugs;

public class Bug_3603_migration_of_soft_deleted_partitions : BugIntegrationContext
{
[Fact]
public async Task do_not_repeatedly_patch()
{
// Weasel has been erroneously "finding" deltas when it should not

StoreOptions(opts =>
{
opts.Schema.For<Target>().SoftDeletedWithPartitioningAndIndex();

opts.Events.UseArchivedStreamPartitioning = true;
});

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

await theStore.Storage.Database.AssertDatabaseMatchesConfigurationAsync();

var store2 = SeparateStore(opts =>
{
opts.Schema.For<Target>().SoftDeletedWithPartitioningAndIndex();
opts.Events.UseArchivedStreamPartitioning = true;
});

await store2.Storage.Database.AssertDatabaseMatchesConfigurationAsync();
}
}
66 changes: 66 additions & 0 deletions src/CoreTests/Partitioning/partitioning_and_foreign_keys.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System.Threading.Tasks;
using Marten.Testing.Documents;
using Marten.Testing.Harness;
using Shouldly;
using Weasel.Postgresql.Tables;
using Xunit;

namespace CoreTests.Partitioning;

public class partitioning_and_foreign_keys : OneOffConfigurationsContext
{
/*
* Partitioned to partitioned
* Not partitioned to partitioned
*
*
*
*/

[Fact]
public async Task from_partitioned_to_not_partitioned()
{
StoreOptions(opts =>
{
opts.Schema.For<Issue>()
.SoftDeletedWithPartitioningAndIndex()
.ForeignKey<User>(x => x.AssigneeId);
});

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();
await theStore.Storage.Database.AssertDatabaseMatchesConfigurationAsync();
}

[Fact]
public async Task from_partitioned_to_partitioned()
{
StoreOptions(opts =>
{
opts.Schema.For<Issue>()
.SoftDeletedWithPartitioningAndIndex()
.ForeignKey<User>(x => x.AssigneeId);

opts.Schema.For<User>().SoftDeletedWithPartitioningAndIndex();
});

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();
await theStore.Storage.Database.AssertDatabaseMatchesConfigurationAsync();
}

[Fact]
public async Task from_not_partitioned_to_partitioned()
{
StoreOptions(opts =>
{
opts.Schema.For<Issue>()
.ForeignKey<User>(x => x.AssigneeId);

opts.Schema.For<User>().SoftDeletedWithPartitioningAndIndex();
});

await Should.ThrowAsync<InvalidForeignKeyException>(async () =>
{
await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();
});
}
}
24 changes: 24 additions & 0 deletions src/DaemonTests/AsyncOptionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,30 @@ public async Task subscribe_from_sequence_hit_with_prior_higher_than_threshold_a
.ShouldBe(new Position(222L, true));
}

[Fact]
public async Task transition_from_inline_to_async_no_initial_progress()
{
theDatabase.ProjectionProgressFor(theName, theToken).Returns(0);
theDatabase.FetchHighestEventSequenceNumber().Returns(1234L);
var options = new AsyncOptions();
options.SubscribeAsInlineToAsync();

(await options.DetermineStartingPositionAsync(1000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
.ShouldBe(new Position(1234L, true));
}

[Fact]
public async Task transition_from_inline_to_async_but_there_is_initial_progress()
{
theDatabase.ProjectionProgressFor(theName, theToken).Returns(1000L);
theDatabase.FetchHighestEventSequenceNumber().Returns(2005L);
var options = new AsyncOptions();
options.SubscribeAsInlineToAsync();

(await options.DetermineStartingPositionAsync(2003L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
.ShouldBe(new Position(1000L, false));
}




Expand Down
Loading

0 comments on commit 3b4a83a

Please sign in to comment.