ScopeNotActiveException für Reader, Prozessor und Writer beim Implementieren eines Spring Batch-partitionierten JobsJava

Java-Forum
Guest
 ScopeNotActiveException für Reader, Prozessor und Writer beim Implementieren eines Spring Batch-partitionierten Jobs

Post by Guest »

Ich muss einen Spring-Batch-Job erstellen, der Daten aus Datenbank A liest, die Daten transformiert und in Datenbank B speichert. Ich habe es schon früher geschafft, einen nicht partitionierten Batch-Job zu erstellen, aber wenn ich versuche, den zu implementieren Bei einem partitionierten Job wird eine Ausnahme angezeigt.
Hier ist mein Code, und als Randnotiz habe ich einen Kommentar eingefügt, nur um zu verdeutlichen, dass ich vorher etwas ausprobiert habe. Vielleicht kann mir jemand etwas geben Einsicht damit.
Also beim .itemReader ist die Ausnahme:

Method throw
'org.springframework.beans.factory.support. ScopeNotActiveException'
Ausnahme.
org.springframework.batch.item.data.RepositoryItemReader$$SpringCGLIB$$0.toString()

bei .itemProcessor kann nicht ausgewertet werden Ausnahme ist:

Methode hat
'org.springframework.beans.factory.support.ScopeNotActiveException'
Ausnahme. jdk.proxy2.$Proxy108.toString()

bei .itemWriter kann nicht ausgewertet werden, die Ausnahme ist:

Methode hat
'org.springframework.beans.factory.support.ScopeNotActiveException'
Ausnahme ausgelöst. Kann jdk.proxy2.$Proxy109.toString() nicht auswerten

Irgendwie werden der Leseprozessor und der Schreiber einfach vom Code weggelassen, und wenn ich den verwende Debugger bekomme ich diese Ausnahmen.

Code: Select all

@Configuration
@EnableBatchProcessing
public class NewPartitionedBatchConfig {

private final CustomerRepository customerRepository;
private final SourceMigrationDataChanger sourceMigrationDataChanger;

public NewPartitionedBatchConfig(
CustomerRepository customerRepository,
SourceMigrationDataChanger sourceMigrationDataChanger
) {
this.customerRepository = customerRepository;
this.sourceMigrationDataChanger = sourceMigrationDataChanger;
}

@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}

@Bean
public Job sampleJob(
JobRepository jobRepository,
Step partitionedStep
) {
return new JobBuilder("sampleJob", jobRepository)
.start(partitionedStep)
.build();
}

// i've tried to declare it this way
//    @Bean
//    public Partitioner partitioner() {
//        CustomPartitioner partitioner = new CustomPartitioner();
//        partitioner.partition(GRID_SIZE);
//        return partitioner;
//    }

@Bean
public PartitionHandler partitionHandler(
JobExecutorConfig jobExecutorConfig,
Step workerStep
) {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
//        handler.setTaskExecutor(jobExecutorConfig.threadPoolTaskExecutor()); //i've made a ThreadPoolTaskExecutor too
handler.setTaskExecutor(jobExecutorConfig.simpleAsyncTaskExecutor());
handler.setStep(workerStep);
handler.setGridSize(GRID_SIZE);
return handler;
}

@Bean
public Step partitionedStep(
JobRepository jobRepository,
CustomPartitioner customPartitioner,
//            BasicPartitioner basicPartitioner, //i've tried Basic Partitioner too
PartitionHandler partitionHandler
) {
return new StepBuilder("partitionedStep", jobRepository)
.partitioner("workerStep", customPartitioner)
//                .step(workerStep()) //i've tried to put a step directly without partitionedHandler already
.partitionHandler(partitionHandler)
.build();
}

@Bean
public Step workerStep(
PlatformTransactionManager transactionManager,
JobRepository jobRepository,
RepositoryItemReader itemReader,
ItemProcessor itemProcessor,
ItemWriter itemWriter
) {
return new StepBuilder("workerStep", jobRepository)
.chunk(BATCH_SIZE, transactionManager)
.reader(itemReader)
.processor(itemProcessor)
.writer(itemWriter)
.build();
}

@Bean
@StepScope
public RepositoryItemReader itemReader(
@Value("#{stepExecutionContext['partitionIndex'] ?: 0}") Integer partitionIndex,
@Value("#{stepExecutionContext['gridSize']}") Integer gridSize
) {
if (partitionIndex == null) {
throw new IllegalStateException("partitionIndex is null.  Ensure that the step execution context is properly set.");
}

RepositoryItemReader reader = new RepositoryItemReader();
reader.setRepository(customerRepository);
//        reader.setMethodName("findCustomersByPage"); using JPA Pageable
reader.setMethodName("findCustomersByPartition");
reader.setArguments(Arrays.asList(partitionIndex, gridSize));
reader.setPageSize(BATCH_SIZE);
reader.setSort(Collections.singletonMap("id", Sort.Direction.ASC));
return reader;
}

@Bean
@StepScope
public ItemProcessor itemProcessor() {
return customer -> {
sourceMigrationDataChanger.updateCustomerMigrated((Customer) customer);

Map defaultKafkaHash = new HashMap();
defaultKafkaHash.put(KEY_DATA, customer);
return defaultKafkaHash;
};
}

@Bean
@StepScope
public ItemWriter itemWriter() {
return items -> {
Map defaultKafkaHash = new HashMap();
defaultKafkaHash.put(KEY_DATA, items);
//            it used to send some data through kafka, but omit this for now
};
}
}
Jetzt mein benutzerdefinierter Partitionierer

Code: Select all

@Getter
@Component
public class CustomPartitioner implements Partitioner {

//    public Map partitions = new HashMap();

@Override
public Map partition(int gridSize) {
System.out.println("Partitioning with gridSize: " + gridSize);

gridSize=2;

Map partitions = new HashMap();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("partitionIndex", i);
context.putInt("gridSize", gridSize);
context.putInt("startIndex", i * BATCH_SIZE);
context.putInt("pageSize", BATCH_SIZE);

partitions.put(PARTITION_KEY + i, context);
}
return partitions;
}
}

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post