Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchrophasor #112

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
3 changes: 0 additions & 3 deletions examples/C/src/lib/WebSocketServerString.lf
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ reactor WebSocketServerString(hostport: int = 8080, initial_file: string = {= NU

reaction(startup) {=
self->ws.connected = false;
if (self->initial_file != NULL) {
lf_print("**** Point your browser to http://localhost:%d", self->hostport);
}
=}

reaction(in_dynamic, in_static) -> server.send {=
Expand Down
152 changes: 152 additions & 0 deletions examples/C/src/synchrophasors/Synchrophasors.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
<!DOCTYPE html>
<html>
<head>
<title>Synchrophasors</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
</head>
<body>
<h1>Synchrophasors</h1>
<p>
Phase measurement units (PMUs) periodically emit a phasor measurement.
In a normally functioning power grid, the phases in nearby PMUs are correlated and vary smoothly.
</p>
<h2>Ideal Readings</h2>
<p>
In the plot below, there are 50 PMUs, one of which is exhibiting a fault (number 42):
</p>
<div style="position: relative; height:400px; width:600px">
<canvas id="cleanPlot" width="400"></canvas>
</div>
<h2>Noisy Readings</h2>
<p>
In the plot below, phasor data experiences random network delays and the plot is updated as data arrives.
This results in the simultaneous display of inconsistent data (data from different times), which masks the fault at PMU 42.
</p>
<div style="height:400px; width:600px">
<canvas id="noisyPlot" width="400"></canvas>
</div>
<p id="message"></p>

<script>
// Number of points to plot.
const n = 50;
const ctx = document.getElementById('cleanPlot');

cleanChart = new Chart(ctx, {
type: 'bar',
data: {
labels: [...Array(n).keys()],
datasets: [{
label: 'real part',
data: new Array(n).fill(0),
borderWidth: 1
}, {
label: 'imaginary part',
data: new Array(n).fill(0),
borderWidth: 1
}]
},
options: {
animation: false,
scales: {
y: {
beginAtZero: true,
min: -100,
max: 100
}
}
}
});

const ctx2 = document.getElementById('noisyPlot');
noisyChart = new Chart(ctx2, {
type: 'bar',
data: {
labels: [...Array(n).keys()],
datasets: [{
label: 'real part',
data: new Array(n).fill(0),
borderWidth: 1
}, {
label: 'imaginary part',
data: new Array(n).fill(0),
borderWidth: 1
}]
},
options: {
animation: false,
scales: {
y: {
beginAtZero: true,
min: -100,
max: 100
}
}
}
});

window.onload = function() {

// Create the web socket connection for the data source.
const cleanSocket = new WebSocket('ws://localhost:8080', 'ws');

cleanSocket.addEventListener('open', (event) => {
console.log('WebSocket connection established for synchrophasor display');
});

cleanSocket.onerror = function(error) {
console.log('Synchrophasor WebSocket Error: ' + error);
};

cleanSocket.addEventListener('message', (event) => {
var message = event.data;
// console.log(`WebSocket message received: ${message}`);

try {
// console.log('"' + message + '"');
var data = JSON.parse(message);
for (let i = 0; i < data.length; i++) {
if (data[i][0] < cleanChart.data.datasets[0].data.length) { // Safety check.
cleanChart.data.datasets[0].data[data[i][0]] = data[i][1][0] // Real part.
cleanChart.data.datasets[1].data[data[i][0]] = data[i][1][1] // Imaginary part.
}
}
cleanChart.update();
} catch(error) {
console.error(error);
}
});

// Create the web socket connection for the data source.
const noisySocket = new WebSocket('ws://localhost:8081', 'ws');

noisySocket.addEventListener('open', (event) => {
console.log('WebSocket connection established for noisy synchrophasor display');
});

noisySocket.onerror = function(error) {
console.log('Synchrophasor WebSocket Error: ' + error);
};

noisySocket.addEventListener('message', (event) => {
var message = event.data;
// console.log(`WebSocket message received: ${message}`);

try {
// console.log('"' + message + '"');
var data = JSON.parse(message);
for (let i = 0; i < data.length; i++) {
if (data[i][0] < noisyChart.data.datasets[0].data.length) { // Safety check.
noisyChart.data.datasets[0].data[data[i][0]] = data[i][1][0] // Real part.
noisyChart.data.datasets[1].data[data[i][0]] = data[i][1][1] // Imaginary part.
}
}
noisyChart.update();
} catch(error) {
console.error(error);
}
});
}
</script>
</body>
</html>
155 changes: 155 additions & 0 deletions examples/C/src/synchrophasors/Synchrophasors.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/**
* The RandomDelay reactor simulates a physical connection with a random delay. The timestamp at the
* receiving end is larger than the sender's timestamp by a random amount given by an exponential
* random variable with
*/
target C {
keepalive: true,
build-type: debug
}

import WebSocketServerString from "../lib/WebSocketServerString.lf"
import Random from "../lib/Random.lf"

preamble {=
#include <math.h>
typedef struct complex_t {
double real;
double imaginary;
} complex_t;
typedef struct timestamped_complex_t {
complex_t phasor;
instant_t timestamp;
} timestamped_complex_t;
=}

main reactor(n: int = 50) {
s = new[n] PhaseMeasurementUnit(drift=0.1, period = 100 ms, faulty_index=42)
t = new[n] RandomDelay(average = 1 s)
clean = new Observer(
n=n,
hostport=8080,
initial_file = {= LF_SOURCE_DIRECTORY LF_FILE_SEPARATOR "Synchrophasors.html" =})
noisy = new Observer(n=n, hostport=8081)
s.phasor -> clean.phasors
s.phasor -> t.in
t.out -> noisy.phasors
}

reactor RandomDelay(average: time = 1 sec, bank_index: int = 0) extends Random {
input in: timestamped_complex_t
output out: timestamped_complex_t
logical action a: timestamped_complex_t

reaction(startup) {=
// Make sure each instance has a different seed.
self->seed = (unsigned int) self->bank_index;
=}

reaction(a) -> out {=
lf_set(out, a->value);
=}

reaction(in) -> a {=
double lambda = SEC(1) / ((double)self->average);
double exp = exponential(lambda);
interval_t delay = (interval_t)llround(exp * (double)SEC(1));
lf_schedule_copy(a, delay, &in->value, 1);
=}
}

reactor PhaseMeasurementUnit(
period: time = 100 ms,
bank_index: int = 0, // identifier
faulty_index: int = -1,
initial_phase: double = 0.0,
// radians per second
drift: double = 0.0) {
output phasor: timestamped_complex_t
timer t(0, period)
state phase: double = initial_phase

reaction(startup) {=
// Give each instance a unique starting phase.
self->phase = 0.1 * self->bank_index;
if (self->bank_index == self->faulty_index) self->phase += 0.2;
=}

reaction(t) -> phasor {=
complex_t reading;
reading.real = cos(self->phase);
reading.imaginary = sin(self->phase);
timestamped_complex_t tc;
tc.phasor = reading;
tc.timestamp = lf_time_logical();
lf_set(phasor, tc);
self->phase += (self->drift * self->period) / SEC(1);
=}
}

/**
* Upon receiving inputs on the `phasors` port, construct a JSON string to convey the update via a
* web socket to the observing web page, if one is connected. The array has the form `[[channel,
* [real, imag]], ...]`, where `channel` is the index of the multiport input providing new data, and
* `real` and `imag` are the real and imaginary parts of the data. The size of the array is the
* number of present inputs.
*
* To avoid overwhelming the web socket communication, this reactor accumulates all inputs that
* arrive during a period of physical time given by the `period` parameter before sending any data
* out over the web socket. This can result in some data points being updated more than once in a
* single message. It will also send out data whenever the local buffer fills up.
*/
reactor Observer(
n: int = 2, // Number of inputs
period: time = 100 ms,
hostport: int = 8080,
initial_file: string = {= NULL =}) {
input[n] phasors: timestamped_complex_t
state connected: bool = false
state json: char* = {= NULL =}
state p: char* = {= NULL =}
state last_sent: time = 0

w = new WebSocketServerString(hostport=hostport, initial_file=initial_file)

reaction(phasors) -> w.in_dynamic {=
if (self->connected) {
for (int n = 0; n < self->n; n++) {
if (phasors[n]->is_present) {
if (self->json == NULL) {
// Construct array big enough to hold the maximum possible size string, which
// occurs when all inputs are present. The fact that this buffer always has the
// same size should help prevent memory fragmentation because malloc will repeatedly
// allocate the same memory once it has been processed and freed by the
// WebSocketServerString reactor.
self->json = (char*)malloc(sizeof(char) * (3 + (20 * self->n)));
self->p = self->json; // pointer to next position to write.
*self->p++ = '[';
} else {
*self->p++ = ',';
}
long int real = lround(phasors[n]->value.phasor.real * 100);
long int imag = lround(phasors[n]->value.phasor.imaginary * 100);
int len = snprintf(self->p, 18, "[%3d,[%4ld,%4ld]]", n, real, imag);
if (len > 0) self->p += len; // Excluding trailing null terminator.

interval_t now = lf_time_physical_elapsed();
if (self->p - self->json > 2 + 20 * (self->n - 1) // Not enough room for another entry
|| now - self->last_sent >= self->period // Enough physical time has elapsed.
) {
self->last_sent = now;
*self->p++ = ']';
*self->p++ = '\0';
lf_set(w.in_dynamic, self->json);
// lf_print("%s", self->json);
self->json = NULL; // Mark to reallocate on next arrival.
}
}
}
}
=}

reaction(w.connected) {=
self->connected = w.connected->value;
=}
}
Loading