2014-09-03 30 views
5

Hatlarını gruplayarak işlemek istediğim bazı büyük-ish metin dosyalarım var. Java 8'de tembel bir grup akışı yapmak, bir akış döndürmek mümkün mü?

Ben

return FileUtils.readLines(...) 
      .parallelStream() 
      .map(...) 
      .collect(groupingBy(pair -> pair[0])); 

sorun AFAIK, bu bir harita oluşturur, yani gibi yeni akış özelliklerini kullanmaya çalıştı.

o yukarıdaki gibi üst düzey koduna sahip bir yolu var mı üretir, örneğin, Girişleri bir Akış?

UPDATE: Ne arıyorum python'un itertools.groupby gibi bir şeydir. Dosyalarım zaten sıralanmış (çift [0] ile), sadece grupları tek tek yüklemek istiyorum.

Zaten iteratif çözümü var. Sadece bunu yapmak için daha açıklayıcı bir yol olup olmadığını merak ediyorum. Btw, guava veya başka bir üçüncü taraf kitaplığı kullanmak büyük bir sorun olmaz.

+2

Nasıl tembel olursun gruplama ölçütü? Akışta yer alan nesnenin bazı özelliklerini gruplamak için, Akıştaki tüm öğeler üzerinde yineleme yapmanız gerekir. – Eran

+0

"Hatlarını gruplamak" ile ne demek istiyorsun? Stream 'groupBy' yöntemi gibi binning mi demek istiyorsunuz yoksa toplu olarak bir seferde birden çok satır okumayı mı kastediyorsunuz? – dkatzel

+0

Yorumlarınız için teşekkürler, soruya bir UPDATE ekledi. –

cevap

3

ulaşmak istediğiniz görev gruplama ne oldukça farklıdır. groupingByStream mertebesinde sitesindeki elemanları ancak Map ilgili ‘sınıflandırıcı Function‘in sonuç için uygulanan algoritması dayanmaz. Ne istiyorsun

biri List öğesine ortak özellik değeri olan komşu öğeleri kat etmektir. Aynı özellik değerine sahip tüm öğelerin kümelenmesini garanti edebildiğiniz sürece, bu özelliğe göre Stream numaralı ürüne sahip olmanız bile gerekli değildir.

Belki de bir azalma olarak bu görevi formüle etmek mümkündür ama bana çıkan yapı çok karmaşık görünüyor.

Bu özellik için doğrudan destek Stream s ekleniyor sürece, bir yineleyici dayalı yaklaşım benim için en pragmatik görünüyor

:

class Folding<T,G> implements Spliterator<Map.Entry<G,List<T>>> { 
    static <T,G> Stream<Map.Entry<G,List<T>>> foldBy(
      Stream<? extends T> s, Function<? super T, ? extends G> f) { 
     return StreamSupport.stream(new Folding<>(s.spliterator(), f), false); 
    } 
    private final Spliterator<? extends T> source; 
    private final Function<? super T, ? extends G> pf; 
    private final Consumer<T> c=this::addItem; 
    private List<T> pending, result; 
    private G pendingGroup, resultGroup; 

    Folding(Spliterator<? extends T> s, Function<? super T, ? extends G> f) { 
     source=s; 
     pf=f; 
    } 
    private void addItem(T item) { 
     G group=pf.apply(item); 
     if(pending==null) pending=new ArrayList<>(); 
     else if(!pending.isEmpty()) { 
      if(!Objects.equals(group, pendingGroup)) { 
       if(pending.size()==1) 
        result=Collections.singletonList(pending.remove(0)); 
       else { 
        result=pending; 
        pending=new ArrayList<>(); 
       } 
       resultGroup=pendingGroup; 
      } 
     } 
     pendingGroup=group; 
     pending.add(item); 
    } 
    public boolean tryAdvance(Consumer<? super Map.Entry<G, List<T>>> action) { 
     while(source.tryAdvance(c)) { 
      if(result!=null) { 
       action.accept(entry(resultGroup, result)); 
       result=null; 
       return true; 
      } 
     } 
     if(pending!=null) { 
      action.accept(entry(pendingGroup, pending)); 
      pending=null; 
      return true; 
     } 
     return false; 
    } 
    private Map.Entry<G,List<T>> entry(G g, List<T> l) { 
     return new AbstractMap.SimpleImmutableEntry<>(g, l); 
    } 
    public int characteristics() { return 0; } 
    public long estimateSize() { return Long.MAX_VALUE; } 
    public Spliterator<Map.Entry<G, List<T>>> trySplit() { return null; } 
} 

Stream katlanmış iyi uygulayarak ortaya konabilir çıkan tembel doğa o sonsuz akışa a katkıda

Folding.foldBy(Stream.iterate(0, i->i+1), i->i>>4) 
     .filter(e -> e.getKey()>5) 
     .findFirst().ifPresent(e -> System.out.println(e.getValue())); 
+1

hafif düzeltme: 'groupingBy 'aslında alt akış toplayıcısı işbirliği yaparsa (çoğu, UNORDERED karakteristiği olanlar hariç) orijinal akışın düzenini korur; Verilen herhangi bir kovadaki elemanların alt kümesi, girişte mevcut oldukları sırayla aşağı akış toplayıcıya sunulur. –

+1

@Brian Goetz: evet, * siparişi koruyor, ama cevabımda söylediğim her şey, grupların oluşturulması sırasına * * dayanmadığıdır. Btw. Çözümüm için yaptığım test örneklerinden biriydi: Çözümümün bir “Harita” ya getirdiği bir akışı toplamak, aynı sınıflandırıcıyı kullanarak “MapingBy” ile aynı “Harita” yı üretmelidir. – Holger

1

cyclops-react, ben kütüphane, istediğini yapabilecek hem sharding ve gruplama funcitonality sunmaktadır.

ReactiveSeq<ListX<TYPE>> grouped = ReactiveSeq.fromCollection(FileUtils.readLines(...)) 
      .groupedStatefullyWhile((batch,next) -> batch.size()==0 ? true : next.equals(batch.get(0))); 

groupedStatefullyWhile operatör elemanları toplu mevcut durumuna göre gruplandırılır sağlar. ReactiveSeq, tek bir dişli sıralı Akıştır. senkronize olmayan ve buna paralel olarak dosya verileri işler

Map<Key, Stream<Value> sharded = 
        new LazyReact() 
       .fromCollection(FileUtils.readLines(...)) 
       .map(..) 
       .shard(shards, pair -> pair[0]); 

Bu (yani java.util.stream.Stream uygulayan) bir LazyFutureStream yaratacaktır. Tembel ve veri çekilinceye kadar işlemeye başlamayacak.

bilmeniz gereken tek

önceden kırıkları tanımlamak gerekir olmasıdır. Yani Yukarıdaki 'kırlangıçlar' parametresi, async.Queue 'nin bir haritasıdır ve anahtarın anahtarı ile anahtarlanır (muhtemelen her hangi bir çift [0] nedir?).

örn.

Map<Integer,Queue<String>> shards; 

There is a sharding example with video here ve test code here

0
O StreamEx

final int[][] aa = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 4, 4 } }; 

StreamEx.of(aa) 
     .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0])) 
     .forEach(System.out::println); 

ile collapse yapılabilir

Biz peek ekleyebilir ve tembel hesaplama eğer limit doğrulamak için:

StreamEx.of(aa) 
     .peek(System.out::println) 
     .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0])) 
     .limit(1) 
     .forEach(System.out::println);