Problem with AsyncTask Kafka (TaskRejectedException)

Giang Trung
2 min readAug 5, 2020

--

Error:

org.springframework.core.task.TaskRejectedException: Executor [io.github.jhipster.async.ExceptionHandlingAsyncTaskExecutor@77b7bc21] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$988/1717607117@26e60a84
at org.springframework.core.task.support.TaskExecutorAdapter.submit(TaskExecutorAdapter.java:131)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:284)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at com.vm.uua.service.KafkaProducerService$$EnhancerBySpringCGLIB$$32c4174d.asyncSend(<generated>)
at

Explain:

We config AsynTask with following parameters:

@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer, SchedulingConfigurer {

private final Logger log = LoggerFactory.getLogger(AsyncConfiguration.class);

private final JHipsterProperties jHipsterProperties;

public AsyncConfiguration(JHipsterProperties jHipsterProperties) {
this.jHipsterProperties = jHipsterProperties;
}

@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
log.debug("Creating Async Task Executor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(jHipsterProperties.getAsync().getCorePoolSize());
executor.setMaxPoolSize(jHipsterProperties.getAsync().getMaxPoolSize());
executor.setQueueCapacity(jHipsterProperties.getAsync().getQueueCapacity());
executor.setThreadNamePrefix("uua-Executor-");
return new ExceptionHandlingAsyncTaskExecutor(executor);
}

jhipster:

— async:

— — core-pool-size: 2

— — max-pool-size: 50

— — queue-capacity: 10000

Here are Sun’s rules for thread creation in simple terms:

  1. If the number of threads is less than the corePoolSize, create a new Thread to run a new task.
  2. If the number of threads is equal (or greater than) the corePoolSize, put the task into the queue.
  3. If the queue is full, and the number of threads is less than the maxPoolSize, create a new thread to run tasks in.
  4. If the queue is full, and the number of threads is greater than or equal to maxPoolSize, reject the task.

→ When a AsynTask is delay ( connection refuse or something happends), the asynTask number increases dramatically → reject new asyncTask ( case 4 above). In this case is Kafka connection problem.

Solution :

  • Set timeout for Kafka connection :

Spring: — kafka: — properties:- request.timeout.ms: 100

  • Try catch in show error log
  • Increase corePoolSize, maxPoolSize

Additional:

  • Show status Kafka (how many items)

/data/kafka/kafka/bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand — group <group-id> — bootstrap-server xxx:port,xxx:port — describe

Sign up to discover human stories that deepen your understanding of the world.

Free

Distraction-free reading. No ads.

Organize your knowledge with lists and highlights.

Tell your story. Find your audience.

Membership

Read member-only stories

Support writers you read most

Earn money for your writing

Listen to audio narrations

Read offline with the Medium app

--

--

Giang Trung
Giang Trung

No responses yet

Write a response