您现在的位置是:亿华云 > 系统运维

配置 Spring Batch 批处理失败重试

亿华云2025-10-03 06:17:37【系统运维】1人已围观

简介1. 引言默认情况下,Spring批处理作业在执行过程中出现任何错误都会失败。然而有些时候,为了提高应用程序的弹性,我们就需要处理这类间歇性的故障。在这篇短文中,我们就来一起探讨 如何在Spring批

 1. 引言

默认情况下,配置h批Spring批处理作业在执行过程中出现任何错误都会失败。处理然而有些时候,失败为了提高应用程序的重试弹性,我们就需要处理这类间歇性的配置h批故障。在这篇短文中,处理我们就来一起探讨 如何在Spring批处理框架中配置重试逻辑。失败

如果对spring batch不了解,重试可以参考以前的配置h批一篇文章:

开车!Spring Batch 入门级示例教程!

2. 简单举例

假设有一个批处理作业,它读取一个CSV文件作为输入:

username,处理 userid, transaction_date, transaction_amount sammy, 1234, 31/10/2015, 10000 john, 9999, 3/12/2015, 12321 

然后,它通过访问REST端点来处理每条记录,失败获取用户的重试 age 和 postCode 属性:

public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> {      @Override     public Transaction process(Transaction transaction) throws IOException {          log.info("RetryItemProcessor, attempting to process: { }", transaction);         HttpResponse response = fetchMoreUserDetails(transaction.getUserId());         //parse users age and postCode from response and update transaction         ...         return transaction;     }     ... } 

最后,它生成并输出一个合并的配置h批XML:

<transactionRecord>     <transactionRecord>         <amount>10000.0</amount>         <transactionDate>2015-10-31 00:00:00</transactionDate>         <userId>1234</userId>         <username>sammy</username>         <age>10</age>         <postCode>430222</postCode>     </transactionRecord>     ... </transactionRecord> 

3. ItemProcessor 中添加重试

现在假设,如果到REST端点的处理连接由于某些网络速度慢而超时,该怎么办?失败如果发生这种情况,亿华云则我们的批处理工作将失败。

在这种情况下,我们希望失败的 item 处理重试几次。因此,接下来我将批处理作业配置为:在出现故障时执行最多三次重试:

@Bean public Step retryStep(   ItemProcessor<Transaction, Transaction> processor,   ItemWriter<Transaction> writer) throws ParseException {      return stepBuilderFactory       .get("retryStep")       .<Transaction, Transaction>chunk(10)       .reader(itemReader(inputCsv))       .processor(processor)       .writer(writer)       .faultTolerant()       .retryLimit(3)       .retry(ConnectTimeoutException.class)       .retry(DeadlockLoserDataAccessException.class)       .build(); } 

这里调用了 faultTolerant() 来启用重试功能。另外,我们使用 retry 和 retryLimit 分别定义符合重试条件的异常和 item 的最大重试次数。

4. 测试重试次数

假设我们有一个测试场景,其中返回 age 和 postCode 的REST端点关闭了一段时间。在这个测试场景中,我们只对前两个 API 调用获取一个 ConnectTimeoutException ,而第三个调用将成功:

@Test public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception {      FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT);     FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT);     when(httpResponse.getEntity())       .thenReturn(new StringEntity("{  \"age\":10, \"postCode\":\"430222\" }"));     //fails for first two calls and passes third time onwards     when(httpClient.execute(any()))       .thenThrow(new ConnectTimeoutException("Timeout count 1"))       .thenThrow(new ConnectTimeoutException("Timeout count 2"))       .thenReturn(httpResponse);     JobExecution jobExecution = jobLauncherTestUtils       .launchJob(defaultJobParameters());     JobInstance actualJobInstance = jobExecution.getJobInstance();     ExitStatus actualJobExitStatus = jobExecution.getExitStatus();     assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));     assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED"));     AssertFile.assertFileEquals(expectedResult, actualResult); } 

在这里,我们的工作成功地完成了。另外,香港云服务器从日志中可以明显看出 第一条记录 id=1234 失败了两次,最后在第三次重试时成功了:

19:06:57.742 [main] INFO  o.s.batch.core.job.SimpleStepHandler - Executing step: [retryStep] 19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234 19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234 19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234 19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=9999 19:06:57.773 [main] INFO  o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms 

同样,看下另一个测试用例,当所有重试次数都用完时会发生什么:

@Test public void whenEndpointAlwaysFail_thenJobFails() throws Exception {      when(httpClient.execute(any()))       .thenThrow(new ConnectTimeoutException("Endpoint is down"));     JobExecution jobExecution = jobLauncherTestUtils       .launchJob(defaultJobParameters());     JobInstance actualJobInstance = jobExecution.getJobInstance();     ExitStatus actualJobExitStatus = jobExecution.getExitStatus();     assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));     assertThat(actualJobExitStatus.getExitCode(), is("FAILED"));     assertThat(actualJobExitStatus.getExitDescription(),       containsString("org.apache.http.conn.ConnectTimeoutException")); } 

在这个测试用例中,在作业因 ConnectTimeoutException 而失败之前,会尝试对第一条记录重试三次。

5. 使用XML配置重试

最后,让我们看一下与上述配置等价的XML:

<batch:job id="retryBatchJob">     <batch:step id="retryStep">         <batch:tasklet>             <batch:chunk reader="itemReader" writer="itemWriter"               processor="retryItemProcessor" commit-interval="10"               retry-limit="3">                 <batch:retryable-exception-classes>                     <batch:include class="org.apache.http.conn.ConnectTimeoutException"/>                     <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/>                 </batch:retryable-exception-classes>             </batch:chunk>         </batch:tasklet>     </batch:step> </batch:job> 

6. 简单总结

在本文中,我们学习了如何在Spring批处理中配置重试逻辑,其中包括使用Java和XML配置。以及使用单元测试来观察重试在实践中是如何工作的。

本文转载自微信公众号「锅外的大佬」,可以通过以下二维码关注。转载本文请联系锅外的大佬公众号。

服务器租用

很赞哦!(361)