久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

結(jié)構(gòu)化處理之Spark Session的示例分析

166次閱讀
沒有評論

共計 3460 個字符,預(yù)計需要花費(fèi) 9 分鐘才能閱讀完成。

丸趣 TV 小編給大家分享一下結(jié)構(gòu)化處理之 Spark Session 的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

創(chuàng)建 DataFrame,有三種模式,一種是 sql() 主要是訪問 Hive 表;一種是從 RDD 生成 DataFrame,主要從 ExistingRDD 開始創(chuàng)建;還有一種是 read/format 格式,從 json/txt/csv 等數(shù)據(jù)源格式創(chuàng)建。

先看看第三種方式的創(chuàng)建流程。

1、read/format

def read: DataFrameReader = new DataFrameReader(self)

SparkSession.read() 方法直接創(chuàng)建 DataFrameReader,然后再 DataFrameReader 的 load() 方法來導(dǎo)入外部數(shù)據(jù)源。load() 方法主要邏輯如下:
 

def load(paths: String*): DataFrame = {
 sparkSession.baseRelationToDataFrame(
 DataSource.apply(
 sparkSession,
 paths = paths,
 userSpecifiedSchema = userSpecifiedSchema,
 className = source,
 options = extraOptions.toMap).resolveRelation())
 }

創(chuàng)建對應(yīng)數(shù)據(jù)源類型的 DataSource,DataSource 解析成 BaseRelation, 然后通過 SparkSession 的 baseRelationToDataFrame 方法從 BaseRelation 映射生成 DataFrame。從 BaseRelation 創(chuàng)建 LogicalRelation,然后調(diào)用 Dataset.ofRows 方法從 LogicalRelation 創(chuàng)建 DataFrame。DataFrame 實際就是 Dataset。

type DataFrame = Dataset[Row]

baseRelationToDataFrame 的定義:

def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = { Dataset.ofRows(self, LogicalRelation(baseRelation))
 }

Dataset.ofRows 方法主要是將邏輯計劃轉(zhuǎn)換成物理計劃,然后生成新的 Dataset。

2、執(zhí)行

SparkSession 的執(zhí)行關(guān)鍵是如何從 LogicalPlan 生成物理計劃。我們試試跟蹤這部分邏輯。

def count(): Long = withAction( count , groupBy().count().queryExecution) {plan =

  plan.executeCollect().head.getLong(0)

  }

Dataset 的 count() 動作觸發(fā)物理計劃的執(zhí)行,調(diào)用物理計劃 plan 的 executeCollect 方法,該方法實際上會調(diào)用 doExecute() 方法生成 Array[InternalRow] 格式。executeCollect 方法在 SparkPlan 中定義。

3、HadoopFsRelation

需要跟蹤下如何從 HadoopFsRelation 生成物理計劃(也就是 SparkPlan)

通過 FileSourceStrategy 來解析。它在 FileSourceScanExec 上疊加 Filter 和 Projection 等操作,看看 FileSourceScanExec 的定義:

case class FileSourceScanExec(
 @transient relation: HadoopFsRelation,
 output: Seq[Attribute],
 requiredSchema: StructType,
 partitionFilters: Seq[Expression],
 dataFilters: Seq[Expression],
 override val metastoreTableIdentifier: Option[TableIdentifier])
 extends DataSourceScanExec with ColumnarBatchScan {}

它的主要執(zhí)行代碼 doExecute() 的功能邏輯如下:

protected override def doExecute(): RDD[InternalRow] = { if (supportsBatch) {
 // in the case of fallback, this batched scan should never fail because of:
 // 1) only primitive types are supported
 // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
 WholeStageCodegenExec(this).execute()
 } else {
 val unsafeRows = {
 val scan = inputRDD
 if (needsUnsafeRowConversion) { scan.mapPartitionsWithIndexInternal { (index, iter) = 
 val proj = UnsafeProjection.create(schema)
 proj.initialize(index)
 iter.map(proj)
 }
 } else {
 scan
 }
 }
 val numOutputRows = longMetric(numOutputRows)
 unsafeRows.map { r = 
 numOutputRows += 1
 r
 }
 }
 }

inputRDD 有兩種方式創(chuàng)建,一是 createBucketedReadRDD,二是 createNonBucketedReadRDD。兩者沒有本質(zhì)的區(qū)別,僅僅是文件分區(qū)規(guī)則的不同。

private lazy val inputRDD: RDD[InternalRow] = { val readFile: (PartitionedFile) =  Iterator[InternalRow] =
 relation.fileFormat.buildReaderWithPartitionValues(
 sparkSession = relation.sparkSession,
 dataSchema = relation.dataSchema,
 partitionSchema = relation.partitionSchema,
 requiredSchema = requiredSchema,
 filters = pushedDownFilters,
 options = relation.options,
 hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

 case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =  createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)  case _ =  createNonBucketedReadRDD(readFile, selectedPartitions, relation)  }  } createNonBucketedReadRDD 調(diào)用 FileScanRDD :new FileScanRDD(fsRelation.sparkSession, readFile, partitions)

以上是“結(jié)構(gòu)化處理之 Spark Session 的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注丸趣 TV 行業(yè)資訊頻道!

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-16發(fā)表,共計3460字。
轉(zhuǎn)載說明:除特殊說明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 全南县| 修文县| 红桥区| 保德县| 屏南县| 彭泽县| 禹州市| 章丘市| 南汇区| 通渭县| 合江县| 宿迁市| 赤壁市| 云林县| 丰城市| 和静县| 长泰县| 西畴县| 额尔古纳市| 札达县| 沽源县| 苍山县| 邻水| 香河县| 莱芜市| 石柱| 交口县| 建平县| 尉氏县| 郸城县| 临朐县| 林州市| 馆陶县| 曲周县| 通城县| 涞水县| 新干县| 榆中县| 佛冈县| 通榆县| 饶平县|