Using Readers in Spring Batch, the Enterprise Batch Processing Framework

jopen, 12 years ago

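Spring Batch is designed for enterprise data work where the logic is relatively simple, the tasks are highly repetitive, and the data volumes are large. The range of Readers it provides makes this clear, or at least that is how I understand it. It is best suited to jobs such as data cleansing, moving data after analysis, or scheduled data exchange with other systems.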

In the previous article, I used JdbcPagingItemReader to read data from an HSQLDB database:

<bean id="sysAppStoreMapper" class="net.dbatch.mapper.SysAppStoreMapper" />

<bean id="dbReader"
      class="org.springframework.batch.item.database.JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="rowMapper" ref="sysAppStoreMapper"/>
    <property name="queryProvider" ref="appQueryProvider"/>
</bean>

<bean id="appQueryProvider"
      class="org.springframework.batch.item.database.support.HsqlPagingQueryProvider">
    <property name="selectClause" value="a.APP_ID, a.PARENT_ID, a.APP_DESC, a.APP_URL, a.FOLDER, a.SEQ"/>
    <property name="fromClause" value="sys_appstore a"/>
    <property name="sortKey" value="SEQ"/>
</bean>

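In fact, Spring Batch provides many Readers. You can write a custom Reader by implementing the org.springframework.batch.item.ItemReader interface, but in most cases you don't need to bother, because Spring Batch has already done the work for you. The 2.1.8 API includes ready-made Reader implementations for the common sources: databases [Hibernate/iBATIS/JDBC], the file system, JMS messages, and more, as shown in the figure: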

[Figure: the ready-made ItemReader implementations in the Spring Batch 2.1.8 API]
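The ItemReader contract mentioned above is small: read() returns the next item, or null once the input is exhausted. The sketch below shows a custom reader that serves items from an in-memory list. To keep it compilable without Spring Batch on the classpath, it declares a local stand-in interface (Reader and ListReader are hypothetical names) with the same read() shape; in a real job you would implement org.springframework.batch.item.ItemReader<T> directly, and for this exact case Spring Batch already ships ListItemReader.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch of a custom reader following the ItemReader contract:
// read() returns the next item, or null once the input is exhausted.
public class InMemoryReaderSketch {

    // Hypothetical local stand-in with the same read() shape as
    // org.springframework.batch.item.ItemReader<T>, so this compiles without Spring Batch.
    public interface Reader<T> {
        T read() throws Exception;
    }

    // Hypothetical reader that hands out items from an in-memory list, one per call.
    public static class ListReader<T> implements Reader<T> {
        private final Iterator<T> items;

        public ListReader(List<T> source) {
            // Copy so later changes to the caller's list do not affect reading
            this.items = new ArrayList<>(source).iterator();
        }

        @Override
        public T read() {
            // null tells the caller (the step) that there is no more input
            return items.hasNext() ? items.next() : null;
        }
    }

    public static void main(String[] args) throws Exception {
        Reader<String> reader = new ListReader<>(Arrays.asList("User1,20", "User2,21"));
        String line;
        while ((line = reader.read()) != null) {
            System.out.println(line);
        }
    }
}
```

The null-at-end convention is what lets the framework decide when a step has run out of work, so a custom reader must keep returning null after its input is used up.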


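Users who prefer Spring JDBC [I personally really dislike Hibernate] can use JdbcPagingItemReader and give it a queryProvider. A queryProvider is a database-specific paging implementation, and ready-made queryProviders exist for the common databases, as shown in the figure: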

[Figure: the ready-made PagingQueryProvider implementations for common databases]
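The idea behind a paging query provider can be sketched as keyset paging: the first page is ordered by the sort key, and every later page asks only for rows whose sort key is greater than the last value already read. KeysetPagingSketch below is a hypothetical class, and its generic LIMIT syntax is an assumption for illustration; the real providers emit SQL in each database's own dialect, which is why there is one provider per database.

```java
// Simplified illustration of keyset ("seek") paging, the idea behind
// JdbcPagingItemReader plus a paging query provider. Hypothetical class;
// the generic LIMIT syntax is illustrative, not what any specific provider emits.
public class KeysetPagingSketch {
    private final String selectClause;
    private final String fromClause;
    private final String sortKey;

    public KeysetPagingSketch(String selectClause, String fromClause, String sortKey) {
        this.selectClause = selectClause;
        this.fromClause = fromClause;
        this.sortKey = sortKey;
    }

    // First page: no lower bound on the sort key yet.
    public String firstPageQuery(int pageSize) {
        return "SELECT " + selectClause + " FROM " + fromClause
                + " ORDER BY " + sortKey + " ASC LIMIT " + pageSize;
    }

    // Later pages: resume after the last sort-key value already read,
    // which the caller binds to the "?" placeholder.
    public String remainingPagesQuery(int pageSize) {
        return "SELECT " + selectClause + " FROM " + fromClause
                + " WHERE " + sortKey + " > ?"
                + " ORDER BY " + sortKey + " ASC LIMIT " + pageSize;
    }

    public static void main(String[] args) {
        KeysetPagingSketch p = new KeysetPagingSketch(
                "a.APP_ID, a.PARENT_ID, a.APP_DESC, a.APP_URL, a.FOLDER, a.SEQ",
                "sys_appstore a", "SEQ");
        System.out.println(p.firstPageQuery(10));
        System.out.println(p.remainingPagesQuery(10));
    }
}
```

Seeking past the last key, rather than using an OFFSET, keeps each page query cheap on large tables; it also explains why the sort key should be unique and present in the select clause.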


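OK. If you still cannot find an implementation for your database among these, but you know its SQL, you can use JdbcCursorItemReader, which lets you set the SQL yourself. Instead of issuing one query per page, it runs the statement once and reads rows one at a time from the open JDBC cursor (ResultSet).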

Rewriting my example above with JdbcCursorItemReader is also very simple:

<bean id="dbReader"
      class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="sql" value="select a.APP_ID, a.PARENT_ID, a.APP_DESC, a.APP_URL, a.FOLDER from sys_appstore a order by a.SEQ" />
    <property name="rowMapper" ref="sysAppStoreMapper" />
</bean>

It still works well, and the configuration is even simpler.

What if my data comes from a file instead of a database?

Did you notice FlatFileItemReader among the Reader implementations shown earlier? It reads files [plain text files].

Suppose I want to analyze log data with the following structure:

User1,20
User2,21
User3,22
User4,23
User5,24
User6,25
User7,26
User8,27
User9,28
User10,29

This is structured text, so it is easy to handle. The Spring configuration:

<bean id="delimitedLineTokenizer"
      class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" />

<bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
    <property name="lineTokenizer" ref="delimitedLineTokenizer" />
    <property name="fieldSetMapper">
        <bean class="net.dbatch.sample.UserMapper" />
    </property>
</bean>

<bean id="messageReader" class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="lineMapper" ref="lineMapper" />
    <property name="resource" value="classpath:/users.txt" />
</bean>
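Conceptually, for each line of users.txt the DefaultLineMapper configured above first asks the DelimitedLineTokenizer (comma-delimited by default) to split the line into a FieldSet, then hands that FieldSet to the FieldSetMapper. The plain-Java sketch below mimics those two steps. SimpleFieldSet is a hypothetical stand-in for Spring Batch's FieldSet, not the real class, and the quoting rules the real tokenizer handles are ignored.

```java
// Plain-Java illustration of the two steps DefaultLineMapper performs per line.
// This is not Spring Batch code: SimpleFieldSet is a hypothetical stand-in for
// FieldSet, and quoted fields are not handled.
public class LineTokenizingSketch {

    // Hypothetical stand-in exposing FieldSet-style accessors like those UserMapper calls.
    public static final class SimpleFieldSet {
        private final String[] tokens;

        SimpleFieldSet(String[] tokens) {
            this.tokens = tokens;
        }

        // trim() tolerates stray spaces around a field
        public String readString(int index) {
            return tokens[index].trim();
        }

        public int readInt(int index) {
            return Integer.parseInt(tokens[index].trim());
        }

        public int getFieldCount() {
            return tokens.length;
        }
    }

    // Step 1: split on the delimiter (a comma here); -1 keeps trailing empty fields.
    public static SimpleFieldSet tokenize(String line) {
        return new SimpleFieldSet(line.split(",", -1));
    }

    public static void main(String[] args) {
        for (String line : new String[] {"User1,20", "User2,21"}) {
            SimpleFieldSet fs = tokenize(line);
            // Step 2: read fields by position, as UserMapper.mapFieldSet does
            System.out.println(fs.readString(0) + " -> age " + fs.readInt(1));
        }
    }
}
```

Separating tokenizing from mapping is what lets the same mapper be reused with a different tokenizer (for example, fixed-width columns) without changes.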

Then write the corresponding mapper bean:

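import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

public class UserMapper implements FieldSetMapper<User> {

    // Maps one tokenized line ("name,age") to a User
    public User mapFieldSet(FieldSet fs) throws BindException {
        User u = new User();
        u.setName(fs.readString(0));
        u.setAge(fs.readInt(1));
        return u;
    }
}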

Processor:

import org.springframework.batch.item.ItemProcessor;
import org.springframework.util.StringUtils;

public class MessagesItemProcessor implements ItemProcessor<User, Message> {

    public Message process(User user) throws Exception {
        // Reject records without a user name
        if (!StringUtils.hasText(user.getName())) {
            throw new RuntimeException("The user name is required!");
        }
        Message m = new Message(); // Message is a simple wrapper around User
        m.setUser(user);
        m.setContent("Hello " + user.getName()
                + ",please pay promptly at end of this month.");
        return m;
    }
}
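The mapper, processor and writer rely on User and Message beans that the article does not show. Below is a minimal sketch whose fields and accessors are inferred only from the calls made above (setName/setAge/getName on User, setUser/setContent/getContent on Message); the int type for age follows from fs.readInt(1). They are nested in a hypothetical DomainSketch class so the file compiles on its own, whereas in a project each would normally be its own top-level class.

```java
// Sketch of the User and Message beans, inferred from how the mapper,
// processor and writer use them. DomainSketch is a hypothetical container.
public class DomainSketch {

    public static class User {
        private String name;
        private int age;

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }

    // A thin wrapper around a User plus the text to send to that user.
    public static class Message {
        private User user;
        private String content;

        public User getUser() { return user; }
        public void setUser(User user) { this.user = user; }
        public String getContent() { return content; }
        public void setContent(String content) { this.content = content; }
    }

    public static void main(String[] args) {
        User u = new User();
        u.setName("User1");
        u.setAge(20);
        Message m = new Message();
        m.setUser(u);
        m.setContent("Hello " + u.getName() + ",please pay promptly at end of this month.");
        System.out.println(m.getContent());
    }
}
```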

Writer:

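import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class MessagesItemWriter implements ItemWriter<Message> {

    // Called once per chunk with all the items processed in that chunk
    public void write(List<? extends Message> messages) throws Exception {
        System.out.println("write results");
        for (Message m : messages) {
            System.out.println(m.getContent()); // output only
        }
    }
}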

Test code:

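import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.task.SyncTaskExecutor;

public class MessageJobTest { // class name is illustrative; the original does not give one

    public static void main(String[] args) {
        ClassPathXmlApplicationContext c = new ClassPathXmlApplicationContext("localfile_job.xml");
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository((JobRepository) c.getBean("jobRepository"));
        launcher.setTaskExecutor(new SyncTaskExecutor()); // run the job in the calling thread
        try {
            // run.month matches the job parameters reported in the output
            JobExecution je = launcher.run((Job) c.getBean("messageJob"),
                    new JobParametersBuilder()
                            .addString("run.month", "2011-10")
                            .toJobParameters());
            System.out.println(je);
            System.out.println(je.getJobInstance());
            System.out.println(je.getStepExecutions());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}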

Output:

10-20 15:28:32 INFO [job.SimpleStepHandler] - <Executing step: [messageStep]>
write results
Hello User1,please pay promptly at end of this month.
Hello User2,please pay promptly at end of this month.
Hello User3,please pay promptly at end of this month.
Hello User4,please pay promptly at end of this month.
Hello User5,please pay promptly at end of this month.
write results
Hello User6,please pay promptly at end of this month.
Hello User7,please pay promptly at end of this month.
Hello User8,please pay promptly at end of this month.
Hello User9,please pay promptly at end of this month.
Hello User10,please pay promptly at end of this month.
10-20 15:28:32 INFO [support.SimpleJobLauncher] - <Job: [FlowJob: [name=messageJob]] completed with the following parameters: [{run.month=2011-10}] and the following status: [COMPLETED]>
JobExecution: id=0, version=2, startTime=Sat Oct 20 15:28:32 CST 2012, endTime=Sat Oct 20 15:28:32 CST 2012, lastUpdated=Sat Oct 20 15:28:32 CST 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob]]
JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob]
[StepExecution: id=1, version=5, name=messageStep, status=COMPLETED, exitStatus=COMPLETED, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=3, rollbackCount=0, exitDescription=]

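The log makes it clear that each line is read and passed through the Processor, and that after every 5 items the results are written out in a single call. This chunk size is set by the commit-interval attribute of the <batch:chunk> element inside the tasklet.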
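The read/process/write cycle described above can be modeled in a few lines. This is a simplified, non-transactional sketch that roughly follows the chunk-oriented processing pseudocode in the Spring Batch reference documentation: read up to commit-interval items, process them (a null result filters an item out), then pass the whole chunk to the writer in one call. ChunkLoopSketch and its run method are hypothetical. Spring Batch additionally wraps each chunk in a transaction and records counts in the StepExecution; it also commits one more transaction in which the reader reports end of input, which is likely why the log shows commitCount=3 rather than 2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Simplified, non-transactional model of chunk-oriented processing.
// Hypothetical class; not Spring Batch's implementation.
public class ChunkLoopSketch {

    // Returns the number of writer calls. The reader must keep returning null once exhausted.
    public static <I, O> int run(Supplier<I> reader, Function<I, O> processor,
                                 Consumer<List<O>> writer, int commitInterval) {
        int writes = 0;
        while (true) {
            // 1. Read up to commitInterval items; null means the input is exhausted
            List<I> items = new ArrayList<>();
            I item;
            while (items.size() < commitInterval && (item = reader.get()) != null) {
                items.add(item);
            }
            if (items.isEmpty()) {
                return writes; // nothing left to read
            }
            // 2. Process each item; a null result filters the item out
            List<O> processed = new ArrayList<>();
            for (I in : items) {
                O out = processor.apply(in);
                if (out != null) {
                    processed.add(out);
                }
            }
            // 3. Hand the whole chunk to the writer in one call
            writer.accept(processed);
            writes++;
        }
    }

    public static void main(String[] args) {
        Iterator<String> names = Arrays.asList("User1", "User2", "User3", "User4", "User5",
                "User6", "User7", "User8", "User9", "User10").iterator();
        Supplier<String> reader = () -> names.hasNext() ? names.next() : null;
        Function<String, String> greet =
                name -> "Hello " + name + ",please pay promptly at end of this month.";
        Consumer<List<String>> print = chunk -> {
            System.out.println("write results");
            chunk.forEach(System.out::println);
        };
        int writes = run(reader, greet, print, 5);
        System.out.println("write calls: " + writes); // 10 items, 5 per chunk: 2 calls
    }
}
```

With the ten users from the example and a commit interval of 5, the sketch prints two "write results" groups, matching the shape of the log above. Note that filtered items still count toward the chunk size, because the chunk boundary is decided while reading.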

The full Spring configuration of the job:

<batch:job id="messageJob" restartable="true">
    <batch:step id="messageStep">
        <batch:tasklet>
            <!-- commit-interval: items per chunk; retry-limit applies to the exception classes listed below -->
            <batch:chunk reader="messageReader"
                         processor="messageProcessor"
                         writer="messageWriter"
                         commit-interval="5"
                         retry-limit="2">
                <batch:retryable-exception-classes>
                    <batch:include class="java.lang.RuntimeException" />
                </batch:retryable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
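The retry-limit and retryable-exception-classes settings mean that a RuntimeException does not immediately fail the step; the failed work is attempted again, up to the limit. The sketch below only illustrates the bounded-attempt idea with a hypothetical withRetry helper. It treats the limit as a maximum number of attempts including the first, which is an assumption to check against the reference for your Spring Batch version, and it does not reproduce Spring Batch's actual mechanism, which rolls back the chunk transaction and re-drives it.

```java
import java.util.concurrent.Callable;

// Illustration of bounded retry: an operation is attempted at most maxAttempts
// times, and only exceptions of the retryable type trigger another attempt.
// Hypothetical helper; not Spring Batch's transactional chunk retry.
public class RetrySketch {

    public static <T> T withRetry(Callable<T> op, int maxAttempts,
                                  Class<? extends Exception> retryable) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                // Give up on non-retryable exceptions or once the limit is reached
                if (!retryable.isInstance(e) || attempt >= maxAttempts) {
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = withRetry(() -> {
            calls[0]++;
            if (calls[0] == 1) {
                throw new RuntimeException("transient failure"); // first attempt fails
            }
            return "ok";
        }, 2, RuntimeException.class);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

One design consequence worth considering: the processor above signals a missing user name with a plain RuntimeException, which this configuration marks as retryable. Retrying cannot make a missing name appear, so a dedicated validation exception that is not listed as retryable (or a skip policy) may be a better fit.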