Elastic-Job-Lite 是一个轻量级无中心化的分布式调度解决方案,基于成熟的开源产品Quartz和Zookeeper,支持作业的定时触发,支持任务分片并发执行,失效转移,可以和Spring整合,并且提供基于web的控制台工具,可以轻松管理运行中的作业,但不能通过控制台添加作业。Elastic-Job-Lite 是 Elastic-Job 的这一个子项目,另外一个子项目是Elastic-Job-Cloud 是一套分布式调度的云解决方案。Elastic-Job-lite 架构图如下:
Elastic-Job-Lite 依赖 zookeeper 作为作业的注册中心,用户记录任务的配置,服务器信息和运行状态。
在某些场景下,需要将一个任务分解成多个子任务在分布式环境中由多台服务器并发执行,所以就如要将任务分片,通常是按照数据集来分片。一个例子:
有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直接的结果就是服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据。
为了保证作业分片的一致性,Elastic-Job-Lite 确保同一个分片在分布式环境中同时只有一个执行实例,另外,Elastic-Job-Lite 不提供按照数据的自动分片功能,目的是将分片与业务逻辑解耦,所以需要开发者自行配置作业分片,根据不同的分片处理不同的数据。
使用 Elastic-Job-Lite 和 spring 集成需要引入如下依赖:
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-core</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
EJL 提供了三种任务类型,Simple 类型,Dataflow 类型 和 脚本类型。Simple 是最常用的类型,其代码例如:
/**
* Simple类型 Job 需要实现 SimpleJob 接口 的 execute 方法。
*
* 本例为对128个分表数据进行扫描,批量修改某自动。 比如,讲分为16个片,没片处理8张表,比如分片id为 2,则处理,table_017 ~
* table_023
*/
public class MyFirstEJLJob implements SimpleJob {
/**
* shardingContext 包含作业的分片信息。
*/
public void execute(ShardingContext shardingContext) {
// 当前触发的分片号
int shardingId = shardingContext.getShardingItem();
// 全局分片数
int shardingTotalCount = shardingContext.getShardingTotalCount();
// 数据库表的分表书
int tableShardNum = 128;
int step = shardingTotalCount > tableShardNum ? 1 : tableShardNum / shardingTotalCount;
System.out.println("step:" + step);
int startTableNum = shardingId * step;
int endTableNum = startTableNum + step;
for (int i = startTableNum; i < endTableNum; i++) {
processTable(i);
}
}
public void processTable(int tableNum) {
System.out.println("Table:" + tableNum);
}
}
EJL 可以通过java 代码配置,也可以通过xml配置,本文以xml配置为例,在Srping EJL 的基本配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<reg:zookeeper id="regCenter" server-lists="192.168.56.101:2181"
namespace="dd-job" base-sleep-time-milliseconds="1000"
max-sleep-time-milliseconds="3000" max-retries="3" />
<!-- 每一个 job 有一个唯一id -->
<job:simple id="item_myFirstEJLJob"
class="com.github.coderxing.book.code.chapter8.MyFirstEJLJob"
registry-center-ref="regCenter" cron="0/5 * * * * ?"
sharding-total-count="3" overwrite="true" />
</beans>
简单任务配置参数:
- id:任务名称,不能喝已经存在的id冲突,实际使用时最好按照业务添加前缀,避免和其他业务冲突。
- class:任务的对应实现类,需实现ElasticJob接口。
- registry-center-ref,注册中心对应的bean引用。
- cron: cron表达式,用于控制作业触发时间,也可以通过 EJL web 控制台进行配置。
- sharding-total-count:任务分片总数,可以通过 EJL web 控制台进行动态调整,默认情况下会按照机器数平均分配,比如如果有2台机器,任务分数为 6,那么每台机器分配3个分片,单台机器上启动3个线程来执行,如果是1,则表示该任务全局只在一台机器上执行。
- overwrite: 表示是否已本地配置为准,通过web 控制台也可以动态修改配置。 更多参数参考官方文档(http://dangdangdotcom.github.io/elastic-job/elastic-job-lite/02-guide/config-manual/)
Dataflow类型任务用于处理流式数据,其将数据的获取和处理分开,数据获取用fetchData 方法,处理数据用 processData 方法,spring 对应的配置例如:
<job:dataflow id="item_myFirstElasticDataflowJob"
class="com.github.coderxing.book.code.chapter8.MyFirstElasticDataflowJob"
registry-center-ref="regCenter" cron="0/5 * * * * ?"
sharding-total-count="3" streaming-process="false" overwrite="true" />
和 simple 类型的任务配置相似,Simeple 作业支持的配置 Dataflow 也支持,仅仅多出一个 streaming-process 参数,用于控制是否按照流式处理,默认为false。如果为 true,每次触发作业时,如果 fetchData 不返回空数据,则继续执行作业,会多次调用 fetchData 和 processData。如果为 false,则按照非流式方式执行,如果返回数据不为空,则一次 fetchData ,一次 processData 调用,如果返回数据为空,则只调用一次 fetchData。
实例代码为:
package com.github.coderxing.book.code.chapter8;
import java.util.ArrayList;
import java.util.List;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.github.coderxing.book.code.chapter8.pojo.Item;
/**
* Dataflow 类型用于处理数据流,需实现DataflowJob接口
*
*/
public class MyFirstElasticDataflowJob implements DataflowJob<Item> {
public List<Item> fetchData(ShardingContext context) {
List<Item> data = null;
switch (context.getShardingItem()) {
case 0:
return getItemFromDB(0);
case 1:
return getItemFromDB(1);
case 2:
return getItemFromDB(2);
}
return data;
}
public void processData(ShardingContext shardingContext, List<Item> data) {
// do sth...........
}
/**
* 模拟数据的获取
*
* @param shardingNum
* @return
*/
private List<Item> getItemFromDB(int shardingNum) {
List<Item> list = new ArrayList<Item>();
list.add(new Item());
return list;
}
}
另外 EJL支持脚本类型作业,配置如下:
<job:script id="scriptElasticJob" registry-center-ref="regCenter"
cron="0/10 * * * * ?" sharding-total-count="3" script-command-line="/tmp/test.sh" />
相比于前两种类型的任务,只多一个 script-command-line 配置,用户指定脚文件地址,例如文件内容为:
#!/bin/bash
echo sharding execution context is $*
sharding execution context is {"jobName":"scriptElasticJob","taskId":"scriptElasticJob@-@0,1,2@-@READY@[email protected]@-@16877","shardingTotalCount":3,"jobParameter":"","shardingItem":0}
任务控制台:
Elastic-Job-Lite 提供了基于 web 的控制台 elastic-job-lite-console,可以非常方便地操控任务的执行,可以动态调整EJL的任务执行时间,任务分片数,任务的启动、关闭等功能。
elastic-job-lite-console 下载安装并启动:
wget https://dangdangdotcom.github.io/elastic-job/elastic-job-lite/dist/elastic-job-lite-console-2.1.4.tar.gz
tar -zxvf ./elastic-job-lite-console-2.1.4.tar.gz
cd elastic-job-lite-console-2.1.4
./bin/start.sh
访问地址 http://hostname:8899/,首次访问需要配置zookeeper。
动态修改作业面板:
TODO 作业失效转移