Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/#43 fix controller restart on config change #104

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
/**
* @param <M> the message type of the communication service
* @param <MB> the builder for message M
*
* @author <a href="mailto:divine@openbase.org">Divine Threepwood</a>
*/

Expand Down Expand Up @@ -116,6 +117,7 @@ public abstract class AbstractControllerServer<M extends AbstractMessage, MB ext
* Create a communication service.
*
* @param builder the initial data builder
*
* @throws InstantiationException if the creation fails
*/
public AbstractControllerServer(final MB builder) throws InstantiationException {
Expand Down Expand Up @@ -156,6 +158,7 @@ protected long getShutdownDelay() {

/**
* @param scope
*
* @throws InitializationException
* @throws InterruptedException
*/
Expand All @@ -165,6 +168,7 @@ public void init(final Scope scope) throws InitializationException, InterruptedE

/**
* @param scope
*
* @throws InitializationException
* @throws InterruptedException
*/
Expand All @@ -179,6 +183,7 @@ public void init(final String scope) throws InitializationException, Interrupted
/**
* @param scope
* @param communicatorConfig
*
* @throws InitializationException
* @throws InterruptedException
*/
Expand Down Expand Up @@ -245,7 +250,6 @@ public void init(final Scope scope, final CommunicatorConfig communicatorConfig)
// mark controller as online.
setAvailabilityState(ONLINE);

logger.debug("trigger initial sync");
notifyChange();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -318,7 +322,7 @@ public void activate() throws InterruptedException, CouldNotPerformException {
manageLock.lockWriteInterruptibly(this);
try {
validateInitialization();
logger.debug("Activate AbstractControllerServer for: " + this);
logger.debug("Activate AbstractControllerServer for: " + this + " on " + ScopeProcessor.generateStringRep(scope));
setAvailabilityState(ACTIVATING);
assert serverWatchDog != null;
assert publisherWatchDog != null;
Expand Down Expand Up @@ -351,7 +355,7 @@ public void deactivate() throws InterruptedException, CouldNotPerformException {
initialDataSyncFuture.cancel(true);
}

logger.debug("Deactivate AbstractControllerServer for: " + this);
logger.debug("Deactivate AbstractControllerServer for: " + this + " on " + ScopeProcessor.generateStringRep(scope));
// The order is important: The publisher publishes a zero event when the availabilityState is set to deactivating which leads remotes to disconnect
// The remotes try to reconnect again and start a requestData. If the server is still active it will respond
// and the remotes will think that the server is still there.
Expand Down Expand Up @@ -432,6 +436,7 @@ public boolean isActive() {
* {@inheritDoc}
*
* @return {@inheritDoc}
*
* @throws NotAvailableException {@inheritDoc}
*/
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -462,6 +467,7 @@ public Future<M> getDataFuture() {

/**
* @param controllerAvailability
*
* @throws InterruptedException
*/
private void setAvailabilityState(final AvailabilityState.State controllerAvailability) throws InterruptedException {
Expand Down Expand Up @@ -554,6 +560,7 @@ public BuilderSyncSetup<MB> getBuilderSetup() {
* {@inheritDoc}
*
* @param consumer {@inheritDoc}
*
* @return {@inheritDoc}
*/
@Override
Expand All @@ -567,6 +574,7 @@ public ClosableDataBuilder<MB> getDataBuilder(final Object consumer) {
*
* @param consumer {@inheritDoc}
* @param notificationStrategy {@inheritDoc}
*
* @return {@inheritDoc}
*/
@Override
Expand All @@ -579,6 +587,7 @@ public ClosableDataBuilder<MB> getDataBuilder(final Object consumer, final Notif
* {@inheritDoc}
*
* @param consumer {@inheritDoc}
*
* @return {@inheritDoc}
*/
@Override
Expand All @@ -591,6 +600,7 @@ public ClosableDataBuilder<MB> getDataBuilderInterruptible(final Object consumer
*
* @param consumer {@inheritDoc}
* @param notificationStrategy {@inheritDoc}
*
* @return {@inheritDoc}
*/
@Override
Expand Down Expand Up @@ -620,6 +630,7 @@ public ClosableDataBuilder<MB> getDataBuilderInterruptible(final Object consumer
* in background after leaving the try brackets.
*
* @param consumer a responsible instance which consumes the lock.
*
* @return a new builder wrapper which already locks the manage lock.
*/
protected CloseableWriteLockWrapper getManageWriteLock(final Object consumer) {
Expand Down Expand Up @@ -648,6 +659,7 @@ protected CloseableWriteLockWrapper getManageWriteLock(final Object consumer) {
* in background after leaving the try brackets.
*
* @param consumer a responsible instance which consumes the lock.
*
* @return a new builder wrapper which already locks the manage lock.
*/
protected CloseableReadLockWrapper getManageReadLock(final Object consumer) {
Expand Down Expand Up @@ -676,7 +688,9 @@ protected CloseableReadLockWrapper getManageReadLock(final Object consumer) {
* in background after leaving the try brackets.
*
* @param consumer a responsible instance which consumes the lock.
*
* @return a new builder wrapper which already locks the manage lock.
*
* @throws InterruptedException in case the thread was externally interrupted during the locking.
*/
protected CloseableWriteLockWrapper getManageWriteLockInterruptible(final Object consumer) throws InterruptedException {
Expand Down Expand Up @@ -705,7 +719,9 @@ protected CloseableWriteLockWrapper getManageWriteLockInterruptible(final Object
* in background after leaving the try brackets.
*
* @param consumer a responsible instance which consumes the lock.
*
* @return a new builder wrapper which already locks the manage lock.
*
* @throws InterruptedException in case the thread was externally interrupted during the locking.
*/
protected CloseableReadLockWrapper getManageReadLockInterruptible(final Object consumer) throws InterruptedException {
Expand Down Expand Up @@ -743,6 +759,7 @@ public CloseableWriteLockWrapper getCloseableWriteLock(final Object consumer) {
* {@inheritDoc}
*
* @return {@inheritDoc}
*
* @throws NotAvailableException {@inheritDoc}
*/
@Override
Expand Down Expand Up @@ -825,7 +842,9 @@ public void notifyChange() throws CouldNotPerformException, InterruptedException
* sub classes to update data which can be received by everyone.
*
* @param dataBuilder a clone of the current data builder.
*
* @return a message build from the data builder
*
* @throws CouldNotPerformException if the update fails
*/
protected M updateDataToPublish(MB dataBuilder) throws CouldNotPerformException {
Expand All @@ -836,6 +855,7 @@ protected M updateDataToPublish(MB dataBuilder) throws CouldNotPerformException
* Overwrite this method to get informed about data updates.
*
* @param data new arrived data messages.
*
* @throws CouldNotPerformException
*/
protected void notifyDataUpdate(M data) throws CouldNotPerformException {
Expand All @@ -845,6 +865,7 @@ protected void notifyDataUpdate(M data) throws CouldNotPerformException {
/**
* @param fieldNumber
* @param value
*
* @throws CouldNotPerformException
*/
protected final void setDataField(int fieldNumber, Object value) throws CouldNotPerformException {
Expand All @@ -870,6 +891,7 @@ protected final void setDataField(int fieldNumber, Object value) throws CouldNot
/**
* @param fieldName
* @param value
*
* @throws CouldNotPerformException
*/
protected final void setDataField(String fieldName, Object value) throws CouldNotPerformException {
Expand All @@ -894,7 +916,9 @@ protected final void setDataField(String fieldName, Object value) throws CouldNo

/**
* @param name
*
* @return
*
* @throws NotAvailableException
*/
protected final Object getDataField(String name) throws NotAvailableException {
Expand All @@ -912,7 +936,9 @@ protected final Object getDataField(String name) throws NotAvailableException {

/**
* @param name
*
* @return
*
* @throws CouldNotPerformException
*/
protected final boolean hasDataField(final String name) throws CouldNotPerformException {
Expand All @@ -930,7 +956,9 @@ protected final boolean hasDataField(final String name) throws CouldNotPerformEx

/**
* @param name
*
* @return
*
* @throws CouldNotPerformException
*/
protected final boolean supportsDataField(final String name) throws CouldNotPerformException {
Expand All @@ -944,6 +972,7 @@ protected final boolean supportsDataField(final String name) throws CouldNotPerf

/**
* @param fieldId
*
* @return
*/
protected final Descriptors.FieldDescriptor getDataFieldDescriptor(int fieldId) {
Expand Down Expand Up @@ -1027,6 +1056,7 @@ public boolean isDataBuilderWriteLockedByCurrentThread() {
* {@inheritDoc }
*
* @param timestamp {@inheritDoc }
*
* @return {@inheritDoc }
*/
@Override
Expand All @@ -1051,6 +1081,7 @@ public Future<Long> ping(final Long timestamp) {
* {@inheritDoc}
*
* @return {@inheritDoc}
*
* @throws org.openbase.jul.exception.CouldNotPerformException {@inheritDoc}
*/
@RPCMethod
Expand All @@ -1070,6 +1101,7 @@ public M requestStatus() throws CouldNotPerformException {
* Register methods for RPCs on the internal RPC server.
*
* @param server the rpc server on which the methods should be registered.
*
* @throws CouldNotPerformException if registering methods fails
*/
public abstract void registerMethods(final RPCServer server) throws CouldNotPerformException;
Expand Down Expand Up @@ -1121,6 +1153,7 @@ public void waitForData() throws CouldNotPerformException, InterruptedException
*
* @param timeout {@inheritDoc}
* @param timeUnit {@inheritDoc}.
*
* @throws CouldNotPerformException {@inheritDoc}
* @throws InterruptedException {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.google.protobuf.Any as protoAny

class RPCClientImpl(
scope: ScopeType.Scope,
config: CommunicatorConfig
config: CommunicatorConfig,
) : RPCCommunicatorImpl(scope, config), RPCClient {

private val parameterParserMap: HashMap<String, List<(Any) -> protoAny>> = HashMap()
Expand All @@ -32,7 +32,7 @@ class RPCClientImpl(
override fun <RETURN : Any> callMethod(
methodName: String,
return_clazz: KClass<RETURN>,
vararg parameters: Any
vararg parameters: Any,
): Future<RPCResponse<RETURN>> {
lazyRegisterMethod(methodName, return_clazz, *parameters)

Expand Down Expand Up @@ -79,7 +79,7 @@ class RPCClientImpl(
private fun <RETURN> handleRPCResponse(
mqtt5Publish: Mqtt5Publish,
rpcFuture: CompletableFuture<RPCResponse<RETURN>>,
request: Request
request: Request,
) {
val response = Response.parseFrom(mqtt5Publish.payloadAsBytes)
if (response.error.isEmpty() && response.status != Response.Status.FINISHED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import kotlin.reflect.KFunction
class RPCServerImpl(
scope: Scope,
config: CommunicatorConfig,
dispatcher: CoroutineDispatcher? = GlobalCachedExecutorService.getInstance().executorService.asCoroutineDispatcher()
private val dispatcher: CoroutineDispatcher? = GlobalCachedExecutorService.getInstance().executorService.asCoroutineDispatcher(),

Check warning

Code scanning / detekt

Line detected that is longer than the defined maximum line length in the code style.

Line detected that is longer than the defined maximum line length in the code style.
) : RPCCommunicatorImpl(scope, config), RPCServer {

companion object {
Expand All @@ -46,7 +46,7 @@ class RPCServerImpl(

private val lock = SyncObject("Activation Lock")

private val coroutineScope = if (dispatcher != null) CoroutineScope(dispatcher) else null
private var coroutineScope: CoroutineScope? = null

internal fun getActivationFuture(): Future<out Any>? {
return this.activationFuture
Expand All @@ -64,36 +64,40 @@ class RPCServerImpl(
return
}

coroutineScope?.cancel()
coroutineScope = dispatcher?.let { CoroutineScope(dispatcher) }

activationFuture = mqttClient.subscribe(
Mqtt5Subscribe.builder().topicFilter(topic).qos(MqttQos.EXACTLY_ONCE).build(), { mqtt5Publish ->

// Note: this is a wrapper for the usage of a shared client
// which may remain subscribed even if deactivate is called
if (isActive) {
when (getPriority(mqtt5Publish)) {
org.openbase.jul.annotation.RPCMethod.Priority.HIGH -> handleRemoteCall(mqtt5Publish)
else -> {
coroutineScope?.ensureActive();
coroutineScope?.launch {
withTimeout(RPC_TIMEOUT.toMillis()) {
handleRemoteCall(mqtt5Publish)
}
} ?: handleRemoteCall(mqtt5Publish)
}
}

}
}, GlobalCachedExecutorService.getInstance().executorService
)

try {
activationFuture!!.get(ACTIVATION_TIMEOUT, TimeUnit.MILLISECONDS)
activationFuture?.get(ACTIVATION_TIMEOUT, TimeUnit.MILLISECONDS)
} catch (e: TimeoutException) {
activationFuture!!.cancel(true)
activationFuture?.cancel(true)
throw CouldNotPerformException("Could not activate Subscriber", e)
} catch (e: AsyncRuntimeException) {
activationFuture!!.cancel(true)
activationFuture?.cancel(true)
throw CouldNotPerformException("Could not activate Subscriber", e)
} catch (e: InterruptedException) {
activationFuture!!.cancel(true)
activationFuture?.cancel(true)
throw e;
}
}
Expand All @@ -105,8 +109,9 @@ class RPCServerImpl(
mqttClient.unsubscribe(
Mqtt5Unsubscribe.builder().topicFilter(topic).build()
)
coroutineScope?.cancel()
coroutineScope = null
}
coroutineScope?.cancel()
}

override fun registerMethod(
Expand Down
Loading