Skip to content

read receipts: handle implicit receipts for unread counts #3054

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 112 additions & 12 deletions crates/matrix-sdk-base/src/read_receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ use ruma::{
SyncMessageLikeEvent,
},
serde::Raw,
EventId, OwnedEventId, RoomId, UserId,
EventId, OwnedEventId, OwnedUserId, RoomId, UserId,
};
use serde::{Deserialize, Serialize};
use tracing::{debug, instrument, trace};
use tracing::{debug, instrument, trace, warn};

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
struct LatestReadReceipt {
Expand Down Expand Up @@ -332,24 +332,27 @@ impl ReceiptSelector {
if event_pos >= *best_pos {
*best_pos = event_pos;
self.latest_event_with_receipt = Some(event_id.to_owned());
trace!("saving better");
debug!("saving better");
} else {
trace!("not better, keeping previous");
}
} else {
// We didn't have a previous receipt, this is the first one we
// store: remember it.
self.latest_event_pos = Some(event_pos);
self.latest_event_with_receipt = Some(event_id.to_owned());
trace!("saving for the first time");
debug!("saving for the first time");
}
}

/// Try to match pending receipts against new events.
#[instrument(skip_all)]
fn handle_pending_receipts(&mut self, pending: &mut RingBuffer<OwnedEventId>) {
// Try to match stashed receipts against the new events.
trace!("handle_pending_receipts");
pending.retain(|event_id| {
if let Some(event_pos) = self.event_id_to_pos.get(event_id) {
// Maybe select this read receipt as it might be better than the ones we had.
trace!(%event_id, "matching event against its stashed receipt");
self.try_select_later(event_id, *event_pos);

// Remove this stashed read receipt from the pending list, as it's been
Expand All @@ -371,23 +374,25 @@ impl ReceiptSelector {
///
/// Returns any receipts (for the current user) that we could not match
/// against any event - these are "pending".
#[instrument(skip_all)]
fn handle_new_receipt(
&mut self,
user_id: &UserId,
receipt_event: &ReceiptEventContent,
) -> Vec<OwnedEventId> {
trace!("handle_new_receipt");
let mut pending = Vec::new();
// Now consider new receipts.
for (event_id, receipts) in &receipt_event.0 {
for ty in [ReceiptType::Read, ReceiptType::ReadPrivate] {
if let Some(receipt) = receipts.get(&ty).and_then(|receipts| receipts.get(user_id))
{
if matches!(receipt.thread, ReceiptThread::Main | ReceiptThread::Unthreaded) {
trace!(%event_id, "found new candidate");
if let Some(event_pos) = self.event_id_to_pos.get(event_id) {
self.try_select_later(event_id, *event_pos);
} else {
// It's a new pending receipt.
trace!(%event_id, "stashed as pending");
pending.push(event_id.clone());
}
}
Expand All @@ -397,6 +402,24 @@ impl ReceiptSelector {
pending
}

/// Try to match an implicit receipt, that is, the one we get for events we
/// sent ourselves.
#[instrument(skip_all)]
fn try_match_implicit(&mut self, user_id: &UserId, new_events: &[SyncTimelineEvent]) {
for ev in new_events {
// Get the `sender` field, if any, or skip this event.
let Ok(Some(sender)) = ev.event.get_field::<OwnedUserId>("sender") else { continue };
if sender == user_id {
// Get the event id, if any, or skip this event.
let Some(event_id) = ev.event_id() else { continue };
if let Some(event_pos) = self.event_id_to_pos.get(&event_id) {
trace!(%event_id, "found an implicit receipt candidate");
self.try_select_later(&event_id, *event_pos);
}
}
}
}

/// Returns the event id referred to by a new later active read receipt.
///
/// If it's not set, we can consider that each new event is *after* the
Expand Down Expand Up @@ -457,6 +480,7 @@ pub(crate) fn compute_unread_counts(
&all_events,
read_receipts.latest_active.as_ref().map(|receipt| &*receipt.event_id),
);
selector.try_match_implicit(user_id, new_events);
selector.handle_pending_receipts(&mut read_receipts.pending);
if let Some(receipt_event) = receipt_event {
let new_pending = selector.handle_new_receipt(user_id, receipt_event);
Expand All @@ -483,7 +507,7 @@ pub(crate) fn compute_unread_counts(
// safely from here.
read_receipts.find_and_process_events(&event_id, user_id, all_events.iter());

trace!(?read_receipts, "after finding a better receipt");
debug!(?read_receipts, "after finding a better receipt");
return;
}

Expand All @@ -498,15 +522,15 @@ pub(crate) fn compute_unread_counts(
read_receipts.process_event(event, user_id);
}

trace!(?read_receipts, "no better receipt, {} new events", new_events.len());
debug!(?read_receipts, "no better receipt, {} new events", new_events.len());
}

/// Is the event worth marking a room as unread?
fn marks_as_unread(event: &Raw<AnySyncTimelineEvent>, user_id: &UserId) -> bool {
let event = match event.deserialize() {
Ok(event) => event,
Err(err) => {
debug!(
warn!(
"couldn't deserialize event {:?}: {err}",
event.get_field::<String>("event_id").ok().flatten()
);
Expand Down Expand Up @@ -582,7 +606,7 @@ fn marks_as_unread(event: &Raw<AnySyncTimelineEvent>, user_id: &UserId) -> bool

_ => {
// What I don't know about, I don't care about.
debug!("unhandled timeline event type: {}", event.event_type());
warn!("unhandled timeline event type: {}", event.event_type());
false
}
}
Expand Down Expand Up @@ -1108,7 +1132,7 @@ mod tests {
)]);

let mut read_receipts = RoomReadReceipts::default();
assert_eq!(read_receipts.pending.len(), 0);
assert!(read_receipts.pending.is_empty());

// Given a receipt event that contains a read receipt referring to an unknown
// event, and some preexisting events with different ids,
Expand Down Expand Up @@ -1147,7 +1171,7 @@ mod tests {
// case, only consider the new events in isolation, and compute the
// correct count.
let mut read_receipts = RoomReadReceipts::default();
assert_eq!(read_receipts.pending.len(), 0);
assert!(read_receipts.pending.is_empty());

let ev0 = events[0].clone();

Expand Down Expand Up @@ -1525,4 +1549,80 @@ mod tests {
assert_eq!(best_receipt.unwrap().event_id, event_id!("$4"));
}
}

#[test]
fn test_try_match_implicit() {
let myself = owned_user_id!("@alice:example.org");
let bob = user_id!("@bob:example.org");

let mut events = make_test_events(bob);

// When the selector sees only other users' events,
let mut selector = ReceiptSelector::new(&events, None);
// And I search for my implicit read receipt,
selector.try_match_implicit(&myself, &events.iter().cloned().collect::<Vec<_>>());
// Then I don't find any.
let best_receipt = selector.select();
assert!(best_receipt.is_none());

// Now, if there are events I've written too...
events.push_back(sync_timeline_message(&myself, "$6", "A mulatto, an albino"));
events.push_back(sync_timeline_message(bob, "$7", "A mosquito, my libido"));

let mut selector = ReceiptSelector::new(&events, None);
// And I search for my implicit read receipt,
selector.try_match_implicit(&myself, &events.iter().cloned().collect::<Vec<_>>());
// Then my last sent event counts as a read receipt.
let best_receipt = selector.select();
assert_eq!(best_receipt.unwrap().event_id, event_id!("$6"));
}

#[test]
fn test_compute_unread_counts_with_implicit_receipt() {
let user_id = owned_user_id!("@alice:example.org");
let bob = user_id!("@bob:example.org");
let room_id = room_id!("!room:example.org");

// Given a set of events sent by Bob,
let mut events = make_test_events(bob);

// One by me,
events.push_back(sync_timeline_message(&user_id, "$6", "A mulatto, an albino"));

// And others by Bob,
events.push_back(sync_timeline_message(bob, "$7", "A mosquito, my libido"));
events.push_back(sync_timeline_message(bob, "$8", "A denial, a denial"));

let events: Vec<_> = events.into_iter().collect();

// I have a read receipt attached to one of Bob's event sent before my message,
let receipt_event = EventBuilder::new().make_receipt_event_content([(
owned_event_id!("$3"),
ReceiptType::Read,
user_id.clone(),
ReceiptThread::Unthreaded,
)]);

let mut read_receipts = RoomReadReceipts::default();

// And I compute the unread counts for all those new events (no previous events
// in that room),
compute_unread_counts(
&user_id,
room_id,
Some(&receipt_event),
Vector::new(),
&events,
&mut read_receipts,
);

// Only the last two events sent by Bob count as unread.
assert_eq!(read_receipts.num_unread, 2);

// There are no pending receipts.
assert!(read_receipts.pending.is_empty());

// And the active receipt is the implicit one on my event.
assert_eq!(read_receipts.latest_active.unwrap().event_id, event_id!("$6"));
}
}
4 changes: 4 additions & 0 deletions crates/matrix-sdk-ui/src/timeline/inner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,9 +1066,11 @@ impl TimelineInner {
if let Some((old_pub_read, _)) =
state.user_receipt(own_user_id, ReceiptType::Read, room).await
{
trace!(%old_pub_read, "found a previous public receipt");
if let Some(relative_pos) =
state.meta.compare_events_positions(&old_pub_read, event_id)
{
trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
return relative_pos == RelativePosition::After;
}
}
Expand All @@ -1079,9 +1081,11 @@ impl TimelineInner {
if let Some((old_priv_read, _)) =
state.latest_user_read_receipt(own_user_id, room).await
{
trace!(%old_priv_read, "found a previous private receipt");
if let Some(relative_pos) =
state.meta.compare_events_positions(&old_priv_read, event_id)
{
trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
return relative_pos == RelativePosition::After;
}
}
Expand Down
6 changes: 5 additions & 1 deletion crates/matrix-sdk-ui/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use ruma::{
};
use thiserror::Error;
use tokio::sync::{mpsc::Sender, Mutex, Notify};
use tracing::{debug, error, info, instrument, warn};
use tracing::{debug, error, info, instrument, trace, warn};

use self::futures::SendAttachment;

Expand Down Expand Up @@ -708,9 +708,13 @@ impl Timeline {
event_id: OwnedEventId,
) -> Result<bool> {
if !self.inner.should_send_receipt(&receipt_type, &thread, &event_id).await {
trace!(
"not sending receipt, because we already cover the event with a previous receipt"
);
return Ok(false);
}

trace!("sending receipt");
self.room().send_single_receipt(receipt_type, thread, event_id).await?;
Ok(true)
}
Expand Down