草庐IT

rxjs_observable

全部标签

java - 如何等待异步 Observable 完成

我正在尝试使用rxjava构建示例。该示例应编排ReactiveWareService和ReactiveReviewService以重新运行WareAndReview组合。ReactiveWareServicepublicObservablefindWares(){returnObservable.from(wareService.findWares());}ReactiveReviewService:reviewService.findReviewsByItemdoesaThreadSleeptosimulatealatency!publicObservablefindReviews

java - 获取 Observable 的最新值并立即发出

我正在尝试获取给定Observable的最新值并让它发出一旦它被调用就立即。以下面的代码为例:returnObservable.just(myObservable.last()).flatMap(myObservable1->{returnmyObservable1;}).map(o->o.x)//HereIwanttoendupwithaTobjectinsteadofObservableobject这是行不通的,因为通过这样做,flatMap将发出myObservable1,而这又将具有发射到达map。我不知道是否有可能做这样的事情。有没有人知道如何实现这个目标?谢谢

java - RxAndroid 和 Retrofit : Unable to create call adapter for io. reactivex.Observable<retrofit2.Response<okhttp3.ResponseBody>>

我正在尝试使用rxJava、rxAndroid、Retrofit2和OkHTTP3从URL端点下载文件。我的代码无法为“Observable>”创建调用适配器。这些方法对我来说是新的,所以我相信我在这里遗漏了一个重要的概念。非常感谢任何方向或观点。FATALEXCEPTION:mainProcess:com.example.khe11e.rxdownloadfile,PID:14130java.lang.IllegalArgumentException:Unabletocreatecalladapterforio.reactivex.Observable>formethodRetrof

java - 在 Web 应用程序中使用 RxJava Observables 无法解释的缺乏性能改进

我正在执行一些测试,以评估使用基于Observables的响应式(Reactive)API而非传统的阻塞式API是否具有真正的优势。整个例子是availableonGithug令人惊讶的结果表明thoughput结果是:最好的:返回包装阻塞操作的Callable/DeferredResult的REST服务。还不错:阻止REST服务。最差:返回DeferredResult的REST服务,其结果由RxJavaObservable设置。这是我的SpringWebApp:应用:@SpringBootApplicationpublicclassSpringNioRestApplication{@

java - RxJava 2.0 - 如何将 Observable 转换为 Publisher

如何在RxJava版本2中将Observable转换为Publisher?在第一个版本中我们有https://github.com/ReactiveX/RxJavaReactiveStreams完全满足我需要的项目。但是我如何在RxJava2中做到这一点呢? 最佳答案 使用以下代码:Publisherpublisher=observable.toFlowable(BackpressureStrategy.XXX);由于Observable没有实现背压,转换时需要选择背压策略。查看可用选项here.或者首先使用Flowable而不是O

java - RX : Run Zipped Observables in parallel?

所以我在玩RX(真的很酷),我一直在转换我的api,它访问Android中的sqlite数据库以返回observables。所以自然而然地,我开始尝试解决的问题之一是,“如果我想进行3次API调用,获取结果,然后在它们全部完成后进行一些处理怎么办?”我花了一两个小时,但我最终找到了ZipFunctionality它可以帮助我轻松解决问题:Observableone=getNumberedObservable(1);Observabletwo=getNumberedObservable(2);Observablethree=getNumberedObservable(3);Observa

java - 如何将两个已排序的 Observable 合并为一个已排序的 Observable?

给定:Integer[]arr1={1,5,9,17};Integer[]arr2={1,2,3,6,7,12,15};Observableo1=Observable.from(arr1);Observableo2=Observable.from(arr2);如何获取包含1,1,2,3,5,6,7,9,12,15,17的Observable? 最佳答案 您可以对序列进行合并、排序和展平,但这会产生很大的开销:o1.mergeWith(o2).toSortedList().flatMapIterable(v->v).subscribe

java - 我应该使用 Listener 还是 Observer?

我的GUI中有一个下拉框,它显示另一个类中ArrayList的内容。可以在GUI的其他地方将新对象添加到ArrayList,因此我需要知道它何时更新,以便刷新下拉菜单。据我所知,我的两个选择是扩展ArrayList类以允许我向其添加自己的changeListener,或者使包含有问题的ArrayList的类扩展为可观察的。哪个是更合适的解决方案? 最佳答案 这两个解决方案本质上是相同根设计模式(四人组定义的“观察者”模式)的实现。在前一种情况下,您使ArrayList本身“可观察”,在后一种情况下,您是使使用数组列表的域对象“可观察

java - 如何过滤 RXJava 中 observable 发出的重复值?

我有一个对象集合,我想在其中抑制重复项。我知道Distinct运算符,但如果我没记错的话,它会通过正确覆盖的哈希码方法来比较项目。但是,如果我的哈希码为相同的对象返回不同的值,并且我想自己设置相等性怎么办。distinct有2个重载方法-一个没有参数,一个有Func1参数,我想我应该使用第二种方法,但有多精确?.distinct(newFunc1(){@OverridepublicObjectcall(ActivityManager.RunningServiceInforunningServiceInfo){returnnull;}}) 最佳答案

python - RxPy : Sort hot observable between (slow) scan executions

TL;DR我正在寻求帮助来实现下面的弹珠图。目的是尽可能对未排序的值进行排序,而无需在扫描执行之间等待时间。我不是要求完整的实现。欢迎任何指导。我有一个无限热可观察对象的异步慢速(出于测试目的而强制)扫描。这是相关代码:thread_1_scheduler=ThreadPoolScheduler(1)thread=ExternalDummyService()external_obs=thread.subject.publish()external_obs\.flat_map(lambdamsg:Observable.just(msg).subscribe_on(thread_1_sch