草庐IT

spark-streaming

全部标签

Java Stream GroupingBy 在自定义对象中收集

我有温度等级classTemperature{doubleminTemp;doublemaxTemp;Stringcity;Stringcountry;}我有另一个维护温度集合的类classTemperatures{Listtemperatures;}我想使用流按countryName对温度进行分组。我想要的是publicMaptemperaturesByCountry()但是我无法使用流获取温度作为map值,我得到的是温度列表。我的groupingBy实现如下Map>result=this.getTemperatures().stream().collect(Collectors.g

java - 合并多个相同的 Kafka Streams 主题

我有2个Kafka主题流式传输来自不同来源的完全相同的内容,因此我可以在其中一个来源出现故障时保持高可用性。我正在尝试使用KafkaStreams0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何有关失败的消息,并且在所有源都启动时不会出现重复。当使用KStream的leftJoin方法时,其中一个主题可以正常下降(次要主题),但是当主要主题下降时,不会向输出主题发送任何内容。这似乎是因为,根据KafkaStreamsdeveloperguide,KStream-KStreamleftJoinisalwaysdrivenbyrecordsarrivingfromthep

java - Java Stream API 是如何选择执行计划的?

我刚开始学习Java8中的StreamAPI和一般的函数式编程,但对Java并不陌生。我有兴趣了解和了解StreamAPI如何选择执行计划。它如何知道哪些部分需要并行化,哪些部分不需要?存在多少种执行计划?基本上,我想知道为什么Java8中的Streams有助于使事情变得更快,以及它如何发挥这种“魔力”。我找不到太多关于这一切如何运作的文献。 最佳答案 这个问题有点宽泛,不好详细解释,但我会尽力回答到满意的程度。我还使用了ArrayList的Stream示例。当我们创建流时,返回的对象称为ReferencePipeline.这个对象

java - Stream collect with Generic 类型

我尝试在将json反序列化为pojo的方法中使用泛型,以便它可以返回任何对象类型。这是我的代码:privateBla(Listas,Listbs){this.as=as;this.bs=bs;}publicstaticBlafrom(JsonObjectjson){returnnewBla(Bla.load(json,As),Bla.load(json,Bs));}privatestaticListload(JsonObjectjsonObject,Stringparam){returnjsonObject.getJsonArray(param).stream().map(Bla::g

java - 如何用 Streams 替换 Iterables.filter()?

我正在尝试从Guava迁移到Java8Streams,但不知道如何处理可迭代对象。这是我的代码,用于从可迭代对象中删除空字符串:Iterablelist=Iterables.filter(raw,//it'sIterablenewPredicate(){@Overridepublicbooleanapply(Stringtext){return!text.isEmpty();}});注意,这是一个Iterable,不是Collection.它可能包含无限数量的项目,我无法将它们全部加载到内存中。我的Java8替代品是什么?顺便说一句,有了Lamba,这段代码看起来会更短:Iterabl

java - 在 Java 中获取 Spark 中的当前任务 ID

我需要在Spark中获取当前任务的ID。我一直在谷歌和官方API中搜索,但我能找到的唯一ID是执行者ID和RDD的ID。有谁知道如何获得任务的唯一ID?我已经看到类TaskInfo正是我要找的东西,但我不知道如何获取此类的实例。 最佳答案 为了获得特定的任务ID,您可以使用TaskContext:importorg.apache.spark.TaskContext;textFile.map(x->{TaskContexttc=TaskContext.get();System.out.println(tc.taskAttemptId(

java - 如何使用java spark下载文件?

我想为文件下载编写简单的restapi。我找不到关于它的文档,因为我知道我需要为响应设置mimetype='application/zip',但不清楚如何返回流。http://sparkjava.com/更新:此处解决示例代码:publicstaticvoidmain(String[]args){//setPort(8080);get("/hello",(request,responce)->getFile(request,responce));}privatestaticObjectgetFile(Requestrequest,Responseresponce){Filefile=n

java - 为什么Stream <T> collect方法返回不同的键顺序?

我有以下代码:publicenumContinent{ASIA,EUROPE}publicclassCountry{privateStringname;privateContinentregion;publicCountry(Stringna,Continentreg){this.name=na;this.region=reg;}publicStringgetName(){returnname;}publicContinentgetRegion(){returnregion;}@OverridepublicStringtoString(){return"Country[name="+n

2024-02-26(Spark,kafka)

1.SparkSQL是Spark的一个模块,用于处理海量结构化数据限定:结构化数据处理RDD的数据开发中,结构化,非结构化,半结构化数据都能处理。2.为什么要学习SparkSQLSparkSQL是非常成熟的海量结构化数据处理框架。学习SparkSQL主要在2个点:a.SparkSQL本身十分优秀,支持SQL语言\性能强\可以自动优化\API兼容\兼容HIVE等b.企业大面积在使用SparkSQL处理业务数据:离线开发,数仓搭建,科学计算,数据分析3.SparkSQL的特点a.融合性:SQL可以无缝的集成在代码中,随时用SQL处理数据b.统一数据访问:一套标准的API可以读写不同的数据源c.Hi

java - 从 S3 并行读取多个文件(Spark、Java)

我看到了一些关于此的讨论,但不太理解正确的解决方案:我想将几百个文件从S3加载到RDD中。这是我现在的做法:ObjectListingobjectListing=s3.listObjects(newListObjectsRequest().withBucketName(...).withPrefix(...));Listkeys=newLinkedList();objectListing.getObjectSummaries().forEach(summery->keys.add(summery.getKey()));//repeatwhileobjectListing.isTrunc