Problem with AsyncTask Kafka (TaskRejectedException)

Giang Trung
2 min readAug 5, 2020

Error:

org.springframework.core.task.TaskRejectedException: Executor [io.github.jhipster.async.ExceptionHandlingAsyncTaskExecutor@77b7bc21] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$988/1717607117@26e60a84
at org.springframework.core.task.support.TaskExecutorAdapter.submit(TaskExecutorAdapter.java:131)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:284)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at com.vm.uua.service.KafkaProducerService$$EnhancerBySpringCGLIB$$32c4174d.asyncSend(<generated>)
at

Explain:

We config AsynTask with following parameters:

@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer, SchedulingConfigurer {

private final Logger log = LoggerFactory.getLogger(AsyncConfiguration.class);

private final JHipsterProperties jHipsterProperties;

public AsyncConfiguration(JHipsterProperties jHipsterProperties) {
this.jHipsterProperties = jHipsterProperties;
}

@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
log.debug("Creating Async Task Executor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(jHipsterProperties.getAsync().getCorePoolSize());
executor.setMaxPoolSize(jHipsterProperties.getAsync().getMaxPoolSize());
executor.setQueueCapacity(jHipsterProperties.getAsync().getQueueCapacity());
executor.setThreadNamePrefix("uua-Executor-");
return new ExceptionHandlingAsyncTaskExecutor(executor);
}

jhipster:

— async:

— — core-pool-size: 2

— — max-pool-size: 50

— — queue-capacity: 10000

Here are Sun’s rules for thread creation in simple terms:

  1. If the number of threads is less than the corePoolSize, create a new Thread to run a new task.
  2. If the number of threads is equal (or greater than) the corePoolSize, put the task into the queue.
  3. If the queue is full, and the number of threads is less than the maxPoolSize, create a new thread to run tasks in.
  4. If the queue is full, and the number of threads is greater than or equal to maxPoolSize, reject the task.

→ When a AsynTask is delay ( connection refuse or something happends), the asynTask number increases dramatically → reject new asyncTask ( case 4 above). In this case is Kafka connection problem.

Solution :

  • Set timeout for Kafka connection :

Spring: — kafka: — properties:- request.timeout.ms: 100

  • Try catch in show error log
  • Increase corePoolSize, maxPoolSize

Additional:

  • Show status Kafka (how many items)

/data/kafka/kafka/bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand — group <group-id> — bootstrap-server xxx:port,xxx:port — describe

--

--