Here is my code; as a side note, I've left some comments in just to show what I've already tried. Maybe someone can give me some insight into this.
So on .itemReader the exception is:

Method threw 'org.springframework.beans.factory.support.ScopeNotActiveException' exception. Cannot evaluate org.springframework.batch.item.data.RepositoryItemReader$$SpringCGLIB$$0.toString()

.itemProcessor cannot be evaluated; the exception is:

Method threw 'org.springframework.beans.factory.support.ScopeNotActiveException' exception. Cannot evaluate jdk.proxy2.$Proxy108.toString()

.itemWriter cannot be evaluated; the exception is:

Method threw 'org.springframework.beans.factory.support.ScopeNotActiveException' exception. Cannot evaluate jdk.proxy2.$Proxy109.toString()
Somehow the reader, processor and writer just seem to be skipped by the code, and when I inspect them in the debugger I get the exceptions above.
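From what I can tell, the debugger simply calls toString() on the injected beans, and since they are @StepScope proxies that call fails whenever no step is actually executing. Here is a minimal, self-contained sketch (not from my project, just my guess at what is going on) that reproduces the same message with nothing but spring-batch-core on the classpath:
Code: Select all
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

public class StepScopeDemo {

    static class Payload {
        @Override
        public String toString() {
            return "only reachable while a step is running";
        }
    }

    @Configuration
    static class Config {
        // Registers the "step" scope itself, which a running Batch job
        // would normally provide.
        @Bean
        public static org.springframework.batch.core.scope.StepScope stepScope() {
            return new org.springframework.batch.core.scope.StepScope();
        }

        @Bean
        @StepScope
        public Payload scopedBean() {
            return new Payload();
        }
    }

    public static void main(String[] args) {
        try (AnnotationConfigApplicationContext ctx =
                     new AnnotationConfigApplicationContext(Config.class)) {
            Object proxy = ctx.getBean("scopedBean"); // the lazy scoped proxy
            // This is what the debugger does to render the variable; it throws
            // ScopeNotActiveException because no StepExecution is in progress:
            System.out.println(proxy.toString());
        }
    }
}
Anyway, here is the actual configuration: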
Code: Select all
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.domain.Sort;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@EnableBatchProcessing
public class NewPartitionedBatchConfig {

    // Defined elsewhere in my project; the values here are just for illustration.
    private static final int GRID_SIZE = 2;
    private static final int BATCH_SIZE = 100;
    private static final String KEY_DATA = "data";

    private final CustomerRepository customerRepository;
    private final SourceMigrationDataChanger sourceMigrationDataChanger;

    public NewPartitionedBatchConfig(
            CustomerRepository customerRepository,
            SourceMigrationDataChanger sourceMigrationDataChanger
    ) {
        this.customerRepository = customerRepository;
        this.sourceMigrationDataChanger = sourceMigrationDataChanger;
    }

    @Bean
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean
    public Job sampleJob(
            JobRepository jobRepository,
            Step partitionedStep
    ) {
        return new JobBuilder("sampleJob", jobRepository)
                .start(partitionedStep)
                .build();
    }

    // I've tried to declare the partitioner this way too:
    // @Bean
    // public Partitioner partitioner() {
    //     CustomPartitioner partitioner = new CustomPartitioner();
    //     partitioner.partition(GRID_SIZE);
    //     return partitioner;
    // }
    @Bean
    public PartitionHandler partitionHandler(
            JobExecutorConfig jobExecutorConfig,
            Step workerStep
    ) {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        // handler.setTaskExecutor(jobExecutorConfig.threadPoolTaskExecutor()); // I've made a ThreadPoolTaskExecutor too
        handler.setTaskExecutor(jobExecutorConfig.simpleAsyncTaskExecutor());
        handler.setStep(workerStep);
        handler.setGridSize(GRID_SIZE);
        return handler;
    }
    @Bean
    public Step partitionedStep(
            JobRepository jobRepository,
            CustomPartitioner customPartitioner,
            // BasicPartitioner basicPartitioner, // I've tried a BasicPartitioner too
            PartitionHandler partitionHandler
    ) {
        return new StepBuilder("partitionedStep", jobRepository)
                .partitioner("workerStep", customPartitioner)
                // .step(workerStep()) // I've already tried passing the step directly without a PartitionHandler
                .partitionHandler(partitionHandler)
                .build();
    }
    @Bean
    public Step workerStep(
            PlatformTransactionManager transactionManager,
            JobRepository jobRepository,
            RepositoryItemReader<Customer> itemReader,
            ItemProcessor<Customer, Map<String, Object>> itemProcessor,
            ItemWriter<Map<String, Object>> itemWriter
    ) {
        return new StepBuilder("workerStep", jobRepository)
                .<Customer, Map<String, Object>>chunk(BATCH_SIZE, transactionManager)
                .reader(itemReader)
                .processor(itemProcessor)
                .writer(itemWriter)
                .build();
    }
    @Bean
    @StepScope
    public RepositoryItemReader<Customer> itemReader(
            @Value("#{stepExecutionContext['partitionIndex'] ?: 0}") Integer partitionIndex,
            @Value("#{stepExecutionContext['gridSize']}") Integer gridSize
    ) {
        if (partitionIndex == null) {
            throw new IllegalStateException("partitionIndex is null. Ensure that the step execution context is properly set.");
        }
        RepositoryItemReader<Customer> reader = new RepositoryItemReader<>();
        reader.setRepository(customerRepository);
        // reader.setMethodName("findCustomersByPage"); // using JPA Pageable
        reader.setMethodName("findCustomersByPartition");
        reader.setArguments(Arrays.asList(partitionIndex, gridSize));
        reader.setPageSize(BATCH_SIZE);
        reader.setSort(Collections.singletonMap("id", Sort.Direction.ASC));
        return reader;
    }
    @Bean
    @StepScope
    public ItemProcessor<Customer, Map<String, Object>> itemProcessor() {
        return customer -> {
            sourceMigrationDataChanger.updateCustomerMigrated(customer);
            Map<String, Object> defaultKafkaHash = new HashMap<>();
            defaultKafkaHash.put(KEY_DATA, customer);
            return defaultKafkaHash;
        };
    }

    @Bean
    @StepScope
    public ItemWriter<Map<String, Object>> itemWriter() {
        return items -> {
            Map<String, Object> defaultKafkaHash = new HashMap<>();
            defaultKafkaHash.put(KEY_DATA, items);
            // it used to send data through Kafka, but I've omitted that for now
        };
    }
}
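For context, the JobExecutorConfig referenced in partitionHandler(...) looks roughly like this (a sketch; the real class only differs in tuning values):
Code: Select all
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class JobExecutorConfig {

    @Bean
    public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
        return new SimpleAsyncTaskExecutor("partition-");
    }

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2); // sized to the grid size
        executor.setMaxPoolSize(2);
        executor.setThreadNamePrefix("partition-");
        executor.initialize();
        return executor;
    }
}
And here is the custom partitioner: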
Code: Select all
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.stereotype.Component;

import lombok.Getter;

@Getter
@Component
public class CustomPartitioner implements Partitioner {

    // Defined elsewhere in my project; values here are just for illustration.
    private static final int BATCH_SIZE = 100;
    private static final String PARTITION_KEY = "partition";

    // public Map<String, ExecutionContext> partitions = new HashMap<>();

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        System.out.println("Partitioning with gridSize: " + gridSize);
        gridSize = 2; // hard-coded for now while debugging; overrides the injected grid size
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("partitionIndex", i);
            context.putInt("gridSize", gridSize);
            context.putInt("startIndex", i * BATCH_SIZE);
            context.putInt("pageSize", BATCH_SIZE);
            partitions.put(PARTITION_KEY + i, context);
        }
        return partitions;
    }
}
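And the repository method the reader calls, roughly (a sketch; the actual query may differ, but the shape matches what RepositoryItemReader expects: the setArguments(...) values in order, a trailing Pageable, and a Page return type):
Code: Select all
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

public interface CustomerRepository extends JpaRepository<Customer, Long> {

    // Sketch: split the table across partitions by id modulo gridSize.
    @Query("select c from Customer c where mod(c.id, :gridSize) = :partitionIndex")
    Page<Customer> findCustomersByPartition(@Param("partitionIndex") int partitionIndex,
                                            @Param("gridSize") int gridSize,
                                            Pageable pageable);
}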