Guest
Spring Batch job runs properly only the first time. Doesn't work afterwards
Post
by Guest » 03 Jan 2025, 06:34
I trigger a batch job via an API. The first time I call the API after each server restart, it works perfectly: it goes through all my breakpoints and saves the data to the database as intended. After that, however, the log says the job was started and the step is being executed, yet none of the breakpoints are hit and no data is saved as expected.
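For reference, I trigger the endpoint roughly like this (a minimal sketch only; the host, port, and validation value are placeholders, not my real setup):

Code: Select all

// Hypothetical standalone caller, just to show how the job is kicked off.
// Assumes the application is listening on localhost:8080.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StartJobClient {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/start?validation=FULL"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // prints the controller's status string
    }
}

My code and configuration are as follows -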
Code: Select all
// Controller
@RestController
public class MyController {
@Autowired
JobLauncher jobLauncher;
@Autowired
Job insertJob;
@PostMapping("/start")
public String startBatchJob(@RequestParam String validation) {
try {
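// the current timestamp makes the job parameters unique, so every launch creates a new JobInstance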
String jobId = String.valueOf(System.currentTimeMillis());
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobId", jobId)
.addString("validation", validation)
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(insertJob, jobParameters);
if (jobExecution.getStatus().isUnsuccessful())
return "Job FAILED with Job ID: " + jobId + ". Status: " + jobExecution.getStatus();
// Get job status or other details
return "Job finished successfully with Job ID: " + jobId + ". Status: " + jobExecution.getStatus();
} catch (JobExecutionException e) {
e.printStackTrace();
return "Error starting job: " + e.getMessage();
}
}
}
// Batch Config
@Configuration
public class SpringBatchConfig {
@Autowired
ScheduledTasks scheduledTasks;
@Autowired
ValidationRepository validationRepository;
@Autowired
@Lazy
PlatformTransactionManager transactionManager;
@Autowired
@Lazy
JobRepository jobRepository;
public List<Personality> getData() {
return scheduledTasks.callRestApiForData();
}
@Bean(name = "insertJob")
public Job insertJob(BaseWriter writer,
BaseProcessor processor,
BaseReader reader) {
return new JobBuilder("insertJob", jobRepository)
.incrementer(new RunIdIncrementer())
.listener(insertJobListener())
.start(step_1(writer, processor, reader))
.build();
}
@Bean
public Step step_1(BaseWriter writer,
BaseProcessor processor,
BaseReader itemReader) {
return new StepBuilder("step_1", jobRepository)
.<Personality, Personality>chunk(200, transactionManager) // chunk-oriented step: items are written in batches of 200
.reader(itemReader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public JobExecutionListener insertJobListener() {
return new InsertJobCompletionListener();
}
}
// listener
@Slf4j
public class InsertJobCompletionListener extends JobExecutionListenerSupport {
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("UPDATE BATCH COMPLETED");
}
else if (jobExecution.getStatus() == BatchStatus.FAILED) {
log.error("UPDATE BATCH JOB FAILED TO COMPLETE");
}
}
}
// reader
public interface BaseReader extends ItemReader<Personality> {
}
@Configuration
public class ExchangeRateBaseReader implements BaseReader {
@Autowired
ScheduledTasks scheduledTasks;
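// cached API response and current read position live as instance state on this bean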
private boolean read = false;
private List<Personality> data;
private int index = 0;
@Override
public Personality read() throws Exception {
if (data == null) {
data = getData();
}
Personality item = null;
if (index < data.size()) {
item = data.get(index);
index++;
}
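// returning null tells Spring Batch there is no more input, and the step ends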
return item;
}
public List<Personality> getData() {
return scheduledTasks.callRestApiForData();
}
}
// processor
public interface BaseProcessor extends ItemProcessor<Personality, Personality> {
}
@Configuration
@Slf4j
public class CurrencyExchangeProcessor implements BaseProcessor {
@Override
public Personality process(Personality personality) throws Exception {
return personality;
}
}
// writer
public interface BaseWriter extends ItemWriter<Personality> {
}
@Configuration
@Slf4j
public class CurrencyExchangeWriter implements BaseWriter {
@Autowired
private ValidationRepository validationRepository;
@Override
// assumed completion (the snippet was truncated here): persist the chunk items via the repository
public void write(Chunk<? extends Personality> chunk) throws Exception {
validationRepository.saveAll(chunk.getItems());
}
}
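What could be causing the job to process and save data only on the first run after a restart?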