草庐IT

给技术新人的ODPS优化建议

轻风博客 2023-04-27 原文

数据开发基本都是从陌生到熟悉,但是写多了就会发现各种好用的工具/函数,也会发现各种坑,本文分享了作者从拿到数据到数据开发到数据监控的一些实操经验。

写在前面

本文档是组内的一份算法ODPS离线开发分享,仅列出了这些年积累下来的一些重要经验和结论,特别是在算法日常数据处理和调度中的技巧和配置方法,至于具体为什么,建议大家去阿里云官网查看底层map reduce以及data flow的工作原理,会有更加深刻的体会:)
参考文档:https://help.aliyun.com/document_detail/89993.html

各种join的用法篇

输入数据:zhule_a, zhule_b两张测试表,具体schema和数据如下(后续为了说明重复数据带来的问题,会在a和b表中各重复插入一条key=2,ds='20220930'的数据):
read zhule_a;
key
ds
1
20220930
2
20220930
1
20221001
2
20221001
3
20221001

read zhule_b;

key
ds
2
20220930
3
20220930
1
20221001
2
20221001
3
20221001
4
20221001
5
20221001

 

Join/Inner Join
用法:Returns the rows that have matching column values in both the left table and the right table based on the join condition。一句话:找出两个表中共同的部分,注意笛卡尔积下面的性能优化
1、每张表先选出来subset,然后再join。

-- better way to perform join, select small range of data first.
SELECT A.*, B.*
FROM
(SELECT * FROM A WHERE ds='20180101') A
JOIN
(SELECT * FROM B WHERE ds='20180101') B
ON a.key = b.key;
注意:在进行各种jion操作前,一定要自查左右表是否有重复数据,否则最终重复的结果会以笛卡尔积的数量增长,比如左右表各有两条重复数据,那么join后重复数据会多达4条!
2、最好的情况下是大表join小表,然后利用mapjoin来实现。
官方解释:In the map stage, MAPJOIN loads all data in the specified tables into the memory of the program that performs the JOIN operation. The tables specified for MAPJOIN must be small tables, and the total memory occupied by the table data cannot exceed 512 MB.
Limits on JOIN operations in MAPJOIN:
  • The left table in a LEFT OUTER JOIN operation must be a large table.

  • The right table in a RIGHT OUTER JOIN operation must be a large table.

  • MAPJOIN cannot be used in a FULL OUTER JOIN operation.

  • The left or right table in an INNER JOIN operation can be a large table.


SELECT  /*+ MAPJOIN(b) */
        a.*
FROM    test_a a
JOIN test_b b
ON      a.user_key = b.user_key
;
//就是在sql语句前加一个标记说这是mapjoin,把小表别名写在括号里
一个有趣的点:
当我们用mapjoin时,除了正常的等式,mapjoin还支持不等式,如下面的例子:

 

Left Join

用法:A LEFT JOIN operation first takes the Cartesian product of the rows in Table A and Table B and returns all the rows of Table A and rows in Table B that meet the join condition. If the join condition finds no matching rows in Table B for a row in Table A, the row in Table A is returned in the result set with NULL values in each column from Table B.一句话:输出左表的所有记录,以及右表中符合关联条件的数据。右表中不符合关联条件的行,输出NULL。
  1. 一定要保留左表的内容是,可以选择用left join,例如加入key_attrs

  2. Right Join和Left Join没有本质区别,建议定义好左表后都利用Left Join来执行

  3. 如果右表有重复数据的情况,那么最终left join的结果会有重复

 

Left Semi Join

用法:A LEFT SEMI JOIN operation returns only the rows in Table A that have a matching row in Table B. 对于左表中的一条数据,如果右表存在符合关联条件的行,则输出左表,否则不输出
  1. 当右表没有重复数据时,和Join是一致的,只会保留相同的列下来

  2. left semi join并不会返回右表B中的任何数据,所以你没法在where条件中指定关于右表B的任何筛选条件,下面得例子能够有更加清晰的对比(例子引用于开源论坛):

employee (2 columns - e_id and e_name)
10, Tom
20, Jerry 
30, Emily

employee_department_mapping (2 columns - e_id and d_name)
10, IT
10, Sales
20, HR

-- inner join results
SELECT e.e_id, e.e_name, d.d_name FROM 
employee e INNER JOIN employee_department_mapping d
on e.e_id = d.e_id
-- results
10, Tom, IT
10, Tom, Sales
20, Jerry, HR

-- left semi join results
SELECT e.e_id, e.e_name, d.d_name FROM 
employee e LEFT SEMI JOIN employee_department_mapping d
on e.e_id = d.e_id
-- results
10, Tom, IT
20, Jerry, HR

 

Left Anti Join
用法:A LEFT ANTI JOIN operation returns only the rows in Table A that have no matching rows in Table B.一句话:对于左表中的一条数据,如果右表中不存在符合关联条件的数据,则输出左表。
  1. 最好用的场景就是找出两表的差异部分;

  2. 算法日常调度时可以用于每日新增修改商品的提取,将关键字段放到ON条件中就行

 

Full Join

用法:A FULL JOIN operation takes the Cartesian product of the rows in Table A and Table B and returns all the rows in Table A and Table B, whether the join condition is met or not. In the result set, NULL values are returned in the columns from the table that lacks a matching row in the other table.一句话:输出左表和右表的所有记录,对于不符合关联条件的数据,未关联的另一侧输出NULL
  1. 在有增删改情况下更新下游最新数据时,非常好用,但是知道的人比较少
举个栗子,其中today_feat是今天新计算的feature table,yest_feat是上一个分区的feature:

SELECT  COALESCE(a.main_image_url,b.main_image_url) AS main_image_url
        ,COALESCE(a.embedding,b.embedding) AS embedding
FROM    today_feat a
FULL JOIN yest_feat b
ON      a.main_image_url = b.main_image_url

其中full jion的效果如下,正好满足new,old,updated feature的更新,配合COALESCE很丝滑:

合理设置Mapper和Reducepriority

 

set odps.instance.priority

目前ODPS更新后只能在开发dev空间生效,通过设置优先级能够一定程度提升排队任务的执行优先级,但是目前线上正式环境不会生效了,建议大家优化好自己健康分,同时对于重要的线上调度任务设置好基线,保证产出的时效。

 

set odps.sql.mapper.split.size

官方指导:Changes the input data amount of each Map worker, which is the split size of the input file. You can use this property to indirectly control the number of workers at each Map stage (default value: 256, unit: MB)。一句话:如果小文件很多,可以调大split.size的数值,这样可以保证在有限资源下更容易申请到Mapper,提升执行的效率。如果资源丰富,想要更多Mapper资源,那就调小split.size的数值,可以申请到更多的Mapper,提升执行效率。酌情处理哟~
举个栗子: 

-- original sql
CREATE TABLE if not EXISTS tmp_zhl_test LIFECYCLE 1 AS 
SELECT sig, venture, seller_id, COUNT(product_id) as cnt 
FROM sku_main_image_sig
WHERE LENGTH(sig) > 10 --some bad cases may have weird sigs like '#NEXT#'
GROUP BY sig, venture, seller_id
HAVING cnt>2
;
如果是默认设置,553 mappers 和 184 reducers 被分配,大约耗时 3m18s:
在资源充沛的情况下,我们设置odps.sql.mapper.split.size=64, 可以申请到更多的Mapper去处理文件的分片,同时更多的reducer也可以被分配到,同样的SQL代码执行时间降为: 2m34s. 
同样的,如果你的数据量超大,但是每条数据本身很小,同时空间资源也有限(毕竟现在资源管控比较严格),与其等待9999个Mapper被分配,你可以尝试设置odps.sql.mapper.split.size=2048(甚至更大)去减少需要分配的Mapper数量,让任务能够快速执行:)

 

set odps.sql.reducer.instances

显示设置reducer的数量(默认值从0到4000),不设置的话会根据任务动态分配,设置后就会分配设置数量的reducer。同样是上面的例子,当我们设置odps.sql.reducer.instances=1000, 耗时变为2m

 

set odps.sql.mapper(reducer).memory

设置每个Map/Reducer worker的内存(默认值是1024,可以设置为256到12288之间的值)一般我们不需要特别设置这个值,但是当任务报错并说「data exceeds the memory」时,可以根据个人情况来设置这个选项。

在Python UDF中使用第三方库

在这部分主要和大家分享下如何在ODPS的python udf安装需要的第三方库(如numpy,opencv等),以及如果有不同依赖库之间的版本不兼容问题时的有效解决方法。

 

Upload&Call Package

  • 需要下载第三方库的安装包xxx.whl,可以直接下载到自己的电脑上面,这样可以在离线环境验证多个版本的一致性(下面介绍)。一般来说我们需要去看安装包需要的python版本号以及兼容机器环境,一般来说都是cp37-cp37m or py2.py3-none-any在中间,然后末尾是x86_64的安装包;

  • 本地直接将xxx.whl转换为xxx.zip,利用命令「mv xxx.whl xxx.zip」就行

  • 将zip资源文件上传到ODPS对应的环境

  • 在你的UDF中,利用下面的代码指定资源包的路径和引用(直接copy就行)

def include_package_path(res_name, lib_name):    
    archive_files = get_cache_archive(res_name)
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist_info' not in f.name], key=lambda v: len(v))
    for dir_name in dir_names:
        if dir_name.endswith(lib_name):
            sys.path.insert(0, os.path.dirname(dir_name))
            break
        elif os.path.exists(os.path.join(dir_name, lib_name + '.py')):
            sys.path.insert(0, os.path.abspath(dir_name))
            break

class PostProcess(BaseUDTF):
    def __init__(self):
        include_package_path('opencv_python-3.4.0.zip','cv2')
        include_package_path('numpy-1.16.6.zip','numpy')
  • python UDF写完后,就可以在创建函数里面的Resources里直接将你的资源名写进去,这样在流程启动后,你的资源才会被有效调用起来。

  • python UDF默认的版本是2.x的,如果你的python版本是3.x,那么需要在ODPS运行前加入下面的指令;同时,部分功能是需要打开沙箱的,所以如果报错的话,可以加入第二行的沙箱命令。
set odps.sql.python.version = cp37; --use python 3.7, default is 2.x version
set odps.isolation.session.enable = true; 

 

Solve Compatibility Issue

有时候在使用多个库时,可能会出现不同版本之间的冲突问题(比如在使用opencv库的时候,如果对应的numpy版本不兼容的话,就会报错)。所以在上传多个库的资源包前,需要先确认版本间的兼容性。一般非常不建议大家用不同版本去试,而应该先在本地确定版本后再上传。可行的步骤如下:
  • 在本地可以用类似conda的工具搭建一个虚拟环境

  • 在本地用pip或者conda install来安装你需要的三方库

  • 查询你下载的三方库以及依赖库的版本,比如python-opencv的话可以打印cv2.__version__

  • 把对应版本的xxx.whl包按照上面的方法现在下来并且上传到ODPS资源中进行依赖

发布任务时的一些额外建议

  • 发布任务配置时,可以灵活使用exclude和extra来去掉或添加你想要的依赖。其中exclude可以去掉你中间产出的临时表,而extra可以帮你增加虽然不在代码里但是希望依赖的上游表(这在汇总不同上游表数据写入下游对应分区并且希望同时产出下游数据时很有用)。
--exclude input or output tables (especially those tmp tables)--@exclude_input=lsiqg_iqc_sku_product_detection_result--@exclude_output=lsmp_sku_image_url_bizdate
-- include input or output tables (especially those separate venture tables)--@extra_input=lsiqg_iqc_sku_product_detection_result--@extra_output=lsmp_sku_image_url_bizdate
  • 如果在SQL代码过程中你需要使用临时表来过渡中间产出的数据(避免SQL嵌套过于严重,影响运行效率),建议一定在临时表中加入一个时间戳,ex. lsiqg_iqc_input_tmp_${bizdate}不然在补数据或者遇到任务堵塞两个任务同时在调度时,或产生overwrite的一系列问题。
  • 如果存在上游表存在多个分区,但是每个分区处理逻辑一样的话(比如不同国家的分区表处理逻辑其实一样),非常建议在第一步里就把不同分区表的数据汇总起来,可以重新增加一个分区(如venture)来存放融合后的数据。如下示例:

INSERT OVERWRITE TABLE sku_main_image_sig PARTITION (ds = '${bizdate}',venture)
SELECT  id
        ,image_url
        ,venture
FROM    (
            SELECT  id
                    ,image_url
                    ,'ID' AS venture
            FROM    auction_image_id
            WHERE   ds = MAX_PT('auction_image_id')
            UNION
            SELECT  id
                    ,image_url
                    ,'PH' AS venture
            FROM    auction_image_ph
            WHERE   ds = MAX_PT('auction_image_ph')
            UNION
            SELECT  id
                    ,image_url
                    ,'VN' AS venture
            FROM    auction_image_vn
            WHERE   ds = MAX_PT('auction_image_vn')
            UNION
            SELECT  id
                    ,image_url
                    ,'SG' AS venture
            FROM    auction_image_sg
            WHERE   ds = MAX_PT('auction_image_sg')
            UNION
            SELECT  id
                    ,image_url
                    ,'MY' AS venture
            FROM    auction_image_my
            WHERE   ds = MAX_PT('auction_image_my')
            UNION
            SELECT  id
                    ,image_url
                    ,'TH' AS venture
            FROM    auction_image_th
            WHERE   ds = MAX_PT('auction_image_th')
        ) union_table
;
  • 对于重要的数据表,一定要设置监控,防止数据丢失、不正常产出等问题,具体的方法又可以分两类:

    • 设置任务基线(baseline)来保证任务优先级,这样调度的时间更有保障

    • 设置warning的短信/电话或者DQC的监控规则来具体监控数据

简单的任务可以直接在任务中心查看详情中设置:

对于更加细致的数据层面监控可以通过DQC平台进行配置,比如无数据产出,数据波动,数据最大/最小值监控等。

写在最后

数据开发基本都是从陌生到熟悉,但是写多了就会发现各种好用的工具/函数,也会发现各种坑,个人拿到数据到数据开发到数据监控的一些经验是:
  1. 拿到数据第一时间验证数据的重复性,有效性;如果是组内问题就反馈,上游链路问题就自己过滤;

  2. 写完数据的每一部分都先验证合理性,这样会提高最终数据的成功率;

  3. 一般节点上线后,会主动去观察3-4天,确保输出是符合预期的(如果会发现应该稳定的数据反而猛然增加or减少,那很可能是数据逻辑有问题);

  4. 定义合理的数据监控,可以避免数据为空,数据波动过大,数据字段不合理等问题;

Enjoy Data Engineering!!

 

作者|周慧玲(逐乐)

有关给技术新人的ODPS优化建议的更多相关文章

  1. Unity 热更新技术 | (三) Lua语言基本介绍及下载安装 - 2

    ?博客主页:https://xiaoy.blog.csdn.net?本文由呆呆敲代码的小Y原创,首发于CSDN??学习专栏推荐:Unity系统学习专栏?游戏制作专栏推荐:游戏制作?Unity实战100例专栏推荐:Unity实战100例教程?欢迎点赞?收藏⭐留言?如有错误敬请指正!?未来很长,值得我们全力奔赴更美好的生活✨------------------❤️分割线❤️-------------------------

  2. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  3. HBase Region 简介和建议数量&大小 - 2

    Region是HBase数据管理的基本单位,region有一点像关系型数据的分区。region中存储这用户的真实数据,而为了管理这些数据,HBase使用了RegionSever来管理region。Region的结构hbaseregion的大小设置默认情况下,每个Table起初只有一个Region,随着数据的不断写入,Region会自动进行拆分。刚拆分时,两个子Region都位于当前的RegionServer,但处于负载均衡的考虑,HMaster有可能会将某个Region转移给其他的RegionServer。RegionSplit时机:当1个region中的某个Store下所有StoreFile

  4. ruby-on-rails - 我需要从 HTML 转到 markdown,有什么建议吗? - 2

    我正在使用Maruku,将Markdown(超集)转换为HTML,你知道我该怎么做才能从HTML转换为Markdown吗? 最佳答案 Google发现了一个名为reverse_markdown的ruby​​脚本.它似乎可以满足您的需求。 关于ruby-on-rails-我需要从HTML转到markdown,有什么建议吗?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/175162

  5. ruby-on-rails - 用于门户的 Ruby 技术 - 2

    我刚刚看到whitehouse.gov正在使用drupal作为CMS和门户技术。drupal的优点之一似乎是很容易添加插件,而且编程最少,即重新发明轮子最少。这实际上正是Ruby-on-Rails的DRY理念。所以:drupal的缺点是什么?Rails或其他基于Ruby的技术有哪些不符合whitehouse.org(或其他CMS门户)门户技术的资格? 最佳答案 Whatarethedrawbacksofdrupal?对于Ruby和Rails,这确实是一个相当主观的问题。Drupal是一个可靠的内容管理选项,非常适合面向社区的站点。它

  6. iNFTnews | 周杰伦18年前未发布的作品Demo,藏在了区块链技术里 - 2

    当音乐碰上区块链技术,会擦出怎样的火花?或许周杰伦已经给了我们答案。8月29日下午,B站独家首发周杰伦限定珍藏Demo独家访谈VCR,周杰伦在VCR里分享了《晴天》《青花瓷》《搁浅》《爱在西元前》四首经典歌曲Demo背后的创作故事,并首次公布18年前未发布的神秘作品《纽约地铁》的Demo。在VCR中,方文山和杰威尔音乐提及到“多亏了区块链技术,现在我们可以将这些Demos,变成独一无二具有收藏价值的艺术品,这些Demos可以在薄盒(国内数藏平台)上听到。”如何将音乐与区块链技术相结合,薄盒方面称:“薄盒作为区块链技术服务方,打破传统对于区块链技术只能作为数字收藏的理解。聚焦于区块链技术赋能,在

  7. Ruby 缺少常量表达式优化? - 2

    我希望Ruby的解析器会进行这种微不足道的优化,但似乎并没有(谈到YARV实现,Ruby1.9.x、2.0.0):require'benchmark'deffib1a,b=0,1whileb由于这两种方法除了在第二种方法中使用预定义常量而不是常量表达式外是相同的,因此Ruby解释器似乎在每个循环中一次又一次地计算幂常数。是否有一些Material说明为什么Ruby根本不进行这种基本优化或只在某些特定情况下进行? 最佳答案 很抱歉给出了另一个答案,但我不想删除或编辑我之前的答案,因为它下面有有趣的讨论。正如JörgWMittag所说,

  8. ruby-on-rails - 优化读取数据库和写入csv文件 - 2

    我正在尝试从数据库中读取大量单元格(超过100.000个)并将它们写入VPSUbuntu服务器上的csv文件。碰巧服务器没有足够的内存。我正在考虑一次读取5000行并将它们写入文件,然后再读取5000行,等等。我应该如何重构我当前的代码以使内存不会被完全消耗?这是我的代码:defwrite_rows(emails)File.open(file_path,"w+")do|f|f该函数由sidekiqworker调用:write_rows(user.emails)感谢您的帮助! 最佳答案 这里的问题是,当您调用emails.each时,

  9. ruby-on-rails - 仍然建议使用 Minitest 在 Rails 4 中测试路由吗? - 2

    在Rails3中,当在MiniTest中编写功能测试时,我养成了将路由测试与Controller操作分开测试的习惯。我从RailsGuideonTesting-Section9:TestingRoutes得到了这个想法.然而,在将我的应用程序升级到Rails4之后,我注意到如果我不为get|patch|post|delete方法提供一组适当的参数。例如,给定路线:#config/routes.rbnamespace"api"donamespace"v2",defaults:{format::json}doresources:usersdoresources:postsdoresourc

  10. ruby - 使用哪种群发消息技术? - 2

    我感到有点困惑——大约24小时以来,我一直在考虑在我的项目中使用哪种组播技术。基本上,我需要的是:创建组(通过一些后端进程)任意客户端广播消息(1:N,N:N)(可能)直接消息(1:1)(重要)使用我自己的后端(例如,通过某种HTTPAPI)对客户端进行身份验证/授权能够通过后端进程(或服务器插件)踢出特定的客户端这是我要的:Ruby或Haxe中的后端相关流程JS+Haxe(Flash9)中的前端—在浏览器中,因此理想情况下通过80/443进行通信,但不一定。因此,这项技术必须能够在HaxeforFlash中轻松访问,最好是Ruby。我一直在考虑:RabbitMQ(或OpenAMQ)、

随机推荐