在 RxJava 中实现并行异步调用时,我需要一些帮助。我选择了一个简单的用例,其中 FIRST 调用获取(相当搜索)要显示的产品列表(平铺)。随后的调用出去并获取(A)评论和(B)产品图像
经过几次尝试,我到达了这个地方。
1 Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm);
2 List<Tile> allTiles = new ArrayList<Tile>();
3 ClientResponse response = new ClientResponse();
4 searchTile.parallel(oTile -> {
5 return oTile.flatMap(t -> {
6 Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId());
7 Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId());
8 return Observable.zip(reviews, imageUrl, (r, u) -> {
9 t.setReviews(r);
10 t.setImageUrl(u);
11 return t;
12 });
13 });
14 }).subscribe(e -> {
15 allTiles.add((Tile) e);
16 });
第 1 行:取出要显示的产品(Tile)
第 4 行:我们获取 Observable 的列表并对其进行 SHARD 以获取评论和 imageUrls
谎言 6,7:获取 Observable 评论和 Observable url
第 8 行:最后 2 个 observable 被压缩以返回更新后的 Observable
第 15 行:最后第 15 行整理了所有要显示在集合中的单个产品,该集合可以返回给调用层
虽然 Observable 已被分片并且在我们的测试中运行了 4 个不同的线程;获取评论和图像似乎是一个接一个。我怀疑第 8 行的 zip 步骤基本上导致了 2 个 observables(reviews 和 url)的顺序调用。
这个小组对并行获取评论和图片网址有什么建议吗?本质上,上面附加的瀑布图应该看起来更垂直堆叠。对评论和图像的调用应该是并行的
谢谢 阿南德拉曼
最佳答案
并行运算符被证明是几乎所有用例的问题,并且没有达到大多数人的预期,因此在 1.0.0.rc.4 版本中将其删除:https://github.com/ReactiveX/RxJava/pull/1716
可以看到 here 的一个很好的例子来说明如何执行这种类型的行为并获得并行执行。 .
在您的示例代码中,不清楚 searchServiceClient 是同步的还是异步的。它会稍微影响如何解决问题,就好像它已经是异步的,不需要额外的调度。如果需要同步额外调度。
首先这里有一些显示同步和异步行为的简单示例:
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ParallelExecution {
public static void main(String[] args) {
System.out.println("------------ mergingAsync");
mergingAsync();
System.out.println("------------ mergingSync");
mergingSync();
System.out.println("------------ mergingSyncMadeAsync");
mergingSyncMadeAsync();
System.out.println("------------ flatMapExampleSync");
flatMapExampleSync();
System.out.println("------------ flatMapExampleAsync");
flatMapExampleAsync();
System.out.println("------------");
}
private static void mergingAsync() {
Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println);
}
private static void mergingSync() {
// here you'll see the delay as each is executed synchronously
Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println);
}
private static void mergingSyncMadeAsync() {
// if you have something synchronous and want to make it async, you can schedule it like this
// so here we see both executed concurrently
Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println);
}
private static void flatMapExampleAsync() {
Observable.range(0, 5).flatMap(i -> {
return getDataAsync(i);
}).toBlocking().forEach(System.out::println);
}
private static void flatMapExampleSync() {
Observable.range(0, 5).flatMap(i -> {
return getDataSync(i);
}).toBlocking().forEach(System.out::println);
}
// artificial representations of IO work
static Observable<Integer> getDataAsync(int i) {
return getDataSync(i).subscribeOn(Schedulers.io());
}
static Observable<Integer> getDataSync(int i) {
return Observable.create((Subscriber<? super Integer> s) -> {
// simulate latency
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
s.onNext(i);
s.onCompleted();
});
}
}
以下是尝试提供与您的代码更匹配的示例:
import java.util.List;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ParallelExecutionExample {
public static void main(String[] args) {
final long startTime = System.currentTimeMillis();
Observable<Tile> searchTile = getSearchResults("search term")
.doOnSubscribe(() -> logTime("Search started ", startTime))
.doOnCompleted(() -> logTime("Search completed ", startTime));
Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
.doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime));
Observable<String> imageUrl = getProductImage(t.getProductId())
.doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime));
return Observable.zip(reviews, imageUrl, (r, u) -> {
return new TileResponse(t, r, u);
}).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime));
});
List<TileResponse> allTiles = populatedTiles.toList()
.doOnCompleted(() -> logTime("All Tiles Completed ", startTime))
.toBlocking().single();
}
private static Observable<Tile> getSearchResults(String string) {
return mockClient(new Tile(1), new Tile(2), new Tile(3));
}
private static Observable<Reviews> getSellerReviews(int id) {
return mockClient(new Reviews());
}
private static Observable<String> getProductImage(int id) {
return mockClient("image_" + id);
}
private static void logTime(String message, long startTime) {
System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms");
}
private static <T> Observable<T> mockClient(T... ts) {
return Observable.create((Subscriber<? super T> s) -> {
// simulate latency
try {
Thread.sleep(1000);
} catch (Exception e) {
}
for (T t : ts) {
s.onNext(t);
}
s.onCompleted();
}).subscribeOn(Schedulers.io());
// note the use of subscribeOn to make an otherwise synchronous Observable async
}
public static class TileResponse {
public TileResponse(Tile t, Reviews r, String u) {
// store the values
}
}
public static class Tile {
private final int id;
public Tile(int i) {
this.id = i;
}
public int getSellerId() {
return id;
}
public int getProductId() {
return id;
}
}
public static class Reviews {
}
}
这个输出:
Search started => 65ms
Search completed => 1094ms
getProductImage[1] completed => 2095ms
getSellerReviews[2] completed => 2095ms
getProductImage[3] completed => 2095ms
zip[1] completed => 2096ms
zip[2] completed => 2096ms
getProductImage[2] completed => 2096ms
getSellerReviews[1] completed => 2096ms
zip[3] completed => 2096ms
All Tiles Completed => 2097ms
getSellerReviews[3] completed => 2097ms
我已将每个 IO 调用模拟为花费 1000 毫秒,因此很明显延迟在哪里并且它是并行发生的。它以经过的毫秒数打印出进度。
这里的技巧是 flatMap 会合并异步调用,所以只要被合并的 Observable 是异步的,它们都会被并发执行。
如果像 getProductImage(t.getProductId()) 这样的调用是同步的,可以像这样异步调用:getProductImage(t.getProductId()).subscribeOn(Schedulers.io)。
这是上面示例的重要部分,没有所有的日志记录和样板类型:
Observable<Tile> searchTile = getSearchResults("search term");;
Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
Observable<Reviews> reviews = getSellerReviews(t.getSellerId());
Observable<String> imageUrl = getProductImage(t.getProductId());
return Observable.zip(reviews, imageUrl, (r, u) -> {
return new TileResponse(t, r, u);
});
});
List<TileResponse> allTiles = populatedTiles.toList()
.toBlocking().single();
我希望这会有所帮助。
关于java - RxJava 并行获取 Observables,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26249030/
我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/
有没有办法在这个简单的get方法中添加超时选项?我正在使用法拉第3.3。Faraday.get(url)四处寻找,我只能先发起连接后应用超时选项,然后应用超时选项。或者有什么简单的方法?这就是我现在正在做的:conn=Faraday.newresponse=conn.getdo|req|req.urlurlreq.options.timeout=2#2secondsend 最佳答案 试试这个:conn=Faraday.newdo|conn|conn.options.timeout=20endresponse=conn.get(url
我有一个存储主机名的Ruby数组server_names。如果我打印出来,它看起来像这样:["hostname.abc.com","hostname2.abc.com","hostname3.abc.com"]相当标准。我想要做的是获取这些服务器的IP(可能将它们存储在另一个变量中)。看起来IPSocket类可以做到这一点,但我不确定如何使用IPSocket类遍历它。如果它只是尝试像这样打印出IP:server_names.eachdo|name|IPSocket::getaddress(name)pnameend它提示我没有提供服务器名称。这是语法问题还是我没有正确使用类?输出:ge
我想获取模块中定义的所有常量的值:moduleLettersA='apple'.freezeB='boy'.freezeendconstants给了我常量的名字:Letters.constants(false)#=>[:A,:B]如何获取它们的值的数组,即["apple","boy"]? 最佳答案 为了做到这一点,请使用mapLetters.constants(false).map&Letters.method(:const_get)这将返回["a","b"]第二种方式:Letters.constants(false).map{|c
我安装了ruby版本管理器,并将RVM安装的ruby实现设置为默认值,这样'哪个ruby'显示'~/.rvm/ruby-1.8.6-p383/bin/ruby'但是当我在emacs中打开inf-ruby缓冲区时,它使用安装在/usr/bin中的ruby。有没有办法让emacs像shell一样尊重ruby的路径?谢谢! 最佳答案 我创建了一个emacs扩展来将rvm集成到emacs中。如果您有兴趣,可以在这里获取:http://github.com/senny/rvm.el
假设我有这个范围:("aaaaa".."zzzzz")如何在不事先/每次生成整个项目的情况下从范围中获取第N个项目? 最佳答案 一种快速简便的方法:("aaaaa".."zzzzz").first(42).last#==>"aaabp"如果出于某种原因你不得不一遍又一遍地这样做,或者如果你需要避免为前N个元素构建中间数组,你可以这样写:moduleEnumerabledefskip(n)returnto_enum:skip,nunlessblock_given?each_with_indexdo|item,index|yieldit
我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www
我目前正在使用以下方法获取页面的源代码:Net::HTTP.get(URI.parse(page.url))我还想获取HTTP状态,而无需发出第二个请求。有没有办法用另一种方法做到这一点?我一直在查看文档,但似乎找不到我要找的东西。 最佳答案 在我看来,除非您需要一些真正的低级访问或控制,否则最好使用Ruby的内置Open::URI模块:require'open-uri'io=open('http://www.example.org/')#=>#body=io.read[0,50]#=>"["200","OK"]io.base_ur
我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我
如何在Ruby中获取BasicObject实例的类名?例如,假设我有这个:classMyObjectSystem我怎样才能使这段代码成功?编辑:我发现Object的实例方法class被定义为returnrb_class_real(CLASS_OF(obj));。有什么方法可以从Ruby中使用它? 最佳答案 我花了一些时间研究irb并想出了这个:classBasicObjectdefclassklass=class这将为任何从BasicObject继承的对象提供一个#class您可以调用的方法。编辑评论中要求的进一步解释:假设你有对象