草庐IT

spark-streaming

全部标签

java - Spark (JAVA) - 具有多个聚合的数据框 groupBy?

我正在尝试使用JAVA在Spark上编写一个groupBy。在SQL中,这看起来像SELECTid,count(id)ascount,max(date)maxdateFROMtableGROUPBYid;但是这个查询的Spark/JAVA风格等价物是什么?假设变量table是一个数据框,以查看与SQL查询的关系。我在想类似的东西:table=table.select(table.col("id"),(table.col("id").count()).as("count"),(table.col("date").max()).as("maxdate")).groupby("id")这显然

Java 8 流 IllegalStateException : Stream has already been operated on or closed

我正在尝试使用StreamAPI生成Order实例。我有一个创建订单的工厂函数,一个DoubleStream用于初始化订单金额。privateDoubleStreamdoubleStream=newRandom().doubles(50.0,200.0);privateOrdercreateOrder(){returnnewOrder(doubleStream.findFirst().getAsDouble());}@Testpublicvoidtest(){StreamorderStream=Stream.generate(()->{returncreateOrder();});or

java - 使用 Collection.stream 按特定属性动态分组

我正在尝试使用Java8Collection-Stream按多个属性对对象列表进行分组。这很好用:publicclassMyClass{publicStringtitle;publicStringtype;publicStringmodule;publicMyClass(Stringtitle,Stringtype,Stringmodule){this.type=type;this.title=title;this.module=module;}}Listdata=newArrayList();data.add(newMyClass("1","A","B"));data.add(new

java - Spark : How to save a dataframe with headers?

dataframe.saveasTextFile,仅以分隔格式保存数据。如何在JAVA中保存带有标题的数据框。sourceRufFrame.toJavaRDD().map(newTildaDelimiter()).coalesce(1,true).saveAsTextFile(targetSrcFilePath); 最佳答案 如果你想保存为csv文件,我建议使用spark-csv包。您可以简单地使用spark-csv保存您的数据框,如下所示。dataFrame.write.format("com.databricks.spark.c

java - 如何在 HTTP 响应主体(使用 Spark)中发送 QR 码的 PNG?

我想生成一个QR码图像,将其转换为PNG并将其作为HTTP响应返回给我的客户端。为了生成二维码,我使用了ZXing。我已经通过使用带有MatrixToImageWriter.writeToStream(...)的FileOutputStream来测试转换部分。这就像一个魅力。我目前使用的网络框架是Spark(版本1.1.1)。handle(...)方法的返回被设置为响应主体。我在这里做错了什么?使用当前的解决方案,我在使用Firefox执行GET请求时得到Theimage"http://localhost:4567/qrcode"cannotbedisplayedbecauseitco

java - 如何使用 Java 在 Spark SQL 中加入多列以在 DataFrame 中进行过滤

DataFramea=包含列x,y,z,kDataFrameb=包含列x,y,aa.join(b,)???我试过用a.join(b,a.col("x").equalTo(b.col("x"))&&a.col("y").equalTo(b.col("y"),"inner")但是Java抛出错误提示&&isnotallowed. 最佳答案 SparkSQL在标记为java_expr_ops的Column上提供了一组方法,专为Java互操作而设计。它包括and(另请参阅or)可以在此处使用的方法:a.col("x").equalTo(b.

java - 使用转换器时,如何将 header 添加到 spark 的响应中

我有这个:get("/test",(req,resp)->{returnrepository.getAll();},newJsonTransformer());我的变压器看起来像:publicclassJsonTransformerimplementsResponseTransformer{ObjectMapperom=newObjectMapper();publicJsonTransformer(){}@OverridepublicStringrender(Objecto)throwsException{returnom.writeValueAsString(o);}}我试过在响应中

java - Arrays.stream(array) 与 Arrays.asList(array).stream()

在this问题已经回答了两个表达式是相等的,但在这种情况下它们会产生不同的结果。对于给定的int[]分数,为什么会这样:Arrays.stream(scores).forEach(System.out::println);...但这不是:Arrays.asList(scores).stream().forEach(System.out::println);据我所知,.stream()可以在任何集合上调用,列表肯定是。第二个代码片段只返回一个包含整个数组而不是元素的流。 最佳答案 您看到的行为并非特定于Stream秒。Arrays.a

Elasticsearch与Hadoop和Spark的整合与大数据处理

1.背景介绍1.背景介绍Elasticsearch是一个开源的搜索和分析引擎,基于Lucene库构建,具有实时搜索、文本分析、数据聚合等功能。Hadoop是一个开源的分布式存储和分析平台,由Google的MapReduce算法启发,具有高可扩展性和高容错性。Spark是一个快速、高效的大数据处理引擎,基于内存计算,具有高吞吐量和低延迟。随着大数据时代的到来,这三种技术在大数据处理领域中得到了广泛应用。Elasticsearch可以提供实时搜索和分析功能,Hadoop可以提供大规模数据存储和分析功能,Spark可以提供高效的数据处理功能。因此,将这三种技术整合在一起,可以实现更高效、更智能的大数

java - 卡夫卡 : Cant Create Multiple Stream Consumers

我刚刚启动并运行了Kafka0.8beta1。我有一个非常简单的示例启动并运行,问题是,我只能让一个消息消费者工作,而不是几个。也就是说,runSingleWorker()方法有效。run()方法不起作用:importkafka.consumer.ConsumerIterator;importkafka.consumer.KafkaStream;importkafka.consumer.ConsumerConfig;importkafka.javaapi.consumer.ConsumerConnector;importjava.util.Map;importjava.util.Lis