Skip to content

Commit

Permalink
Allow iterating through a change stream (#339)
Browse files Browse the repository at this point in the history
  • Loading branch information
Joannis authored Apr 7, 2024
1 parent 1b249e7 commit 929e88f
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 11 deletions.
25 changes: 15 additions & 10 deletions Sources/MongoClient/Cursor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,21 @@ public final class MongoCursor {
command.maxTimeMS = self.maxTimeMS
command.readConcern = readConcern

let newCursor = try await connection.executeCodable(
command,
decodeAs: GetMoreReply.self,
namespace: namespace,
in: self.transaction,
sessionId: session?.sessionId,
traceLabel: "\(traceLabel ?? "UnknownOperation").getMore",
serviceContext: context
)

let newCursor = try await withTaskCancellationHandler {
try await connection.executeCodable(
command,
decodeAs: GetMoreReply.self,
namespace: namespace,
in: self.transaction,
sessionId: session?.sessionId,
traceLabel: "\(traceLabel ?? "UnknownOperation").getMore",
serviceContext: context
)
} onCancel: {
Task {
try await self.close()
}
}
self.id = newCursor.cursor.id
return newCursor.cursor.nextBatch
}
Expand Down
21 changes: 20 additions & 1 deletion Sources/MongoKitten/ChangeStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ extension MongoCollection {
}

/// A change stream is a stream of change notifications for a collection or database
public struct ChangeStream<T: Decodable> {
public struct ChangeStream<T: Decodable>: AsyncSequence {
public typealias Notification = ChangeStreamNotification<T>
public typealias Element = Notification
typealias InputCursor = FinalizedCursor<MappedCursor<AggregateBuilderPipeline, Notification>>

internal let cursor: InputCursor
Expand All @@ -137,6 +138,24 @@ public struct ChangeStream<T: Decodable> {
public mutating func setGetMoreInterval(to interval: TimeAmount? = nil) {
self.getMoreInterval = interval
}

public struct AsyncIterator: AsyncIteratorProtocol {
public typealias Element = Notification

private var iterator: InputCursor.AsyncIterator

init(iterator: InputCursor.AsyncIterator) {
self.iterator = iterator
}

public mutating func next() async throws -> Element? {
try await iterator.next()
}
}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(iterator: cursor.makeAsyncIterator())
}

/// Iterates over the change stream notifications and calls the given handler for each notification
/// - Parameter handler: The handler to call for each notification
Expand Down

0 comments on commit 929e88f

Please sign in to comment.