`^\Large\bf
TLA+ Model of the Elasticsearch data replication approach ^'
-------------------------------------------------------------------------------------
This file provides a formal specification of how data replication will work in future versions of
Elasticsearch. We consider this a work in progress: both the model and the implementation are still
evolving and might differ in substantial ways.
Introduction
------------
An index, which is a collection of documents, can be divided into multiple pieces, known as shards,
each of which can be stored on a different machine. This approach of horizontal scaling enables
Elasticsearch to store much larger indices than could fit on a single machine. To ensure high
availability and to scale out read access, shards are usually also replicated onto multiple machines.
The main copy is called the primary; all other copies are simply called replicas. The number of
primary shards for an index is fixed at creation time, which makes it possible to deterministically
route requests for specific documents to the right shard, based on a hash of the document key (called
the document "_id"). In the context of data replication, shards do their work independently of each
other. The specification therefore only considers a single primary shard together with its copies,
the replicas. It assumes a fixed set of nodes, each of which can host either the primary or one of
the replicas. Shard allocation (i.e. which node has which shard) is dynamic and determined by the
master, a process in the cluster that is backed by a consensus module. The TLA+ specification does
not model the consensus module; it assumes that this component works correctly. It shows, however,
how data replication integrates with the consensus module to achieve its guarantees.
How data is replicated
----------------------
Clients send requests to an arbitrary node in the cluster. The node then routes the request to the
node in the cluster that has the primary shard for the corresponding document id. The document is
indexed at the primary and then replicated concurrently to the replicas. The replicas index the
document and confirm successful replication to the primary. The primary then acknowledges successful
replication to the client.
What's covered / not covered in this model
------------------------------------------
Failures covered by the model:
- node crashes
- disconnects between nodes on a per-request basis
Also covered:
- cluster state batching / asynchronous application (each node applies the cluster state, which is
  the state backed by the consensus module, at different points in time)
- network delays: messages can arrive out-of-order and be delayed
Limitations of the model:
- shard initialization / recovery is not modeled: the model initially assumes shards to be started.
  When a shard fails, it is not reallocated / reassigned to another node but stays unassigned.
  When a primary shard fails, a random replica is promoted to primary (if a replica exists).
- only the transaction log is modeled. Lucene store as an optimistic consumer of the transaction log
is not modeled.
- adding nodes to the cluster is not modeled, whereas in Elasticsearch, nodes can dynamically
be added to the cluster and those nodes will share in the hosting of shard data (shard data is
moved to the new node through the process of recovery, mentioned above, which is also not modeled).
Other differences between model and current Java implementation:
- the Java implementation uses zero-based numbering for sequence numbers whereas the TLA+ model
starts from one, as this is the natural way to access the first element of a sequence in TLA+.
- ...
------------------------------ MODULE replication ---------------------------------
\* Imported modules used in this specification
EXTENDS Naturals, FiniteSets, Sequences, TLC
----
\* `^\Large\bf Constants ^'
\* The specification first defines the constants of the model, which amount to values or sets of
\* values that are fixed.
\* Set of node ids uniquely representing each data node in the cluster
CONSTANTS Nodes
\* Set of possible document ids (i.e. values for "_id" field)
CONSTANTS DocumentIds
(* Communication between the nodes is done using an RPC-like mechanism.
For each request there is an associated response. In-flight requests / responses are modeled
using messages, which are represented in TLA+ as record types. To distinguish requests from
responses, the message record type contains a field "request" that denotes whether the message is
a request or a response (takes as value one of {TRUE, FALSE}). It also contains the node id of
the sender and receiver of the call (in the fields "source" and "dest"). The procedure that is
called is found under the field "method".
For example, sending a replication request from node n1 to n2 would yield the following message:
[request |-> TRUE,
method |-> Replication,
source |-> n1,
dest |-> n2]
Responses also have a field "success" which indicates whether the call was successful
or not (one of {TRUE, FALSE}).
The model currently supports two RPC methods, one to replicate data from the primary to the
replicas and another one to trim the translog (explained later). The reroute logic for rerouting
requests to the primary is not modeled as it has no impact on consistency/durability guarantees.
*)
CONSTANTS Replication, TrimTranslog
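(* For illustration only (not part of the specification): a complete replication request and its
   matching response, as constructed by the actions further below, carry additional fields. Using
   the hypothetical document id doc1, a request/response pair between nodes n1 and n2 might look
   as follows:
   [request |-> TRUE,  method |-> Replication, source |-> n1, dest |-> n2, req |-> 1,
    id |-> doc1, value |-> 1, seq |-> 1, rterm |-> 1, sterm |-> 1, client |-> TRUE,
    globalCP |-> 0]
   [request |-> FALSE, method |-> Replication, source |-> n2, dest |-> n1, req |-> 1,
    id |-> doc1, value |-> 1, seq |-> 1, rterm |-> 1, sterm |-> 1, client |-> TRUE,
    localCP |-> 1, success |-> TRUE]
   The meaning of the individual fields is explained along with the variables and actions below.
*)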
(* Shard allocation is determined by the master and broadcasted to the data nodes in the form of a
routing table, which is a map from node id to one of {Primary, Replica, Unassigned}, denoting
whether the respective node has the primary shard, a replica shard or no shard assigned at all.
*)
CONSTANTS Primary, Replica, Unassigned
\* The constant "Nil" denotes a non-existing value (e.g. in the transaction log) or a placeholder
\* for a value that is yet to be computed.
CONSTANTS Nil
----
\* `^\Large\bf Variables ^'
\* The following describes the variable state of the model.
\* Set of in-flight requests and responses sent between data nodes.
VARIABLE messages
(* Besides managing and broadcasting the routing table, the master also tracks whether
a primary failed and/or a replica was promoted to primary, incrementing a number called the
"primary term" whenever this happens. Each new primary operates under a new term, allowing nodes
to reject replication requests that come from a primary that has been demoted by the master.
The routing table and primary term together form the cluster state, which is simply a record type
containing both:
[routingTable |->
[n1 |-> Primary,
n2 |-> Replica,
n3 |-> Unassigned],
primaryTerm |-> 1]
The following variable represents the current cluster state on the master, which is not
explicitly modeled as a node.
*)
VARIABLE clusterStateOnMaster
\* For simplicity we assume that each client (index) request uses a new unique value to be indexed.
\* This is just a natural number incremented on each client operation.
VARIABLE nextClientValue
\* The set of (acknowledged) client responses. Stored in a variable so that we can make assertions
\* about successfully acknowledged requests (e.g. that they've been successfully stored).
VARIABLE clientResponses
\* To improve readability of the specification, the following placeholder clientVars can be used
\* instead of explicitly writing the two variables on the right hand side.
clientVars == <<nextClientValue, clientResponses>>
(* After indexing a document into the primary, it is replicated concurrently to the replicas.
Writes are only acknowledged to the client after all replicas have acknowledged their writes to
the primary. To correlate acknowledgements from multiple replicas for the same write on the
primary, we use a unique request id that is shared by the concurrent replication requests going
out to the replicas for each specific indexing operation on the primary. The responses carry the
same request id as the requests they originated from.
This is just a natural number denoting the next available request id. It is incremented whenever
a request id is used.
*)
VARIABLE nextRequestId
\* The following variables capture state on a per-node basis (maps with domain nodes).
(* Cluster states are determined by the master and broadcasted to nodes. Due to the distributed
nature of the system, they can arrive and be applied at different times on the nodes. ES only
guarantees that an older state is not applied on a node after a new one has been applied on the
same node. It supports batching, however, so cluster states can be skipped if a newer one has
already arrived on a node before the older one has been processed there.
Cluster states applied on each node are represented as a map from node id to the cluster state
that is currently applied on that node.
*)
VARIABLE clusterStateOnNode
(* The cluster state contains the current term number. Nodes might learn about the highest primary
term number not only through cluster state updates, but also through other node-to-node
communication such as replication requests. They store the most recent information (highest term
they've heard about). The variable "currentTerm" is a map from node id to primary term number,
representing the highest primary term number that is known to the node.
*)
VARIABLE currentTerm
(* The transaction log is a history of operations. The primary shard determines the order in which
operations occur by assigning consecutive sequence numbers to the operations that are indexed.
The sequence number represents a slot in the transaction log that is occupied by the operation.
When a write on a primary is replicated to replicas, the replication request contains the sequence
number that was assigned to this operation on the primary. The replica then assigns the operation
to the same slot in its transaction log. Due to the concurrent nature of replication, replicas
might fill these slots out-of-order. If the primary crashes or some replication requests don't make
it to the replica, the replica can end up in a state where its transaction log has holes in it
(slots that are not filled while subsequent slots are filled).
Example of a transaction log on the primary and a replica:
`.
             ---------------------
             | 1 | 2 | 3 | 4 | 5 |
     primary |-------------------|
             | x | x | x | x |   |
             ---------------------

             ---------------------
             | 1 | 2 | 3 | 4 | 5 |
     replica |-------------------|   (request for slot 2 is still in-flight)
             | x |   | x | x |   |
             ---------------------
.'
The transaction log is modeled as a map from node id to a map from sequence number to a record
consisting of document id, value to be stored, primary term and a "pending confirmation" marker
(more on that later).
*)
VARIABLE tlog
(* Having a transaction log in place, it is useful to know about the highest slot in the transaction
log where all slots below it have been successfully replicated to all replicas, i.e. the common
history shared by all in-sync shard copies. It is useful because when a primary fails, a replica
that is promoted to primary knows that it only has to worry about being out of sync with other
replicas on slots beyond this one. The primary is in charge of tracking this information, also
called the global checkpoint. For this, replica shards share with the
primary the highest slot they have filled where all lower slots are filled as well, called the
local checkpoint. The primary then establishes the global checkpoint as the minimum of the local
checkpoint value received from all shard copies (including its own local checkpoint) and broadcasts
this information to the replicas.
The global checkpoint is modeled as a map from node id to sequence number.
*)
VARIABLE globalCheckPoint
(* The local checkpoint is modeled as a map from node id (node that is doing the tracking)
to node id (node for which the local checkpoint is being tracked) to sequence number.
Only the primary maintains the local checkpoints from all replicas, but because of the possibility of
the primary changing over time, and in order to separate the state for each node more clearly, we maintain
a node id to local checkpoint map for each node in the cluster.
*)
VARIABLE localCheckPoint
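(* A worked example (illustrative only): assume node n1 is the primary and the shards on n1, n2
   and n3 are all assigned. If n1 currently tracks the local checkpoints
   localCheckPoint[n1] = [n1 |-> 4, n2 |-> 2, n3 |-> 3]
   then n1 establishes its global checkpoint as the minimum of these values:
   globalCheckPoint[n1] = Min({4, 2, 3}) = 2
   meaning that slots 1 and 2 are known to be filled on every assigned shard copy; n1 then
   broadcasts this value to the replicas.
*)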
\* The placeholder "nodeVars" is used as a shorthand for all node variables
nodeVars == <<clusterStateOnNode, tlog, localCheckPoint, globalCheckPoint, currentTerm>>
----
\* `^\Large\bf General helper functions ^'
\* Return the minimum value from a set, or undefined if the set is empty.
Min(s) == CHOOSE x \in s : \A y \in s : x <= y
\* Return the maximum value from a set, or undefined if the set is empty.
Max(s) == CHOOSE x \in s : \A y \in s : x >= y
----
\* `^\Large\bf Helper functions on routing table ^'
(* Note, in this section, the terms "shard" and "node id" are conflated, because
we are only considering one shard and all its copies, so each shard can be
uniquely identified by the node it resides on. The term "shard" or "node" is chosen
solely based on the context of what is being explained or specified.
*)
\* Returns shards that are marked as Primary in routing table
Primaries(routingTable) == {n \in DOMAIN routingTable : routingTable[n] = Primary}
\* Returns shards that are marked as Replica in routing table
Replicas(routingTable) == {n \in DOMAIN routingTable : routingTable[n] = Replica}
\* Returns shards that are marked as Primary or Replica in routing table
Assigned(routingTable) == {n \in DOMAIN routingTable : routingTable[n] /= Unassigned}
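\* For example (illustrative only), for the routing table
\*   rt == [n1 |-> Primary, n2 |-> Replica, n3 |-> Unassigned]
\* the helpers above evaluate to
\*   Primaries(rt) = {n1}, Replicas(rt) = {n2}, Assigned(rt) = {n1, n2}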
\* Determines whether the shard on node n was promoted to primary when a cluster state update occurs
ShardWasPromotedToPrimary(n, incomingRoutingTable, localRoutingTable) ==
LET oldPrimaries == Primaries(localRoutingTable)
newPrimaries == Primaries(incomingRoutingTable)
IN /\ n \notin oldPrimaries
/\ n \in newPrimaries
\* Calculates new cluster state based on shard failure on node n
FailShardOnMaster(n) ==
LET rt == clusterStateOnMaster.routingTable
IN
IF rt[n] = Unassigned THEN
UNCHANGED <<clusterStateOnMaster>>
ELSE
\* increase primary term on primary failure
LET newPt == IF rt[n] = Primary THEN
clusterStateOnMaster.primaryTerm + 1
ELSE
clusterStateOnMaster.primaryTerm
IN
IF rt[n] = Primary /\ Cardinality(Replicas(rt)) > 0 THEN
\* promote replica to primary
\E r \in Replicas(rt):
clusterStateOnMaster' = [clusterStateOnMaster EXCEPT
!.routingTable = [rt EXCEPT ![n] = Unassigned, ![r] = Primary],
!.primaryTerm = newPt]
ELSE
clusterStateOnMaster' = [clusterStateOnMaster EXCEPT
!.routingTable[n] = Unassigned,
!.primaryTerm = newPt]
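(* For example (illustrative only): starting from the cluster state
   [routingTable |-> [n1 |-> Primary, n2 |-> Replica, n3 |-> Unassigned], primaryTerm |-> 1]
   FailShardOnMaster(n1) promotes the only available replica and increments the primary term:
   [routingTable |-> [n1 |-> Unassigned, n2 |-> Primary, n3 |-> Unassigned], primaryTerm |-> 2]
   whereas FailShardOnMaster(n2) merely marks n2 as Unassigned and leaves the term at 1.
*)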
----
\* `^\Large\bf Helper functions for sending/receiving messages ^'
\* Remove request from the set of messages and add response instead
Reply(response, request) == messages' = {response} \cup (messages \ {request})
\* Generate default replication response based on replication request m
\* Copies most of the fields over for convenience (to restore caller information upon return)
DefaultReplicationResponse(m) == [request |-> FALSE,
method |-> m.method,
source |-> m.dest,
dest |-> m.source,
req |-> m.req,
id |-> m.id,
seq |-> m.seq,
rterm |-> m.rterm,
sterm |-> m.sterm,
value |-> m.value,
client |-> m.client,
localCP |-> 0,
success |-> TRUE]
\* Generate default trim translog response based on trim translog request m
DefaultTrimTranslogResponse(m) == [request |-> FALSE,
method |-> m.method,
source |-> m.dest,
dest |-> m.source,
req |-> m.req,
maxseq |-> m.maxseq, \* trim above this sequence number
term |-> m.term, \* trim entries with term lower than this
success |-> TRUE]
\* Generate default response based on request m
DefaultResponse(m) == IF m.method = Replication THEN
DefaultReplicationResponse(m)
ELSE
DefaultTrimTranslogResponse(m)
\* Generate default failure response based on request m
FailedResponse(m) == [DefaultResponse(m) EXCEPT !.success = FALSE]
----
\* `^\Large\bf Helper functions on translog ^'
(* When a request comes to a primary, it has to select a slot (the sequence number) to put the
request into. It corresponds to the slot in the transaction log right after the highest slot
that's filled.
The following function yields the highest slot that's filled in the transaction log, or 0 if
no such slot exists.
*)
MaxSeq(ntlog) == Max(DOMAIN ntlog \cup {0})
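\* For example (illustrative only), if slots 1, 2 and 4 of the translog are filled, then
\* DOMAIN ntlog = {1, 2, 4} and MaxSeq(ntlog) = 4; for an empty translog, DOMAIN ntlog = {}
\* and MaxSeq(ntlog) = 0, so the next client operation would be assigned sequence number 1.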
(* `^\bf\large MaxConfirmedSeq ^'
The local checkpoint is defined as the highest slot in the translog for which all lower slots
are filled as well. It is not only holes in the translog that prevent the local checkpoint from
moving forward: we also want to prevent the local checkpoint from moving past slots that are
filled but marked as pending confirmation. Pending entries in the translog are entries that
appear after the global checkpoint and are not allowed to contribute to the advancement of the
local checkpoint until a resyncing phase happens due to the primary shard changing nodes.
Translog entries thus have a "pc" (pending confirmation) marker (\in {TRUE, FALSE}) that says
whether the local checkpoint can move past them.
This function yields the highest sequence number that is filled and not pending confirmation
and where all lower slots are filled and not pending confirmation as well. It yields 0 if no
such number exists.
Examples: (T stands for TRUE, F for FALSE)
`.
                ---------------------
                | 1 | 2 | 3 | 4 | 5 |
                |-------------------|   MaxConfirmedSeq = 1
                | x | x | x | x |   |
    pc markers: | F | T | T | F |   |
                ---------------------

                ---------------------
                | 1 | 2 | 3 | 4 | 5 |
                |-------------------|   MaxConfirmedSeq = 2
                | x | x |   | x |   |
    pc markers: | F | F |   | F |   |
                ---------------------
.'
*)
MaxConfirmedSeq(ntlog) ==
LET ConfirmedTlogSlot(i) == i \in DOMAIN ntlog /\ ntlog[i].pc = FALSE
IN CHOOSE i \in (DOMAIN ntlog \cup {0}) : /\ ConfirmedTlogSlot(i+1) = FALSE
/\ \A j \in 1..i : ConfirmedTlogSlot(j)
(* `^\bf\large MarkPC ^'
Yields the translog where all entries at positions strictly larger than "globalCP" (representing
the global checkpoint) are marked as "pending confirmation".
Example:
`.
    ---------------------      MarkPC        ---------------------
    | 1 | 2 | 3 | 4 | 5 |  ------------->    | 1 | 2 | 3 | 4 | 5 |
    |-------------------|   globalCP = 1     |-------------------|
    | x | x |   | x |   |                    | x | x |   | x |   |
    | F | F |   | F |   |                    | F | T |   | T |   |
    ---------------------                    ---------------------
.'
*)
MarkPC(ntlog, globalCP) ==
[j \in DOMAIN ntlog |-> [ntlog[j] EXCEPT !.pc = IF j > globalCP THEN TRUE ELSE @]]
(* `^\bf\large FillAndUnmarkPC ^'
Yields the translog where all gaps are filled and the pending confirmation marker is set to FALSE
for entries at positions strictly larger than "globalCP" (representing the global checkpoint).
Example:
`.
    ---------------------  FillAndUnmarkPC   ---------------------
    | 1 | 2 | 3 | 4 | 5 |  ------------->    | 1 | 2 | 3 | 4 | 5 |
    |-------------------|   globalCP = 1     |-------------------|
    | x | x |   | x |   |                    | x | x | x | x |   |
    | T | T |   | F |   |                    | T | F | F | F |   |
    ---------------------                    ---------------------
.'
*)
FillAndUnmarkPC(ntlog, storedTerm, globalCP) ==
[j \in 1..Max(DOMAIN ntlog \cup {0}) |->
IF j > globalCP THEN
IF j \in DOMAIN ntlog THEN
[ntlog[j] EXCEPT !.pc = FALSE]
ELSE
[id |-> Nil,
term |-> storedTerm,
value |-> Nil,
pc |-> FALSE]
ELSE
ntlog[j]]
(* `^\bf\large TrimTlog ^'
Trims elements from the translog that have a position strictly greater than maxseq and a
term strictly lower than minterm.
Example:
`.
           ---------------------     TrimTlog      ---------------------
           | 1 | 2 | 3 | 4 | 5 |  ------------->   | 1 | 2 | 3 | 4 | 5 |
           |-------------------|    maxseq = 2     |-------------------|
           | x | x | x | x | x |    minterm = 2    | x | x | x |   | x |
           | T | T | T | F | T |                   | T | T | T |   | T |
    terms: | 1 | 1 | 2 | 1 | 2 |                   | 1 | 1 | 2 |   | 2 |
           ---------------------                   ---------------------
.'
*)
TrimTlog(ntlog, maxseq, minterm) ==
[j \in {i \in DOMAIN ntlog : i <= maxseq \/ ntlog[i].term >= minterm} |-> ntlog[j]]
----
\* `^\Large\bf Initial states ^'
\* All possible routing tables where there is one primary
RoutingTablesWithPrimary ==
UNION {
{
[n \in {pn} |-> Primary] @@ \* pn has the primary
[n \in rs |-> Replica] @@ \* rs is the subset of nodes having replicas
[n \in ((Nodes \ rs) \ {pn}) |-> Unassigned] \* remaining nodes have unassigned shards
: rs \in SUBSET (Nodes \ {pn})
} : pn \in Nodes
}
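\* For example (illustrative only), with Nodes = {n1, n2} the above set contains exactly the
\* four routing tables that have one primary:
\*   [n1 |-> Primary, n2 |-> Replica]      [n1 |-> Primary, n2 |-> Unassigned]
\*   [n1 |-> Replica, n2 |-> Primary]      [n1 |-> Unassigned, n2 |-> Primary]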
\* Possible initial routing tables are those which have a primary or where all shards are unassigned
InitialRoutingTables == RoutingTablesWithPrimary \cup {[n \in Nodes |-> Unassigned]}
\* The following definition denotes the set of possible initial cluster states that are to be
\* considered for exploring the model, containing cluster states such as
\* [ routingTable |-> [n1 |-> Primary, n2 |-> Replica, n3 |-> Replica], primaryTerm |-> 1 ]
InitialClusterStates == { [routingTable |-> rt, primaryTerm |-> 1] : rt \in InitialRoutingTables }
Init == /\ clusterStateOnMaster \in InitialClusterStates
/\ messages = {}
/\ nextClientValue = 1
/\ clientResponses = {}
/\ nextRequestId = 1
/\ tlog = [n \in Nodes |-> << >>]
/\ localCheckPoint = [n1 \in Nodes |-> [n2 \in Nodes |-> 0]]
/\ globalCheckPoint = [n \in Nodes |-> 0]
/\ clusterStateOnNode = [n \in Nodes |-> clusterStateOnMaster]
/\ currentTerm = [n \in Nodes |-> clusterStateOnMaster.primaryTerm]
----
\* `^\Large\bf Next-step relations ^'
\* Index request arrives on node n with document id docId
ClientRequest(n, docId) ==
/\ clusterStateOnNode[n].routingTable[n] = Primary \* node believes itself to be the primary
/\ LET
replicas == Replicas(clusterStateOnNode[n].routingTable)
primaryTerm == currentTerm[n]
tlogEntry == [id |-> docId,
term |-> primaryTerm,
value |-> nextClientValue,
pc |-> FALSE]
seq == MaxSeq(tlog[n]) + 1
\* create replication requests for each replica that the primary knows about
replRequests == {([request |-> TRUE,
method |-> Replication,
source |-> n,
dest |-> rn,
req |-> nextRequestId,
id |-> docId,
value |-> nextClientValue,
seq |-> seq,
rterm |-> primaryTerm, \* current term when issuing request
sterm |-> primaryTerm, \* term to be stored (differs for fast resync)
client |-> TRUE, \* it's a replication request initiated by client
globalCP |-> globalCheckPoint[n]]) : rn \in replicas}
IN
\* put entry into translog
/\ tlog' = [tlog EXCEPT ![n] = (seq :> tlogEntry) @@ @]
\* Make sure that each client request uses a unique value
/\ nextClientValue' = nextClientValue + 1
\* set next unique key to use for replication requests so that we can relate responses
/\ nextRequestId' = nextRequestId + 1
\* update local checkpoint
/\ localCheckPoint' = [localCheckPoint EXCEPT ![n][n] = seq]
/\ Assert(localCheckPoint'[n][n] = localCheckPoint[n][n] + 1, "localCheckPoint incremented")
\* send out replication requests
/\ messages' = messages \cup replRequests
/\ IF replicas = {} THEN
\* no replicas, directly acknowledge to the client
/\ clientResponses' = clientResponses \cup {[success |-> TRUE,
id |-> docId,
value |-> nextClientValue,
seq |-> seq,
term |-> primaryTerm]}
ELSE
\* replication requests sent out, wait for responses before acking to client
/\ UNCHANGED <<clientResponses>>
/\ UNCHANGED <<clusterStateOnMaster, clusterStateOnNode, globalCheckPoint, currentTerm>>
\* Helper function for marking translog entries as pending confirmation if incoming term is higher
\* than current term on node n.
MaybeMarkPC(incomingTerm, n, globalCP) ==
IF incomingTerm > currentTerm[n] THEN
\* there is a new primary, cannot safely advance local checkpoint
\* before resync done, move it back to global checkpoint and add
\* pending confirmation marker to all entries above global checkpoint
MarkPC(tlog[n], globalCP)
ELSE
tlog[n]
(*
`^\bf Note on handling replication requests with higher primary terms^'
If a replica receives a replication request with a primary term greater than its current primary
term, it means that a new primary was promoted. The `^na\"ive^' handling of this case would be to
forget all translog operations above the global checkpoint, reset the local checkpoint to the
global checkpoint and await replay of the operations above the global checkpoint from the newly
promoted primary.
This approach does not work, however: the newly promoted primary might fail during the resync
process, and if our replica is then promoted to primary, it may miss operations above the global
checkpoint, effectively losing acknowledged writes.
That's why we preserve existing translog entries above the global checkpoint but mark them as
pending confirmation. During the replay of operations, the replica replaces its translog entry
with the entry received from the primary (which can be a noop) and resets the pending
confirmation flag.
*)
\* Replication request arrives on node n with message m
HandleReplicationRequest(n, m) ==
/\ m.request = TRUE
/\ m.method = Replication
/\ IF m.rterm < currentTerm[n] THEN
\* don't accept replication requests with lower term than we have
\* lower term means that it's coming from a primary that has since been demoted
/\ Reply(FailedResponse(m), m)
/\ UNCHANGED <<clusterStateOnMaster, nextRequestId, clientVars, nodeVars>>
ELSE
/\ LET
tlogEntry == [id |-> m.id,
term |-> m.sterm,
value |-> m.value,
pc |-> FALSE]
newGlobalCP == Max({m.globalCP, globalCheckPoint[n]})
\* mark translog entries as pending if higher term and write request into translog
newTlog == (m.seq :> tlogEntry) @@ MaybeMarkPC(m.rterm, n, newGlobalCP)
\* recompute local checkpoint
localCP == MaxConfirmedSeq(newTlog)
IN
/\ tlog' = [tlog EXCEPT ![n] = newTlog]
/\ currentTerm' = [currentTerm EXCEPT ![n] = m.rterm]
/\ globalCheckPoint' = [globalCheckPoint EXCEPT ![n] = newGlobalCP]
/\ Reply([DefaultResponse(m) EXCEPT !.localCP = localCP], m)
/\ UNCHANGED <<clusterStateOnMaster, clusterStateOnNode, nextClientValue, nextRequestId,
clientResponses, localCheckPoint>>
\* Trim translog request arrives on node n with message m
HandleTrimTranslogRequest(n, m) ==
/\ m.request = TRUE
/\ m.method = TrimTranslog
/\ IF m.term < currentTerm[n] THEN
\* don't handle requests with lower term than we have
\* lower term means that it's coming from a primary that has since been demoted
/\ Reply(FailedResponse(m), m)
/\ UNCHANGED <<tlog, globalCheckPoint, currentTerm>>
ELSE
/\ LET
newGlobalCP == Max({m.globalCP, globalCheckPoint[n]})
\* mark translog entries as pending if higher term and trim translog
newTlog == TrimTlog(MaybeMarkPC(m.term, n, newGlobalCP), m.maxseq, m.term)
IN
/\ tlog' = [tlog EXCEPT ![n] = newTlog]
/\ globalCheckPoint' = [globalCheckPoint EXCEPT ![n] = newGlobalCP]
/\ currentTerm' = [currentTerm EXCEPT ![n] = m.term]
/\ Reply(DefaultResponse(m), m)
/\ UNCHANGED <<clusterStateOnMaster, nextRequestId, clientVars, localCheckPoint,
clusterStateOnNode>>
\* Helper function for handling replication responses
FinishIfNeeded(m) ==
\* check if this is the last response we're waiting for
IF /\ m.client
/\ { ms \in messages : ms.req = m.req } = {m}
\* check if the request has not been failed already to the client
/\ { cr \in clientResponses : cr.success = FALSE /\ cr.req = m.req } = {} THEN
clientResponses' = clientResponses \cup {[success |-> TRUE,
id |-> m.id,
value |-> m.value,
seq |-> m.seq,
term |-> m.rterm]}
ELSE
UNCHANGED <<clientResponses>>
\* Helper function for handling replication responses
FinishAsFailed(m) ==
/\ clientResponses' = clientResponses \cup {[success |-> FALSE,
req |-> m.req]}
\* Replication response arrives on node n from node rn with message m
HandleReplicationResponse(n, rn, m) ==
/\ m.request = FALSE
/\ m.method = Replication
\* are we still interested in the response or already marked the overall client request as failed?
/\ IF m.success THEN
\* is it a newer local checkpoint than we have?
/\ IF m.localCP > localCheckPoint[n][rn] /\
\* is the shard still active on this node?
clusterStateOnNode[n].routingTable[n] /= Unassigned THEN
LET
newLocalCheckPoint == [localCheckPoint EXCEPT ![n][rn] = m.localCP]
assigned == Assigned(clusterStateOnNode[n].routingTable)
computedGlobalCP == Min({newLocalCheckPoint[n][i] : i \in assigned})
IN
/\ localCheckPoint' = newLocalCheckPoint
\* also update global checkpoint if necessary
/\ globalCheckPoint' = [globalCheckPoint EXCEPT ![n] = computedGlobalCP]
ELSE
UNCHANGED <<localCheckPoint, globalCheckPoint>>
/\ UNCHANGED <<clusterStateOnMaster, clusterStateOnNode>>
/\ FinishIfNeeded(m)
ELSE
\* replication failed, ask master to fail shard
/\ IF m.rterm < clusterStateOnMaster.primaryTerm THEN
\* term outdated, fail itself and don't ack client write
/\ FinishAsFailed(m)
/\ UNCHANGED <<clusterStateOnMaster>>
ELSE
\* fail shard and respond to client
/\ FailShardOnMaster(rn)
/\ FinishIfNeeded(m)
/\ UNCHANGED <<localCheckPoint, globalCheckPoint>>
/\ messages' = messages \ {m}
/\ UNCHANGED <<nextClientValue, nextRequestId, tlog, clusterStateOnNode,
currentTerm>>
\* Trim translog response arrives on node n from node rn with message m
HandleTrimTranslogResponse(n, rn, m) ==
/\ m.request = FALSE
/\ m.method = TrimTranslog
/\ messages' = messages \ {m}
/\ IF m.success = FALSE /\ m.term >= clusterStateOnMaster.primaryTerm THEN
\* fail shard
FailShardOnMaster(rn)
ELSE
UNCHANGED <<clusterStateOnMaster>>
/\ UNCHANGED <<nextClientValue, nextRequestId, nodeVars, clientResponses>>
\* Cluster state propagated from master is applied to node n
ApplyClusterStateFromMaster(n) ==
/\ clusterStateOnNode[n] /= clusterStateOnMaster
/\ clusterStateOnNode' = [clusterStateOnNode EXCEPT ![n] = clusterStateOnMaster]
/\ IF ShardWasPromotedToPrimary(n, clusterStateOnMaster.routingTable,
clusterStateOnNode[n].routingTable) THEN
\* shard promoted to primary, resync with replicas
LET
ntlog == tlog[n]
globalCP == globalCheckPoint[n]
newTerm == clusterStateOnMaster.primaryTerm
\* fill gaps in tlog and remove pending confirmation marker
newTlog == FillAndUnmarkPC(ntlog, newTerm, globalCP)
replicas == Replicas(clusterStateOnMaster.routingTable)
numReplicas == Cardinality(replicas)
startSeq == globalCP + 1
endSeq == Max((DOMAIN ntlog) \cup {0})
numDocs == endSeq + 1 - startSeq
\* resend all translog entries above global checkpoint to replicas
replRequests == {([request |-> TRUE,
method |-> Replication,
source |-> n,
dest |-> rn,
req |-> nextRequestId + (i - startSeq),
seq |-> i,
rterm |-> newTerm, \* new term when issuing request
sterm |-> newTlog[i].term, \* stored term for entry
id |-> newTlog[i].id,
value |-> newTlog[i].value,
client |-> FALSE, \* request not initiated by client
globalCP |-> globalCP]) : i \in startSeq..endSeq, rn \in replicas}
\* send trim request to replicas
trimRequests == {[request |-> TRUE,
method |-> TrimTranslog,
source |-> n,
dest |-> rn,
req |-> nextRequestId + numDocs,
maxseq |-> endSeq,
term |-> newTerm,
client |-> FALSE,
globalCP |-> globalCP] : rn \in replicas}
IN
/\ currentTerm' = [currentTerm EXCEPT ![n] = newTerm]
/\ tlog' = [tlog EXCEPT ![n] = newTlog]
/\ localCheckPoint' = [localCheckPoint EXCEPT ![n][n] = MaxConfirmedSeq(newTlog)]
/\ messages' = messages \cup replRequests \cup trimRequests
/\ nextRequestId' = nextRequestId + numDocs + 1
ELSE
/\ UNCHANGED <<messages, nextRequestId, currentTerm, localCheckPoint, tlog>>
/\ UNCHANGED <<clusterStateOnMaster, clientVars,
globalCheckPoint>>
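(* A resync walkthrough (illustrative only): suppose node n2 is promoted to primary under term 2
   while its translog has slots 1..3 filled, globalCheckPoint[n2] = 1, and n3 is the only
   replica. Then startSeq = 2, endSeq = 3 and numDocs = 2: n2 fills any gaps in slots 2..3 and
   clears their pending confirmation markers, resends the entries in slots 2 and 3 to n3 (using
   request ids nextRequestId and nextRequestId + 1), and sends a TrimTranslog request with
   maxseq = 3 and term = 2 (request id nextRequestId + 2), so that nextRequestId advances by
   numDocs + 1 = 3.
*)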
\* Fail request message
FailRequestMessage(m) ==
/\ m.request = TRUE
/\ Reply(FailedResponse(m), m)
/\ UNCHANGED <<clusterStateOnMaster, nextRequestId, clientVars, nodeVars>>
\* Fail response message
FailResponseMessage(m) ==
/\ m.request = FALSE
/\ m.success = TRUE
/\ Reply([m EXCEPT !.success = FALSE], m)
/\ UNCHANGED <<clusterStateOnMaster, nextRequestId, clientVars, nodeVars>>
\* Node fault detection on master finds node n to be isolated from the cluster or crashed
NodeFaultDetectionKicksNodeOut(n) ==
/\ clusterStateOnMaster.routingTable[n] /= Unassigned \* not already unassigned
/\ FailShardOnMaster(n)
/\ UNCHANGED <<messages, nextRequestId, clientVars, nodeVars>>
\* Defines how the variables may transition.
Next == \/ \E n \in Nodes : \E docId \in DocumentIds : ClientRequest(n, docId)
\/ \E m \in messages : HandleReplicationRequest(m.dest, m)
\/ \E m \in messages : HandleReplicationResponse(m.dest, m.source, m)
\/ \E m \in messages : HandleTrimTranslogRequest(m.dest, m)
\/ \E m \in messages : HandleTrimTranslogResponse(m.dest, m.source, m)
\/ \E m \in messages : FailRequestMessage(m)
\/ \E m \in messages : FailResponseMessage(m)
\/ \E n \in Nodes : ApplyClusterStateFromMaster(n)
\/ \E n \in Nodes : NodeFaultDetectionKicksNodeOut(n)
----
\* `^\Large\bf Helper functions for making assertions ^'
\* no active messages
NoActiveMessages == messages = {}
\* shard that is considered active by the master
ActiveShard(n) == clusterStateOnMaster.routingTable[n] /= Unassigned
\* cluster state on master has been applied to all nodes that are still supposed to have an active shard
ClusterStateAppliedOnAllNodesWithActiveShards ==
\A n \in Nodes : ActiveShard(n) => clusterStateOnNode[n] = clusterStateOnMaster
\* everything in the translog up to and including slot i
UpToSlot(ntlog, i) == [j \in 1..i |-> ntlog[j]]
\* copy of translog, where we ignore the pending confirmation marker
ExceptPC(ntlog) == [j \in DOMAIN ntlog |-> [r \in DOMAIN ntlog[j] \ {"pc"} |-> ntlog[j][r] ]]
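\* For example (illustrative only), ExceptPC maps a translog entry
\*   [id |-> doc1, term |-> 1, value |-> 5, pc |-> TRUE]
\* to [id |-> doc1, term |-> 1, value |-> 5], so that two shard copies differing only in their
\* pending confirmation markers compare as equal.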
\* all shard copies contain the same data
AllCopiesSameContents ==
\A n1, n2 \in Nodes:
/\ n1 /= n2
/\ ActiveShard(n1)
/\ ActiveShard(n2)
=> ExceptPC(tlog[n1]) = ExceptPC(tlog[n2])
----
\* `^\Large\bf Main invariants ^'
\* checks if the translogs of all nodes are equivalent up to the global checkpoint, only differing
\* in the pending confirmation marker (which can differ between copies if the global checkpoint on
\* one shard copy is lower than on another)
SameTranslogUpToGlobalCheckPoint ==
\A n1, n2 \in Nodes:
/\ n1 /= n2
/\ ActiveShard(n1)
/\ ActiveShard(n2)
=> ExceptPC(UpToSlot(tlog[n1], globalCheckPoint[n1])) =
ExceptPC(UpToSlot(tlog[n2], globalCheckPoint[n1]))
\* checks if the translogs of all nodes are eventually the same
AllCopiesSameContentsOnQuietDown ==
(/\ NoActiveMessages
/\ ClusterStateAppliedOnAllNodesWithActiveShards)
=> AllCopiesSameContents
\* checks if all (acked) responses to the client are successfully and correctly stored
AllAckedResponsesStored ==
\A r \in clientResponses : \A n \in Nodes :
/\ r.success = TRUE
/\ ActiveShard(n)
=> /\ r.seq \in DOMAIN tlog[n]
/\ tlog[n][r.seq].id = r.id
/\ tlog[n][r.seq].value = r.value
/\ tlog[n][r.seq].term = r.term
\* checks that the global checkpoint is the same as or below the local checkpoint on each node
GlobalCheckPointBelowLocalCheckPoints ==
\A n \in Nodes : globalCheckPoint[n] <= MaxConfirmedSeq(tlog[n])
\* local checkpoint always corresponds to MaxSeq and MaxConfirmedSeq on the primary node
LocalCheckPointMatchesMaxConfirmedSeq ==
\A n \in Nodes : clusterStateOnNode[n].routingTable[n] = Primary
=> /\ localCheckPoint[n][n] = MaxConfirmedSeq(tlog[n])
/\ MaxSeq(tlog[n]) = MaxConfirmedSeq(tlog[n])
\* routing table is well-formed (has at most one primary, and has no replicas if no primaries)
WellFormedRoutingTable(routingTable) ==
/\ Cardinality(Primaries(routingTable)) <= 1
/\ ( Cardinality(Primaries(routingTable)) = 0
=> Cardinality(Assigned (routingTable)) = 0)
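\* For example (illustrative only), [n1 |-> Primary, n2 |-> Replica] and
\* [n1 |-> Unassigned, n2 |-> Unassigned] are well-formed, whereas
\* [n1 |-> Primary, n2 |-> Primary] (two primaries) and
\* [n1 |-> Replica, n2 |-> Unassigned] (a replica without a primary) are not.
\* Bounds the state space that TLC explores: it limits the number of client operations and
\* in-flight messages and only considers states where at least one shard copy is assigned.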
StateConstraint ==
/\ nextClientValue <= 3
/\ Cardinality(messages) <= 5
/\ 0 < Cardinality(Assigned (clusterStateOnMaster.routingTable))
=============================================================================
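\* Everything below the module footer is ignored by the TLA+ tools.
\*
\* A hypothetical TLC model configuration (e.g. replication.cfg) for checking this
\* specification; the choice of model values and the selection of invariants below are
\* illustrative assumptions, not taken from the repository:
\*
\*   CONSTANTS
\*     Nodes = {n1, n2, n3}
\*     DocumentIds = {doc1, doc2}
\*     Replication = Replication
\*     TrimTranslog = TrimTranslog
\*     Primary = Primary
\*     Replica = Replica
\*     Unassigned = Unassigned
\*     Nil = Nil
\*   INIT Init
\*   NEXT Next
\*   CONSTRAINT StateConstraint
\*   INVARIANTS
\*     AllAckedResponsesStored
\*     SameTranslogUpToGlobalCheckPoint
\*     AllCopiesSameContentsOnQuietDown
\*     GlobalCheckPointBelowLocalCheckPoints
\*     LocalCheckPointMatchesMaxConfirmedSeq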