Using Spring Batch, an Enterprise Batch Processing Framework

jopen, 12 years ago

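Spring Batch is a subproject of the Spring Framework. It is reportedly able to handle workloads on the order of tens of millions of records.

What is Spring Batch suited for?

1. Jobs that process large data sets

2. Jobs that run automatically, with no human intervention

3. Jobs with high reliability requirements

4. Jobs with high performance requirements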

Spring Batch sequence diagram


Here is a simple hands-on Spring Batch example. The use case is a small data-cleansing job.

Versions used: Spring 3.1, Spring Batch 2.1.8, HSQLDB 2.2.9

SQL:

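CREATE TABLE SYS_APPSTORE (
    APP_ID    VARCHAR(20)  NOT NULL,
    PARENT_ID VARCHAR(20),
    APP_DESC  VARCHAR(100) NOT NULL,
    APP_URL   VARCHAR(200),
    FOLDER    BOOLEAN,
    -- Assumption: SEQ is missing from the original DDL, but the reader
    -- configuration below selects and sorts on a.SEQ. The INTEGER type is a guess.
    SEQ       INTEGER,
    PRIMARY KEY (APP_ID)
);
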
Java bean:

package net.dbatch.entity;

import java.io.Serializable;

public class SysAppStore implements Serializable {

    private static final long serialVersionUID = 19890414L;

    private String appId = null;
    private String parentId = null;
    private String appDesc = null;
    private String appURL = null;
    private Boolean folder = null;

    // ... getters and setters ...
}

Spring JDBC Mapper

package net.dbatch.mapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import net.dbatch.entity.SysAppStore;
import org.springframework.jdbc.core.RowMapper;

// Maps one row of SYS_APPSTORE to a SysAppStore bean.
public class SysAppStoreMapper implements RowMapper<SysAppStore> {

    public SysAppStoreMapper() {
        super();
    }

    @Override
    public SysAppStore mapRow(ResultSet resultSet, int i) throws SQLException {
        SysAppStore sysAppStore = new SysAppStore();
        sysAppStore.setAppId(resultSet.getString("APP_ID"));
        sysAppStore.setParentId(resultSet.getString("PARENT_ID"));
        sysAppStore.setAppDesc(resultSet.getString("APP_DESC"));
        sysAppStore.setAppURL(resultSet.getString("APP_URL"));
        sysAppStore.setFolder(resultSet.getBoolean("FOLDER"));
        return sysAppStore;
    }
}

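Spring Batch processor:

package net.dbatch.process;

import net.dbatch.entity.SysAppStore;
import org.springframework.batch.item.ItemProcessor;

public class SysAppStoreProcessor implements ItemProcessor<SysAppStore, SysAppStore> {

    public SysAppStoreProcessor() {
    }

    @Override
    public SysAppStore process(SysAppStore item) throws Exception {
        // Does no real work here; just prints the description.
        System.out.println(item.getAppDesc());
        return item;
    }
}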
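
The processor above only prints each item. Since the stated goal is data cleansing, the following is a minimal sketch of what a cleansing processor might look like. It is written in plain Java so that it compiles on its own: the nested `ItemProcessor` interface is a local stand-in for `org.springframework.batch.item.ItemProcessor`, and `App` is a hypothetical, reduced version of `SysAppStore`. The cleansing rules (trim whitespace, turn empty URLs into null, drop rows with a blank description) are assumptions chosen for illustration, not rules from the original job. In Spring Batch, returning null from `process` filters the item out, so it never reaches the writer.

```java
// Plain-Java sketch of a cleansing processor; names and rules are illustrative.
class CleansingProcessorSketch {

    // Local stand-in for org.springframework.batch.item.ItemProcessor.
    interface ItemProcessor<I, O> {
        O process(I item) throws Exception;
    }

    // Hypothetical, reduced version of the SysAppStore bean.
    static final class App {
        String appDesc;
        String appURL;

        App(String appDesc, String appURL) {
            this.appDesc = appDesc;
            this.appURL = appURL;
        }
    }

    static final class CleansingProcessor implements ItemProcessor<App, App> {
        @Override
        public App process(App item) {
            // Rows without a usable description are filtered out (null result).
            if (item.appDesc == null || item.appDesc.trim().isEmpty()) {
                return null;
            }
            item.appDesc = item.appDesc.trim();

            // Normalize the URL: trim it and store null instead of an empty string.
            if (item.appURL != null) {
                String url = item.appURL.trim();
                item.appURL = url.isEmpty() ? null : url;
            }
            return item;
        }
    }

    public static void main(String[] args) {
        CleansingProcessor processor = new CleansingProcessor();
        App cleaned = processor.process(new App("  ITeye ", " http://www.iteye.com/ "));
        System.out.println(cleaned.appDesc + " -> " + cleaned.appURL);
    }
}
```

In the real job the same logic would live in `SysAppStoreProcessor` and use the bean's getters and setters.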

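Spring Batch writer:

package net.dbatch.writer;

import java.util.List;

import net.dbatch.entity.SysAppStore;
import org.springframework.batch.item.ItemWriter;

public class SysAppStoreWriter implements ItemWriter<SysAppStore> {

    @Override
    public void write(List<? extends SysAppStore> items) throws Exception {
        for (SysAppStore item : items) {
            // Does no real work either; just prints the item.
            System.out.println(item);
        }
    }
}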
Spring configuration (jdbcorm_job.xml):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <context:property-placeholder location="classpath:jdbc.properties" />

    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="${jdbc.driverClass}" />
        <property name="url" value="${jdbc.url}" />
        <property name="username" value="${jdbc.username}" />
        <property name="password" value="${jdbc.password}" />
    </bean>

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!-- In-memory job repository: job metadata is not persisted -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" />

    <bean id="sysAppStoreMapper" class="net.dbatch.mapper.SysAppStoreMapper" />

    <bean id="dbReader"
          class="org.springframework.batch.item.database.JdbcPagingItemReader">
        <property name="dataSource" ref="dataSource"/>
        <property name="rowMapper" ref="sysAppStoreMapper"/>
        <property name="queryProvider" ref="appQueryProvider"/>
    </bean>

    <bean id="appQueryProvider"
          class="org.springframework.batch.item.database.support.HsqlPagingQueryProvider">
        <property name="selectClause"
                  value="a.APP_ID, a.PARENT_ID, a.APP_DESC, a.APP_URL, a.FOLDER, a.SEQ"/>
        <property name="fromClause" value="sys_appstore a"/>
        <property name="sortKey" value="SEQ"/>
    </bean>

    <bean id="sysAppStoreProcessor" class="net.dbatch.process.SysAppStoreProcessor" />

    <bean id="sysAppStoreWriter" class="net.dbatch.writer.SysAppStoreWriter" />

    <bean id="itemSqlParameterSourceProvider"
          class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />

    <batch:job id="testJdbcBatch">
        <batch:step id="firstCleanStep">
            <batch:tasklet>
                <batch:chunk reader="dbReader" processor="sysAppStoreProcessor" writer="sysAppStoreWriter"
                             commit-interval="5"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>
</beans>

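Test class:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.task.SyncTaskExecutor;

public class JdbcORMJobMain {

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("jdbcorm_job.xml");

        // Run the job synchronously in the calling thread.
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository((JobRepository) context.getBean("jobRepository"));
        launcher.setTaskExecutor(new SyncTaskExecutor());
        try {
            JobExecution je = launcher.run(context.getBean("testJdbcBatch", Job.class),
                    new JobParametersBuilder().toJobParameters());

            System.out.println("======================================================================");
            System.out.println(je);
            System.out.println(je.getJobInstance());
            System.out.println(je.getStepExecutions());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
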
Output:

10-20 09:20:35 INFO [config.PropertyPlaceholderConfigurer] - <Loading properties file from class path resource [jdbc.properties]>
10-20 09:20:35 INFO [support.DefaultListableBeanFactory] - <Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@2dea1ba6: defining beans [org.springframework.beans.factory.config.PropertyPlaceholderConfigurer#0,dataSource,transactionManager,jobRepository,sysAppStoreMapper,dbReader,appQueryProvider,sysAppStoreProcessor,sysAppStoreWriter,itemSqlParameterSourceProvider,org.springframework.batch.core.scope.internalStepScope,org.springframework.beans.factory.config.CustomEditorConfigurer,org.springframework.batch.core.configuration.xml.CoreNamespacePostProcessor,firstCleanStep,testJdbcBatch]; root of factory hierarchy>
10-20 09:20:35 INFO [datasource.DriverManagerDataSource] - <Loaded JDBC driver: org.hsqldb.jdbcDriver>
10-20 09:20:35 INFO [support.SimpleJobLauncher] - <Job: [FlowJob: [name=testJdbcBatch]] launched with the following parameters: [{}]>
10-20 09:20:35 INFO [job.SimpleStepHandler] - <Executing step: [firstCleanStep]>
SourceForge
树节点查看
网易163
WEBQQ
ITeye
net.dbatch.entity.SysAppStore@6944da12[appId=11102880045318725,parentId=11102880044233464,appDesc=SourceForge,appURL=http://sourceforge.net/,folder=false]
net.dbatch.entity.SysAppStore@2c1e29ca[appId=11102881323428897,parentId=11102881323057218,appDesc=树节点查看,appURL=index.jsp,folder=false]
net.dbatch.entity.SysAppStore@7049a366[appId=11102880050094388,parentId=11102880049448584,appDesc=网易163,appURL=http://mail.163.com/,folder=false]
net.dbatch.entity.SysAppStore@7286b721[appId=11102880048511704,parentId=11102880047038128,appDesc=WEBQQ,appURL=http://web.qq.com/,folder=false]
net.dbatch.entity.SysAppStore@6a611244[appId=11102880047497417,parentId=11102880047240743,appDesc=ITeye,appURL=http://www.iteye.com/,folder=false]
社区
Intel
IBM
微软
软件公司
net.dbatch.entity.SysAppStore@30f224d9[appId=11102880047038128,parentId=11102880016088125,appDesc=社区,appURL=,folder=true]
net.dbatch.entity.SysAppStore@69513ba9[appId=11102880041502775,parentId=11102880041300615,appDesc=Intel,appURL=http://www.intel.com/,folder=false]
net.dbatch.entity.SysAppStore@54240a43[appId=11102880041149608,parentId=11102880039316139,appDesc=IBM,appURL=http://www.ibm.com/,folder=false]
net.dbatch.entity.SysAppStore@a1ddfdd[appId=11102880040025640,parentId=11102880039316139,appDesc=微软,appURL=http://www.microsoft.com/,folder=false]
net.dbatch.entity.SysAppStore@2f542b5b[appId=11102880039316139,parentId=11102880038314190,appDesc=软件公司,appURL=,folder=true]
国内
分页显示程序
网易126
新浪微博
CSDN
net.dbatch.entity.SysAppStore@e316834[appId=11102880016088125,parentId=Root,appDesc=国内,appURL=,folder=true]
net.dbatch.entity.SysAppStore@4db03533[appId=11102881324298312,parentId=11102881323057218,appDesc=分页显示程序,appURL=powerasapp.jsp,folder=false]
net.dbatch.entity.SysAppStore@6b74cf1d[appId=11102880050404071,parentId=11102880049448584,appDesc=网易126,appURL=http://mail.126.com/,folder=false]
net.dbatch.entity.SysAppStore@41c9b008[appId=11102880049211044,parentId=11102880047038128,appDesc=新浪微博,appURL=http://weibo.com/,folder=false]
net.dbatch.entity.SysAppStore@2043fef6[appId=11102880048200884,parentId=11102880047240743,appDesc=CSDN,appURL=http://www.csdn.net/,folder=false]
开源社区
AMD
硬件公司
Apache
Google
net.dbatch.entity.SysAppStore@100917f0[appId=11102880044233464,parentId=11102880016418917,appDesc=开源社区,appURL=,folder=true]
net.dbatch.entity.SysAppStore@450295c9[appId=11102880042470026,parentId=11102880041300615,appDesc=AMD,appURL=http://www.amd.com/,folder=false]
net.dbatch.entity.SysAppStore@2cb7e284[appId=11102880041300615,parentId=11102880038314190,appDesc=硬件公司,appURL=,folder=true]
net.dbatch.entity.SysAppStore@5c785f0b[appId=11102880045542267,parentId=11102880044233464,appDesc=Apache,appURL=http://www.apache.org/,folder=false]
net.dbatch.entity.SysAppStore@62a7fa9a[appId=11102880040236939,parentId=11102880039316139,appDesc=Google,appURL=http://www.google.com/,folder=false]
腾讯
苹果
苹果
Eclipse
IT学习
net.dbatch.entity.SysAppStore@70630657[appId=11102880035183022,parentId=11102880031124887,appDesc=腾讯,appURL=http://www.qq.com/,folder=false]
net.dbatch.entity.SysAppStore@75357365[appId=11102880040488906,parentId=11102880039316139,appDesc=苹果,appURL=http://www.apple.com/,folder=false]
net.dbatch.entity.SysAppStore@82b2801[appId=11102880043182136,parentId=11102880041300615,appDesc=苹果,appURL=http://www.apple.com/,folder=false]
net.dbatch.entity.SysAppStore@494f5dd7[appId=11102880046118737,parentId=11102880044233464,appDesc=Eclipse,appURL=http://eclipse.org/,folder=false]
net.dbatch.entity.SysAppStore@7999f3da[appId=11102880047240743,parentId=11102880016088125,appDesc=IT学习,appURL=,folder=true]
新浪邮箱
测试连接
授权程序
搜狐
摩托罗拉
net.dbatch.entity.SysAppStore@1d984f10[appId=11102880051055401,parentId=11102880049448584,appDesc=新浪邮箱,appURL=http://mail.sina.com.cn/,folder=false]
net.dbatch.entity.SysAppStore@7a6eb29d[appId=11102881323057218,parentId=Root,appDesc=测试连接,appURL=,folder=true]
net.dbatch.entity.SysAppStore@7990a036[appId=11102881325080465,parentId=11102881323057218,appDesc=授权程序,appURL=powerasapptree.jsp,folder=false]
net.dbatch.entity.SysAppStore@6067794[appId=11102880035434221,parentId=11102880031124887,appDesc=搜狐,appURL=http://www.souhu.com/,folder=false]
net.dbatch.entity.SysAppStore@129498a3[appId=11102880044032342,parentId=11102880041300615,appDesc=摩托罗拉,appURL=http://www.motorala.com/,folder=false]
阿里巴巴
Oracle[甲骨文]
邮箱
挂接程序
我的博客
net.dbatch.entity.SysAppStore@6819f939[appId=11102880036079524,parentId=11102880031124887,appDesc=阿里巴巴,appURL=http://www.alibaba.com/,folder=false]
net.dbatch.entity.SysAppStore@1394294[appId=11102880044595761,parentId=11102880039316139,appDesc=Oracle[甲骨文],appURL=http://www.oracle.com/,folder=false]
net.dbatch.entity.SysAppStore@5642032c[appId=11102880049448584,parentId=11102880016088125,appDesc=邮箱,appURL=,folder=true]
net.dbatch.entity.SysAppStore@7de69f2[appId=11102881326070340,parentId=11102881323057218,appDesc=挂接程序,appURL=sysapptree.jsp,folder=false]
net.dbatch.entity.SysAppStore@1afd92e7[appId=11102880052400411,parentId=Root,appDesc=我的博客,appURL=http://zhzhenqin.iteye.com/,folder=false]
10-20 09:20:37 INFO [support.SimpleJobLauncher] - <Job: [FlowJob: [name=testJdbcBatch]] completed with the following parameters: [{}] and the following status: [COMPLETED]>
======================================================================
JobExecution: id=0, version=2, startTime=Sat Oct 20 09:20:35 CST 2012, endTime=Sat Oct 20 09:20:37 CST 2012, lastUpdated=Sat Oct 20 09:20:37 CST 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{}], Job=[testJdbcBatch]]
JobInstance: id=0, version=0, JobParameters=[{}], Job=[testJdbcBatch]
[StepExecution: id=1, version=10, name=firstCleanStep, status=COMPLETED, exitStatus=COMPLETED, readCount=35, filterCount=0, writeCount=35 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=8, rollbackCount=0, exitDescription=]

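As the output shows, the job reads one record at a time and passes it to the processor. Once five beans have accumulated, they are handed to the writer together as a single List, and the transaction is then committed. This chunk size comes from the following setting: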
commit-interval="5"
This alone is enough to show that Spring Batch's architecture is quite well designed.
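
The read, process, write, commit cycle described above can be sketched in plain Java. This is only an illustration of the chunk-oriented pattern, not Spring Batch's actual implementation. `ChunkLoopSketch`, `Result`, `run`, `process` and `write` are hypothetical names, and the "commit" is just a counter. The sketch assumes that the step cannot know the reader is exhausted until a read comes back empty, so when the item count is an exact multiple of the commit interval, one final empty chunk is still committed. That assumption is consistent with the output above, where 35 items and commit-interval="5" give commitCount=8.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java illustration of chunk-oriented processing (not Spring Batch code).
class ChunkLoopSketch {

    // Counters loosely mirroring the StepExecution fields printed in the output.
    static final class Result {
        int readCount;
        int writeCount;
        int commitCount;
        final List<Integer> chunkSizes = new ArrayList<Integer>();
    }

    static Result run(List<String> source, int commitInterval) {
        Result result = new Result();
        Iterator<String> reader = source.iterator();
        boolean exhausted = false;

        while (!exhausted) {
            List<String> chunk = new ArrayList<String>();
            // Read and process one item at a time until the chunk is full.
            while (chunk.size() < commitInterval) {
                if (!reader.hasNext()) {
                    exhausted = true; // stands in for the reader returning null
                    break;
                }
                String item = reader.next();
                result.readCount++;
                chunk.add(process(item));
            }
            // Hand the whole chunk to the writer in a single call.
            if (!chunk.isEmpty()) {
                write(chunk);
                result.writeCount += chunk.size();
                result.chunkSizes.add(chunk.size());
            }
            // One "commit" per chunk, including a final empty chunk.
            result.commitCount++;
        }
        return result;
    }

    // Pass-through processor, like SysAppStoreProcessor in the article.
    static String process(String item) {
        return item;
    }

    // Writer that only prints, like SysAppStoreWriter in the article.
    static void write(List<String> items) {
        System.out.println("write " + items);
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<String>();
        for (int i = 1; i <= 35; i++) {
            rows.add("row-" + i);
        }
        Result r = run(rows, 5);
        System.out.println("readCount=" + r.readCount
                + ", writeCount=" + r.writeCount
                + ", commitCount=" + r.commitCount);
    }
}
```

With 35 rows and an interval of 5, the sketch makes seven writer calls of five items each and counts eight commits, the last one for the empty chunk that detects the end of input.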