2012-02-13 7 views
7

Threading aşağıdaki gerçekleştirmek çalışıyorum: Perl Kuyruklar ve

  1. yaklaşık 10 GB demek ve kuyruğuna içine itmek çok büyük bir dosyadan veri okuyan bir iplik var. buildQueue iplik yaklaşık 5 işçi iş parçacığı de-kuyruğu ve süreç verileri aynı anda sıraya veri bastırıyor iken (I için kuyrukta istemiyorsanız ya çok büyük olsun)

  2. .

Ben girişimi yaptık ama benim diğer konu çünkü benim buildQueue iplik sürekli döngünün erişilemez.

Yaklaşımım tamamen yanlış olabilir. Herhangi bir yardım için teşekkürler, çok takdir ediyor.

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open DICT_FILE, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<DICT_FILE>) { 
      if ($queue->pending() < 100) { 
       my $query = <DICT_FILE>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

Ve beklediğimden olarak bu iplik bir şey işletilirse bu parçacığı bitirmek olmaz çünkü başka sonra infaz edilecek:

İşte buildQueue için kod.

Oluşturucu iş parçacığı uzun bir süredir çalışacağından, hiçbir zaman çalışan iş parçacığı oluşturmayacağım. Size ipler (ya joinor detach yoluyla) çıkmak istediğinizde işaretlemek gerekir

#!/usr/bin/perl -w 
use strict; 
use Thread; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
my @threads; 

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open dict_file, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<dict_file>) { 
      if ($queue->pending() < 100) { 
       my $query = <dict_file>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

sub processor { 
    my $query; 
    while (1) { 
     if ($query = $queue->dequeue) { 
      print "$query\n"; 
     } 
    } 
} 

my $builder = new Thread(&buildQueue); 
push @threads, new Thread(&processor) for 1..5; 
+0

soruların bir çift: Eğer kuyruk-oluşturucu iplik bitirmek olmaz söz, ama hiç bir şey yapar? Sıra boyutu 100'ün altına düşüyor mu veya 0'ın üstüne mi çıkıyor? Ayrıca, [Konularınızı doğru bir şekilde oluşturduğunuzdan emin değilim] (http://perldoc.perl.org/perlthrtut.html). Olmaması gereken $ builder = thread-> create (\ & buildQueue); '? –

+0

Kuyruk oluşturucu iyi oluşturulur, ancak çalışan iş parçacıklarının oluşturulmasına erişilemediğinden, sıradan hiçbir şey kaldırılamaz, böylece sıra sürekli bir döngüden dolayı sıraya girerken sıra 100'de kalır. – Sinista

+0

Hmmm, Özellikle konuları oluşturduğunuzda, içerik oluşturmak için daha fazla kod görmem gerek. İşçi iş parçacığı oluşturmadan önce, sıra oluşturucu veya kuyruk oluşturucuyu ayırmıyorsunuz değil mi? –

cevap

10

:

İşte tüm kod var. last no'lu sonsuz döngülere sahip olmanız, bunlardan kurtulmak için de bir sorundur.

Düzeltme: Ayrıca çok önemli bir parça unutmuşum! Each worker thread will block, waiting for another item to process off of the queue until they get an undef in the queue. Bu nedenle, kuyruk oluşturucu bittikten sonra neden her bir iş parçacığı için bir kez undef'u bir kez özel olarak eklemekteyiz.

Dene: Bu durumda Parallel::ForkManager modülü ile yapabileceği gibi

#!/usr/bin/perl -w 
use strict; 
use threads; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
our @threads; #Do you really need our instead of my? 

sub buildQueue 
{ 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 

    #Three-argument open, please! 
    open my $dict_file, "<",$dict_path or die("Sorry, could not open file!"); 
    while(my $query=<$dict_file>) 
    { 
     chomp($query); 
     while(1) 
     { #Wait to see if our queue has < 100 items... 
      if ($queue->pending() < 100) 
      { 
       $queue->enqueue($query); 
       print "Queue Size: " . $queue->pending . "\n"; 
       last; #This breaks out of the infinite loop 
      } 
     } 
    } 
    close($dict_file); 
    foreach(1..5) 
    { 
     $queue->enqueue(undef); 
    } 
} 

sub processor 
{ 
    my $query; 
    while ($query = $queue->dequeue) 
    { 
     print "Thread " . threads->tid . " got $query\n"; 
    } 
} 

my $builder=threads->create(\&buildQueue); 
push @threads,threads->create(\&process) for 1..5; 

#Waiting for our threads to finish. 
$builder->join; 
foreach(@threads) 
{ 
    $_->join; 
} 
+1

Sorun, iş parçacığı modülüne değiştirilen kullanım dışı iş parçacığı modülüydü ve kodum şimdi olması gerektiği gibi çalışıyor. Teşekkürler Jack Doğru yöne doğru beni işaret ettiğin için. – Sinista

1

geliyor.

+0

Mümkünse, ForkManager çözümünü görmek isterim. – Sinista

0

farklı bir yaklaşım: Ayrıca MCE 1.2+ içinde user_tasks kullanmak ve iki çoklu işçi oluşturabilirtasks, (o Büyük bir dosya, çünkü, dosyayı korurken paralel okuma yarar aramaya okuyabilir) okunması için bir görevdir ve işlemek için bir görev, vb.

Aşağıdaki kod, tampon sıranızı yönetmek için hala Thread::Queue kullanır.

buildQueue alt, sıra boyutu denetiminize sahiptir ve iş parçacığı kullandığımızdan bu verileri doğrudan $ R_QUEUE yönetici işlemine iter, böylece ebeveynin bellek alanına erişimi vardır. Çatallar yerine kullanmak isterseniz, bir geri arama işlevi aracılığıyla kuyruğa yine de erişebilirsiniz. Ama burada sadece kuyruğa basmayı seçtim.hiçbir şey daha fazla bekleyen kurulana kadar kuyrukta ne olursa olsun

processQueue alt basitçe olacak

de-kuyruk. Her görevde her görevde yalnızca bir kez yönetici işlemi tarafından yürütülür, bu yüzden onu çalışan işlemlerimize bir durma sinyali vermek için kullanırız.

Açıkçası, hatta nasıl verilerinizi slurp yığın boyutu veya üzerine karar verebilir böylece, işçilere yığın veri istiyorum nasıl bir özgürlük çok şey var.

#!/usr/bin/env perl 
use strict; 
use warnings; 
use threads; 
use threads::shared; 
use Thread::Queue; 
use MCE; 

my $R_QUEUE = Thread::Queue->new; 
my $queue_workers = 8; 
my $process_workers = 8; 
my $chunk_size = 1; 

print "Enter a file name: "; 
my $input_file = <STDIN>; 
chomp($input_file); 

sub buildQueue { 
    my ($self, $chunk_ref, $chunk_id) = @_; 
    if ($R_QUEUE->pending() < 100) { 
     $R_QUEUE->enqueue($chunk_ref); 
     $self->sendto('stdout', "Queue Size: " . $R_QUEUE->pending ."\n"); 
    } 
} 

sub processQueue { 
    my $self = shift; 
    my $wid = $self->wid; 
    while (my $buff = $R_QUEUE->dequeue) { 
     $self->sendto('stdout', "Thread " . $wid . " got $$buff"); 
    } 
} 

my $mce = MCE->new(
    input_data => $input_file, # this could be a filepath or a file handle or even a scalar to treat like a file, check the documentation for more details. 
    chunk_size => $chunk_size, 
    use_slurpio => 1, 

    user_tasks => [ 
     { # queueing task 
      max_workers => $queue_workers, 
      user_func => \&buildQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory. 
      task_end => sub { $R_QUEUE->enqueue((undef) x $process_workers) } # signal stop to our process workers when they hit the end of the queue. Thanks > Jack Maney! 
     }, 
     { # process task 
      max_workers => $process_workers, 
      user_func => \&processQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory 
      task_end => sub { print "Finished processing!\n"; } 
     } 
    ] 
); 

$mce->run(); 

exit; 
3

Perl için MCE modülü büyük dosyaları sever. MCE ile, bir kerede birçok satırı parçalayabilir, büyük bir parçayı skaler bir dize olarak dağıtabilir veya bir kerede 1 satır okuyabilirsiniz. Birden fazla satırı bir kerede ayırmak IPC için ek yükü azaltır.

MCE 1,504 anda dışarıda. Konu dahil çocuk süreçleri için destek ile MCE :: Queue sağlar. Buna ek olarak, 1.5 sürümünde otomatik yanı sıra MCE örneğini başlatmasını özen 5 model (MCE :: Akış, MCE :: Grep, MCE :: Döngü, MCE :: Harita ve MCE :: Akış) ile geliyor ayar max_workers ve chunk_size. Biri bu seçenekleri btw geçersiz kılabilir. Aşağıda

, MCE :: Döngü gösteri için kullanılır.
use MCE::Loop; 

print "Enter a file name: "; 
my $dict_path = <STDIN>; 
chomp($dict_path); 

mce_loop_f { 
    my ($mce, $chunk_ref, $chunk_id) = @_; 

    foreach my $line (@$chunk_ref) { 
     chomp $line; 
     ## add your code here to process $line 
    } 

} $dict_path; 

Eğer işçiler ve/veya parça_boyutu sayısını belirtmek istiyorum

, o zaman bunu yapmak için 2 yol vardır.

use MCE::Loop max_workers => 5, chunk_size => 300000; 

Ya da ...

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 300000 
}; 

chunking büyük dosyalar için tercih edilmesine rağmen, tek bir seferde bir satır chunking vakit karşılaştırabilirsiniz. Biri, bloğun içindeki ilk çizgiyi çıkarabilir (yorumda bulunur). Bir iç döngü için gerek yok dikkat edin. $ chunk_ref hala 1 satır içeren bir dizi ref'dir. Scalar $ _ girdisi chunk_size 1 olduğunda eşittir, aksi halde $ chunk_ref değerini gösterir.

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 1 
}; 

print "Enter a file name: "; 
my $dict_path = <STDIN>; 
chomp($dict_path); 

mce_loop_f { 
# my ($mce, $chunk_ref, $chunk_id) = @_; 

    my $line = $_; 
    ## add your code here to process $line or $_ 

} $dict_path; 

Bu sunumun, paralel olarak bir dosyayı işlemek isteyen kişiler için yararlı olduğunu umuyorum.

:) mario