Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update references to deprecated APIs in the C and Python examples #2

Merged
merged 2 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions C/src/Deadline.lf
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ reactor Sensor {
break;
}
}
schedule(response, 0);
lf_schedule(response, 0);
if (c == EOF) {
break;
}
Expand All @@ -45,8 +45,8 @@ reactor Sensor {
=}

reaction(response) -> y {=
printf("Reacting to physical action at %lld\n", get_elapsed_logical_time());
SET(y, true);
printf("Reacting to physical action at %lld\n", lf_time_logical_elapsed());
lf_set(y, true);
=}
}

Expand All @@ -62,21 +62,21 @@ reactor Analysis {
printf("Skipping work!\n");
}
self->do_work = !self->do_work;
SET(y, true);
lf_set(y, true);
=}
}

reactor Actuator {
input x:bool;
reaction(x) {=
instant_t l = get_elapsed_logical_time();
instant_t p = get_elapsed_physical_time();
instant_t l = lf_time_logical_elapsed();
instant_t p = lf_time_physical_elapsed();
printf("Actuating... Logical time: %lld "
"Physical time: %lld Lag: %lld\n",
l, p, p-l);
=} deadline(500 usecs) {=
instant_t d = get_elapsed_physical_time()
- get_elapsed_logical_time();
instant_t d = lf_time_physical_elapsed()
- lf_time_logical_elapsed();
printf("Deadline missed! Lag: %lld "
"(too late by %lld nsecs)\n",
d, d-500000);
Expand Down
10 changes: 5 additions & 5 deletions C/src/Delay.lf
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ reactor Ramp {
output y:int;
state count:int(0);
reaction(t) -> y {=
SET(y, self->count);
lf_set(y, self->count);
self->count++;
=}
}
Expand All @@ -41,10 +41,10 @@ reactor Delay2 {
input x:int;
output y:int;
reaction(a) -> y {=
SET(y, a->value);
lf_set(y, a->value);
=}
reaction(x) -> a {=
schedule_int(a, 0, x->value);
lf_schedule_int(a, 0, x->value);
=}
}

Expand All @@ -56,7 +56,7 @@ reactor Print {
reaction(x) {=
printf("Logical time: %lld, Physical time %lld"
", Value: %d\n",
get_elapsed_logical_time(),
get_elapsed_physical_time(), x->value);
lf_time_logical_elapsed(),
lf_time_physical_elapsed(), x->value);
=}
}
18 changes: 9 additions & 9 deletions C/src/DistributedDatabase/ReplicatedDatabase.lf
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,25 @@ reactor Server(
output update:int;
state queries_outstanding:int(0);
reaction(query_trigger) -> query {=
SET(query, true);
lf_set(query, true);
self->queries_outstanding++;
=}

reaction(update_trigger) -> update {=
SET(update, self->update_amount);
lf_set(update, self->update_amount);
=} deadline(update_deadline) {=
error_print("At tag (%lld, %u), deadline missed at database \"%s\". Rejecting update.\n"
" Elapsed physical time is %lld.",
get_logical_time() - start_time,
lf_time_logical() - start_time,
get_microstep(),
self->name,
get_elapsed_physical_time()
lf_time_physical_elapsed()
);
=}

reaction(reply) {=
info_print("***** At tag (%lld, %u), server \"%s\" reports balance: %d.",
get_elapsed_logical_time(), get_microstep(), self->server_name, reply->value
lf_time_logical_elapsed(), get_microstep(), self->server_name, reply->value
);
self->queries_outstanding--;
=}
Expand Down Expand Up @@ -173,10 +173,10 @@ reactor Database(
}
info_print("At tag (%lld, %u), database \"%s\" updated balance to %d.\n"
" Elapsed physical time is %lld.",
get_logical_time() - start_time, get_microstep(),
lf_time_logical() - start_time, get_microstep(),
self->name,
self->record,
get_elapsed_physical_time()
lf_time_physical_elapsed()
);
=} STP (4 msec) {=
#ifdef FEDERATED_DECENTRALIZED
Expand All @@ -192,7 +192,7 @@ reactor Database(
remote_update[i]->value,
remote_update[i]->intended_tag.time - start_time,
remote_update[i]->intended_tag.microstep,
get_elapsed_physical_time()
lf_time_physical_elapsed()
);
self->record += remote_update[i]->value;
}
Expand All @@ -204,7 +204,7 @@ reactor Database(
=}

reaction(query) -> balance {=
SET(balance, self->record);
lf_set(balance, self->record);
=}
}

Expand Down
4 changes: 2 additions & 2 deletions C/src/DistributedHelloWorld/HelloWorld.lf
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ reactor MessageGenerator(prefix:string("")) {
// Populate the output string and increment the count.
snprintf(message->value, length, "%s %d", self->prefix, self->count++);

tag_t tag = get_current_tag();
tag_t tag = lf_tag();
info_print("At (elapsed) logical tag (%lld, %u), source sends message: %s",
tag.time - start_time, tag.microstep,
message->value
Expand All @@ -66,7 +66,7 @@ reactor MessageGenerator(prefix:string("")) {
reactor PrintMessage {
input message:char*;
reaction(message) {=
tag_t tag = get_current_tag();
tag_t tag = lf_tag();
info_print("At (elapsed) logical tag (%lld, %u), print receives: %s",
tag.time - start_time, tag.microstep,
message->value
Expand Down
2 changes: 1 addition & 1 deletion C/src/DistributedHelloWorld/HelloWorldDecentralized.lf
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ reactor PrintMessageWithDetector(offset:time(10 msec)) extends PrintMessage {
);
=}
reaction(local) {=
tag_t tag = get_current_tag();
tag_t tag = lf_tag();
info_print("Timer triggered at logical tag (%lld, %u).",
tag.time - start_time, tag.microstep
);
Expand Down
22 changes: 11 additions & 11 deletions C/src/DistributedResourceManagement/ResourceManagement.lf
Original file line number Diff line number Diff line change
Expand Up @@ -67,37 +67,37 @@ reactor Client(

reaction(startup, release_trigger) -> release, request_trigger {=
if (release_trigger->is_present) {
tag_t tag = get_current_tag();
tag_t tag = lf_tag();
info_print("%s (ID %d): At tag (%lld, %u), released access.",
self->name,
self->id,
tag.time - start_time, tag.microstep
);
SET(release, true);
lf_set(release, true);
}
schedule(request_trigger, 0);
lf_schedule(request_trigger, 0);
=}

reaction(request_trigger) -> request {=
tag_t tag = get_current_tag();
tag_t tag = lf_tag();
info_print("%s (ID %d): At tag (%lld, %u), requesting access.",
self->name,
self->id,
tag.time - start_time, tag.microstep
);
self->requests_outstanding++;
SET(request, true);
lf_set(request, true);
=}

reaction (grant) -> release_trigger {=
tag_t tag = get_current_tag();
tag_t tag = lf_tag();
info_print("%s (ID %d): At tag (%lld, %u), granted access.",
self->name,
self->id,
tag.time - start_time, tag.microstep
);
self->requests_outstanding--;
schedule(release_trigger, 0);
lf_schedule(release_trigger, 0);
=}

reaction(shutdown) {=
Expand Down Expand Up @@ -247,7 +247,7 @@ reactor ResourceManager(
// Just forward the request. The determination of whether
// the request can be granted cannot be made until we have received
// any remote requests with the same tag.
SET(request, self->id);
lf_set(request, self->id);
=}

reaction(local_release) -> release {=
Expand All @@ -259,7 +259,7 @@ reactor ResourceManager(
);
} else {
pop(&self->queue, NULL);
SET(release, self->id);
lf_set(release, self->id);
}
=}

Expand All @@ -280,7 +280,7 @@ reactor ResourceManager(
&& self->queue.queue[self->queue.head] == self->id
) {
// Next request on the queue is me!
SET(local_grant, true);
lf_set(local_grant, true);
}
// Since there shouldn't be more than one request, return now.
return;
Expand Down Expand Up @@ -358,7 +358,7 @@ reactor ResourceManager(
// matches the local ID, grant the request.
if (local_request->is_present && self->queue.queue[self->queue.head] == self->id) {
// Grant request.
SET(local_grant, true);
lf_set(local_grant, true);
}
=}

Expand Down
4 changes: 2 additions & 2 deletions C/src/MQTT/MQTTDistributed.lf
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ reactor MessageGenerator(root:string("")) {
// Populate the output string and increment the count.
snprintf(message->value, length, "%s %d", self->root, self->count++);
info_print("MessageGenerator: At time %lld, publish message: %s",
get_elapsed_logical_time(),
lf_time_logical_elapsed(),
message->value
);
=}
Expand All @@ -112,7 +112,7 @@ reactor PrintMessage {
input message:char*;
reaction(message) {=
info_print("PrintMessage: At (elapsed) logical time %lld, subscriber receives: %s",
get_elapsed_logical_time(),
lf_time_logical_elapsed(),
message->value
);
=}
Expand Down
4 changes: 2 additions & 2 deletions C/src/MQTT/MQTTPhysical.lf
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ reactor MessageGenerator(root:string("")) {
// Populate the output string and increment the count.
snprintf(message->value, length, "%s %d", self->root, self->count++);
printf("MessageGenerator: At time %lld, publish message: %s\n",
get_elapsed_logical_time(),
lf_time_logical_elapsed(),
message->value
);
=}
Expand All @@ -109,7 +109,7 @@ reactor PrintMessage {
input message:char*;
reaction(message) {=
printf("PrintMessage: At (elapsed) time %lld, subscriber receives: %s\n",
get_elapsed_logical_time(),
lf_time_logical_elapsed(),
message->value
);
=}
Expand Down
4 changes: 2 additions & 2 deletions C/src/MQTT/MQTTPublisher.lf
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ reactor MQTTPublisher (

// Append the current timestamp to the message.
// This is always last, after the physical timestamp if it is included.
encode_int64(get_logical_time(),
encode_int64(lf_time_logical(),
(unsigned char*)(self->inflight.message + length - sizeof(instant_t))
);
// printf("DEBUG: Timestamp of sending message: %lld.\n", *timestamp);
Expand All @@ -161,7 +161,7 @@ reactor MQTTPublisher (
// As close as possible to the publishing of the message, insert
// the physical timestamp if it has been requested.
if (self->include_physical_timestamp) {
encode_int64(get_physical_time(),
encode_int64(lf_time_physical(),
(unsigned char*)(self->inflight.message + length - 2 * sizeof(instant_t))
);
}
Expand Down
16 changes: 8 additions & 8 deletions C/src/MQTT/MQTTSubscriber.lf
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ reactor MQTTSubscriber (
int topicLen,
MQTTClient_message *message
) {
instant_t receive_physical_time = get_physical_time();
instant_t receive_physical_time = lf_time_physical();
// If a physical timestamp was sent, report the transport time.
size_t string_length = strlen((char*)message->payload); // Assumes null-terminated string.
if (message->payloadlen == string_length + 1 + 2*sizeof(instant_t)) {
Expand All @@ -103,7 +103,7 @@ reactor MQTTSubscriber (
lf_mutex_lock(&mutex);

instant_t timestamp = extract_int64((unsigned char*)message->payload + message->payloadlen - sizeof(instant_t));
interval_t delay = timestamp - get_logical_time();
interval_t delay = timestamp - lf_time_logical();
// printf("DEBUG: MQTTSubscriber.message_arrived: received timestamp that is %lld ahead of current_time %lld.\n", *timestamp - start_time, current_time);
// printf("DEBUG: MQTTSubscriber.message_arrived: physical time is ahead of current logical time by: %lld.\n", receive_physical_time - current_time);

Expand All @@ -115,14 +115,14 @@ reactor MQTTSubscriber (
// exactly the logical time at the publisher plus the offset.
// Otherwise, it will be scheduled at the current physical time.
// The incoming message is in dynamically allocated memory.
// We copy the message using schedule_copy() because, unfortunately, Paho MQTT uses its own
// We copy the message using lf_schedule_copy() because, unfortunately, Paho MQTT uses its own
// version of malloc() and free() (defined in Heap.h and Heap.c).
// We could modify Paho MQTT to use the generic malloc() and free(),
// and then we could use schedule_value() to avoid the copy.
// and then we could use lf_schedule_value() to avoid the copy.
// Note that the last 8 bytes of the message are the sender's timestamp.
// We include that in the copy so that the reaction to the physical action
// can measure the latency.
schedule_copy(incoming_message, delay, (char*)message->payload, message->payloadlen);
lf_schedule_copy(incoming_message, delay, (char*)message->payload, message->payloadlen);

lf_mutex_unlock(&mutex);

Expand Down Expand Up @@ -208,7 +208,7 @@ reactor MQTTSubscriber (
// The incoming_message action contains a token that we can just forward.
// The allocated memory will be freed when the token's reference count hits 0.
// Note that this token will still contain the sender's timestamp.
SET_TOKEN(message, incoming_message->token);
lf_set_token(message, incoming_message->token);

// Get the sender's timestamp.
instant_t* timestamp = (instant_t*)(
Expand All @@ -221,14 +221,14 @@ reactor MQTTSubscriber (
// and its receipt here, offset by the clock synchronization error,
// assuming that the sender sent the message at a physical time matching its
// logical timestamp.
interval_t latency = get_logical_time() - *timestamp;
interval_t latency = lf_time_logical() - *timestamp;
LOG_PRINT("MQTTSubscriber.reaction: Received timestamp is larger than sent timestamp by: %lld.", latency);

// If a physical timestamp was sent, use that to collect
// latency stats instead of the logical time increment.
size_t string_length = strlen(incoming_message->value); // Assumes null-terminated string.
if (incoming_message->token->length == string_length + 1 + 2*sizeof(instant_t)) {
instant_t receive_physical_time = get_physical_time();
instant_t receive_physical_time = lf_time_physical();
instant_t physical_timestamp = extract_int64((unsigned char*)(incoming_message->value + string_length + 1));
latency = receive_physical_time - physical_timestamp;
// printf("DEBUG: MQTTReceiver.reaction: Reacted to message after measured latency of %lld nsec (assuming synchronized clocks).\n", latency);
Expand Down
4 changes: 2 additions & 2 deletions C/src/Parallelism/ForkJoin.lf
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ reactor Source {
output out:int;
state s:int(0);
reaction(t) -> out {=
SET(out, self->s);
lf_set(out, self->s);
self->s++;
=}
}
Expand All @@ -31,7 +31,7 @@ reactor TakeTime {
for (int i = 0; i < 100000000; i++) {
offset++;
}
SET(out, in->value + offset);
lf_set(out, in->value + offset);
=}
}
reactor Destination(width:int(4)) {
Expand Down
Loading