共計 3460 個字符,預(yù)計需要花費(fèi) 9 分鐘才能閱讀完成。
丸趣 TV 小編給大家分享一下結(jié)構(gòu)化處理之 Spark Session 的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
創(chuàng)建 DataFrame,有三種模式,一種是 sql() 主要是訪問 Hive 表;一種是從 RDD 生成 DataFrame,主要從 ExistingRDD 開始創(chuàng)建;還有一種是 read/format 格式,從 json/txt/csv 等數(shù)據(jù)源格式創(chuàng)建。
先看看第三種方式的創(chuàng)建流程。
1、read/format
def read: DataFrameReader = new DataFrameReader(self)
SparkSession.read() 方法直接創(chuàng)建 DataFrameReader,然后再 DataFrameReader 的 load() 方法來導(dǎo)入外部數(shù)據(jù)源。load() 方法主要邏輯如下:
def load(paths: String*): DataFrame = {
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap).resolveRelation())
}
創(chuàng)建對應(yīng)數(shù)據(jù)源類型的 DataSource,DataSource 解析成 BaseRelation, 然后通過 SparkSession 的 baseRelationToDataFrame 方法從 BaseRelation 映射生成 DataFrame。從 BaseRelation 創(chuàng)建 LogicalRelation,然后調(diào)用 Dataset.ofRows 方法從 LogicalRelation 創(chuàng)建 DataFrame。DataFrame 實際就是 Dataset。
type DataFrame = Dataset[Row]
baseRelationToDataFrame 的定義:
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = { Dataset.ofRows(self, LogicalRelation(baseRelation))
}
Dataset.ofRows 方法主要是將邏輯計劃轉(zhuǎn)換成物理計劃,然后生成新的 Dataset。
2、執(zhí)行
SparkSession 的執(zhí)行關(guān)鍵是如何從 LogicalPlan 生成物理計劃。我們試試跟蹤這部分邏輯。
def count(): Long = withAction( count , groupBy().count().queryExecution) {plan =
plan.executeCollect().head.getLong(0)
}
Dataset 的 count() 動作觸發(fā)物理計劃的執(zhí)行,調(diào)用物理計劃 plan 的 executeCollect 方法,該方法實際上會調(diào)用 doExecute() 方法生成 Array[InternalRow] 格式。executeCollect 方法在 SparkPlan 中定義。
3、HadoopFsRelation
需要跟蹤下如何從 HadoopFsRelation 生成物理計劃(也就是 SparkPlan)
通過 FileSourceStrategy 來解析。它在 FileSourceScanExec 上疊加 Filter 和 Projection 等操作,看看 FileSourceScanExec 的定義:
case class FileSourceScanExec(
@transient relation: HadoopFsRelation,
output: Seq[Attribute],
requiredSchema: StructType,
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression],
override val metastoreTableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with ColumnarBatchScan {}
它的主要執(zhí)行代碼 doExecute() 的功能邏輯如下:
protected override def doExecute(): RDD[InternalRow] = { if (supportsBatch) {
// in the case of fallback, this batched scan should never fail because of:
// 1) only primitive types are supported
// 2) the number of columns should be smaller than spark.sql.codegen.maxFields
WholeStageCodegenExec(this).execute()
} else {
val unsafeRows = {
val scan = inputRDD
if (needsUnsafeRowConversion) { scan.mapPartitionsWithIndexInternal { (index, iter) =
val proj = UnsafeProjection.create(schema)
proj.initialize(index)
iter.map(proj)
}
} else {
scan
}
}
val numOutputRows = longMetric(numOutputRows)
unsafeRows.map { r =
numOutputRows += 1
r
}
}
}
inputRDD 有兩種方式創(chuàng)建,一是 createBucketedReadRDD,二是 createNonBucketedReadRDD。兩者沒有本質(zhì)的區(qū)別,僅僅是文件分區(qū)規(guī)則的不同。
private lazy val inputRDD: RDD[InternalRow] = { val readFile: (PartitionedFile) = Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
dataSchema = relation.dataSchema,
partitionSchema = relation.partitionSchema,
requiredSchema = requiredSchema,
filters = pushedDownFilters,
options = relation.options,
hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =
createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
case _ =
createNonBucketedReadRDD(readFile, selectedPartitions, relation)
}
}
createNonBucketedReadRDD 調(diào)用 FileScanRDD :new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
以上是“結(jié)構(gòu)化處理之 Spark Session 的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注丸趣 TV 行業(yè)資訊頻道!