1.背景

许多批处理问题都可以通过单线程、单进程作业来解决,因此在考虑更复杂的实现之前,最好先检查这些作业是否满足您的需要。衡量一份现实工作的表现,首先看看最简单的实现是否满足您的需求。即使使用标准硬件,您也可以在一分钟内读写数百MB的文件。

2.并行处理

Spring Batch提供了一系列选项,本章对此进行了描述,尽管其他地方介绍了一些功能。在较高级别上,有两种并行处理模式:

  • 单进程、多线程

  • 多进程

这些也可分为以下几类:

  • 多线程步骤(单进程)

  • 并行步骤(单进程)

  • 基于step的远程分块(多进程)

  • 基于step进行分区(单个或多个进程)

2.1 多线程step

启动并行处理的最简单方法是将TaskExecutor添加到步骤配置中。

使用java配置时,可以将TaskExecutor添加到步骤中,如下例所示:

@Bean
    public TaskExecutor taskExecutor(){
        return new SimpleAsyncTaskExecutor("spring_batch");
    }

    @Bean
    public Step sampleMutliStep(TaskExecutor taskExecutor) {
        return this.stepBuilderFactory.get("sampleMutliStep")
                .<DemoUser, DemoUser>chunk(2)
                .reader(new DemoReader(10))
                .writer(new DemoWriter<>())
                .taskExecutor(taskExecutor)
                .throttleLimit(5)
                .build();
    }

2.2 并行step

只要需要并行化的应用程序逻辑可以划分为不同的职责并分配给各个步骤,那么它就可以在单个进程中并行化。并行步骤执行易于配置和使用。

使用java配置时,与step3并行执行步骤(step1、step2)非常简单,如下例所示:

@Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(splitFlow())
                .next(step4())
                .build()        //builds FlowJobBuilder instance
                .build();       //builds Job instance
    }

    @Bean
    public Flow splitFlow() {
        return new FlowBuilder<SimpleFlow>("splitFlow")
                .split(paralTaskExecutor())
                .add(flow1(), flow2())
                .build();
    }

    @Bean
    public Flow flow1() {
        return new FlowBuilder<SimpleFlow>("flow1")
                .start(step1())
                .next(step2())
                .build();
    }

    @Bean
    public Flow flow2() {
        return new FlowBuilder<SimpleFlow>("flow2")
                .start(step3())
                .build();
    }

2.3 远程分块

在远程分块中,步骤处理被分割到多个进程中,通过一些中间件相互通信。下图显示了该模式:

Remote Chunking

manager组件是单个进程,Worker是多个远程进程。如果管理器不是瓶颈,则此模式效果最好,因此处理的成本必须高于读取项目的成本(在实践中经常如此)。

​ manager是Spring批处理步骤的一个实现,ItemWriter被一个通用版本所取代,该版本知道如何将项目块作为消息发送到中间件。工作人员是使用任何中间件的标准侦听器(例如,对于JMS,他们将是MessageListener实现),他们的角色是通过ChunkProcessor接口使用标准ItemWriter或ItemProcessor plus ItemWriter处理项目块。使用此模式的优点之一是读卡器、处理器和写入器组件是现成的(与本地执行步骤时使用的组件相同)。项目被动态划分,工作通过中间件共享,因此,如果监听器都是热心的消费者,那么负载平衡是自动的。中间件必须是持久的,有保证的交付,每个消息都有一个消费者。JMS是显而易见的候选者,但网格计算和共享内存产品空间中存在其他选项(如JavaSpace)。

​ 更进一步可以通过使用ChunkMessageChannelItemWriter(由Spring Batch Integration提供)将区块处理外部化,它将项目发送出去并收集结果。发送后,Spring Batch将继续读取和分组项目的过程,而无需等待结果。相反,ChunkMessageChannelItemWriter负责收集结果并将其集成回Spring批处理过程。

通过Spring集成,您可以完全控制进程的并发性(例如,通过使用QueueChannel而不是DirectChannel)。此外,通过依赖Spring Integration丰富的通道适配器集合(如JMS和AMQP),您可以将批处理作业的块分发给外部系统进行处理。

具有要远程分块的步骤的简单作业可能具有类似以下配置:

Remote Chunking

从版本4.1开始,Spring Batch Integration引入了@EnableBatchIntegration注释,可用于简化远程分块设置。此注释提供了两个可以在应用程序上下文中自动连接的bean:

RemoteChunkingManagerStepBuilderFactory:用于配置管理器步骤

RemoteChunkingWorkerBuilder:用于配置远程工作者集成流

这些API负责配置许多组件,如下图所述:

Remote Chunking Configuration

  • 代码如下:看下生产者remoteChunkManager
	@Autowired
    private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
    @Bean
    public Job remoteChunkingJob() {
        return this.jobBuilderFactory.get("remoteChunkingJob")
                .start(remoteChunkManagerStep()).build();
    }

    @Bean
    public DirectChannel mangerRequests() {
        return new DirectChannel();
    }
    
    @Bean
    public IntegrationFlow managerOutboundFlow() {
        return IntegrationFlows.from(mangerRequests())
                .handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("requests"))
                .get();
    }

    @Bean
    public QueueChannel managerReplies() {
        return new QueueChannel();
    }
    @Bean
    public IntegrationFlow managerInboundFlow(ConnectionFactory rabbitmqConnectionFactory) {
        return IntegrationFlows
                .from(Amqp.inboundAdapter(rabbitmqConnectionFactory,"replies")
                )
                .channel(managerReplies()).get();
    }

    @Bean
    public TaskletStep remoteChunkManagerStep() {
        return this.managerStepBuilderFactory.get("remoteChunkManagerStep")
                .<Integer, Integer>chunk(3).reader(itemReader())
                .outputChannel(mangerRequests())
                .inputChannel(managerReplies()).build();
    }

    @Bean
    public ListItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6));
    }

可以看到。Manager需要定义Reader。定义发送的消息通道,定义接收的消息通道。

这里面只定义了reader。好奇发出去的是什么?接收到的又是什么?

img

在RemoteChunkingManagerStepBuilder#build()中又添加了ChunkMessageChannelItemWriter相当于内部拼接了个完成的step。

那发出去的是啥?

img

发出去的ChunkRequest

img

  • 看下消费者remoteChunkWorker

    @Autowired
        private RemoteChunkingWorkerBuilder<Integer, Integer> remoteChunkingWorkerBuilder;
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Bean
        public DirectChannel workerRequests() {
            return new DirectChannel();
        }
        @Bean
        public IntegrationFlow workerInboundFlow(ConnectionFactory rabbitmqConnectionFactory) {
            return IntegrationFlows
                    .from(Amqp.inboundAdapter(rabbitmqConnectionFactory,"requests"))
                    .channel(workerRequests()).get();
        }
        @Bean
        public DirectChannel workerReplies() {
            return new DirectChannel();
        }
    
    
    
        @Bean
        public IntegrationFlow workerOutboundFlow() {
            return IntegrationFlows.from(workerReplies())
                    .handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("replies"))
                    .get();
        }
    
        @Bean
        public ItemProcessor<Integer, Integer> itemProcessor() {
            return item -> {
                System.out.println("processing item " + item);
                return item;
            };
        }
    
        @Bean
        public ItemWriter<Integer> itemWriter() {
            return items -> {
                for (Integer item : items) {
                    System.out.println("writing item " + item);
                }
            };
        }
    
        @Bean
        public IntegrationFlow workerIntegrationFlow() {
            return this.remoteChunkingWorkerBuilder
                    .itemProcessor(itemProcessor())
                    .itemWriter(itemWriter())
                    .inputChannel(workerRequests())
                    .outputChannel(workerReplies()).build();
        }
    

    img

    接收到的消息是如何处理的?

    img

    执行process后续过程。执行完成返回ChunResponse

    img

    并将response发送到队列。发回到生产者。生产者是如何处理的呢

    img

    生产者从replychannel接收消息。然后将消息更新到内存。进行运算。内存运算实体为LocalState

    img

  • 解读一下哈:

    • 在Manager方面,RemoteChunkingManagerStepBuilderFactory允许您通过声明以下内容来配置管理器步骤:

      reader读取项目并将其发送给Worker

      向Worker发送请求的输出通道(“requests”)

      接收Worker回复的输入通道(“replies”)

      无需显式配置ChunkMessageChannelItemWriter和MessagingTemplate(如果需要,仍然可以显式配置)

    • 在Worker方面,RemoteChunkingWorkerBuilder允许您将工作者配置为:

      侦听管理器在输入通道上发送的请求(“requests”)

      使用配置的ItemProcessor和ItemWriter为每个请求调用ChunkProcessorChunkHandler的handleChunk方法

      将输出通道上的回复(“replies”)发送给Manager

      无需显式配置SimpleChunkProcessor和ChunkProcessorChunkHandler(如果需要,可以显式配置)。

整体流程可以概括如下:

@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {
    @Configuration
    public static class ManagerConfiguration {
        @Autowired
        private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
        @Bean
        public TaskletStep managerStep() {
            return this.managerStepBuilderFactory.get("managerStep")
                       .chunk(100)
                       .reader(itemReader())
                       .outputChannel(requests()) // requests sent to workers
                       .inputChannel(replies())   // replies received from workers
                       .build();
        }
        // Middleware beans setup omitted
    }
    @Configuration
    public static class WorkerConfiguration {
        @Autowired
        private RemoteChunkingWorkerBuilder workerBuilder;
        @Bean
        public IntegrationFlow workerFlow() {
            return this.workerBuilder
                       .itemProcessor(itemProcessor())
                       .itemWriter(itemWriter())
                       .inputChannel(requests()) // requests received from the manager
                       .outputChannel(replies()) // replies sent to the manager
                       .build();
        }
        // Middleware beans setup omitted
    }
}

2.4远程分区

Remote Partitioning

远程分区是通过分区器partitioner来控制整体流程。真正的执行(包含reader这里是一个完整的step功能)不同于远程分块模块。

另一方面,当导致瓶颈的不是项目的处理而是相关的I/O时,远程分区很有用。使用远程分区,可以将工作分配给执行完整的Spring批处理步骤的工作人员。因此,每个worker都有自己的ItemReader、ItemProcessor和ItemWriter。为此,Spring Batch Integration提供了MessageChannelPartitionHandler。

PartitionHandler接口的这个实现使用MessageChannel实例向remote Worker发送指令并接收他们的响应。这为用于与remote Worker通信的传输(如JMS和AMQP例子用的是rabbitmq)提供了一个很好的抽象。

“可伸缩性”一章中涉及远程分区的部分概述了配置远程分区所需的概念和组件,并显示了使用默认TaskExecutionPartitionHandler在单独的本地执行线程中进行分区的示例。对于到多个JVM的远程分区,需要另外两个组件:

  • 远程结构或网格环境

  • 支持所需远程处理结构或网格环境的PartitionHandler实现

与远程分块类似,JMS/AMQP可以用作“远程处理结构”。在这种情况下,使用MessageChannelPartitionHandler实例作为PartitionHandler实现,如上所述。以下示例假定存在一个分区作业,并重点介绍MessageChannelPartitionHandler和JMS/AMQP配置:

image-20220530224040746

  • 代码如下:

    • 看下manager代码
    	@Bean
        public DirectChannel managerRequests() {
            return new DirectChannel();
        }
        @Bean
        public IntegrationFlow managerOutboundFlow() {
            return IntegrationFlows.from(managerRequests())
                    .handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("requests"))
                    .get();
        }
        @Bean
        public DirectChannel managerReplies() {
            return new DirectChannel();
        }
        @Bean
        public IntegrationFlow managerInboundFlow(ConnectionFactory rabbitmqConnectionFactory) {
            return IntegrationFlows
                    .from(Amqp.inboundAdapter(
                            rabbitmqConnectionFactory,"replies"))
                    .channel(managerReplies()).get();
        }
        @Bean
        public Step managerStep() {
            return this.managerStepBuilderFactory.get("managerStep")
                    .partitioner("workerStep", new SimplePartitioner())
                    .gridSize(GRID_SIZE)
                    .outputChannel(managerRequests())
                    .inputChannel(managerReplies()).build();
        }
        @Bean
        public Job remotePartitioningJob() {
            return this.jobBuilderFactory
                    .get("remotePartitioningJob")
                    .start(managerStep())
                    .build();
        }
    
    • 看worker代码
    	@Bean
        public DirectChannel workerRequests() {
            return new DirectChannel();
        }
        @Bean
        public IntegrationFlow workerInboundFlow(ConnectionFactory rabbitmqConnectionFactory) {
            return IntegrationFlows
                    .from(Amqp.inboundAdapter(
                            rabbitmqConnectionFactory,"requests"))
                    .channel(workerRequests()).get();
        }
        @Bean
        public DirectChannel workerReplies() {
            return new DirectChannel();
        }
        @Bean
        public IntegrationFlow workerOutboundFlow() {
            return IntegrationFlows.from(workerReplies())
                    .handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("replies"))
                    .get();
        }
        @Bean
        public Step workerStep() {
            return this.workerStepBuilderFactory
                    .get("workerStep")
                    .inputChannel(workerRequests())
                    .outputChannel(workerReplies())
                    .tasklet(tasklet(null)).build();
        }
        @Bean
        @StepScope
        public Tasklet tasklet(@Value("#{stepExecutionContext['partition']}") String partition) {
            return (contribution, chunkContext) -> {
                System.out.println("processing " + partition);
                return RepeatStatus.FINISHED;
            };
        }
    

    可用于简化远程分区设置的@EnableBatchIntegration注释。此注释提供了两个对远程分区有用的bean:

    RemotePartitioningManagerStepBuilderFactory:用于配置Manager步骤

    RemotePartitioningWorkerStepBuilderFactory:用于配置Worker步骤

    这些API负责配置许多组件,如下图所述:(使用db轮询模式)

    Remote Partitioning Configuration (with job repository polling)Remote Partitioning Configuration (with job repository polling)

Remote Partitioning Configuration (with replies aggregation)

Remote Partitioning Configuration (with replies aggregation)

在Manager方面,RemotePartitioningManagerStepBuilderFactory允许您通过声明以下内容来配Manager步骤:

  • 用于对数据进行分区的Partitioner

  • 向Worker发送请求的输出通道(“传出请求”)

  • 接收Worker回复的输入通道(“传入回复”)(配置回复聚合时)

  • 轮询间隔和超时参数(配置作业存储库轮询时)

无需显式配置MessageChannelPartitionHandler和MessagingTemplate(如果需要,仍然可以显式配置)。

在工作者方面,RemotePartitioningWorkersStepBuilderFactory允许您将工作者配置为:

  • 侦听管理器在输入通道上发送的请求(“传入请求”)

  • 为每个请求调用StepExecutionRequestHandler的句柄方法

  • =将输出通道上的回复(“传出回复”)发送给经理

无需显式配置StepExecutionRequestHandler(如果需要,可以显式配置)。

以下示例显示了如何使用这些API:

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {
    @Configuration
    public static class ManagerConfiguration {
        @Autowired
        private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
        @Bean
        public Step managerStep() {
                 return this.managerStepBuilderFactory
                    .get("managerStep")
                    .partitioner("workerStep", partitioner())
                    .gridSize(10)
                    .outputChannel(outgoingRequestsToWorkers())
                    .inputChannel(incomingRepliesFromWorkers())
                    .build();
        }
        // Middleware beans setup omitted
    }
    @Configuration
    public static class WorkerConfiguration {
        @Autowired
        private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
        @Bean
        public Step workerStep() {
                 return this.workerStepBuilderFactory
                    .get("workerStep")
                    .inputChannel(incomingRequestsFromManager())
                    .outputChannel(outgoingRepliesToManager())
                    .chunk(100)
                    .reader(itemReader())
                    .processor(itemProcessor())
                    .writer(itemWriter())
                    .build();
        }
        // Middleware beans setup omitted
    }
}

转自:
https://www.cnblogs.com/jackssybin/articles/16328760.html