Distributed Algorithms

From MyInfoRepo

Distributed Algorithms (DA) are the basis of distributed systems. Many systems that may seem trivial require such algorithms. This summary lists some algorithms that are used as a basis for complex networks or have been used and expanded on in the past decades.




Processes have internal events, which are steps executed locally on a single machine or process. To communicate with another machine or process, a message must be sent and received. Message delivery takes time, so the system needs to keep track of the ordering of events to make sure that the steps each machine takes in response to a message are correct.

Happened-Before Relation

  1. If a and b are events in the same process i and a occurs before b, then a -> b; (local order)
  2. If a is the event of sending a message m in process i and b is the event of receiving m in process j, with i != j, then a -> b; (message exchange)
  3. If a -> b and b -> c, then a -> c. (transitivity)

The HB-relation is also called the causality relation: a->b means that a may causally affect b. If a->b, there is a path (of causal relations) from a to b.

Some relationships between events are unknown. If neither a->b nor b->a holds, we call a and b concurrent, written as a||b. A very simple situation where this happens is when two processes, i and j, communicate and we have 4 events: a,b on process i and c,d on process j, with a->b on process i, c->d on process j, and a message sent at b and received at d, so b->d. Neither a->c nor c->a holds, so a||c: the events a and c are concurrent.
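The three rules can be turned into a small computation. Collect the local-order pairs and the message pairs, then take the transitive closure. The sketch below is illustrative only. The function names and the input format (`local_orders`, `messages`) are assumptions made for this example. It reproduces the four-event example above.

```python
def happened_before(local_orders, messages):
    """Return the set of pairs (x, y) with x -> y.

    local_orders: one list of event names per process, in execution order.
    messages: (send_event, receive_event) pairs.
    """
    hb = set()
    # Rule 1 (local order): earlier events in a process precede later ones.
    for events in local_orders:
        for i, x in enumerate(events):
            for y in events[i + 1:]:
                hb.add((x, y))
    # Rule 2 (message exchange): a send precedes the matching receive.
    hb.update(messages)
    # Rule 3 (transitivity): Warshall-style transitive closure.
    all_events = [e for events in local_orders for e in events]
    for k in all_events:
        for x in all_events:
            for y in all_events:
                if (x, k) in hb and (k, y) in hb:
                    hb.add((x, y))
    return hb


def concurrent(hb, x, y):
    """x || y: neither x -> y nor y -> x."""
    return x != y and (x, y) not in hb and (y, x) not in hb


# The example above: a, b on process i; c, d on process j; b sends to d.
hb = happened_before([["a", "b"], ["c", "d"]], [("b", "d")])
print(("a", "d") in hb, concurrent(hb, "a", "c"))   # True True
```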

Logical clocks

Logical clocks make heavy use of the happened-before relation. Such a clock can be defined as a function C : E -> S, where E is the set of events that occur in all processes and S is a partially ordered set with partial order <, consistent with the HB-relation. That means C(a) < C(b) for every two events a,b in E with a->b; this is the weak clock condition. If, in addition, C(a) < C(b) iff a->b for all a,b in E, C satisfies the strong clock condition.

A function C:E->S is a logical clock if two conditions are satisfied. The first concerns local events: on each process i, every pair of events a,b of process i with a->b has C(a) < C(b). The second concerns inter-process events: for any event a that sends a message m and the event b that receives m in a different process, C(a) < C(b).

The set S may be the set of natural numbers $\mathbb{N}$. However, there may be concurrent events, and the natural numbers are totally ordered, so such a clock cannot characterize the HB-relation: it can satisfy the weak, but not the strong, clock condition. To get around this restriction, vector clocks were introduced, where $S = \mathbb{N}^n$ for a system of n processes.
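Here is a minimal sketch of the usual vector clock update rules. It assumes n processes numbered 0..n-1. Each process increments its own entry on every event and takes the component-wise maximum with the incoming stamp when it receives a message. The class and function names are hypothetical. Comparing vectors entry by entry gives the partial order on $\mathbb{N}^n$, so the concurrent events a and c from the example above come out incomparable.

```python
class VectorClock:
    """Vector clock of process `pid` in a system of n processes."""

    def __init__(self, pid, n):
        self.pid = pid
        self.v = [0] * n

    def tick(self):
        """Internal or send event: advance the own entry; return the timestamp."""
        self.v[self.pid] += 1
        return tuple(self.v)

    def receive(self, stamp):
        """Receive event: component-wise maximum with the message stamp, then tick."""
        self.v = [max(own, other) for own, other in zip(self.v, stamp)]
        return self.tick()


def vc_less(x, y):
    """Partial order on N^n: x < y iff x <= y in every entry and x != y."""
    return all(a <= b for a, b in zip(x, y)) and x != y


p0, p1 = VectorClock(0, 2), VectorClock(1, 2)
a = p0.tick()            # (1, 0)
b = p0.tick()            # (2, 0), sent to process 1
c = p1.tick()            # (0, 1)
d = p1.receive(b)        # (2, 2)
print(vc_less(a, d), vc_less(a, c), vc_less(c, a))   # True False False
```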

Vector clocks

One-dimensional vector clocks (scalar clocks) can be constructed by having each process maintain a counter, initialized at 0. Every event that is not a message reception increments the counter of the process, and a sent message carries the sender's clock value as its timestamp. On receipt of a message, the receiver sets its counter to one more than the maximum of its own value and the timestamp of the received message. This guarantees the weak clock condition. Some constraints may be placed on when a process may act on a received message: the message may first go into a buffer, and when it is taken from the buffer, we say the message is delivered.
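Here is a minimal sketch of a scalar clock. It uses the common receive rule: take the maximum of the own value and the message timestamp, then add one. `LamportClock` is a hypothetical name. The example also shows why scalar clocks cannot detect concurrency. Events c and b are concurrent, yet c gets a smaller value than b.

```python
class LamportClock:
    """Scalar logical clock of one process."""

    def __init__(self):
        self.time = 0

    def tick(self):
        """Internal or send event; the returned value is the event's timestamp."""
        self.time += 1
        return self.time

    def receive(self, timestamp):
        """Receive event: jump past both the own clock and the message timestamp."""
        self.time = max(self.time, timestamp) + 1
        return self.time


p, q = LamportClock(), LamportClock()
a = p.tick()         # 1
b = p.tick()         # 2, message sent to q
c = q.tick()         # 1
d = q.receive(b)     # 3
print(b < d, c < b)  # True True: b -> d is respected, but c || b still gets C(c) < C(b)
```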

Deadlock and livelock

Deadlocks are a ubiquitous problem in software development. A deadlock is the situation where a process A is waiting for a resource X held by process B, which in turn is waiting for a resource Y held by A (or by another process further along a chain of waiting processes that leads back to A), so that none of them can proceed.
A livelock, on the other hand, occurs when processes keep acting but still make no progress. Suppose A is holding Y and waiting for X, while B is holding X and waiting for Y; neither process can advance. To get out of this, both release their resources and try to reacquire them: A acquires X and B acquires Y. Now the roles are reversed: A is holding X and waiting for Y, and B is holding Y and waiting for X. The infinite repetition of this pattern is the livelock.
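With single-unit resources, a deadlock as described above corresponds to a cycle in a wait-for graph. In that graph, an edge from A to B means that A waits for a resource held by B. The sketch below finds such a cycle with a depth-first search. The dict-based input format and the function name are assumptions for illustration.

```python
def find_deadlock(waits_for):
    """Return a list of processes forming a cycle in the wait-for graph, or None.

    waits_for maps each process to the processes holding a resource it waits for.
    """
    WHITE, GREY, BLACK = 0, 1, 2            # unvisited, on the DFS path, finished
    color = {}
    path = []

    def dfs(p):
        color[p] = GREY
        path.append(p)
        for q in waits_for.get(p, ()):
            if color.get(q, WHITE) == GREY:  # edge back into the current path: a cycle
                return path[path.index(q):]
            if color.get(q, WHITE) == WHITE:
                cycle = dfs(q)
                if cycle:
                    return cycle
        path.pop()
        color[p] = BLACK
        return None

    for p in waits_for:
        if color.get(p, WHITE) == WHITE:
            cycle = dfs(p)
            if cycle:
                return cycle
    return None


print(find_deadlock({"A": ["B"], "B": ["A"]}))   # ['A', 'B']
print(find_deadlock({"A": ["B"], "B": ["C"]}))   # None
```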

Liveness

Liveness in DA means that processes eventually make progress. Because processes have to communicate and wait for replies before advancing, we have to prove that an algorithm makes progress.

Deadlock detection


Mutual exclusion (mutex)

There are token-based and assertion-based mutual-exclusion algorithms. Token-based algorithms have a single distinguished message, the token. If a process possesses the token, it may execute its critical section. Here, the main issues are deadlock and starvation. In assertion-based algorithms, access is requested from other nodes when required. Safety and liveness need to be proved.

Lamport's mutex algorithm

All channels are FIFO, and every message carries a timestamp consisting of a clock value and the id of the sender. When a process requests the resource, it broadcasts a timestamped REQUEST, which every node (including the requester itself) adds to its queue Q, ordered by timestamp. The receiving nodes answer with a REPLY message. A process enters the critical section (CS) when its own request is at the head of its Q and it has received a reply from all other processes. After leaving the CS, it sends a RELEASE message, so that all nodes can remove the request from Q.

Ricart-Agrawala mutex algorithm

There is a non-optimality in Lamport's mutex algorithm. Suppose a process j has its own request in its queue and receives a request from process i with a later timestamp. Then j can combine the REPLY and the RELEASE by deferring its REPLY until it has left its CS. This is possible because process i cannot enter its CS before j has finished anyway. This saves one message per other process for each request, so a CS entry costs 2(n-1) instead of 3(n-1) messages.
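The sketch below shows the Ricart-Agrawala decision rule for a single process. It assumes that requests are ordered by (timestamp, id) pairs, as in Lamport's algorithm. Message transport is left out: the methods only return what would be sent. The class name `RAProcess` and its method names are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class RAProcess:
    pid: int
    clock: int = 0
    requesting: bool = False
    in_cs: bool = False
    my_request: Optional[Tuple[int, int]] = None   # (timestamp, pid) of own request
    deferred: List[int] = field(default_factory=list)

    def request(self):
        """Start a request; the returned (timestamp, pid) is sent to all others."""
        self.clock += 1
        self.requesting = True
        self.my_request = (self.clock, self.pid)
        return self.my_request

    def on_request(self, ts, sender):
        """Handle REQUEST(ts, sender); True means REPLY now, False means deferred."""
        self.clock = max(self.clock, ts) + 1
        if self.in_cs or (self.requesting and self.my_request < (ts, sender)):
            self.deferred.append(sender)   # our request has priority: hold the REPLY back
            return False
        return True

    def enter_cs(self):
        # To be called once a REPLY has arrived from every other process.
        self.in_cs = True

    def release(self):
        """Leave the CS; the deferred REPLYs, sent now, double as RELEASE messages."""
        self.in_cs = self.requesting = False
        self.my_request = None
        out, self.deferred = self.deferred, []
        return out


p0, p1 = RAProcess(0), RAProcess(1)
r0, r1 = p0.request(), p1.request()      # both request with timestamp 1
print(p1.on_request(*r0))                # True: (1, 0) has priority, reply at once
print(p0.on_request(*r1))                # False: p0 defers its REPLY to p1
p0.enter_cs()
print(p0.release())                      # [1]: the deferred REPLY now goes to p1
```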

Maekawa's mutex algorithm

The algorithms of Lamport and Ricart-Agrawala place no restrictions on the multicast used to acquire the CS lock: they broadcast to every node in the system, which can mean a lot of overhead. Maekawa reduces this by sending fewer messages, namely only to smaller, overlapping request sets R.

Each process has a request set R, and the intersection of any two request sets is non-empty. To acquire the lock, a process sends a timestamped REQUEST to all members of its R, waits for a GRANT from each of them, and sends a RELEASE when it is finished. A process grants only a single request at a time, pending the RELEASE. Since any two request sets overlap, two processes can never hold all their GRANTs at the same time. A request that arrives while a GRANT is outstanding is queued in Q. If it has a later timestamp than the granted request, a POSTPONE message is sent back. If it is older, an INQUIRE is sent to the node holding the GRANT. If that node has itself been postponed, it RELINQUISHes its GRANT, and the process receiving the RELINQUISH grants the request with the oldest timestamp in Q. This INQUIRE/RELINQUISH exchange is what prevents deadlock.
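Maekawa's construction of request sets of size about √n is based on finite projective planes. A simpler construction also meets the pairwise-intersection requirement and is shown here as a swapped-in illustration. It places n = k² processes on a k × k grid and uses a process's row plus its column as its request set, of size 2k-1. Any row meets any column, so every two request sets overlap. The function name is hypothetical.

```python
import math


def grid_request_sets(n):
    """Request sets for n = k*k processes on a k x k grid: own row plus own column."""
    k = math.isqrt(n)
    if k * k != n:
        raise ValueError("this sketch assumes n is a perfect square")
    sets = []
    for p in range(n):
        row, col = divmod(p, k)
        members = {row * k + c for c in range(k)} | {r * k + col for r in range(k)}
        sets.append(members)
    return sets


sets = grid_request_sets(9)
print(sorted(sets[4]))    # [1, 3, 4, 5, 7]
print(all(sets[i] & sets[j] for i in range(9) for j in range(9)))   # True
```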

Generalized mutex algorithm


Token-based algorithm: Suzuki-Kasami broadcast-based mutex algorithm

A token is passed along the processes, which may only enter their CS when they have the token. Each process may set their state to requesting (R), which indicates they want to receive the token. They have to send these requests to every node, to make them aware of their request.

Each process also keeps an array N, in which N[j] is the number of the latest request of process j that it knows of. The token carries an array TN, in which TN[j] is the number of the latest request of process j that has been satisfied. By comparing TN with N, a process can see that every process j with TN[j] < N[j] has an outstanding request, and the token is then sent to such a process.

N is updated whenever a process receives a request from another process, or makes one itself. The search for an outstanding request (comparing TN with N) starts at the index of the process currently holding the token and proceeds cyclically. This avoids starvation caused by a bias towards small ids. The holder keeps the token if TN = N (no outstanding requests) until a request arrives.
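The sketch below shows the N/TN bookkeeping described above for one process. It assumes processes numbered 0..n-1 and a caller that delivers the messages. The class `SKNode` and its method names are hypothetical. Executing the CS itself is not modeled, and neither is a request made while already holding an idle token.

```python
class SKNode:
    """One process in the Suzuki-Kasami algorithm (N/TN version)."""

    def __init__(self, pid, n, has_token=False):
        self.pid, self.n = pid, n
        self.N = [0] * n                          # latest known request number per process
        self.TN = [0] * n if has_token else None  # token array, present only at the holder
        self.requesting = False

    def request(self):
        """Set state to R; the returned (pid, number) is broadcast as REQUEST."""
        self.requesting = True
        self.N[self.pid] += 1
        return self.pid, self.N[self.pid]

    def on_request(self, sender, number):
        """Record a REQUEST; an idle token holder hands the token over (returned)."""
        self.N[sender] = max(self.N[sender], number)
        if self.TN is not None and not self.requesting and self.TN[sender] < self.N[sender]:
            token, self.TN = self.TN, None
            return token
        return None

    def on_token(self, token):
        self.TN = token                           # the process may now execute its CS

    def release(self):
        """Leave the CS; return (next holder, token) or (None, None) to keep it."""
        self.requesting = False
        self.TN[self.pid] = self.N[self.pid]      # our own request is now satisfied
        for step in range(1, self.n):             # cyclic search starting after our index
            j = (self.pid + step) % self.n
            if self.TN[j] < self.N[j]:
                token, self.TN = self.TN, None
                return j, token
        return None, None


nodes = [SKNode(i, 3, has_token=(i == 0)) for i in range(3)]
req = nodes[1].request()              # process 1 broadcasts REQUEST(1, 1)
token = nodes[0].on_request(*req)     # idle holder 0 hands the token over
nodes[2].on_request(*req)
nodes[1].on_token(token)
req = nodes[2].request()              # process 2 broadcasts REQUEST(2, 1)
nodes[0].on_request(*req)
nodes[1].on_request(*req)             # 1 is using the token and keeps it for now
print(nodes[1].release())             # (2, [0, 1, 0])
```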

Token-based algorithm: Singhal

This is an optimization of the Suzuki-Kasami algorithm. When requesting the token, a process limits the number of messages by sending its request only to the processes it believes may have the token. These are the processes it has marked as requesting (R) in its state vector S. Initially, the token is at process 0, and each process i sets S[j] = R for all j < i. Thus process 1 only requests from process 0, process 2 from processes 0 and 1, and so on, up to process n-1, which requests from processes 0 to n-2. This arrangement guarantees that of any two processes, at least one sends its requests to the other. If every process makes its first request from this initial state, this costs $0 + 1 + \dots + (n-1) = n(n-1)/2$ messages. That is half of the $n(n-1)$ messages needed when every process broadcasts to all n-1 others, as in Suzuki-Kasami. After the initial round, requests are only sent to processes that are marked as requesting, since these may have the token.
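The sketch below computes the initial state vectors and counts the messages of the first round. It assumes processes 0..n-1 with the token initially at process 0, and uses the 'R'/'O' state labels from this section. The function names are hypothetical.

```python
def singhal_initial_states(n):
    """Initial state vector S of each process i: S[j] = 'R' for j < i, else 'O'."""
    return [["R" if j < i else "O" for j in range(n)] for i in range(n)]


def first_round_requests(n):
    """Messages needed if every process requests once from the initial state."""
    states = singhal_initial_states(n)
    return sum(row.count("R") for row in states)   # process i contacts i processes


n = 5
print(first_round_requests(n), n * (n - 1))   # 10 20
```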

When receiving a request from process j, a process acts according to its own state. When it is Executing (E) or Other (O), it sets S[j] to R. When it is itself requesting and did not yet know that j was requesting, it sets S[j] to R and also sends its own request to j. Finally, when the receiving process holds the token, it sets S[j] to R and its own state to O. It then sets the token's entries TS[j] to R and TN[j] to the request number r of process j, and sends the token to j.

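Upon receipt of the token, the states are updated as well. The process sets its own state to E while executing the CS, and to O when it is done, also updating its own entry in TS. It then reconciles its local information with the token's. For each process j, whichever of N[j] and TN[j] is more recent is copied, together with the corresponding state, to the other.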

Raymond’s token-based mutual-exclusion algorithm in a tree

Raymond's algorithm relies on a pre-created spanning tree. The root of the tree holds the token, and which node is the root changes over time. Every process keeps track of its parent, the neighbour on its path to the root. Waves of messages propagate through the tree to inform the nodes of this structure. A process requests the CS by adding its own id to its local queue and sending a REQUEST to its parent. When a process receives a REQUEST, it appends the sender's id to its local queue and, unless it has already done so, sends a REQUEST to its own parent. When the root has completed its CS, it removes the head of its queue and sends the token to that neighbour, which becomes the new root. The old root becomes a child of the new root, and if the old root's queue is not empty, it sends a REQUEST to its new parent. A process that receives the token enters its CS if its own id is at the head of its queue. Otherwise, it removes the head of the queue, passes the token on to that neighbour, and makes it its parent.

Detection of the loss and the regeneration of a token

When the token is lost, for instance because the node holding it goes down, a new token has to be generated. This can be done by introducing a second token that detects the loss of the first. The relationship is symmetrical: both tokens watch each other. [todo: explanation]

Leader election

Often we want a leader in a distributed system. Leaders should be assigned dynamically, because the node acting as leader may fail, so we require algorithms that take care of leader election. If the nodes in a network have unique ids, the system is non-anonymous; otherwise it is anonymous. Either the highest or the lowest id is elected as leader. Anonymous systems work with a randomized assignment of ids. Election algorithms differ in several properties that affect their complexity and whether solutions exist at all:

  1. Topology: election algorithms have been studied extensively for uni- and bi-directional rings;
  2. Synchronicity: whether the system is synchronous or asynchronous;
  3. Anonymity: an anonymous system only has non-deterministic (randomized) solutions. After assigning random ids, a non-anonymous election algorithm is executed;
  4. Known network size: algorithms that do not rely on knowing the network size are called uniform;
  5. Whether the algorithm is comparison-based, i.e. only compares ids with each other.

There is a fairly trivial solution in a non-anonymous network. Every node sends its id to every other node and compares its own id to the ids it receives. Exactly one node finds that its id is larger than those of the other n-1 nodes, and it is elected. This has a message complexity of $O(n^2)$ and a time complexity of $O(1)$.

Bi-directional rings: Hirschberg-Sinclair election for bi-directional rings

The algorithm works in phases l = 0, 1, 2, ... In phase l, a node that is still a candidate determines whether its id is the largest in its $2^l$-neighbourhood, i.e. among the nodes at most $2^l$ hops away on either side. It sends its id, together with a hop count of $2^l$, in both directions. On receipt of such a message, a node discards it if its own id is larger. Otherwise, it decrements the hop count. If the count is still positive, it forwards the message. If the count has reached 0, the edge of the neighbourhood has been reached, and the node sends an OK message carrying the received id back towards the candidate. A candidate that receives OK messages from both directions (so 2 per phase) proceeds to the next phase; otherwise it does not start the next phase.

The messages are labeled either out (probes travelling away from the candidate) or in (OK replies travelling back), which determines the action taken on receipt. Each phase starts with the candidate sending out messages to its left and right neighbours.

Termination is reached when a node receives its own out message: its id has then travelled all the way around the ring, so it is the largest. If the ring size n is known, the node can also conclude this once its neighbourhood covers all n nodes. The message complexity is O(n log n).
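Below is a simplified, phase-by-phase simulation of HS, assuming distinct ids and phases l = 0, 1, 2, ... with probe radius 2^l. It processes the candidates of a phase one after the other instead of concurrently. This does not change which candidates survive, since survival only depends on the ids within the probed neighbourhood. Messages (out probes and in/OK replies) are counted for all phases up to the one in which the leader sees its own id return. The function name is hypothetical.

```python
def hirschberg_sinclair(ids):
    """Phase-by-phase simulation on a bidirectional ring of distinct ids.

    Returns (leader id, total number of messages sent).
    """
    n = len(ids)
    candidates = set(range(n))                  # ring positions still in the race
    messages = 0
    phase = 0
    leader = None
    while leader is None:
        reach = 2 ** phase                      # radius of the neighbourhood probed
        survivors = set()
        for p in sorted(candidates):
            oks = 0
            for direction in (+1, -1):
                for hop in range(1, reach + 1):
                    q = (p + direction * hop) % n
                    messages += 1               # the out message reaches q
                    if q == p:                  # own id went all the way round
                        leader = ids[p]
                        break
                    if ids[q] > ids[p]:         # a larger id discards the probe
                        break
                else:                           # probe survived `reach` hops:
                    messages += reach           # the OK (in) message travels back
                    oks += 1
            if oks == 2:
                survivors.add(p)
        candidates = survivors
        phase += 1
    return leader, messages


print(hirschberg_sinclair([1, 3, 2]))                   # (3, 23)
print(hirschberg_sinclair([3, 7, 1, 5, 2, 6, 4, 0])[0])  # 7
```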

Election in a synchronous unidirectional ring (non-comparison-based)

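Sending in a unidirectional ring is one-way; we assume messages are sent to the right-hand neighbour. Non-comparison-based algorithms can achieve a lower message complexity in synchronous unidirectional rings than comparison-based ones. Here, the minimum id is elected, and ids are positive integers. The execution is divided into phases of n rounds each, for network size n. A node with id 1 (the lowest possible id) is trivially chosen. It sends its id along the ring in the first round, all other nodes relay it, and after n rounds it returns to its sender. When a node does not receive id 1 in the first n rounds, it knows that min(id) > 1. In general, a node with id k starts sending its id at the start of phase k, i.e. after (k-1)n rounds, if it has not received a lower id by then, and this id is relayed through the network. Only the minimum id is ever sent, so the message complexity is O(n). The time complexity, however, is O(n·k) for minimum id k, so it depends on the ids and not only on n.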
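The following round-by-round simulation of this scheme assumes positive integer ids and a synchronous model in which a message sent in a round is received by the right-hand neighbour in that same round. The function name is hypothetical. In the examples, the minimum id k is elected after k·n rounds using exactly n messages.

```python
def time_slice_election(ids):
    """Synchronous unidirectional ring; returns (leader, rounds, messages).

    Rounds (v-1)*n+1 .. v*n form phase v. A node with id v starts sending its
    id in the first round of phase v, unless it has already received an id.
    """
    n = len(ids)
    if any(not isinstance(u, int) or u < 1 for u in ids):
        raise ValueError("ids must be positive integers")
    seen = [False] * n          # has this node received any id yet?
    pending = {}                # position -> id it must relay in the next round
    messages = 0
    r = 0
    while True:
        r += 1
        phase = (r - 1) // n + 1
        sends = dict(pending)
        if (r - 1) % n == 0:    # first round of a phase
            for pos, uid in enumerate(ids):
                if uid == phase and not seen[pos]:
                    sends[pos] = uid
        messages += len(sends)
        pending = {}
        for pos, value in sends.items():
            dest = (pos + 1) % n
            if value == ids[dest]:
                return value, r, messages   # the id came back: its owner is elected
            seen[dest] = True
            pending[dest] = value


print(time_slice_election([3, 5, 4]))      # (3, 9, 3)
print(time_slice_election([2, 1, 7, 9]))   # (1, 4, 4)
```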

An election algorithm in a bidirectional ring

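This algorithm is more dynamic in sending out its messages. In round 1, every process sends its id to its two neighbours. If its id is larger than both ids it receives, it remains active; if not, it becomes passive. In the following rounds, active processes repeat this procedure with their nearest active neighbours, while passive processes only relay messages. Two neighbouring active processes cannot both remain active, so at least half of the active processes become passive in every round.

The figure shows the original ring, the active ids in the ring after one round, and after two.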


Chang’s and Roberts’s election algorithm in a unidirectional ring

A very simple algorithm: every process sends its id to its neighbour, and a process that receives its own id back is elected. On receipt of an id r_id, a process forwards it only if r_id is larger than any id it has sent before (including its own id); otherwise the message is discarded. In the worst case this takes O(n^2) messages.
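The following sequential simulation of Chang and Roberts' algorithm assumes distinct ids, messages sent to the right-hand neighbour, and FIFO processing of all messages in flight. It returns the leader and the number of messages sent until the leader sees its own id. The function name is hypothetical.

```python
from collections import deque


def chang_roberts(ids):
    """Unidirectional ring, messages go to the right; return (leader, messages sent)."""
    n = len(ids)
    # Every process starts by sending its own id to its right-hand neighbour.
    in_flight = deque(((pos + 1) % n, uid) for pos, uid in enumerate(ids))
    messages = n
    largest_sent = list(ids)              # largest id each process has sent so far
    while in_flight:
        pos, uid = in_flight.popleft()
        if uid == ids[pos]:
            return uid, messages          # own id came all the way round: elected
        if uid > largest_sent[pos]:       # forward only ids larger than any sent before
            largest_sent[pos] = uid
            in_flight.append(((pos + 1) % n, uid))
            messages += 1
        # otherwise the id is swallowed


print(chang_roberts([0, 1, 2, 3, 4]))     # (4, 9)
print(chang_roberts([4, 3, 2, 1, 0]))     # (4, 15)
```

With ids increasing in the direction of the messages, only 2n-1 messages are needed. With ids decreasing in that direction, n(n+1)/2 messages are needed, which is the O(n^2) worst case.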

Peterson’s election algorithm in a unidirectional ring

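A more efficient solution. The same happens as in the previous algorithm, but processes can deactivate and become relay processes. An active process still checks whether r_id = id, and if so, it is elected. In every round at least half of the active processes become relays, so there are O(log n) rounds of 2n messages each, giving O(n log n) message complexity. Since Peterson's algorithm works in a unidirectional ring, a process cannot directly compare its id with those of both its neighbours. To get around this, each active process receives the ids of its two nearest active predecessors. If the middle one (that of its direct active predecessor) is the largest of the three, the process stays active and adopts that id; otherwise it becomes a relay. In this way a bidirectional ring is simulated.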
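The following round-level sketch of Peterson's algorithm assumes distinct ids and that the largest id wins. The relays are abstracted away. Each round is charged 2n messages, because the first messages of all active processes together cross every link once, and so do the second messages. The final round, in which the lone active process sees its own temporary id come back, is charged n messages. The function name is hypothetical.

```python
def peterson(ids):
    """Round-level simulation; returns (elected tid, rounds, messages)."""
    n = len(ids)
    active = list(ids)                  # temporary ids of active processes, ring order
    messages = 0
    rounds = 0
    while True:
        rounds += 1
        if len(active) == 1:
            messages += n               # the lone tid travels once round the ring
            return active[0], rounds, messages
        messages += 2 * n
        survivors = []
        for a, tid in enumerate(active):
            ntid = active[a - 1]        # nearest active predecessor (messages go right)
            nntid = active[a - 2]       # the one before that
            if ntid > tid and ntid > nntid:
                survivors.append(ntid)  # stay active, adopting the predecessor's id
        active = survivors


print(peterson([3, 7, 1, 5, 2, 6, 4, 0]))   # (7, 3, 40)
```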

Afek’s and Gafni’s algorithm for election in a synchronous complete network

For complete networks. As in Hirschberg-Sinclair, a candidate sends messages to an increasing segment of the other nodes. The receivers compare ids and return an OK message if the candidate's id is larger; if not, no OK message is returned. When the number of OKs received is smaller than the size of the segment, the node is no longer an election candidate. The size of the segment is 1 in the first round and doubles every round ($2^k$ in round k = 0, 1, ...) until all other nodes are covered. Nodes only send to each neighbour once, and a node is elected when it has received OKs from all n-1 other nodes.

Each node runs two processes, a candidate and an ordinary process. Only candidates may be elected. If an ordinary process receives a candidate id larger than its own (ordinary_id < candidate_id), it is captured by that candidate, which becomes its owner.

Afek’s and Gafni’s algorithm for election in an asynchronous complete network

In the asynchronous version of this algorithm, the level of a candidate indicates the number of processes it has captured. When a candidate captures a process that was already captured, the previous owner needs to be killed, as it will not become the leader. A candidate traverses its untraversed links one at a time, until this set is empty. An ordinary process is captured, and updates its owner's level, if the received id is equal to or higher than its current owner's id. The current owner is killed if the newly received id is higher.

Traversal algorithms: Tarry’s, Cheung’s, Awerbuch’s and Cidon’s algorithms

The following algorithms traverse the network by sending a TOKEN through it, building a spanning tree along the way.

Tarry

Every node maintains the set of its neighbours, excluding its parent, to which it has not yet sent the TOKEN. As long as this set is not empty when the node receives the TOKEN, it sends the TOKEN along one of these edges. When all of them have been used, it sends the TOKEN back to its parent, the node from which it first received it. Every edge is thus traversed exactly once in each direction.
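The following sequential simulation of Tarry's algorithm runs on an undirected graph given as an adjacency dict. It assumes the rule described above: a node returns the TOKEN to its parent only when no other unused edge is left. The function name and the choice of the smallest unused neighbour are assumptions made for illustration; any choice works.

```python
def tarry(graph, initiator):
    """Simulate Tarry's traversal; return (parent map, number of TOKEN messages).

    graph: undirected graph as {node: [neighbours]}.
    """
    parent = {initiator: None}
    unused = {p: set(neigh) for p, neigh in graph.items()}   # edges not yet sent over
    messages = 0
    current = initiator
    while True:
        choices = unused[current] - {parent[current]}
        if choices:
            nxt = min(choices)                  # any unused non-parent edge will do
        elif parent[current] in unused[current]:
            nxt = parent[current]               # nothing else left: back to the parent
        else:
            return parent, messages             # initiator with all edges used: done
        unused[current].discard(nxt)
        messages += 1
        parent.setdefault(nxt, current)         # first receipt defines the parent
        current = nxt


graph = {0: [1, 2], 1: [0, 2, 3], 2: [0, 1], 3: [1]}
print(tarry(graph, 0))   # ({0: None, 1: 0, 2: 1, 3: 1}, 8)
```

The example graph has 4 edges, so the TOKEN makes 2 × 4 = 8 moves before it ends at the initiator.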

Cheung (DFS)

The same as Tarry's algorithm, but a node that receives the TOKEN when it has already received it before sends it straight back. This creates long, depth-first chains of nodes, which continue until they reach a node without unvisited neighbours, where Tarry's algorithm might have branched.

Awerbuch

The same idea as Cheung's algorithm, but with fewer TOKEN messages. When a node receives the TOKEN for the first time, it sends VISITED to all its neighbours except its parent and the next recipient of the TOKEN. It awaits an ACK from all of them before forwarding the TOKEN. These neighbours will then never send the TOKEN to the visited node. As a slight further optimization, a VISITED message may even be withheld from a neighbour that is already known to have been visited.

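Cidon

Again similar to Awerbuch's algorithm, but this algorithm does not wait for ACKs after sending VISITED. It immediately forwards the TOKEN to an unvisited node and then sends the VISITED messages. As a result, the TOKEN can be sent to a node that has already been visited, because its VISITED message was still underway. When the sender then receives the VISITED message from that node, it forwards the TOKEN to another unvisited node, while the visited node disregards the TOKEN it received.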

Shortest-Path Spanning Trees

To understand the MST algorithm below, we need to grasp some concepts. First, a lemma about MSTs of weighted connected undirected graphs in which all edge weights are different.

Lemma: such a graph has a unique MST.

Proof: Suppose there exists such a graph G with different edge weights that has two different MSTs T and T'. Let e be the edge of minimum weight that occurs in one of the two trees but not in the other. Without loss of generality, we can assume that e occurs in T but not in T'. Adding e to T' creates a cycle. This cycle must contain an edge e' that is not in T, since otherwise T would contain the whole cycle. Since e' occurs in only one of the two trees, e is the lightest such edge and all weights are different, weight(e) < weight(e'). Removing e' from T' with e added gives a new spanning tree T'' with weight(T'') < weight(T'). This contradicts T' being an MST.

Next, fragments and minimum-weight outgoing edges (MOEs) are described. A fragment is a subtree of the MST. An outgoing edge of a fragment is an edge with one node inside and one node outside the fragment, and the MOE of a fragment is its outgoing edge with minimum weight. The key property is that adding its MOE to a fragment yields a larger fragment, i.e. the MOE belongs to the MST. Suppose it did not. Adding the MOE to the MST would create a cycle that leaves the fragment through another outgoing edge. That edge is heavier than the MOE, so swapping it out would give a lighter spanning tree.

Minimum-weight spanning trees: Gallager-Humblet-Spira

Fragments are connected along their MOEs until a single fragment remains; this fragment is the MST.

Two fragments, F of level l and F' of level l', are merged if l = l' and their MOEs coincide: a fragment of level l+1 is created by merging F and F' along this common MOE.

If l<l', F is absorbed by F' along the MOE of F, remaining at level l'.

If l>l', or l=l', but MOEs of F and F' do not coincide, connecting is postponed.
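GHS itself is an asynchronous message-passing protocol with fragment levels. As a swapped-in illustration of the fragment/MOE idea it builds on, here is the sequential Borůvka procedure, in which every fragment adds its MOE in each round. It assumes distinct edge weights, so by the MOE property no cycles can appear. Union-find tracks the fragments. The function name and the (weight, u, v) edge format are assumptions.

```python
def boruvka_mst(n, edges):
    """Sequential Boruvka: fragments repeatedly add their minimum-weight outgoing edge.

    n: number of nodes 0..n-1; edges: (weight, u, v) tuples with distinct weights.
    """
    fragment = list(range(n))                     # union-find parent pointers

    def find(x):
        while fragment[x] != x:
            fragment[x] = fragment[fragment[x]]   # path halving
            x = fragment[x]
        return x

    mst = set()
    while len(mst) < n - 1:
        moe = {}                                  # fragment root -> its MOE
        for w, u, v in edges:
            fu, fv = find(u), find(v)
            if fu == fv:
                continue                          # internal edge, not outgoing
            for f in (fu, fv):
                if f not in moe or w < moe[f][0]:
                    moe[f] = (w, u, v)
        if not moe:
            raise ValueError("graph is not connected")
        for w, u, v in moe.values():
            fu, fv = find(u), find(v)
            if fu != fv:                          # two fragments may share the same MOE
                fragment[fu] = fv
                mst.add((w, u, v))
    return mst


edges = [(1, 0, 1), (4, 1, 2), (2, 2, 3), (3, 0, 3), (5, 0, 2)]
print(sorted(boruvka_mst(4, edges)))   # [(1, 0, 1), (2, 2, 3), (3, 0, 3)]
```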

Fault Tolerance

There are two types of faults: permanent and transient. As their names suggest, a permanent fault never recovers (e.g. a permanent malfunction), while a transient one does (e.g. a temporary power loss or transmission errors). Both may cause the system to stop, which is called a stop failure. However, if the system is equipped to handle up to a certain number of failures, it keeps on going. Redundancy protects against faults.

Consensus

We want to reach consensus in a system. The processes have to decide on a value such that three conditions are fulfilled:

  1. Agreement: any two non-faulty processes decide on the same value;
  2. Validity: if all processes start with the same value v, then v is the only possible decision;
  3. Termination: all non-faulty processes decide in finite time.

An algorithm for agreement in a synchronous system with at most f crash failures

This algorithm satisfies the three conditions, where f is the maximum number of crash failures it can withstand.

Each process starts with a value, stored in a set W (so initially W = {v}). The algorithm then proceeds in rounds r = 1, ..., f+1. In each round, every process broadcasts W, receives the sets W' of the other processes, and updates $W := W \cup \bigcup W'$. After these f+1 rounds, a process with $|W| = 1$ decides on the sole element of W, and a process with $|W| > 1$ decides on a default value (e.g. 0). This works because with f+1 rounds and at most f failures, at least one round has no crash. After that round, all non-faulty processes have the same set W, and it no longer changes.
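Here is a simulation of the algorithm above under an assumed crash schedule. The `crashes` format is hypothetical: a crashing process reaches only some receivers in its crash round and sends nothing afterwards. The second call deliberately runs too few rounds for the number of crashes, to show why f+1 rounds are needed.

```python
def flood_set(initial, f, crashes=None, default=0):
    """Run the (f+1)-round algorithm; return {pid: decision} of non-crashed processes.

    initial: starting value per process (process ids are 0..n-1).
    crashes: {pid: (round, receivers)}, meaning pid crashes in that round after
             sending its set W only to `receivers`, and sends nothing afterwards.
    """
    crashes = crashes or {}
    n = len(initial)
    W = [{v} for v in initial]
    crashed = set()
    for r in range(1, f + 2):                    # rounds 1 .. f+1
        inbox = [set() for _ in range(n)]
        for p in range(n):
            if p in crashed:
                continue
            receivers = range(n)
            if p in crashes and crashes[p][0] == r:
                receivers = crashes[p][1]        # partial broadcast, then crash
                crashed.add(p)
            for q in receivers:
                inbox[q] |= W[p]
        for q in range(n):
            W[q] |= inbox[q]                     # W := W union all received W'
    return {p: (next(iter(W[p])) if len(W[p]) == 1 else default)
            for p in range(n) if p not in crashed}


# Process 0 starts with 0 and crashes in round 1 after reaching only process 1.
print(flood_set([0, 1, 1], f=1, crashes={0: (1, [1])}))   # {1: 0, 2: 0}
print(flood_set([0, 1, 1], f=0, crashes={0: (1, [1])}))   # {1: 0, 2: 1}: too few rounds
```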

Byzantine agreements

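To understand Byzantine agreements, a little context is needed. In the Byzantine generals problem, a commander sends an order to n-1 lieutenants, and some of the generals, possibly including the commander, may be traitors that behave arbitrarily. The following three conditions must hold in the agreements described below ("special" case of the general agreement problem):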

  1. Agreement: all lieutenants follow the same order;
  2. Validity: when the commander is loyal, all lieutenants follow the commander's order;
  3. Termination: all loyal lieutenants decide in finite time.

With oral (unsigned) messages, the number of faulty nodes f has to satisfy f < n/3, i.e. n > 3f. With signed messages this bound does not apply.

Both Byzantine agreement algorithms here assume a fully connected network.

Byzantine agreement with oral messages: Lamport-Pease-Shostak

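The base case is f = 0: the commander sends its value to every lieutenant, and each lieutenant uses the value it receives. With no faults, the commander and the lieutenants clearly decide on the same value. For f > 0, the commander sends its value to all lieutenants. Each lieutenant then takes the commander role in a recursive execution of the algorithm with f-1, relaying the value it received to the other lieutenants. Each lieutenant finally decides on the majority of the value it received directly and the values it obtained for the other lieutenants in the recursive executions. If an expected message is missing, the default value is used instead.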
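Here is a recursive simulation of the oral-messages algorithm OM(f). The order names, the default value and the traitors' behaviour (alternating orders per receiver) are hypothetical choices made for this example, and missing messages are not modeled. With n > 3f, the loyal lieutenants should agree, and follow a loyal commander.

```python
from collections import Counter

DEFAULT = "RETREAT"            # hypothetical default order


def majority(values):
    """Value held by a strict majority, or DEFAULT if there is none."""
    value, count = Counter(values).most_common(1)[0]
    return value if count > len(values) // 2 else DEFAULT


def om(f, commander, lieutenants, value, traitors):
    """OM(f): return {lieutenant: value it ends up using for this commander}."""
    received = {}
    for i, lt in enumerate(lieutenants):
        if commander in traitors:
            # Hypothetical traitor behaviour: alternate orders per lieutenant.
            received[lt] = "ATTACK" if i % 2 == 0 else "RETREAT"
        else:
            received[lt] = value
    if f == 0:
        return received                          # OM(0): use the value received
    # Each lieutenant acts as commander in OM(f-1) towards the other lieutenants.
    relayed = {lt: [] for lt in lieutenants}
    for lt in lieutenants:
        others = [x for x in lieutenants if x != lt]
        for other, v in om(f - 1, lt, others, received[lt], traitors).items():
            relayed[other].append(v)
    # Majority of the direct value and the values relayed by the others.
    return {lt: majority([received[lt]] + relayed[lt]) for lt in lieutenants}


loyal_cmd = om(1, 0, [1, 2, 3], "ATTACK", traitors={3})
print(loyal_cmd[1], loyal_cmd[2])        # ATTACK ATTACK
print(om(1, 0, [1, 2, 3], "ATTACK", traitors={0}))   # all lieutenants agree
```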

Byzantine agreement with authentication: Lamport-Pease-Shostak

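The original sender signs its message, and every receiver adds its own signature before relaying it. It is impossible to forge signatures, and modifications are detected. Each time a message is relayed, the number of signatures on it increases by one. Each loyal lieutenant collects every order that arrives with a valid chain of signatures. After f+1 rounds, all loyal lieutenants hold the same set of orders and apply the same deterministic choice to it, so they agree without needing a majority.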

State machine replication (SMR)

Paxos protocol

In a system without malicious users, but with faults (message timeouts or loss, out-of-order delivery), consensus can be reached with Paxos.

A majority of nodes must agree to a decision before it is made.

Paxos has two stages, a promise stage and a commit stage. It is useful for acquiring locks or performing a single action.

When some nodes go down, some information is not shared. This is no problem, as the majority will reject outdated requests or proposals (PREPARE messages). After running into such a conflict, a broadcast is sent to update the promise information across the network. Proposals with higher round numbers get priority over lower ones.

If a decision is committed, a majority must have accepted it. Any two majorities overlap (e.g. two sets of 3 out of 5 nodes always share at least one node), so no other majority can commit a different decision.

Proposers get round numbers pre-assigned to them, to make sure that each proposer gets a chance to propose and there is no conflict between which proposer may propose in any round. Rounds may be executed simultaneously or out of order.


There are three roles: proposers, acceptors and learners. Nodes may take any of the roles, even multiple at a time. Proposers want to acquire a majority of acceptors and send out their proposed value. Acceptors choose values that have been proposed. Learners are processes that learn about the results.

The algorithm

Nodes must be aware of how many acceptors are a majority.

There are two phases. In phase 1, a proposer sends a PREPARE message with its round number to the acceptors. Each acceptor decides whether to make a PROMISE not to take part in any lower-numbered round; with its PROMISE, it reports the latest vote it has cast (round and value), if any. Round numbers are incremented for every new attempt. When the proposer has received PROMISEs from a majority (end of phase 1), it picks a value. This is the value of the highest-round vote reported in the PROMISEs, or its own value if no vote was reported. In phase 2, it sends this value to all acceptors and seeks a majority that accepts it. Every acceptor that has not promised to ignore this round replies with an ACCEPT message and also sends it to all learners. Proposers and learners that receive ACCEPTs from a majority know that consensus has been reached. Any acceptor votes for at most a single value in each round.
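The following sketch simulates single-decree Paxos, with message passing reduced to direct method calls. It assumes the standard value-adoption rule: a proposer must adopt the value of the highest-round vote reported in the PROMISEs. The `reachable` parameter is a hypothetical way to model message loss, and all class and function names are illustrative. The example shows that once "A" has been chosen, a later proposer with value "B" ends up proposing "A".

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Acceptor:
    promised: int = 0                          # highest round number promised so far
    voted: Optional[Tuple[int, str]] = None    # (round, value) of the latest vote

    def prepare(self, rnd: int):
        """Phase 1: PROMISE to ignore rounds below rnd and report the latest vote."""
        if rnd > self.promised:
            self.promised = rnd
            return True, self.voted
        return False, None

    def accept(self, rnd: int, value: str) -> bool:
        """Phase 2: vote for value (ACCEPT) unless a higher round was promised."""
        if rnd >= self.promised:
            self.promised = rnd
            self.voted = (rnd, value)
            return True
        return False


def propose(acceptors: List[Acceptor], reachable: List[int], rnd: int, my_value: str):
    """One round of a proposer; returns the chosen value, or None if the round fails."""
    majority = len(acceptors) // 2 + 1
    replies = [acceptors[a].prepare(rnd) for a in reachable]
    promises = [vote for ok, vote in replies if ok]
    if len(promises) < majority:
        return None
    votes = [vote for vote in promises if vote is not None]
    # Adopt the value of the highest-round vote reported; only otherwise use our own.
    value = max(votes)[1] if votes else my_value
    accepts = sum(acceptors[a].accept(rnd, value) for a in reachable)
    return value if accepts >= majority else None


acceptors = [Acceptor() for _ in range(5)]
print(propose(acceptors, [0, 1, 2], 1, "A"))   # A
print(propose(acceptors, [2, 3, 4], 2, "B"))   # A: acceptor 2 reports its vote for A
print(propose(acceptors, [0, 1, 2], 1, "C"))   # None: round 1 is outdated
```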

Consistency / proof

To make sure the algorithm never decides two different values, a proposer in round i may only propose a value v if no other value has been chosen, or can still be chosen, in any round j < i.

Proof: Suppose the proposer of round i received PROMISEs from a majority Q and proposes pval = v. Here k < i is the highest round in which a member of Q voted, and v is the value voted for in round k. Consider any round number j with j<i.

Case 1: j>k
Case 2: j=k
Case 3: j<k

Case 1: the members of Q have promised not to vote in any round < i, and none of them reported a vote in a round between k and i, so no member of Q votes in round j. Since every majority Q' intersects Q, a majority in round j is impossible, and no value can be chosen in that round.
Case 2: this is trivial, as only one value (v) is proposed, and thus voted for, in each round.
Case 3: when v was proposed in round k, the same argument applied to round k. By induction, no value other than v can be chosen in any round j < k.


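Livelock in Paxos

Paxos can livelock. Two proposers can keep preempting each other, each starting a new round with a higher number before the other's round has completed, so that no value is ever chosen. This problem is solved by running an election algorithm to elect a single leader, which is the only node that makes proposals.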

Multi-value Paxos

[may fill this in some time]

Practical Byzantine fault tolerance