使用基于Snowflake的Snowpark DataFrames進行數(shù)據(jù)處理
譯文簡介
Snowpark是Snowflake一個新的開發(fā)庫,它提供了一個API讓用戶可以使用編程語言像Scala(后續(xù)也會有Java和Python)來代替SQL進行數(shù)據(jù)處理。
Snowpark的核心概念是DataFrame(數(shù)據(jù)框),它表示一組數(shù)據(jù),就比如說一些數(shù)據(jù)庫表的行,我們可以用最喜歡的工具通過面向對象或者函數(shù)式編程的方式處理。Snowpark DataFrames的概念類似于Apache Spark或者Python中Pandas包的DataFrames的含義,是一種表格型的數(shù)據(jù)結構。
開發(fā)者也可以創(chuàng)建自定義函數(shù)推送到Snowflake服務器,來更方便地處理數(shù)據(jù)。Snowpark的代碼執(zhí)行采用了惰性計算的方式,這減少了從Snowpark倉庫到客戶端之間的數(shù)據(jù)流轉。
當前版本的Snowpark可以運行在Scala 2.12和JDK 8、9、10或11上。它現(xiàn)在處于公開預覽階段,可用于所有賬戶。
架構特點
從架構的角度來看,Snowpark客戶端類似于Apache Spark Driver程序。它執(zhí)行用戶在客戶端編寫的代碼并轉為SQL語句推送給Snowpark數(shù)據(jù)倉庫,等Snowpark計算服務端處理完數(shù)據(jù)后,接收以DataFrame格式組成的返回結果。
廣義的說,Snowpark數(shù)據(jù)倉庫的操作可以分為兩類:轉換和執(zhí)行。由于轉換是延遲執(zhí)行的,因此它們不會觸發(fā)DataFrames數(shù)據(jù)的計算處理過程。像select(查詢),filter(過濾),sort(排序),groupBy(分組)等等都屬于轉換范疇的操作。而執(zhí)行是正好相反的,它們會觸發(fā)對DataFrames數(shù)據(jù)的計算。Snowpark將針對DataFrame數(shù)據(jù)的SQL語句發(fā)送到服務端進行計算,然后將結果返回給客戶端內存。show,collect,take等都屬于執(zhí)行操作。
Snowpark執(zhí)行
在我們可以執(zhí)行任何Snowpark轉換和執(zhí)行之前,我們需要先連接到Snowpark數(shù)據(jù)倉庫并建立會話。
Scala
object Main {
def main(args: Array[String]): Unit = {
// Replace the <placeholders> below.
val configs = Map (
"URL" -> "https://<SNOWFLAKE-INSTANCE>.snowflakecomputing.com:443",
"USER" -> "<USERNAME>",
"PASSWORD" -> "<PASSWORD>",
"ROLE" -> "SYSADMIN",
"WAREHOUSE" -> "SALESFORCE_ACCOUNT",
"DB" -> "SALESFORCE_DB",
"SCHEMA" -> "SALESFORCE"
)
val session = Session.builder.configs(configs).create
session.sql("show tables").show()
}
}
從Snowpark管理頁面上看,我們有一個SALESFORCE_DB數(shù)據(jù)庫和一個有3個表的SALESFORCE:SALESFORCE_ACCOUNT表表示來自Salesforce實例的賬戶,SALESFORCE_ORDER表存儲由這些賬戶發(fā)起的訂單,SALESFORCE_ACCOUNT_ORDER是一個關聯(lián)表,存儲關聯(lián)的查詢結果(我們在這篇文章的后面會再論述這點)。
要檢索Salesforce_Account表的前10行,我們可以簡單地執(zhí)行以下DataFrame方法:
Scala
// Create a DataFrame from the data in the "salesforce_account" table.
val dfAccount = session.table("salesforce_account")
// To print out the first 10 rows, call:
dfAccount.show()
Snowpark會把代碼轉換成SQL語句并交給Snowflake執(zhí)行:
Scala
[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: XXXX] SELECT * FROM ( SELECT * FROM (salesforce_account)) LIMIT 10
在我們的VSCode IDE中的輸出看起來像這樣:
我們也可以過濾某些行并執(zhí)行DataFrame的轉換(例如,選擇指定的列):
Scala
val dfFilter = session.table("salesforce_account").filter(col("type") === "Customer - Direct")
dfFilter.show()
val dfSelect = session.table("salesforce_account").select(col("accountname"), col("phone"))
dfSelect.show()
Snowpark將生成相應的SQL查詢,并將它們交給Snowflake計算服務器執(zhí)行:
[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: XXXX] SELECT * FROM ( SELECT * FROM ( SELECT * FROM (salesforce_account)) WHERE ("TYPE" = 'Customer - Direct')) LIMIT 10
[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: XXXX] SELECT * FROM ( SELECT "ACCOUNTNAME", "PHONE" FROM ( SELECT * FROM (salesforce_account))) LIMIT 10
下面是在VSCode中的輸出:
Snowpark DataFrame API也允許DataFrames數(shù)據(jù)間的拼接關聯(lián)。在這個例子中,我們有SALESFORCE_ORDER表,記錄了由Salesforce賬戶產生的賬單數(shù)據(jù),我們可以將這些數(shù)據(jù)拉到DataFrame中,并將它們與賬戶記錄連接起來:
Scala
val dfOrder = session.table("salesforce_order")
dfOrder.show()
val dfJoin = dfAccount.join(dfOrder, col("sfdcid") === col("accountid")).select(col("accountname"), col("phone"),col("productname"), col("amount"))
dfJoin.show()
Snowflake把DataFrame方法轉換為SQL語句,然后推送給Snowflake數(shù)據(jù)倉庫進行計算。在VSCode中輸出如下:
如果我們想持久化保存計算結果,可以使用saveAsTable這個方法:
Scala
dfJoin.write.mode(SaveMode.Overwrite).saveAsTable("salesforce_account_order")
生成的SQL語句看起來就像這樣:
Scala
[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: XXXX] CREATE OR REPLACE TABLE salesforce_account_order AS SELECT * FROM ( SELECT "ACCOUNTNAME", "PHONE", "PRODUCTNAME", "AMOUNT" FROM ( SELECT * FROM (( SELECT "ACCOUNTNAME" AS "ACCOUNTNAME", "PHONE" AS "PHONE", "TYPE" AS "TYPE", "SFDCID" AS "SFDCID" FROM ( SELECT * FROM (salesforce_account))) AS SNOWPARK_TEMP_TABLE_UKKLR6UCHN6POXL INNER JOIN ( SELECT "ACCOUNTID" AS "ACCOUNTID", "PRODUCTNAME" AS "PRODUCTNAME", "AMOUNT" AS "AMOUNT" FROM ( SELECT * FROM (salesforce_order))) AS SNOWPARK_TEMP_TABLE_36DEOZXTQJUYKLD ON ("SFDCID" = "ACCOUNTID"))))
隨后,Snowpark會創(chuàng)建一個新表或者替換掉已存在的舊表,來存儲生成的數(shù)據(jù):
結語
Snowpark為數(shù)據(jù)處理提供了豐富的操作和工具。它允許用戶創(chuàng)建非常復雜的高級數(shù)據(jù)處理管道操作。將用戶自定義的代碼推到Snowflake數(shù)據(jù)倉庫服務端,并通過減少不必要的數(shù)據(jù)傳輸,在數(shù)據(jù)端執(zhí)行,這是Snowpark的一個非常強大的特性。
譯者介紹
盧鑫旺,51CTO社區(qū)編輯,半路出家的九零后程序員。做過前端頁面,寫過業(yè)務接口,搞過爬蟲,研究過JS,有幸接觸Golang,參與微服務架構轉型。目前主寫Java,負責公司可定制化低代碼平臺的數(shù)據(jù)引擎層設計開發(fā)工作。
原文標題:Snowflake Data Processing With Snowpark DataFrames,作者:Istvan Szegedi