-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Paged Flux abstractions #6465
Paged Flux abstractions #6465
Changes from all commits
424490c
35f8814
1eb7381
d3a5d42
1dee7f5
7fd7e41
92029bd
946f8ae
8c69889
1540c6c
dfa1f40
b9fe5e9
317b7d8
6b82642
3175061
81bcb74
87ae9f8
b1df280
4670ab7
0e48a11
4a67f98
9f77c80
daff607
1607889
8e88204
759e88c
33de1d9
5afcc84
65942bb
879f7ca
ee078a8
5363af6
f23ac6e
d78a70b
8c5372f
79b1543
d2e764f
dfe68d4
f01f1c9
ac3a8f4
b08ec16
43e83e4
44fcc6d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,22 +6,24 @@ | |
import com.azure.core.http.HttpRequest; | ||
|
||
import java.util.stream.Collectors; | ||
|
||
import com.azure.core.util.paging.PageRetriever; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
|
||
import java.util.function.Function; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* This class is a flux that can operate on a {@link PagedResponse} and also provides the ability to operate on | ||
* individual items. When processing the response by page, each response will contain the items in the page as well as | ||
* the request details like status code and headers. | ||
* This type is a Flux that provides the ability to operate on paginated REST responses of type {@link PagedResponse} | ||
* and individual items in such pages. When processing the response by page, each response will contain the items | ||
* in the page as well as the REST response details like status code and headers. | ||
* | ||
* <p>To process one item at a time, simply subscribe to this flux as shown below </p> | ||
* <p><strong>Code sample</strong></p> | ||
* {@codesnippet com.azure.core.http.rest.pagedflux.items} | ||
* | ||
* <p>To process one page at a time, use {@link #byPage} method as shown below </p> | ||
* <p>To process one page at a time, use {@link #byPage()} method as shown below </p> | ||
* <p><strong>Code sample</strong></p> | ||
* {@codesnippet com.azure.core.http.rest.pagedflux.pages} | ||
* | ||
|
@@ -37,24 +39,23 @@ | |
* @see Flux | ||
*/ | ||
public class PagedFlux<T> extends PagedFluxBase<T, PagedResponse<T>> { | ||
|
||
/** | ||
* Creates an instance of {@link PagedFlux} that consists of only a single page of results. The only argument to | ||
* this constructor therefore is a supplier that fetches the first (and known-only) page of {@code T}. | ||
* Creates an instance of {@link PagedFlux} that consists of only a single page. | ||
* This constructor takes a {@code Supplier} that return the single page of {@code T}. | ||
* | ||
* <p><strong>Code sample</strong></p> | ||
* {@codesnippet com.azure.core.http.rest.pagedflux.singlepage.instantiation} | ||
* | ||
* @param firstPageRetriever Supplier that retrieves the first page. | ||
*/ | ||
public PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever) { | ||
super(firstPageRetriever); | ||
this(firstPageRetriever, token -> Mono.empty()); | ||
} | ||
|
||
/** | ||
* Creates an instance of {@link PagedFlux}. The constructor takes in two arguments. The first argument is a | ||
* supplier that fetches the first page of {@code T}. The second argument is a function that fetches subsequent | ||
* pages of {@code T} | ||
* Creates an instance of {@link PagedFlux}. The constructor takes a {@code Supplier} and | ||
* {@code Function}. The {@code Supplier} returns the first page of {@code T}, | ||
* the {@code Function} retrieves subsequent pages of {@code T}. | ||
* | ||
* <p><strong>Code sample</strong></p> | ||
* {@codesnippet com.azure.core.http.rest.pagedflux.instantiation} | ||
|
@@ -64,21 +65,65 @@ public PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever) { | |
*/ | ||
public PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever, | ||
Function<String, Mono<PagedResponse<T>>> nextPageRetriever) { | ||
super(firstPageRetriever, nextPageRetriever); | ||
this(() -> (continuationToken, pageSize) -> continuationToken == null | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Storage (e.g. in blob list) and many of the ARM services supports client driven pagination. Agree that a version of PageRetriever without pageSize can simplify implementation of client APIs that does not support page-size, but it brings a new type that is not used by customers but only by SDK implementors. I was thinking it is ok to keep the bar high for SDK implementor if that save us from adding a new type (and provide the type when customer ask for it) |
||
? firstPageRetriever.get().flux() | ||
: nextPageRetriever.apply(continuationToken).flux(), true); | ||
} | ||
|
||
/** | ||
* Create PagedFlux backed by Page Retriever Function Supplier. | ||
* | ||
* @param provider the Page Retrieval Provider | ||
* @param ignored param is ignored, exists in signature only to avoid conflict with first ctr | ||
*/ | ||
private PagedFlux(Supplier<PageRetriever<String, PagedResponse<T>>> provider, boolean ignored) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's too bad that we have to use a factory to create this. I think it makes it less discoverable since you can do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wish Java has a way to work around type erasure which is causing this pain. As pointed in the code comments - proposal is to update the current ctr taking There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We will unfortunately need to live with this inconsistency until we can do a breaking change release. The only positive is that we don't expect end users to be dealing with this API, so the only people encountering this will be client library developers. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree. Maybe this can be tracked in a github issue with appropriate labels so we don't forget to make this change when working on a major version release. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added this work item here: #6610 (comment) |
||
super(provider, ignored); | ||
} | ||
|
||
/** | ||
* Creates an instance of {@link PagedFlux} backed by a Page Retriever Supplier (provider). | ||
* When invoked provider should return {@link PageRetriever}. The provider will be called for each | ||
* Subscription to the PagedFlux instance. The Page Retriever can get called multiple times in serial | ||
* fashion, each time after the completion of the Flux returned from the previous invocation. | ||
* The final completion signal will be send to the Subscriber when the last Page emitted by the Flux | ||
* returned by Page Retriever has {@code null} continuation token. | ||
* | ||
* The provider is useful mainly in two scenarios: | ||
* <ul> | ||
* <li> To manage state across multiple call to Page Retrieval within the same Subscription. | ||
* <li> To decorate a PagedFlux to produce new PagedFlux. | ||
* </ul> | ||
* | ||
* <p><strong>Decoration sample</strong></p> | ||
* {@codesnippet com.azure.core.http.rest.pagedflux.create.decoration} | ||
* | ||
* @param provider the Page Retrieval Provider | ||
* @param <T> The type of items in a {@link PagedResponse} | ||
* @return PagedFlux backed by the Page Retriever Function Supplier | ||
*/ | ||
public static <T> PagedFlux<T> create(Supplier<PageRetriever<String, PagedResponse<T>>> provider) { | ||
return new PagedFlux<>(provider, true); | ||
} | ||
|
||
/** | ||
* Maps this PagedFlux instance of T to a PagedFlux instance of type S as per the provided mapper function. | ||
* Maps this PagedFlux instance of T to a PagedFlux instance of type S as per the provided mapper | ||
* function. | ||
* | ||
* @param mapper The mapper function to convert from type T to type S. | ||
* @param <S> The mapped type. | ||
* @return A PagedFlux of type S. | ||
* @deprecated refer the decoration samples for {@link PagedFlux#create(Supplier)}. | ||
*/ | ||
@Deprecated | ||
public <S> PagedFlux<S> mapPage(Function<T, S> mapper) { | ||
return new PagedFlux<S>(() -> getFirstPageRetriever().get() | ||
.map(mapPagedResponse(mapper)), | ||
continuationToken -> getNextPageRetriever().apply(continuationToken) | ||
.map(mapPagedResponse(mapper))); | ||
Supplier<PageRetriever<String, PagedResponse<S>>> provider = () -> (continuationToken, pageSize) -> { | ||
Flux<PagedResponse<T>> flux = (continuationToken == null) | ||
? byPage() | ||
: byPage(continuationToken); | ||
return flux | ||
.map(mapPagedResponse(mapper)); | ||
}; | ||
return PagedFlux.create(provider); | ||
} | ||
|
||
private <S> Function<PagedResponse<T>, PagedResponse<S>> mapPagedResponse(Function<T, S> mapper) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A
ContinuablePage
is a special type ofPage
andContinuablePage
would extend fromPage
.Ideally, we could have changed the name of this interface to
ContinuablePage
and the base interface could have been justPage
with one methodgetElements()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two things here:
http
package since non-http based client libs might want to use it. A genuine concern from cosmos team was - inheritance hierarchy of TCP based cosmos type containing core types fromhttp
package, thePage
type is inhttp
package.Page
andContinuablePage
and call itPage
then move it out ofhttp
package but that is a breaking change.So we’ve above two constraints - hence the type ContinuablePage and inheritance hierarchy as proposed in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@anuchandy Can you please be sure to update #6610 with the final set of changes we would make when a breaking-change release is possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, because we have a released version already, we'll have to keep this hierarchy.