Note: For the TL;DR, skip to the Proposed protocol section.
Introduction
This post proposes a way to model different chat communication scenarios as the consolidation of distributed logs.
It does so in the context of designing an end-to-end reliability protocol for Status chat protocols, specifically Status Communities.
It states the problem of end-to-end reliability for chat protocols within this abstraction of distributed logs.
We then propose some features that a reliability protocol should have in order to be scalable, mention alternatives and end with an example protocol proposal.
Much of this post takes inspiration from principles set out in the Scuttlebutt protocol and the probabilistic ACK of message references in a DAG proposed by @kaichao. It should serve as a starting point for conversation, not a fully fleshed-out design.
Model abstraction
Consider three typical chat scenarios:
1. Direct communication (1:1/private groups)
- Between two parties.
- Only the involved parties are privy to the conversation.
- Both parties are interested in all messages.
- Typical for 1:1 chat scenarios, but can be extended to form private group chats with multiple interconnected direct sessions.
2. Group chat (simple Community)
- Between more than two parties.
- All parties can see all messages using broadcast dissemination and a shared encryption key.
- All parties are interested in all messages.
3. Group chat with channels (Community with channels)
- Similar to group chat but with messages grouped into separate channels.
- Parties may be interested in messages from specific channels only.
One way of modelling all three communication scenarios is as the consolidation of distributed append-only logs into a single log with causal ordering. The problem of end-to-end reliability in the system can then be stated as ensuring that all participants eventually see the same sequence of messages in the same causal order, despite the challenges of network latency, message loss, and scalability present in any communications transport layer, such as Waku.
Note on encryption: The above scenarios match the encryption schemes for Status chat: 1:1 chats and private group chats build on encrypted direct communications; Communities map to shared-key “group chats” with some direct communications for control messaging (e.g. RequestToJoin). Since end-to-end reliability assumes knowledge of participants in the communication, the proposed protocol must function within this encryption layer.
Goals
An end-to-end reliability protocol for a distributed logging system would ideally:
- ensure partial causal ordering and eventual consistency for all three common scenarios: group chats with channels, group chats without channels, and the “special case” of direct communication with only two participants.
- scale to very large groups (the goal for Status Communities is up to 10K participants per Community).
- accommodate “ephemeral” messages for which no reliability is required.
- not rely on centralised services for coordination. It may define helper services (such as high-availability caches or mediator nodes) to improve the efficiency of the protocol, but should continue to work in the absence of such hubs.
- be transport-agnostic: it should work whether Waku or any other transport protocol is used for routing. This separates concerns and helps future-proof the design against changes in the transport.
Assumptions
We make the following simplifying assumptions for a proposed reliability protocol:
- Broadcast routing: messages are broadcast disseminated by the underlying transport. The selected transport takes care of routing messages to all participants of the communication.
- Store nodes: there are high-availability caches (a.k.a. Store nodes) from which missed messages can be retrieved. These caches maintain the full history of all messages that have been broadcast.
- Group awareness: each participant can identify all other participants in the communication (and, where applicable, specific channel) at any time. The size of the group is known.
- Message ID: each message has a unique, immutable ID (or hash). Messages can be requested from the high-availability caches or other participants using the corresponding message ID.
Note on dependency on Store nodes: Assuming that trusted, highly available Store nodes exist simplifies the interactions required for participants to retrieve missed messages. However, any proposed design should in theory also work with a decentralised Store service or direct IHAVE/IWANT-type interactions between participants.
Note on Message ID: Within Status-Waku context, Message IDs can either be application-level IDs or Waku message hashes. The only requirement is that these IDs must be unique and queryable (directly or indirectly) from Store nodes or other participants.
Design proposals
We propose that an end-to-end reliability protocol for scalable distributed logs should have the following features:
- each participant maintains a Lamport clock instance for each channel of communication (or for the entire communication in the case of no channels or direct communication)
- each participant includes a short causal history for its view of the channel (local log) when publishing a message, that is, the preceding `n` message IDs as causal dependency. `n` should likely be in the order of two to three message IDs.
- each participant maintains a bloom filter over the last `m` message IDs for each channel. Since the bloom filter will be an indication of message ACK to senders (see below), it should ideally cover (at least?) the last 10 min to 1 hour of message IDs within the channel. The bloom filter for the associated channel is included with each published message. The bloom filter parameters (size, number of hashes, number of insertions) should be designed to minimise the probability of false positives (see the sizing sketch after this list).
- lazy pull: the combination of Lamport timestamps and causal history in each published message allows recipients to maintain message order, detect missing dependencies and eventually request these messages from the Store nodes (or other participants). For this mechanism to scale, the scenario of multiple recipients simultaneously “pulling” the same message should be avoided as much as possible. Instead, message dependencies missing for many/all participants should be eagerly “pushed”/broadcast to all participants. See below.
- eager push: using a combination of received causal histories and bloom filters, senders can build a probabilistic model of which of their published messages have indeed been received by the rest of the participants, and resend those that failed to reach the majority/all of them
- local conflict resolution: conflicts in causal ordering should preferably be resolved locally in a consistent way (without coordinating with other participants) to enhance scalability
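To make the bloom filter trade-off concrete, below is a minimal sizing sketch using the standard formulas for a classic bloom filter. The capacity and target false positive rate are illustrative assumptions, not values fixed by this proposal.

```python
import math

def bloom_parameters(capacity: int, fp_rate: float) -> tuple[int, int]:
    """Classic bloom filter sizing: filter size in bits and number of
    hash functions for `capacity` insertions at a target false-positive
    rate."""
    size_bits = math.ceil(-capacity * math.log(fp_rate) / (math.log(2) ** 2))
    num_hashes = max(1, round(size_bits / capacity * math.log(2)))
    return size_bits, num_hashes

# Assumption: a busy channel accumulates ~10 000 message IDs within the
# covered window (10 min to 1 hour).
size_bits, num_hashes = bloom_parameters(capacity=10_000, fp_rate=0.001)
print(f"{size_bits} bits (~{size_bits // 8_192} KiB), {num_hashes} hash functions")
```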
Alternatives considered
There are many existing (and proven) alternative approaches to achieve e2e reliability for distributed logs. I’ll mention two:
1. Vector clocks
Vector clocks expand on the Lamport clock idea for causality by maintaining a vector of counters, one for each participant. Each participant updates its own counter and shares its view of the vector clock (i.e. the counter for all participants, including itself) with others to determine the causal order of events.
Limitations:
Vector clocks require maintaining and transmitting a vector of size proportional to the number of participants, which may not scale for very large groups. There are ways to compress vector clocks, but managing and merging vector clocks becomes complex as the number of participants and number of channels grow. It’s also possible to use a partial vector clock (with a size still proportional to group size), but this again requires significant processing and more detailed probabilistic modelling to ensure all participants receive sufficient information to: (a) order messages and detect all missing messages, and (b) determine the ACK status of all published messages. With more research this could be another promising avenue, though.
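For illustration, a minimal sketch of the per-participant bookkeeping a vector clock implies (participant IDs and method names are hypothetical):

```python
from collections import defaultdict

class VectorClock:
    """Minimal vector clock: one counter per participant."""

    def __init__(self, self_id: str):
        self.self_id = self_id
        self.counters: defaultdict[str, int] = defaultdict(int)

    def on_send(self) -> dict[str, int]:
        # Advance own counter; the full vector travels with every message.
        self.counters[self.self_id] += 1
        return dict(self.counters)

    def on_receive(self, received: dict[str, int]) -> None:
        # Merge by element-wise maximum to preserve causal information.
        for participant, counter in received.items():
            self.counters[participant] = max(self.counters[participant], counter)

# The vector attached to each message grows linearly with group size,
# which is the scaling concern for 10K-participant Communities.
```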
2. Conflict-Free Replicated Data Types (CRDTs)
CRDTs are data structures that ensure eventual consistency across distributed systems without requiring coordination. They can be based on structures like Merkle trees to track changes and merge them deterministically.
Limitations:
Although CRDTs can handle updates efficiently in smaller groups, the overhead of maintaining and synchronizing large Merkle trees or similar structures can be prohibitive in very large groups with multiple channels.
Proposed protocol
Message
Messages adhere to the following meta structure:
```protobuf
syntax = "proto3";

message Message {
  string sender_id = 1;                  // Unique identifier of the sender
  string message_id = 2;                 // Unique identifier of the message
  optional int32 lamport_timestamp = 3;  // Logical timestamp for causal ordering in channel
  repeated string causal_history = 4;    // List of 2 or 3 preceding message IDs that this message causally depends on; empty when unset (repeated fields cannot be marked optional in proto3)
  string channel_id = 5;                 // Identifier of the channel to which the message belongs
  optional bytes bloom_filter = 6;       // Bloom filter representing received message IDs in channel
  string content = 7;                    // Actual content of the message
}
```
Participant state
Each participant maintains:
- A Lamport timestamp for each channel of communication, initialized to 0.
- A bloom filter for received message IDs per channel.
- A buffer for unacknowledged outgoing messages.
- A buffer for incoming messages with unmet causal dependencies.
Messages in the unacknowledged outgoing buffer can be in one of three states:
- Unacknowledged - there has been no acknowledgement of message receipt by any participant in the channel
- Possibly acknowledged - there has been an ambiguous indication (e.g. a bloom filter match) that the message may have been received by at least one participant in the channel
- Acknowledged - there has been sufficient indication that the message has been received by at least some of the participants in the channel. Reaching this state also removes the message from the outgoing buffer.
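A minimal sketch of this per-channel state, assuming a plain set as a stand-in for a real bloom filter; all type and field names are illustrative:

```python
from dataclasses import dataclass, field
from enum import Enum, auto

class AckStatus(Enum):
    UNACKNOWLEDGED = auto()         # no indication of receipt yet
    POSSIBLY_ACKNOWLEDGED = auto()  # ambiguous (bloom filter) indication
    # Fully acknowledged messages are simply removed from the buffer.

@dataclass
class UnackedMessage:
    message: dict                   # the published message (see proto above)
    status: AckStatus = AckStatus.UNACKNOWLEDGED
    bloom_hits: int = 0             # positive bloom filter matches seen so far
    last_sent_at: float = 0.0       # epoch seconds of the last broadcast

@dataclass
class ChannelState:
    lamport_timestamp: int = 0
    bloom_filter: set = field(default_factory=set)   # stand-in for a bloom filter
    local_log: list = field(default_factory=list)    # sorted (timestamp, message_id) pairs
    outgoing_buffer: dict = field(default_factory=dict)   # message_id -> UnackedMessage
    incoming_buffer: list = field(default_factory=list)   # messages with unmet dependencies
```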
Protocol steps:
Within each channel of communication:
1. Send message
- Increase the local Lamport timestamp.
- Determine the local causal history for the message (the preceding `n` messages).
- Broadcast the message with the timestamp, causal history, and bloom filter.
- Add the message to the buffer of unacknowledged sent messages.
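A sketch of the send path, reusing the ChannelState and UnackedMessage shapes from the Participant state sketch; the ID derivation and the `broadcast` callback are assumptions:

```python
import hashlib
import time

def send_message(state, sender_id, channel_id, content, broadcast, n=3):
    state.lamport_timestamp += 1                      # 1. advance the clock
    message = {
        "sender_id": sender_id,
        "message_id": hashlib.sha256(content.encode()).hexdigest(),  # illustrative
        "lamport_timestamp": state.lamport_timestamp,
        "causal_history": [mid for _, mid in state.local_log[-n:]],  # 2. preceding n IDs
        "channel_id": channel_id,
        "bloom_filter": sorted(state.bloom_filter),   # stand-in serialisation
        "content": content,
    }
    broadcast(message)                                # 3. transport handles routing
    state.outgoing_buffer[message["message_id"]] = UnackedMessage(
        message, last_sent_at=time.time())            # 4. await acknowledgement
    return message
```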
2. Receive message
- Review ACK status of messages in the unacknowledged outgoing buffer using the causal history and bloom filter in the received message.
- Update the bloom filter for this channel with the received message ID.
- Check dependencies in the causal history:
- If all dependencies are met, process the message.
- Otherwise, add the message to the buffer of incoming messages with unmet causal dependencies.
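The receive path in the same style; `review_ack_status` and `process_message` are sketched under steps 5 and 3 respectively:

```python
def receive_message(state, message):
    # 1. Review ACK status first: the piggybacked causal history and
    #    bloom filter double as acknowledgements for our own messages.
    review_ack_status(state, message)          # see step 5 sketch

    # 2. Record the received message ID in our own channel filter.
    state.bloom_filter.add(message["message_id"])

    # 3. Deliver only if all causal dependencies have been seen.
    delivered = {mid for _, mid in state.local_log}
    if all(dep in delivered for dep in message["causal_history"]):
        process_message(state, message)        # see step 3 sketch
    else:
        state.incoming_buffer.append(message)  # park until dependencies arrive
```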
3. Process message
(triggered by 2)
- If received timestamp > local Lamport timestamp, update the local timestamp.
- Insert the message into the local log at its Lamport timestamp.
- Resolve conflicts if multiple messages have the same timestamp.
4. Resolve conflicts
(triggered by 3)
- Insert messages with the same timestamp in ascending order of message ID.
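Steps 3 and 4 as one sketch: keeping the local log as sorted (timestamp, message_id) pairs makes ascending message ID the deterministic tie-breaker, so every participant resolves conflicts identically without coordination:

```python
import bisect

def process_message(state, message):
    # Step 3: adopt the larger of the two Lamport clocks.
    state.lamport_timestamp = max(state.lamport_timestamp,
                                  message["lamport_timestamp"])
    # Steps 3 + 4: tuple comparison orders first by timestamp, then by
    # message ID, resolving equal-timestamp conflicts locally and
    # consistently across participants.
    entry = (message["lamport_timestamp"], message["message_id"])
    if entry not in state.local_log:           # ignore duplicates
        bisect.insort(state.local_log, entry)
```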
5. Review ACK status
(triggered by 2)
For each message in the unacknowledged outgoing buffer, based on any received bloom filter and causal history:
- Mark all messages in the received causal history as acknowledged.
- Mark all messages with a positive inclusion in the bloom filter as possibly acknowledged. If a message has been possibly acknowledged in a series of `m` received bloom filters, it could be marked as acknowledged on probabilistic grounds. Note that the size of the bloom filter and the number of hashes need to be taken into account here.
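A sketch of the ACK review, reusing the earlier state shapes; the hit threshold for a probabilistic ACK is an assumption that would need tuning against the filter's false positive rate:

```python
PROBABILISTIC_ACK_THRESHOLD = 3   # assumed number of bloom filter hits

def review_ack_status(state, received):
    causal_history = set(received["causal_history"])
    bloom = set(received["bloom_filter"])     # stand-in for real filter queries

    for message_id in list(state.outgoing_buffer):
        entry = state.outgoing_buffer[message_id]
        if message_id in causal_history:
            del state.outgoing_buffer[message_id]       # exact ACK
        elif message_id in bloom:
            entry.status = AckStatus.POSSIBLY_ACKNOWLEDGED
            entry.bloom_hits += 1
            if entry.bloom_hits >= PROBABILISTIC_ACK_THRESHOLD:
                del state.outgoing_buffer[message_id]   # probabilistic ACK
```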
6. Periodic incoming buffer sweep
- Periodically check causal dependencies for each message in the incoming buffer.
- Batch retrieve missing dependencies from the high-availability cache (Store node) if necessary.
- Process messages with all dependencies met, or mark them as irretrievably missed after a set period.
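A sketch of the incoming sweep; `store_query` stands in for a batch message ID query against a Store node and is an assumption (the give-up timer is elided):

```python
def sweep_incoming_buffer(state, store_query):
    delivered = {mid for _, mid in state.local_log}
    missing = {dep
               for msg in state.incoming_buffer
               for dep in msg["causal_history"]
               if dep not in delivered}

    # Batch-retrieve missing dependencies from the Store node.
    for retrieved in (store_query(sorted(missing)) if missing else []):
        process_message(state, retrieved)      # see step 3 sketch
        delivered.add(retrieved["message_id"])

    still_waiting = []
    for msg in state.incoming_buffer:
        if all(dep in delivered for dep in msg["causal_history"]):
            process_message(state, msg)        # dependencies now met
        else:
            still_waiting.append(msg)          # after a set period: give up and
                                               # mark as irretrievably missed
    state.incoming_buffer = still_waiting
```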
7. Periodic outgoing buffer sweep
- Rebroadcast unacknowledged messages after a set period.
- Use different resend periods for unacknowledged vs. possibly acknowledged messages.
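The outgoing sweep in sketch form; both resend intervals are assumptions:

```python
import time

RESEND_UNACKED_SECONDS = 30    # assumed: aggressive resend when fully unacked
RESEND_POSSIBLE_SECONDS = 120  # assumed: patient resend when possibly acked

def sweep_outgoing_buffer(state, broadcast, now=None):
    now = time.time() if now is None else now
    for entry in state.outgoing_buffer.values():
        interval = (RESEND_POSSIBLE_SECONDS
                    if entry.status is AckStatus.POSSIBLY_ACKNOWLEDGED
                    else RESEND_UNACKED_SECONDS)
        if now - entry.last_sent_at >= interval:
            broadcast(entry.message)           # rebroadcast unchanged
            entry.last_sent_at = now
```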
8. Periodic sync message
- Participants periodically send a message with empty content to maintain sync state. The Lamport timestamp should not be increased before sending such messages.
- To avoid bursts in large groups, periodic sync messages should only be sent if no other messages have been broadcast on the channel within a set interval plus a random backoff period.
- Participants follow the same Receive message procedure as for regular messages.
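A sketch of the sync trigger; the silence interval, backoff bound, and sync ID scheme are assumptions:

```python
import random
import time

SYNC_INTERVAL_SECONDS = 30   # assumed silence interval before syncing
MAX_BACKOFF_SECONDS = 10     # assumed upper bound for the random backoff

def maybe_send_sync(state, sender_id, channel_id, broadcast,
                    last_channel_activity):
    quiet_for = time.time() - last_channel_activity
    # The random backoff desynchronises participants so a large group
    # does not emit a burst of sync messages at the same instant.
    if quiet_for < SYNC_INTERVAL_SECONDS + random.uniform(0, MAX_BACKOFF_SECONDS):
        return
    broadcast({
        "sender_id": sender_id,
        "message_id": f"sync-{sender_id}-{time.time()}",  # illustrative scheme
        "lamport_timestamp": state.lamport_timestamp,     # NOT incremented
        "causal_history": [mid for _, mid in state.local_log[-3:]],
        "channel_id": channel_id,
        "bloom_filter": sorted(state.bloom_filter),       # stand-in serialisation
        "content": "",                                    # empty content
    })
```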
Ephemeral messages
Ephemeral messages are sent with `lamport_timestamp`, `causal_history`, and `bloom_filter` left unset.
These messages are not added to the unacknowledged outgoing buffer and are processed immediately on reception, without buffering.
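In terms of the message structure above, an ephemeral message simply omits the reliability fields (field values here are illustrative):

```python
ephemeral = {
    "sender_id": "alice",          # illustrative
    "message_id": "abc123",        # illustrative
    # lamport_timestamp, causal_history and bloom_filter deliberately unset
    "channel_id": "general",       # illustrative
    "content": "user is typing…",  # e.g. a typing indicator
}
# Never added to the outgoing buffer on send; delivered straight to the
# application on receipt, skipping the dependency checks above.
```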