作者:vivo互联网服务器团队-Hao Guangshi
在以上六种可以用户自定义的地方,我们选择了【Check Analysis Rules】。因为该检查规则在方法调用的时候是不需要有返回值的,也就意味着不需要对当前遍历的逻辑计划树进行修改,这正是我们需要的。而【Analyzer Rules】、【Optimizer Rules】则需要对当前的逻辑计划进行修改,使得我们难以迭代整个树,难以得到我们想要的结果。
- 【Analyzer Rules】逻辑计划分析规则
- 【Check Analysis Rules】逻辑计划检查规则
- 【Optimizer Rules.】 逻辑计划优化规则
- 【Planning Strategies】形成物理计划的策略
- 【Customized Parser】自定义的sql解析器
- 【(External) Catalog listeners catalog】监听器
class ExtralSparkExtension extends (SparkSessionExtensions => Unit) {
override def apply(spark: SparkSessionExtensions): Unit = {
//字段血缘
spark.injectCheckRule(FieldLineageCheckRuleV3)
//sql解析器
spark.injectParser { case (_, parser) => new ExtraSparkParser(parser) }
}
}class ExtraSparkParser(delegate: ParserInterface) extends ParserInterface with Logging{
override def parsePlan(sqlText: String): LogicalPlan = {
val lineAgeEnabled = SparkSession.getActiveSession
.get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean
logDebug(s"SqlText: $sqlText")
if(sqlText.toLowerCase().contains("insert")){
if(lineAgeEnabled){
if(FIELD_LINE_AGE_SQL_COULD_SET.get()){
//线程本地变量在这里
FIELD_LINE_AGE_SQL.set(sqlText)
}
FIELD_LINE_AGE_SQL_COULD_SET.remove()
}
}
delegate.parsePlan(sqlText)
}
//调用原始的sqlparser
override def parseExpression(sqlText: String): Expression = {
delegate.parseExpression(sqlText)
}
//调用原始的sqlparser
override def parseTableIdentifier(sqlText: String): TableIdentifier = {
delegate.parseTableIdentifier(sqlText)
}
//调用原始的sqlparser
override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
delegate.parseFunctionIdentifier(sqlText)
}
//调用原始的sqlparser
override def parseTableSchema(sqlText: String): StructType = {
delegate.parseTableSchema(sqlText)
}
//调用原始的sqlparser
override def parseDataType(sqlText: String): DataType = {
delegate.parseDataType(sqlText)
}
}case class FieldLineageCheckRuleV3(sparkSession:SparkSession) extends (LogicalPlan=>Unit ) {
val executor: ThreadPoolExecutor =
ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)
override def apply(plan: LogicalPlan): Unit = {
val sql = FIELD_LINE_AGE_SQL.get
FIELD_LINE_AGE_SQL.remove()
if(sql != null){
//这里我们拿到sql然后启动一个线程做剩余的解析任务
val task = new FieldLineageRunnableV3(sparkSession,sql)
executor.execute(task)
}
}
}override def run(): Unit = {
val parser = sparkSession.sessionState.sqlParser
val analyzer = sparkSession.sessionState.analyzer
val optimizer = sparkSession.sessionState.optimizer
val planner = sparkSession.sessionState.planner
............
val newPlan = parser.parsePlan(sql)
PASS_TABLE_AUTH.set(true)
val analyzedPlan = analyzer.executeAndCheck(newPlan)
val optimizerPlan = optimizer.execute(analyzedPlan)
//得到sparkPlan
val sparkPlan = planner.plan(optimizerPlan).next()
...............
if(targetTable != null){
val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()
val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()
//projection
projectionLineAge(levelProject, sparkPlan.child)
//predication
predicationLineAge(predicates, sparkPlan.child)
...............
经过SqlParser后会得到逻辑计划,此时表名、函数等都没有解析,还不能执行;经过Analyzer会分析一些绑定信息,例如表验证、字段信息、函数信息;经过Optimizer 后逻辑计划会根据既定规则被优化,这里的规则是RBO,当然 Spark 还支持CBO的优化;经过SparkPlanner后就成了可执行的物理计划。我们看一个逻辑计划与物理计划对比的例子:一个 SQL 语句:select item_id,TYPE,v_value,imei from t1
union all
select item_id,TYPE,v_value,imei from t2
union all
select item_id,TYPE,v_value,imei from t3
物理计划是这样的:
显然简化了很多。得到 SparkPlan 后,我们就可以根据不同的SparkPlan节点做迭代处理。我们将字段血缘分为两种类型:projection(select查询字段)、predication(wehre查询条件)。这两种是一种点对点的关系,即从原始表的字段生成目标表的字段的对应关系。想象一个查询是一棵树,那么迭代关系会如下从树的顶端开始迭代,直到树的叶子节点,叶子节点即为原始表:
那么我们迭代查询的结果应该为id ->tab1.id , name->tab1.name,tabb2.name, age→tabb2.age。注意到有该变量 val levelProject = new ArrayBuffer ArrayBuffer[NameExpressionHolder],通过projecti-onLineAge 迭代后 levelProject 存储了顶层id,name,age对应的(tab1.id),(tab1.name,tabb2.name),(tabb2.age)。当然也不是简单的递归迭代,还需要考虑特殊情况例如:Join、ExplandExec、Aggregate、Explode、GenerateExec等都需要特殊考虑。例子及效果:SQL:
with A as (select id,name,age from tab1 where id > 100 ) ,
C as (select id,name,max(age) from A group by A.id,A.name) ,
B as (select id,name,age from tabb2 where age > 28)
insert into tab3
select C.id,concat(C.name,B.name) as name, B.age from
B,C where C.id = B.id{
"edges": [
{
"sources": [
3
],
"targets": [
0
],
"expression": "id",
"edgeType": "PROJECTION"
},
{
"sources": [
4,
7
],
"targets": [
1
],
"expression": "name",
"edgeType": "PROJECTION"
},
{
"sources": [
5
],
"targets": [
2
],
"expression": "age",
"edgeType": "PROJECTION"
},
{
"sources": [
6,
3
],
"targets": [
0,
1,
2
],
"expression": "INNER",
"edgeType": "PREDICATE"
},
{
"sources": [
6,
5
],
"targets": [
0,
1,
2
],
"expression": "((((default.tabb2.`age` IS NOT NULL) AND (CAST(default.tabb2.`age` AS INT) > 28)) AND (B.`id` > 100)) AND (B.`id` IS NOT NULL))",
"edgeType": "PREDICATE"
},
{
"sources": [
3
],
"targets": [
0,
1,
2
],
"expression": "((default.tab1.`id` IS NOT NULL) AND (default.tab1.`id` > 100))",
"edgeType": "PREDICATE"
}
],
"vertices": [
{
"id": 0,
"vertexType": "COLUMN",
"vertexId": "default.tab3.id"
},
{
"id": 1,
"vertexType": "COLUMN",
"vertexId": "default.tab3.name"
},
{
"id": 2,
"vertexType": "COLUMN",
"vertexId": "default.tab3.age"
},
{
"id": 3,
"vertexType": "COLUMN",
"vertexId": "default.tab1.id"
},
{
"id": 4,
"vertexType": "COLUMN",
"vertexId": "default.tab1.name"
},
{
"id": 5,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.age"
},
{
"id": 6,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.id"
},
{
"id": 7,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.name"
}
]
}很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
我有一个表单,其中有很多字段取自数组(而不是模型或对象)。我如何验证这些字段的存在?solve_problem_pathdo|f|%>... 最佳答案 创建一个简单的类来包装请求参数并使用ActiveModel::Validations。#definedsomewhere,atthesimplest:require'ostruct'classSolvetrue#youcouldevencheckthesolutionwithavalidatorvalidatedoerrors.add(:base,"WRONG!!!")unlesss
我想向我的Controller传递一个参数,它是一个简单的复选框,但我不知道如何在模型的form_for中引入它,这是我的观点:{:id=>'go_finance'}do|f|%>Transferirde:para:Entrada:"input",:placeholder=>"Quantofoiganho?"%>Saída:"output",:placeholder=>"Quantofoigasto?"%>Nota:我想做一个额外的复选框,但我该怎么做,模型中没有一个对象,而是一个要检查的对象,以便在Controller中创建一个ifelse,如果没有检查,请帮助我,非常感谢,谢谢
我知道我可以指定某些字段来使用pluck查询数据库。ids=Item.where('due_at但是我想知道,是否有一种方法可以指定我想避免从数据库查询的某些字段。某种反拔?posts=Post.where(published:true).do_not_lookup(:enormous_field) 最佳答案 Model#attribute_names应该返回列/属性数组。您可以排除其中一些并传递给pluck或select方法。像这样:posts=Post.where(published:true).select(Post.attr
导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵
我认为我的问题最好用一个例子来描述。假设我有一个名为“Thing”的简单模型,它有一些简单数据类型的属性。像...Thing-foo:string-goo:string-bar:int这并不难。数据库表将包含具有这三个属性的三列,我可以使用@thing.foo或@thing.bar之类的东西访问它们。但我要解决的问题是当“foo”或“goo”不再包含在简单数据类型中时会发生什么?假设foo和goo代表相同类型的对象。也就是说,它们都是“Whazit”的实例,只是数据不同。所以现在事情可能看起来像这样......Thing-bar:int但是现在有一个新的模型叫做“Whazit”,看起来
我几天前在我的rubyonrails2.3.2上安装了Sphinx和Thinking-Sphinx,基本搜索效果很好。这意味着,没有任何条件。现在,我想用一些条件过滤搜索。我有公告模型,索引如下所示:define_indexdoindexestitle,:as=>:title,:sortable=>trueindexesdescription,:as=>:description,:sortable=>trueend也许我错了,但我注意到只有当我将:sortable=>true语法添加到这些属性时,我才能将它们用作搜索条件。否则它找不到任何东西。现在,我还在使用acts_as_tag
我有一个要在我的Rails3项目中使用的数组扩展方法。它应该住在哪里?我有一个应用程序/类,我最初把它放在(array_extensions.rb)中,在我的config/application.rb中我加载路径:config.autoload_paths+=%W(#{Rails.root}/应用程序/类)。但是,当我转到railsconsole时,未加载扩展。是否有一个预定义的位置可以放置我的Rails3扩展方法?或者,一种预先定义的方式来添加它们?我知道Rails有自己的数组扩展方法。我应该将我的添加到active_support/core_ext/array/conversion
假设您编写了一个类Sup,我决定将其扩展为SubSup。我不仅需要了解你发布的接口(interface),还需要了解你的私有(private)字段。见证这次失败:classSupdefinitialize@privateField="fromsup"enddefgetXreturn@privateFieldendendclassSub问题是,解决这个问题的正确方法是什么?看起来子类应该能够使用它想要的任何字段而不会弄乱父类(superclass)。编辑:equivalentexampleinJava返回"fromSup",这也是它应该产生的答案。 最佳答案
我使用rails3.1+rspec和factorygirl。我对必填字段(validates_presence_of)的验证工作正常。我如何让测试将该事实用作“成功”而不是“失败”规范是:describe"Addanindustrywithnoname"docontext"Unabletocreatearecordwhenthenameisblank"dosubjectdoind=Factory.create(:industry_name_blank)endit{shouldbe_invalid}endend但是我失败了:Failures:1)Addanindustrywithnona