Distributed Rate Limiter with Auto-Retries using Bucket4j, Redis, SQS & Reflection
Rate limiting is a strategy to limit access to APIs. It restricts the number of API calls that a client can make within a certain time frame. This helps defend the API against overuse, both unintentional and malicious.
Objective
Build a robust & fault-tolerant rate limiting system for downstream rate-limited APIs with the following properties -
• Configurable rate limit
• Shared limit among all pods in a cluster
• Auto-retry in case of internal/external limit overshoot
• Plug n’ Play nature
Components
Bucket4j
Bucket4j is a Java rate-limiting library based on the token-bucket algorithm. Bucket4j is a thread-safe library that can be used in either a standalone JVM application, or a clustered environment. It also supports in-memory or distributed caching via the JCache (JSR107) specification.
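To make the token-bucket idea concrete, here is a minimal single-JVM sketch in plain Java. It is not how Bucket4j is implemented; the class name TokenBucketSketch, the injectable clock and the whole-bucket interval refill are assumptions, chosen to mirror the Refill.intervally behaviour used later in this article.

```java
import java.util.function.LongSupplier;

/** Illustrative token bucket: the bucket is topped up to full capacity once per period. */
final class TokenBucketSketch {
    private final long capacity;
    private final long refillPeriodNanos;
    private final LongSupplier nanoClock; // injectable clock so the behaviour can be tested
    private long tokens;
    private long lastRefillNanos;

    TokenBucketSketch(long capacity, long refillPeriodNanos, LongSupplier nanoClock) {
        this.capacity = capacity;
        this.refillPeriodNanos = refillPeriodNanos;
        this.nanoClock = nanoClock;
        this.tokens = capacity;
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    /** Takes one token if available; returns false when the bucket is empty. */
    synchronized boolean tryConsume() {
        refill();
        if (tokens == 0) {
            return false;
        }
        tokens--;
        return true;
    }

    /** Nanoseconds left until the next refill. */
    synchronized long nanosUntilRefill() {
        refill();
        return refillPeriodNanos - (nanoClock.getAsLong() - lastRefillNanos);
    }

    private void refill() {
        long elapsedPeriods = (nanoClock.getAsLong() - lastRefillNanos) / refillPeriodNanos;
        if (elapsedPeriods > 0) {
            tokens = capacity;
            lastRefillNanos += elapsedPeriods * refillPeriodNanos;
        }
    }
}
```

A caller would construct it with System::nanoTime as the clock. A rejected call can use nanosUntilRefill() to decide how long to wait, which is the role ConsumptionProbe plays in the aspect further down.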
Redis
Redis is an in-memory key-value data store that can be used as a database, cache, message broker, etc.
Redis has a dual purpose here -
1. Host the buckets & their configuration.
2. Cache the data needed to invoke the rate-limited method at a later time.
SQS
Amazon Simple Queue Service (SQS) is a managed messaging service that allows users to send, store, and receive messages between software components. SQS is used here specifically because it provides delayed message delivery & high throughput. We’re using a standard queue, which does not guarantee ordering, but a FIFO queue is available if ordering is crucial.
Flow

To make the rate limit configurable on the fly, the configuration is stored in a database. Ideally it would be stored in Redis for quicker access, but for simplicity we’ll store it in MySQL.
Configuration
Redis
A custom cache called “rate-limit-cache” is created to host the buckets. Bucket4j supports any JCache implementation. The Redis configuration is the same for both JCache & the Redisson client.
package com.rppol.ratelimit.config;
import com.giffing.bucket4j.spring.boot.starter.config.cache.SyncCacheResolver;
import com.giffing.bucket4j.spring.boot.starter.config.cache.jcache.JCacheCacheResolver;
import io.github.bucket4j.distributed.proxy.ProxyManager;
import io.github.bucket4j.grid.jcache.JCacheProxyManager;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.jcache.configuration.RedissonConfiguration;
import org.redisson.spring.data.connection.RedissonConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.cache.JCacheManagerCustomizer;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.RedisTemplate;
import javax.cache.CacheManager;
import javax.cache.Caching;
import java.util.Arrays;
@Slf4j
@Configuration
@EnableCaching
public class RedisConfig {
@Value("${spring.redis.cluster.node}")
public String redisClusterNode;
@Value("${spring.redis.mode}")
public String redisMode;
@Autowired
private Environment environment;
private static RedissonClient redissonClient;
@Bean
public Config redissonConfig() {
Config config = new Config();
log.info("===RedissonClient: [getRedissonClientCluster]===");
String redisHost = "redis://" + redisClusterNode;
if ("SINGLE_NODE".equalsIgnoreCase(redisMode)) {
log.info("=== RedisConfig: [getRedisConfigSingle] ===");
config.useSingleServer().setAddress(redisHost);
} else {
log.info("=== RedisConfig: [getRedisConfigCluster] ===");
config.useClusterServers().addNodeAddress(redisHost);
}
return config;
}
@Bean
public RedisTemplate<String,Object> getRedisTemplate(){
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(new RedissonConnectionFactory(getRedissonClient()));
return redisTemplate;
}
private static final String CACHE_NAME = "rate-limit-cache";
@Bean
@Primary
@Profile({"production", "testing"})
public RedissonClient getRedissonClientCluster() {
log.info("===RedissonClient: [getRedissonClientCluster]===");
String redisHost = "redis://" + redisClusterNode;
Config config = redissonConfig();
config.useClusterServers()
.addNodeAddress(redisHost);
redissonClient = Redisson.create(config);
return redissonClient;
}
@Bean
@Primary
@Profile({"local"})
public RedissonClient getRedissonClientSingle() {
log.info("===RedissonClient: [getRedissonClientSingle]===");
String redisHostName = "redis://" + redisClusterNode;
Config config = redissonConfig();
config.useSingleServer()
.setAddress(redisHostName);
redissonClient = Redisson.create(config);
return redissonClient;
}
public RedissonClient getRedissonClient() {
if (redissonClient != null) {
return redissonClient;
}
if (Arrays.stream(environment.getActiveProfiles()).anyMatch(
env -> (env.equalsIgnoreCase("production")
|| env.equalsIgnoreCase("testing")))) {
return getRedissonClientCluster();
} else {
return getRedissonClientSingle();
}
}
@Bean
public CacheManager cacheManager(Config config) {
return Caching.getCachingProvider().getCacheManager();
}
@Bean
public ProxyManager<String> proxyManager(CacheManager cache) {
return new JCacheProxyManager<>(cache.getCache(CACHE_NAME));
}
// this raises an exception on startup, since there is no resolver when a reactive stack (netty, tomcat3 etc.) handles the app and there are several cache names.
@Bean
@Primary
public SyncCacheResolver bucket4jCacheResolver(CacheManager cacheManager) {
return new JCacheCacheResolver(cacheManager);
}
/** This bean creates the cache in which keys are associated with your buckets. The customizer is used here
* so that any Spring bean that uses a Redis connection or caching can share the same config without raising
* the exception.
*/
@Bean
@Primary
public JCacheManagerCustomizer jCacheManagerCustomizer(Config config) {
return cacheManager -> cacheManager.createCache(CACHE_NAME, RedissonConfiguration.fromConfig(config));
}
}
A redis template is created in config to manually get, store & delete request data via Redis Service.
package com.rppol.ratelimit.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class RedisService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ObjectMapper objectMapper;
public void storeMethodDetails(String redisIdentifier, String methodName, Object[] params) throws JsonProcessingException {
Map<String, Object> methodDetails = new HashMap<>();
methodDetails.put("methodName", methodName);
methodDetails.put("params", params);
String serializedDetails = objectMapper.writeValueAsString(methodDetails);
storeRequest(redisIdentifier, serializedDetails);
}
public void storeRequest(String redisIdentifier, String params) {
log.info("Storing request in Redis: {} Params: {}", redisIdentifier, params);
redisTemplate.opsForValue().set(redisIdentifier, params, 4, TimeUnit.DAYS);
}
public Object get(String redisIdentifier) {
return redisTemplate.opsForValue().get(redisIdentifier);
}
public void delete(String redisIdentifier) {
log.info("Deleting request from Redis: {}", redisIdentifier);
redisTemplate.delete(redisIdentifier);
}
}
Bucket4j
Buckets are created based on rate limit and refill interval in entity.
package com.rppol.ratelimit.config;
import com.rppol.ratelimit.entity.RateLimitEntity;
import com.rppol.ratelimit.repository.RateLimitRepository;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.BucketConfiguration;
import io.github.bucket4j.Refill;
import io.github.bucket4j.distributed.proxy.ProxyManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import java.time.Duration;
import java.util.function.Supplier;
@Configuration
public class RateLimitBucketConfig {
@Autowired
public ProxyManager<String> buckets;
@Autowired
RateLimitRepository rateLimitRepository;
public Bucket resolveBucket(RateLimitEntity rateLimit) {
Supplier<BucketConfiguration> configSupplier = getConfigSupplier(rateLimit);
// Does not always create a new bucket, but instead returns the existing one if it exists.
// The bucket is keyed by its name, matching the ProxyManager<String> bean defined in RedisConfig.
return buckets.builder().build(rateLimit.getBucketName(), configSupplier);
}
private Supplier<BucketConfiguration> getConfigSupplier(RateLimitEntity rateLimit) {
Refill refill;
switch (rateLimit.getRefillInterval()){
case 's' -> refill = Refill.intervally(rateLimit.getLimit(), Duration.ofSeconds(1));
case 'h' -> refill = Refill.intervally(rateLimit.getLimit(), Duration.ofHours(1));
default -> refill = Refill.intervally(rateLimit.getLimit(), Duration.ofMinutes(1));
}
Bandwidth limit = Bandwidth.classic(rateLimit.getLimit(), refill);
return () -> (BucketConfiguration.builder()
.addLimit(limit)
.build());
}
}
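The mapping from the stored interval code to a refill period can be isolated into a small helper. This is only a sketch: the class name RefillIntervals and the null handling are assumptions; the fallback to one minute follows the switch in getConfigSupplier.

```java
import java.time.Duration;

/** Translates the entity's refill interval code into a period. */
final class RefillIntervals {
    private RefillIntervals() {}

    /** 's' is one second, 'h' is one hour; anything else (including null) falls back to one minute. */
    static Duration toDuration(Character code) {
        if (code == null) {
            return Duration.ofMinutes(1);
        }
        return switch (code.charValue()) {
            case 's' -> Duration.ofSeconds(1);
            case 'h' -> Duration.ofHours(1);
            default -> Duration.ofMinutes(1);
        };
    }
}
```

The aspect shown later repeats the same switch to pick a retry delay, so a helper along these lines could serve both places.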
AWS SQS
An SqsClient is needed to interact with AWS SQS.
package com.rppol.ratelimit.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.services.sqs.SqsClient;
@Configuration
public class AwsConfig {
@Bean
public SqsClient sqsClient() {
return SqsClient.builder().build();
}
}
Gradle
This example works with Spring Boot 3.1.6 & Java 21.
Apart from general spring dependencies, we’ll need the following for components listed above
implementation 'org.springframework:spring-messaging:5.3.25'
implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs:3.2.0'
implementation 'software.amazon.awssdk:sqs:2.27.14'
implementation 'org.redisson:redisson-spring-boot-starter:3.20.1'
implementation 'com.giffing.bucket4j.spring.boot.starter:bucket4j-spring-boot-starter:0.9.0'
Rate Limit Entity
bucketName: Unique identifier for the bucket
limit: Max requests allowed
refillInterval: Unit after which tokens are refilled (second/minute/hour)
package com.rppol.ratelimit.entity;
import jakarta.persistence.*;
import lombok.Data;
import org.hibernate.annotations.CreationTimestamp;
import org.hibernate.annotations.UpdateTimestamp;
import java.sql.Timestamp;
@Entity
@Data
@Table(name = "rate_limit")
public class RateLimitEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "id")
private Long id;
@Column(name = "bucket_name")
private String bucketName;
@Column(name = "rate_limit")
private Long limit;
@Column(name = "refill_interval")
private Character refillInterval;
@CreationTimestamp
@Column(name = "created_at", nullable = false)
private Timestamp createdAt;
@UpdateTimestamp
@Column(name = "updated_at", nullable = false)
private Timestamp updatedAt;
}
Implementation
To make the implementation plug n’ play, we build a custom annotation.
The bucketName parameter in the annotation must be the same as the bucket name in the entity. Buckets are fetched using solely this name.
package com.rppol.ratelimit.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
String bucketName();
}
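For readers new to custom annotations, the sketch below shows how a bucket name declared this way can be read back through reflection, which is roughly what the AOP machinery does before handing the annotation to the aspect. The annotation is redeclared locally so the snippet compiles on its own; PaymentClient and the "payments-api" bucket are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Optional;

final class RateLimitAnnotationDemo {
    private RateLimitAnnotationDemo() {}

    /** Local copy of the article's annotation. */
    @Target({ElementType.METHOD, ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @interface RateLimit {
        String bucketName();
    }

    /** Hypothetical service, used only for illustration. */
    static class PaymentClient {
        @RateLimit(bucketName = "payments-api")
        public String charge(String orderId) {
            return "charged " + orderId;
        }

        public String status(String orderId) {
            return "ok " + orderId;
        }
    }

    /** Returns the bucket name declared on the first method with the given name, if it is annotated. */
    static Optional<String> bucketNameOf(Class<?> type, String methodName) {
        for (Method method : type.getDeclaredMethods()) {
            if (method.getName().equals(methodName)) {
                RateLimit annotation = method.getAnnotation(RateLimit.class);
                return Optional.ofNullable(annotation).map(RateLimit::bucketName);
            }
        }
        return Optional.empty();
    }
}
```

RetentionPolicy.RUNTIME is what keeps the annotation visible to reflection. Note also that an @annotation(...) pointcut matches annotated methods only, so placing @RateLimit on a class would likely need a different pointcut (for example @within) to take effect.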
We use Spring AOP to trigger the rate limiting logic. The aspect is triggered every time a method annotated with @RateLimit is called. To keep concurrent threads in the same JVM from accessing the bucket simultaneously, a lock is acquired first. We then try to consume a token from the bucket; if successful, the annotated method executes.
package com.rppol.ratelimit.aspect;
import com.rppol.ratelimit.annotation.RateLimit;
import com.rppol.ratelimit.config.RateLimitBucketConfig;
import com.rppol.ratelimit.entity.RateLimitEntity;
import com.rppol.ratelimit.exception.RateLimitedException;
import com.rppol.ratelimit.repository.RateLimitRepository;
import com.rppol.ratelimit.service.RedisService;
import com.rppol.ratelimit.service.SqsService;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.ConsumptionProbe;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
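import org.springframework.web.client.HttpClientErrorException;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;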
@Aspect
@Component
@Slf4j
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class RateLimitAspect {
@Autowired
private RateLimitBucketConfig rateLimitBucketConfig;
@Autowired
RateLimitRepository rateLimitRepository;
@Autowired
RedisService redisService;
@Autowired
SqsService sqsService;
private final ReentrantLock lock = new ReentrantLock();
@Around("@annotation(rateLimitAnnotation)")
public Object rateLimit(ProceedingJoinPoint joinPoint, RateLimit rateLimitAnnotation) throws Throwable {
RateLimitEntity rateLimitEntity = rateLimitRepository.findByBucketName(rateLimitAnnotation.bucketName())
.orElseThrow(() -> new RuntimeException("Rate limit not found in DB"));
Bucket bucket = rateLimitBucketConfig.resolveBucket(rateLimitEntity);
String beanName = convertToBeanName(joinPoint.getTarget().getClass().getSimpleName());
String bucketName = rateLimitAnnotation.bucketName();
int delaySeconds;
boolean acquired = lock.tryLock(30, TimeUnit.SECONDS); // Try to acquire the lock with a timeout
if (!acquired) {
throw new RuntimeException("Could not acquire lock within the specified timeout");
}
try {
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
if (!probe.isConsumed()) {
delaySeconds = Math.toIntExact(probe.getNanosToWaitForRefill() / 1_000_000_000);
String logMessage = this.processRateLimitException(joinPoint, beanName, bucketName, delaySeconds);
log.info(logMessage);
throw new RateLimitedException(logMessage);
} else {
log.info("Name : {} Remaining tokens: {}", rateLimitAnnotation.bucketName(), probe.getRemainingTokens());
}
} finally {
lock.unlock(); // Release the lock
}
try{
// Proceed with the method execution
return joinPoint.proceed();
}
catch (HttpClientErrorException.TooManyRequests e){
// If external rate limit is exceeded
log.info("External Rate Limit exceeded: {} for bucket: {}", e.getMessage(), bucketName);
switch (rateLimitEntity.getRefillInterval()){
case 's' -> delaySeconds = 1;
case 'h' -> delaySeconds = 60 * 60;
default -> delaySeconds = 60;
}
String logMessage = this.processRateLimitException(joinPoint, beanName, bucketName, delaySeconds);
log.info(logMessage);
throw new RateLimitedException(logMessage);
}
catch (Exception e) {
log.error("Exception occurred in Rate Limited Method: {}", e.getMessage(), e);
throw e;
}
}
private String processRateLimitException(ProceedingJoinPoint joinPoint, String beanName, String bucketName, int delaySeconds) throws JsonProcessingException {
String redisIdentifier = saveMethodMetadataToRedis(joinPoint);
String sqsMessage = buildAndSendSqsMessage(redisIdentifier, delaySeconds, beanName);
String logMessage = getLogMessage(sqsMessage, delaySeconds, bucketName);
log.info(logMessage);
return logMessage;
}
private String convertToBeanName(String className) {
if (className == null || className.isEmpty()) {
return className;
}
return Character.toLowerCase(className.charAt(0)) + className.substring(1);
}
String saveMethodMetadataToRedis(ProceedingJoinPoint joinPoint) throws JsonProcessingException{
String redisIdentifier = UUID.randomUUID().toString();
redisService.storeMethodDetails(redisIdentifier, joinPoint.getSignature().getName(), joinPoint.getArgs());
log.info("Method details stored in Redis: {}", redisIdentifier);
return redisIdentifier;
}
String buildAndSendSqsMessage(String redisIdentifier, int delay, String beanName){
String sqsMessage = new StringBuilder(redisIdentifier).append("|").append(beanName).toString();
log.info("Sending message to SQS: {}", sqsMessage);
sqsService.sendMessage(sqsMessage, delay);
return sqsMessage;
}
String getLogMessage(String sqsMessage, int delaySeconds, String bucketName){
return new StringBuilder("Rate limit exceeded for ")
.append(bucketName)
.append(" retry after ")
.append(delaySeconds)
.append("s.\n")
.append("SQS Message: ")
.append(sqsMessage).toString();
}
}
In case no tokens are available, a message made up of a random UUID and the bean name of the annotated method, separated by a pipe, is sent to SQS with a delay equal to the time until the next bucket refill. The method name & its parameters are stored in Redis against the same UUID with a TTL of 4 days.
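The delay handed to SQS deserves a little care: SQS accepts a DelaySeconds value of at most 900 (15 minutes), so a wait derived from an hourly bucket does not fit into a single message delay. The helper below is a sketch under that constraint; the name SqsDelay is hypothetical. It rounds a nanosecond wait up to whole seconds and caps it.

```java
/** Converts a wait expressed in nanoseconds into a delay that SQS will accept. */
final class SqsDelay {
    /** SQS rejects DelaySeconds values above 900 (15 minutes). */
    static final int MAX_SQS_DELAY_SECONDS = 900;
    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    private SqsDelay() {}

    /** Rounds up to whole seconds so the message never arrives early, then caps at the SQS maximum. */
    static int fromNanos(long nanosToWait) {
        if (nanosToWait <= 0) {
            return 0;
        }
        long seconds = nanosToWait / NANOS_PER_SECOND
                + (nanosToWait % NANOS_PER_SECOND == 0 ? 0 : 1);
        return (int) Math.min(seconds, MAX_SQS_DELAY_SECONDS);
    }
}
```

When the cap applies, the message arrives before the bucket has refilled. Assuming the retried call passes through the same aspect, it would find no token and be queued again with the remaining wait, so long delays end up being covered by a chain of shorter ones.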
SQS
SqsListener helps receive messages from SQS. Messages aren’t deleted from SQS when a client has received them; they’re simply hidden until the visibility timeout expires. If messages aren’t deleted manually, they become available for receiving again after the visibility timeout.
package com.rppol.ratelimit.service;
import com.rppol.ratelimit.entity.RateLimitEntity;
import com.rppol.ratelimit.repository.RateLimitRepository;
import io.github.bucket4j.Refill;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.springframework.web.client.HttpClientErrorException;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import io.awspring.cloud.sqs.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.Headers;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.Map;
@Slf4j
@Service
public class SqsService {
@Autowired
SqsClient sqsClient;
@Value("${cloud.aws.sqs.queue.url}")
private String queueUrl;
@Autowired
RedisService redisService;
@Autowired
RateLimitRepository rateLimitRepository;
@Autowired
private ApplicationContext applicationContext;
@Autowired
private MethodInvokerService methodInvokerService;
public void sendMessage(String message, Integer delaySeconds) {
SendMessageRequest sendMsgRequest = SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(message)
.delaySeconds(++delaySeconds)
.build();
try {
sqsClient.sendMessage(sendMsgRequest);
log.info("Successfully sent message to SQS: {}", message);
}
catch (Exception e){
log.error("Error sending message to SQS: {}", e.getMessage(), e);
}
}
public void deleteMessageFromQueue(String message, String receiptHandle) {
DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(receiptHandle)
.build();
try {
sqsClient.deleteMessage(deleteMessageRequest);
log.info("Deleted message - {} from SQS with Sqs_ReceiptHandle: {}", message, receiptHandle);
}
catch (Exception e){
log.error("Error deleting message from SQS: {}", e.getMessage(), e);
}
}
@SqsListener("${cloud.aws.sqs.queue.url}")
public void receiveMessage(String message, @Headers Map<String, Object> headers){
log.info("Received message - {} from SQS", message);
this.deleteMessageFromQueue(message, headers.get("Sqs_ReceiptHandle").toString());
String[] parts = message.split("\\|");
String redisIdentifier = parts[0];
String beanName = parts[1];
Object methodMetadata = redisService.get(redisIdentifier);
if(null != methodMetadata){
redisService.delete(redisIdentifier);
try{
Object returnValue = methodInvokerService.invokeMethod((String) methodMetadata, applicationContext.getBean(beanName));
log.info("Method invoked successfully for redisIdentifier: {} Returned: {}", redisIdentifier, returnValue);
}
catch (Exception e){
if (e instanceof InvocationTargetException) {
e = (Exception) ((InvocationTargetException) e).getTargetException();
}
log.error("Error invoking method using reflection: {}", e.getMessage(), e);
}
}
else{
log.error("No method details found for redisIdentifier: {}", redisIdentifier);
}
}
}
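The listener above splits the message body on the pipe character and reads both parts by index, so a malformed body would fail with an ArrayIndexOutOfBoundsException after the message has already been deleted. A small value type can keep encoding and decoding in one place and reject bad input with a clearer error. This is a sketch only; RetryMessage is a hypothetical name, not part of the original code.

```java
/** Pipe-separated payload in the form "redisIdentifier|beanName". */
record RetryMessage(String redisIdentifier, String beanName) {

    /** Builds the message body that is sent to SQS. */
    String encode() {
        return redisIdentifier + "|" + beanName;
    }

    /** Parses a message body, rejecting anything that is not exactly two non-empty parts. */
    static RetryMessage decode(String body) {
        if (body == null) {
            throw new IllegalArgumentException("Message body is null");
        }
        String[] parts = body.split("\\|", -1);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException("Expected '<redisIdentifier>|<beanName>' but got: " + body);
        }
        return new RetryMessage(parts[0], parts[1]);
    }
}
```

This relies on neither the UUID nor the bean name containing a pipe, which holds for UUIDs and for default Spring bean names derived from class names.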
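The message is split to obtain the bean name & the Redis identifier, i.e. the UUID. Method details are fetched from Redis and the method is invoked on the Spring bean using Java reflection. The parameter types must have a no-argument constructor (or otherwise be deserialisable by Jackson), since the ObjectMapper rebuilds them on the fly. Ideally the method should be a fire-and-forget style call, because the return value of the retried invocation is only logged.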
package com.rppol.ratelimit.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.client.HttpClientErrorException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@Slf4j
@Service
public class MethodInvokerService {
@Autowired
private ObjectMapper objectMapper;
@Autowired
RedisService redisService;
public Object invokeMethod(String methodMetadata, Object bean) throws Exception {
Map<String, Object> methodDetails = objectMapper.readValue(methodMetadata, Map.class);
String methodName = (String) methodDetails.get("methodName");
Object[] params = objectMapper.convertValue(methodDetails.get("params"), Object[].class);
Method method = findMethod(bean.getClass(), methodName, params);
if (method == null) {
throw new RuntimeException("Method not found: " + methodName);
}
Class<?>[] paramTypes = method.getParameterTypes();
Object[] deserializedParams = new Object[params.length];
for (int i = 0; i < params.length; i++) {
deserializedParams[i] = objectMapper.convertValue(params[i], paramTypes[i]);
}
log.info("Invoking method: {} with params: {}", methodName, deserializedParams);
return method.invoke(bean, deserializedParams);
}
private Method findMethod(Class<?> clazz, String methodName, Object[] params) {
for (Method method : clazz.getDeclaredMethods()) {
if (method.getName().equals(methodName) && method.getParameterCount() == params.length) {
return method;
}
}
return null;
}
}
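One caveat with findMethod above: it matches on name and parameter count only, so with overloaded methods it returns whichever candidate reflection lists first. The sketch below shows the ambiguity and one possible way around it, namely matching on parameter type names as well. Storing those type names next to methodName in Redis is an assumption and is not something the code above does; NotificationClient is hypothetical.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

final class MethodLookupSketch {
    private MethodLookupSketch() {}

    /** Hypothetical bean with two overloads that share a name and a parameter count. */
    static class NotificationClient {
        public String send(String recipient) {
            return "text:" + recipient;
        }

        public String send(Integer userId) {
            return "id:" + userId;
        }
    }

    /** Every declared method whose name and parameter count match (what findMethod effectively checks). */
    static List<Method> candidates(Class<?> type, String methodName, int paramCount) {
        return Arrays.stream(type.getDeclaredMethods())
                .filter(m -> m.getName().equals(methodName) && m.getParameterCount() == paramCount)
                .toList();
    }

    /** Picks the candidate whose parameter type names match exactly; fails loudly otherwise. */
    static Method resolve(Class<?> type, String methodName, List<String> paramTypeNames) {
        return candidates(type, methodName, paramTypeNames.size()).stream()
                .filter(m -> Arrays.stream(m.getParameterTypes())
                        .map(Class::getName)
                        .toList()
                        .equals(paramTypeNames))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(
                        "No method " + methodName + " with parameters " + paramTypeNames));
    }
}
```

In the aspect, the declared parameter types should be obtainable by casting joinPoint.getSignature() to AspectJ's MethodSignature and calling getParameterTypes(), so the names could be captured at the point where the method details are written to Redis.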
Token Bucket is the most comprehensible rate limiting algorithm, but it has its limitations. Let’s understand with an example -
Say you’ve set up a rate limit of 10 req/min, and a downstream API called by the annotated method also has a limit of 10 req/min, enforced with a sliding-window algorithm. Our internal bucket was built at 00:00 (mm:ss) and holds 10 tokens. We receive 20 requests at 00:40; our implementation allows the first 10, and the rest are sent to SQS with a delay of 20s. At 01:00 we receive those messages from SQS and the method is re-invoked. Since the bucket is also refilled at 01:00, all 10 are passed to the downstream API. But the downstream API’s sliding window at that moment spans 00:00 to 01:00 and already contains the 10 calls made at 00:40, so these requests are rejected with 429.
To mitigate this, when the downstream API call throws HttpClientErrorException.TooManyRequests, we resend the message to SQS with a delay equal to the bucket refill interval.
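The timeline in this example can be replayed with a small simulation. Both limiters below are simplified stand-ins written for this illustration (an interval-refill bucket and a sliding-window log), not the actual Bucket4j or downstream implementations, and time is modelled in whole seconds.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class WindowMismatchDemo {
    private WindowMismatchDemo() {}

    /** Interval-refill bucket: all tokens come back at every multiple of the period. */
    static final class IntervalBucket {
        private final int capacity;
        private final long periodSeconds;
        private int tokens;
        private long currentPeriod;

        IntervalBucket(int capacity, long periodSeconds) {
            this.capacity = capacity;
            this.periodSeconds = periodSeconds;
            this.tokens = capacity;
        }

        boolean tryConsume(long nowSeconds) {
            long period = nowSeconds / periodSeconds;
            if (period > currentPeriod) {
                currentPeriod = period;
                tokens = capacity;
            }
            if (tokens == 0) {
                return false;
            }
            tokens--;
            return true;
        }
    }

    /** Sliding-window log: at most `limit` accepted calls within any trailing window. */
    static final class SlidingWindow {
        private final int limit;
        private final long windowSeconds;
        private final Deque<Long> accepted = new ArrayDeque<>();

        SlidingWindow(int limit, long windowSeconds) {
            this.limit = limit;
            this.windowSeconds = windowSeconds;
        }

        boolean tryAccept(long nowSeconds) {
            // Drop calls that have slid out of the trailing window.
            while (!accepted.isEmpty() && accepted.peekFirst() <= nowSeconds - windowSeconds) {
                accepted.pollFirst();
            }
            if (accepted.size() >= limit) {
                return false; // the downstream API would answer 429 here
            }
            accepted.addLast(nowSeconds);
            return true;
        }
    }

    /** Returns {allowedByBucket, rejectedDownstream} for `requests` calls made at `nowSeconds`. */
    static int[] burst(IntervalBucket bucket, SlidingWindow downstream, int requests, long nowSeconds) {
        int allowed = 0;
        int rejected = 0;
        for (int i = 0; i < requests; i++) {
            if (bucket.tryConsume(nowSeconds)) {
                allowed++;
                if (!downstream.tryAccept(nowSeconds)) {
                    rejected++;
                }
            }
        }
        return new int[] {allowed, rejected};
    }
}
```

Calling burst with 20 requests at second 40 lets 10 through with no downstream rejections. Replaying the 10 deferred calls at second 60 passes the refilled bucket, but all 10 are rejected downstream. Replaying them one refill interval later, at second 120, succeeds, which is the effect the re-queue on TooManyRequests aims for.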
Feel free to reach-out on LinkedIn — https://www.linkedin.com/in/rutikpol