如何使用Spring开发和监控线程池服务

jopen 10年前

线程池对执行同步或异步的任务很重要。本文展示如何利用Spring开发并监控线程池服务。创建线程池的其他两种方法已讲解过。

使用技术

  • JDK 1.6.0_21
  • Spring 3.0.5
  • Maven 3.0.2

第1步:创建Maven工程

下面是一个maven工程。(可以使用Maven或IDE的插件创建)。

10e8a636f08788da7994c053f1f32d40.png

第2步:添加依赖库

将Spring的依赖添加到Maven的pom.xml文件中。

<!-- Spring 3 dependencies -->  <dependency>      <groupId>org.springframework</groupId>      <artifactId>spring-core</artifactId>      <version>${spring.version}</version>  </dependency>  <dependency>      <groupId>org.springframework</groupId>      <artifactId>spring-context</artifactId>      <version>${spring.version}</version>  </dependency>

使用下面的插件创建可执行jar包。

<plugin>      <groupId>org.apache.maven.plugins</groupId>      <artifactId>maven-shade-plugin</artifactId>      <version>1.3.1</version>      <executions>          <execution>              <phase>package</phase>              <goals>                  <goal>shade</goal>              </goals>              <configuration>                  <transformers>                      <transformer                          implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">                          <mainClass>com.otv.exe.Application</mainClass>                      </transformer>                      <transformer                          implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">                          <resource>META-INF/spring.handlers</resource>                      </transformer>                      <transformer                          implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">                          <resource>META-INF/spring.schemas</resource>                      </transformer>                  </transformers>              </configuration>          </execution>      </executions>  </plugin>

第3步:创建任务类

创建一个实现Runnable接口的新TestTask类。这个类表示要执行的任务。

package com.otv.task;    import org.apache.log4j.Logger;    /**   * @author onlinetechvision.com   * @since 17 Oct 2011   * @version 1.0.0   *   */  public class TestTask implements Runnable {        private static Logger log = Logger.getLogger(TestTask.class);      String taskName;        public TestTask() {      }        public TestTask(String taskName) {          this.taskName = taskName;      }        public void run() {          try {              log.debug(this.taskName + " : is started.");              Thread.sleep(10000);              log.debug(this.taskName + " : is completed.");          } catch (InterruptedException e) {              log.error(this.taskName + " : is not completed!");              e.printStackTrace();          }             }        @Override      public String toString() {          return (getTaskName());      }        public String getTaskName() {          return taskName;      }        public void setTaskName(String taskName) {          this.taskName = taskName;      }  }

第4步:创建TestRejectedExecutionHandler类

TestRejectedExecutionHandler类实现了RejectedExecutionHandler接口。如果没有空闲线程并且队列超出限制,任务会被拒绝。这个类处理被拒绝的任务。

package com.otv.handler;    import java.util.concurrent.RejectedExecutionHandler;  import java.util.concurrent.ThreadPoolExecutor;    import org.apache.log4j.Logger;    /**   * @author onlinetechvision.com   * @since 17 Oct 2011   * @version 1.0.0   *   */  public class TestRejectedExecutionHandler implements RejectedExecutionHandler {        private static Logger log = Logger.getLogger(TestRejectedExecutionHandler.class);        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {          log.debug(runnable.toString() + " : has been rejected");      }  }

第5步:创建ITestThreadPoolExecutorService接口

创建ITestThreadPoolExecutorService接口。(译者注:这个接口的主要功能是通过设置的参数创建一个线程池)

package com.otv.srv;    import java.util.concurrent.ThreadPoolExecutor;    import com.otv.handler.TestRejectedExecutionHandler;    /**   * @author onlinetechvision.com   * @since 17 Oct 2011   * @version 1.0.0   *    */  public interface ITestThreadPoolExecutorService {        public ThreadPoolExecutor createNewThreadPool();        public int getCorePoolSize();        public void setCorePoolSize(int corePoolSize);        public int getMaxPoolSize();        public void setMaxPoolSize(int maximumPoolSize);        public long getKeepAliveTime();        public void setKeepAliveTime(long keepAliveTime);        public int getQueueCapacity();        public void setQueueCapacity(int queueCapacity);        public TestRejectedExecutionHandler getTestRejectedExecutionHandler();        public void setTestRejectedExecutionHandler(TestRejectedExecutionHandler testRejectedExecutionHandler);    }

第6步:创建TestThreadPoolExecutorService类

TestThreadPoolExecutorService类实现了ITestThreadPoolExecutorService接口(上一步创建的接口)。这个类可以创建一个新的线程池。

package com.otv.srv;    import java.util.concurrent.ArrayBlockingQueue;  import java.util.concurrent.ThreadPoolExecutor;  import java.util.concurrent.TimeUnit;    import com.otv.handler.TestRejectedExecutionHandler;    /**   * @author onlinetechvision.com   * @since 17 Oct 2011   * @version 1.0.0   *   */  public class TestThreadPoolExecutorService implements ITestThreadPoolExecutorService {        private int  corePoolSize;      private int  maxPoolSize;       private long keepAliveTime;      private int  queueCapacity;      TestRejectedExecutionHandler testRejectedExecutionHandler;        public ThreadPoolExecutor createNewThreadPool() {          ThreadPoolExecutor executor = new ThreadPoolExecutor(getCorePoolSize(),                                                                   getMaxPoolSize(),                                                                   getKeepAliveTime(),                                                                   TimeUnit.SECONDS,                                                                   new ArrayBlockingQueue(getQueueCapacity()),                                                                   getTestRejectedExecutionHandler());          return executor;      }        public int getCorePoolSize() {          return corePoolSize;      }        public void setCorePoolSize(int corePoolSize) {          this.corePoolSize = corePoolSize;      }        public int getMaxPoolSize() {          return maxPoolSize;      }        public void setMaxPoolSize(int maxPoolSize) {          this.maxPoolSize = maxPoolSize;      }        public long getKeepAliveTime() {          return keepAliveTime;      }        public void setKeepAliveTime(long keepAliveTime) {          this.keepAliveTime = keepAliveTime;      }        public int getQueueCapacity() {          return queueCapacity;      }        public void setQueueCapacity(int queueCapacity) {          this.queueCapacity = queueCapacity;      }        public TestRejectedExecutionHandler getTestRejectedExecutionHandler() {          return testRejectedExecutionHandler;      }        public void setTestRejectedExecutionHandler(TestRejectedExecutionHandler testRejectedExecutionHandler) {          this.testRejectedExecutionHandler = testRejectedExecutionHandler;      }  }

第7步: 创建IThreadPoolMonitorService接口

创建IThreadPoolMonitorService接口

package com.otv.monitor.srv;    import java.util.concurrent.ThreadPoolExecutor;    public interface IThreadPoolMonitorService extends Runnable {        public void monitorThreadPool();        public ThreadPoolExecutor getExecutor();        public void setExecutor(ThreadPoolExecutor executor);  }

第8步:创建ThreadPoolMonitorService类

ThreadPoolMonitorService类实现了IThreadPoolMonitorService接口。这个类用来监控已创建的线程池。

package com.otv.monitor.srv;    import java.util.concurrent.ThreadPoolExecutor;  import org.apache.log4j.Logger;    /**   * @author onlinetechvision.com   * @since 17 Oct 2011   * @version 1.0.0   *   */  public class ThreadPoolMonitorService implements IThreadPoolMonitorService {        private static Logger log = Logger.getLogger(ThreadPoolMonitorService.class);      ThreadPoolExecutor executor;      private long monitoringPeriod;         public void run() {          try {              while (true){                  monitorThreadPool();                  Thread.sleep(monitoringPeriod*1000);              }           } catch (Exception e) {              log.error(e.getMessage());          }      }        public void monitorThreadPool() {          StringBuffer strBuff = new StringBuffer();          strBuff.append("CurrentPoolSize : ").append(executor.getPoolSize());          strBuff.append(" - CorePoolSize : ").append(executor.getCorePoolSize());          strBuff.append(" - MaximumPoolSize : ").append(executor.getMaximumPoolSize());          strBuff.append(" - ActiveTaskCount : ").append(executor.getActiveCount());          strBuff.append(" - CompletedTaskCount : ").append(executor.getCompletedTaskCount());          strBuff.append(" - TotalTaskCount : ").append(executor.getTaskCount());          strBuff.append(" - isTerminated : ").append(executor.isTerminated());            log.debug(strBuff.toString());      }        public ThreadPoolExecutor getExecutor() {          return executor;      }        public void setExecutor(ThreadPoolExecutor executor) {          this.executor = executor;      }           public long getMonitoringPeriod() {          return monitoringPeriod;      }        public void setMonitoringPeriod(long monitoringPeriod) {          this.monitoringPeriod = monitoringPeriod;      }  }

第9步:创建Starter类

(译者注:这个类内部维护了一个线程池服务(testThreadPoolExecutorService)和一个监控服务(threadPoolMonitorService),然后创建线程池、启动一个单独的线程执行监控服务、通过线程池执行任务)

package com.otv.start;    import java.util.concurrent.ThreadPoolExecutor;    import org.apache.log4j.Logger;    import com.otv.handler.TestRejectedExecutionHandler;  import com.otv.monitor.srv.IThreadPoolMonitorService;  import com.otv.monitor.srv.ThreadPoolMonitorService;  import com.otv.srv.ITestThreadPoolExecutorService;  import com.otv.srv.TestThreadPoolExecutorService;  import com.otv.task.TestTask;    /**   * @author onlinetechvision.com   * @since 17 Oct 2011   * @version 1.0.0   *   */  public class Starter {        private static Logger log = Logger.getLogger(TestRejectedExecutionHandler.class);        IThreadPoolMonitorService threadPoolMonitorService;      ITestThreadPoolExecutorService testThreadPoolExecutorService;        public void start() {            // A new thread pool is created...          ThreadPoolExecutor executor = testThreadPoolExecutorService.createNewThreadPool();          executor.allowCoreThreadTimeOut(true);            // Created executor is set to ThreadPoolMonitorService...          threadPoolMonitorService.setExecutor(executor);            // ThreadPoolMonitorService is started...          Thread monitor = new Thread(threadPoolMonitorService);          monitor.start();            // New tasks are executed...          for(int i=1;i&lt;10;i++) {              executor.execute(new TestTask(&quot;Task&quot;+i));          }            try {              Thread.sleep(40000);          } catch (Exception e)   {              log.error(e.getMessage());          }            for(int i=10;i&lt;19;i++) {              executor.execute(new TestTask(&quot;Task&quot;+i));          }            // executor is shutdown...          executor.shutdown();      }           public IThreadPoolMonitorService getThreadPoolMonitorService() {          return threadPoolMonitorService;      }        public void setThreadPoolMonitorService(IThreadPoolMonitorService threadPoolMonitorService) {          this.threadPoolMonitorService = threadPoolMonitorService;      }        public ITestThreadPoolExecutorService getTestThreadPoolExecutorService() {          return testThreadPoolExecutorService;      }        public void setTestThreadPoolExecutorService(ITestThreadPoolExecutorService testThreadPoolExecutorService) {          this.testThreadPoolExecutorService = testThreadPoolExecutorService;      }  }

第10步:创建Application类

创建Application类。这个类运行应用程序。

package com.otv.start;    import org.springframework.context.ApplicationContext;  import org.springframework.context.support.ClassPathXmlApplicationContext;    /**   * @author onlinetechvision.com   * @since 17 Oct 2011   * @version 1.0.0   *   */  public class Application {        public static void main(String[] args) {         ApplicationContext context = new ClassPathXmlApplicationContext(&quot;applicationContext.xml&quot;);         Starter starter = (Starter) context.getBean(&quot;Starter&quot;);         starter.start();      }  }

第11步:创建applicationContext.xml文件

(译者注:在Spring中注册了上面所创建的类,并提前设置了部分相应的参数,比如将监控服务的监控周期设为5)

<beans xmlns="http://www.springframework.org/schema/beans"      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"      xsi:schemaLocation="http://www.springframework.org/schema/beans    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">        <!-- Beans Declaration -->      <bean id="TestTask" class="com.otv.task.TestTask"></bean>      <bean id="ThreadPoolMonitorService" class="com.otv.monitor.srv.ThreadPoolMonitorService">          <property name="monitoringPeriod"  value="5" />      </bean>      <bean id="TestRejectedExecutionHandler" class="com.otv.handler.TestRejectedExecutionHandler"></bean>      <bean id="TestThreadPoolExecutorService" class="com.otv.srv.TestThreadPoolExecutorService">          <property name="corePoolSize"  value="1" />          <property name="maxPoolSize"   value="3" />          <property name="keepAliveTime" value="10" />          <property name="queueCapacity" value="3" />          <property name="testRejectedExecutionHandler" ref="TestRejectedExecutionHandler" />      </bean>      <bean id="Starter" class="com.otv.start.Starter">          <property name="threadPoolMonitorService" ref="ThreadPoolMonitorService" />          <property name="testThreadPoolExecutorService" ref="TestThreadPoolExecutorService" />      </bean>  </beans>

第12步:创建线程池的另一方法

Spring提供的ThreadPoolTaskExecutor类也可以创建线程池。

(译者注:上面通过我们自己创建的TestThreadPoolExecutorService类来设置线程池的各项参数并创建线程池,但实际上Spring也提供了功能类似的类,就是ThreadPoolTaskExecutor。所以也可以使用这种方式创建线程池)

<bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">    <property name="corePoolSize"  value="1" />    <property name="maxPoolSize"   value="3" />    <property name="queueCapacity" value="3" />  </bean>    <bean id="testTaskExecutor" class="TestTaskExecutor">    <constructor-arg ref="threadPoolTaskExecutor" />  </bean>

第13步:构建项目

OTV_Spring_ThreadPool工程被build后,就会产生一个OTV_Spring_ThreadPool-0.0.1-SNAPSHOT.jar包。

第14步:运行工程

OTV_Spring_ThreadPool-0.0.1-SNAPSHOT.jar运行后,输出日志如下:

18.10.2011 20:08:48 DEBUG (TestRejectedExecutionHandler.java:19) - Task7 : has been rejected  18.10.2011 20:08:48 DEBUG (TestRejectedExecutionHandler.java:19) - Task8 : has been rejected  18.10.2011 20:08:48 DEBUG (TestRejectedExecutionHandler.java:19) - Task9 : has been rejected  18.10.2011 20:08:48 DEBUG (TestTask.java:25) - Task1 : is started.  18.10.2011 20:08:48 DEBUG (TestTask.java:25) - Task6 : is started.  18.10.2011 20:08:48 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 2 - CompletedTaskCount : 0 - TotalTaskCount : 5 - isTerminated : false  18.10.2011 20:08:48 DEBUG (TestTask.java:25) - Task5 : is started.  18.10.2011 20:08:53 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 0 - TotalTaskCount : 6 - isTerminated : false  18.10.2011 20:08:58 DEBUG (TestTask.java:27) - Task6 : is completed.  18.10.2011 20:08:58 DEBUG (TestTask.java:27) - Task1 : is completed.  18.10.2011 20:08:58 DEBUG (TestTask.java:25) - Task3 : is started.  18.10.2011 20:08:58 DEBUG (TestTask.java:25) - Task2 : is started.  18.10.2011 20:08:58 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 2 - TotalTaskCount : 6 - isTerminated : false  18.10.2011 20:08:58 DEBUG (TestTask.java:27) - Task5 : is completed.  18.10.2011 20:08:58 DEBUG (TestTask.java:25) - Task4 : is started.  18.10.2011 20:09:03 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 3 - TotalTaskCount : 6 - isTerminated : false  18.10.2011 20:09:08 DEBUG (TestTask.java:27) - Task2 : is completed.  18.10.2011 20:09:08 DEBUG (TestTask.java:27) - Task3 : is completed.  18.10.2011 20:09:08 DEBUG (TestTask.java:27) - Task4 : is completed.  18.10.2011 20:09:08 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 6 - TotalTaskCount : 6 - isTerminated : false  18.10.2011 20:09:13 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 6 - TotalTaskCount : 6 - isTerminated : false  18.10.2011 20:09:18 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 0 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 6 - TotalTaskCount : 6 - isTerminated : false  18.10.2011 20:09:23 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 0 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 6 - TotalTaskCount : 6 - isTerminated : false  18.10.2011 20:09:28 DEBUG (TestTask.java:25) - Task10 : is started.  18.10.2011 20:09:28 DEBUG (TestRejectedExecutionHandler.java:19) - Task16 : has been rejected  18.10.2011 20:09:28 DEBUG (TestRejectedExecutionHandler.java:19) - Task17 : has been rejected  18.10.2011 20:09:28 DEBUG (TestRejectedExecutionHandler.java:19) - Task18 : has been rejected  18.10.2011 20:09:28 DEBUG (TestTask.java:25) - Task14 : is started.  18.10.2011 20:09:28 DEBUG (TestTask.java:25) - Task15 : is started.  18.10.2011 20:09:28 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 6 - TotalTaskCount : 12 - isTerminated : false  18.10.2011 20:09:33 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 6 - TotalTaskCount : 12 - isTerminated : false  18.10.2011 20:09:38 DEBUG (TestTask.java:27) - Task10 : is completed.  18.10.2011 20:09:38 DEBUG (TestTask.java:25) - Task11 : is started.  18.10.2011 20:09:38 DEBUG (TestTask.java:27) - Task14 : is completed.  18.10.2011 20:09:38 DEBUG (TestTask.java:27) - Task15 : is completed.  18.10.2011 20:09:38 DEBUG (TestTask.java:25) - Task12 : is started.  18.10.2011 20:09:38 DEBUG (TestTask.java:25) - Task13 : is started.  18.10.2011 20:09:38 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 9 - TotalTaskCount : 12 - isTerminated : false  18.10.2011 20:09:43 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 9 - TotalTaskCount : 12 - isTerminated : false  18.10.2011 20:09:48 DEBUG (TestTask.java:27) - Task11 : is completed.  18.10.2011 20:09:48 DEBUG (TestTask.java:27) - Task13 : is completed.  18.10.2011 20:09:48 DEBUG (TestTask.java:27) - Task12 : is completed.  18.10.2011 20:09:48 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 0 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 12 - TotalTaskCount : 12 - isTerminated : true  18.10.2011 20:09:53 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 0 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 12 - TotalTaskCount : 12 - isTerminated : true

第15步:下载

来自:http://www.importnew.com/14163.html