Perl 多线程理解

jopen 10年前

Thread:在使用多线程处理比较大的数据量的扫描,遇到读写文件可能死锁的问题。

Perl 线程的生命周期

1.使用 threads 包的 create() 方法:

use threads;     sub say_hello {       printf("Hello thread! @_.\n");       return( rand(10) );   }     my $t1 = threads->create( \&say_hello, "param1", "param2" );   my $t2 = threads->create( "say_hello", "param3", "param4" );   my $t3 = threads->create(   sub {       printf("Hello thread! @_\n");       return( rand(10) );   }, "param5", "param6" );



2.join 和 detach 方法

线程一旦被成功创建,它就立刻开始运行了,这个时候你面临两种选择,分别是 join 或者 detach 这个新建线程。当然你也可以什么都不做,不过这可不是一个好习惯、

从字面上来理解,join 就是把新创建的线程结合到当前的主线程中来,把它当成是主线程的一部分,使他们合二为一。join 会触发两个动作,首先,主线程会索取新建线程执行结束以后的返回值;其次,新建线程在执行完毕并返回结果以后会自动释放它自己所占用的系统资源。例如

join收割新建线程

#!/usr/bin/perl   #   use threads;     sub func {    sleep(1);    return(rand(10));   }     my $t1 = threads->create( \&func );   my $t2 = threads->create( \&func );     printf("do something in the main thread\n");     my $t1_res = $t1->join();   my $t2_res = $t2->join();     printf("t1_res = $t1_res\nt2_res = $t2_res\n");

由此我们不难发现,调用 join 的时机是一个十分有趣的问题。如果调用 join 方法太早,新建线程尚未执行完毕,自然就无法返回任何结果,那么这个时候,主线程就不得不被阻塞,直到新建线程执行完毕之后,才能获得返回值,然后资源会被释放,join 才能结束,这在很大程度上破话了线程之间的并行性。相反,如果调用 join 方法太晚,新建线程早已执行完毕,由于一直没有机会返回结果,它所占用的资源就一直无法得到释放,直到被 join 为止,这在很大程度上浪费了宝贵的系统资源。因此,join 新建线程的最好时机应该是在它刚刚执行完毕的时候,这样既不会阻塞当前线程的执行,又可以及时释放新建线程所占用的系统资源。 

foreach ( threads->list(threads::joinable) ){          $_->join( );  }

我们再来看看 detach 方法,这也许是最省心省力的处理方法了。从字面上来理解,detach 就是把新创建的线程与当前的主线程剥离开来,让它从此和主线程无关。当你使用 detach 方法的时候,表明主线程并不关心新建线程执行以后返回的结果,新建线程执行完毕后 Perl 会自动释放它所占用的资源。

detach剥离线程

#!/usr/bin/perl   #   use threads;   use Config;     sub say_hello {    my ( $name ) = @_;    printf("Hello World! I am $name.\n");   }   my $t1 = threads->create( \&say_hello, "Alex" );   $t1->detach();   printf("doing something in main thread\n");   sleep(1);



一个新建线程一旦被 detach 以后,就无法再 join 了。当你使用 detach 方法剥离线程的时候,有一点需要特别注意,那就是你需要保证被创建的线程先于主线程结束,否则你创建的线程会被迫结束,除非这种结果正是你想要的,否则这也许会造成异常情况的出现,并增加程序调试的难度。

3.线程的消亡

大多数情况下,你希望你创建的线程正常退出,这就意味着线程所对应的函数体在执行完毕后返回并释放资源。例如在清单 5 的示例中,新建线程被 join 以后的退出过程。可是,如果由于 detach 不当或者由于主线因某些意外的异常提前结束了,尽管它所创建的线程可能尚未执行完毕,但是他们还是会被强制中止,正所谓皮之不存,毛将焉附。这时你也许会得到一个类似于“Perl exited with active threads”的警告。

当然,你也可以显示地调用 exit() 方法来结束一个线程,不过值得注意的是,默认情况下,如果你在一个线程中调用了 exit() 方法, 其他线程都会随之一起结束,在很多情况下,这也许不是你想要的,如果你希望 exit() 方法只在调用它的线程内生效,那么你在创建该线程的时候就需要设置’ exit ’ => ’ thread_only ’。例如

为某个线程设置exit属性

#!/usr/bin/perl   #   use threads;     sub say_hello {    printf("Hello thread! @_.\n");    sleep(10);    printf("Bye\n");   }     sub quick_exit {    printf("I will be exit in no time\n");    exit(1);   }     my $t1 = threads->create( \&say_hello, "param1", "param2" );   my $t2 = threads->create( {'exit'=>'thread_only'}, \&quick_exit );     $t1->join();   $t2->join();



如果你希望每个线程的 exit 方法都只对自己有效,那么在每次创建一个新线程的时候都去要显式设置’ exit ’ => ’ thread_only ’属性显然有些麻烦,你也可以在引入 threads 包的时候设置这个属性在全局范围内有效,例如

use threads ('exit' => 'threads_only');     sub func {    ...    if( $condition ) {    exit(1);    }   }     my $t1 = threads->create( \&func );   my $t2 = threads->create( \&func );     $t1->join();   $t2->join();




共享与同步

threads::shared

#!/usr/bin/perl    #      use threads;    use threads::shared;    use strict;      my $var   :shared  = 0;       # use :share tag to define    my @array :shared = (); # use :share tag to define    my %hash = ();    share(%hash);                  # use share() funtion to define        sub start {    $var = 100;      @array[0] = 200;    @array[1] = 201;      $hash{'1'} = 301;    $hash{'2'} = 302;    }      sub verify {       sleep(1);                      # make sure thread t1 execute firstly       printf("var = $var\n");     # var=100      for(my $i = 0; $i < scalar(@array); $i++) {           printf("array[$i] = $array[$i]\n");    # array[0]=200; array[1]=201    }      foreach my $key ( sort( keys(%hash) ) ) {    printf("hash{$key} = $hash{$key}\n"); # hash{1}=301; hash{2}=302    }    }      my $t1 = threads->create( \&start );    my $t2 = threads->create( \&verify );      $t1->join();    $t2->join();

死锁常是多线程程序中最隐蔽的问题,往往难以发现与调试,也增加了排查问题的难度。为了避免在程序中死锁的问题,在程序中我们应该尽量避免同时获取多个共享变量的锁,如果无法避免,那么一是要尽量使用相同的顺序来获取多个共享变量的锁,另外也要尽可能地细化上锁的粒度,减少上锁的时间。

use threads::shared;      # in thread 1    {       lock( $share );        # lock for 3 seconds       sleep(3);               # other threads can not lock again    }    # unlock implicitly now after the block      # in thread 2    {       lock($share);          # will be blocked, as already locked by thread 1       $share++;               # after thread 1 quit from the block    }    # unlock implicitly now after the block



use threads;    use threads::shared;      {       lock(@share);          # the array has been locked       lock(%hash);           # the hash has been locked       sleep(3);               # other threads can not lock again    }      {       lock($share[1]);     # error will occur       lock($hash{key});    # error will occur    }



信号量:Thread::Semaphore

my $semaphore = Thread::Semaphore->new( $max_threads ); #信号量
my $mutex = Thread::Semaphore->new( 1 );   #互斥量

区别:

1. 互斥量用于线程的互斥,信号量用于线程的同步。

这是互斥量和信号量的根本区别,也就是互斥和同步之间的区别。

互斥:是指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。

同步:是指在互斥的基础上(大多数情况),通过其它机制实现访问者对资源的有序访问。在大多数情况下,同步已经实现了互斥,特别是所有写入资源的情况必定是互斥的。少数情况是指可以允许多个访问者同时访问资源

以上区别是主要想记住的。

note:信号量可以用来实现互斥量的功能

2. 互斥量值只能为0/1,信号量值可以为非负整数。

也就是说,一个互斥量只能用于一个资源的互斥访问,它不能实现多个资源的多线程互斥问题。信号量可以实现多个同类资源的多线程互斥和同步。当信号量为单值信号量是,也可以完成一个资源的互斥访问。

3. 互斥量的加锁和解锁必须由同一线程分别对应使用,信号量可以由一个线程释放,另一个线程得到。

use threads;   use threads::shared;   use Thread::Semaphore;     my $s = Thread::Semaphore->new();   $s->down();                # P operation   ...   $s->up();                  # V operation



Thread::Queue:生产者-消费者模型对多线程队列的使用。

生产者可以不断地在线程队列上做 enqueue 操作,而消费者只需要不断地在线程队列上做 dequeue 操作,这就很简单地实现了生产者和消费者之间同步的问题。

#!/usr/bin/perl    #      use threads;    use Thread::Queue;      my $q = Thread::Queue->new();      sub produce {       my $name = shift;       while(1) {           my $r = int(rand(100));           $q->enqueue($r);           printf("$name produce $r\n");           sleep(int(rand(3)));       }    }      sub consume {       my $name = shift;       while(my $r = $q->dequeue()) {           printf("consume $r\n");       }    }      my $producer1 = threads->create(\&produce, "producer1");    my $producer2 = threads->create(\&produce, "producer2");    my $consumer1 = threads->create(\&consume, "consumer2");      $producer1->join();    $producer2->join();    $consumer1->join();