ScopeNotActiveException für Reader, Prozessor und Writer beim Implementieren eines Spring Batch-partitionierten JobsJava

 ScopeNotActiveException für Reader, Prozessor und Writer beim Implementieren eines Spring Batch-partitionierten Jobs

Post by Guest »

Ich muss einen Spring-Batch-Job erstellen, der Daten aus Datenbank A liest, die Daten transformiert und in Datenbank B speichert. Ich habe es schon früher geschafft, einen nicht partitionierten Batch-Job zu erstellen, aber wenn ich versuche, den zu implementieren Bei einem partitionierten Job wird eine Ausnahme angezeigt.
Hier ist mein Code, und als Randnotiz habe ich einen Kommentar eingefügt, nur um zu verdeutlichen, dass ich vorher etwas ausprobiert habe. Vielleicht kann mir jemand etwas geben Einsicht damit.
Also beim .itemReader ist die Ausnahme:

Method throw
' ScopeNotActiveException'

bei .itemProcessor kann nicht ausgewertet werden Ausnahme ist:

Methode hat
Ausnahme. jdk.proxy2.$Proxy108.toString()

bei .itemWriter kann nicht ausgewertet werden, die Ausnahme ist:

Methode hat
Ausnahme ausgelöst. Kann jdk.proxy2.$Proxy109.toString() nicht auswerten

Irgendwie werden der Leseprozessor und der Schreiber einfach vom Code weggelassen, und wenn ich den verwende Debugger bekomme ich diese Ausnahmen.

Code: Select all

public class NewPartitionedBatchConfig {

private final CustomerRepository customerRepository;
private final SourceMigrationDataChanger sourceMigrationDataChanger;

public NewPartitionedBatchConfig(
CustomerRepository customerRepository,
SourceMigrationDataChanger sourceMigrationDataChanger
) {
this.customerRepository = customerRepository;
this.sourceMigrationDataChanger = sourceMigrationDataChanger;

public PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);

public Job sampleJob(
JobRepository jobRepository,
Step partitionedStep
) {
return new JobBuilder("sampleJob", jobRepository)

// i've tried to declare it this way
//    @Bean
//    public Partitioner partitioner() {
//        CustomPartitioner partitioner = new CustomPartitioner();
//        partitioner.partition(GRID_SIZE);
//        return partitioner;
//    }

public PartitionHandler partitionHandler(
JobExecutorConfig jobExecutorConfig,
Step workerStep
) {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
//        handler.setTaskExecutor(jobExecutorConfig.threadPoolTaskExecutor()); //i've made a ThreadPoolTaskExecutor too
return handler;

public Step partitionedStep(
JobRepository jobRepository,
CustomPartitioner customPartitioner,
//            BasicPartitioner basicPartitioner, //i've tried Basic Partitioner too
PartitionHandler partitionHandler
) {
return new StepBuilder("partitionedStep", jobRepository)
.partitioner("workerStep", customPartitioner)
//                .step(workerStep()) //i've tried to put a step directly without partitionedHandler already

public Step workerStep(
PlatformTransactionManager transactionManager,
JobRepository jobRepository,
RepositoryItemReader itemReader,
ItemProcessor itemProcessor,
ItemWriter itemWriter
) {
return new StepBuilder("workerStep", jobRepository)
.chunk(BATCH_SIZE, transactionManager)

public RepositoryItemReader itemReader(
@Value("#{stepExecutionContext['partitionIndex'] ?: 0}") Integer partitionIndex,
@Value("#{stepExecutionContext['gridSize']}") Integer gridSize
) {
if (partitionIndex == null) {
throw new IllegalStateException("partitionIndex is null.  Ensure that the step execution context is properly set.");

RepositoryItemReader reader = new RepositoryItemReader();
//        reader.setMethodName("findCustomersByPage"); using JPA Pageable
reader.setArguments(Arrays.asList(partitionIndex, gridSize));
reader.setSort(Collections.singletonMap("id", Sort.Direction.ASC));
return reader;

public ItemProcessor itemProcessor() {
return customer -> {
sourceMigrationDataChanger.updateCustomerMigrated((Customer) customer);

Map defaultKafkaHash = new HashMap();
defaultKafkaHash.put(KEY_DATA, customer);
return defaultKafkaHash;

public ItemWriter itemWriter() {
return items -> {
Map defaultKafkaHash = new HashMap();
defaultKafkaHash.put(KEY_DATA, items);
//            it used to send some data through kafka, but omit this for now
Jetzt mein benutzerdefinierter Partitionierer

Code: Select all

public class CustomPartitioner implements Partitioner {

//    public Map partitions = new HashMap();

public Map partition(int gridSize) {
System.out.println("Partitioning with gridSize: " + gridSize);


Map partitions = new HashMap();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("partitionIndex", i);
context.putInt("gridSize", gridSize);
context.putInt("startIndex", i * BATCH_SIZE);
context.putInt("pageSize", BATCH_SIZE);

partitions.put(PARTITION_KEY + i, context);
return partitions;

Quick Reply

Change Text Case: 
  • Similar Topics
    Last post