keyby是flink中非常常见的操作。其作用为在逻辑上将流划分为不相交的分区,而具有相同key的数据都分配到同一个分区。这种操作在各种大数据计算引擎中都非常常见,比如最早的mapreduce,从map阶段到reduce阶段,就是通过shuffle操作将具有相同key的数据分配到同一个reduce端进行处理。在flink内部,keyby是通过哈希分区来实现的,并且自带有多种指定key的方式。
我们先通过源码,来看看keyby指定key的几种不同方式,flink版本1.7.2
/**
* Partitions the operator state of a {@link DataStream} by the given key positions.
*
* @param fields
* The position of the fields on which the {@link DataStream}
* will be grouped.
* @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
*/
public KeyedStream<T, Tuple> keyBy(int... fields) {
if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
} else {
return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
}
}
第一种方式,通过指定字段的位置来进行分组,输入参数为一个或多个整数,整数即代表字段对应位置。
/**
* Partitions the operator state of a {@link DataStream} using field expressions.
* A field expression is either the name of a public field or a getter method with parentheses
* of the {@link DataStream}'s underlying type. A dot can be used to drill
* down into objects, as in {@code "field1.getInnerField2()" }.
*
* @param fields
* One or more field expressions on which the state of the {@link DataStream} operators will be
* partitioned.
* @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
**/
public KeyedStream<T, Tuple> keyBy(String... fields) {
return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
}
第二种方式,通过指定字段名来指定key。这个字段名是有一定要求的,后面我们再详细解释。
/**
* It creates a new {@link KeyedStream} that uses the provided key for partitioning
* its operator states.
*
* @param key
* The KeySelector to be used for extracting the key for partitioning
* @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
*/
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
Preconditions.checkNotNull(key);
return new KeyedStream<>(this, clean(key));
}
第三种方式,通过KeySelector的方式指定。
而KeySelector是一个接口,里面只有一个方法getKey,我们使用的时候实现getKey方法即可。
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
KEY getKey(IN value) throws Exception;
}
通过字段号指定key相对比较简单,直接看一个wordcount例子即可。
public static void baseVersion() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> stream = env.fromElements("java python c python python c");
DataStream<Tuple2<String, Integer>> flatstream = stream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
for(String word: value.split("\\W+")) {
out.collect(new Tuple2<>(word, 1));
}
}
})
.keyBy(0)
.sum(1);
flatstream.print();
env.execute("keyby base version");
}
keyBy(0)表示对第一个字段,即word进行分区,而sum(1)则表示对第二个字段即count进行求和。
通过字段号指定key使用比较简单方便,但是如果是比较复杂的场景,就不好搞定了。比如如果数据是个比较复杂的嵌套结构Tuple2<Tuple2<String, Integer>, Integer>,如果我们想对内部嵌套的Tuple2的第一个字段进行keyby操作,就无法通过字段号来操作,这个时候我们可以通过字段名的方式来进行代替。
字段名的方式相对来说复杂一些,下面我们来进行示范。
还是先以简单的wordcount为例。
先定义个内部静态类,静态类包含有两个字段,分别为word与count。
public static final class WC {
public String word;
public int count;
public WC() {}
public WC(String word, int count) {
this.word = word;
this.count = count;
}
public String getWord() {
return word;
}
public int getCount() {
return count;
}
public void setWord(String word) {
this.word = word;
}
public void setCount(int count) {
this.count = count;
}
@Override
public String toString() {
return this.word + ": " + this.count;
}
}
该POLO类中的两个字段word与count,可以传到keyby算子中。
然后再进行flink相关代码的编写。
public static void nameVersion() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);`在这里插入代码片`
WC wc1 = new WC("java", 1);
WC wc2 = new WC("python", 2);
WC wc3 = new WC("c", 3);
WC wc4 = new WC("c", 4);
WC wc5 = new WC("java", 5);
DataStream<WC> stream = env.fromElements(wc1, wc2, wc3, wc4, wc5);
stream = stream.keyBy("word").sum("count");
stream.print();
env.execute("keyed base");
}
上面代码输出为
java: 1
python: 2
c: 3
c: 7
java: 6
注意上面的WC pojo类是有要求的
1.keyby中的字段名必须与pojo类的字段名一致。
2.pojo类一定要提供默认的构造函数,否则代码会报如下错误。
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<xxx.xxx.xxx.WC>) cannot be used as key.
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:337)
...
3.字段需要提供get/set方法。(但是在1.7.2版本测试,如果对字段不提供get/set方法,wordcount代码也可以正常运行)
接下来我们看嵌套的字段名如何在keyby中被指定。
public static final class WC {
public int count;
public InnerClass inner;
public WC() {}
public WC(InnerClass inner, int count) {
this.inner = inner;
this.count = count;
}
public int getCount() {
return count;
}
public WC setCount(int count) {
this.count = count;
return this;
}
@Override
public String toString() {
return this.inner.name + ": " + this.count;
}
}
public static final class InnerClass {
public String name;
public String department;
public InnerClass() {}
public InnerClass(String name) {
this.name = name;
}
public String getName() {
return name;
}
public InnerClass setName(String name) {
this.name = name;
return this;
}
public String getDepartment() {
return department;
}
public InnerClass setDepartment(String department) {
this.department = department;
return this;
}
}
首先我们定义了两个pojo类,一个是WC类,包含有count字段以及InnerClass对象。而InnerClass有name与department两个字段。
有同学可能会问,搞这么复杂干嘛,直接将所有字段定义到WC类中不就好了。同学们,我们这里是演示嵌套字段的用法…
接下来,我们想将name字段指定为keyby中的key
public static void run() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
InnerClass inn1 = new InnerClass("jojo"); InnerClass inn2 = new InnerClass("jojo");
InnerClass inn3 = new InnerClass("lili"); InnerClass inn4 = new InnerClass("lili");
WC wc1 = new WC(inn1, 1);
WC wc2 = new WC(inn2, 2);
WC wc3 = new WC(inn3, 3);
WC wc4 = new WC(inn4, 4);
DataStream<WC> stream = env.fromElements(wc1, wc2, wc3, wc4)
.keyBy("inner.name")
.sum("count");
stream.print();
env.execute("keyby complex version");
}
上面的例子中
count指的是WC中的count字段
inner.word指的是InnerClass中的word字段,inner则表示WC类中的inner属性。
这样就达到了指定复杂嵌套结构中key的目的。
看一个例子,就能明白上述方式的用法。
public static void keyselect() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> stream = env.fromElements("java python c python python c");
DataStream<Tuple2<String, Integer>> flatstream = stream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
for(String word: value.split("\\W+")) {
out.collect(new Tuple2<>(word, 1));
}
}
})
.keyBy((KeySelector<Tuple2<String, Integer>, Object>) value -> value.f0)
.sum(1);
flatstream.print();
env.execute("key select version");
}
我试图获取一个长度在1到10之间的字符串,并输出将字符串分解为大小为1、2或3的连续子字符串的所有可能方式。例如:输入:123456将整数分割成单个字符,然后继续查找组合。该代码将返回以下所有数组。[1,2,3,4,5,6][12,3,4,5,6][1,23,4,5,6][1,2,34,5,6][1,2,3,45,6][1,2,3,4,56][12,34,5,6][12,3,45,6][12,3,4,56][1,23,45,6][1,2,34,56][1,23,4,56][12,34,56][123,4,5,6][1,234,5,6][1,2,345,6][1,2,3,456][123
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack
我正在尝试修改当前依赖于定义为activeresource的gem:s.add_dependency"activeresource","~>3.0"为了让gem与Rails4一起工作,我需要扩展依赖关系以与activeresource的版本3或4一起工作。我不想简单地添加以下内容,因为它可能会在以后引起问题:s.add_dependency"activeresource",">=3.0"有没有办法指定可接受版本的列表?~>3.0还是~>4.0? 最佳答案 根据thedocumentation,如果你想要3到4之间的所有版本,你可以这
我有一个这样的哈希数组:[{:foo=>2,:date=>Sat,01Sep2014},{:foo2=>2,:date=>Sat,02Sep2014},{:foo3=>3,:date=>Sat,01Sep2014},{:foo4=>4,:date=>Sat,03Sep2014},{:foo5=>5,:date=>Sat,02Sep2014}]如果:date相同,我想合并哈希值。我对上面数组的期望是:[{:foo=>2,:foo3=>3,:date=>Sat,01Sep2014},{:foo2=>2,:foo5=>5:date=>Sat,02Sep2014},{:foo4=>4,:dat
我刚刚被困在这个问题上一段时间了。以这个基地为例:moduleTopclassTestendmoduleFooendend稍后,我可以通过这样做在Foo中定义扩展Test的类:moduleTopmoduleFooclassSomeTest但是,如果我尝试通过使用::指定模块来最小化缩进:moduleTop::FooclassFailure这失败了:NameError:uninitializedconstantTop::Foo::Test这是一个错误,还是仅仅是Ruby解析变量名的方式的逻辑结果? 最佳答案 Isthisabug,or
question的一些答案关于redirect_to让我想到了其他一些问题。基本上,我正在使用Rails2.1编写博客应用程序。我一直在尝试自己完成大部分工作(因为我对Rails有所了解),但在需要时会引用Internet上的教程和引用资料。我设法让一个简单的博客正常运行,然后我尝试添加评论。靠我自己,我设法让它进入了可以从script/console添加评论的阶段,但我无法让表单正常工作。我遵循的其中一个教程建议在帖子Controller中创建一个“评论”操作,以添加评论。我的问题是:这是“标准”方式吗?我的另一个问题的答案之一似乎暗示应该有一个CommentsController参
在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList()Obt
如何使此根路径转到:“/dashboard”而不仅仅是http://example.com?root:to=>'dashboard#index',:constraints=>lambda{|req|!req.session[:user_id].blank?} 最佳答案 您可以通过以下方式实现:root:to=>redirect('/dashboard')match'/dashboard',:to=>"dashboard#index",:constraints=>lambda{|req|!req.session[:user_id].b
我有一个.pfx格式的证书,我需要使用ruby提取公共(public)、私有(private)和CA证书。使用shell我可以这样做:#ExtractPublicKey(askforpassword)opensslpkcs12-infile.pfx-outfile_public.pem-clcerts-nokeys#ExtractCertificateAuthorityKey(askforpassword)opensslpkcs12-infile.pfx-outfile_ca.pem-cacerts-nokeys#ExtractPrivateKey(askforpassword)o