2016-04-05 39 views
1

split() durdurmak için:Apache Camel EIP rota - Bir aşağıdaki rota ile bazı sorunlar var

// from("cxf:....")... 
from("direct:start").process(startRequestProcessor) // STEP 1 
      .choice() 
       .when(body().isNull()) 
         .to("direct:finish") 
       .otherwise() 
        .split(body()) // STEP 2 
        .bean(TypeMapper.class) // STEP 3 
        .log("Goes to DynamicRouter:: routeByTypeHeader with header: ${headers.type}") 
        .recipientList().method(Endpoint1DynamicRouter.class, "routeByTypeHeader") // STEP 4 
        .ignoreInvalidEndpoints(); 

    from("direct:endpoint2") // STEP 6 
      .log("Goes to DynamicRouter::routeByCollectionHeader with header: ${headers.collection}") 
      .recipientList().method(Endpoint2DynamicRouter.class, "routeByCollectionHeader") 
      .ignoreInvalidEndpoints(); 

    from("direct:endpoint1.1") // STEP 5 
      .process(new DateRangeProcessor()) 
      .to("direct:collections"); 

    from("direct:endpoint1.2") // STEP 5 
      .process(new SingleProcessor()) 
      .to("direct:collections"); 


    from("direct:endpoint2.2") // STEP 7 
      .aggregate(header("collection" /** endpoint2.2 */), CollectionAggregationStrategy) 
      .completionSize(exchangeProperty("endpoint22")) 

      .process(new QueryBuilderProcessor()) 
      .bean(MyService, "getDbCriteria") 

      .setHeader("collection", constant("endpoint2.1")) 
      .to("direct:endpoint2.1").end(); 


    from("direct:endpoint2.1") // STEP 8 
      .aggregate(header("collection" /** endpoint2.1 */), CollectionAggregationStrategy) 
      .completionSize(exchangeProperty("CamelSplitSize")) 
      .to("direct:finish").end(); 

    from("direct:finish") 
      .process(new QueryBuilderProcessor()) 
      .bean(MyRepository, "findAll") 
      .log("ResponseData: ${body}"). 
      marshal().json(JsonLibrary.Gson).end(); 

rota

  1. json dize bir ait (HashSet) listelemek için dönüştürür Aldı JSONObjects.
  2. Alınan listeyi json nesnelerine ayırın.
  3. Kriterleri mongodb mesaj dönüştürme için başka bir başlığa göre
  4. Endpoint2 rotalar mesajları endpoint2 gönder
  5. bağlantıların kullanılmasını uygun endpoint1.1 rota veya endpoint1.2 nesne içeriğine göre karşılık gelen başlıklarıyla aynı
  6. Seti son nokta2.1 veya son nokta2.2.
  7. Bitiş noktası2.2 alınan tüm iletileri toplar, mongodb Ölçütünü alır ve son noktaya21 gönderir (completeionSize 2. adımda hesaplanır ve özellik "endpoint22" ye kaydedilir).
  8. Enpoint2.1 kümeleri TÜM iletiler (CamelSplitSize), toplu iletileri Sorgu nesnesine dönüştürür ve verileri almak için Depo'ya gönderir.

ben ayıklayıcısında geçerli bir yanıt nesnesi görebilirsiniz ama yine bir hata alıyorum: diğer yolları ile çalışır ve onu HashSets içermiyor olarak

No message body writer has been found for class java.util.HashSet, ContentType: application/json

sorun yanıt nesnesinde değil.

  • rota çıktısında sorunun ne:

    Benim tahminim

    Benim sorulara

    vardır ... o rota HashSet tat 1. ADIM oluşturulan çıkışına gönderir mi?
  • hem recipientList() geçersiz son nokta (I istisna önlemek .ignoreInvalidEndpoints() kullanmak zorunda) için iletileri yönlendirmek için denemek:

    org.apache.camel.NoSuchEndpointException: No endpoint could be found for: [email protected], please check your classpath contains the needed Camel component jar.

Herhangi yardım çok takdir! Teşekkürler.

cevap

2

Çok garip buluyorum, ancak .aggregate() işlevi exchange'i yanıtlamıyor. Bu, toplama stratejisini kullanır ancak her zaman gelen değişimi yanıtlar. Bu, belgeleri okurken açık değildir, ancak değişimi geri alabilmek için split() ile birlikte toplama stratejisi kullanmanız gerekir.