草庐IT

以flink实时流的方式实现OneId

sqhhhhAA111i 2023-03-28 原文

以flink实时流的方式实现OneId

前言

oneid相关概念,及其当前离线实现方式介绍请见以下链接及其系列其他文章:

用户标签(一):图计算实现ID_Mapping、Oneid打通数据孤岛

读完上述链接的相关文章,将理解oneid的需求及其实现方式.

背景

之前公司是做电商数据分析,可以接入多个数据源的数据(美团,饿了么,京东等),在我们系统中,我们将为每个用户统一打上在我们系统内部的user_id,即oneid.当时已经有了基于spark图计算实现的id-mapping来达成oneid.

到这里,我们已有的系统跟前言中链接文章提到的内容相似.

后来,我们有了新需求:实时化改造.实时为每个用户统计相关数据.

但是,实时计算的数据最后要归纳到用户上,那么我们的id-mapping也需要实时出现结果.

思考

本人不具备机器学习领域的知识,在看了几遍前言中的文章后,勉强搞懂了该图计算的原理,本质上是求最大连通图,研究后发现需要拿到全量数据进行迭代,不能改造为实时计算.

后来,基于"最大连通图"的算法,变种出了一种方法:

  1. 输入两个数据关联关系,例如 美团id1-饿了么id1,即输入两个点一个线.
  2. 拿到该关系,分别拿两个数据去与已有的id-mapping结果表对比.如,我们结果id-mapping表中,有关系
    美团id1 - oneid1
    饿了么id1 - oneid2
    
  3. 通过第一步中传入的关系,可以得出 美团id1 与 饿了么id1 在我们系统中应该识别为同一个人,对应同一个oneid,可以得到
    美团id1 - oneid1
    饿了么id1 - oneid1
    
    或者,都对应为另一个oneid
    美团id1 - oneid2
    饿了么id1 - oneid2
    
    即,我们通过传入关联关系,将 美团id1 与 饿了么id1 在我们系统中重新更新为关联到同一个oneid.
  4. 后续,某个用户id是 美团id1,那么它关联这个id-mapping结果表,可以得到它在我们系统中的id是oneid1(或者oneid2,此处根据第三步如何取值)
  5. 根据不同的对比结果,进行相应的替换或者新增,我们变相实现了"最大连通图"的算法,并且这个算法可以用flink实时计算实现

详细步骤

0. id-mapping结果表设计

结果表可以有多个描述字段,但是核心应该是以下两个字段:

原id , 计算出的oneid

1. 输入数据采集

我们在采集数据的时候,需要将数据解析成两两的关系对.如原始数据:

手机号1,美团id1,设备id1

需要将这条消息拆分为:

手机号1 - 美团id1
手机号1 - 设备id1
美团id1 - 设备id1

再将这三组关联关系传给后续对比计算.

2. 对比计算

假设我们得到关系对:

x - y

我们拿到此关系对到结果表中进行对比将有以下几种情况:

  1. x,y都没有对应oneid: 直接对结果表插入计算得出的新oneid(可以使用uuid)
    x - 新oneid
    y - 新oneid
    
  2. x已有对应oneid为 XXoneid,y没有:将y的oneid赋值为 XXoneid,并插入,得到
    x - XXoneid
    y - XXoneid
    
  3. x没有,y有oneid为 YYoneid :同第二种情况,得到
    x - YYoneid
    y - YYoneid
    
  4. x,y都有oneid,且一致,都为 ZZoneid: 不更新
  5. x,y都有oneid,且不一致,分别为 XXoneid,YYoneid :将 x,y更新为同一个oneid(XXoneid或者YYoneid),或者重新生成一个.此处看个人选择.
    并且!!!!!!
    将结果表中所有oneid为 XXoneid,YYoneid的相关数据,oneid都重设为新选择的oneid
    这是为了将相关联的其他数据一起指向新的oneid
    

至此,通过以上几种情况.我们复现了id-mapping中求最大连通图的算法.

实现程序设计

1. 数据源

kafka

2. 实时计算程序

flink

3. 对比中如何取数

redis:将结果表以k-v的形式放在内存中,这样flink可以快速取值并对比计算

4. 结果表存放

hbase:此处可以换为mysql,doris等支持更新的存储即可.并且还有以下原因:

对比计算中,第五种情况,需要从这里取所有oneid为 XXoneid,YYoneid的相关数据
而redis中没法根据value来取得key,所以第五种情况,需要查询此处存储得到相关数据

5. 结果更新

结果不但要更新hbase,还要更新redis中存放的k-v对!!!建议先更新redis,因为比较快.

有关以flink实时流的方式实现OneId的更多相关文章

  1. ruby - 如何以所有可能的方式将字符串拆分为长度最多为 3 的连续子字符串? - 2

    我试图获取一个长度在1到10之间的字符串,并输出将字符串分解为大小为1、2或3的连续子字符串的所有可能方式。例如:输入:123456将整数分割成单个字符,然后继续查找组合。该代码将返回以下所有数组。[1,2,3,4,5,6][12,3,4,5,6][1,23,4,5,6][1,2,34,5,6][1,2,3,45,6][1,2,3,4,56][12,34,5,6][12,3,45,6][12,3,4,56][1,23,45,6][1,2,34,56][1,23,4,56][12,34,56][123,4,5,6][1,234,5,6][1,2,345,6][1,2,3,456][123

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  4. ruby-on-rails - 正确的 Rails 2.1 做事方式 - 2

    question的一些答案关于redirect_to让我想到了其他一些问题。基本上,我正在使用Rails2.1编写博客应用程序。我一直在尝试自己完成大部分工作(因为我对Rails有所了解),但在需要时会引用Internet上的教程和引用资料。我设法让一个简单的博客正常运行,然后我尝试添加评论。靠我自己,我设法让它进入了可以从script/console添加评论的阶段,但我无法让表单正常工作。我遵循的其中一个教程建议在帖子Controller中创建一个“评论”操作,以添加评论。我的问题是:这是“标准”方式吗?我的另一个问题的答案之一似乎暗示应该有一个CommentsController参

  5. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  6. 【鸿蒙应用开发系列】- 获取系统设备信息以及版本API兼容调用方式 - 2

    在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList​()Obt

  7. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  8. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  9. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

  10. ruby - Arrays Sets 和 SortedSets 在 Ruby 中是如何实现的 - 2

    通常,数组被实现为内存块,集合被实现为HashMap,有序集合被实现为跳跃列表。在Ruby中也是如此吗?我正在尝试从性能和内存占用方面评估Ruby中不同容器的使用情况 最佳答案 数组是Ruby核心库的一部分。每个Ruby实现都有自己的数组实现。Ruby语言规范只规定了Ruby数组的行为,并没有规定任何特定的实现策略。它甚至没有指定任何会强制或至少建议特定实现策略的性能约束。然而,大多数Rubyist对数组的性能特征有一些期望,这会迫使不符合它们的实现变得默默无闻,因为实际上没有人会使用它:插入、前置或追加以及删除元素的最坏情况步骤复

随机推荐