Problem with AsyncTask Kafka (TaskRejectedException)
Error:
org.springframework.core.task.TaskRejectedException: Executor [io.github.jhipster.async.ExceptionHandlingAsyncTaskExecutor@77b7bc21] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$988/1717607117@26e60a84
at org.springframework.core.task.support.TaskExecutorAdapter.submit(TaskExecutorAdapter.java:131)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:284)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at com.vm.uua.service.KafkaProducerService$$EnhancerBySpringCGLIB$$32c4174d.asyncSend(<generated>)
at
Explain:
We config AsynTask with following parameters:
@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer, SchedulingConfigurer {
private final Logger log = LoggerFactory.getLogger(AsyncConfiguration.class);
private final JHipsterProperties jHipsterProperties;
public AsyncConfiguration(JHipsterProperties jHipsterProperties) {
this.jHipsterProperties = jHipsterProperties;
}
@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
log.debug("Creating Async Task Executor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(jHipsterProperties.getAsync().getCorePoolSize());
executor.setMaxPoolSize(jHipsterProperties.getAsync().getMaxPoolSize());
executor.setQueueCapacity(jHipsterProperties.getAsync().getQueueCapacity());
executor.setThreadNamePrefix("uua-Executor-");
return new ExceptionHandlingAsyncTaskExecutor(executor);
}
jhipster:
— async:
— — core-pool-size: 2
— — max-pool-size: 50
— — queue-capacity: 10000
Here are Sun’s rules for thread creation in simple terms:
- If the number of threads is less than the
corePoolSize
, create a new Thread to run a new task. - If the number of threads is equal (or greater than) the
corePoolSize
, put the task into the queue. - If the queue is full, and the number of threads is less than the
maxPoolSize
, create a new thread to run tasks in. - If the queue is full, and the number of threads is greater than or equal to
maxPoolSize
, reject the task.
→ When a AsynTask is delay ( connection refuse or something happends), the asynTask number increases dramatically → reject new asyncTask ( case 4 above). In this case is Kafka connection problem.
Solution :
- Set timeout for Kafka connection :
Spring: — kafka: — properties:- request.timeout.ms: 100
- Try catch in show error log
- Increase corePoolSize, maxPoolSize
Additional:
- Show status Kafka (how many items)
/data/kafka/kafka/bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand — group <group-id> — bootstrap-server xxx:port,xxx:port — describe