久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Spark ALS實現的步驟是什么

146次閱讀
沒有評論

共計 4600 個字符,預計需要花費 12 分鐘才能閱讀完成。

這篇文章主要講解了“Spark ALS 實現的步驟是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著丸趣 TV 小編的思路慢慢深入,一起來研究和學習“Spark ALS 實現的步驟是什么”吧!

spark ALS 算法是做個性推薦用的,它所需要的數據集是類似用戶對商品的打分表之類的數據集。實現步驟主要以下幾步:

1、定義輸入數據

2、輸入數據轉換成評分數據格式,如 case class Rating(user: Int, movie: Int, rating: Float)

3、設計 ALS 模型訓練數據

4、計算推薦數據,存儲起來供業務系統直接使用。

下面看看具體的代碼:

package recommend
import org.apache.spark.sql.SparkSession
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.ml.feature.IndexToString
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.TaskContext
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.SaveMode
 *  個性化推薦 ALS 算法
 *  用戶對資源的點擊率作為評分
 *
 */
object Recommend { case class Rating(user: Int, movie: Int, rating: Float)
 
 
 def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName(Java Spark MYSQL Recommend)
 .master(local)
 .config(es.nodes ,  127.0.0.1)
 .config(es.port ,  9200)
 .config(es.mapping.date.rich ,  false) // 不解析日期類型
 .getOrCreate()
 trainModel(spark)
 spark.close()
 }
 def trainModel(spark: SparkSession): Unit = {
 import spark.implicits._
 val MAX = 3 //  最大推薦數目
 val rank = 10 //  向量大小,默認 10
 val iterations = 10 //  迭代次數,默認 10

 val url =  jdbc:mysql://127.0.0.1:3306/test?useUnicode=true characterEncoding=utf8  val table =  clicks  val user =  root  val pass =  123456  val props = new Properties()  props.setProperty(user , user) //  設置用戶名  props.setProperty(password , pass) //  設置密碼  val clicks = spark.read.jdbc(url, table, props).repartition(4)  clicks.createOrReplaceGlobalTempView(clicks)  val agg = spark.sql(SELECT userId ,resId ,COUNT(id) AS clicks FROM global_temp.clicks GROUP BY userId,resId )    val userIndexer = new StringIndexer()  .setInputCol(userId)  .setOutputCol(userIndex)  val resIndexer = new StringIndexer()  .setInputCol(resId)  .setOutputCol(resIndex)  val indexed1 = userIndexer.fit(agg).transform(agg)  val indexed2 = resIndexer.fit(indexed1).transform(indexed1)  indexed2.show()  val ratings = indexed2.map(x =  Rating(x.getDouble(3).toInt, x.getDouble(4).toInt, x.getLong(2).toFloat))  ratings.show()  val Array(training, test) = ratings.randomSplit(Array(0.9, 0.1))  println(training:)  training.show()  println(test:)  test.show()  // 隱性反饋和顯示反饋  val als = new ALS()  .setMaxIter(iterations)  .setRegParam(0.01)  .setImplicitPrefs(false)  .setUserCol(user)  .setItemCol(movie)  .setRatingCol(rating)  val model = als.fit(ratings)  // Evaluate the model by computing the RMSE on the test data  // Note we set cold start strategy to  drop  to ensure we don t get NaN evaluation metrics  model.setColdStartStrategy(drop)  val predictions = model.transform(test)  val r2 = model.recommendForAllUsers(MAX)  println(r2.schema)  val result = r2.rdd.flatMap(row =  { val userId = row.getInt(0)  val arrayPredict: Seq[Row] = row.getSeq(1)  var result = ArrayBuffer[Rating]()  arrayPredict.foreach(rowPredict =  { val p = rowPredict(0).asInstanceOf[Int]  val score = rowPredict(1).asInstanceOf[Float]  val sql =  insert into recommends(userId,resId,score) values (  +  userId +  ,  +  rowPredict(0) +  ,  +  rowPredict(1) +   )  println(sql:  + sql)  result.append(Rating(userId, p, score))  })  for (i  - result) yield {  i  }  })  println(推薦結果 RDD 已展開)  result.toDF().show()  // 資源 id 隱射  val resInt2Index = new IndexToString()  .setInputCol(movie)  .setOutputCol(resId)  .setLabels(resIndexer.fit(indexed1).labels)  //userId 映射  val userInt2Index = new IndexToString()  .setInputCol(user)  .setOutputCol(userId)  .setLabels(userIndexer.fit(agg).labels)  val rc = userInt2Index.transform(resInt2Index.transform(result.toDF()))  rc.show()  rc.withColumnRenamed(rating , score).select(userId ,  resId , score).write.mode(SaveMode.Overwrite)  .format(jdbc)  .option(url , url)  .option(dbtable ,  recommends)  .option(user , user)  .option(password , pass)  .option(batchsize ,  5000)  .option(truncate ,  true)  .save  println(finished!!!)  } }

DataFrame 寫入 mysql 還有另一種寫法,就是原生寫入:

 // 分區寫推薦結果到 mysql
 r2.foreachPartition(p =  {
 @transient val conn = ConnectionPool.getConnection
 p.foreach(row =  { val userId = row.getInt(0)
 val arrayPredict: Seq[Row] = row.getSeq(1)
 arrayPredict.foreach(rowPredict =  { println(rowPredict(0) +  @  + rowPredict(1))
 val sql =  insert into recommends(userId,resId,score) values (  +
 userId+ ,  +
 rowPredict(0)+ , +
 rowPredict(1) +
  ) 
 println(sql: +sql)
 val stmt = conn.createStatement
 stmt.executeUpdate(sql)
 })
 })
 ConnectionPool.returnConnection(conn)
 })

感謝各位的閱讀,以上就是“Spark ALS 實現的步驟是什么”的內容了,經過本文的學習后,相信大家對 Spark ALS 實現的步驟是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是丸趣 TV,丸趣 TV 小編將為大家推送更多相關知識點的文章,歡迎關注!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-25發表,共計4600字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 德清县| 牟定县| 寿阳县| 扶沟县| 嵊州市| 古田县| 车险| 凭祥市| 海口市| 邹城市| 浦城县| 基隆市| 双桥区| 观塘区| 介休市| 和硕县| 双桥区| 西藏| 芦山县| 马鞍山市| 通渭县| 泽州县| 武威市| 牡丹江市| 乌恰县| 内丘县| 保康县| 会理县| 孟村| 班玛县| 镇原县| 杂多县| 卫辉市| 都兰县| 恩施市| 华阴市| 东平县| 山阳县| 东丰县| 夏邑县| 武清区|