干+砖
了解如何创建在砖笔记本干工作。
我们感动!改善客户体验,Collibra数据质量用户指南已经搬到新万博移动客户端
新万博移动客户端Collibra文档中心
作为Collibra数据质量2新万博移动客户端022.11发行版的一部分。为了确保无缝过渡,
dq-docs.新万博移动客户端collibra.com
将继续访问,但DQ用户指南现在维护只在文档中心。
本文档提供了如何指导来帮助你上传和干jar添加到砖集群和运行干工作干api调用(又名活动)。
干工作运行Scala和Pyspark笔记本。
在本节中,我们解释所涉及的步骤设置环境干砖。这是第一步在砖干api调用。
第一步是让干jar文件。一旦你干的jar包文件,你可以把jar通过运行以下命令:
焦油-xvf package.tar.gz
' '例子
:焦油-xvf owl - 2022.04 - rc1 - -包base.tar.gz违约
运行这个命令指示焦油中提取文件的压缩文件。从文件列表中,您需要上传owl-core-xxxx-jar-with-dependancies。jar文件系统对我们的砖将在下一节中解释。
从owl中提取猫头鹰jar文件包压缩文件。
砖的罐子应该手动上传文件系统。下面是步骤的快速摘要。你可以找到更多的细节关于砖页面中上传文件:
https://docs.databricks.com/data/databricks-file-system.html
https://docs.databricks.com/data/databricks-file-system.html access-dbfs
-
1。登录到你的砖。
-
2。点击
-
3所示。单击DBFS按钮在页面的顶部。
-
4所示。上传owl-core-xxxx-jar-with-dependancies。jar到你想要的路径。
\
上传owl-core-xxxx-jar-with-dependancies。DBFS jar。
安装owl-core-xxxx-jar-with-dependancies。在集群jar。
一旦完成这一步,您可以创建一个工作区和开始使用干api。第四。(可选)更新数据源的池的大小* * * *这一步只如果你需要PoolExhaustedException当你叫干api。解决这个问题可以简单地更新连接池大小的火花的环境。
SPRING_DATASOURCE_POOL_MAX_WAIT = 500
SPRING_DATASOURCE_POOL_MAX_SIZE = 30
SPRING_DATASOURCE_POOL_INITIAL_SIZE = 5
\
这是砖的文档关于如何设置环境变量:
https://docs.databricks.com/clusters/configure.html环境变量
干环境变量添加到砖的集群
进口org。apache。火花。sql。SparkSession
进口org。apache。火花。sql。功能。_
进口org。apache。火花。sql。类型。_
进口scala。集合。JavaConverters。_
进口java。跑龙套。日期
进口java。时间。LocalDate
进口java。文本。SimpleDateFormat
进口火花。值得一提的。_
进口java。跑龙套。{ArrayList,列表,UUID}
/ /干进口
进口com。猫头鹰。核心。猫头鹰
进口com。猫头鹰。常见的。选项。_
进口com。猫头鹰。常见的。domain2。_
进口com。猫头鹰。核心。跑龙套。OwlUtils
火花。目录。clearCache
/ /选项1:将客户数据从一个文件中
瓦尔df=(火花。读
。格式(“csv”)。选项(“头”,真正的)。选项(“分隔符”,”、“)
。负载(“dbfs: / FileStore / nyse.csv”)
)
/ /选项2:将客户数据从一个数据库
瓦尔connProps=地图(
“司机”- >“org.postgresql.Driver”,
“用户”- >“? ? ? ?”,
“密码”- >“? ? ? ?”,
“url”- >" jdbc: postgresql: / / xxx: 0000 / postgres”,
“数据表”- >“public.example_data”)
/ / - - - - - -负载火花DataFrame——/ /
瓦尔df=火花。读。格式(“jdbc”)。选项(connProps)。加载显示(df)
显示(df)/ /查看数据
瓦尔pgHost=“xxxx.amazonaws.com”
瓦尔pgDatabase=“postgres”
瓦尔pgSchema=“公共”
瓦尔pgUser=“? ? ? ? ? ? ?”
瓦尔pgPass=“? ? ? ?”
瓦尔pgPort=“0000”
注意:如果规则已经从UI创建并分配给一个数据集,叫owlcheck()将自动执行所有与给定数据集相关的规则和不需要重新创建规则从笔记本。
瓦尔数据集=“cdq_notebook_db_rules”
var日期=“2018-01-11”
/ /选项
瓦尔选择=新OwlOptions()
选择。数据集=数据集
选择。runId=日期
选择。主机=pgHost
选择。港口=pgPort
选择。pgUser=pgUser
选择。pgPassword=pgPass
选择。setDatasetSafeOff(假)/ /启用历史数据集的覆盖
/ /创建一个简单的规则,并将其分配给数据集
瓦尔simpleRule=OwlUtils。createRule(选择。数据集)
simpleRule。setRuleNm(“nyse-stocks-symbol”)
simpleRule。setRuleValue(”符号= =博鸿泰”)
simpleRule。setRuleType(“SQLG”)
simpleRule。setPerc(1.0)
simpleRule。定位点(1)
simpleRule。setIsActive(1)
simpleRule。setUserNm(“admin”)
simpleRule。setPreviewLimit(8)
/ /创建一个规则从通用规则创建的界面:
瓦尔genericRule=OwlUtils。createRule(选择。数据集)
genericRule。setRuleNm(“exchangeRule”)/ /可以是任何名字
genericRule。setRuleType(“自定义”)
genericRule。定位点(1)
genericRule。setIsActive(1)
genericRule。setUserNm(“admin”)
genericRule。setRuleRepo(“exchangeCheckRule”);/ /验证通用规则名称
/ /从UI
genericRule。setRuleValue(“EXCH”)/ /列assosicate规则
/ /前常规
瓦尔干=com。猫头鹰。核心。跑龙套。OwlUtils。OwlContext(df,选择)
干。removeAllRules(选择。数据集)
。注册(选择)
。addRule(simpleRule)
/ /扫描
干。owlCheck()
瓦尔结果=干。呵斥()/ /返回对象呵斥,不是DataFrame
/ /看到Json结果(选择下游处理)
println(”- - - - - - - - - - - - - - -结果:- - - - - - - - - - - - - - - - - \ n”)
println(结果)/ /可选
/ /程序,看到DataFrame结果(选择下游处理)
瓦尔休息时间=干。getRuleBreakRows(“nyse-stocks-symbol”)
println(”- - - - - - - - - - - - - - -休息:- - - - - - - - - - - - - - - - - \ n”)
显示(休息时间)
/ /不同的选项来处理不良记录
瓦尔badRecords=休息时间。下降(“_dataset”,“_run_id”,“_rule_name”,“owl_id”)
显示(badRecords)
瓦尔goodRecords=df。除了(badRecords)
显示(goodRecords)
/ /删除文件是否存在
dbutils。fs。rm(“/ tmp / databricks-df-example.parquet”,真正的)
休息时间。写。拼花(“/ tmp / databricks-df-example.parquet”)
下面的图像显示的代码片段,结果在砖:
在砖创建干测试规则
优惠和规则可以在干web。
瓦尔数据集=“cdq_notebook_nyse_profile”
瓦尔runList=列表(“2018-01-01”,“2018-01-02”,“2018-01-03”,“2018-01-04”,“2018-01-05”,“2018-01-08”,“2018-01-09”,“2018-01-10”)
为(runId< -runList){
/ /选项
瓦尔选择=新OwlOptions()
选择。数据集=数据集
选择。主机=pgHost
选择。港口=pgPort
选择。pgUser=pgUser
选择。pgPassword=pgPass
瓦尔profileOpt=新ProfileOpt
profileOpt。在=真正的
profileOpt。setShape(真正的)
profileOpt。setShapeSensitivity(5.0)
profileOpt。setShapeMaxPerCol(10)
profileOpt。setShapeMaxColSize(10)
profileOpt。setShapeGranular(真正的)
profileOpt。behaviorEmptyCheck=真正的
profileOpt。behaviorMaxValueCheck=真正的
profileOpt。behaviorMinValueCheck=真正的
profileOpt。behaviorNullCheck=真正的
profileOpt。behaviorRowCheck=真正的
profileOpt。behaviorMeanValueCheck=真正的
profileOpt。behaviorUniqueCheck=真正的
profileOpt。behaviorMinSupport=5/ /默认是4
profileOpt。behaviorLookback=5
选项。配置文件=profileOpt
var日期=runId
vardf_1=df。在哪里(美元“TRADE_DATE”= = =年代”美元日期”)
/ /扫描
瓦尔干=OwlUtils。OwlContext(df_1,选项)
干。注册(选择)
干。owlCheck()
瓦尔配置文件=干。profileDF()
配置文件。显示()
}
运行在砖干概要文件
分析结果可以在干。
瓦尔数据集=“cdq_notebook_db_dupe”
var日期=“2018-01-11”
/ /选项
瓦尔选择=新OwlOptions()
选择。数据集=数据集
选择。runId=日期
选择。主机=pgHost
选择。港口=pgPort
选择。pgUser=pgUser
选择。pgPassword=pgPass
选择。易受骗的人。ignoreCase=真正的
选择。易受骗的人。在=真正的
选择。易受骗的人。下界=99年
选择。易受骗的人。包括=数组(“象征”,“TRADE_DATE”)
/ /扫描
瓦尔干=OwlUtils。OwlContext(df,选择)
干。注册(选择)
干。owlCheck()
瓦尔dupesDf=干。getDupeRecords
dupesDf。显示()
干欺骗在砖中运行
可以在干网络欺骗的结果。
进口scala。集合。JavaConverters。_
进口java。跑龙套
进口java。跑龙套。{ArrayList,列表,UUID}
瓦尔数据集=“cdq_notebook_db_outlier”
var日期=“2018-01-11”
/ /选项
瓦尔选择=新OwlOptions()
选择。数据集=数据集
选择。runId=日期
选择。主机=pgHost
选择。港口=pgPort
选择。pgUser=pgUser
选择。pgPassword=pgPass
选择。易受骗的人。在=假
瓦尔dlMulti:跑龙套。列表(OutlierOpt]=新跑龙套。ArrayList(OutlierOpt]
瓦尔outlierOpt=新OutlierOpt()
outlierOpt。结合=真正的
outlierOpt。dateColumn=“trade_date”
outlierOpt。lookback=4
outlierOpt。关键=数组(“象征”)
outlierOpt。包括=数组(“高”)
outlierOpt。historyLimit=10
dlMulti。添加(outlierOpt)
选择。setOutliers(dlMulti)
瓦尔干=OwlUtils。OwlContext(df,选择)
。注册(选择)
干。owlCheck
瓦尔离群值=干。getOutliers()
离群值。显示
离群值。选择(“价值”)
干活动不能独立的呼吁。owlCheck()函数调用的任何活动之前应该被称为。例如让概要DataFrame应该调用下面的代码片段:
cdq.owlCheck ()
cdq.getProfileDF ()
最后修改8月前