我在 HDFS 上有一个文件夹,其中包含 10 个 CSV 文件。每个 CSV 文件包含 10000 行和 17 列。
目标
响应式读取 HDFS 上的文件夹。
如果文件夹中包含文件,则从文件夹中一次读取一个文件(从旧到新)。
在 Shiny 中绘制一些参数。
当新文件添加到文件夹或从文件夹中读取时更新绘图。
状态 目前,借助 SparklyR,我能够一次响应式(Reactive)读取所有文件并生成包含 100000 个点的绘图 (ggplot)。如果我在启动应用程序后添加第 11 个文件(包含 10000 行),绘图将更新为 110000 个点。
library(sparklyr)
conf = spark_config()
conf$spark.driver.memory="50g"
sc <- spark_connect(master = "local[*]", config = conf)
read_folder <- stream_read_csv(sc, "hdfs://localhost:9000/nik_ml/")
ui <- function(){
plotOutput("plot")
}
server <- function(input, output, session){
ps <- reactiveSpark(read_folder, intervalMillis = 10)
output$plot <- renderPlot({
df2 = ps()
# str(df2)
ggplot(data = df2, aes(x=Time, y=outletN2)) + geom_point() + ggtitle(nrow(df2)) + theme_bw()
})
}
shinyApp(ui, server)
SessionInfo()
# R version 3.5.1 (2018-07-02)
# Platform: x86_64-w64-mingw32/x64 (64-bit)
# Running under: Windows Server >= 2012 x64 (build 9200)
#
# Matrix products: default
#
# locale:
# [1] LC_COLLATE=English_United States.1252 LC_CTYPE=English_United States.1252
# [3] LC_MONETARY=English_United States.1252 LC_NUMERIC=C
# [5] LC_TIME=English_United States.1252
#
# attached base packages:
# [1] stats graphics grDevices utils datasets methods base
#
# other attached packages:
# [1] shinyFiles_0.7.2 bindrcpp_0.2.2 dplyr_0.7.8 shiny_1.2.0 ggplot2_3.1.0
# [6] future_1.10.0 sparklyr_0.9.3.9000
#
# loaded via a namespace (and not attached):
# [1] tidyselect_0.2.5 forge_0.1.9002 purrr_0.2.5 listenv_0.7.0 lattice_0.20-38 colorspace_1.3-2
# [7] generics_0.0.2 htmltools_0.3.6 yaml_2.2.0 base64enc_0.1-3 rlang_0.3.0.1 later_0.7.5
# [13] pillar_1.3.0 glue_1.3.0 withr_2.1.2 DBI_1.0.0 dbplyr_1.2.2 bindr_0.1.1
# [19] plyr_1.8.4 munsell_0.5.0 gtable_0.2.0 htmlwidgets_1.3 codetools_0.2-15 labeling_0.3
# [25] httpuv_1.4.5 parallel_3.5.1 broom_0.5.1 r2d3_0.2.2 Rcpp_1.0.0 xtable_1.8-3
# [31] openssl_1.1 promises_1.0.1 backports_1.1.2 scales_1.0.0 jsonlite_1.6 config_0.3
# [37] fs_1.2.6 mime_0.6 digest_0.6.18 grid_3.5.1 rprojroot_1.3-2 tools_3.5.1
# [43] magrittr_1.5 lazyeval_0.2.1 tibble_1.4.2 crayon_1.3.4 tidyr_0.8.2 pkgconfig_2.0.2
# [49] rsconnect_0.8.12 assertthat_0.2.0 httr_1.4.0 rstudioapi_0.8 R6_2.3.0 globals_0.12.4
# [55] nlme_3.1-137 compiler_3.5.1
但我真正想要的是响应式地一次读取一个文件并制作一个 ggplot。这类似于 Spark Streaming,但 Spark Streaming(据我了解)将所有文本文件读入单个 RDD。从 Spark 的文档中,Python 中存在一个名为 SparkContext.wholeTextFiles 的函数,它可以让您读取包含多个小文本文件的目录,并以(文件名,内容)对的形式返回每个文件(link) .我还没有测试它,因为我现在想将所有内容都保留在 R 中。我查看了 shinyFiles 但找不到执行此操作的任何函数 ( https://github.com/thomasp85/shinyFiles )。
R/Sparklyr 中有类似的东西吗?我想做的事听起来很傻吗?如果您认为在 R 中有更有效的方法来实现它,我洗耳恭听!
谢谢。
最佳答案
我在我的一个项目中遇到了你的问题。我最终使用的是 reactivePoll 函数来更新我的情节。
所以你有两个选择,要么每 x 秒更新一次绘图,而不知道是否有新文件。在这个例子中 120 秒所以两分钟: 您在应用程序代码的开头初始化累加器 b。
b <- 0
IsThereNew = function(){
b <<- b+1
b
}
ReadHdfsData=function(){ # A function that calculates the underlying value
path <- paste0("/your/path/to/data.json")
df <- sc %>%
spark_read_json("name", path) %>%
collect()
return(df)
}
df <- reactivePoll(120 * 1000, session, IsThereNew, ReadHdfsData)
所以在这种情况下,即使没有新数据,您也会以一种愚蠢的方式每 2 分钟更新一次绘图。
您可以做的另一种方法是列出 hdfs 目录中的文件数,每 x 秒一次,如果修改了列表计数,则绘图将更新 因此,您必须定义一个返回文件数量的函数 listNumberOfFiles 并替换 isThereNew 函数。
关于R Spark 一次从文件夹中读取一个文件,与 Shiny 集成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53866264/
我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看rubyzip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
我的目标是转换表单输入,例如“100兆字节”或“1GB”,并将其转换为我可以存储在数据库中的文件大小(以千字节为单位)。目前,我有这个:defquota_convert@regex=/([0-9]+)(.*)s/@sizes=%w{kilobytemegabytegigabyte}m=self.quota.match(@regex)if@sizes.include?m[2]eval("self.quota=#{m[1]}.#{m[2]}")endend这有效,但前提是输入是倍数(“gigabytes”,而不是“gigabyte”)并且由于使用了eval看起来疯狂不安全。所以,功能正常,
Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题
对于具有离线功能的智能手机应用程序,我正在为Xml文件创建单向文本同步。我希望我的服务器将增量/差异(例如GNU差异补丁)发送到目标设备。这是计划:Time=0Server:hasversion_1ofXmlfile(~800kiB)Client:hasversion_1ofXmlfile(~800kiB)Time=1Server:hasversion_1andversion_2ofXmlfile(each~800kiB)computesdeltaoftheseversions(=patch)(~10kiB)sendspatchtoClient(~10kiBtransferred)Cl
我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚
使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta
我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何
我想要做的是有2个不同的Controller,client和test_client。客户端Controller已经构建,我想创建一个test_clientController,我可以使用它来玩弄客户端的UI并根据需要进行调整。我主要是想绕过我在客户端中内置的验证及其对加载数据的管理Controller的依赖。所以我希望test_clientController加载示例数据集,然后呈现客户端Controller的索引View,以便我可以调整客户端UI。就是这样。我在test_clients索引方法中试过这个:classTestClientdefindexrender:template=>
在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/