Handle notifications as retry events in the invoker #3177

Merged on Apr 18, 2025 (2 commits)

tillrohrmann

Since the retry logic also tracks notification proposals in the JournalTracker, we need to handle notifications as retry events, because they can unblock an invocation that is waiting for retry. This commit changes the behavior accordingly.


slinkydeveloper left a comment

This is correct. My previous fix only addressed the is_ready_to_retry() condition, but failed to consider that the retry timer might fire before the notification arrives, so here we are.

Could you add a unit test exactly for this scenario?

  • Mock an invocation that is waiting for retry
  • Fire the retry timer
  • Then send the notification, and check that this triggers the retry
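The ordering described above (timer fires first, notification arrives later) can be illustrated with a minimal, self-contained sketch. It is not the invoker's actual code: `Invocation`, `State`, `pending_notifications`, and the `on_*` methods are hypothetical stand-ins, and the set of pending ids only approximates the role the JournalTracker plays in the description. Only the name is_ready_to_retry() is borrowed from the comment above. The point it tries to show is that both the timer and the notification have to re-check the retry condition, otherwise a late notification never unblocks the invocation.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified per-invocation state (not the real invoker types).
#[derive(Debug, PartialEq)]
enum State {
    WaitingRetry { timer_fired: bool },
    Running,
}

struct Invocation {
    state: State,
    /// Notification ids that were proposed but not yet received back.
    /// Assumption: this approximates what the JournalTracker records.
    pending_notifications: HashSet<u32>,
}

impl Invocation {
    /// Ready only when the timer has fired AND nothing is pending.
    fn is_ready_to_retry(&self) -> bool {
        matches!(self.state, State::WaitingRetry { timer_fired: true })
            && self.pending_notifications.is_empty()
    }

    /// Starts the retry if possible; returns whether it did.
    fn try_retry(&mut self) -> bool {
        if self.is_ready_to_retry() {
            self.state = State::Running;
            true
        } else {
            false
        }
    }

    /// The timer is one retry event.
    fn on_retry_timer_fired(&mut self) -> bool {
        if let State::WaitingRetry { timer_fired } = &mut self.state {
            *timer_fired = true;
        }
        self.try_retry()
    }

    /// The notification is treated as a retry event too: after recording
    /// it, the retry condition is evaluated again.
    fn on_notification(&mut self, id: u32) -> bool {
        self.pending_notifications.remove(&id);
        self.try_retry()
    }
}

/// Replays the scenario: timer first, notification second.
/// Returns whether each event started the retry.
fn timer_then_notification() -> (bool, bool) {
    let mut inv = Invocation {
        state: State::WaitingRetry { timer_fired: false },
        pending_notifications: HashSet::from([1]),
    };
    let after_timer = inv.on_retry_timer_fired();
    let after_notification = inv.on_notification(1);
    (after_timer, after_notification)
}
```

In this sketch the timer alone does not start the retry because notification 1 is still pending; the retry only starts once the notification is handled and the condition is checked again.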


slinkydeveloper commented Apr 18, 2025

30 LLM calls later:


    #[test(restate_core::test)]
    async fn notification_triggers_retry() {
        let invoker_options = InvokerOptionsBuilder::default()
            .retry_policy(RetryPolicy::fixed_delay(Duration::ZERO, Some(1)))
            .inactivity_timeout(Duration::ZERO.into())
            .abort_timeout(Duration::ZERO.into())
            .disable_eager_state(false)
            .message_size_warning(NonZeroUsize::new(1024).unwrap())
            .message_size_limit(None)
            .build()
            .unwrap();

        let invocation_id = InvocationId::mock_random();
        let invocation_target = InvocationTarget::mock_virtual_object();

        // Create a mock ServiceInner that tracks when an invocation task is started
        let (task_started_tx, mut task_started_rx) = mpsc::channel(1);
        let (_, _status_tx, mut service_inner) = ServiceInner::mock(
            move |partition,
                  invocation_id,
                  invocation_target,
                  _storage_reader,
                  _invoker_tx,
                  _invoker_rx,
                  _input_journal| {
                let task_started_tx = task_started_tx.clone();
                async move {
                    // Signal that the task has started
                    let _ = task_started_tx
                        .send((partition, invocation_id, invocation_target))
                        .await;
                    // Never end
                    pending::<()>().await
                }
            },
            None,
        );

        // Register a mock partition
        let _ = service_inner.register_mock_partition(EmptyStorageReader);

        // Create an invocation state machine
        let mut ism = InvocationStateMachine::create(
            invocation_target.clone(),
            invoker_options.retry_policy.clone(),
        );
        let (tx, _rx) = mpsc::unbounded_channel();
        ism.start(tokio::spawn(async {}).abort_handle(), tx);

        // Add a notification proposal
        ism.notify_new_notification_proposal(NotificationId::CompletionId(1));

        // Put the state machine in the WaitingRetry state
        ism.handle_task_error(None, true);

        // Register the invocation state machine
        service_inner
            .invocation_state_machine_manager
            .register_invocation(MOCK_PARTITION, invocation_id, ism);

        // Fire the retry timer
        service_inner.handle_retry_timer_fired(&invoker_options, MOCK_PARTITION, invocation_id);

        // Create a notification
        let notification = RawNotification::new(
            NotificationType::Completion(CompletionType::Run),
            NotificationId::CompletionId(1),
            Bytes::default(),
        );

        // Send the notification
        service_inner.handle_notification(
            &invoker_options,
            MOCK_PARTITION,
            invocation_id,
            notification,
        );

        let (partition, id, target) =
            tokio::time::timeout(Duration::from_millis(100), task_started_rx.recv())
                .await
                .unwrap()
                .unwrap();
        assert_eq!(partition, MOCK_PARTITION);
        assert_eq!(id, invocation_id);
        assert_eq!(target, invocation_target);
    }

slinkydeveloper merged commit 93509c5 into restatedev:main on Apr 18, 2025
34 of 35 checks passed
github-actions bot locked and limited conversation to collaborators on Apr 18, 2025