Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making databaseAccount read dynamic after V4 master merge #8466

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,28 @@

package com.azure.cosmos.implementation;

import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.implementation.routing.LocationHelper;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConnectionPolicy;
import com.azure.cosmos.DatabaseAccount;
import com.azure.cosmos.implementation.caches.AsyncCache;
import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.implementation.routing.LocationHelper;
import org.apache.commons.collections4.list.UnmodifiableList;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.net.URISyntaxException;
import java.net.URI;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -45,6 +47,8 @@ public class GlobalEndpointManager implements AutoCloseable {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final Scheduler scheduler = Schedulers.fromExecutor(executor);
private volatile boolean isClosed;
private AtomicBoolean firstTimeDatabaseAccountInitialization = new AtomicBoolean(true);
private volatile DatabaseAccount latestDatabaseAccount;

public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPolicy connectionPolicy, Configs configs) {
this.backgroundRefreshLocationTimeIntervalInMS = configs.getUnavailableLocationsExpirationTimeInSeconds() * 1000;
Expand Down Expand Up @@ -159,6 +163,16 @@ public Mono<Void> refreshLocationAsync(DatabaseAccount databaseAccount, boolean
});
}

/**
* This will provide the latest databaseAccount.
* If due to some reason last databaseAccount update was null,
* this method will return previous valid value
* @return DatabaseAccount
*/
public DatabaseAccount getLatestDatabaseAccount() {
return this.latestDatabaseAccount;
}

private Mono<Void> refreshLocationPrivateAsync(DatabaseAccount databaseAccount) {
return Mono.defer(() -> {
logger.debug("refreshLocationPrivateAsync() refreshing locations");
Expand Down Expand Up @@ -254,7 +268,13 @@ private Mono<Void> startRefreshLocationTimerAsync(boolean initialization) {

private Mono<DatabaseAccount> getDatabaseAccountAsync(URI serviceEndpoint) {
return this.owner.getDatabaseAccountFromEndpoint(serviceEndpoint)
.doOnNext(i -> logger.debug("account retrieved: {}", i)).single();
.doOnNext(databaseAccount -> {
if(databaseAccount != null) {
this.latestDatabaseAccount = databaseAccount;
}

logger.debug("account retrieved: {}", databaseAccount);
}).single();
}

public boolean isClosed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,29 +251,16 @@ private RxDocumentClientImpl(URI serviceEndpoint,
}

private void initializeGatewayConfigurationReader() {
String resourceToken;
if(this.tokenResolver != null) {
resourceToken = this.tokenResolver.getAuthorizationToken(RequestVerb.GET, "", CosmosResourceType.System, null);
} else if(!this.hasAuthKeyResourceToken && this.authorizationTokenProvider == null) {
resourceToken = this.firstResourceTokenFromPermissionFeed;
} else {
assert this.masterKeyOrResourceToken != null || this.cosmosKeyCredential != null;
resourceToken = this.masterKeyOrResourceToken;
}

this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.serviceEndpoint,
this.hasAuthKeyResourceToken,
resourceToken,
this.connectionPolicy,
this.authorizationTokenProvider,
this.reactorHttpClient);

DatabaseAccount databaseAccount = this.gatewayConfigurationReader.initializeReaderAsync().block();
this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.globalEndpointManager);
DatabaseAccount databaseAccount = this.globalEndpointManager.getLatestDatabaseAccount();
//Database account should not be null here,
// this.globalEndpointManager.init() must have been already called
// hence asserting it
assert(databaseAccount != null);
this.useMultipleWriteLocations = this.connectionPolicy.getUsingMultipleWriteLocations() && BridgeInternal.isEnableMultipleWriteLocations(databaseAccount);

// TODO: add support for openAsync
// https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589
this.globalEndpointManager.refreshLocationAsync(databaseAccount, false).block();
}

public void init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,11 @@
package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConnectionPolicy;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.RequestVerb;
import com.azure.cosmos.implementation.BaseAuthorizationTokenProvider;
import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.DatabaseAccount;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ReplicationPolicy;
import com.azure.cosmos.implementation.UserAgentContainer;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpHeaders;
import com.azure.cosmos.implementation.http.HttpRequest;
import com.azure.cosmos.implementation.http.HttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import reactor.core.publisher.Mono;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

/**
Expand All @@ -42,118 +24,25 @@
*/
public class GatewayServiceConfigurationReader {

public static final String GATEWAY_READER_NOT_INITIALIZED = "GatewayServiceConfigurationReader has not been initialized";
private GlobalEndpointManager globalEndpointManager;

public ReplicationPolicy userReplicationPolicy;
private ReplicationPolicy systemReplicationPolicy;
private ConsistencyLevel consistencyLevel;
private volatile boolean initialized;
private URI serviceEndpoint;
private final ConnectionPolicy connectionPolicy;
private Map<String, Object> queryEngineConfiguration;
private final BaseAuthorizationTokenProvider baseAuthorizationTokenProvider;
private final boolean hasAuthKeyResourceToken;
private final String authKeyResourceToken;
private HttpClient httpClient;

public GatewayServiceConfigurationReader(URI serviceEndpoint, boolean hasResourceToken, String resourceToken,
ConnectionPolicy connectionPolicy, BaseAuthorizationTokenProvider baseAuthorizationTokenProvider,
HttpClient httpClient) {
this.serviceEndpoint = serviceEndpoint;
this.baseAuthorizationTokenProvider = baseAuthorizationTokenProvider;
this.hasAuthKeyResourceToken = hasResourceToken;
this.authKeyResourceToken = resourceToken;
this.connectionPolicy = connectionPolicy;
this.httpClient = httpClient;
public GatewayServiceConfigurationReader(GlobalEndpointManager globalEndpointManager) {
this.globalEndpointManager = globalEndpointManager;
}

public ReplicationPolicy getUserReplicationPolicy() {
this.throwIfNotInitialized();
return this.userReplicationPolicy;
return BridgeInternal.getReplicationPolicy(this.globalEndpointManager.getLatestDatabaseAccount());
}

public ReplicationPolicy getSystemReplicationPolicy() {
this.throwIfNotInitialized();
return this.systemReplicationPolicy;
}

public boolean enableAuthorization() {
return true;
return BridgeInternal.getSystemReplicationPolicy(this.globalEndpointManager.getLatestDatabaseAccount());
}

public ConsistencyLevel getDefaultConsistencyLevel() {
this.throwIfNotInitialized();
return this.consistencyLevel;
}

public void setDefaultConsistencyLevel(ConsistencyLevel value) {
this.throwIfNotInitialized();
this.consistencyLevel = value;
return BridgeInternal.getConsistencyPolicy(this.globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel();
}

public Map<String, Object> getQueryEngineConfiguration() {
this.throwIfNotInitialized();
return this.queryEngineConfiguration;
}

private Mono<DatabaseAccount> getDatabaseAccountAsync(URI serviceEndpoint) {

HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.set(HttpConstants.HttpHeaders.VERSION, HttpConstants.Versions.CURRENT_VERSION);

UserAgentContainer userAgentContainer = new UserAgentContainer();
String userAgentSuffix = this.connectionPolicy.getUserAgentSuffix();
if (userAgentSuffix != null && userAgentSuffix.length() > 0) {
userAgentContainer.setSuffix(userAgentSuffix);
}

httpHeaders.set(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent());
httpHeaders.set(HttpConstants.HttpHeaders.API_TYPE, Constants.Properties.SQL_API_TYPE);

String xDate = Utils.nowAsRFC1123();
httpHeaders.set(HttpConstants.HttpHeaders.X_DATE, xDate);

String authorizationToken;
if (this.hasAuthKeyResourceToken || baseAuthorizationTokenProvider == null) {
authorizationToken = HttpUtils.urlEncode(this.authKeyResourceToken);
} else {
// Retrieve the document service properties.
Map<String, String> header = new HashMap<>();
header.put(HttpConstants.HttpHeaders.X_DATE, xDate);
authorizationToken = baseAuthorizationTokenProvider
.generateKeyAuthorizationSignature(RequestVerb.GET, serviceEndpoint, header);
}
httpHeaders.set(HttpConstants.HttpHeaders.AUTHORIZATION, authorizationToken);

HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, serviceEndpoint, serviceEndpoint.getPort(), httpHeaders);
Mono<HttpResponse> httpResponse = httpClient.send(httpRequest);
return toDatabaseAccountObservable(httpResponse, httpRequest);
}

public Mono<DatabaseAccount> initializeReaderAsync() {
return GlobalEndpointManager.getDatabaseAccountFromAnyLocationsAsync(this.serviceEndpoint,

new ArrayList<>(this.connectionPolicy.getPreferredLocations()), url -> {
return getDatabaseAccountAsync(url);

}).doOnSuccess(databaseAccount -> {
userReplicationPolicy = BridgeInternal.getReplicationPolicy(databaseAccount);
systemReplicationPolicy = BridgeInternal.getSystemReplicationPolicy(databaseAccount);
queryEngineConfiguration = BridgeInternal.getQueryEngineConfiuration(databaseAccount);
consistencyLevel = BridgeInternal.getConsistencyPolicy(databaseAccount).getDefaultConsistencyLevel();
initialized = true;
});
}

private Mono<DatabaseAccount> toDatabaseAccountObservable(Mono<HttpResponse> httpResponse, HttpRequest httpRequest) {

return HttpClientUtils.parseResponseAsync(httpResponse, httpRequest)
.map(rxDocumentServiceResponse -> rxDocumentServiceResponse.getResource(DatabaseAccount.class));
}

private void throwIfNotInitialized() {
if (!this.initialized) {
throw new IllegalArgumentException(GATEWAY_READER_NOT_INITIALIZED);
}
return BridgeInternal.getQueryEngineConfiuration(this.globalEndpointManager.getLatestDatabaseAccount());
}
}
Loading