Skip to content

Commit

Permalink
RBS work and other things
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Sep 4, 2024
1 parent 515546f commit 74cfa27
Show file tree
Hide file tree
Showing 55 changed files with 1,414 additions and 87 deletions.
34 changes: 31 additions & 3 deletions temporalio/Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ namespace :proto do
desc = Google::Protobuf::DescriptorPool.generated_pool.lookup(qualified_service_name)
raise 'Failed finding service descriptor' unless desc

# Open file to generate
# Open file to generate Ruby code
File.open("lib/temporalio/client/connection/#{file_name}.rb", 'w') do |file|
file.puts <<~TEXT
# frozen_string_literal: true
Expand Down Expand Up @@ -189,6 +189,35 @@ namespace :proto do
end
TEXT
end

# Open file to generate RBS code
# TODO(cretz): Improve this when RBS proto is supported
File.open("sig/temporalio/client/connection/#{file_name}.rbs", 'w') do |file|
file.puts <<~TEXT
# Generated code. DO NOT EDIT!
module Temporalio
class Client
class Connection
class #{class_name} < Service
def initialize: (Connection) -> void
TEXT

desc.each do |method|
# Camel case to snake case
rpc = method.name.gsub(/([A-Z])/, '_\1').downcase.delete_prefix('_')
file.puts <<-TEXT
def #{rpc}: (untyped request, ?rpc_retry: bool, ?rpc_metadata: Hash[String, String]?, ?rpc_timeout: Float?) -> untyped
TEXT
end

file.puts <<~TEXT
end
end
end
end
TEXT
end
end

require './lib/temporalio/api/workflowservice/v1/service'
Expand Down Expand Up @@ -293,5 +322,4 @@ Rake::Task[:build].enhance([:copy_parent_files]) do
rm ['LICENSE', 'README.md']
end

# TODO(cretz): Add rbs:install_collection and :steep back when RBS is ready
task default: %i[rubocop compile test]
task default: ['rubocop', 'compile', 'rbs:install_collection', 'steep', 'test']
2 changes: 1 addition & 1 deletion temporalio/Steepfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ target :lib do
library 'uri'

configure_code_diagnostics do |hash|
# TODO(cretz): Fix as more protos are generated
hash[D::Ruby::UnknownConstant] = :information
hash[D::Ruby::NoMethod] = :information
end
end
4 changes: 2 additions & 2 deletions temporalio/ext/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> {
class.define_singleton_method("async_new", function!(Client::async_new, 2))?;
class.define_method("async_invoke_rpc", method!(Client::async_invoke_rpc, -1))?;

let inner_class = class.define_error("RpcFailure", ruby.get_inner(&ROOT_ERR))?;
let inner_class = class.define_error("RPCFailure", ruby.get_inner(&ROOT_ERR))?;
inner_class.define_method("code", method!(RpcFailure::code, 0))?;
inner_class.define_method("message", method!(RpcFailure::message, 0))?;
inner_class.define_method("details", method!(RpcFailure::details, 0))?;
Expand Down Expand Up @@ -220,7 +220,7 @@ impl Client {

#[derive(DataTypeFunctions, TypedData)]
#[magnus(
class = "Temporalio::Internal::Bridge::Client::RpcFailure",
class = "Temporalio::Internal::Bridge::Client::RPCFailure",
free_immediately
)]
pub struct RpcFailure {
Expand Down
22 changes: 10 additions & 12 deletions temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ module Temporalio
# Client for accessing Temporal.
#
# Most users will use {connect} to connect a client. The {workflow_service} method provides access to a raw gRPC
# client. To create another client on the same connection, like for a different namespace, {dup_options} may be
# used to get the options as a struct which can then be altered and splatted as kwargs to the constructor (e.g.
# client. To create another client on the same connection, like for a different namespace, {options} may be used to
# get the options as a struct which can then be dup'd, altered, and splatted as kwargs to the constructor (e.g.
# +Client.new(**my_options.to_h)+).
#
# Clients are thread-safe and are meant to be reused for the life of the application. They are built to work in both
# synchronous and asynchronous contexts. Internally they use callbacks based on {::Queue} which means they are
# Fiber-compatible.
class Client
# Options as returned from {dup_options} for +**to_h+ splat use in {initialize}. See {initialize} for details.
# Options as returned from {options} for +**to_h+ splat use in {initialize}. See {initialize} for details.
Options = Struct.new(
:connection,
:namespace,
Expand Down Expand Up @@ -58,7 +58,7 @@ class Client
# @param identity [String] Identity for this client.
# @param keep_alive [Connection::KeepAliveOptions] Keep-alive options for the client connection. Can be set to +nil+
# to disable.
# @param http_connect_proxy [Connection::HTTPConnectProxyOptions] Options for HTTP CONNECT proxy.
# @param http_connect_proxy [Connection::HTTPConnectProxyOptions, nil] Options for HTTP CONNECT proxy.
# @param runtime [Runtime] Runtime for this client.
# @param lazy_connect [Boolean] If true, the client will not connect until the first call is attempted or a worker
# is created with it. Lazy clients cannot be used for workers if they have not performed a connection.
Expand Down Expand Up @@ -103,8 +103,12 @@ def self.connect(
)
end

# @return [Options] Frozen options for this client which has the same attributes as {initialize}.
attr_reader :options

# Create a client from an existing connection. Most users will prefer {connect} instead. Parameters here match
# {Options} returned from {dup_options} by intention so options can be altered and splatted to create a new client.
# {Options} returned from {options} by intention so options can be dup'd, altered, and splatted to create a new
# client.
#
# @param connection [Connection] Existing connection to create a client from.
# @param namespace [String] Namespace to use for client calls.
Expand Down Expand Up @@ -132,7 +136,7 @@ def initialize(
data_converter:,
interceptors:,
default_workflow_query_reject_condition:
)
).freeze
# Initialize interceptors
@impl = interceptors.reverse_each.reduce(Implementation.new(self)) do |acc, int|
int.intercept_client(acc)
Expand Down Expand Up @@ -164,12 +168,6 @@ def operator_service
connection.operator_service
end

# @return [Options] Shallow duplication of options for potential use in {initialize}. Note, this is shallow, so
# attributes like {Options.interceptors} are not duplicated, but no mutations will apply.
def dup_options
@options.dup
end

# Start a workflow and return its handle.
#
# @param workflow [Workflow, String] Name of the workflow
Expand Down
33 changes: 17 additions & 16 deletions temporalio/lib/temporalio/client/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Client
# Connection to Temporal server that is not namespace specific. Most users will use {Client.connect} instead of this
# directly.
class Connection
# Options as returned from {dup_options} for +**to_h+ splat use in {initialize}. See {initialize} for details.
# Options as returned from {options} for +**to_h+ splat use in {initialize}. See {initialize} for details.
Options = Struct.new(
:target_host,
:api_key,
Expand Down Expand Up @@ -72,7 +72,8 @@ class Connection
:max_retries,
keyword_init: true
) do
def initialize(*, **kwargs)
def initialize(**kwargs)
# @type var kwargs: untyped
kwargs[:initial_interval] = 0.1 unless kwargs.key?(:initial_interval)
kwargs[:randomization_factor] = 0.2 unless kwargs.key?(:randomization_factor)
kwargs[:multiplier] = 1.5 unless kwargs.key?(:multiplier)
Expand All @@ -95,7 +96,8 @@ def initialize(*, **kwargs)
:timeout,
keyword_init: true
) do
def initialize(*, **kwargs)
def initialize(**kwargs)
# @type var kwargs: untyped
kwargs[:interval] = 30.0 unless kwargs.key?(:interval)
kwargs[:timeout] = 15.0 unless kwargs.key?(:timeout)
super
Expand All @@ -117,8 +119,8 @@ def initialize(*, **kwargs)
keyword_init: true
)

# @return [String] Client identity.
attr_reader :identity
# @return [Options] Frozen options for this client which has the same attributes as {initialize}.
attr_reader :options

# @return [WorkflowService] Raw gRPC workflow service.
attr_reader :workflow_service
Expand All @@ -130,7 +132,7 @@ def initialize(*, **kwargs)
attr_reader :cloud_service

# Connect to Temporal server. Most users will use {Client.connect} instead of this directly. Parameters here match
# {Options} returned from {dup_options} by intention so options can be altered and splatted to create a new
# {Options} returned from {options} by intention so options can be dup'd, altered, splatted to create a new
# connection.
#
# @param target_host [String] +host:port+ for the Temporal server. For local development, this is often
Expand All @@ -146,7 +148,7 @@ def initialize(*, **kwargs)
# @param identity [String] Identity for this client.
# @param keep_alive [KeepAliveOptions] Keep-alive options for the client connection. Can be set to +nil+ to
# disable.
# @param http_connect_proxy [HTTPConnectProxyOptions] Options for HTTP CONNECT proxy.
# @param http_connect_proxy [HTTPConnectProxyOptions, nil] Options for HTTP CONNECT proxy.
# @param runtime [Runtime] Runtime for this client.
# @param lazy_connect [Boolean] If true, there is no connection until the first call is attempted or a worker
# is created with it. Clients from lazy connections cannot be used for workers if they have not performed a
Expand Down Expand Up @@ -176,7 +178,7 @@ def initialize(
http_connect_proxy:,
runtime:,
lazy_connect:
)
).freeze
# Create core client now if not lazy
_core_client unless lazy_connect
# Create service instances
Expand All @@ -190,10 +192,9 @@ def target_host
@options.target_host
end

# @return [Options] Shallow duplication of options for potential use in {initialize}. Note, this is shallow, so
# attributes like {Options.rpc_metadata} are not duplicated, but no mutations will apply.
def dup_options
@options.dup
# @return [String] Client identity.
def identity
@options.identity
end

# @return [Boolean] Whether this connection is connected. This is always `true` unless `lazy_connect` option was
Expand Down Expand Up @@ -237,10 +238,10 @@ def new_core_client
if @options.tls
options.tls = if @options.tls.is_a?(TLSOptions)
Internal::Bridge::Client::TLSOptions.new(
client_cert: @options.tls.client_cert,
client_private_key: @options.tls.client_private_key,
server_root_ca_cert: @options.tls.server_root_ca_cert,
domain: @options.tls.domain
client_cert: @options.tls.client_cert, # steep:ignore
client_private_key: @options.tls.client_private_key, # steep:ignore
server_root_ca_cert: @options.tls.server_root_ca_cert, # steep:ignore
domain: @options.tls.domain # steep:ignore
)
else
Internal::Bridge::Client::TLSOptions.new
Expand Down
2 changes: 1 addition & 1 deletion temporalio/lib/temporalio/client/connection/service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def invoke_rpc(rpc:, request_class:, response_class:, request:, rpc_retry:, rpc_
rpc_metadata:,
rpc_timeout:
)
rescue Internal::Bridge::Client::RpcFailure => e
rescue Internal::Bridge::Client::RPCFailure => e
raise Error::RPCError.new(e.message, code: e.code, raw_grpc_status: e.details)
end
end
Expand Down
12 changes: 6 additions & 6 deletions temporalio/lib/temporalio/client/workflow_handle.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ def result(
when :EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED
attrs = event.workflow_execution_completed_event_attributes
hist_run_id = attrs.new_execution_run_id
next if follow_runs && !hist_run_id.empty?
next if follow_runs && hist_run_id && !hist_run_id.empty?

return @client.data_converter.from_payloads(attrs.result).first
when :EVENT_TYPE_WORKFLOW_EXECUTION_FAILED
attrs = event.workflow_execution_failed_event_attributes
hist_run_id = attrs.new_execution_run_id
next if follow_runs && !hist_run_id.empty?
next if follow_runs && hist_run_id && !hist_run_id.empty?

raise Error::WorkflowFailureError.new(cause: @client.data_converter.from_failure(attrs.failure))
when :EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED
Expand All @@ -120,7 +120,7 @@ def result(
when :EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT
attrs = event.workflow_execution_timed_out_event_attributes
hist_run_id = attrs.new_execution_run_id
next if follow_runs && !hist_run_id.empty?
next if follow_runs && hist_run_id && !hist_run_id.empty?

raise Error::WorkflowFailureError.new(
cause: Error::TimeoutError.new(
Expand All @@ -131,7 +131,7 @@ def result(
when :EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW
attrs = event.workflow_execution_continued_as_new_event_attributes
hist_run_id = attrs.new_execution_run_id
next if follow_runs && !hist_run_id.empty?
next if follow_runs && hist_run_id && !hist_run_id.empty?

# TODO: Use more specific error and decode failure
raise Error::WorkflowContinuedAsNewError.new(new_run_id: attrs.new_execution_run_id)
Expand Down Expand Up @@ -182,8 +182,8 @@ def fetch_history_events_for_run(
wait_new_event:,
event_filter_type:,
skip_archival:,
rpc_metadata: nil,
rpc_timeout: nil
rpc_metadata:,
rpc_timeout:
)
Enumerator.new do |yielder|
input = Interceptor::FetchWorkflowHistoryEventPageInput.new(
Expand Down
18 changes: 10 additions & 8 deletions temporalio/lib/temporalio/converters/failure_converter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@ def initialize(encode_common_attributes: false)

# Convert a Ruby error to a Temporal failure.
#
# @param _error [Exception] Ruby error.
# @param _payload_converter [PayloadConverter] Payload converter.
# @param error [Exception] Ruby error.
# @param payload_converter [PayloadConverter] Payload converter.
# @return [Api::Failure::V1::Failure] Converted failure.
def to_failure(_error, _payload_converter)
raise 'TODO'
def to_failure(error, payload_converter)
# TODO
raise NotImplementedError
end

# Convert a Temporal failure to a Ruby error.
#
# @param _failure [Api::Failure::V1::Failure] Failure.
# @param _payload_converter [PayloadConverter] Payload converter.
# @param failure [Api::Failure::V1::Failure] Failure.
# @param payload_converter [PayloadConverter] Payload converter.
# @return [Exception] Converted Ruby error.
def from_failure(_failure, _payload_converter)
raise 'TODO'
def from_failure(failure, payload_converter)
# TODO
raise NotImplementedError
end
end
end
Expand Down
9 changes: 9 additions & 0 deletions temporalio/lib/temporalio/converters/payload_codec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

module Temporalio
module Converters
class PayloadCodec # rubocop:disable Lint/EmptyClass
# TODO
end
end
end
3 changes: 2 additions & 1 deletion temporalio/lib/temporalio/converters/payload_converter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module Temporalio
module Converters
# Base class for converting Ruby values to/from Temporal payloads.
class PayloadConverter
# @return [FailureConverter] Default payload converter.
# @return [PayloadConverter] Default payload converter.
def self.default
@default ||= new_with_defaults
end
Expand All @@ -20,6 +20,7 @@ def self.default
#
# @param json_parse_options [Hash] Options for {::JSON.parse}.
# @param json_generate_options [Hash] Options for {::JSON.generate}.
# @return [PayloadConverter] Created payload converter.
def self.new_with_defaults(json_parse_options: { create_additions: true }, json_generate_options: {})
Ractor.make_shareable(
PayloadConverter::Composite.new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def encoding
def to_payload(value)
return nil unless value.is_a?(Google::Protobuf::MessageExts)

# @type var value: Google::Protobuf::MessageExts
Api::Common::V1::Payload.new(
metadata: { 'encoding' => ENCODING, 'messageType' => value.class.descriptor.name },
data: value.to_proto
Expand All @@ -29,6 +30,7 @@ def to_payload(value)
# (see Encoding.from_payload)
def from_payload(payload)
type = payload.metadata['messageType']
# @type var desc: untyped
desc = Google::Protobuf::DescriptorPool.generated_pool.lookup(type)
raise "No protobuf message found in global pool for message type #{type}" unless desc

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Composite < PayloadConverter
class ConverterNotFound < Error; end
class EncodingNotSet < Error; end

# @return [Array<Encoding>] Encoding converters processed in order.
# @return [Hash<String, Encoding>] Encoding converters processed in order.
attr_reader :converters

# Create a payload converter with the given encoding converters processed in order.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def to_payload(value)
# (see Encoding.from_payload)
def from_payload(payload)
type = payload.metadata['messageType']
# @type var desc: untyped
desc = Google::Protobuf::DescriptorPool.generated_pool.lookup(type)
raise "No protobuf message found in global pool for message type #{type}" unless desc

Expand Down
2 changes: 1 addition & 1 deletion temporalio/lib/temporalio/internal/bridge/runtime.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Runtime
)

# @!visibility private
PrometheusOptions = Struct.new(
PrometheusMetricsOptions = Struct.new(
:bind_address,
:counters_total_suffix,
:unit_suffix,
Expand Down
Loading

0 comments on commit 74cfa27

Please sign in to comment.