readTextFile(path)
readFile(fileInputFormat, path)
可以直接在 ExecutionEnvironment 和 StreamExecutionEnvironment 类中找到 Flink 支持的读取本地文件的方法,如下图所示:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read text file from local files system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
// read text file from an HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.types(Integer.class, String.class, Double.class);
// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.includeFields("10010") // take the first and the fourth field
.types(String.class, Double.class);
// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.pojoType(Person.class, "name", "age", "zipcode");
fromCollection(Collection)
fromElements(T ...)
我们也可以在源码中看到 Flink 支持的方法,如下图所示:
DataSet<String> text = env.fromElements(
"Flink Spark Storm",
"Flink Flink Flink",
"Spark Spark Spark",
"Storm Storm Storm"
);
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
data.add(new Tuple3<>(0,1,0));
data.add(new Tuple3<>(0,1,1));
data.add(new Tuple3<>(0,2,2));
data.add(new Tuple3<>(0,1,3));
data.add(new Tuple3<>(1,2,5));
data.add(new Tuple3<>(1,2,9));
data.add(new Tuple3<>(1,2,11));
data.add(new Tuple3<>(1,2,13));
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");
public class MyStreamingSource implements SourceFunction<Item> {
private boolean isRunning = true;
/**
* 重写run方法产生一个源源不断的数据发送源
* @param ctx
* @throws Exception
*/
public void run(SourceContext<Item> ctx) throws Exception {
while(isRunning){
Item item = generateItem();
ctx.collect(item);
//每秒产生一条数据
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false;
}
//随机产生一条商品数据
private Item generateItem(){
int i = new Random().nextInt(100);
ArrayList<String> list = new ArrayList();
list.add("HAT");
list.add("TIE");
list.add("SHOE");
Item item = new Item();
item.setName(list.get(new Random().nextInt(3)));
item.setId(i);
return item;
}
}
public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>>{
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
}
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
}
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
当然我们也可以使用在集群或者哨兵模式下使用 Redis 连接器。
集群模式:
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
.setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();
DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
哨兵模式:
FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
.setMasterName("master").setSentinels(...).build();
DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
/**
* 实现 'AsyncFunction' 用于发送请求和设置回调
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** 能够利用回调函数并发发送请求的数据库客户端 */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// 发送异步请求,接收 future 结果
final Future<String> result = client.query(key);
// 设置客户端完成请求后要执行的回调函数
// 回调函数只是简单地把结果发给 future
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// 显示地处理异常
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
// 创建初始 DataStream
DataStream<String> stream = ...;
// 应用异步 I/O 转换操作
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
其中,ResultFuture 的 complete 方法是异步的,不需要等待返回。
我们在之前讲解 Flink State 时,提到过 Flink 提供了 StateDesciptor 方法专门用来访问不同的 state,StateDesciptor 同时还可以通过 setQueryable 使状态变成可以查询状态。可查询状态目前是一个 Beta 功能,暂时不推荐使用。
关注公众号:大数据技术派,回复资料,领取1024G资料。
大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje
我需要在rail3中使用标准注册/登录/忘记密码功能进行身份验证。是否有大多数人为此使用的插件或其他东西? 最佳答案 我不确定最常用的方法是什么-但可以肯定的是,Plataformatec的“Devise”是一个非常流行的方法:http://github.com/plataformatec/devise我已经尝试了一些authgem,对我来说,它是最简单的设置和修改以满足我的需要。它内置了密码恢复、帐户确认(如果需要)和其他一些非常方便的功能。 关于ruby-on-rails-在Rail
我在ruby表单中有一个提交按钮f.submitbtn_text,class:"btnbtn-onemgt12mgb12",id:"btn_id"我想在不使用任何javascript的情况下通过ruby禁用此按钮 最佳答案 添加disabled:true选项。f.submitbtn_text,class:"btnbtn-onemgt12mgb12",id:"btn_id",disabled:true 关于ruby-on-rails-如何在Rails中添加禁用的提交按钮,我们在St
关闭。这个问题是off-topic.它目前不接受答案。想改进这个问题吗?Updatethequestion所以它是on-topic用于堆栈溢出。关闭11年前。Improvethisquestion我不经常使用ruby-通常它加起来相当于每两个月或更长时间编写一次脚本。我的大部分编程都是使用C++进行的,这与ruby有很大不同。由于我与ruby之间的差距如此之大,我总是忘记语言的基本方面(比如解析文本文件和其他简单的东西)。我想每天练习一些基本的东西,我想知道是否有一些我可以订阅的网站,并且会向我发送当天的Ruby问题或类似的东西。有人知道这样的站点/Internet服务吗?
您好,我正在做Rails应用程序,当我捆绑安装时它返回一个错误:无法在任何来源中找到coffee-script-source-1.1.3我知道coffee-script-source-1.1.3gem已被弃用/取消,但我的其他gem与此有依赖关系。但是这个项目正在与其他机器和heroku产品一起工作。如何在不更改我的gemfile的情况下成功捆绑安装?谢谢 最佳答案 运行这个:bundleupdate--sourcecoffee-script-source如果出现错误,如下所示:Anerroroccurredwhileinstall
深度学习12.CNN经典网络VGG16一、简介1.VGG来源2.VGG分类3.不同模型的参数数量4.3x3卷积核的好处5.关于学习率调度6.批归一化二、VGG16层分析1.层划分2.参数展开过程图解3.参数传递示例4.VGG16各层参数数量三、代码分析1.VGG16模型定义2.训练3.测试一、简介1.VGG来源VGG(VisualGeometryGroup)是一个视觉几何组在2014年提出的深度卷积神经网络架构。VGG在2014年ImageNet图像分类竞赛亚军,定位竞赛冠军;VGG网络采用连续的小卷积核(3x3)和池化层构建深度神经网络,网络深度可以达到16层或19层,其中VGG16和VGG
电脑上可以截取图片吗?如果可以,该如何操作呢?相信很多小伙伴都只知道一两种截图的方式,知道的并不全面。其实,电脑上有多种方式截图的,而且非常方便。电脑怎么截图?今天我们就来教大家如何使用电脑截取图片的8种常用方式!操作环境:演示机型:Delloptiplex7050系统版本:Windows10方法一:系统自带截图具体操作:同时按下电脑的自带截图键【Windows+shift+S】,可以选择其中一种方式来截取图片:截屏有矩形截屏、任意形状截屏、窗口截屏和全屏截图。 方法二:QQ截图具体操作:在电脑登录QQ,然后同时按下【Ctrl+Alt+A】,可以任意截图你需要的界面,可以把截图的页面直接下载,
这个问题在这里已经有了答案:Unabletoinstallgem-Failedtobuildgemnativeextension-cannotloadsuchfile--mkmf(LoadError)(17个答案)关闭9年前。嘿,我正在尝试在一台新的ubuntu机器上安装rails。我安装了ruby和rvm,但出现“无法构建gemnative扩展”错误。这是什么意思?$sudogeminstallrails-v3.2.9(没有sudo表示我没有权限)然后它会输出很多“获取”命令,最终会出现这个错误:Buildingnativeextensions.Thiscouldtakeawhi
我想知道如何从Apple.p12文件中提取key。根据我有限的理解,.p12文件是X504证书和私钥的组合。我看到我遇到的每个.p12文件都有一个X504证书和至少一个key,在某些情况下有两个key。这是因为每个.p12都有一个Apple开发人员key,有些还有一个额外的key(可能是Appleroot授权key)。我只考虑那些具有两个key的.p12文件是有效的。我的目标是区分具有一个key的.p12文件和具有两个key的.p12文件。到目前为止,我已经使用OpenSSL来检查X504文件和任何.p12的key。例如,我有这段代码可以检查目录中的所有.p12文件:Dir.glob(
require'openssl'ifARGV.length==2pkcs12=OpenSSL::PKCS12.new(File.read(ARGV[0]),ARGV[1])ppkcs12.certificateelseputs"Usage:load_cert.rb"end运行它会在Windows上产生错误,但在Linux上不会。错误:OpenSSL::PKCS12::PKCS12Error:PKCS12_parse:macverifyfailurefrom(irb):21:ininitializefrom(irb):21:innewfrom(irb):21fromC:/Ruby192/