2016-04-13 15 views
8

Sonsuz bir akışı dinlemek için Retrofit + RxJava kullanabilir miyim? Mesela Twitter akışı.Android Güçlendirme 2 + RxJava: sonsuz akışını dinle

public interface MeetupAPI { 
    @GET("http://stream.meetup.com/2/rsvps/") 
    Observable<RSVP> getRSVPs(); 
} 

MeetupAPI api = new Retrofit.Builder() 
      .baseUrl(MeetupAPI.RSVP_API) 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .addConverterFactory(GsonConverterFactory.create()) 
      .build() 
      .create(MeetupAPI.class); 

api.getRSVPs() 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(rsvp -> Log.d(TAG, "got rsvp"), 
       error -> Log.d(TAG, "error: " + error), 
       () -> Log.d(TAG, "onComplete")); 

ama ilk nesne ayrıştırıldı sonra "onComplete" çağrılır: Ne var budur. Retrofit'e bir sonraki duyuruya kadar açık kalmasını söylemenin bir yolu var mı? İşte

cevap

9

benim çözüm:

Sen @Streaming ek açıklama kullanabilirsiniz: @Streaming ile

public interface ITwitterAPI { 

    @GET("/2/rsvps") 
    @Streaming 
    Observable<ResponseBody> twitterStream(); 
} 

ITwitterAPI api = new Retrofit.Builder() 
      .baseUrl("http://stream.meetup.com") 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .build().create(ITwitterAPI.class); 

biz ResponseBody itibaren ham giriş alabilirsiniz.

İşte benim işlevi olaylarla çizgilerle bölünmüş bedeni sarmak için:

public static Observable<String> events(BufferedSource source) { 
    return Observable.create(new Observable.OnSubscribe<String>() { 
     @Override 
     public void call(Subscriber<? super String> subscriber) { 
      try { 
       while (!source.exhausted()) { 
        subscriber.onNext(source.readUtf8Line()); 
       } 
       subscriber.onCompleted(); 
      } catch (IOException e) { 
       e.printStackTrace(); 
       subscriber.onError(e); 
      } 
     } 
    }); 
} 

Ve kullanımını neden: Biz, güçlendirme kapanır aboneliği iptal zaman incelikle

durdurma konusunda

api.twitterStream() 
    .flatMap(responseBody -> events(responseBody.source())) 
    .subscribe(System.out::println); 

UPD'yi inputStream. Ancak giriş akışını giriş akışından kendiliğinden değil de kapalı olarak algılamak imkansız, bu yüzden tek yol - akıştan okumayı deneyin - Socket closed mesajıyla istisna kazanıyoruz.

 @Override 
     public void call(Subscriber<? super String> subscriber) { 
      boolean isCompleted = false; 
      try { 
       while (!source.exhausted()) { 
        subscriber.onNext(source.readUtf8Line()); 
       } 
      } catch (IOException e) { 
       if (e.getMessage().equals("Socket closed")) { 
        isCompleted = true; 
        subscriber.onCompleted(); 
       } else { 
        throw new UncheckedIOException(e); 
       } 
      } 
      //if response end we get here 
      if (!isCompleted) { 
       subscriber.onCompleted(); 
      } 
     } 

Ve bağlantı nedeniyle tepki ucunu kapalı, biz herhangi bir istisna değildir vardır: Biz kapanış olarak bu istisnayı yorumlayabilir. Burada isCompleted bunu kontrol edin. Yanılıyorsam bana bildirin :)

+0

Teşekkürler! Akışı canlı bir şekilde dinlemeyi nasıl keseceğinize dair ipuçları var mı? – ticofab