End-to-end reliability for scalable distributed logs

Note: For the TL;DR skip to Proposed Protocol.

Introduction

This post proposes a way to model different chat communication scenarios as the consolidation of distributed logs.
It does so within the context of designing an end-to-end reliability protocol for Status chat protocols, specifically within the context of Status Communities.
It states the problem of end-to-end reliability for chat protocols within this abstraction of distributed logs.
We then propose some features that a reliability protocol should have in order to be scalable, mention alternatives and end with an example protocol proposal.

Much of this post takes inspiration from principles set in the Scuttlebutt protocol and the probabilistic ack of message references in a DAG proposed by @kaichao . It should serve as a starting point for conversation, not a fully fleshed-out design.

Model abstraction

Consider three typical chat scenarios:

1. Direct communication (1:1/private groups)

  • Between two parties.
  • Only the involved parties are privy to the conversation.
  • Both parties are interested in all messages.
  • Typical for 1:1 chat scenarios, but can be extended to form private group chats with multiple interconnected direct sessions.

2. Group chat (simple Community)

  • Between more than two parties.
  • All parties can see all messages using broadcast dissemination and a shared encryption key.
  • All parties are interested in all messages.

3. Group chat with channels (Community with channels)

  • Similar to group chat but with messages grouped into separate channels.
  • Parties may be interested in messages from specific channels only.

One way of modelling all three communication scenarios is as the consolidation of distributed append-only logs into a single log with causal ordering. The problem of end-to-end reliability in the system can then be stated as ensuring that all participants eventually see the same sequence of messages in the same causal order, despite the challenges of network latency, message loss, and scalability present in any communications transport layer, such as Waku.

Note on encryption: The above scenarios match the encryption schemes for Status chat: 1:1 chats and private group chats build on encrypted direct communications; Communities map to shared-key “group chats” with some direct communications for control messaging (e.g. RequestToJoin). Since end-to-end reliability assumes knowledge of participants in the communication, the proposed protocol must function within this encryption layer.

Goals

An end-to-end reliability protocol for a distributed logging system would ideally:

  • ensure partial causal ordering and eventual consistency for all three common scenarios: group chats with channels, group chat without channels and the “special case” of direct communication with only two participants.
  • scale to very large groups (goal for Status Communities is up to 10K participants per Community)
  • accommodate “ephemeral” messages for which no reliability is required.
  • not rely on centralised services for coordination. It may define helper services (such as high availability caches or mediator nodes) to improve efficiency of the protocol, but should continue to work in the absence of such hubs.
  • be transport-agnostic - it should work whether Waku or any other transport protocol is used for routing. This helps separate concerns to future proof the design against changes in the transport.

Assumptions

We make the following simplifying assumptions for a proposed reliability protocol:

  • Broadcast routing: messages are broadcast disseminated by the underlying transport. The selected transport takes care of routing messages to all participants of the communication.
  • Store nodes: there are high-availability caches (a.k.a. Store nodes) from which missed messages can be retrieved. These caches maintain the full history of all messages that have been broadcast.
  • Group awareness: each participant can identify all other participants in the communication (and, where applicable, specific channel) at any time. The size of the group is known.
  • Message ID: each message has a unique, immutable ID (or hash). Messages can be requested from the high-availability caches or other participants using the corresponding message ID.

Note on dependency on Store nodes: Assuming that trusted, highly available Store nodes exist, simplifies the required interactions for participants to retrieve missed messages. However, any proposed design should in theory also work with a decentralised Store service or direct IHAVE-IWANT type interactions between participants.

Note on Message ID: Within Status-Waku context, Message IDs can either be application-level IDs or Waku message hashes. The only requirement is that these IDs must be unique and queryable (directly or indirectly) from Store nodes or other participants.

Design proposals

We propose that an end-to-end reliability protocol for scalable distributed logs should have the following features:

  • each participant maintains a Lamport clock instance for each channel of communication (or for the entire communication in the case of no channels or direct communication)
  • each participant includes a short causal history for its view of the channel (local log) when publishing a message (that is the preceding n message IDs as causal dependency). n should likely be in the order of two to three message IDs.
  • each participant maintains a bloom filter over the last m message IDs for each channel. Since the bloom filter will be an indication of message ACK to senders (see below), it should ideally cover (at least?) the last 10 min to 1 hour of message IDs within the channel. The bloom filter for the associated channel is included with each published message. The bloom filter (size, number of hashes, number of insertions) should be designed to minimise the probability for false positives.
  • lazy pull: the combination of a Lamport timestamps and causal history in each published message allows recipients to maintain message order, detect missing dependencies and eventually request these messages from the Store nodes (or other participants). For this mechanism to scale, the scenario of multiple recipients simultaneously “pulling” the same message should be avoided as much as possible. Instead, message dependencies missing for many/all participants should be eagerly “pushed”/broadcast to all participants. See below.
  • eager push: using a combination of received message causal histories and bloom filters, senders can build a probabilistic model of which of their published messages has indeed been acknowledged as “received” by the rest of the communication participants and resend messages that failed to be published to the majority/all other participants
  • local conflict resolution: conflicts in causal ordering should preferably be resolved locally in a consistent way (without coordinating with other participants) to enhance scalability

Alternatives considered

There are many existing (and proven) alternative approaches to achieve e2e reliability for distributed logs. I’ll mention two:

1. Vector clocks
Vector clocks expand on the Lamport clock idea for causality by maintaining a vector of counters, one for each participant. Each participant updates its own counter and shares its view of the vector clock (i.e. the counter for all participants, including itself) with others to determine the causal order of events.

Limitations:
Vector clocks require maintaining and transmitting a vector of size proportional to the number of participants, which may not scale for very large groups. There are ways to compress vector clocks, but managing and merging vector clocks becomes complex as the number of participants and number of channels grow. It’s also possible to use a partial vector clock (with a size still proportional to group size), but this again requires significant processing and more detailed probabilistic modelling to ensure all participants receive sufficient information to: (a) order messages and detect all missing messages, and (b) determine the ACK status of all published messages. With more research this could be another promising avenue, though.

2. Conflict-Free Replicated Data Types (CRDTs)

CRDTs are data structures that ensure eventual consistency across distributed systems without requiring coordination. They can be based on structures like Merkle trees to track changes and merge them deterministically.

Limitations:
Although CRDTs can handle updates efficiently in smaller groups, the overhead of maintaining and synchronizing large Merkle trees or similar structures can be prohibitive in very large groups with multiple channels.

Proposed protocol

Message

Messages adhere to the following meta structure:

syntax = "proto3";

message Message {
  string sender_id = 1;           // Unique identifier of the sender
  string message_id = 2;          // Unique identifier of the message
  optional int32 lamport_timestamp = 3;    // Logical timestamp for causal ordering in channel
  optional repeated string causal_history = 4;  // List of 2 or 3 preceding message IDs that this message causally depends on
  string channel_id = 5;          // Identifier of the channel to which the message belongs
  optional bytes bloom_filter = 6;         // Bloom filter representing received message IDs in channel
  string content = 7;             // Actual content of the message
}

Participant state

Each participant maintains:

  • A Lamport timestamp for each channel of communication, initialized to 0.
  • A bloom filter for received message IDs per channel.
  • A buffer for unacknowledged outgoing messages
  • A buffer for incoming messages with unmet causal dependencies.

Messages in the unacknowledged outgoing buffer can be in one of three states:

  1. Unacknowledged - there has been no acknowledgement of message receipt by any participant in the channel
  2. Possibly acknowledged - there has been ambiguous indication that the message has been possibly received by at least one participant in the channel
  3. Acknowledged - there has been sufficient indication that the message has been received by at least some of the participants in the channel. This state will also remove the message from the outgoing buffer.

Protocol steps:

Within each channel of communication:

1. Send message

  1. Increase the local Lamport timestamp.
  2. Determine local causal history for the message (preceding n messages).
  3. Broadcast the message with the timestamp, causal history, and bloom filter.
  4. Add the message to the buffer of unacknowledged sent messages.

2. Receive message

  1. Review ACK status of messages in the unacknowledged outgoing buffer using the causal history and bloom filter in the received message.
  2. Update the bloom filter for this channel with the received message ID
  3. Check dependencies in the causal history:
    • If all dependencies are met, process the message.
    • Otherwise, add the message to the buffer of incoming messages with unmet causal dependencies.

3. Process message
(triggered by 2)

  1. If received timestamp > local Lamport timestamp, update the local timestamp.
  2. Insert the message into the local log at its Lamport timestamp.
  3. Resolve conflicts if multiple messages have the same timestamp.

4. Resolve conflicts
(triggered by 3)

  1. Insert messages with the same timestamp in ascending order of message ID.

5. Review ACK status
(triggered by 2)
For each message in the unacknowledged outgoing buffer, based on any received bloom filter and causal history:

  1. Mark all messages in the received causal history as acknowledged
  2. Mark all messages with positive inclusion in bloom filter as possibly acknowledged. If a message has been possibly acknowledged in a series of m received bloom filters, it could be marked as acknowledged on probabilistic grounds. Note that the size of the bloom filter and number of hashes need to be taken into account here.

6. Periodic incoming buffer sweep

  1. Periodically check causal dependencies for each message in the incoming buffer
  2. Batch retrieve missing dependencies from the high availability cache (Store node) if necessary.
  3. Process messages with all dependencies met, or mark them as irretrievably missed after a set period.

7. Periodic outgoing buffer sweep

  1. Rebroadcast unacknowledged messages after a set period.
  2. Use different resend periods for unacknowledged vs. possibly acknowledged messages.

8. Periodic sync message

  1. Participants periodically send a message with empty content to maintain sync state. The Lamport timestamp should not be increased before sending such messages.
  2. In order to avoid bursts in large groups, periodic sync messages could only be sent if no other messages have been broadcast on the channel after a set interval + random backoff period.
  3. Participants follow the same Receive message procedure as for regular messages.

Ephemeral messages

Ephemeral messages are sent with lamport_timestamp, causal_history, and bloom_filter left unset.
These messages are not added to the unacknowledged outgoing buffer and processed immediately on reception without buffering.

5 Likes

What the DAG would probably look like → canvas/packages/gossiplog at main · canvasxyz/canvas · GitHub

1 Like

I feel like both bloom filters and DAG fulfill the same purpose.

If you lengthen the DAG kept than you don’t need the filter? You can compute the same information from both. Maybe it’s just for efficiency?

:thinking: I would be curious to see a comparison of big DAGs VS filters in reality.

Also, I found this useful Bloom filter calculator

You could use the GossipLog approach of having the timestamp in the DAG.

So, the causal history would be multiple (hash, timestamp) tuples.

8. Periodic sync message

am I understanding correctly that this is for the case of a gap where no messages were sent and a new sender does not have messages to reference?

If that’s the case maybe the new sender should be able to request the causal history somehow?

Indeed something that bothers me - that these two mechanisms roughly perform the same function. The problem is that the causal history is essentially a local causal history - a local log - that other participants simply merge into their own. This means that for large groups, you may have independent “branches” that don’t fit into the causal history of any other messages.
For example, consider a group with Alice, Bob and Charlie:

  1. Alice attempts to publish a1, a2, a3. She assumes these messages were published and adds them to her local log. They are causally dependent on the previous messages in Alice’s log. However, none of these messages were actually published due to a break in Alice’s connectivity.
  2. Meanwhile Bob and Charlie have been publishing several messages to the group that were successfully processed, resulting in a log with causal history for both Bob and Charlie that happens to look exactly similar:
    b1 → c1 → c2 → b2 → c3 → c4 ->b3 (or whichever sequence).
  3. Alice’s connectivity is restored. She receives a new message published by Charlie, c5, with causal history dependent on c3 -> c4 -> b3. There is no way for Alice to know that her messages has been published, unless the causal history goes back in time to her last ACKed message.

Alice could publish a next message a4, which would result in Charlie and Bob iteratively requesting all messages a1…a3. However, we’re trying to avoid a situation where other participants eagerly pull in this way, as it simply doesn’t scale to very large groups. The correct remedy for this situation is for Alice to resend all messages a1…a3, allowing these messages to be merged into Charlie and Bob’s logs. However, this set of messages will not form part of the causal history of new messages because they’re too far back in the past (forming the “independent branch” I mentioned)

What Alice needs is a lightweight ACK mechanism that extends far enough back in time that it reasonably covers all short-term disconnection events.
You’re right in that we could simply make the causal history large enough to extend back in time to make it clear to Alice that her messages have not been merged into other participants’ logs. This seems to me to imply a fairly large state and a large causal history to be sent with each message. Furthermore, for larger groups we’d need longer causal histories in order to cover the same time period. In contrast, a reasonably performing bloom filter would likely be smaller, constant in size for every size of group and with tunable characteristics (e.g. reasonable probability of avoiding false positives).

1 Like

Not sure I see any benefit to using the timestamp here rather than a simple Lamport timestamp? Presumably the application would still use timestamp for ordering. This is just a simple way to build and merge the causality tree.

am I understanding correctly that this is for the case of a gap where no messages were sent and a new sender does not have messages to reference?

Not exactly. There is a simplifying assumption (I should make it explicit) that the sender starts in a synchronised state - i.e. you do a Store query to catch up to present state before the e2e reliability protocol kicks in for “real time” reliability. This is just as a way for participants to verify that they are still in sync with other participants. This affects e.g. the period the bloom filter must cover - without regular updates you might go out of sync without realising.

1 Like

Great summary! @haelius

I’m wondering how Lamport timestamp is used for lazy pull, if the received timestamp is much larger than local timestamp, should the node send a query to store node for potentially missed messages?
And whether it’s safe to use this timestamp as a signal to indicate the synced state among participants. Merkle root shows strong confidence comparing with lamport timestamp in this case.

Add bloom filter to each message seems less efficient, instead it can be sending out periodically, may be in the sync message.

The acknowledgement state in community/group is hard to be used at app level IMO, since user cares more about if everyone saw my message. I think we can introduce a consistent global view of all the distributed logs periodically, for example, admin/special service node publish its Merkle root periodically. And each participant can check with this global view with local data. If matched, it means the send messages are included in global, if not matched, the clients need to sync the state either pull the missing received messages or resend the missing outgoing messages, could probably use prolly tree. With user experience more with decentralized app, we can even introduce a peer to peer state comparison without depending on any admin or specific role in the group.

1 Like

This is probably much easier to implement and describe to users. :slight_smile:

1 Like

Thanks for the comments. :smiley:

I’m wondering how Lamport timestamp is used for lazy pull

The DAG created by causal histories is what is used for the lazy pull (and achieving eventual consistency). In other words, a participant would query missing dependencies iteratively until either causal history is fully resolved or, worst case, some are marked as irretrievably missing. The Lamport timestamps in this case are used to maintain a consistent (partial) causal ordering and to merge the causal histories of other participants into your own local log.

Add bloom filter to each message seems less efficient

This is a good point. My rough calculation is that a useful bloom filter would add up to 1KB per message. We could send this only periodically - either piggy-backed on existing messages or in a sync message. I would argue though that this should be part of an optimisation exercise for which the variables are still a bit unclear - if a publisher resend messages based on its inclusion (or not) in received bloom filters, it should receive them frequently enough to prevent unnecessary retransmissions. We may also want to determine ACK status based on multiple received bloom filters to mitigate impact of false positives. In a large group, if multiple publishers are too eager to resend messages, the impact on bandwidth may be higher than simply piggy-backing the bloom filter more frequently. That said, your point certainly stands that the bloom filter is not a requirement for each message, but can (and should) be sent only periodically.

Merkle root shows strong confidence comparing…
…introduce a consistent global view…
…Merkle root periodically

This is true about Merkle roots and I’m indeed considering how Merkle trees (or similar CRDT-type) structures can be used to enhance or even replace the scheme above. However, these structures are almost too precise, implying much state (both locally and on the admin node) and many individual interactions to sync state. It’s difficult for me to see how this scales to very large groups. For example, in the example above each participant would have to maintain a merkle tree for each channel. Each mismatch implies an iterative synchronisation process for each channel (either with the admin node or other participants). In short, I’m not convinced that a strict sync process could be implemented in a scalable fashion, even if we simplify it by offloading much of the interaction to a centralised “admin” node.

we can even introduce a peer to peer state comparison without depending on any admin or specific role in the group

True, but, again, the performance/bandwidth impact of such individual interactions is
why schemes like MVDS is not scalable to large groups and communities. The larger the group, the more individual interactions. The way around that would be to have only probabilistic interaction between participants, but IMO this adds more complexity than is needed to answer the simple questions:

  1. Which causal histories/logs have I not merged (i.e. which IDs am I missing)?
  2. Do I have reasonable ACK that my published messages have been merged into the logs of other participants?

These questions should be answerable without an explosion in individual interactions or overly high-overhead sync processes.

Thanks for the write up, very thorough and I particularly appreciate the Protocol steps section.

As discussed yesterday, I will add some notes:

sender_id

In Message sender_id is probably not necessary, as that’s most of the time either extracted from a signature or inferred (like in the case of the double ratchet when it’s inferred by the fact that the there’s only one other member who can have that shared secret with you, i.e plausible deniability).

message_id

message_id is also not specified generally, as it can be manipulated, so we generally use the hash of the content or similar.

lamport timestamps

With regards of lamport timestamps, we have adopted them in the current protocol, but with one difference as plain lamport timestamps don’t support one common case.
A chat can be seen as distributed system with no central coordination and permissionless.

In some cases there’s no explicit “joining” (public chats in the old app, but 1-to-1s are similar with that regard), so a new member that could not retrieve information about a channel, would start at 0 if we were to follow canonical lamport timestamps.
Also it’s important to note that the app is or wants to be offline first, so syncing before being able to send a message is not something we want in general.

If we are thinking about a chat, that would mean that the message would be ordered last, at it has no causal relationship with any other message, which is problematic. What we do instead, is we “hint” lamport timestamp with a wall clock:

clock = max(time.Now(), previous_clock+1)

This maintain causal relationship between events and is consistent with lamport timestamps (you can think about it as each member having started at 0 and ticked up each time internally at each second, unless aware of another clock having ticked up). We maintain a total order of events by breaking ties with ID, so that everyone has the same ordering of events.

casual_history

With regard of casual_history and specifically:

  1. Check dependencies in the causal history:
  • If all dependencies are met, process the message.
  • Otherwise, add the message to the buffer of incoming messages with unmet causal dependencies.

This is something that we discussed a lot back when mvds was first implemented (the feature I believe is in the codebase already). From our point of view, it’s not something that we generally want to have, as we almost never wants to delay processing messages as it leads to poor user experience in case of out of order messages or message loss (if you are strict about it, if any message failed to be delievered, then the chain is interrupted forever). Most of the protocol is build on the assumption that message loss will occur and it accommodates that (at the cost of extra bandwidth, we piggy back information often for example, or accept that there might be gaps etc).
There are only a handful of corner cases where that’s not currently the case, and this could be helpful, one of which is message segmentation, where you cannot process a partial message, but just id doesn’t cut it (we need something more like group_id+count), and currently community channel encryption as we haven’t quite figured out an efficient way to propagate the key, but I think that’s better solved by improving key exchange.
It can be useful to identify gaps of course. I would personally explore more those scenarios, but it’s certainly something of secondary importance from our perspective.

pulling from store nodes based on message_ids

This is fine, but I think it needs to be complimentary to querying by timestamp, as querying by ids it’s very chatty and will lead most likely to long syncing times, we already see long syncing times with straight timestamps with a cursor of 20 (up to 8/9 minutes to sync a day worth of data), and this seems to be strictly slower in the worst case scenario, which is quite common (user of an active community goes offline for a day).

I think these are the immediate things that I could spot on the document, but in general, it’s probably a good exercise to go a bit deeper on how it integrates with the current functionalities, since I think there’s a fair amount of unknowns, and from experience (mvds etc), if it’s not worked out to fit with the current technology, there’s a risk of it not being as effective as it could be, as there still a fair amount of unknowns.

1 Like

Thanks for the great feedback, @cammellos.

Let me add some thoughts on your comments in reverse order:

in general, it’s probably a good exercise to go a bit deeper on how it integrates with the current functionalities

Agreed. I hoped with this post to introduce and test the waters for general concepts and features. Once we have rough consensus on the desired features the next step would be to closely integrate it within Status context and adapt to what’s currently there, integrate in existing protobufs, etc.

This is fine, but I think it needs to be complimentary to querying by timestamp, as querying by ids it’s very chatty

Indeed. This scheme is mostly meant to achieve eventual consistency and confidence in reliability on top of existing schemes. Participants should still query by timestamp for (detected) offline periods. An e2e scheme should help detect if there are still undetected gaps, undetected offline periods, etc.

we almost never want to delay processing messages

I see the problem domain here as slightly different. With “processing” I mean processing within the context of the reliability mechanism of building a causality DAG - i.e. either having resolved causal dependencies or, worst case, concluded that there are missing messages but hopefully at least “knowing” which messages are missing. A message can be considered “processed” once it reaches this stage. This should not prevent the message from being shown to the user as soon as it arrives.

lamport timestamps

Great alternative and I think yours is a much better scheme. Thanks! As a more general point I’d say that the app’s chat ordering does not necessarily have to be exactly the same as the causality ordering within the reliability mechanism. It’s most likely simpler if it is, but conceptually I think these two mechanisms (the visible chat log and the causality log) can function independently. However, the scheme does indeed assume starting in a synchronised state - a limitation that your approach will fix. Will adapt accordingly.

sender_id … message_id

Indeed. Within the “meta” structure to illustrate the concept it was easier to just add these fields explicitly to show some dependency on these being known/derivable. Strictly speaking, as the proposal stands now, sender ID is not even used at all, but I imagined it might be a useful reference for the conversation in case we want to introduce direct interactions between peers (e.g. requesting messages from a peer rather than the Store) or if someone suggested (partial) vector clocks as a replacement for the bloom filter mechanism.
Message ID does not belong in the final protobuf as it’s indeed derived, not added. Will adapt/clarify in future versions.

2 Likes

What can be the implication on a the 30 days store message retention policy?

Probably out of scope, but wondering if such a mechanism can help sync older messages that are out of store with other nodes.

I am guessing the bandwidth overheard would be too great.

Well, the long store retention would still be necessary/helpful for time-based queries to catch up to real time after offline periods. This would still be by far the most efficient way to recover missed messages - i.e. detect any offline periods and perform a time-based query for the gap. The reliability mechanism above can be used though to eventually reach consistency by ensuring all “dependencies” (i.e. full causal history) for received messages have been met, iteratively. As long as you receive any message within a causality chain you can go back in history, causal dependency after causal dependency, until you reach the historical point to which you have synced previously. This may require an iterative query process which will be both slow and expensive, so it should preferably only be used for recent history and in cases where we didn’t detect offline periods.

UPDATE: I do think it may be worth investigating a process that checks dependencies of all previously unchecked messages in the local log up to the “beginning of history” to ensure these have all their dependencies met. Large pages of messages received via time-based queries may or may not have complete causal histories. We could treat these chunks of historical logs as “unchecked” and only mark as “complete” when a parallel process has verified causal dependencies.

Another brief note on possible use of the bloom_filter per channel:

If we define a 1KB (8000 bit) filter with 4 hash functions, even with up to 500 message IDs in the filter for this channel we’ll still only have a ~0.2% false positive rate (1 in 400). The best strategy is probably for participants to recompute the bloom filter every t minutes, independently. It should then be possible to derive an ACK for sent messages with close to 100% confidence from positive inclusion in a number of received bloom filters - with this number determined by the number of items currently in the filter. However, the most useful use for the bloom_filter is likely going to be its ability to provide 100% confidence NACKs, with 100% confidence ACKs being message IDs included in causal histories of received messages. It will always be possible (though probably bad for scalability) to continue retransmitting messages until they’re included in a received causal history.