Library with broadcast primitives for building distributed systems.
This is an educational project to implement different building blocks for distributed systems with a focus on performance.
Implemented abstractions include:
- Perfect Links. Reliable transport over UDP with an eventual delivery guarantee.
- Uniform Reliable Broadcast. Broadcast primitive that ensures that a message is either delivered by all correct processes or by none.
- FIFO Broadcast. Broadcast primitive that additionally ensures that the messages of each sender are delivered in the order in which that sender broadcast them.
- Lattice Agreement. Weaker form of a consensus protocol where processes decide on sets of values.
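One way to get FIFO order on top of Uniform Reliable Broadcast is to buffer each sender's messages until all earlier sequence numbers from that sender have been delivered. The sketch below assumes per-sender sequence numbers starting at 1 and a URB layer that never delivers duplicates; the FifoLayer class and its OnUrbDeliver callback are hypothetical names, not the library's API.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical FIFO layer: receives URB deliveries in any order and hands them
// to the application in per-sender sequence-number order.
class FifoLayer {
 public:
  using Deliver =
      std::function<void(uint8_t sender, uint32_t seq, const std::string& payload)>;

  explicit FifoLayer(Deliver deliver) : deliver_(std::move(deliver)) {}

  // Called whenever the underlying URB layer delivers a message.
  void OnUrbDeliver(uint8_t sender, uint32_t seq, std::string payload) {
    SenderState& state = senders_[sender];
    if (seq < state.next) return;  // already delivered; ignore defensively
    state.pending.emplace(seq, std::move(payload));
    // Deliver every consecutive message starting from the next expected one.
    auto it = state.pending.find(state.next);
    while (it != state.pending.end()) {
      deliver_(sender, it->first, it->second);
      state.pending.erase(it);
      ++state.next;
      it = state.pending.find(state.next);
    }
  }

 private:
  struct SenderState {
    uint32_t next = 1;                        // sequence numbers start at 1
    std::map<uint32_t, std::string> pending;  // out-of-order buffer
  };

  Deliver deliver_;
  std::unordered_map<uint8_t, SenderState> senders_;
};
```

Buffering per sender is enough because FIFO only constrains the order of messages from the same sender; messages from different senders may interleave freely.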
- cmake 3.9 or higher
- gcc or clang (tested on gcc 11.4 and clang 15)
mkdir build
cd build
cmake -DCMAKE_BUILD_TYPE=<cmake-build-type> -DASAN=<ON/OFF> -DTSAN=<ON/OFF> -DUNIT_TESTS=<ON/OFF> ..
make
Where the following should be filled:
- CMAKE_BUILD_TYPE: build type for single-configuration generators (e.g. Debug, Release, RelWithDebInfo, MinSizeRel).
- ASAN: whether AddressSanitizer is enabled.
- TSAN: whether ThreadSanitizer is enabled.
- UNIT_TESTS: whether unit tests are built.
The library can be used independently or within a test application that showcases its functionality. Use the run script to start the example application on a node:
cd tools
./run.sh --id <node-id> --hosts <host-file> --output <output-results-file> <config> <mode>
Where the following should be filled:
- node-id: ID of the current node.
- host-file: file with the host configuration.
- output-results-file: path to the output file.
- config: path to the config file.
- mode: example application mode (PERFECT_LINKS/FIFO/LATTICE).
The example configs provided in tools/ can be used to run the examples:
- Perfect Links
# Run in terminal 1
./run.sh --id 1 --hosts perfect_links_example_config/hosts --output perfect_links_example_config/output/1.txt perfect_links_example_config/config PERFECT_LINKS
# Run in terminal 2
./run.sh --id 2 --hosts perfect_links_example_config/hosts --output perfect_links_example_config/output/2.txt perfect_links_example_config/config PERFECT_LINKS
# Note: the second application won't stop by itself
- FIFO
# Run in terminal 1
./run.sh --id 1 --hosts fifo_example_config/hosts --output fifo_example_config/output/1.txt fifo_example_config/config FIFO
# Run in terminal 2
./run.sh --id 2 --hosts fifo_example_config/hosts --output fifo_example_config/output/2.txt fifo_example_config/config FIFO
# Run in terminal 3
./run.sh --id 3 --hosts fifo_example_config/hosts --output fifo_example_config/output/3.txt fifo_example_config/config FIFO
# Note: none of the applications will stop by themselves
- Lattice Agreement
# Run in terminal 1
./run.sh --id 1 --hosts lattice_agreement_example_config/hosts --output lattice_agreement_example_config/output/1.txt lattice_agreement_example_config/config_1 LATTICE
# Run in terminal 2
./run.sh --id 2 --hosts lattice_agreement_example_config/hosts --output lattice_agreement_example_config/output/2.txt lattice_agreement_example_config/config_2 LATTICE
# Run in terminal 3
./run.sh --id 3 --hosts lattice_agreement_example_config/hosts --output lattice_agreement_example_config/output/3.txt lattice_agreement_example_config/config_3 LATTICE
# Note: none of the applications will stop by themselves
The host file describes the configuration of the system. Each line describes one node: a user-defined ID (a 1-byte number), its IP address, and its port.
node-id-1 ip-address-1 ip-port-1
node-id-2 ip-address-2 ip-port-2
...
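A minimal C++ sketch of reading this file, assuming whitespace-separated fields and one node per line; the Host struct and ParseHosts function are illustrative, not part of the library.

```cpp
#include <cassert>
#include <cstdint>
#include <istream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// One line of the host file: "<node-id> <ip-address> <ip-port>".
struct Host {
  uint8_t id;
  std::string ip;
  uint16_t port;
};

// Hypothetical parser: one node per line, fields separated by whitespace.
std::vector<Host> ParseHosts(std::istream& in) {
  std::vector<Host> hosts;
  std::string line;
  while (std::getline(in, line)) {
    if (line.empty()) continue;  // tolerate blank lines
    std::istringstream fields(line);
    unsigned id = 0;
    unsigned port = 0;
    std::string ip;
    // The ID must fit into 1 byte and the port into 16 bits.
    if (!(fields >> id >> ip >> port) || id > 255 || port > 65535) {
      throw std::invalid_argument("malformed host line: " + line);
    }
    hosts.push_back({static_cast<uint8_t>(id), ip, static_cast<uint16_t>(port)});
  }
  return hosts;
}
```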
Configuration file format depends on the application mode:
- PERFECT_LINKS: The config file contains two integers, m and i, in its first line, separated by a single space character. m defines how many messages each sender process should send, and i is the index of the receiver process. The receiver process only receives messages, while the sender processes only send messages. All n-1 sender processes (where n is the number of processes in the host file) send m messages each. Sender processes send messages 1 to m in order.
- FIFO: The config file contains an integer m in its first line. m defines how many messages each process should broadcast. Processes broadcast messages 1 to m in order.
- LATTICE: The config file consists of multiple lines. The first line contains three integers, p vs ds (separated by single spaces). p denotes the number of proposals for each process, vs denotes the maximum number of elements in a proposal, and ds denotes the maximum number of distinct elements across all proposals of all processes. The subsequent p lines contain proposals. Each proposal is a set of positive integers, written as a list of integers separated by single spaces. Every line can have up to vs integers.
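For illustration, the LATTICE format can be parsed as follows. This is a sketch under the assumption that the header and every proposal line are present and well-formed; LatticeConfig and ParseLatticeConfig are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Parsed LATTICE config: header "p vs ds" followed by p proposal lines.
struct LatticeConfig {
  std::size_t p = 0;   // number of proposals per process
  std::size_t vs = 0;  // max number of elements in one proposal
  std::size_t ds = 0;  // max number of distinct elements across all proposals
  std::vector<std::set<unsigned>> proposals;
};

LatticeConfig ParseLatticeConfig(std::istream& in) {
  LatticeConfig cfg;
  std::string line;
  if (!std::getline(in, line)) throw std::invalid_argument("empty config");
  std::istringstream header(line);
  if (!(header >> cfg.p >> cfg.vs >> cfg.ds)) throw std::invalid_argument("bad header");
  for (std::size_t i = 0; i < cfg.p; ++i) {
    if (!std::getline(in, line)) throw std::invalid_argument("missing proposal");
    std::istringstream values(line);
    std::set<unsigned> proposal;  // a proposal is a set, so duplicates collapse
    unsigned v = 0;
    while (values >> v) proposal.insert(v);
    if (proposal.size() > cfg.vs) throw std::invalid_argument("proposal too large");
    cfg.proposals.push_back(std::move(proposal));
  }
  return cfg;
}
```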
The content of the output file depends on the application mode:
- PERFECT_LINKS: The output file contains a log of send/deliver events. Each event is represented by one line of the output file, terminated by a Unix-style line break (\n). There are two types of events to be logged:
  - sending of an application message, using the format "b seq_nr", where seq_nr is the sequence number of the message. These messages are numbered sequentially at each process, starting from 1.
  - delivery of an application message, using the format "d sender seq_nr", where sender is the ID of the process that sent the message and seq_nr is the sequence number of the message (as numbered by the sending process).
- FIFO: The output file contains a log of broadcast/deliver events. Each event is represented by one line of the output file, terminated by a Unix-style line break (\n). There are two types of events to be logged:
  - broadcast of an application message, using the format "b seq_nr", where seq_nr is the sequence number of the message. These messages are numbered sequentially at each process, starting from 1.
  - delivery of an application message, using the format "d sender seq_nr", where sender is the ID of the process that broadcast the message and seq_nr is the sequence number of the message (as numbered by the broadcasting process).
- LATTICE: The output file contains a log of decisions. Each decision is represented by one line of the output file, terminated by a Unix-style line break (\n). Since proposals are sets of integers, so are decisions: a decision is written as a list of integers separated by single spaces. The order of the lines in the output file must be the same as the order of the lines (proposals) in the config file.
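The log lines above could be produced by helpers like the ones below (hypothetical names; the real application may buffer its output differently). The sender is taken as unsigned so that a 1-byte ID is printed as a number rather than as a character.

```cpp
#include <cassert>
#include <cstdint>
#include <ostream>
#include <set>
#include <sstream>
#include <string>

// "b seq_nr": this process sent/broadcast message seq_nr.
void LogBroadcast(std::ostream& out, uint32_t seq_nr) {
  out << "b " << seq_nr << '\n';
}

// "d sender seq_nr": message seq_nr of `sender` was delivered.
void LogDeliver(std::ostream& out, unsigned sender, uint32_t seq_nr) {
  out << "d " << sender << ' ' << seq_nr << '\n';
}

// LATTICE: one decision per line, integers separated by single spaces.
void LogDecision(std::ostream& out, const std::set<unsigned>& decision) {
  bool first = true;
  for (unsigned v : decision) {
    if (!first) out << ' ';
    out << v;
    first = false;
  }
  out << '\n';
}
```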
Run all tests with all possible build combinations with gcc and clang:
cd build
./tests.sh
There are 3 example applications:
- Perfect Links. In this application, a set of processes exchange messages using perfect links. In particular, a single process only receives messages while the rest of the processes only send messages. The communication of every sender with the receiver is realized using the perfect links abstraction.
- FIFO Broadcast. Informally, every process both broadcasts messages and delivers messages from every process (including itself) with ordering guarantees. FIFO broadcast is implemented on top of Uniform Reliable Broadcast (URB).
- Lattice Agreement. Lattice Agreement is strictly weaker than consensus, as it can be solved in the asynchronous model. In a nutshell, processes propose sets of values and decide on sets of values. In this application, processes decide on a sequence of proposals: in multi-shot lattice agreement, processes run single-shot lattice agreement on a series of slots. The main properties are that (1) the decided set includes the process's own proposal, (2) the decided set contains only proposed values (i.e., it cannot include values that no process proposed), and (3) the sets decided by different processes in the same slot are comparable (one includes the other).
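To make these properties concrete, here is a sketch of one slot of a classic ack/nack lattice agreement scheme: an acceptor merges any proposal it cannot accept and replies with the merged set, and a proposer retries with the enlarged set until a majority acknowledges it. Whether this project uses exactly this algorithm is an assumption; Acceptor, Proposer, and the majority quorum n/2 + 1 are illustrative, and message passing is left out (replies are fed in directly).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <optional>
#include <set>

using Value = std::set<unsigned>;

// Acceptor: ACKs a proposal that includes everything accepted so far;
// otherwise merges it and NACKs with the merged set.
struct Acceptor {
  Value accepted;

  // Returns std::nullopt for ACK, or the merged set for NACK.
  std::optional<Value> OnProposal(const Value& proposed) {
    if (std::includes(proposed.begin(), proposed.end(), accepted.begin(), accepted.end())) {
      accepted = proposed;
      return std::nullopt;
    }
    accepted.insert(proposed.begin(), proposed.end());
    return accepted;
  }
};

// Proposer for a single slot among n processes.
struct Proposer {
  std::size_t n;
  Value proposed;
  unsigned round = 0;  // active proposal number
  std::size_t acks = 0;
  std::size_t nacks = 0;
  bool decided = false;

  // Returns true if a new round must be started with the enlarged `proposed`.
  bool OnReply(unsigned reply_round, const std::optional<Value>& nack) {
    if (decided || reply_round != round) return false;  // ignore stale replies
    if (nack) {
      proposed.insert(nack->begin(), nack->end());  // learn the acceptor's values
      ++nacks;
    } else {
      ++acks;
    }
    const std::size_t quorum = n / 2 + 1;
    if (acks >= quorum) {
      decided = true;  // decide on `proposed`
      return false;
    }
    if (nacks > 0 && acks + nacks >= quorum) {
      ++round;
      acks = nacks = 0;
      return true;
    }
    return false;
  }
};
```

Because proposals only grow and every decision is acknowledged by a majority, any two decided sets were both accepted by at least one common acceptor, which is what makes them comparable.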
- User-space context switch with both ASAN and TSAN support.
- Stackful coroutines (fibers) with synchronization primitives (OneShotEvent, Mutex, WaitGroup); these are currently not used in the applications.
- Futures with functional combinators.
- IO event loop over epoll with multithreading support.
- Event-based message channel with binary serialization over UDP.
- Established-over-unconnected socket technique for UDP.
- Reliable message channel with exponential backoff and rate limiters.
- Scalable timer service based on the timing wheel scheme.
- Instrumentation (logger, statistics).
- Best effort broadcast.
- FIFO broadcast.
- Uniform reliable broadcast.
- Lattice agreement.
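The timing wheel idea behind the timer service can be sketched as a single-level hashed wheel: each timer lands in slot (now + delay) mod slots, and timers that wrap around the wheel carry a round counter, so scheduling and cancelling are O(1). This is an illustrative sketch with hypothetical names (TimingWheel, Schedule, Cancel, Tick); the library's actual TimerService may be hierarchical or expose a different interface.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iterator>
#include <list>
#include <unordered_map>
#include <utility>
#include <vector>

class TimingWheel {
 public:
  using Callback = std::function<void()>;
  using TimerId = uint64_t;

  explicit TimingWheel(std::size_t slots) : slots_(slots) {}  // slots must be > 0

  TimerId Schedule(std::size_t delay_ticks, Callback cb) {
    if (delay_ticks == 0) delay_ticks = 1;  // fire on the next tick at the earliest
    const std::size_t slot = (current_ + delay_ticks) % slots_.size();
    const std::size_t rounds = (delay_ticks - 1) / slots_.size();  // full wraps to wait
    const TimerId id = next_id_++;
    auto& bucket = slots_[slot];
    bucket.push_back({id, rounds, std::move(cb)});
    index_[id] = {slot, std::prev(bucket.end())};
    return id;
  }

  // O(1) cancellation through the stored list iterator.
  bool Cancel(TimerId id) {
    auto it = index_.find(id);
    if (it == index_.end()) return false;
    slots_[it->second.slot].erase(it->second.pos);
    index_.erase(it);
    return true;
  }

  // Advances time by one tick and runs the timers that expire now.
  void Tick() {
    current_ = (current_ + 1) % slots_.size();
    std::vector<Callback> expired;
    auto& bucket = slots_[current_];
    for (auto it = bucket.begin(); it != bucket.end();) {
      if (it->rounds > 0) {
        --it->rounds;
        ++it;
        continue;
      }
      expired.push_back(std::move(it->cb));
      index_.erase(it->id);
      it = bucket.erase(it);
    }
    // Run callbacks after the scan so they can safely schedule new timers.
    for (auto& cb : expired) cb();
  }

 private:
  struct Entry {
    TimerId id;
    std::size_t rounds;
    Callback cb;
  };
  struct Location {
    std::size_t slot;
    std::list<Entry>::iterator pos;
  };

  std::vector<std::list<Entry>> slots_;
  std::unordered_map<TimerId, Location> index_;
  std::size_t current_ = 0;
  TimerId next_id_ = 0;
};
```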
The message exchange component is the most performance-critical. It's implemented as a reliable transport (with retries) over UDP.
The architecture of the message exchange service is shown in the following picture:
The whole design consists of 5 layers:
- ReliableTransport allows us to send a message with eventual delivery guarantees and wait for the delivery on the Future (or combine it with functional combinators).
- ReliableChannel encapsulates the retry logic, the delivery contract logic, and the handling of the different internal message types (acknowledgement type, payload type).
- FairLossChannel provides a middle layer between the high-level logic and the operating-system level (UDP socket, epoll). It controls the rate of sending UDP packets and registers async read/write events in the event loop.
- EventLoop handles event execution via epoll.
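The README does not say which algorithm FairLossChannel uses to limit the sending rate. A token bucket is one common choice; the sketch below assumes it, uses hypothetical names, and takes the current time as a parameter so the logic is deterministic and easy to test.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstddef>

// Token bucket: on average `rate` packets per second, bursts of up to `burst`.
class TokenBucket {
 public:
  using Clock = std::chrono::steady_clock;

  TokenBucket(double rate, double burst, Clock::time_point now)
      : rate_(rate), burst_(burst), tokens_(burst), last_(now) {}

  // Returns how many of `wanted` packets may be sent right now.
  std::size_t Acquire(std::size_t wanted, Clock::time_point now) {
    const std::chrono::duration<double> elapsed = now - last_;  // in seconds
    last_ = now;
    tokens_ = std::min(burst_, tokens_ + elapsed.count() * rate_);  // refill
    const std::size_t granted = std::min(wanted, static_cast<std::size_t>(tokens_));
    tokens_ -= static_cast<double>(granted);
    return granted;
  }

 private:
  double rate_;
  double burst_;
  double tokens_;
  Clock::time_point last_;
};
```

Returning a count rather than a yes/no answer fits batching: the sender can take exactly that many packets from its queue in one go and leave the rest for the next write event.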
The message sending flow consists of the following steps:
- The user sends a message to the destination_id of the peer via ReliableTransport and gets a Future of the contract for the delivery event.
- ReliableTransport routes the message to the ReliableChannel corresponding to destination_id.
- ReliableChannel starts a retry timer in the TimerService and registers a new contract in the PromiseStorage. It wraps the message into a packet, sends it via FairLossChannel, and then returns the corresponding Future to the caller.
- FairLossChannel pushes the packet into the Batched Lock-Free Queue, which limits the packet sending rate and batches requests together. FairLossChannel also registers the WriteEvent and ReadEvent of the corresponding socket in the EventLoop, so that packets are sent in a non-blocking way.
- Every connected peer has a connected UDP socket over the unconnected one. This technique saves the address lookup for every request after the first one.
- When the peer's socket is ready, a batch of requests is taken from the Batched Queue and written to the UDP socket until the socket would block. Unsent requests are put back into the queue.
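The contract and retry parts of this flow might look like the sketch below. std::promise/std::future stand in for the library's own Future type, and the PromiseStorage interface and the RetryDelay backoff formula (base * 2^attempt, capped at a maximum) are assumptions for illustration. In a full implementation, the retry timer would resend the packet after RetryDelay(attempt) until Fulfill is called for its sequence number.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <future>
#include <mutex>
#include <unordered_map>

// Hypothetical delivery-contract storage keyed by packet sequence number.
// Register() is called once per packet on send; Fulfill() when its ACK arrives.
class PromiseStorage {
 public:
  std::future<void> Register(uint64_t seq) {
    std::lock_guard<std::mutex> lock(mutex_);
    return promises_[seq].get_future();
  }

  // Returns false for unknown or duplicate ACKs (contract already settled).
  bool Fulfill(uint64_t seq) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = promises_.find(seq);
    if (it == promises_.end()) return false;
    it->second.set_value();
    promises_.erase(it);
    return true;
  }

 private:
  std::mutex mutex_;  // ACKs and sends may run on different threads
  std::unordered_map<uint64_t, std::promise<void>> promises_;
};

// Exponential backoff: base * 2^attempt, capped at max_delay.
std::chrono::milliseconds RetryDelay(unsigned attempt, std::chrono::milliseconds base,
                                     std::chrono::milliseconds max_delay) {
  const unsigned shift = std::min(attempt, 20u);  // keep the shift from overflowing
  const std::chrono::milliseconds scaled = base * (1u << shift);
  return std::min(max_delay, scaled);
}
```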
The message receiving flow has the following steps:
- FairLossChannel registers a ReadEvent on the peer's socket.
- When the socket is ready, the packet is read from it and the FairLossChannel delivery callback is called.
- In the callback, the packet's header is deserialized and the PacketMatcher routes the packet to the corresponding callback.
- If the packet contains user data, an acknowledgement message is sent using FairLossChannel (note that this message is not retried if it is lost) and the ReliableChannel delivery callback is called.
- If the packet is an acknowledgement of another message, the contract in the PromiseStorage is fulfilled and the corresponding retry in the TimerService is cancelled.
- On receiving the message, ReliableChannel passes it up to ReliableTransport together with the sender ID. Since we use the connected UDP socket technique, the sender is already known: kernel demultiplexing delivers each packet to the socket connected to its peer.
- On receiving the message, ReliableTransport puts the execution of the message callback into the PacketBatcher. It is an asynchronous mutex that serializes callbacks and executes them in batches on some worker in the ThreadPool. This lets the callbacks run without any extra synchronization and moves computation to the data rather than data to the computation.
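As an illustration of the header deserialization and PacketMatcher routing steps, here is a sketch with an assumed wire layout: a 1-byte packet type followed by an 8-byte big-endian sequence number. The layout, the PacketType values, and the PacketMatcher interface are hypothetical; the README only states that messages use binary serialization.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <unordered_map>
#include <utility>
#include <vector>

enum class PacketType : uint8_t { kPayload = 0, kAck = 1 };

struct Header {
  PacketType type;
  uint64_t seq;
};

constexpr std::size_t kHeaderSize = 9;  // 1 type byte + 8 sequence-number bytes

void SerializeHeader(const Header& h, std::vector<uint8_t>& out) {
  out.push_back(static_cast<uint8_t>(h.type));
  for (int shift = 56; shift >= 0; shift -= 8) {
    out.push_back(static_cast<uint8_t>(h.seq >> shift));  // big-endian
  }
}

std::optional<Header> DeserializeHeader(const uint8_t* data, std::size_t size) {
  if (size < kHeaderSize || data[0] > static_cast<uint8_t>(PacketType::kAck)) {
    return std::nullopt;  // truncated packet or unknown type
  }
  uint64_t seq = 0;
  for (std::size_t i = 1; i < kHeaderSize; ++i) seq = (seq << 8) | data[i];
  return Header{static_cast<PacketType>(data[0]), seq};
}

// Routes each packet to the handler registered for its type.
class PacketMatcher {
 public:
  using Handler =
      std::function<void(const Header&, const uint8_t* body, std::size_t body_size)>;

  void On(PacketType type, Handler handler) {
    handlers_[static_cast<uint8_t>(type)] = std::move(handler);
  }

  bool Dispatch(const uint8_t* data, std::size_t size) const {
    auto header = DeserializeHeader(data, size);
    if (!header) return false;  // malformed packet: drop it
    auto it = handlers_.find(static_cast<uint8_t>(header->type));
    if (it == handlers_.end()) return false;
    it->second(*header, data + kHeaderSize, size - kHeaderSize);
    return true;
  }

 private:
  std::unordered_map<uint8_t, Handler> handlers_;
};
```

Dropping malformed packets silently is consistent with a fair-loss channel: to the layers above, a corrupted packet looks the same as a lost one, and the retry logic handles both.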