Most production software needs a retry mechanism for calls to async functions or to external services.
In this post, we will not cover what a retry pattern is or what a good retry system entails. Instead, we will explore how to implement a robust retry mechanism in async Rust, drawing on some well-written articles.
What is the main condition a function must meet before it can run inside a
retry system? Idempotency: calling the function several times must have the
same effect as calling it once, so a retry cannot introduce unintended side
effects (such as incrementing a counter twice with the INCR command in a Redis
database).
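To make the difference concrete, here is a minimal sketch that applies the same operation twice, as a retry would after a lost response. The Store type, its set/incr/get methods, and apply_twice are hypothetical in-memory stand-ins written for this illustration; they are not the redis crate's API.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a key-value store such as Redis.
#[derive(Default)]
struct Store {
    values: HashMap<String, i64>,
}

impl Store {
    /// Idempotent: repeating the call leaves the same final state.
    fn set(&mut self, key: &str, value: i64) {
        self.values.insert(key.to_string(), value);
    }

    /// Not idempotent: every repeated call changes the state again.
    fn incr(&mut self, key: &str) -> i64 {
        let slot = self.values.entry(key.to_string()).or_insert(0);
        *slot += 1;
        *slot
    }

    fn get(&self, key: &str) -> Option<i64> {
        self.values.get(key).copied()
    }
}

/// Simulates a retry after a lost response: the operation runs twice.
fn apply_twice(op: impl Fn(&mut Store)) -> Store {
    let mut store = Store::default();
    op(&mut store); // first attempt succeeds, but the reply never arrives
    op(&mut store); // the caller retries
    store
}
```

Applying set twice leaves the key at the value that was written, while applying incr twice leaves the counter one higher than intended, which is why an INCR-style call needs extra care (for example an idempotency key) before it is wrapped in a retry.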
Here you can read more about retry patterns and mechanisms:
- AWS: Exponential Backoff And Jitter, by Marc Brooker - a solid article on adding jitter to the backoff time, a concept we will implement in this post;
- Google SRE: Chapter 21 - Handling Overload, by Alejandro Forero Cuervo;
- Stripe: Designing robust and predictable APIs with idempotency, by Brandur Leach - describing how to implement idempotent APIs using idempotency keys;
- Microsoft: Retry pattern;
- Wikipedia: Exponential backoff.
Now let's get to work.
First, let's see what our main function looks like. We will call our
run_with_retry function (which we will define later), passing as an argument
a closure that prints the current time and then returns an error. It will be
attempted up to 5 times in total, with an initial backoff of 100 ms.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let result: Result<i32, Box<dyn std::error::Error>> = run_with_retry(
        // Always fails, printing a timestamp so the gaps between attempts are visible.
        || async {
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_millis();
            println!("Failed at {}", now);
            Err::<i32, Box<dyn std::error::Error>>("Failed".into())
        },
        100, // initial backoff in milliseconds
        5,   // maximum number of attempts
    )
    .await;

    match result {
        Ok(value) => println!("Success {}", value),
        Err(err) => println!("Failed after retries: {}", err),
    }
    Ok(())
}
```

Don't forget to enable the Tokio feature flags rt-multi-thread (or rt) and macros, or simply full. Otherwise the compiler throws an error because main is async.
Let's see what a simple retry mechanism that uses exponential backoff looks like:

```rust
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};

async fn run_with_retry<F, Fut, T, E>(
    f: F,
    initial_backoff_ms: u64,
    max_tries: u32,
) -> Result<T, E>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut current_try: u32 = 1;
    let mut backoff_ms = initial_backoff_ms;
    loop {
        match f().await {
            Ok(value) => return Ok(value),
            Err(err) => {
                // `>=` (rather than `==`) also terminates when max_tries is 0.
                if current_try >= max_tries {
                    return Err(err);
                }
                tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
                // Double the backoff without overflowing on very long retry chains.
                backoff_ms = backoff_ms.saturating_mul(2);
                current_try += 1;
            }
        }
    }
}
```

Our function run_with_retry takes the following arguments:
- a closure that returns a future resolving to a Result (defined by F: Fn() -> Fut, Fut: std::future::Future<Output = Result<T, E>>);
- the initial backoff that will be increased exponentially;
- the maximum number of attempts.
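The F: Fn() -> Fut bound matters because a future can only be awaited once, so every attempt has to call the closure again to get a fresh future. The sketch below illustrates this without Tokio. The names block_on, NoopWake, retry_without_sleep, and demo_fresh_future_per_attempt are hypothetical helpers for this illustration, and the busy-polling block_on is only suitable for futures that never wait on I/O or timers.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never actually suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical minimal executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Same bounds as run_with_retry, with the sleep left out for brevity.
async fn retry_without_sleep<F, Fut, T, E>(f: F, max_tries: u32) -> Result<T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut current_try = 1;
    loop {
        // Each iteration calls `f` again, producing a brand-new future.
        match f().await {
            Ok(value) => return Ok(value),
            Err(err) if current_try >= max_tries => return Err(err),
            Err(_) => current_try += 1,
        }
    }
}

/// Fails on the first two calls and succeeds on the third.
/// Returns the final result together with the number of closure calls.
fn demo_fresh_future_per_attempt() -> (Result<u32, String>, u32) {
    let calls = AtomicU32::new(0);
    let result = block_on(retry_without_sleep(
        || {
            let attempt = calls.fetch_add(1, Ordering::SeqCst) + 1;
            async move {
                if attempt < 3 {
                    Err(format!("attempt {attempt} failed"))
                } else {
                    Ok(attempt)
                }
            }
        },
        5,
    ));
    (result, calls.load(Ordering::SeqCst))
}
```

An FnOnce bound would not work here, since the closure is invoked once per attempt; Fn (or FnMut) is what allows the loop to ask for a new future each time.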
We run a loop. If the function succeeds, we return its value immediately. If it fails and we have not reached the maximum number of attempts, we sleep for the backoff duration, double the backoff time, increment the current try count, and then repeat the loop. Once the last attempt fails, we return its error.
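As a quick check of the arithmetic, the sketch below lists the delays such a loop sleeps for when every attempt fails. The helper name backoff_schedule_ms is hypothetical and exists only for this illustration; it mirrors the doubling logic rather than being part of the retry function itself.

```rust
/// Delays (in milliseconds) slept between attempts by a doubling backoff.
/// With `max_tries` attempts there are at most `max_tries - 1` sleeps,
/// because no sleep follows the final failed attempt.
fn backoff_schedule_ms(initial_backoff_ms: u64, max_tries: u32) -> Vec<u64> {
    let mut delays = Vec::new();
    let mut backoff_ms = initial_backoff_ms;
    for _ in 1..max_tries {
        delays.push(backoff_ms);
        backoff_ms = backoff_ms.saturating_mul(2);
    }
    delays
}
```

For the values used in main (100 ms initial backoff, 5 attempts), this yields four sleeps of 100, 200, 400, and 800 ms, so a call that never succeeds spends 1500 ms waiting in total.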
However, if you read the AWS blog post listed above, you know that we can improve our code by adding "full jitter" to our retry mechanism:

```rust
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};

async fn run_with_retry<F, Fut, T, E>(
    f: F,
    initial_backoff_ms: u64,
    max_tries: u32,
) -> Result<T, E>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut current_try: u32 = 1;
    let mut backoff_ms = initial_backoff_ms;
    loop {
        match f().await {
            Ok(value) => return Ok(value),
            Err(err) => {
                // `>=` (rather than `==`) also terminates when max_tries is 0.
                if current_try >= max_tries {
                    return Err(err);
                }
                // Full jitter: sleep for a random duration up to the current backoff.
                // The inclusive range avoids a panic on an empty range when backoff_ms is 0.
                let actual_backoff_ms = rand::random_range(0..=backoff_ms);
                tokio::time::sleep(Duration::from_millis(actual_backoff_ms)).await;
                backoff_ms = backoff_ms.saturating_mul(2);
                current_try += 1;
            }
        }
    }
}
```

We added a single line that draws a random value in the range
[0, backoff_ms] for the sleep duration. This applies the "full jitter"
mechanism to our code.
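The AWS article also caps the backoff, computing the sleep as a random value between 0 and min(cap, base * 2^attempt). The following is a dependency-free sketch of that formula. The cap_ms parameter is an addition that the code above does not have, and XorShift64, backoff_ceiling_ms, and full_jitter_ms are hypothetical names; the small xorshift generator only stands in for a proper random number generator such as the rand crate.

```rust
/// Tiny xorshift64 generator, used only so the sketch has no dependencies.
/// It is not suitable for anything security-sensitive.
struct XorShift64(u64);

impl XorShift64 {
    fn new(seed: u64) -> Self {
        // xorshift must not start from a zero state.
        XorShift64(if seed == 0 { 0x9E37_79B9_7F4A_7C15 } else { seed })
    }

    fn next_u64(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }
}

/// Upper bound of the sleep before retry `attempt` (0-based):
/// min(cap_ms, base_ms * 2^attempt), computed without overflow.
fn backoff_ceiling_ms(base_ms: u64, cap_ms: u64, attempt: u32) -> u64 {
    let factor = 2u64.checked_pow(attempt).unwrap_or(u64::MAX);
    base_ms.saturating_mul(factor).min(cap_ms)
}

/// Full jitter: a pseudo-random value in [0, ceiling_ms].
/// The modulo introduces a slight bias, which is acceptable for a sketch.
fn full_jitter_ms(rng: &mut XorShift64, ceiling_ms: u64) -> u64 {
    rng.next_u64() % ceiling_ms.saturating_add(1)
}
```

With a base of 100 ms and a cap of 1000 ms, the ceilings for the first five retries are 100, 200, 400, 800, and 1000 ms, and each actual sleep falls somewhere between 0 and the ceiling. The cap keeps a long retry chain from producing unreasonably long waits.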
There is one more improvement we can make to our run_with_retry
function: checking whether the returned error is transient. This check matters
because retrying only helps when the failure is temporary (a timeout or a
dropped connection, for example); an error such as a type mismatch will fail
the same way on every attempt.
For this, we will use a more realistic example where we implement a retry
function for generic Redis methods from the redis crate:

```rust
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};

async fn run_with_retry<F, Fut, T, E>(
    f: F,
    initial_backoff_ms: u64,
    max_tries: u32,
    is_transient_error: fn(&E) -> bool,
) -> Result<T, E>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut current_try: u32 = 1;
    let mut backoff_ms = initial_backoff_ms;
    loop {
        match f().await {
            Ok(value) => return Ok(value),
            Err(err) => {
                // Give up when attempts are exhausted or the error cannot be fixed by retrying.
                if current_try >= max_tries || !is_transient_error(&err) {
                    return Err(err);
                }
                // Full jitter; the inclusive range avoids a panic when backoff_ms is 0.
                let actual_backoff_ms = rand::random_range(0..=backoff_ms);
                tokio::time::sleep(Duration::from_millis(actual_backoff_ms)).await;
                backoff_ms = backoff_ms.saturating_mul(2);
                current_try += 1;
            }
        }
    }
}

// Timeouts, dropped or refused connections, and I/O errors may succeed on a later attempt.
fn is_redis_error_transient(err: &redis::RedisError) -> bool {
    err.is_timeout()
        || err.is_connection_dropped()
        || err.is_connection_refusal()
        || err.is_io_error()
}

async fn run_redis_with_retry<F, Fut, T>(
    f: F,
    initial_backoff_ms: u64,
    max_tries: u32,
) -> Result<T, redis::RedisError>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<T, redis::RedisError>>,
{
    run_with_retry(
        f,
        initial_backoff_ms,
        max_tries,
        is_redis_error_transient,
    )
    .await
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let result = run_redis_with_retry::<_, _, i32>(
        || async {
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_millis();
            println!("Failed at {}", now);
            // Uncomment for a non-transient error
            // Err(redis::RedisError::from((
            //     redis::ErrorKind::UnexpectedReturnType,
            //     "Non-transient error"
            // )))
            Err(redis::RedisError::from((
                redis::ErrorKind::Io,
                "Transient error"
            )))
        },
        100,
        5,
    )
    .await;

    match result {
        Ok(value) => println!("Success {}", value),
        Err(err) => println!("Failed after retries: {}", err),
    }
    Ok(())
}
```

We pass the function that checks whether an error is transient as an additional
argument to run_with_retry. It is called on each failure to decide whether we
should keep retrying.
Because the code for calling run_with_retry is becoming quite verbose, we can create
a specialized version for Redis called run_redis_with_retry.
We know that errors from the redis crate are of type redis::RedisError,
so we can replace the generic error type E with it. We can also define another
function, is_redis_error_transient, which checks whether a redis::RedisError is
transient, and pass it as the is_transient_error argument.
Now we have a more readable retry function for Redis.
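The same approach carries over to other error types. The sketch below uses a blocking, std-only stand-in for the retry loop (no Tokio, no jitter) so that the effect of the transient check is easy to observe. ApiError, is_api_error_transient, run_with_retry_blocking, and attempts_for are hypothetical names invented for this illustration.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical error type for an HTTP-like client.
#[derive(Debug, PartialEq)]
enum ApiError {
    Timeout,
    RateLimited,
    NotFound,
    InvalidInput,
}

/// Only failures that may succeed later are worth retrying.
fn is_api_error_transient(err: &ApiError) -> bool {
    matches!(err, ApiError::Timeout | ApiError::RateLimited)
}

/// Blocking stand-in for run_with_retry, kept free of dependencies.
fn run_with_retry_blocking<T, E>(
    mut f: impl FnMut() -> Result<T, E>,
    initial_backoff_ms: u64,
    max_tries: u32,
    is_transient_error: fn(&E) -> bool,
) -> Result<T, E> {
    let mut current_try = 1;
    let mut backoff_ms = initial_backoff_ms;
    loop {
        match f() {
            Ok(value) => return Ok(value),
            Err(err) => {
                if current_try >= max_tries || !is_transient_error(&err) {
                    return Err(err);
                }
                sleep(Duration::from_millis(backoff_ms));
                backoff_ms = backoff_ms.saturating_mul(2);
                current_try += 1;
            }
        }
    }
}

/// Runs an always-failing operation and reports the final error
/// together with how many attempts were made.
fn attempts_for(make_err: fn() -> ApiError, max_tries: u32) -> (ApiError, u32) {
    let mut attempts = 0;
    let result: Result<(), ApiError> = run_with_retry_blocking(
        || {
            attempts += 1;
            Err(make_err())
        },
        1, // 1 ms initial backoff keeps the demonstration fast
        max_tries,
        is_api_error_transient,
    );
    (result.unwrap_err(), attempts)
}
```

Calling attempts_for(|| ApiError::Timeout, 4) uses all four attempts, while attempts_for(|| ApiError::NotFound, 4) stops after the first one, because retrying a missing resource cannot help.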
You can see the complete code for each version:
