Skip to content

Commit

Permalink
Support for page refreshes and broadcasting (#499)
Browse files Browse the repository at this point in the history
* Support for page refreshes and broadcasting

This PR is the Rails companion for the Turbo changes to add page
refreshes.

```ruby
turbo_refreshes_with scroll method: :morph, scroll: :preserve
```

This adds new Active Record helpers to broadcast page refreshes from
models:

```ruby
class Board
  broadcast_refreshes
end
```

This works great in hierarchical structures, where child record touch
parent records automatically to invalidate cache:

```ruby
class Column
  belongs_to :board, touch: true # +Board+ will trigger a page refresh
  on column changes
end
```

You can also specify the streamable declaratively:

```ruby
class Column
  belongs_to :board
  broadcast_refreshes_to :board
end
```

There are also instance-level companion methods to broadcast page
refreshes:

- `broadcast_refresh_later`
- `broadcast_refresh_later_to(*streamables)`

This PR introduces a new mechanism to suppress broadcasting of turbo
treams for arbitrary blocks of code:

```ruby
Recording.suppressing_turbo_broadcasts do
...
end
```

When broadcasting page refreshes, the system will automatically debounce
multiple calls in a row to only broadcast the last one. This is meant
for scenarios where you process records in mass. Because of the nature
of such signals, it makes no sense to broadcast them repeatedly and
individually.

* Upgrade terser and its rollup plugin to support private method's syntax

* Build latest turbo version from the page-refreshes branch in hotwired/turbo

* Update with last turbo build

* Debounce jobs to broadcast page refresh signals

A page refresh stream signals the need to reload the page. It's a global action
that makes sense to aggregate when multiple signals are generated in a short period
of time for a given streamable.

This implementation is based on creating a thread-level debouncer associated to
the set of streamables. The debouncer is implemented using concurrent-ruby's
scheduled tasks.

* Build latest turbo version from the page-refreshes branch in hotwired/turbo

* Update with last changes

* Commit js changes

* Build with latest turbo master

* Make wait noop if scheduled_task is nil

This can happen when the Debouncer has finished its work and we clear it
from the current thread. The next call to refresh_debouncer_for creates
a new Debouncer, but it doesn't have a scheduled_task yet.

We only use the wait method in tests, to ensure that the debounces has
finished its work. But if the scheduled_task is nil, we know that the
debouncer has already finished its work.

* Build with latest turbo master

* Build with latest turbo/main

* Don't include JS changes in this branch

* Use released Turbo version

---------

Co-authored-by: Jorge Manrubia <jorge.manrubia@gmail.com>
Co-authored-by: Jorge Manrubia <jorge@hey.com>
  • Loading branch information
3 people authored Nov 23, 2023
1 parent e44b6a9 commit 7800f38
Show file tree
Hide file tree
Showing 25 changed files with 744 additions and 129 deletions.
59 changes: 43 additions & 16 deletions app/assets/javascripts/turbo.js
Original file line number Diff line number Diff line change
Expand Up @@ -4060,14 +4060,14 @@ var turbo_es2017Esm = Object.freeze({
let consumer;

async function getConsumer() {
return consumer || setConsumer(createConsumer().then(setConsumer));
return consumer || setConsumer(createConsumer$1().then(setConsumer));
}

function setConsumer(newConsumer) {
return consumer = newConsumer;
}

async function createConsumer() {
async function createConsumer$1() {
const {createConsumer: createConsumer} = await Promise.resolve().then((function() {
return index;
}));
Expand All @@ -4083,7 +4083,7 @@ var cable = Object.freeze({
__proto__: null,
getConsumer: getConsumer,
setConsumer: setConsumer,
createConsumer: createConsumer,
createConsumer: createConsumer$1,
subscribeTo: subscribeTo
});

Expand Down Expand Up @@ -4309,6 +4309,8 @@ ConnectionMonitor.staleThreshold = 6;

ConnectionMonitor.reconnectionBackoffRate = .15;

var ConnectionMonitor$1 = ConnectionMonitor;

var INTERNAL = {
message_types: {
welcome: "welcome",
Expand All @@ -4320,7 +4322,8 @@ var INTERNAL = {
disconnect_reasons: {
unauthorized: "unauthorized",
invalid_request: "invalid_request",
server_restart: "server_restart"
server_restart: "server_restart",
remote: "remote"
},
default_mount_path: "/cable",
protocols: [ "actioncable-v1-json", "actioncable-unsupported" ]
Expand All @@ -4337,7 +4340,7 @@ class Connection {
this.open = this.open.bind(this);
this.consumer = consumer;
this.subscriptions = this.consumer.subscriptions;
this.monitor = new ConnectionMonitor(this);
this.monitor = new ConnectionMonitor$1(this);
this.disconnected = true;
}
send(data) {
Expand All @@ -4353,11 +4356,12 @@ class Connection {
logger.log(`Attempted to open WebSocket, but existing socket is ${this.getState()}`);
return false;
} else {
logger.log(`Opening WebSocket, current state is ${this.getState()}, subprotocols: ${protocols}`);
const socketProtocols = [ ...protocols, ...this.consumer.subprotocols || [] ];
logger.log(`Opening WebSocket, current state is ${this.getState()}, subprotocols: ${socketProtocols}`);
if (this.webSocket) {
this.uninstallEventHandlers();
}
this.webSocket = new adapters.WebSocket(this.consumer.url, protocols);
this.webSocket = new adapters.WebSocket(this.consumer.url, socketProtocols);
this.installEventHandlers();
this.monitor.start();
return true;
Expand All @@ -4369,7 +4373,7 @@ class Connection {
if (!allowReconnect) {
this.monitor.stop();
}
if (this.isActive()) {
if (this.isOpen()) {
return this.webSocket.close();
}
}
Expand Down Expand Up @@ -4399,6 +4403,9 @@ class Connection {
isActive() {
return this.isState("open", "connecting");
}
triedToReconnect() {
return this.monitor.reconnectAttempts > 0;
}
isProtocolSupported() {
return indexOf.call(supportedProtocols, this.getProtocol()) >= 0;
}
Expand Down Expand Up @@ -4438,6 +4445,9 @@ Connection.prototype.events = {
const {identifier: identifier, message: message, reason: reason, reconnect: reconnect, type: type} = JSON.parse(event.data);
switch (type) {
case message_types.welcome:
if (this.triedToReconnect()) {
this.reconnectAttempted = true;
}
this.monitor.recordConnect();
return this.subscriptions.reload();

Expand All @@ -4452,7 +4462,16 @@ Connection.prototype.events = {

case message_types.confirmation:
this.subscriptions.confirmSubscription(identifier);
return this.subscriptions.notify(identifier, "connected");
if (this.reconnectAttempted) {
this.reconnectAttempted = false;
return this.subscriptions.notify(identifier, "connected", {
reconnected: true
});
} else {
return this.subscriptions.notify(identifier, "connected", {
reconnected: false
});
}

case message_types.rejection:
return this.subscriptions.reject(identifier);
Expand Down Expand Up @@ -4487,6 +4506,8 @@ Connection.prototype.events = {
}
};

var Connection$1 = Connection;

const extend = function(object, properties) {
if (properties != null) {
for (let key in properties) {
Expand Down Expand Up @@ -4556,10 +4577,12 @@ class SubscriptionGuarantor {
}
}

var SubscriptionGuarantor$1 = SubscriptionGuarantor;

class Subscriptions {
constructor(consumer) {
this.consumer = consumer;
this.guarantor = new SubscriptionGuarantor(this);
this.guarantor = new SubscriptionGuarantor$1(this);
this.subscriptions = [];
}
create(channelName, mixin) {
Expand Down Expand Up @@ -4636,7 +4659,8 @@ class Consumer {
constructor(url) {
this._url = url;
this.subscriptions = new Subscriptions(this);
this.connection = new Connection(this);
this.connection = new Connection$1(this);
this.subprotocols = [];
}
get url() {
return createWebSocketURL(this._url);
Expand All @@ -4657,6 +4681,9 @@ class Consumer {
return this.connection.open();
}
}
addSubProtocol(subprotocol) {
this.subprotocols = [ ...this.subprotocols, subprotocol ];
}
}

function createWebSocketURL(url) {
Expand All @@ -4674,7 +4701,7 @@ function createWebSocketURL(url) {
}
}

function createConsumer$1(url = getConfig("url") || INTERNAL.default_mount_path) {
function createConsumer(url = getConfig("url") || INTERNAL.default_mount_path) {
return new Consumer(url);
}

Expand All @@ -4687,17 +4714,17 @@ function getConfig(name) {

var index = Object.freeze({
__proto__: null,
Connection: Connection,
ConnectionMonitor: ConnectionMonitor,
Connection: Connection$1,
ConnectionMonitor: ConnectionMonitor$1,
Consumer: Consumer,
INTERNAL: INTERNAL,
Subscription: Subscription,
Subscriptions: Subscriptions,
SubscriptionGuarantor: SubscriptionGuarantor,
SubscriptionGuarantor: SubscriptionGuarantor$1,
adapters: adapters,
createWebSocketURL: createWebSocketURL,
logger: logger,
createConsumer: createConsumer$1,
createConsumer: createConsumer,
getConfig: getConfig
});

Expand Down
6 changes: 3 additions & 3 deletions app/assets/javascripts/turbo.min.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion app/assets/javascripts/turbo.min.js.map

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions app/channels/turbo/streams/broadcasts.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ def broadcast_prepend_to(*streamables, **opts)
broadcast_action_to(*streamables, action: :prepend, **opts)
end

def broadcast_refresh_to(*streamables, **opts)
broadcast_stream_to(*streamables, content: turbo_stream_refresh_tag)
end

def broadcast_action_to(*streamables, action:, target: nil, targets: nil, attributes: {}, **rendering)
broadcast_stream_to(*streamables, content: turbo_stream_action_tag(action, target: target, targets: targets, template:
rendering.delete(:content) || rendering.delete(:html) || (rendering[:render] != false && rendering.any? ? render_format(:html, **rendering) : nil),
Expand Down Expand Up @@ -64,6 +68,12 @@ def broadcast_prepend_later_to(*streamables, **opts)
broadcast_action_later_to(*streamables, action: :prepend, **opts)
end

def broadcast_refresh_later_to(*streamables, request_id: Turbo.current_request_id, **opts)
refresh_debouncer_for(*streamables, request_id: request_id).debounce do
Turbo::Streams::BroadcastStreamJob.perform_later stream_name_from(streamables), content: turbo_stream_refresh_tag(request_id: request_id, **opts)
end
end

def broadcast_action_later_to(*streamables, action:, target: nil, targets: nil, attributes: {}, **rendering)
Turbo::Streams::ActionBroadcastJob.perform_later \
stream_name_from(streamables), action: action, target: target, targets: targets, attributes: attributes, **rendering
Expand All @@ -81,6 +91,9 @@ def broadcast_stream_to(*streamables, content:)
ActionCable.server.broadcast stream_name_from(streamables), content
end

def refresh_debouncer_for(*streamables, request_id: nil) # :nodoc:
Turbo::ThreadDebouncer.for("turbo-refresh-debouncer-#{stream_name_from(streamables.including(request_id))}")
end

private
def render_format(format, **rendering)
Expand Down
12 changes: 12 additions & 0 deletions app/controllers/concerns/turbo/request_id_tracking.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module Turbo::RequestIdTracking
extend ActiveSupport::Concern

included do
around_action :turbo_tracking_request_id
end

private
def turbo_tracking_request_id(&block)
Turbo.with_request_id(request.headers["X-Turbo-Request-Id"], &block)
end
end
8 changes: 8 additions & 0 deletions app/helpers/turbo/drive_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@ def turbo_exempts_page_from_preview
def turbo_page_requires_reload
provide :head, tag.meta(name: "turbo-visit-control", content: "reload")
end

def turbo_refreshes_with(method: :replace, scroll: :reset)
raise ArgumentError, "Invalid refresh option '#{method}'" unless method.in?(%i[ replace morph ])
raise ArgumentError, "Invalid scroll option '#{scroll}'" unless scroll.in?(%i[ reset preserve ])

provide :head, tag.meta(name: "turbo-refresh-method", content: method)
provide :head, tag.meta(name: "turbo-refresh-scroll", content: scroll)
end
end
6 changes: 5 additions & 1 deletion app/helpers/turbo/streams/action_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module Turbo::Streams::ActionHelper
# # => <turbo-stream action="remove" target="special_message_1"></turbo-stream>
#
def turbo_stream_action_tag(action, target: nil, targets: nil, template: nil, **attributes)
template = action.to_sym == :remove ? "" : tag.template(template.to_s.html_safe)
template = action.to_sym.in?(%i[ remove refresh ]) ? "" : tag.template(template.to_s.html_safe)

if target = convert_to_turbo_stream_dom_id(target)
tag.turbo_stream(template, **attributes, action: action, target: target)
Expand All @@ -35,6 +35,10 @@ def turbo_stream_action_tag(action, target: nil, targets: nil, template: nil, **
end
end

def turbo_stream_refresh_tag(request_id: Turbo.current_request_id, **attributes)
turbo_stream_action_tag(:refresh, **{ "request-id": request_id }.compact, **attributes)
end

private
def convert_to_turbo_stream_dom_id(target, include_selector: false)
if Array(target).any? { |value| value.respond_to?(:to_key) }
Expand Down
7 changes: 7 additions & 0 deletions app/jobs/turbo/streams/broadcast_stream_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
class Turbo::Streams::BroadcastStreamJob < ActiveJob::Base
discard_on ActiveJob::DeserializationError

def perform(stream, content:)
Turbo::StreamsChannel.broadcast_stream_to(stream, content: content)
end
end
Loading

0 comments on commit 7800f38

Please sign in to comment.