-
Notifications
You must be signed in to change notification settings - Fork 420
/
Copy pathpromises.rb
2167 lines (1837 loc) · 68 KB
/
promises.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
require 'concurrent/synchronization'
require 'concurrent/atomic/atomic_boolean'
require 'concurrent/atomic/atomic_fixnum'
require 'concurrent/collection/lock_free_stack'
require 'concurrent/errors'
require 'concurrent/re_include'
module Concurrent
# {include:file:docs-source/promises-main.md}
module Promises
# @!macro promises.param.default_executor
# @param [Executor, :io, :fast] default_executor Instance of an executor or a name of the
# global executor. Default executor propagates to chained futures unless overridden with
# executor parameter or changed with {AbstractEventFuture#with_default_executor}.
#
# @!macro promises.param.executor
# @param [Executor, :io, :fast] executor Instance of an executor or a name of the
# global executor. The task is executed on it, default executor remains unchanged.
#
# @!macro promises.param.args
# @param [Object] args arguments which are passed to the task when it's executed.
# (It might be prepended with other arguments, see the @yeild section).
#
# @!macro promises.shortcut.on
# Shortcut of {#$0_on} with default `:io` executor supplied.
# @see #$0_on
#
# @!macro promises.shortcut.using
# Shortcut of {#$0_using} with default `:io` executor supplied.
# @see #$0_using
#
# @!macro promise.param.task-future
# @yieldreturn will become result of the returned Future.
# Its returned value becomes {Future#value} fulfilling it,
# raised exception becomes {Future#reason} rejecting it.
#
# @!macro promise.param.callback
# @yieldreturn is forgotten.
# Container of all {Future}, {Event} factory methods. They are never constructed directly with
# new.
module FactoryMethods
extend ReInclude
extend self
module Configuration
# @return [Executor, :io, :fast] the executor which is used when none is supplied
# to a factory method. The method can be overridden in the receivers of
# `include FactoryMethod`
def default_executor
:io
end
end
include Configuration
# @!macro promises.shortcut.on
# @return [ResolvableEvent]
def resolvable_event
resolvable_event_on default_executor
end
# Created resolvable event, user is responsible for resolving the event once by
# {Promises::ResolvableEvent#resolve}.
#
# @!macro promises.param.default_executor
# @return [ResolvableEvent]
def resolvable_event_on(default_executor = self.default_executor)
ResolvableEventPromise.new(default_executor).future
end
# @!macro promises.shortcut.on
# @return [ResolvableFuture]
def resolvable_future
resolvable_future_on default_executor
end
# Creates resolvable future, user is responsible for resolving the future once by
# {Promises::ResolvableFuture#resolve}, {Promises::ResolvableFuture#fulfill},
# or {Promises::ResolvableFuture#reject}
#
# @!macro promises.param.default_executor
# @return [ResolvableFuture]
def resolvable_future_on(default_executor = self.default_executor)
ResolvableFuturePromise.new(default_executor).future
end
# @!macro promises.shortcut.on
# @return [Future]
def future(*args, &task)
future_on(default_executor, *args, &task)
end
# Constructs new Future which will be resolved after block is evaluated on default executor.
# Evaluation begins immediately.
#
# @!macro promises.param.default_executor
# @!macro promises.param.args
# @yield [*args] to the task.
# @!macro promise.param.task-future
# @return [Future]
def future_on(default_executor, *args, &task)
ImmediateEventPromise.new(default_executor).future.then(*args, &task)
end
# Creates resolved future with will be either fulfilled with the given value or rejection with
# the given reason.
#
# @param [true, false] fulfilled
# @param [Object] value
# @param [Object] reason
# @!macro promises.param.default_executor
# @return [Future]
def resolved_future(fulfilled, value, reason, default_executor = self.default_executor)
ImmediateFuturePromise.new(default_executor, fulfilled, value, reason).future
end
# Creates resolved future with will be fulfilled with the given value.
#
# @!macro promises.param.default_executor
# @param [Object] value
# @return [Future]
def fulfilled_future(value, default_executor = self.default_executor)
resolved_future true, value, nil, default_executor
end
# Creates resolved future with will be rejected with the given reason.
#
# @!macro promises.param.default_executor
# @param [Object] reason
# @return [Future]
def rejected_future(reason, default_executor = self.default_executor)
resolved_future false, nil, reason, default_executor
end
# Creates resolved event.
#
# @!macro promises.param.default_executor
# @return [Event]
def resolved_event(default_executor = self.default_executor)
ImmediateEventPromise.new(default_executor).event
end
# General constructor. Behaves differently based on the argument's type. It's provided for convenience
# but it's better to be explicit.
#
# @see rejected_future, resolved_event, fulfilled_future
# @!macro promises.param.default_executor
# @return [Event, Future]
#
# @overload make_future(nil, default_executor = self.default_executor)
# @param [nil] nil
# @return [Event] resolved event.
#
# @overload make_future(a_future, default_executor = self.default_executor)
# @param [Future] a_future
# @return [Future] a future which will be resolved when a_future is.
#
# @overload make_future(an_event, default_executor = self.default_executor)
# @param [Event] an_event
# @return [Event] an event which will be resolved when an_event is.
#
# @overload make_future(exception, default_executor = self.default_executor)
# @param [Exception] exception
# @return [Future] a rejected future with the exception as its reason.
#
# @overload make_future(value, default_executor = self.default_executor)
# @param [Object] value when none of the above overloads fits
# @return [Future] a fulfilled future with the value.
def make_future(argument = nil, default_executor = self.default_executor)
case argument
when AbstractEventFuture
# returning wrapper would change nothing
argument
when Exception
rejected_future argument, default_executor
when nil
resolved_event default_executor
else
fulfilled_future argument, default_executor
end
end
# @!macro promises.shortcut.on
# @return [Future, Event]
def delay(*args, &task)
delay_on default_executor, *args, &task
end
# Creates new event or future which is resolved only after it is touched,
# see {Concurrent::AbstractEventFuture#touch}.
#
# @!macro promises.param.default_executor
# @overload delay_on(default_executor, *args, &task)
# If task is provided it returns a {Future} representing the result of the task.
# @!macro promises.param.args
# @yield [*args] to the task.
# @!macro promise.param.task-future
# @return [Future]
# @overload delay_on(default_executor)
# If no task is provided, it returns an {Event}
# @return [Event]
def delay_on(default_executor, *args, &task)
event = DelayPromise.new(default_executor).event
task ? event.chain(*args, &task) : event
end
# @!macro promises.shortcut.on
# @return [Future, Event]
def schedule(intended_time, *args, &task)
schedule_on default_executor, intended_time, *args, &task
end
# Creates new event or future which is resolved in intended_time.
#
# @!macro promises.param.default_executor
# @!macro promises.param.intended_time
# @param [Numeric, Time] intended_time `Numeric` means to run in `intended_time` seconds.
# `Time` means to run on `intended_time`.
# @overload schedule_on(default_executor, intended_time, *args, &task)
# If task is provided it returns a {Future} representing the result of the task.
# @!macro promises.param.args
# @yield [*args] to the task.
# @!macro promise.param.task-future
# @return [Future]
# @overload schedule_on(default_executor, intended_time)
# If no task is provided, it returns an {Event}
# @return [Event]
def schedule_on(default_executor, intended_time, *args, &task)
event = ScheduledPromise.new(default_executor, intended_time).event
task ? event.chain(*args, &task) : event
end
# @!macro promises.shortcut.on
# @return [Future]
def zip_futures(*futures_and_or_events)
zip_futures_on default_executor, *futures_and_or_events
end
# Creates new future which is resolved after all futures_and_or_events are resolved.
# Its value is array of zipped future values. Its reason is array of reasons for rejection.
# If there is an error it rejects.
# @!macro promises.event-conversion
# If event is supplied, which does not have value and can be only resolved, it's
# represented as `:fulfilled` with value `nil`.
#
# @!macro promises.param.default_executor
# @param [AbstractEventFuture] futures_and_or_events
# @return [Future]
def zip_futures_on(default_executor, *futures_and_or_events)
ZipFuturesPromise.new_blocked_by(futures_and_or_events, default_executor).future
end
alias_method :zip, :zip_futures
# @!macro promises.shortcut.on
# @return [Event]
def zip_events(*futures_and_or_events)
zip_events_on default_executor, *futures_and_or_events
end
# Creates new event which is resolved after all futures_and_or_events are resolved.
# (Future is resolved when fulfilled or rejected.)
#
# @!macro promises.param.default_executor
# @param [AbstractEventFuture] futures_and_or_events
# @return [Event]
def zip_events_on(default_executor, *futures_and_or_events)
ZipEventsPromise.new_blocked_by(futures_and_or_events, default_executor).event
end
# @!macro promises.shortcut.on
# @return [Future]
def any_resolved_future(*futures_and_or_events)
any_resolved_future_on default_executor, *futures_and_or_events
end
alias_method :any, :any_resolved_future
# Creates new future which is resolved after first futures_and_or_events is resolved.
# Its result equals result of the first resolved future.
# @!macro promises.any-touch
# If resolved it does not propagate {Concurrent::AbstractEventFuture#touch}, leaving delayed
# futures un-executed if they are not required any more.
# @!macro promises.event-conversion
#
# @!macro promises.param.default_executor
# @param [AbstractEventFuture] futures_and_or_events
# @return [Future]
def any_resolved_future_on(default_executor, *futures_and_or_events)
AnyResolvedFuturePromise.new_blocked_by(futures_and_or_events, default_executor).future
end
# @!macro promises.shortcut.on
# @return [Future]
def any_fulfilled_future(*futures_and_or_events)
any_fulfilled_future_on default_executor, *futures_and_or_events
end
# Creates new future which is resolved after first of futures_and_or_events is fulfilled.
# Its result equals result of the first resolved future or if all futures_and_or_events reject,
# it has reason of the last resolved future.
# @!macro promises.any-touch
# @!macro promises.event-conversion
#
# @!macro promises.param.default_executor
# @param [AbstractEventFuture] futures_and_or_events
# @return [Future]
def any_fulfilled_future_on(default_executor, *futures_and_or_events)
AnyFulfilledFuturePromise.new_blocked_by(futures_and_or_events, default_executor).future
end
# @!macro promises.shortcut.on
# @return [Future]
def any_event(*futures_and_or_events)
any_event_on default_executor, *futures_and_or_events
end
# Creates new event which becomes resolved after first of the futures_and_or_events resolves.
# @!macro promises.any-touch
#
# @!macro promises.param.default_executor
# @param [AbstractEventFuture] futures_and_or_events
# @return [Event]
def any_event_on(default_executor, *futures_and_or_events)
AnyResolvedEventPromise.new_blocked_by(futures_and_or_events, default_executor).event
end
# TODO consider adding first(count, *futures)
# TODO consider adding zip_by(slice, *futures) processing futures in slices
# TODO or rather a generic aggregator taking a function
end
module InternalStates
# @!visibility private
class State
def resolved?
raise NotImplementedError
end
def to_sym
raise NotImplementedError
end
end
# @!visibility private
class Pending < State
def resolved?
false
end
def to_sym
:pending
end
end
# @!visibility private
class Reserved < Pending
end
# @!visibility private
class ResolvedWithResult < State
def resolved?
true
end
def to_sym
:resolved
end
def result
[fulfilled?, value, reason]
end
def fulfilled?
raise NotImplementedError
end
def value
raise NotImplementedError
end
def reason
raise NotImplementedError
end
def apply
raise NotImplementedError
end
end
# @!visibility private
class Fulfilled < ResolvedWithResult
def initialize(value)
@Value = value
end
def fulfilled?
true
end
def apply(args, block)
block.call value, *args
end
def value
@Value
end
def reason
nil
end
def to_sym
:fulfilled
end
end
# @!visibility private
class FulfilledArray < Fulfilled
def apply(args, block)
block.call(*value, *args)
end
end
# @!visibility private
class Rejected < ResolvedWithResult
def initialize(reason)
@Reason = reason
end
def fulfilled?
false
end
def value
nil
end
def reason
@Reason
end
def to_sym
:rejected
end
def apply(args, block)
block.call reason, *args
end
end
# @!visibility private
class PartiallyRejected < ResolvedWithResult
def initialize(value, reason)
super()
@Value = value
@Reason = reason
end
def fulfilled?
false
end
def to_sym
:rejected
end
def value
@Value
end
def reason
@Reason
end
def apply(args, block)
block.call(*reason, *args)
end
end
# @!visibility private
PENDING = Pending.new
# @!visibility private
RESERVED = Reserved.new
# @!visibility private
RESOLVED = Fulfilled.new(nil)
def RESOLVED.to_sym
:resolved
end
end
private_constant :InternalStates
# @!macro promises.shortcut.event-future
# @see Event#$0
# @see Future#$0
# @!macro promises.param.timeout
# @param [Numeric] timeout the maximum time in second to wait.
# @!macro promises.warn.blocks
# @note This function potentially blocks current thread until the Future is resolved.
# Be careful it can deadlock. Try to chain instead.
# Common ancestor of {Event} and {Future} classes, many shared methods are defined here.
class AbstractEventFuture < Synchronization::Object
safe_initialization!
attr_atomic(:internal_state)
private :internal_state=, :swap_internal_state, :compare_and_set_internal_state, :update_internal_state
# @!method internal_state
# @!visibility private
include InternalStates
def initialize(promise, default_executor)
super()
@Lock = Mutex.new
@Condition = ConditionVariable.new
@Promise = promise
@DefaultExecutor = default_executor
@Callbacks = LockFreeStack.new
@Waiters = AtomicFixnum.new 0
self.internal_state = PENDING
end
private :initialize
# Returns its state.
# @return [Symbol]
#
# @overload an_event.state
# @return [:pending, :resolved]
# @overload a_future.state
# Both :fulfilled, :rejected implies :resolved.
# @return [:pending, :fulfilled, :rejected]
def state
internal_state.to_sym
end
# Is it in pending state?
# @return [Boolean]
def pending?
!internal_state.resolved?
end
# Is it in resolved state?
# @return [Boolean]
def resolved?
internal_state.resolved?
end
# Propagates touch. Requests all the delayed futures, which it depends on, to be
# executed. This method is called by any other method requiring resolved state, like {#wait}.
# @return [self]
def touch
@Promise.touch
self
end
# @!macro promises.touches
# Calls {Concurrent::AbstractEventFuture#touch}.
# @!macro promises.method.wait
# Wait (block the Thread) until receiver is {#resolved?}.
# @!macro promises.touches
#
# @!macro promises.warn.blocks
# @!macro promises.param.timeout
# @return [self, true, false] self implies timeout was not used, true implies timeout was used
# and it was resolved, false implies it was not resolved within timeout.
def wait(timeout = nil)
result = wait_until_resolved(timeout)
timeout ? result : self
end
# Returns default executor.
# @return [Executor] default executor
# @see #with_default_executor
# @see FactoryMethods#future_on
# @see FactoryMethods#resolvable_future
# @see FactoryMethods#any_fulfilled_future_on
# @see similar
def default_executor
@DefaultExecutor
end
# @!macro promises.shortcut.on
# @return [Future]
def chain(*args, &task)
chain_on @DefaultExecutor, *args, &task
end
# Chains the task to be executed asynchronously on executor after it is resolved.
#
# @!macro promises.param.executor
# @!macro promises.param.args
# @return [Future]
# @!macro promise.param.task-future
#
# @overload an_event.chain_on(executor, *args, &task)
# @yield [*args] to the task.
# @overload a_future.chain_on(executor, *args, &task)
# @yield [fulfilled, value, reason, *args] to the task.
# @yieldparam [true, false] fulfilled
# @yieldparam [Object] value
# @yieldparam [Object] reason
def chain_on(executor, *args, &task)
ChainPromise.new_blocked_by1(self, @DefaultExecutor, executor, args, &task).future
end
# @return [String] Short string representation.
def to_s
format '%s %s>', super[0..-2], state
end
alias_method :inspect, :to_s
# Resolves the resolvable when receiver is resolved.
#
# @param [Resolvable] resolvable
# @return [self]
def chain_resolvable(resolvable)
on_resolution! { resolvable.resolve_with internal_state }
end
alias_method :tangle, :chain_resolvable
# @!macro promises.shortcut.using
# @return [self]
def on_resolution(*args, &callback)
on_resolution_using @DefaultExecutor, *args, &callback
end
# Stores the callback to be executed synchronously on resolving thread after it is
# resolved.
#
# @!macro promises.param.args
# @!macro promise.param.callback
# @return [self]
#
# @overload an_event.on_resolution!(*args, &callback)
# @yield [*args] to the callback.
# @overload a_future.on_resolution!(*args, &callback)
# @yield [fulfilled, value, reason, *args] to the callback.
# @yieldparam [true, false] fulfilled
# @yieldparam [Object] value
# @yieldparam [Object] reason
def on_resolution!(*args, &callback)
add_callback :callback_on_resolution, args, callback
end
# Stores the callback to be executed asynchronously on executor after it is resolved.
#
# @!macro promises.param.executor
# @!macro promises.param.args
# @!macro promise.param.callback
# @return [self]
#
# @overload an_event.on_resolution_using(executor, *args, &callback)
# @yield [*args] to the callback.
# @overload a_future.on_resolution_using(executor, *args, &callback)
# @yield [fulfilled, value, reason, *args] to the callback.
# @yieldparam [true, false] fulfilled
# @yieldparam [Object] value
# @yieldparam [Object] reason
def on_resolution_using(executor, *args, &callback)
add_callback :async_callback_on_resolution, executor, args, callback
end
# @!macro promises.method.with_default_executor
# Crates new object with same class with the executor set as its new default executor.
# Any futures depending on it will use the new default executor.
# @!macro promises.shortcut.event-future
# @abstract
# @return [AbstractEventFuture]
def with_default_executor(executor)
raise NotImplementedError
end
# @!visibility private
def resolve_with(state, raise_on_reassign = true, reserved = false)
if compare_and_set_internal_state(reserved ? RESERVED : PENDING, state)
# go to synchronized block only if there were waiting threads
@Lock.synchronize { @Condition.broadcast } unless @Waiters.value == 0
call_callbacks state
else
return rejected_resolution(raise_on_reassign, state)
end
self
end
# For inspection.
# @!visibility private
# @return [Array<AbstractPromise>]
def blocks
@Callbacks.each_with_object([]) do |(method, args), promises|
promises.push(args[0]) if method == :callback_notify_blocked
end
end
# For inspection.
# @!visibility private
def callbacks
@Callbacks.each.to_a
end
# For inspection.
# @!visibility private
def promise
@Promise
end
# For inspection.
# @!visibility private
def touched?
promise.touched?
end
# For inspection.
# @!visibility private
def waiting_threads
@Waiters.each.to_a
end
# @!visibility private
def add_callback_notify_blocked(promise, index)
add_callback :callback_notify_blocked, promise, index
end
# @!visibility private
def add_callback_clear_delayed_node(node)
add_callback(:callback_clear_delayed_node, node)
end
# @!visibility private
def with_hidden_resolvable
# TODO (pitr-ch 10-Dec-2018): documentation, better name if in edge
self
end
private
def add_callback(method, *args)
state = internal_state
if state.resolved?
call_callback method, state, args
else
@Callbacks.push [method, args]
state = internal_state
# take back if it was resolved in the meanwhile
call_callbacks state if state.resolved?
end
self
end
def callback_clear_delayed_node(state, node)
node.value = nil
end
# @return [Boolean]
def wait_until_resolved(timeout)
return true if resolved?
touch
@Lock.synchronize do
@Waiters.increment
begin
unless resolved?
@Condition.wait @Lock, timeout
end
ensure
# JRuby may raise ConcurrencyError
@Waiters.decrement
end
end
resolved?
end
def call_callback(method, state, args)
self.send method, state, *args
end
def call_callbacks(state)
method, args = @Callbacks.pop
while method
call_callback method, state, args
method, args = @Callbacks.pop
end
end
def with_async(executor, *args, &block)
Concurrent.executor(executor).post(*args, &block)
end
def async_callback_on_resolution(state, executor, args, callback)
with_async(executor, state, args, callback) do |st, ar, cb|
callback_on_resolution st, ar, cb
end
end
def callback_notify_blocked(state, promise, index)
promise.on_blocker_resolution self, index
end
end
# Represents an event which will happen in future (will be resolved). The event is either
# pending or resolved. It should be always resolved. Use {Future} to communicate rejections and
# cancellation.
class Event < AbstractEventFuture
alias_method :then, :chain
# @!macro promises.method.zip
# Creates a new event or a future which will be resolved when receiver and other are.
# Returns an event if receiver and other are events, otherwise returns a future.
# If just one of the parties is Future then the result
# of the returned future is equal to the result of the supplied future. If both are futures
# then the result is as described in {FactoryMethods#zip_futures_on}.
#
# @return [Future, Event]
def zip(other)
if other.is_a?(Future)
ZipFutureEventPromise.new_blocked_by2(other, self, @DefaultExecutor).future
else
ZipEventEventPromise.new_blocked_by2(self, other, @DefaultExecutor).event
end
end
alias_method :&, :zip
# Creates a new event which will be resolved when the first of receiver, `event_or_future`
# resolves.
#
# @return [Event]
def any(event_or_future)
AnyResolvedEventPromise.new_blocked_by2(self, event_or_future, @DefaultExecutor).event
end
alias_method :|, :any
# Creates new event dependent on receiver which will not evaluate until touched, see {#touch}.
# In other words, it inserts delay into the chain of Futures making rest of it lazy evaluated.
#
# @return [Event]
def delay
event = DelayPromise.new(@DefaultExecutor).event
ZipEventEventPromise.new_blocked_by2(self, event, @DefaultExecutor).event
end
# @!macro promise.method.schedule
# Creates new event dependent on receiver scheduled to execute on/in intended_time.
# In time is interpreted from the moment the receiver is resolved, therefore it inserts
# delay into the chain.
#
# @!macro promises.param.intended_time
# @return [Event]
def schedule(intended_time)
chain do
event = ScheduledPromise.new(@DefaultExecutor, intended_time).event
ZipEventEventPromise.new_blocked_by2(self, event, @DefaultExecutor).event
end.flat_event
end
# Converts event to a future. The future is fulfilled when the event is resolved, the future may never fail.
#
# @return [Future]
def to_future
future = Promises.resolvable_future
ensure
chain_resolvable(future)
end
# Returns self, since this is event
# @return [Event]
def to_event
self
end
# @!macro promises.method.with_default_executor
# @return [Event]
def with_default_executor(executor)
EventWrapperPromise.new_blocked_by1(self, executor).event
end
private
def rejected_resolution(raise_on_reassign, state)
Concurrent::MultipleAssignmentError.new('Event can be resolved only once') if raise_on_reassign
return false
end
def callback_on_resolution(state, args, callback)
callback.call(*args)
end
end
# Represents a value which will become available in future. May reject with a reason instead,
# e.g. when the tasks raises an exception.
class Future < AbstractEventFuture
# Is it in fulfilled state?
# @return [Boolean]
def fulfilled?
state = internal_state
state.resolved? && state.fulfilled?
end
# Is it in rejected state?
# @return [Boolean]
def rejected?
state = internal_state
state.resolved? && !state.fulfilled?
end
# @!macro promises.warn.nil
# @note Make sure returned `nil` is not confused with timeout, no value when rejected,
# no reason when fulfilled, etc.
# Use more exact methods if needed, like {#wait}, {#value!}, {#result}, etc.
# @!macro promises.method.value
# Return value of the future.
# @!macro promises.touches
#
# @!macro promises.warn.blocks
# @!macro promises.warn.nil
# @!macro promises.param.timeout
# @!macro promises.param.timeout_value
# @param [Object] timeout_value a value returned by the method when it times out
# @return [Object, nil, timeout_value] the value of the Future when fulfilled,
# timeout_value on timeout,
# nil on rejection.
def value(timeout = nil, timeout_value = nil)
if wait_until_resolved timeout
internal_state.value
else
timeout_value
end
end
# Returns reason of future's rejection.
# @!macro promises.touches
#
# @!macro promises.warn.blocks
# @!macro promises.warn.nil
# @!macro promises.param.timeout
# @!macro promises.param.timeout_value
# @return [Object, timeout_value] the reason, or timeout_value on timeout, or nil on fulfillment.
def reason(timeout = nil, timeout_value = nil)
if wait_until_resolved timeout
internal_state.reason
else
timeout_value
end
end
# Returns triplet fulfilled?, value, reason.
# @!macro promises.touches
#
# @!macro promises.warn.blocks
# @!macro promises.param.timeout
# @return [Array(Boolean, Object, Object), nil] triplet of fulfilled?, value, reason, or nil
# on timeout.
def result(timeout = nil)
internal_state.result if wait_until_resolved timeout
end
# @!macro promises.method.wait
# @raise [Exception] {#reason} on rejection
def wait!(timeout = nil)
result = wait_until_resolved!(timeout)
timeout ? result : self
end
# @!macro promises.method.value
# @return [Object, nil, timeout_value] the value of the Future when fulfilled,
# or nil on rejection,
# or timeout_value on timeout.
# @raise [Exception] {#reason} on rejection
def value!(timeout = nil, timeout_value = nil)
if wait_until_resolved! timeout
internal_state.value
else
timeout_value
end
end
# Allows rejected Future to be risen with `raise` method.
# If the reason is not an exception `Runtime.new(reason)` is returned.
#
# @example
# raise Promises.rejected_future(StandardError.new("boom"))
# raise Promises.rejected_future("or just boom")
# @raise [Concurrent::Error] when raising not rejected future