草庐IT

Flink总结之一文彻底搞懂处理函数

搞数据的小伙伴 2023-04-22 原文

​处理函数是Flink底层的函数,工作中通常用来做一些更复杂的业务处理,这次把Flink的处理函数做一次总结,处理函数分好几种,主要包括基本处理函数,keyed处理函数,window处理函数,通过源码说明和案例代码进行测试。

处理函数就是位于底层API里,熟悉处理函数能够更好的处理Flink流处理。

Flink官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/process_function/

一、基本处理函数(ProcessFunction)

首先我们看ProcessFunction的源码,ProcessFunction是一个抽象类,继承了AbstractRichFunction类,那么处理函数就拥有了富函数的所有特性。

1. 拥有的方法如下

processElement:编写我们的处理逻辑,每个数据到来都会走这个函数,有三个参数,第一个参数是输入值类型,第二个参数是上下文Context,第三个参数是收集器(输出)。

onTimer:定时器,通过TimerService 进行注册,当定时时间到达的时候就会执行onTimer函数。只有在KeyedStream中才可以使用。

2. 拥有的抽象类

Context:上下文抽象类,在这个类中可以获取到当前时间戳,以及时间服务timerService,可用来注册定时器和查询时间。

3. 源码

//I:输入类型
//O:收集输出类型
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public ProcessFunction() {
}
public abstract void processElement(I var1, ProcessFunction<I, O>.Context var2, Collector<O> var3) throws Exception;
// 定时器触发时执行的逻辑 当前不支持
public void onTimer(long timestamp, ProcessFunction<I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
}
public abstract class OnTimerContext extends ProcessFunction<I, O>.Context {
public OnTimerContext() {
super();
}
public abstract TimeDomain timeDomain();
}
public abstract class Context {
public Context() {
}
public abstract Long timestamp();
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> var1, X var2);
}
}

4. 测试代码

使用Linux的nc服务进行端口监听,并向9999端口发送数据,然后我们通过Flink监听9999端口,并获取数据进行数据处理。

安装nc组件:

sudo yum install nc -y

开启9999端口:

nc -lk 9999

代码如下:

/**
* 处理函数测试
*/
public class ProcessFunctionTest {
public static void main(String[] args) throws Exception {
// todo 构建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// todo 监听hadoop110服务器9999端口,获取输入流
DataStreamSource<String> streamSource = env.socketTextStream("hadoop110", 9999);
// todo 封装输入流,将数据处理成{"userName":"aa","time",xxxxx}这个结构
SingleOutputStreamOperator<JSONObject> mapStream = streamSource.map(t -> {
JSONObject jsonObject = new JSONObject();
jsonObject.put("userName",t);
jsonObject.put("time",System.currentTimeMillis());
return jsonObject;
});
// TODO 调用处理函数
mapStream.process(new MyProcessFunction()).print("调用处理函数接收到的数据:");
env.execute();
}
}
// 自定义处理函数
class MyProcessFunction extends ProcessFunction<JSONObject,String>{
@Override
public void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {
System.out.println("processElement方法接收到的用户数据:"+ jsonObject.getString("userName"));
collector.collect(jsonObject.getString("userName")+"-----");
}
}

二、按键分区处理函数(KeyedProcessFunction)

按键分区处理函数是重点,用在keyby后面,对keyedStream进行处理,keyby将会按照Key进行分区,然后将不同key的数据分配到不同并行子任务上进行执行。

KeyedProcessFunction可以使用定时器和定时服务,代码中使用定时器和定时服务查看数据和完成定时任务。

KeyedProcessFunction:处理分区数据,每个元素执行一次processElement方法

1. KeyedProcessFunction源码

public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public KeyedProcessFunction() {
}
// 处理方法,每个数据都会走这个方法
public abstract void processElement(I var1, KeyedProcessFunction<K, I, O>.Context var2, Collector<O> var3) throws Exception;
// 定时器逻辑,定时器触发时会走这个方法的逻辑
public void onTimer(long timestamp, KeyedProcessFunction<K, I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
}
public abstract class OnTimerContext extends KeyedProcessFunction<K, I, O>.Context {
public OnTimerContext() {
super();
}
public abstract TimeDomain timeDomain();
public abstract K getCurrentKey();
}
public abstract class Context {
public Context() {
}
public abstract Long timestamp();
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> var1, X var2);
public abstract K getCurrentKey();
}
}

2. TimerService源码

public interface TimerService {
// 不支持注册定时器的提示语
String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
// 不支持删除定时器的提示语
String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";
// 当前处理时间
long currentProcessingTime();

// 当前水位线
long currentWatermark();
// 注册基于处理时间的定时器
void registerProcessingTimeTimer(long var1);

// 注册基于事件时间的定时器
void registerEventTimeTimer(long var1);

// 删除基于处理时间的定时器
void deleteProcessingTimeTimer(long var1);
// 删除基于事件时间的定时器
void deleteEventTimeTimer(long var1);
}

3. 测试代码:

/**
* @title: KeyedProcessFunctionTest
* @Author Tian
* @Date: 2023/3/21 22:59
* @Version 1.0
*/
public class KeyedProcessFunctionTest {
public static void main(String[] args) throws Exception {
// todo 获取环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// todo 处理原始数据并增加时间戳
SingleOutputStreamOperator<JSONObject> sourceStream = env.socketTextStream("hadoop110", 9999).map(t -> {
JSONObject jsonObject = new JSONObject();
jsonObject.put("userName", t);
jsonObject.put("time", System.currentTimeMillis()-5000);
return jsonObject;
});
// todo sourceStream设置水位线,指定事件时间字段
sourceStream.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject jsonObject, long l) {
return jsonObject.getLong("time");
}
}));
// todo 按照用户名分组
// KeyedStream<JSONObject, String> keyedStream = sourceStream.keyBy(data -> true);
sourceStream.keyBy(data -> true).process(new MyKeyedProcessFunction());
// todo 调用窗口处理函数并输出
// keyedStream.process(new MyKeyedProcessFunction()).print("处理结果:");
env.execute();
}
}
/**
* 自定义窗口处理函数
*/
class MyKeyedProcessFunction extends KeyedProcessFunction<Boolean,JSONObject,String>{
@Override
public void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {
System.out.println("当前处理Key:"+ context.getCurrentKey());
System.out.println("数据到达时间:"+ context.timestamp());
System.out.println("当前处理时间:"+ context.timerService().currentProcessingTime());
System.out.println("当前水位线:"+ context.timerService().currentWatermark());
// todo 注册定时器,处理时间+1
context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime()+1000);
// todo 返回当前key
collector.collect(jsonObject.toJSONString());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// todo 触发器触发的时候执行的逻辑 ;
super.onTimer(timestamp, ctx, out);
System.out.println("触发器触发了:"+ ctx.timestamp());
}
}

三、窗口处理函数(ProcessWindowsFunction)

除了上面的按键分区处理函数之外,对于窗口也有函数,分两种,一种是窗口处理函数(ProcessWindowsFunction),另一种是全窗口处理函数(ProcessAllWindowsFunction),ProcessWindowFunction获得一个包含窗口所有元素的可迭代器以及一个具有时间和状态信息访问权的上下文对象,使得它比其他窗口函数提供更大的灵活性。是以性能和资源消耗为代价的,因为元素不能增量地聚合,而是需要在内部缓冲,直到认为窗口可以处理为止。

ProcessWindowsFunction:处理分区数据,每个窗口执行一次process方法

1. ProcessWindowsFunction源码

// IN: input,数据流中窗口任务的输入数据类型
// OUT: output,窗口任务进行计算之后的输出数据类型。
// KEY:数据中键 key 的类型。
// W:窗口的类型,是 Window 的子类型。一般情况下我们定义时间窗口, W就是 TimeWindow。
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public ProcessWindowFunction() {
}
// Process处理数据方法
public abstract void process(KEY var1, ProcessWindowFunction<IN, OUT, KEY, W>.Context var2, Iterable<IN> var3, Collector<OUT> var4) throws Exception;
// 清除窗口数据方法
public void clear(ProcessWindowFunction<IN, OUT, KEY, W>.Context context) throws Exception {
}
// context上下文包含的内容
public abstract class Context implements Serializable {
public Context() {
}
public abstract W window();
public abstract long currentProcessingTime();
public abstract long currentWatermark();
public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
public abstract <X> void output(OutputTag<X> var1, X var2);
}
}

2. 测试代码

public class ProcessWindowsFunctionTest {
public static void main(String[] args) throws Exception {
// todo 获取环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// todo 处理原始数据并增加时间戳
SingleOutputStreamOperator<JSONObject> sourceStream = env.socketTextStream("hadoop110", 9999).map(t -> {
Thread.sleep(5000);
JSONObject jsonObject = new JSONObject();
jsonObject.put("userName", t);
jsonObject.put("time", System.currentTimeMillis()-10000);
return jsonObject;
});
// todo sourceStream设置水位线,指定事件时间字段
sourceStream.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject jsonObject, long l) {
return jsonObject.getLong("time");
}
}));
// todo 按照用户名分组
// KeyedStream<JSONObject, String> keyedStream = sourceStream.keyBy(data -> true);
sourceStream.keyBy(data -> data.getString("userName"))
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new MyProcessWindowsFunction()).print();
env.execute();
}
}
class MyProcessWindowsFunction extends ProcessWindowFunction<JSONObject, HashMap<String,Long>, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<JSONObject> iterable, Collector<HashMap<String,Long>> collector) throws Exception {
// 创建map对象
HashMap<String, Long> result = new HashMap<>();
// 遍历窗口中的数据
for (JSONObject jsonObject : iterable) {
String userName = jsonObject.getString("userName");
if(result.containsKey(userName)){
Long aLong = result.get(userName);
result.put(userName,aLong+1);
}else {
result.put(userName,1l);
}
}
collector.collect(result);
}
}

四、全窗口处理函数(ProcessAllWindowFunction)

ProcessAllWindowFunction和ProcessFunction类相似,都是用来对上游过来的元素做处理,不过ProcessFunction是每个元素执行一次processElement方法,ProcessAllWindowFunction是每个窗口执行一次process方法(方法内可以遍历该窗口内的所有元素);

1. ProcessAllWindowFunction源码

//IN: input,数据流中窗口任务的输入数据类型。
//OUT: output,窗口任务进行计算之后的输出数据类型。
//W:窗口的类型,是 Window 的子类型。一般情况下我们定义时间窗口, W就是 TimeWindow。
public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public ProcessAllWindowFunction() {
}
// 处理方法逻辑
public abstract void process(ProcessAllWindowFunction<IN, OUT, W>.Context var1, Iterable<IN> var2, Collector<OUT> var3) throws Exception;
// 清除窗口内数据
public void clear(ProcessAllWindowFunction<IN, OUT, W>.Context context) throws Exception {
}
// 窗口上下文信息
public abstract class Context {
public Context() {
}
public abstract W window();
public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
public abstract <X> void output(OutputTag<X> var1, X var2);
}
}

2. 测试代码

public class ProcessAllWindowFunctionTest {
public static void main(String[] args) throws Exception {
// todo 获取环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// todo 处理原始数据并增加时间戳
SingleOutputStreamOperator<JSONObject> sourceStream = env.socketTextStream("hadoop110", 9999).map(t -> {
JSONObject jsonObject = new JSONObject();
jsonObject.put("userName", t);
jsonObject.put("time", System.currentTimeMillis()-10000);
return jsonObject;
});
// todo sourceStream设置水位线,指定事件时间字段
sourceStream.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject jsonObject, long l) {
return jsonObject.getLong("time");
}
}));
// todo 按照用户名分组
// KeyedStream<JSONObject, String> keyedStream = sourceStream.keyBy(data -> true);
sourceStream.keyBy(data -> data.getString("userName"))
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new MyProcessAllWindowFunction()).print();
env.execute();
}
}
class MyProcessAllWindowFunction extends ProcessAllWindowFunction<JSONObject, HashMap<String,Long>, TimeWindow>{
@Override
public void process(Context context, Iterable<JSONObject> iterable, Collector<HashMap<String, Long>> collector) throws Exception {
// 创建map对象
HashMap<String, Long> result = new HashMap<>();
// 遍历窗口中的数据
for (JSONObject jsonObject : iterable) {
String userName = jsonObject.getString("userName");
if(result.containsKey(userName)){
Long aLong = result.get(userName);
result.put(userName,aLong+1);
}else {
result.put(userName,1l);
}
}
collector.collect(result);
}
}

五、合并流处理函数(CoProcessFunction)

对于连接流ConnectedStreams 的处理操作,需要分别定义对两条流的处理转换,因此接口中就会有两个相同的方法需要实现,用数字“1”“2”区分,在两条流中的数据到来时分别调用。我们把这种接口叫作“协同处理函数”(co-process function)。与 CoMapFunction 类似,如果是调用.flatMap()就需要传入一个 CoFlatMapFunction,需要实现 flatMap1()、flatMap2()两个方法;而调用.process()时,传入的则是一个 CoProcessFunction。

1. 源码

public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public CoProcessFunction() {
}
// 第一条流处理方法
public abstract void processElement1(IN1 var1, CoProcessFunction<IN1, IN2, OUT>.Context var2, Collector<OUT> var3) throws Exception;
// 第二条流处理方法
public abstract void processElement2(IN2 var1, CoProcessFunction<IN1, IN2, OUT>.Context var2, Collector<OUT> var3) throws Exception;
public void onTimer(long timestamp, CoProcessFunction<IN1, IN2, OUT>.OnTimerContext ctx, Collector<OUT> out) throws Exception {
}
public abstract class OnTimerContext extends CoProcessFunction<IN1, IN2, OUT>.Context {
public OnTimerContext() {
super();
}
public abstract TimeDomain timeDomain();
}
public abstract class Context {
public Context() {
}
public abstract Long timestamp();
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> var1, X var2);
}
}

六、连接流处理函数(ProcessJoinFunction)

ProcessJoinFunction和CoProcessFunction类似,但是有区别。

ProcessJoinFunction 用于join流操作,可以拿到两个流数据处理

CoProcessFunction 用于连接流处理,两个流数据分别处理

1. 源码

//IN1:第一条流输入类型
//IN2:第二条流处理类型
//OUT:输出类型
public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {
private static final long serialVersionUID = -2444626938039012398L;
public ProcessJoinFunction() {
}
// 流join后处理逻辑,可以获取到两个流的数据
public abstract void processElement(IN1 var1, IN2 var2, ProcessJoinFunction<IN1, IN2, OUT>.Context var3, Collector<OUT> var4) throws Exception;
public abstract class Context {
public Context() {
}
public abstract long getLeftTimestamp();
public abstract long getRightTimestamp();
public abstract long getTimestamp();
public abstract <X> void output(OutputTag<X> var1, X var2);
}
}

七、广播流处理函数(BroadcastProcessFunction)

广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流” BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物。

1. 源码

// IN1:输入的第一条流
// IN2:输入的第二条流
// OUT:输出类型
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
private static final long serialVersionUID = 8352559162119034453L;
public BroadcastProcessFunction() {
}
// 处理流逻辑
// IN1:输入流数据
// ReadOnlyContext:只读上下文
// Collector<OUT>:输出
public abstract void processElement(IN1 var1, BroadcastProcessFunction<IN1, IN2, OUT>.ReadOnlyContext var2, Collector<OUT> var3) throws Exception;
// 处理广播流逻辑
public abstract void processBroadcastElement(IN2 var1, BroadcastProcessFunction<IN1, IN2, OUT>.Context var2, Collector<OUT> var3) throws Exception;
// 只读的上下文
public abstract class ReadOnlyContext extends org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.ReadOnlyContext {
public ReadOnlyContext() {
super(BroadcastProcessFunction.this);
}
}
public abstract class Context extends org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.Context {
public Context() {
super(BroadcastProcessFunction.this);
}
}
}

八、 按键分区的广播连接流处理函数(KeyedBroadcastProcessFunction)

按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流, 是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物。

1.源码

// KS:当调用keyBy时所依赖的Key 的类型
// IN1:第一条流类型
// IN2:广播流类型
// OUT:输出类型
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
private static final long serialVersionUID = -2584726797564976453L;
public KeyedBroadcastProcessFunction() {
}
// 第一条流处理逻辑
public abstract void processElement(IN1 var1, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.ReadOnlyContext var2, Collector<OUT> var3) throws Exception;
// 广播流处理逻辑
public abstract void processBroadcastElement(IN2 var1, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context var2, Collector<OUT> var3) throws Exception;
// 定时器出发后执行的逻辑
public void onTimer(long timestamp, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.OnTimerContext ctx, Collector<OUT> out) throws Exception {
}
public abstract class OnTimerContext extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.ReadOnlyContext {
public OnTimerContext() {
super();
}
public abstract TimeDomain timeDomain();
public abstract KS getCurrentKey();
}
public abstract class ReadOnlyContext extends org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.ReadOnlyContext {
public ReadOnlyContext() {
super(KeyedBroadcastProcessFunction.this);
}
public abstract TimerService timerService();
public abstract KS getCurrentKey();
}
public abstract class Context extends org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.Context {
public Context() {
super(KeyedBroadcastProcessFunction.this);
}
public abstract <VS, S extends State> void applyToKeyedState(StateDescriptor<S, VS> var1, KeyedStateFunction<KS, S> var2) throws Exception;
}
}

有关Flink总结之一文彻底搞懂处理函数的更多相关文章

  1. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  2. ruby - 在没有 sass 引擎的情况下使用 sass 颜色函数 - 2

    我想在一个没有Sass引擎的类中使用Sass颜色函数。我已经在项目中使用了sassgem,所以我认为搭载会像以下一样简单:classRectangleincludeSass::Script::FunctionsdefcolorSass::Script::Color.new([0x82,0x39,0x06])enddefrender#hamlengineexecutedwithcontextofself#sothatwithintemlateicouldcall#%stop{offset:'0%',stop:{color:lighten(color)}}endend更新:参见上面的#re

  3. ruby-on-rails - 在 ruby​​ 中使用 gsub 函数替换单词 - 2

    我正在尝试用ruby​​中的gsub函数替换字符串中的某些单词,但有时效果很好,在某些情况下会出现此错误?这种格式有什么问题吗NoMethodError(undefinedmethod`gsub!'fornil:NilClass):模型.rbclassTest"replacethisID1",WAY=>"replacethisID2andID3",DELTA=>"replacethisID4"}end另一个模型.rbclassCheck 最佳答案 啊,我找到了!gsub!是一个非常奇怪的方法。首先,它替换了字符串,所以它实际上修改了

  4. ruby - 在 Ruby 中有条件地定义函数 - 2

    我有一些代码在几个不同的位置之一运行:作为具有调试输出的命令行工具,作为不接受任何输出的更大程序的一部分,以及在Rails环境中。有时我需要根据代码的位置对代码进行细微的更改,我意识到以下样式似乎可行:print"Testingnestedfunctionsdefined\n"CLI=trueifCLIdeftest_printprint"CommandLineVersion\n"endelsedeftest_printprint"ReleaseVersion\n"endendtest_print()这导致:TestingnestedfunctionsdefinedCommandLin

  5. ruby - 在 Ruby 中按名称传递函数 - 2

    如何在Ruby中按名称传递函数?(我使用Ruby才几个小时,所以我还在想办法。)nums=[1,2,3,4]#Thisworks,butismoreverbosethanI'dlikenums.eachdo|i|putsiend#InJS,Icouldjustdosomethinglike:#nums.forEach(console.log)#InF#,itwouldbesomethinglike:#List.iternums(printf"%A")#InRuby,IwishIcoulddosomethinglike:nums.eachputs在Ruby中能不能做到类似的简洁?我可以只

  6. C51单片机——实现用独立按键控制LED亮灭(调用函数篇) - 2

    说在前面这部分我本来是合为一篇来写的,因为目的是一样的,都是通过独立按键来控制LED闪灭本质上是起到开关的作用,即调用函数和中断函数。但是写一篇太累了,我还是决定分为两篇写,这篇是调用函数篇。在本篇中你主要看到这些东西!!!1.调用函数的方法(主要讲语法和格式)2.独立按键如何控制LED亮灭3.程序中的一些细节(软件消抖等)1.调用函数的方法思路还是比较清晰地,就是通过按下按键来控制LED闪灭,即每按下一次,LED取反一次。重要的是,把按键与LED联系在一起。我打算用K1来作为开关,看了一下开发板原理图,K1连接的是单片机的P31口,当按下K1时,P31是与GND相连的,也就是说,当我按下去时

  7. SPI接收数据异常问题总结 - 2

    SPI接收数据左移一位问题目录SPI接收数据左移一位问题一、问题描述二、问题分析三、探究原理四、经验总结最近在工作在学习调试SPI的过程中遇到一个问题——接收数据整体向左移了一位(1bit)。SPI数据收发是数据交换,因此接收数据时从第二个字节开始才是有效数据,也就是数据整体向右移一个字节(1byte)。请教前辈之后也没有得到解决,通过在网上查阅前人经验终于解决问题,所以写一个避坑经验总结。实际背景:MCU与一款芯片使用spi通信,MCU作为主机,芯片作为从机。这款芯片采用的是它规定的六线SPI,多了两根线:RDY和INT,这样从机就可以主动请求主机给主机发送数据了。一、问题描述根据从机芯片手

  8. ruby-on-rails - 将字符串转换为 ruby​​-on-rails 中的函数 - 2

    我需要一个通过输入字符串进行计算的方法,像这样function="(a/b)*100"a=25b=50function.something>>50有什么方法吗? 最佳答案 您可以使用instance_eval:function="(a/b)*100"a=25.0b=50instance_evalfunction#=>50.0请注意,使用eval本质上是不安全的,尤其是当您使用外部输入时,因为它可能包含注入(inject)的恶意代码。另请注意,a设置为25.0而不是25,因为如果它是整数a/b将导致0(整数)。

  9. Ruby-vips 图像处理库。有什么好的使用示例吗? - 2

    我对图像处理完全陌生。我对JPEG内部是什么以及它是如何工作一无所知。我想知道,是否可以在某处找到执行以下简单操作的ruby​​代码:打开jpeg文件。遍历每个像素并将其颜色设置为fx绿色。将结果写入另一个文件。我对如何使用ruby​​-vips库实现这一点特别感兴趣https://github.com/ender672/ruby-vips我的目标-学习如何使用ruby​​-vips执行基本的图像处理操作(Gamma校正、亮度、色调……)任何指向比“helloworld”更复杂的工作示例的链接——比如ruby​​-vips的github页面上的链接,我们将不胜感激!如果有ruby​​-

  10. ruby - Faye WebSocket,关闭处理程序被触发后重新连接到套接字 - 2

    我有一个super简单的脚本,它几乎包含了FayeWebSocketGitHub页面上用于处理关闭连接的内容:ws=Faye::WebSocket::Client.new(url,nil,:headers=>headers)ws.on:opendo|event|p[:open]#sendpingcommand#sendtestcommand#ws.send({command:'test'}.to_json)endws.on:messagedo|event|#hereistheentrypointfordatacomingfromtheserver.pJSON.parse(event.d

随机推荐