benim çözüm:
Sen @Streaming ek açıklama kullanabilirsiniz: @Streaming
ile
public interface ITwitterAPI {
@GET("/2/rsvps")
@Streaming
Observable<ResponseBody> twitterStream();
}
ITwitterAPI api = new Retrofit.Builder()
.baseUrl("http://stream.meetup.com")
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build().create(ITwitterAPI.class);
biz ResponseBody
itibaren ham giriş alabilirsiniz.
İşte benim işlevi olaylarla çizgilerle bölünmüş bedeni sarmak için:
public static Observable<String> events(BufferedSource source) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
while (!source.exhausted()) {
subscriber.onNext(source.readUtf8Line());
}
subscriber.onCompleted();
} catch (IOException e) {
e.printStackTrace();
subscriber.onError(e);
}
}
});
}
Ve kullanımını neden: Biz, güçlendirme kapanır aboneliği iptal zaman incelikle
durdurma konusunda
api.twitterStream()
.flatMap(responseBody -> events(responseBody.source()))
.subscribe(System.out::println);
UPD'yi inputStream. Ancak giriş akışını giriş akışından kendiliğinden değil de kapalı olarak algılamak imkansız, bu yüzden tek yol - akıştan okumayı deneyin - Socket closed
mesajıyla istisna kazanıyoruz.
@Override
public void call(Subscriber<? super String> subscriber) {
boolean isCompleted = false;
try {
while (!source.exhausted()) {
subscriber.onNext(source.readUtf8Line());
}
} catch (IOException e) {
if (e.getMessage().equals("Socket closed")) {
isCompleted = true;
subscriber.onCompleted();
} else {
throw new UncheckedIOException(e);
}
}
//if response end we get here
if (!isCompleted) {
subscriber.onCompleted();
}
}
Ve bağlantı nedeniyle tepki ucunu kapalı, biz herhangi bir istisna değildir vardır: Biz kapanış olarak bu istisnayı yorumlayabilir. Burada isCompleted
bunu kontrol edin. Yanılıyorsam bana bildirin :)
Teşekkürler! Akışı canlı bir şekilde dinlemeyi nasıl keseceğinize dair ipuçları var mı? – ticofab