Java中怎么控制线程访问资源的数量

jopen 10年前

在API中是这样来描述Semaphore 的

Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。

一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。

 

例如,下面的类使用信号量控制线程并发的数量
    import java.util.concurrent.ExecutorService;        import java.util.concurrent.Executors;        import java.util.concurrent.Semaphore;                        public class TestSemaphore {                    /**            * @param args            */            public static void main(String[] args) {                 ExecutorService pool =  Executors.newCachedThreadPool();                 final Semaphore sp = new Semaphore(3,true);                 for(int i=0;i<10;i++){                     Runnable runnable = new Runnable() {                                                 @Override                        public void run() {                                                         try {                                sp.acquire();                            } catch (InterruptedException e) {                                e.printStackTrace();                           }                               System.out.println(sp.availablePermits());                           System.out.println("线程  "+ Thread.currentThread().getName() +"进入,已有"+ (3-sp.availablePermits())+ "并发") ;                           try {                            Thread.sleep((long) (Math.random()*3000));                        } catch (InterruptedException e) {                            e.printStackTrace();                        }                           System.out.println("线程  "+Thread.currentThread().getName() +"即将离开 " );                           sp.release();                           System.out.println("线程  "+Thread.currentThread().getName() +"离开 ,已有"+ (3-sp.availablePermits()) + "并发");                        }                    };                    pool.execute(runnable);                 }            }                }  
再例如可以通过信号量来控制线程访问资源:
    import java.util.concurrent.Semaphore;                        public class DownloadThread {            private static int in_index = 0;            private static int out_index = 0;            private static int buffer_count = 100;            public static boolean g_downloadComplete;            private static Semaphore g_seFull = new Semaphore(0);            private static Semaphore g_seEmpty = new Semaphore(buffer_count);             public static boolean getBlockFromNet(int in_index) {                  int i = 0;                  while (i < 10000)                   i++;                  if (in_index < buffer_count - 1)                   return false;                  else                   return true;                 }                 public static void writeBlockToDisk(int out_index) {                  int i = 0;                  while (i < 100000)                   i++;                 }                    /**            * @param args            */            public static void main(String[] args) {                 g_downloadComplete = false;                  Thread threadA = new Thread() {                   public void run() {                    proA();                   }                  };                  Thread threadB = new Thread() {                   public void run() {                    proB();                   }                  };                  threadB.start();                  threadA.start();                 }                                                    public static void proA(){                 while (g_seFull.availablePermits() < buffer_count) {                       try {                        g_seEmpty.acquire();                       } catch (InterruptedException e1) {                        // TODO Auto-generated catch block                        e1.printStackTrace();                       }                       g_downloadComplete = getBlockFromNet(in_index);                       in_index = (in_index + 1) % buffer_count;                       g_seFull.release();                       System.out.println("download a block " + in_index);                       if (g_downloadComplete)                        break;                      }            }                        public static void proB(){                 while (g_seEmpty.availablePermits() > 0) {                       try {                        g_seFull.acquire();                       } catch (InterruptedException e1) {                        // TODO Auto-generated catch block                        e1.printStackTrace();                       }                       writeBlockToDisk(out_index);                       out_index = (out_index + 1) % buffer_count;                       g_seEmpty.release();                       System.out.println("write a block " + out_index);                       if (g_downloadComplete && out_index == in_index)                        break;                      }            }                }