From 81edfb21ba746fc932aacde86f86ce75d5d76444 Mon Sep 17 00:00:00 2001 From: Hokeun Kim Date: Tue, 28 Dec 2021 20:42:10 -0800 Subject: [PATCH] Add message handling for PROVISIONAL_TAG_ADVANCE_GRANT (PTAG). Also use Tag class instead of TimeValue for precise handling of Tags. --- src/core/federation.ts | 63 +++++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 26 deletions(-) diff --git a/src/core/federation.ts b/src/core/federation.ts index 1c5e7736c..dad3cd4d6 100644 --- a/src/core/federation.ts +++ b/src/core/federation.ts @@ -131,6 +131,16 @@ enum RTIMessageTypes { */ MSG_TYPE_TAG_ADVANCE_GRANT = 7, + /** + * Byte identifying a provisional time advance grant (PTAG) sent by the RTI to a federate + * in centralized coordination. This message is a promise by the RTI to the federate + * that no later message sent to the federate will have a tag earlier than the tag + * carried by this PTAG message. + * The next eight bytes will be the timestamp. + * The next four bytes will be the microstep. + */ + MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT = 8, + /** * Byte identifying a logical tag complete (LTC) message sent by a federate * to the RTI. @@ -683,19 +693,20 @@ class RTIClient extends EventEmitter { // MessageType: 1 byte. // Timestamp: 8 bytes. // Microstep: 4 bytes. - let incomplete = assembledData.length < 13 + bufferIndex; - - if (incomplete) { - thiz.chunkedBuffer = Buffer.alloc(assembledData.length - bufferIndex); - assembledData.copy(thiz.chunkedBuffer, 0, bufferIndex) - } else { - Log.debug(thiz, () => {return 'Received an RTI MSG_TYPE_TAG_ADVANCE_GRANT'}); - let timeBuffer = Buffer.alloc(8); - assembledData.copy(timeBuffer, 0, bufferIndex + 1, bufferIndex + 9); - let time = TimeValue.fromBinary(timeBuffer); - // FIXME: Process microstep properly. - thiz.emit('timeAdvanceGrant', time); - } + Log.debug(thiz, () => {return 'Received an RTI MSG_TYPE_TAG_ADVANCE_GRANT'}); + let tagBuffer = Buffer.alloc(12); + assembledData.copy(tagBuffer, 0, bufferIndex + 1, bufferIndex + 13); + let tag = Tag.fromBinary(tagBuffer); + thiz.emit('timeAdvanceGrant', tag); + bufferIndex += 13; + break; + } + case RTIMessageTypes.MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT: { + Log.debug(thiz, () => {return 'Received an RTI MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT'}); + let tagBuffer = Buffer.alloc(12); + assembledData.copy(tagBuffer, 0, bufferIndex + 1, bufferIndex + 13); + let tag = Tag.fromBinary(tagBuffer); + Log.debug(thiz, () => {return `PTAG value: ${tag}`}); bufferIndex += 13; break; } @@ -764,7 +775,7 @@ export class FederatedApp extends App { * An RTI synchronized Federate cannot advance its logical time * beyond this value. */ - private greatestTimeAdvanceGrant: TimeValue | null = null; + private greatestTimeAdvanceGrant: Tag | null = null; private upstreamFedIDs: number[] = []; private upstreamFedDelays: bigint[] = []; @@ -807,9 +818,9 @@ export class FederatedApp extends App { protected _canProceed(event: TaggedEvent) { if (this._isRTISynchronized()) { let greatestTAG = this._getGreatestTimeAdvanceGrant(); - let nextTime = event.tag.time; - if (greatestTAG === null || greatestTAG.isEarlierThan(nextTime)) { - this.sendRTINextEventTime(nextTime); + let nextTag = event.tag; + if (greatestTAG === null || greatestTAG.isSmallerThan(nextTag)) { + this.sendRTINextEventTime(nextTag); Log.debug(this, () => "The greatest time advance grant \ received from the RTI is less than the timestamp of the \ next event on the event queue"); @@ -943,13 +954,13 @@ export class FederatedApp extends App { * Send a next event time message to the RTI. This should be called * when this federated app is unable to advance logical time beause it * has not yet received a sufficiently large time advance grant. - * @param nextTime The time to which this federate would like to + * @param nextTag The time to which this federate would like to * advance logical time. */ - public sendRTINextEventTime(nextTime: TimeValue) { - let time = nextTime.toBinary(); - Log.debug(this, () => {return `Sending RTI next event time with time: ${time.toString('hex')}`}); - this.rtiClient.sendRTINextEventTime(time); + public sendRTINextEventTime(nextTag: Tag) { + let tag = nextTag.toBinary(); + Log.debug(this, () => {return `Sending RTI next event time with time: ${tag}`}); + this.rtiClient.sendRTINextEventTime(tag); } /** @@ -1048,12 +1059,12 @@ export class FederatedApp extends App { } }); - this.rtiClient.on('timeAdvanceGrant', (time: TimeValue) => { - Log.debug(this, () => {return `Time Advance Grant received from RTI for ${time}.`}); - if (this.greatestTimeAdvanceGrant === null || this.greatestTimeAdvanceGrant?.isEarlierThan(time)) { + this.rtiClient.on('timeAdvanceGrant', (tag: Tag) => { + Log.debug(this, () => {return `Time Advance Grant received from RTI for ${tag}.`}); + if (this.greatestTimeAdvanceGrant === null || this.greatestTimeAdvanceGrant?.isSmallerThan(tag)) { // Update the greatest time advance grant and immediately // wake up _next, in case it was blocked by the old time advance grant - this.greatestTimeAdvanceGrant = time; + this.greatestTimeAdvanceGrant = tag; this._requestImmediateInvocationOfNext(); } });