2016-03-23 17 views
1

Veri akışını kullanarak GoGroupByKey yapmaya çalışırken bu hatayı çalıştıran. Yüksek düzeyde, iki PCollection türünü KV<String, self-defined-class> türüne ve KV<String, TableRow> türünden birine katılmak istiyorum. Az önce en İçin official documentGroupByKey.GroupByKeyOnly için kayıtlı bir çevirmen yok

PCollection<KV<String, TableRow>> pt1 = ...; 
    PCollection<KV<String, MyClass>> pt2 = ...; 
    final TupleTag<TableRow> t1 = new TupleTag<>(); 
    final TupleTag<MyClass> t2 = new TupleTag<>(); 
    PCollection<KV<String, CoGbkResult>> coGbkResultCollection = 
    KeyedPCollectionTuple.of(t1, pt1) 
        .and(t2, pt2) 
        .apply(CoGroupByKey.<String>create()); 

listelenen örneğine benzer TupleTags, KeyedPCollection ve CoGroupByKey katılabilir standardını yapıyorum, bir nevi karıştı o öğrendim için etrafta biraz arandı (ortalama neyim o veri akışı "hizmet" sorgusunu işe "çevirmek için bir işe sahip değil, ancak teknik olarak ne anlama geldiğini hala bilmiyorum) ve potansiyel olarak belirttiği (özellikle de GroupByKeyOnly olduğunda) Kod parçamı ayıklamak için ipucu al.

Exception in thread "main" java.lang.IllegalStateException: no translator registered for GroupByKey.GroupByKeyOnly 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator$Translator.visitTransform(DataflowPipelineTranslator.java:500) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) 
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:455) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:146) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.run(DataflowPipelineRunner.java:325) 
at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.run(BlockingDataflowPipelineRunner.java:95) 

Bilginize ben BlockingDataflowPipelineRunner

DÜZENLEME ile java kitaplığı kullanıyorum ben DataflowPipelineTranslator.java Trafo kayıtlı değil çünkü olduğunu öğrendim kaynak kodu kurcaladım olan, şu şekildedir:

tüm StackTrace olduğunu GroupByKeyOnly DataflowPipelineRunner, DataflowPipelineOptions (ve herhangi bir uzantı) üzerinde çalışan herhangi bir boru hattı GroupByKeyOnly kayıtlı olacak ...?

+0

Dataflow SDK'nin hangi sürümünü kullanıyorsunuz? –

+0

'com.google.cloud.dataflow: google-cloud-dataflow-java-sdk-all: 1.4.0' –

+0

Yeni 1.5.0 SDK'yi kullanmayı deneyebilir misiniz? –

cevap

1

GroupByKeyOnun, DataflowPipelineRunner için grafiğe uygulanan dönüştürmeler kümesinde hiçbir zaman görünmemelidir ve bu durum, PipelineOptions'da runner olmaksızın boru hattı inşa edilmiş olabileceği ve bunu [Blocking] DataflowPipelineRunner.run çağrısı yapılabileceği için gerçekleşebilir. (Boru hattı). beklenen desen örneğin, doğrudan DataflowPipeline/DataflowPipelineRunner yöntemleri kullanmak getirmemektir:

Yukarıdaki örnek ile
PipelineOptions options = PipelineOptionsFactory.fromArgs(args); 

// Make sure that runner is set before calling Pipeline.create(options) 
Pipeline p = Pipeline.create(options); 

// Apply all your transforms 
p.apply(... transforms ...); 

PipelineResult result = p.run(); 

, uygulamanıza için komut satırı parametreleri ayarlayarak rayları takas mümkün olacak. Örneğin, BlockingDataflowPipelineRunner kullanılarak, p.run() işlevinden dönmeden önce iş sonucunun bir terminal durumuna ulaşmış olduğundan emin olun.