Reactive execution (asynchronous, with callbacks) is performed by calling observe():
Observable<String> fs = new CommandHelloWorld("World").observe();
The emitted value is obtained by subscribing to the Observable:
fs.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        // value emitted here
    }
});
The following unit test demonstrates this behavior:
@Test
public void testObservable() throws Exception {
    Observable<String> fWorld = new CommandHelloWorld("World").observe();
    Observable<String> fBob = new CommandHelloWorld("Bob").observe();

    // blocking
    assertEquals("Hello World!", fWorld.toBlockingObservable().single());
    assertEquals("Hello Bob!", fBob.toBlockingObservable().single());

    // non-blocking
    // - this is a verbose anonymous inner-class approach and doesn't do assertions
    fWorld.subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
            // nothing needed here
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onNext(String v) {
            System.out.println("onNext: " + v);
        }
    });

    // non-blocking
    // - also verbose anonymous inner-class
    // - ignore errors and onCompleted signal
    fBob.subscribe(new Action1<String>() {
        @Override
        public void call(String v) {
            System.out.println("onNext: " + v);
        }
    });
}
With Java 8 lambdas/closures, the same subscriptions can be written more compactly:
fWorld.subscribe((v) -> {
    System.out.println("onNext: " + v);
});

// - or while also including error handling
fWorld.subscribe((v) -> {
    System.out.println("onNext: " + v);
}, (exception) -> {
    exception.printStackTrace();
});
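
The blocking and non-blocking styles above can also be tried without Hystrix or RxJava on the classpath. The following is only a rough analogy built on the JDK's CompletableFuture, not the Hystrix API: the class ObserveAnalogy and the method observeHello are hypothetical names introduced for illustration, and observeHello merely stands in for new CommandHelloWorld(name).observe(). It assumes, as in the unit test, that the command produces "Hello <name>!".

```java
import java.util.concurrent.CompletableFuture;

public class ObserveAnalogy {

    // Hypothetical stand-in for new CommandHelloWorld(name).observe():
    // the work is submitted to another thread right away, before any callback is attached.
    static CompletableFuture<String> observeHello(String name) {
        return CompletableFuture.supplyAsync(() -> "Hello " + name + "!");
    }

    public static void main(String[] args) {
        CompletableFuture<String> fWorld = observeHello("World");
        CompletableFuture<String> fBob = observeHello("Bob");

        // blocking: wait for the single result, comparable to toBlockingObservable().single()
        System.out.println(fWorld.join());

        // non-blocking: attach a value callback and an error callback,
        // comparable to subscribe(onNext, onError)
        CompletableFuture<Void> done = fBob
                .thenAccept(v -> System.out.println("onNext: " + v))
                .exceptionally(e -> {
                    e.printStackTrace();
                    return null;
                });

        // Wait here only so the JVM does not exit before the callback has run.
        done.join();
    }
}
```

Unlike an Observable, a CompletableFuture carries exactly one value and has no separate completion signal, so this sketch covers only the single-value case that CommandHelloWorld exercises.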