Skip to content

Commit

Permalink
Add message handling for PROVISIONAL_TAG_ADVANCE_GRANT (PTAG). Also u…
Browse files Browse the repository at this point in the history
…se Tag class instead of TimeValue for precise handling of Tags.
  • Loading branch information
hokeun committed Dec 29, 2021
1 parent ff6f165 commit 81edfb2
Showing 1 changed file with 37 additions and 26 deletions.
63 changes: 37 additions & 26 deletions src/core/federation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,16 @@ enum RTIMessageTypes {
*/
MSG_TYPE_TAG_ADVANCE_GRANT = 7,

/**
* Byte identifying a provisional time advance grant (PTAG) sent by the RTI to a federate
* in centralized coordination. This message is a promise by the RTI to the federate
* that no later message sent to the federate will have a tag earlier than the tag
* carried by this PTAG message.
* The next eight bytes will be the timestamp.
* The next four bytes will be the microstep.
*/
MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT = 8,

/**
* Byte identifying a logical tag complete (LTC) message sent by a federate
* to the RTI.
Expand Down Expand Up @@ -683,19 +693,20 @@ class RTIClient extends EventEmitter {
// MessageType: 1 byte.
// Timestamp: 8 bytes.
// Microstep: 4 bytes.
let incomplete = assembledData.length < 13 + bufferIndex;

if (incomplete) {
thiz.chunkedBuffer = Buffer.alloc(assembledData.length - bufferIndex);
assembledData.copy(thiz.chunkedBuffer, 0, bufferIndex)
} else {
Log.debug(thiz, () => {return 'Received an RTI MSG_TYPE_TAG_ADVANCE_GRANT'});
let timeBuffer = Buffer.alloc(8);
assembledData.copy(timeBuffer, 0, bufferIndex + 1, bufferIndex + 9);
let time = TimeValue.fromBinary(timeBuffer);
// FIXME: Process microstep properly.
thiz.emit('timeAdvanceGrant', time);
}
Log.debug(thiz, () => {return 'Received an RTI MSG_TYPE_TAG_ADVANCE_GRANT'});
let tagBuffer = Buffer.alloc(12);
assembledData.copy(tagBuffer, 0, bufferIndex + 1, bufferIndex + 13);
let tag = Tag.fromBinary(tagBuffer);
thiz.emit('timeAdvanceGrant', tag);
bufferIndex += 13;
break;
}
case RTIMessageTypes.MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT: {
Log.debug(thiz, () => {return 'Received an RTI MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT'});
let tagBuffer = Buffer.alloc(12);
assembledData.copy(tagBuffer, 0, bufferIndex + 1, bufferIndex + 13);
let tag = Tag.fromBinary(tagBuffer);
Log.debug(thiz, () => {return `PTAG value: ${tag}`});
bufferIndex += 13;
break;
}
Expand Down Expand Up @@ -764,7 +775,7 @@ export class FederatedApp extends App {
* An RTI synchronized Federate cannot advance its logical time
* beyond this value.
*/
private greatestTimeAdvanceGrant: TimeValue | null = null;
private greatestTimeAdvanceGrant: Tag | null = null;

private upstreamFedIDs: number[] = [];
private upstreamFedDelays: bigint[] = [];
Expand Down Expand Up @@ -807,9 +818,9 @@ export class FederatedApp extends App {
protected _canProceed(event: TaggedEvent<Present>) {
if (this._isRTISynchronized()) {
let greatestTAG = this._getGreatestTimeAdvanceGrant();
let nextTime = event.tag.time;
if (greatestTAG === null || greatestTAG.isEarlierThan(nextTime)) {
this.sendRTINextEventTime(nextTime);
let nextTag = event.tag;
if (greatestTAG === null || greatestTAG.isSmallerThan(nextTag)) {
this.sendRTINextEventTime(nextTag);
Log.debug(this, () => "The greatest time advance grant \
received from the RTI is less than the timestamp of the \
next event on the event queue");
Expand Down Expand Up @@ -943,13 +954,13 @@ export class FederatedApp extends App {
* Send a next event time message to the RTI. This should be called
* when this federated app is unable to advance logical time beause it
* has not yet received a sufficiently large time advance grant.
* @param nextTime The time to which this federate would like to
* @param nextTag The time to which this federate would like to
* advance logical time.
*/
public sendRTINextEventTime(nextTime: TimeValue) {
let time = nextTime.toBinary();
Log.debug(this, () => {return `Sending RTI next event time with time: ${time.toString('hex')}`});
this.rtiClient.sendRTINextEventTime(time);
public sendRTINextEventTime(nextTag: Tag) {
let tag = nextTag.toBinary();
Log.debug(this, () => {return `Sending RTI next event time with time: ${tag}`});
this.rtiClient.sendRTINextEventTime(tag);
}

/**
Expand Down Expand Up @@ -1048,12 +1059,12 @@ export class FederatedApp extends App {
}
});

this.rtiClient.on('timeAdvanceGrant', (time: TimeValue) => {
Log.debug(this, () => {return `Time Advance Grant received from RTI for ${time}.`});
if (this.greatestTimeAdvanceGrant === null || this.greatestTimeAdvanceGrant?.isEarlierThan(time)) {
this.rtiClient.on('timeAdvanceGrant', (tag: Tag) => {
Log.debug(this, () => {return `Time Advance Grant received from RTI for ${tag}.`});
if (this.greatestTimeAdvanceGrant === null || this.greatestTimeAdvanceGrant?.isSmallerThan(tag)) {
// Update the greatest time advance grant and immediately
// wake up _next, in case it was blocked by the old time advance grant
this.greatestTimeAdvanceGrant = time;
this.greatestTimeAdvanceGrant = tag;
this._requestImmediateInvocationOfNext();
}
});
Expand Down

0 comments on commit 81edfb2

Please sign in to comment.