Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More client work #159

Merged
merged 3 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ To build shared library for development use:
Note, this is not `compile:dev` because debug-mode in Rust has
[an issue](https://github.com/rust-lang/rust/issues/34283) that causes runtime stack size problems.

To build and test release:
To lint, build, and test release:

bundle exec rake

Expand Down
6 changes: 5 additions & 1 deletion temporalio/.rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Layout/ClassStructure:

# The default is too small and triggers simply setting lots of values on a proto
Metrics/AbcSize:
Max: 75
Max: 200

# The default is too small
Metrics/BlockLength:
Expand All @@ -44,6 +44,10 @@ Metrics/CyclomaticComplexity:
Metrics/MethodLength:
Max: 100

# The default is too small
Metrics/ModuleLength:
Max: 1000

# The default is too small
Metrics/PerceivedComplexity:
Max: 25
Expand Down
15 changes: 13 additions & 2 deletions temporalio/Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,18 @@ Steep::RakeTask.new

require 'yard'

YARD::Rake::YardocTask.new
module CustomizeYardWarnings # rubocop:disable Style/Documentation
def process
super
rescue YARD::Parser::UndocumentableError
# We ignore if it's an API warning
raise unless statement.last.file.start_with?('lib/temporalio/api/')
end
end

YARD::Handlers::Ruby::ConstantHandler.prepend(CustomizeYardWarnings)

YARD::Rake::YardocTask.new { |t| t.options = ['--fail-on-warning'] }

require 'fileutils'
require 'google/protobuf'
Expand Down Expand Up @@ -321,4 +332,4 @@ Rake::Task[:build].enhance([:copy_parent_files]) do
rm ['LICENSE', 'README.md']
end

task default: ['rubocop', 'compile', 'rbs:install_collection', 'steep', 'test']
task default: ['rubocop', 'yard', 'compile', 'rbs:install_collection', 'steep', 'test']
177 changes: 75 additions & 102 deletions temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@

require 'google/protobuf/well_known_types'
require 'temporalio/api'
require 'temporalio/client/async_activity_handle'
require 'temporalio/client/connection'
require 'temporalio/client/implementation'
require 'temporalio/client/interceptor'
require 'temporalio/client/workflow_execution'
require 'temporalio/client/workflow_execution_count'
require 'temporalio/client/workflow_handle'
require 'temporalio/client/workflow_query_reject_condition'
require 'temporalio/common_enums'
require 'temporalio/converters'
require 'temporalio/error'
require 'temporalio/internal/proto_utils'
require 'temporalio/retry_policy'
require 'temporalio/runtime'
require 'temporalio/search_attributes'

Expand Down Expand Up @@ -45,10 +50,10 @@ class Client
# TLS options are present, those TLS options will be used.
# @param data_converter [Converters::DataConverter] Data converter to use for all data conversions to/from payloads.
# @param interceptors [Array<Interceptor>] Set of interceptors that are chained together to allow intercepting of
# client calls. The earlier interceptors wrap the later ones. Any interceptors that also implement
# {Worker::Interceptor} will be used as worker interceptors too so they should not be given separately when
# creating a worker.
# @param default_workflow_query_reject_condition [Api::Enums::V1::QueryRejectCondition, nil] Default rejection
# client calls. The earlier interceptors wrap the later ones. Any interceptors that also implement worker
# interceptor will be used as worker interceptors too so they should not be given separately when creating a
# worker.
# @param default_workflow_query_reject_condition [WorkflowQueryRejectCondition, nil] Default rejection
# condition for workflow queries if not set during query. See {WorkflowHandle.query} for details on the
# rejection condition.
# @param rpc_metadata [Hash<String, String>] Headers to use for all calls to the server. Keys here can be overriden
Expand Down Expand Up @@ -116,11 +121,10 @@ def self.connect(
# @param interceptors [Array<Interceptor>] Set of interceptors that are chained together to allow intercepting of
# client calls. The earlier interceptors wrap the later ones.
#
# Any interceptors that also implement {Worker::Interceptor} will be used as worker interceptors too so they
# should not be given separately when creating a worker.
# @param default_workflow_query_reject_condition [Api::Enums::V1::QueryRejectCondition, nil] Default rejection
# condition for workflow queries if not set during query. See {WorkflowHandle.query} for details on the
# rejection condition.
# Any interceptors that also implement worker interceptor will be used as worker interceptors too so they should
# not be given separately when creating a worker.
# @param default_workflow_query_reject_condition [WorkflowQueryRejectCondition, nil] Default rejection condition for
# workflow queries if not set during query. See {WorkflowHandle.query} for details on the rejection condition.
#
# @see connect
def initialize(
Expand Down Expand Up @@ -218,7 +222,7 @@ def start_workflow(
@impl.start_workflow(Interceptor::StartWorkflowInput.new(
workflow:,
args:,
id:,
workflow_id: id,
task_queue:,
execution_timeout:,
run_timeout:,
Expand Down Expand Up @@ -264,7 +268,7 @@ def start_workflow(
#
# @return [Object] Successful result of the workflow.
# @raise [Error::WorkflowAlreadyStartedError] Workflow already exists.
# @raise [Error::WorkflowFailureError] Workflow failed with {Error::WorkflowFailureError.cause} as cause.
# @raise [Error::WorkflowFailureError] Workflow failed with +cause+ as the cause.
# @raise [Error::RPCError] RPC error from call.
def execute_workflow(
workflow,
Expand Down Expand Up @@ -320,103 +324,72 @@ def workflow_handle(
run_id: nil,
first_execution_run_id: nil
)
WorkflowHandle.new(self, workflow_id, run_id:, result_run_id: run_id, first_execution_run_id:)
WorkflowHandle.new(client: self, id: workflow_id, run_id:, result_run_id: run_id, first_execution_run_id:)
end

# @!visibility private
def _impl
@impl
# List workflows.
#
# @param query [String, nil] A Temporal visibility list filter.
# @param rpc_metadata [Hash<String, String>, nil] Headers to include on the RPC call.
# @param rpc_timeout [Float, nil] Number of seconds before timeout.
#
# @return [Enumerator<WorkflowExecution>] Enumerable workflow executions.
#
# @raise [Error::RPCError] RPC error from call.
#
# @see https://docs.temporal.io/visibility
def list_workflows(
query = nil,
rpc_metadata: nil,
rpc_timeout: nil
)
@impl.list_workflows(Interceptor::ListWorkflowsInput.new(
query:,
rpc_metadata:,
rpc_timeout:
))
end

# @!visibility private
class Implementation < Interceptor::Outbound
def initialize(client)
super(nil)
@client = client
end

# @!visibility private
def start_workflow(input)
# TODO(cretz): Signal/update with start
req = Api::WorkflowService::V1::StartWorkflowExecutionRequest.new(
request_id: SecureRandom.uuid,
namespace: @client.namespace,
workflow_type: Api::Common::V1::WorkflowType.new(name: input.workflow.to_s),
workflow_id: input.id,
task_queue: Api::TaskQueue::V1::TaskQueue.new(name: input.task_queue.to_s),
input: @client.data_converter.to_payloads(input.args),
workflow_execution_timeout: Internal::ProtoUtils.seconds_to_duration(input.execution_timeout),
workflow_run_timeout: Internal::ProtoUtils.seconds_to_duration(input.run_timeout),
workflow_task_timeout: Internal::ProtoUtils.seconds_to_duration(input.task_timeout),
identity: @client.connection.identity,
workflow_id_reuse_policy: input.id_reuse_policy,
workflow_id_conflict_policy: input.id_conflict_policy,
retry_policy: input.retry_policy&.to_proto,
cron_schedule: input.cron_schedule,
memo: Internal::ProtoUtils.memo_to_proto(input.memo, @client.data_converter),
search_attributes: input.search_attributes&.to_proto,
workflow_start_delay: Internal::ProtoUtils.seconds_to_duration(input.start_delay),
request_eager_execution: input.request_eager_start,
header: input.headers
)

# Send request
begin
resp = @client.workflow_service.start_workflow_execution(
req,
rpc_retry: true,
rpc_metadata: input.rpc_metadata,
rpc_timeout: input.rpc_timeout
)
rescue Error::RPCError => e
# Unpack and raise already started if that's the error, otherwise default raise
if e.code == Error::RPCError::Code::ALREADY_EXISTS && e.grpc_status.details.first
details = e.grpc_status.details.first.unpack(Api::ErrorDetails::V1::WorkflowExecutionAlreadyStartedFailure)
if details
raise Error::WorkflowAlreadyStartedError.new(
workflow_id: req.workflow_id,
workflow_type: req.workflow_type.name,
run_id: details.run_id
)
end
end
raise
end
# Count workflows.
#
# @param query [String, nil] A Temporal visibility list filter.
# @param rpc_metadata [Hash<String, String>, nil] Headers to include on the RPC call.
# @param rpc_timeout [Float, nil] Number of seconds before timeout.
#
# @return [WorkflowExecutionCount] Count of workflows.
#
# @raise [Error::RPCError] RPC error from call.
#
# @see https://docs.temporal.io/visibility
def count_workflows(
query = nil,
rpc_metadata: nil,
rpc_timeout: nil
)
@impl.count_workflows(Interceptor::CountWorkflowsInput.new(
query:,
rpc_metadata:,
rpc_timeout:
))
end

# Return handle
WorkflowHandle.new(
@client,
input.id,
result_run_id: resp.run_id,
first_execution_run_id: resp.run_id
)
# Get an async activity handle.
#
# @param task_token_or_id_reference [String, ActivityIDReference] Task token string or activity ID reference.
# @return [AsyncActivityHandle]
def async_activity_handle(task_token_or_id_reference)
if task_token_or_id_reference.is_a?(ActivityIDReference)
AsyncActivityHandle.new(client: self, task_token: nil, id_reference: task_token_or_id_reference)
elsif task_token_or_id_reference.is_a?(String)
AsyncActivityHandle.new(client: self, task_token: task_token_or_id_reference, id_reference: nil)
else
raise ArgumentError, 'Must be a string task token or an ActivityIDReference'
end
end

# @!visibility private
def fetch_workflow_history_event_page(input)
req = Api::WorkflowService::V1::GetWorkflowExecutionHistoryRequest.new(
namespace: @client.namespace,
execution: Api::Common::V1::WorkflowExecution.new(
workflow_id: input.id,
run_id: input.run_id || ''
),
maximum_page_size: input.page_size || 0,
next_page_token: input.next_page_token,
wait_new_event: input.wait_new_event,
history_event_filter_type: input.event_filter_type,
skip_archival: input.skip_archival
)
resp = @client.workflow_service.get_workflow_execution_history(
req,
rpc_retry: true,
rpc_metadata: input.rpc_metadata,
rpc_timeout: input.rpc_timeout
)
Interceptor::FetchWorkflowHistoryEventPage.new(
events: resp.history&.events || [],
next_page_token: resp.next_page_token.empty? ? nil : resp.next_page_token
)
end
# @!visibility private
def _impl
@impl
end
end
end
32 changes: 32 additions & 0 deletions temporalio/lib/temporalio/client/activity_id_reference.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# frozen_string_literal: true

require 'temporalio/api'
require 'temporalio/client/interceptor'
require 'temporalio/error'

module Temporalio
class Client
# Reference to an existing activity by its workflow ID, run ID, and activity ID.
class ActivityIDReference
# @return [String] ID for the workflow.
attr_reader :workflow_id

# @return [String, nil] Run ID for the workflow.
attr_reader :run_id

# @return [String] ID for the activity.
attr_reader :activity_id

# Create an activity ID reference.
#
# @param workflow_id [String] ID for the workflow.
# @param run_id [String, nil] Run ID for the workflow.
# @param activity_id [String] ID for the workflow.
def initialize(workflow_id:, run_id:, activity_id:)
@workflow_id = workflow_id
@run_id = run_id
@activity_id = activity_id
end
end
end
end
Loading
Loading