草庐IT

【云原生系列】第五讲:Knative Eventing 下篇

颜淡慕潇 2023-06-07 原文

目录

序言

1.Parallel介绍

1.1 Parallel Spec 

​编辑 

2.Sequence

2.1.Sequence Spec

2.2适用场景

2.3 Broker/Trigger

2.4 代码示例

3.投票


序言

三言两语,不如细心探索

今天整理了一下Eventing 相关知识点

  • Parallel
  • Sequence

希望此文,能帮助读者对Knative Eventing 的这两个知识点有一个初步的了解

文章标记颜色说明:

  • 黄色:重要标题
  • 红色:用来标记结论
  • 绿色:用来标记一级论点
  • 蓝色:用来标记二级论点

1.Parallel介绍

学习Parallel 的时候,看了很多资料,其中觉得比较全面的是:

Flows - Parallel - 《Knative v1.8 Documentation》

Parallel CRD provides a way to easily define a list of branches,

each receiving the same CloudEvent sent to the Parallel ingress channel.

Typically, each branch consists of a filter function guarding the execution of the branch.

Parallel creates Channels and Subscriptions under the hood.

总结下来,就是支持根据不同的过滤条件对事件进行选择处理

1.1 Parallel Spec 

 Parallel 资源定义,典型的 Parallel Spec描述如下:

apiVersion: messaging.knative.dev/v1alpha1
kind: Parallel
metadata:
  name: me-odd-even-parallel
spec:
  channelTemplate:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: InMemoryChannel
  cases:
    - filter:
        uri: "http://me-even-odd-switcher.default.svc.cluster.local/0"
      subscriber:
        ref:
          apiVersion: serving.knative.dev/v1alpha1
          kind: Service
          name: me-even-transformer
    - filter:
        uri: "http://me-even-odd-switcher.default.svc.cluster.local/1"
      subscriber:
        ref:
          apiVersion: serving.knative.dev/v1alpha1
          kind: Service
          name: me-odd-transformer
  reply:
    apiVersion: serving.knative.dev/v1alpha1
    kind: Service
    name: me-event-display

主要包括3个部分:

  • channelTemplate定义了当前 Parallel 中使用的Channel类型
  • cases :定义了一系列 filter 和 subscriber。对于每个条件分支:

    • filter首先判断 filter, 当返回事件时,调用 subscriber。filter和subscriber要求都是可访问的。
    • subscriber: 执行返回的事件会发生到 reply。如果 reply 为空,则发送到 spec.reply
  • reply :定义了全局响应的目标函数。为可选配置

其架构流程如下图

2.Sequence

主要学习了该文章,感谢作者大大:

Knative Eventing 之 Sequence 介绍

Sequence 是Knative Eventing中提供了一个资源模型,主要是为了支持对一个事件进行分步骤多次处理的场景。简单说,用于事件Pipeline处理。概括下来,有以下2点:

  • 一种资源模型
  • 事件Pipeline处理

2.1.Sequence Spec

Sequence Spec  的demo示例

apiVersion: messaging.knative.dev/v1alpha1
kind: Sequence
metadata:
  name: sequencetest
spec:
  channelTemplate:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: InMemoryChannel
  steps:
    - ref:
        apiVersion: serving.knative.dev/v1alpha1
        kind: Service
        name: sequencetest1
    - ref:
        apiVersion: serving.knative.dev/v1alpha1
        kind: Service
        name: sequencetest2
    - ref:
        apiVersion: serving.knative.dev/v1alpha1
        kind: Service
        name: sequencetest3
  reply:
    kind: Broker
    apiVersion: eventing.knative.dev/v1alpha1
    name: sequencetest4

从定义中可以看出:

Sequence Spec包括3个部分:

  1. channelTemplate:指定了具体使用的Channel
  2. steps: 定义了按照顺序执行的服务,而且每个服务会对应创建Subscription。这里定义了三个步骤。每个步骤执行不同的服务
  3. reply:定义了最后一个step返回结果的响应目标,这一步骤是可选

2.2适用场景

Sequence 在 Knative Eventing 中提供的如下 4 种使用场景:

  1.  Broker/Trigger

  2. 级联 Sequence

  3. 面向事件处理

  4. 直接访问 Service

2.3 Broker/Trigger

该类型是最复杂的一个场景

事件源 cronjobsource 向 Broker 发送事件,通过 Trigger 将这些事件发送到由 多个 Service 调用的 Sequence 中。

Sequence 处理完之后将结果事件发送给 Broker,并最终由另一个 Trigger 发送给 event-display Service 显示事件结果。

如下图所示:

2.4 代码示例

主要分为以下几步骤,分别创建: 

  1. knative Service
  2. Sequence
  3. Broker
  4. 创建事件源
  5. Trigger  指向 Sequence
  6. 结果订阅 Trigger

1.创建knative Service

创建 3 个 Knative Service,用于 Sequence 中服务处理。

--- 第一个service -one
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: one
spec:
  template:
    spec:
      containers:
      - image: registry.cn-hangzhou.aliyuncs.com/knative-sample/probable-summer:2656f39a7fcb6afd9fc79e7a4e215d14d651dc674f38020d1d18c6f04b220700
        env:
        - name: STEP
          value: "1"

--- 第二个service - two
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: two
spec:
  template:
    spec:
      containers:
      - image: registry.cn-hangzhou.aliyuncs.com/knative-sample/probable-summer:2656f39a7fcb6afd9fc79e7a4e215d14d651dc674f38020d1d18c6f04b220700
        env:
        - name: STEP
          value: "2"
--- 第三个
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: three
spec:
  template:
    spec:
      containers:
      - image: registry.cn-hangzhou.aliyuncs.com/knative-sample/probable-summer:2656f39a7fcb6afd9fc79e7a4e215d14d651dc674f38020d1d18c6f04b220700
        env:
        - name: STEP
          value: "3"

2.创建 Sequence

创建 Sequence,依次顺序执行 one->two->three 这 3 个服务。

将最终处理的结果发送到 broker-test 中。

apiVersion: messaging.knative.dev/v1alpha1
kind: Sequence
metadata:
  name: sequence
spec:
  channelTemplate:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: InMemoryChannel
  steps:
    - ref:
        apiVersion: serving.knative.dev/v1alpha1
        kind: Service
        name: one
    - ref:
        apiVersion: serving.knative.dev/v1alpha1
        kind: Service
        name: two
    - ref:
        apiVersion: serving.knative.dev/v1alpha1
        kind: Service
        name: three
  reply:
    kind: Broker
    apiVersion: eventing.knative.dev/v1alpha1
    name: broker-test

3.创建broker

创建默认的broker

kubectl label namespace default knative-eventing-injection=enabled

4.创建事件源

创建事件源指向 Broker

创建 CronjobSource,它将每隔 1 分钟发送一条 {"message": "Hello Knative Sequence!"} 消息到 broker-test 中。

apiVersion: sources.eventing.knative.dev/v1alpha1
kind: CronJobSource
metadata:
  name: cronjob-source
spec:
  schedule: "*/1 * * * *"
  data: '{"message": "Hello Knative Sequence!"}'
  sink:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: Broker
    name: broker-test

5.Trigger  指向 Sequence 

创建订阅事件类型为 dev.knative.cronjob.event 的 Trigger, 用于 Sequence 进行消费处理。

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: sequence-trigger
spec:
  filter:
    sourceAndType:
      type: dev.knative.cronjob.event
  subscriber:
    ref:
      apiVersion: messaging.knative.dev/v1alpha1
      kind: Sequence
      name: sequence

6.结果订阅 Trigger

创建订阅 samples.http.mod3 的事件类型 Trigger,将 Sequence 执行的结果发送给 event-display Service 进行显示。

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: display-trigger
spec:
  filter:
    sourceAndType:
      type: samples.http.mod3
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: event-display
---
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: event-display
spec:
  template:
    spec:
      containers:
        - image: registry.cn-hangzhou.aliyuncs.com/knative-release/event_display:bf45b3eb1e7fc4cb63d6a5a6416cf696295484a7662e0cf9ccdf5c080542c21d
---

参考:

Flows - Parallel - 《Knative v1.8 Documentation》

Knative Sequence 介绍

3.投票

有关【云原生系列】第五讲:Knative Eventing 下篇的更多相关文章

  1. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  2. ruby-on-rails - 使用一系列等级计算字母等级 - 2

    这里是Ruby新手。完成一些练习后碰壁了。练习:计算一系列成绩的字母等级创建一个方法get_grade来接受测试分数数组。数组中的每个分数应介于0和100之间,其中100是最大分数。计算平均分并将字母等级作为字符串返回,即“A”、“B”、“C”、“D”、“E”或“F”。我一直返回错误:avg.rb:1:syntaxerror,unexpectedtLBRACK,expecting')'defget_grade([100,90,80])^avg.rb:1:syntaxerror,unexpected')',expecting$end这是我目前所拥有的。我想坚持使用下面的方法或.join,

  3. 【鸿蒙应用开发系列】- 获取系统设备信息以及版本API兼容调用方式 - 2

    在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList​()Obt

  4. 阿里云RDS——产品系列概述 - 2

    基础版云数据库RDS的产品系列包括基础版、高可用版、集群版、三节点企业版,本文介绍基础版实例的相关信息。RDS基础版实例也称为单机版实例,只有单个数据库节点,计算与存储分离,性价比超高。说明RDS基础版实例只有一个数据库节点,没有备节点作为热备份,因此当该节点意外宕机或者执行重启实例、变更配置、版本升级等任务时,会出现较长时间的不可用。如果业务对数据库的可用性要求较高,不建议使用基础版实例,可选择其他系列(如高可用版),部分基础版实例也支持升级为高可用版。基础版与高可用版的对比拓扑图如下所示。优势 性能由于不提供备节点,主节点不会因为实时的数据库复制而产生额外的性能开销,因此基础版的性能相对于

  5. ruby - 从结束值创建一系列字符串 - 2

    我使用irb。下面是我写的代码。“斧头”..“bc”我期待"ax""ay""az""ba"bb""bc"但结果只是“斧头”..“bc”我该如何纠正?谢谢。 最佳答案 >puts("ax".."bc").to_aaxayazbabbbc 关于ruby-从结束值创建一系列字符串,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/7617092/

  6. 【云原生】SpringCloud-Spring Boot Starter使用测试 - 2

    目录SpringBootStarter是什么?以前传统的做法使用SpringBootStarter之后starter的理念:starter的实现: 创建SpringBootStarter步骤在idea新建一个starter项目、直接执行下一步即可生成项目。 在xml中加入如下配置文件:创建proterties类来保存配置信息创建业务类:创建AutoConfiguration测试如下:SpringBootStarter是什么? SpringBootStarter是在SpringBoot组件中被提出来的一种概念、简化了很多烦琐的配置、通过引入各种SpringBootStarter包可以快速搭建出一

  7. ruby-on-rails - 用一系列时间增量填充选择,加上其他选项 - 2

    使用RubyonRails,我使用给定的增量(例如每30分钟)用时间填充“选择”。目前我正在YAML文件中写出所有的可能性,但我觉得有一种更巧妙的方法。我想我想提供一个开始时间、一个结束时间、一个增量,并且目前只提供一个名为“关闭”的选项(想想“business_hours”)。所以,我的选择可能会显示:'Closed'5:00am5:30am6:00am...[allthewayto]...11:30pm谁能想出更好的方法,或者只是将它们全部“拼写”出来的最佳方法? 最佳答案 此答案基于@emh的答案。defcreate_hour

  8. Spring Security 6.0系列【32】授权服务器篇之默认过滤器 - 2

    有道无术,术尚可求,有术无道,止于术。本系列SpringBoot版本3.0.4本系列SpringSecurity版本6.0.2本系列SpringAuthorizationServer版本1.0.2源码地址:https://gitee.com/pearl-organization/study-spring-security-demo文章目录前言1.OAuth2AuthorizationServerMetadataEndpointFilter2.OAuth2AuthorizationEndpointFilter3.OidcProviderConfigurationEndpointFilter4.N

  9. ruby - 如何让 Ruby 找到原生库? - 2

    我在/usr/local/lib中安装了一些本地库。我现在正在尝试安装一个需要这些的gem,以便正确构建,但是gem构建失败,因为它找不到图书馆。gem的extconf.rb文件试图确认它可以找到库have_library()但由于某种原因失败了。我尝试设置一堆环境变量,但似乎没有任何效果:irb(main):003:0>require'mkmf'=>trueirb(main):004:0>have_library('gecodesearch')checkingformain()in-lgecodesearch...no=>falseirb(main):005:0>ENV['LD_LI

  10. ruby-on-rails - 使用 geokit 或其他 ruby​​ gem 计算一系列地理坐标的中心 - 2

    我使用geokit和geokit-railsgemforrails有一段时间了,但我还没有找到答案的一个问题是如何找到一组点的计算聚合中心。我知道如何计算两点之间的距离,但不会超过2。我的理由是,我在同一个城市中有一系列的点……一切都完美的城市会有一个我可以使用的中心,但有些城市,比如柏林没有一个完美的中心。他们有多个中心,我只想使用我数据库中的所有地点列表来计算特定分布的中心。还有其他人遇到过这个问题吗?有什么建议吗?谢谢 最佳答案 之前从未使用过Geokit,这个操作背后的数学原理相对容易自己实现。假设这些点由纬度和经度组成,您

随机推荐