Flink核心是一个流式的数据流执行引擎,并且能够基于同一个Flink运行时,提供支持流处理和批处理两种类型应用。其针对数据流的分布式计算提供了数据分布,数据通信及容错机制等功能。基于流执行引擎,Flink提供了跟多高抽象层的API便于用户编写分布式任务,下面稍微介绍一下Flink的几种API:
Flink官网:https://flink.apache.org/
官方文档(1.14.2版本):https://nightlies.apache.org/flink/flink-docs-release-1.14/
官方中文文档(1.14.2版本):https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/
不同版本的文档:https://nightlies.apache.org/flink/


Flink的基础架构图:

JobClient:负责接收程序,解析和优化程序的执行计划,然后提交执行计划到JobManager。这里执行的程序优化是将相邻的Operator融合,形成Operator Chain,Operator的融合可以减少task的数量,提高TaskManager的资源利用率。为了了解Flink的解析过程,需要简单介绍一下Flink的Operator,在Flink主要有三类Operator:
JobManagers:负责申请资源,协调以及控制整个job的执行过程,具体包括,调度任务、处理checkpoint、容错等等。
TaskManager:TaskManager运行在不同节点上的JVM进程,负责接收并执行JobManager发送的task,并且与JobManager通信,反馈任务状态信息,如果说JobManager是master的话,那么TaskManager就是worker用于执行任务。每个TaskManager像是一个容器,包含一个或者多个Slot。
Slot:Slot是TaskManager资源粒度的划分,每个Slot都有自己独立的内存。所有Slot平均分配TaskManager的内存,值得注意的是,Slot仅划分内存,不涉及CPU的划分,即CPU是共享使用。每个Slot可以运行多个task。Slot的个数就代表了一个程序的最高并行度。
Task:Task是在operators的subtask进行链化之后形成的,具体Flink job中有多少task和operator的并行度和链化的策略有关。
SubTask:因为Flink是分布式部署的,程序中的每个算子,在实际执行中被分隔为一个或者多个subtask,运算符子任务(subtask)的数量是该特定运算符的并行度。数据流在算子之间流动,就对应到SubTask之间的数据传输。Flink允许同一个job中来自不同task的subtask可以共享同一个slot。每个slot可以执行一个并行的pipeline。可以将pipeline看作是多个subtask的组成的。
Flink 中的 Time 分为三种:事件时间、达到时间与处理时间。
事件时间:是事件真实发生的时间。
达到时间:是系统接收到事件的时间,即服务端接收到事件的时间。
处理时间:是系统开始处理到达事件的时间。
【温馨提示】在某些场景下,处理时间等于达到时间。因为处理时间没有乱序的问题,所以服务端做基于处理时间的计算是比较简单的,无迟到与乱序数据。
Flink 中只需要通过 env 环境变量即可设置Time:
//创建环境上下文
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置在当前程序中使用 ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
窗口本质就是将无限数据集沿着时间(或者数量)的边界切分成有限数据集。
Time Window:基于时间的,分为Tumbling Window(无数据重叠)和Sliding Window(有数据重叠) 。
Count Window:基于数量的,分为Tumbling Window(无数据重叠)和Sliding Window(有数据重叠)。
Session Window:基于会话的,一个session window关闭通常是由于一段时间没有收到元素。
Global Window:全局窗口。
【温馨提示】在实际操作中,window又分为两大类型的窗口:Keyed Window 和 Non-keyed Window,两种类型的窗口操作API有细微的差别。
触发器决定了窗口何时会被触发计算,Flink 中开发人员需要在 window 类型的操作之后才能调用 trigger 方法传入触发器定义。Flink 中的触发器定义需要继承并实现 Trigger 接口,该接口有以下方法:
以上方法会返回决定如何触发执行的 TriggerResult:
如果开发人员未指定触发器,则 Flink 会自动根据场景使用默认的预定义好的触发器。在基于事件时间的窗口中使用 EventTimeTrigger,该触发器会在watermark通过窗口边界后立即触发(即watermark出现关闭改窗口时)。在全局窗口(GlobalWindow)中使用 NeverTrigger,该触发器永远不会触发,所以在使用全局窗口时用户需要自定义触发器。
Flink中状态的实现有三种:MemoryState、FsState、RocksDBState。三种状态存储方式与使用场景各不相同,详细介绍如下:
构造函数:MemoryStateBackend(int maxStateSize, boolean asyncSnapshot)
存储方式:State存储于各个 TaskManager内存中,Checkpoint存储于 JobManager内存
容量限制:单个State最大5M、maxStateSize<=akka.framesize(10M)、总大小不超过JobManager内存
使用场景:无状态或者JobManager挂掉不影响的测试环境等,不建议在生产环境使用
构造函数:FsStateBackend(URI checkpointUri, boolean asyncSnapshot)
存储方式:State存储于 TaskManager内存,Checkpoint存储于 外部文件系统(本次磁盘 or HDFS)
容量限制:State总量不超过TaskManager内存、Checkpoint总大小不超过外部存储空间
使用场景:常规使用状态的作业,分钟级的窗口聚合等,可在生产环境使用
构造函数:RocksDBStateBackend(URI checkpointUri, boolean enableincrementCheckpoint)
存储方式:State存储于 TaskManager上的kv数据库(内存+磁盘),Checkpoint存储于 外部文件系统(本次磁盘 or HDFS)
容量限制:State总量不超过TaskManager内存+磁盘、单key最大2g、Checkpoint总大小不超过外部存储空间
使用场景:超大状态的作业,天级的窗口聚合等,对读写性能要求不高的场景,可在生产环境使用
根据业务场景需要用户选择最合适的 StateBackend ,代码中只需在相应的 env 环境中设置即可:
// flink 上下文环境变量
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置状态后端为 FsStateBackend,数据存储到 hdfs /tmp/flink/checkpoint/test 中
env.setStateBackend(new FsStateBackend("hdfs://ns1/tmp/flink/checkpoint/test", false))
Checkpoint 是分布式全域一致的,数据会被写入hdfs等共享存储中。且其产生是异步的,在不中断、不影响运算的前提下产生。
用户只需在相应的 env 环境中设置即可:
// 1000毫秒进行一次 Checkpoint 操作
env.enableCheckpointing(1000)
// 模式为准确一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 两次 Checkpoint 之间最少间隔 500毫秒
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// Checkpoint 过程超时时间为 60000毫秒,即1分钟视为超时失败
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 同一时间只允许1个Checkpoint的操作在执行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
异步屏障快照算法,这个算法基本上是Chandy-Lamport算法的变体,针对DAG(有向无环图)的ABS算法执行流程如下所示:
下面这个图展示了一个ABS算法的执行过程:

Flink 程序并 不能自动提取数据源中哪个字段/标识为数据的事件时间,从而也就无法自己定义 Watermark 。开发人员需要通过 Flink 提供的 API 来 提取和定义 Timestamp/Watermark,可以在 数据源或者数据流中 定义。
自定义的数据源类需要继承并实现 SourceFunction[T] 接口,其中 run 方法是定义数据生产的地方:
//自定义的数据源为自定义类型MyType
class MySource extends SourceFunction[MyType]{
//重写run方法,定义数据生产的逻辑
override def run(ctx: SourceContext[MyType]): Unit = {
while (/* condition */) {
val next: MyType = getNext()
//设置timestamp从MyType的哪个字段获取(eventTimestamp)
ctx.collectWithTimestamp(next, next.eventTimestamp)
if (next.hasWatermarkTime) {
//设置watermark从MyType的那个方法获取(getWatermarkTime)
ctx.emitWatermark(new Watermark(next.getWatermarkTime))
}
}
}
}
在这里插入代码片在数据流中,可以设置 stream 的 Timestamp Assigner ,该 Assigner 将会接收一个 stream,并生产一个带 Timestamp和Watermark 的新 stream。
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
和 Spark 中的广播变量一样,Flink 也支持在各个节点中各存一份小数据集,所在的计算节点实例可在本地内存中直接读取被广播的数据,可以避免Shuffle提高并行效率。
广播状态(Broadcast State)的引入是为了支持一些来自一个流的数据需要广播到所有下游任务的情况,它存储在本地,用于处理其他流上的所有传入元素。
// key the shapes by color
KeyedStream<Item, Color> colorPartitionedStream = shapeStream.keyBy(new KeySelector<Shape, Color>(){...});
// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint<Rule>() {}));
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
DataStream<Match> output = colorPartitionedStream.connect(ruleBroadcastStream).process(new KeyedBroadcastProcessFunction<Color, Item, Rule, String>(){...});
Flink作业中,可以指定相关的chain将相关性非常强的转换操作(operator)绑定在一起,使得上下游的Task在同一个Pipeline中执行,避免因为数据在网络或者线程之间传输导致的开销。
一般情况下Flink在Map类型的操作中默认开启 Operator Chain 以提高整体性能,开发人员也可以根据需要创建或者禁止 Operator Chain 对任务进行细粒度的链条控制。
//创建 chain
dataStream.filter(...).map(...).startNewChain().map(...)
//禁止 chain
dataStream.map(...).disableChaining()
创建的链条只对当前的操作符和之后的操作符有效,不不影响其他操作,如上代码只针对两个map操作进行链条绑定,对前面的filter操作无效,如果需要可以在filter和map之间使用 startNewChain方法即可。
除了从DataStream操作的结果中获取主数据流之外,Flink还可以产生任意数量额外的侧输出(Side Output)结果流。侧输出结果流的数据类型不需要与主数据流的类型一致,不同侧输出流的类型也可以不同。当要拆分数据流时(通常必须复制流),从每个流过滤出不想拥有的数据时Side Output将非常有用。
DataStream<Integer> input = ...;
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = input
.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(
Integer value,
Context ctx,
Collector<Integer> out) throws Exception {
// 将数据发送到常规输出中
out.collect(value);
// 将数据发送到侧输出中
ctx.output(outputTag, "sideout-" + String.valueOf(value));
}
});
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);

下载地址:http://flink.apache.org/downloads.html
对于 Local 模式来说,JobManager 和 TaskManager 会公用一个 JVM 来完成 Workload。如果要验证一个简单的应用,Local 模式是最方便的。实际企业中大多使用Flink On Yarn模式,而local模式只是将安装包解压启动(./bin/start-cluster.sh)即可。其实local模式就是单节点,master和woker节点都是同一台机器。

Local Cluster模式是开箱即用的,直接解压安装包,然后启动即可。
$ cd /opt/bigdata/hadoop/software
$ wget https://dlcdn.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz
# 解压
$ tar -zxvf flink-1.14.2-bin-scala_2.12.tgz -C /opt/bigdata/hadoop/server/
# 进入bin目录运行启动脚本
$ cd /opt/bigdata/hadoop/server/flink-1.14.2
$ ./bin/start-cluster.sh
打开浏览器输入http://IP:8081,查看WEBUI监控界面
我这里地址:http://hadoop-node1:8081

Stanalone CLuster是一种独立的集群模式,集群运行不需要依赖外部系统,完全自己独立进行管理。
| 机器IP | 机器名 | 节点类型 |
|---|---|---|
| 192.168.0.113 | hadoop-node1 | Master |
| 192.168.0.114 | hadoop-node2 | Worker |
| 192.168.0.115 | hadoop-node3 | Worker |

$ cd /opt/bigdata/hadoop/software
$ wget https://dlcdn.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz
# 解压
$ tar -zxvf flink-1.14.2-bin-scala_2.12.tgz -C /opt/bigdata/hadoop/server/
$ cd /opt/bigdata/hadoop/server/flink-1.14.2
$ cd /opt/bigdata/hadoop/server/flink-1.14.2/conf
$ vi flink-conf.yaml
## jobmanager节点地址,也是master节点地址
jobmanager.rpc.address: hadoop-node1
其它使用默认配置,其中有一些HA高可用、容错、安全、HistoryServer相关配置,按需进行配置即可,HistoryServer需单独运行启动脚本来启动服务。
hadoop-node1:8081
hadoop-node2
hadoop-node3
$ cd /opt/bigdata/hadoop/server
$ scp -r flink-1.14.2 hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r flink-1.14.2 hadoop-node3:/opt/bigdata/hadoop/server/
在/etc/profile文件中添加如下内容(所有节点):
export FLINK_HOME=/opt/bigdata/hadoop/server/flink-1.14.2
export PATH=$PATH:$FLINK_HOME/bin
使配置文件生效
$ source /etc/profile
$ start-cluster.sh


On Yarn官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/yarn/

YARN模式是使用YARN做为Flink运行平台,JobManager、TaskManager、用户提交的应用程序都运行在YARN上。
FLink on yarn 有三种运行模式:
官方介绍:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/deployment/overview/

下载
$ cd /opt/bigdata/hadoop/software
$ wget https://dlcdn.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz
# 解压
$ tar -zxvf flink-1.14.2-bin-scala_2.12.tgz -C /opt/bigdata/hadoop/server/
$ cd /opt/bigdata/hadoop/server/flink-1.14.2
配置
在/etc/profile文件中追加如下内容:

export FLINK_HOME=/opt/bigdata/hadoop/server/flink-1.14.2
export PATH=$PATH:$FLINK_HOME/bin
# 上面两句如果加过,可以忽略
export HADOOP_CLASSPATH=`hadoop classpath`
加载配置
$ source /etc/profile
Yarn-session模式下,首先向Yarn提交一个长时运行的空应用,运行起来之后,任务跑完集群也不释放,可以重复使用在Yarn上开启的Flink集群,也称为共享型集群,适合小任务。

实验
第一种模式分为两步:yarn-session.sh(启动,开辟/申请资源)+flink run(提交任务)
$ yarn-session.sh -n 2 -jm 1024 -tm 1024 -d
### 参数解释:
#-n 2 表示指定两个容器
# -jm 1024 表示jobmanager 1024M内存
# -tm 1024表示taskmanager 1024M内存
#-d 任务后台运行
### 如果你不希望flink yarn client一直运行,也可以启动一个后台运行的yarn session。使用这个参数:-d 或者 --detached。在这种情况下,flink yarn client将会只提交任务到集群然后关闭自己。注意:在这种情况下,无法使用flink停止yarn session,必须使用yarn工具来停止yarn session。
# yarn application -kill $applicationId
#-nm,--name YARN上为一个自定义的应用设置一个名字
#-q,--query 显示yarn中可用的资源 (内存, cpu核数)
#-z,--zookeeperNamespace <arg> 针对HA模式在zookeeper上创建NameSpace
#-id,--applicationId <yarnAppId> YARN集群上的任务id,附着到一个后台运行的yarn session中

JobManager Web Interface: http://hadoop-node2:41787,端口是随机的。


通过yarn入口访问flink


$ cd $FLINK_HOME
$ hadoop fs -put LICENSE /
$ hadoop fs -ls /LICENSE
# 提交任务
$ flink run ./examples/batch/WordCount.jar -input hdfs://hadoop-node1:8082/LICENSE -output hdfs://hadoop-node1:8082/wordcount-result.txt




再提交一次任务
【注意】-output一定是不存在的文件,有flink自动创建写入
$ flink run ./examples/batch/WordCount.jar -input hdfs://hadoop-node1:8082/LICENSE -output hdfs://hadoop-node1:8082/wordcount-result2.txt
发现现在已经有两个跑完的任务了,但是只有一个flink集群,从而验证了yarn-session模式

Yarn-cluster模式下,每个任务单独在Yarn上启动一套Flink集群,适合大任务,运行完后结束,集群释放,资源释放,再有任务,会再起新的Flink集群,需要频繁的在Yanr上开启Flink集群,集群相互独立,适合大任务。

当然除了on yarn模式,还有on k8s,有兴趣的小伙伴,可以试试,当时目前企业里用的最多的还是on yarn模式,但是现在不是流行容器化嘛,以后很大可能会慢慢转到 on k8s模式。
实验
第二种模式其实也分为两个部分,依然是开辟资源和提交任务,但是在Job模式下,这两步都合成一个命令了。
$ cd $FLINK_HOME
$ flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar
# 查看帮助
$ flink --help
### 参数详解,这里只列出了部分参数
Options for yarn-cluster mode:
-d,--detached If present, runs the job in detached
mode
-m,--jobmanager <arg> Set to yarn-cluster to use YARN
execution mode.
-yat,--yarnapplicationType <arg> Set a custom application type for the
application on YARN
-yD <property=value> use value for given property
-yd,--yarndetached If present, runs the job in detached
mode (deprecated; use non-YARN
specific option instead)
-yh,--yarnhelp Help for the Yarn session CLI.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with
optional unit (default: MB)
-ynl,--yarnnodeLabel <arg> Specify YARN node label for the YARN
application
-ynm,--yarnname <arg> Set a custom name for the application
on YARN
-yq,--yarnquery Display available YARN resources
(memory, cores)
-yqu,--yarnqueue <arg> Specify YARN queue.
-ys,--yarnslots <arg> Number of slots per TaskManager
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with
optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
【温馨提示】上面命令中没有指定-input 和 -output,这是由于有默认的数据集和输出方式,看看效果。




发现查看不了History,是因为没起History服务,下面启动这个服务
History Server允许查询由JobManager归档的已完成作业的状态和统计信息。已完成作业的归档在JobManager上进行,JobManager会将归档的作业信息upload到文件系统目录,这个文件系统可以是本地文件系统、HDFS、H3等,这个目录是可以在配置文件中指定的。然后还需要配置History Server去扫描这个目录,并且可以配置扫描的间隔时间。
配置historyserver
$ cd $FLINK_HOME/bin
# 选创建目录
$ hdfs://hadoop-node1:8082/flink/completed-jobs/
# conf/flink-conf.yaml
# 指定由JobManager归档的作业信息所存放的目录,这里使用的是HDFS
jobmanager.archive.fs.dir: hdfs://hadoop-node1:8082/flink/completed-jobs/
# 指定History Server扫描哪些归档目录,多个目录使用逗号分隔
historyserver.archive.fs.dir: hdfs://hadoop-node1:8082/flink/completed-jobs/
# 指定History Server间隔多少毫秒扫描一次归档目录
historyserver.archive.fs.refresh-interval: 10000
# History Server所绑定的ip,hadoop-node1代表允许所有ip访问
historyserver.web.address: hadoop-node1
# 指定History Server所监听的端口号,默认端口是8082
historyserver.web.port: 9082
启动historyserver
$ ./historyserver.sh start
$ jps

重新跑一次任务
$ flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar





查看正在运行任务的日志,yarn容器退出之后,就下面入口就访问不了了。

Application模式的由来
其实前面两种模式client端还是需要干三件事情的:
只有在这些都完成之后,才会通过env.execute()方法触发Flink运行时真正地开始执行作业。试想,如果所有用户都在Deployer上提交作业,较大的依赖会消耗更多的带宽,而较复杂的作业逻辑翻译成JobGraph也需要吃掉更多的CPU和内存,客户端的资源反而会成为瓶颈——不管Session还是Per-Job模式都存在此问题。为了解决它,社区在传统部署模式的基础上实现了Application模式。
Application模式概述
Application模式原本需要客户端做的三件事被转移到了JobManager里,也就是说main()方法在集群中执行(入口点位于ApplicationClusterEntryPoint),Deployer只需要负责发起部署请求了。另外,如果一个main()方法中有多个env.execute()/executeAsync()调用,在Application模式下,这些作业会被视为属于同一个应用,在同一个集群中执行(如果在Per-Job模式下,就会启动多个集群)。可见,Application模式本质上是Session和Per-Job模式的折衷。
实验
$ cd $FLINK_HOME
$ ./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dparallelism.default=10 \
-Dyarn.application.name="MyFlinkApp" \
./examples/batch/WordCount.jar
【温馨提示】
-t参数用来指定部署目标,目前支持YARN(yarn-application)和K8S(kubernetes-application)。-D参数则用来指定与作业相关的各项参数,具体可参见官方文档。
可以先看我之前写的Spark文章:https://blog.csdn.net/qq_35745940/article/details/122011664
Flink执行流程图如下:

| 对比维度 | Spark | Flink |
|---|---|---|
| 设计理念 | Spark的技术理念是使用微批来模拟流的计算,基于Micro-batch,数据流以时间为单位被切分为一个个批次,通过分布式数据集RDD进行批量处理,是一种伪实时。 | Flink是基于事件驱动的,是面向流的处理框架, Flink基于每个事件一行一行地流式处理,是真正的实时流式计算, 另外他也可以基于流来模拟批进行计算实现批处理。 |
| 架构方面 | Spark在运行时的主要角色包括:Master、Worker、Driver、Executor。 | Flink 在运行时主要包含:Jobmanager、Taskmanager和Slot。 |
| 任务调度 | Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图DAG,根据DAG中的action操作形成job,每个job有根据窄宽依赖生成多个stage。 | 使用DataStream API开发的应用程序,首先被转换为Transformation,再被映射为StreamGraph,在客户端进行StreamGraph、JobGraph的转换,提交JobGraph到Flink集群后,Flink集群负责将JobGraph转换为ExecutionGraph,之后进入调度执行阶段。 |
| 时间机制 | Spark Streaming 支持的时间机制有限,只支持处理时间。使用processing time模拟event time必然会有误差, 如果产生数据堆积的话,误差则更明显。 | flink支持三种时间机制:事件时间,注入时间,处理时间,同时支持 watermark 机制处理迟到的数据,说明Flink在处理乱序大实时数据的时候,更有优势。 |
| 容错机制 | SparkStreaming的容错机制是基于RDD的容错机制,会将经常用的RDD或者对宽依赖加Checkpoint。利用SparkStreaming的direct方式与Kafka可以保证数据输入源的,处理过程,输出过程符合exactly once。 | Flink 则使用两阶段提交协议来保证exactly once。 |
| 吞吐量与延迟 | spark是基于微批的,而且流水线优化做的很好,所以说他的吞入量是最大的,但是付出了延迟的代价,它的延迟是秒级。 | 而Flink是基于事件的,消息逐条处理,而且他的容错机制很轻量级,所以他能在兼顾高吞吐量的同时又有很低的延迟,它的延迟能够达到毫秒级。 |
Flink原理介绍先写到这里了,更多关于Flink的知识点,请您耐心等待,当然也可以先自行去看官方文档:https://nightlies.apache.org/flink/。
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
这里是Ruby新手。完成一些练习后碰壁了。练习:计算一系列成绩的字母等级创建一个方法get_grade来接受测试分数数组。数组中的每个分数应介于0和100之间,其中100是最大分数。计算平均分并将字母等级作为字符串返回,即“A”、“B”、“C”、“D”、“E”或“F”。我一直返回错误:avg.rb:1:syntaxerror,unexpectedtLBRACK,expecting')'defget_grade([100,90,80])^avg.rb:1:syntaxerror,unexpected')',expecting$end这是我目前所拥有的。我想坚持使用下面的方法或.join,
我想在一个没有Sass引擎的类中使用Sass颜色函数。我已经在项目中使用了sassgem,所以我认为搭载会像以下一样简单:classRectangleincludeSass::Script::FunctionsdefcolorSass::Script::Color.new([0x82,0x39,0x06])enddefrender#hamlengineexecutedwithcontextofself#sothatwithintemlateicouldcall#%stop{offset:'0%',stop:{color:lighten(color)}}endend更新:参见上面的#re
有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳
我想为我的Rails网络应用程序提供推荐功能。特别是,我想向新注册的用户推荐他可能想要关注的其他用户。Rails中是否有用于此目的的引擎/gem?如果没有,我应该从哪里开始构建它?谢谢。 最佳答案 有Coletivogemhttps://github.com/diogenes/coletivo我试了一下。在MySQL上运行。Neo4jhttp://neo4j.org真的很容易实现一个“跟随谁”。事实上,大多数展示其能力的样本都涉及“跟随谁”。快速提示-只有在JRuby上运行时,Neo4j.rb才会很酷。如果不是-使用Neograph
我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_
无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD
导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵
一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame
本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01 客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02 数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit