草庐IT

Flink-出租车-车程事件流和付车费事件流connect

北在北方 2023-03-28 原文

案例来源: https://github.com/apache/flink-training/blob/release-1.14/README_zh.md

案例背景

出租车车程(taxi ride)事件结构
1.每次车程都由两个事件表示:行程开始(trip start)和行程结束(trip end)。
2.每个事件都由十一个字段组成:

rideId         : Long      // 每次车程的唯一id
taxiId         : Long      // 每一辆出租车的唯一id
driverId       : Long      // 每一位司机的唯一id
isStart        : Boolean   // 行程开始事件为 TRUE, 行程结束事件为 FALSE
eventTime      : Long      // 事件的时间戳
startLon       : Float     // 车程开始位置的经度
startLat       : Float     // 车程开始位置的维度
endLon         : Float     // 车程结束位置的经度
endLat         : Float     // 车程结束位置的维度
passengerCnt   : Short     // 乘车人数

出租车车费(taxi fare)事件结构
rideId         : Long      // 每次车程的唯一id
taxiId         : Long      // 每一辆出租车的唯一id
driverId       : Long      // 每一位司机的唯一id
startTime      : Long   // 车程开始时间
paymentType    : String    // 现金(CASH)或刷卡(CARD)
tip            : Float     // 小费
tolls          : Float     // 过路费
totalFare      : Float     // 总计车费

案例目标

1.将每次车程的 TaxiRide 和 TaxiFare 记录连接在一起

2.对于每个不同的 rideId,恰好有三个事件:

TaxiRide START 事件
TaxiRide END 事件
一个 TaxiFare 事件(其时间戳恰好与开始时间匹配)

最终的结果应该是 DataStream<RideAndFare>,每个不同的 rideId 都产生一个 RideAndFare 记录。 每个 RideAndFare 都应该将某个 rideId 的 TaxiRide START 事件与其匹配的 TaxiFare 配对。

案例流程

核心代码

  • connect 可以将两个流连接成一个ConnectedStreams, 而且不要求两个流的数据类型一致
       // 从车程事件中过滤中车程开始时间,并按车程标识 rideId 分组
        KeyedStream<TaxiRide, Long> rideStream = env.fromSource(rideSource, WatermarkStrategy.noWatermarks(), "ride source")
                .filter(ride -> ride.getStart()).keyBy(TaxiRide::getRideId);

        // 付车费事件按行程标识 rideId 分组
        KeyedStream<TaxiFare, Long> fareStream = env.fromSource(fareSource, WatermarkStrategy.noWatermarks(), "fare source")
                .keyBy(TaxiFare::getRideId);

        rideStream.connect(fareStream).flatMap(new EnrichmentFunction())
                .uid("enrichment") // uid for this operator's state
                .name("enrichment") // name for this operator in the web UI
                .addSink(new PrintSinkFunction<>());
  • 使用ValueState保存事件状态
public class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare> {

    private ValueState<TaxiRide> taxiRideValueState;
    private ValueState<TaxiFare> taxiFareValueState;


    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<TaxiRide> taxiRideDescriptor = new ValueStateDescriptor<TaxiRide>("save-ride", TaxiRide.class);
        ValueStateDescriptor<TaxiFare> taxiFareDescriptor = new ValueStateDescriptor<TaxiFare>("save-fare", TaxiFare.class);

        taxiRideValueState = getRuntimeContext().getState(taxiRideDescriptor);
        taxiFareValueState = getRuntimeContext().getState(taxiFareDescriptor);

    }


    /**
     * 当车程事件到来,检查车费的taxiFareValueState是否保存有对应行程付费记录
     * 如果有,则匹配输出,清空状态
     * 如果没有,则将车程事件保存起来
     */
    @Override
    public void flatMap1(TaxiRide taxiRide, Collector<RideAndFare> collector) throws Exception {
        TaxiFare taxiFare = taxiFareValueState.value();
        if (Objects.isNull(taxiFare)) {
            taxiRideValueState.update(taxiRide);
        } else {
            taxiFareValueState.clear();

            RideAndFare rideAndFare = new RideAndFare();
            rideAndFare.setRide(taxiRide);
            rideAndFare.setFare(taxiFare);

            collector.collect(rideAndFare);
        }
    }


    /**
     * 当付费事件到来,检查车程的taxiRideValueState是否保存有对应行程车程记录
     * 如果有,则匹配输出,清空状态
     * 如果没有,则将付费事件保存起来
     */
    @Override
    public void flatMap2(TaxiFare taxiFare, Collector<RideAndFare> collector) throws Exception {
        TaxiRide taxiRide = taxiRideValueState.value();
        if (Objects.isNull(taxiRide)) {
            taxiFareValueState.update(taxiFare);
        } else {
            taxiRideValueState.clear();

            RideAndFare rideAndFare = new RideAndFare();
            rideAndFare.setRide(taxiRide);
            rideAndFare.setFare(taxiFare);

            collector.collect(rideAndFare);

        }
    }
}

  • 车程事件流和付费事件流来自Kafka
       // 定义出租车-车程数据源
        KafkaSource<TaxiRide> rideSource = KafkaSource.<TaxiRide>builder()
                .setBootstrapServers("192.168.0.192:9092")
                .setTopics("TOPIC_RIDE")
                .setGroupId("TEST_GROUP")
                .setClientIdPrefix("ride") // 避免kafka clientId重复
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new TaxiRideDeserialization())
                .build();

        // 定义出租车-车费数据源
        KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder()
                .setBootstrapServers("192.168.0.192:9092")
                .setTopics("TOPIC_FARE")
                .setGroupId("TEST_GROUP")
                .setClientIdPrefix("fare") // 避免kafka clientId重复
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new TaxiFareDeserialization())
                .build();

事件格式:

1.车程事件: {"rideId":10086, "taxiId":1, "driverId":2, "isStart":true, "eventTime":1656571391726, "startLon":113.273031, "startLat":23.147103, "endLon":113.268245, "endLat":23.14445, "passengerCnt":1}


2.付费事件: {"rideId":10086, "taxiId":1, "driverId":2, "startTime":1656571391726, "paymentType":"CASH", "tip":0.00, "tolls":10.00, "totalFare":110.00}

完整代码

https://github.com/Mr-LuXiaoHua/study-flink

程序入口: com.example.datastream.rideandfare.RideAndFareJob

有关Flink-出租车-车程事件流和付车费事件流connect的更多相关文章

  1. ruby-on-rails - 事件管理员日期过滤器日期格式自定义 - 2

    是否有简单的方法来更改默认ISO格式(yyyy-mm-dd)的ActiveAdmin日期过滤器显示格式? 最佳答案 您可以像这样为日期选择器提供额外的选项,而不是覆盖js:=f.input:my_date,as::datepicker,datepicker_options:{dateFormat:"mm/dd/yy"} 关于ruby-on-rails-事件管理员日期过滤器日期格式自定义,我们在StackOverflow上找到一个类似的问题: https://s

  2. ruby-on-rails - 事件记录 : Select max of limit - 2

    我正在尝试将以下SQL查询转换为ActiveRecord,它正在融化我的大脑。deletefromtablewhereid有什么想法吗?我想做的是限制表中的行数。所以,我想删除少于最近10个条目的所有内容。编辑:通过结合以下几个答案找到了解决方案。Temperature.where('id这给我留下了最新的10个条目。 最佳答案 从您的SQL来看,您似乎想要从表中删除前10条记录。我相信到目前为止的大多数答案都会如此。这里有两个额外的选择:基于MurifoX的版本:Table.where(:id=>Table.order(:id).

  3. ruby-on-rails - 事件管理员和自定义方法 - 2

    这是我在ActiveAdmin中的自定义页面ActiveAdmin.register_page"Settings"doaction_itemdolink_to('Importprojects','settings/importprojects')endcontentdopara"Text"endcontrollerdodefimportprojectssystem"rakedataspider:import_projects_ninja"para"OK"endendend我想做的是,当我单击“导入项目”按钮时,我想在Controller中执行rake任务。但是我无法访问该方法。可能是什

  4. ruby-on-rails - 在不重新查询数据库的情况下重新排序 Rails 中的事件记录? - 2

    例如,假设我有一个名为Products的模型,并且在ProductsController中,我有以下代码用于product_listView以显示已排序的产品。@products=Product.order(params[:order_by])让我们想象一下,在product_listView中,用户可以使用下拉菜单按价格、评级、重量等进行排序。数据库中的产品不会经常更改。我很难理解的是,每次用户选择新的order_by过滤器时,rails是否必须查询,或者rails是否能够以某种方式缓存事件记录以在服务器端重新排序?有没有一种方法可以编写它,以便在用户排序时rails不会重新查询结果

  5. ruby-on-rails - Ruby 长时间运行的进程对队列事件使用react - 2

    我有一个将某些事件写入队列的Rails3应用。现在我想在服务器上创建一个服务,每x秒轮询一次队列,并按计划执行其他任务。除了创建ruby​​脚本并通过cron作业运行它之外,还有其他稳定的替代方案吗? 最佳答案 尽管启动基于Rails的持久任务是一种选择,但您可能希望查看更有序的系统,例如delayed_job或Starling管理您的工作量。我建议不要在cron中运行某些东西,因为启动整个Rails堆栈的开销可能很大。每隔几秒运行一次它是不切实际的,因为Rails上的启动时间通常为5-15秒,具体取决于您的硬件。不过,每天这样做几

  6. ruby HTTPClient : How to use persistent connections? - 2

    如何通过HTTPClient使用持久HTTP连接?发送HTTP请求时是否只是设置KeepAlive的问题?文档指出支持持久连接,但没有告诉我们如何使用它们。 最佳答案 是availableinNet::HTTP如文档中所写,Net::HTTP.startimmediatelycreatesaconnectiontoanHTTPserverwhichiskeptopenforthedurationoftheblock.Theconnectionwillremainopenformultiplerequestsintheblockift

  7. ruby-on-rails - 使用 Rails 事件记录获取二级模型 - 2

    我有一个帖子属于城市的关系,城市又属于一个州,例如:classPost现在我想找到所有帖子及其所属的城市和州。我编写了以下查询来获取带有城市的帖子,但不知道如何在同一查找器中获取带有城市的相应州:@post=Post.find:all,:include=>[:city]感谢任何帮助。谢谢。 最佳答案 Post.all(:include=>{:city=>:state}) 关于ruby-on-rails-使用Rails事件记录获取二级模型,我们在StackOverflow上找到一个类似的问

  8. ruby - 在没有数据库的情况下伪造一个事件记录模型 - 2

    我觉得我错过了什么。我正在编写一个ruby​​gem,它允许与事件记录进行交互,作为其主要功能的附加功能。在为其编写测试用例时,我需要能够指定虚拟事件记录模型来测试此功能。如果我可以获得一个事件记录模型的实例,它不需要与数据库的任何连接,可以有关系,所有这些东西,但不需要我在数据库中设置表,那就太棒了。我对测试还很陌生,在Rails测试之外我也很陌生,但似乎我应该能够相当轻松地完成类似的事情,但我什么也没找到。谁能告诉我我错过了什么?我看过工厂、制造商、固定装置,所有这些似乎都想达到目标。人们如何在您只需要AR对象进行测试的地方测试gem? 最佳答案

  9. ruby-on-rails - 在事件记录库中添加某些方法的首选方法是什么? - 2

    我想创建一个模块,为从事件记录库继承的类提供一些通用方法。以下是我们可以实现的两种方式。1)moduleCommentabledefself.extended(base)base.class_evaldoincludeInstanceMethodsextendClassMethodsendendmoduleClassMethodsdeftest_commentable_classmethodputs'testclassmethod'endendmoduleInstanceMethodsdeftest_commentable_instance_methodputs'testinstanc

  10. ruby - 使用 watir-webdriver 打开多个线程导致 'Connection refused' 错误 - 2

    我有这个简单的例子:require'watir-webdriver'arr=[]sites=["www.google.com","www.bbc.com","www.cnn.com","www.gmail.com"]sites.eachdo|site|arr每次我运行这个脚本,我都会得到ruby/2.1.0/net/http.rb:879:in`initialize':Connectionrefused-connect(2)for"127.0.0.1"port9517(Errno::ECONNREFUSED)或者其中一个浏览器在至少一个线程上意外关闭。另一方面,如果我在每个循环周期结束

随机推荐