This article explains "what are the steps to implement Spark ALS". The walkthrough is kept simple and clear and easy to follow, so come along with 丸趣 TV and dig into the steps of a Spark ALS implementation.
Spark's ALS algorithm is used for personalized recommendation. The dataset it needs looks like a table of user ratings for items, for example user-product scores. The implementation has the following main steps:
1. Define the input data.
2. Convert the input into rating records, e.g. case class Rating(user: Int, movie: Int, rating: Float) (a minimal sketch follows this list).
3. Train an ALS model on the data.
4. Compute the recommendations and store them, so the business system can use them directly.
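Before the full program, here is a minimal sketch of step 2: turning raw click counts into Rating records. The sample values are made up for illustration only.

import org.apache.spark.sql.SparkSession

object RatingSketch {
  case class Rating(user: Int, movie: Int, rating: Float)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("rating-sketch").master("local").getOrCreate()
    import spark.implicits._

    // Hypothetical input: (userIndex, resourceIndex, clickCount) rows.
    val clicks = Seq((0, 10, 3L), (0, 11, 1L), (1, 10, 5L))

    // Step 2: the click count becomes the rating.
    val ratings = clicks.map { case (u, m, c) => Rating(u, m, c.toFloat) }.toDS()
    ratings.show()

    spark.close()
  }
}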
Now let's look at the concrete code:
package recommend
import org.apache.spark.sql.SparkSession
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.ml.feature.IndexToString
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.TaskContext
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.SaveMode
/**
 * Personalized recommendation with the ALS algorithm.
 * Users' click counts on resources are used as ratings.
 */
object Recommend {

  case class Rating(user: Int, movie: Int, rating: Float)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Java Spark MYSQL Recommend")
      .master("local")
      .config("es.nodes", "127.0.0.1")
      .config("es.port", "9200")
      .config("es.mapping.date.rich", "false") // do not parse date types
      .getOrCreate()

    trainModel(spark)
    spark.close()
  }
  def trainModel(spark: SparkSession): Unit = {
    import spark.implicits._

    val MAX = 3         // maximum number of recommendations per user
    val rank = 10       // latent vector size, default 10
    val iterations = 10 // number of iterations, default 10

    val url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8"
    val table = "clicks"
    val user = "root"
    val pass = "123456"
    val props = new Properties()
    props.setProperty("user", user)     // set the user name
    props.setProperty("password", pass) // set the password

    val clicks = spark.read.jdbc(url, table, props).repartition(4)
    clicks.createOrReplaceGlobalTempView("clicks")

    // Aggregate click counts per (userId, resId) pair to use as ratings.
    val agg = spark.sql("SELECT userId, resId, COUNT(id) AS clicks FROM global_temp.clicks GROUP BY userId, resId")
    // Map the raw userId / resId values to consecutive numeric indices for ALS.
    val userIndexer = new StringIndexer()
      .setInputCol("userId")
      .setOutputCol("userIndex")
    val resIndexer = new StringIndexer()
      .setInputCol("resId")
      .setOutputCol("resIndex")

    // Keep the fitted models so their labels can be reused for the reverse mapping below.
    val userIndexerModel = userIndexer.fit(agg)
    val indexed1 = userIndexerModel.transform(agg)
    val resIndexerModel = resIndexer.fit(indexed1)
    val indexed2 = resIndexerModel.transform(indexed1)
    indexed2.show()

    // Columns: userId(0), resId(1), clicks(2), userIndex(3), resIndex(4)
    val ratings = indexed2.map(x => Rating(x.getDouble(3).toInt, x.getDouble(4).toInt, x.getLong(2).toFloat))
    ratings.show()
    val Array(training, test) = ratings.randomSplit(Array(0.9, 0.1))
    println("training:")
    training.show()
    println("test:")
    test.show()
    // Implicit vs. explicit feedback: click counts are treated as explicit ratings here.
    val als = new ALS()
      .setRank(rank)
      .setMaxIter(iterations)
      .setRegParam(0.01)
      .setImplicitPrefs(false)
      .setUserCol("user")
      .setItemCol("movie")
      .setRatingCol("rating")
    val model = als.fit(training)

    // Evaluate the model by computing the RMSE on the test data.
    // Note we set cold start strategy to "drop" to ensure we don't get NaN evaluation metrics.
    model.setColdStartStrategy("drop")
    val predictions = model.transform(test)
    val evaluator = new RegressionEvaluator()
      .setMetricName("rmse")
      .setLabelCol("rating")
      .setPredictionCol("prediction")
    println("RMSE = " + evaluator.evaluate(predictions))

    val r2 = model.recommendForAllUsers(MAX)
    println(r2.schema)
    // Flatten the per-user recommendation arrays into Rating rows.
    val result = r2.rdd.flatMap(row => {
      val userId = row.getInt(0)
      val arrayPredict: Seq[Row] = row.getSeq(1)
      val result = ArrayBuffer[Rating]()
      arrayPredict.foreach(rowPredict => {
        val p = rowPredict(0).asInstanceOf[Int]
        val score = rowPredict(1).asInstanceOf[Float]
        val sql = "insert into recommends(userId,resId,score) values (" +
          userId + "," +
          rowPredict(0) + "," +
          rowPredict(1) +
          ")"
        println("sql:" + sql) // debug output only; the actual write happens below
        result.append(Rating(userId, p, score))
      })
      result
    })
    println("recommendation result RDD flattened")
    result.toDF().show()
    // Map the resource index back to the original resId.
    val resInt2Index = new IndexToString()
      .setInputCol("movie")
      .setOutputCol("resId")
      .setLabels(resIndexerModel.labels)

    // Map the user index back to the original userId.
    val userInt2Index = new IndexToString()
      .setInputCol("user")
      .setOutputCol("userId")
      .setLabels(userIndexerModel.labels)

    val rc = userInt2Index.transform(resInt2Index.transform(result.toDF()))
    rc.show()
    // Write the recommendations to MySQL for the business system to consume.
    rc.withColumnRenamed("rating", "score").select("userId", "resId", "score")
      .write.mode(SaveMode.Overwrite)
      .format("jdbc")
      .option("url", url)
      .option("dbtable", "recommends")
      .option("user", user)
      .option("password", pass)
      .option("batchsize", 5000)
      .option("truncate", true)
      .save()
    println("finished!!!")
}
}
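The Pipeline import above is never used, but the two StringIndexer stages could equally be chained through it. A minimal sketch under the same column names as above, not the approach the original code takes:

val indexPipeline = new Pipeline().setStages(Array(userIndexer, resIndexer))
val pipelineModel = indexPipeline.fit(agg)
val indexed = pipelineModel.transform(agg) // same columns as indexed2 above

The fitted stages can be pulled back out when the labels are needed for IndexToString, e.g. pipelineModel.stages(0).asInstanceOf[StringIndexerModel].labels (StringIndexerModel also lives in org.apache.spark.ml.feature).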
There is another way to write the results to MySQL: raw JDBC inserts, partition by partition:
// Write the recommendation results to MySQL per partition.
r2.foreachPartition(p => {
  @transient val conn = ConnectionPool.getConnection
  p.foreach(row => {
    val userId = row.getInt(0)
    val arrayPredict: Seq[Row] = row.getSeq(1)
    arrayPredict.foreach(rowPredict => {
      println(rowPredict(0) + "@" + rowPredict(1))
      val sql = "insert into recommends(userId,resId,score) values (" +
        userId + "," +
        rowPredict(0) + "," +
        rowPredict(1) +
        ")"
      println("sql:" + sql)
      val stmt = conn.createStatement
      stmt.executeUpdate(sql)
      stmt.close() // close the statement after each insert
    })
  })
  ConnectionPool.returnConnection(conn)
})
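Building the SQL by string concatenation re-parses every statement and is open to injection. Here is a hedged sketch of the same per-partition write using a batched java.sql.PreparedStatement; ConnectionPool is assumed to be the same hypothetical helper as above:

r2.foreachPartition(p => {
  val conn = ConnectionPool.getConnection
  // One parameterized statement, reused for every row.
  val stmt = conn.prepareStatement("insert into recommends(userId,resId,score) values (?,?,?)")
  p.foreach(row => {
    val userId = row.getInt(0)
    val arrayPredict: Seq[Row] = row.getSeq(1)
    arrayPredict.foreach(rowPredict => {
      stmt.setInt(1, userId)
      stmt.setInt(2, rowPredict.getInt(0))
      stmt.setFloat(3, rowPredict.getFloat(1))
      stmt.addBatch() // queue the row instead of executing it immediately
    })
  })
  stmt.executeBatch() // one round trip per partition
  stmt.close()
  ConnectionPool.returnConnection(conn)
})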
Thanks for reading. That covers "what are the steps to implement Spark ALS". After working through this article you should have a better feel for the steps of a Spark ALS implementation, though how it behaves in practice still needs to be verified by your own experiments. This is 丸趣 TV, and 丸趣 TV will keep publishing articles on related topics. Welcome to follow!