草庐IT

Streaming

全部标签

python - 如何通过 flask 应用程序流式传输数据?

我正在研究使用Flask应用程序作为嵌入式系统接口(interface)的可能性。我以前使用过Flask(例如,我编写了一些非常基本的Flask站点来轮询外部系统以响应页面加载以填充图表)但我不确定如何将数据推送到Flask应用程序中以及用户的浏览器。我计划使用ZeroMQ将嵌入式设备上运行的C++应用程序中的数据推送到Flask应用程序(也在嵌入式设备上运行)中.根据我的阅读,类似于flask-socketIO将有可能将东西从Flask获取到用户的浏览器。我不清楚的一件事是是否可能/您将如何从ZeroMQ接收数据并将其推送到浏览器? 最佳答案

python - 使用 tweepy 访问 Twitter 的 Streaming API

我目前在获取使用tweepy访问Twitter的StreamingAPI以正确运行的示例代码时遇到问题(错误......或者至少我希望它如何运行)。我正在使用来自GitHub(标记为版本1.9)和Python2.7.1的tweepy的最新克隆。我尝试了来自三个来源的示例代码,在每种情况下都使用“twitter”作为跟踪的测试术语:O'Rilley答案代码:HowtoCaptureTweetsinReal-timewithTwitter'sStreamingAPIAndrewRobinson的博客:UsingTweepytoaccesstheTwitterStreamGitHub上的Tw

python - 如何使用 elasticsearch.helpers.streaming_bulk

有人可以建议如何使用函数elasticsearch.helpers.streaming_bulk而不是elasticsearch.helpers.bulk将数据索引到elasticsearch中。如果我简单地更改streaming_bulk而不是bulk,则不会索引任何内容,所以我想它需要以不同的形式使用。下面的代码以500个元素的block从CSV文件创建索引、类型和索引数据到elasticsearch。它工作正常,但我在徘徊是否有可能提高性能。这就是为什么我想尝试streaming_bulk函数。目前我需要10分钟为200MB的CSV文档索引100万行。我使用两台机器,Centos

python - Pandas df 的流数据

我正在尝试模拟使用pandas来访问不断变化的文件。我有一个文件读取一个csv文件,向其中添加一行,然后随机休眠一段时间以模拟批量输入。importpandasaspdfromtimeimportsleepimportrandomdf2=pd.DataFrame(data=[['test','trial']],index=None)whileTrue:df=pd.read_csv('data.csv',header=None)df.append(df2)df.to_csv('data.csv',index=False)sleep(random.uniform(0.025,0.3))第二

python - 如何将 Spark Streaming 数据转换为 Spark DataFrame

到目前为止,Spark还没有创建流式数据的DataFrame,但是我在做异常检测的时候,使用DataFrame进行数据分析更加方便快捷。我已经完成了这部分,但是当我尝试使用流数据进行实时异常检测时,问题出现了。试了好几种方法,仍然无法将DStream转为DataFrame,也无法将DStream内部的RDD转为DataFrame。这是我最新版本的代码的一部分:importsysimportrefrompysparkimportSparkContextfrompyspark.sql.contextimportSQLContextfrompyspark.sqlimportRowfrompy

python - 在 Hadoop Streaming 中生成单独的输出文件

仅使用映射器(Python脚本)而不使用缩减器,如何为每一行输出输出一个以键作为文件名的单独文件,而不是输出长文件? 最佳答案 可以使用-inputformat和-outputformat命令行参数替换输入和输出格式类。如何执行此操作的一个示例可以在dumboproject中找到,这是一个用于编写流式作业的python框架。它具有写入多个文件的功能,并且在内部用其姊妹项目feathers中的类替换输出格式。-fm.last.feathers.output.MultipleTextFiles。reducer然后需要发出一个元组作为键,

python - 我可以流式传输 Python pickle 列表、元组或其他可迭代数据类型吗?

我经常使用逗号/制表符分隔的数据文件,这些文件可能如下所示:key1,1,2.02,hello,4key2,3,4.01,goodbye,6...我可能会在Python中读取并将其预处理为列表列表,如下所示:[[key1,1,2.02,'hello',4],[key2,3,4.01,'goodbye',6]]有时,我喜欢将这个列表列表保存为pickle,因为它保留了我的条目的不同类型。但是,如果pickled文件很大,那么以流方式读取此列表列表会很棒。在Python中,为了将文本文件作为流加载,我使用以下方法打印出每一行:withopen('big_text_file.txt')asf

python - 如何使用 python 捕获 mp3 流

捕获来自http的mp3流并使用python将其保存到磁盘的最佳方法是什么?到目前为止我已经尝试过target=open(target_path,"w")conn=urllib.urlopen(stream_url)whileTrue:target.write(conn.read(buf_size))这给了我数据,但它在mp3播放器中出现乱码或无法播放。 最佳答案 如果您使用的是Windows,您可能会不小心进行CRLF转换,从而损坏二进制数据。尝试以二进制模式打开target:target=open(target_path,"wb

python - 通过 DLNA/UPnP 流媒体文件

我目前正在我的RaspberryPi上运行Raspbmc并激活了UPnP流式传输渲染。我的目标是编写一个简单的Python来流式传输视频或音乐播放列表。我试过了Coherence,但我抛出了一堆异常,我并没有真正理解documentationisprettychaotic的意义.所以我正在寻找一种更简单的方法/库。使用Python将媒体文件流式传输到电视的最简单方法是什么? 最佳答案 有一个轻量级的纯python库dlnap这允许在同一本地网络中的DLNA/UPnP设备上播放媒体 关于p

python - 数据源用完时如何停止 Spark 流

我有一个SparkStreaming作业,每5秒从Kafka读取一次,对传入数据进行一些转换,然后写入文件系统。这真的不需要成为流式传输作业,实际上,我只想每天运行一次以将消息排入文件系统。不过,我不确定如何停止这项工作。如果我将超时传递给streamingContext.awaitTermination,它不会停止进程,它所做的只是导致进程在需要迭代流时产生错误(参见下面的错误)完成我想做的事情的最好方法是什么这是针对Python上的Spark1.6编辑:感谢@marios,解决方案是这样的:ssc.start()ssc.awaitTermination(10)ssc.stop()在