This post is about 7,712 characters; estimated reading time 25 minutes.
wget http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
cd /Users/erichan/garden/spark-1.6.0-bin-hadoop2.6/bin
./spark-shell --master local --driver-memory 6g
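The download is just a tarball; it has to be unpacked before Spark can read it. A minimal extra step (the extracted files are the three read in the next section; move them wherever you point the data path at):

tar -xzf profiledata_06-May-2005.tar.gz
# yields user_artist_data.txt, artist_data.txt and artist_alias.txt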
val data = "/Users/erichan/AliDrive/ml_spark/data/profiledata_06-May-2005"
val rawUserArtistData = sc.textFile(data + "/user_artist_data.txt", 10)
// ALS requires the IDs to be numeric
rawUserArtistData.first
//res3: String = 1092764 1000311

//rawUserArtistData.map(_.split(' ')(0).toDouble).stats()
//res10: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1947573.265353, stdev: 496000.544975, max: 2443548.000000, min: 90.000000)
//rawUserArtistData.map(_.split(' ')(1).toDouble).stats()
//res11: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1718704.093757, stdev: 2539389.040171, max: 10794401.000000, min: 1.000000)

val rawArtistData = sc.textFile(data + "/artist_data.txt")
//rawArtistData.first
//res12: String = 1134999 06Crazy Life

val artistByID = rawArtistData.flatMap { line =>
  val (id, name) = line.span(_ != '\t')
  if (name.isEmpty) {
    None
  } else {
    try {
      Some((id.toInt, name.trim))
    } catch {
      case e: NumberFormatException => None
    }
  }
}

val rawArtistAlias = sc.textFile(data + "/artist_alias.txt")
val artistAlias = rawArtistAlias.flatMap { line =>
  val tokens = line.split('\t')
  if (tokens(0).isEmpty) {
    None
  } else {
    Some((tokens(0).toInt, tokens(1).toInt))
  }
}.collectAsMap()

//artistByID.lookup(1000010).head
//res14: String = Aerosmith
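As a quick sanity check of the alias map, take an arbitrary entry and resolve both IDs to names (a minimal sketch; which pair head returns depends on the data, and as the comment above shows, 1000010 is Aerosmith):

// One (misspelled ID -> canonical ID) entry from the alias map
val (badID, goodID) = artistAlias.head
// Resolve both sides to artist names; headOption guards against IDs
// that have no entry in artist_data.txt
artistByID.lookup(badID).headOption.foreach(println)
artistByID.lookup(goodID).headOption.foreach(println)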
import org.apache.spark.mllib.recommendation._

val bArtistAlias = sc.broadcast(artistAlias)

val trainData = rawUserArtistData.map { line =>
  val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
  val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
  Rating(userID, finalArtistID, count)
}.cache()

val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
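The trainImplicit arguments are, in order: the ratings, rank 10 (latent factors per user and artist), 5 iterations, lambda 0.01 (regularization) and alpha 1.0 (confidence weight on observed plays). To confirm the factorization exists, peek at one user's factor vector (a sketch; the output is just a rank-10 array of doubles):

// One entry of the user-factor matrix: (userID, 10 latent factors)
model.userFeatures.mapValues(_.mkString(", ")).first()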
val rawArtistsForUser = rawUserArtistData.map(_.split(' ')).filter {
  case Array(user, _, _) => user.toInt == 2093760
}
val existingProducts = rawArtistsForUser.map {
  case Array(_, artist, _) => artist.toInt
}.collect().toSet

// Artists this user has actually played
artistByID.filter { case (id, name) =>
  existingProducts.contains(id)
}.values.collect().foreach(println)

// Top 5 recommendations; the rating field is an opaque score
// (higher means more strongly recommended), not a predicted play count
val recommendations = model.recommendProducts(2093760, 5)
recommendations.foreach(println)

val recommendedProductIDs = recommendations.map(_.product).toSet
artistByID.filter { case (id, name) =>
  recommendedProductIDs.contains(id)
}.values.collect().foreach(println)
:load /Users/erichan/sourcecode/book/aas/ch03-recommender/src/main/scala/RunAUC.scala

val bArtistAlias = sc.broadcast(RunAUC.buildArtistAlias(rawArtistAlias))
val allData = RunAUC.buildRatings(rawUserArtistData, bArtistAlias)
val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
trainData.cache()
cvData.cache()

val allItemIDs = allData.map(_.product).distinct().collect()
val bAllItemIDs = sc.broadcast(allItemIDs)

val mostListenedAUC = RunAUC.areaUnderCurve(cvData, bAllItemIDs, RunAUC.predictMostListened(sc, trainData))
println(mostListenedAUC)
//0.9395286660878177

trainData.unpersist()
cvData.unpersist()
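The 0.9395 above is only the non-personalized play-count baseline. The same helper can score the ALS model itself for comparison; a minimal sketch, to be run before the unpersist() calls above (cvModel is a name introduced here, trained on this split so the CV data is truly held out):

// Retrain on the 90% split, then measure mean AUC on the held-out 10%
val cvModel = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
val modelAUC = RunAUC.areaUnderCurve(cvData, bAllItemIDs, cvModel.predict)
println(modelAUC)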
val someUsers = allData.map(_.user).distinct().take(100)
val someRecommendations = someUsers.map(userID => model.recommendProducts(userID, 5))
someRecommendations.map(
  recs => recs.head.user + " -> " + recs.map(_.product).mkString(", ")
).foreach(println)
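Calling recommendProducts once per user in the driver only works for a small sample. Since Spark 1.4, MatrixFactorizationModel can also compute top-N recommendations for every user in one distributed pass; a sketch:

// Top 5 artists for all users at once, as RDD[(Int, Array[Rating])]
val allRecs = model.recommendProductsForUsers(5)
allRecs.take(3).foreach { case (user, recs) =>
  println(user + " -> " + recs.map(_.product).mkString(", "))
}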
RunAUC.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.recommendation._
import org.apache.spark.rdd.RDD

import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

/**
 * Created by erichan
 * on 16/1/26.
 */
object RunAUC {

  def areaUnderCurve(
      positiveData: RDD[Rating],
      bAllItemIDs: Broadcast[Array[Int]],
      predictFunction: (RDD[(Int,Int)] => RDD[Rating])) = {
    // What this actually computes is AUC, per user. The result is actually something
    // that might be called "mean AUC".

    // Take held-out data as the "positive", and map to tuples
    val positiveUserProducts = positiveData.map(r => (r.user, r.product))
    // Make predictions for each of them, including a numeric score, and gather by user
    val positivePredictions = predictFunction(positiveUserProducts).groupBy(_.user)

    // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    // small AUC problems, and it would be inefficient, when a direct computation is available.

    // Create a set of "negative" products for each user. These are randomly chosen
    // from among all of the other items, excluding those that are "positive" for the user.
    val negativeUserProducts = positiveUserProducts.groupByKey().mapPartitions {
      // mapPartitions operates on many (user,positive-items) pairs at once
      userIDAndPosItemIDs => {
        // Init an RNG and the item IDs set once for partition
        val random = new Random()
        val allItemIDs = bAllItemIDs.value
        userIDAndPosItemIDs.map { case (userID, posItemIDs) =>
          val posItemIDSet = posItemIDs.toSet
          val negative = new ArrayBuffer[Int]()
          var i = 0
          // Keep about as many negative examples per user as positive.
          // Duplicates are OK
          while (i < allItemIDs.size && negative.size < posItemIDSet.size) {
            val itemID = allItemIDs(random.nextInt(allItemIDs.size))
            if (!posItemIDSet.contains(itemID)) {
              negative += itemID
            }
            i += 1
          }
          // Result is a collection of (user,negative-item) tuples
          negative.map(itemID => (userID, itemID))
        }
      }
    }.flatMap(t => t)
    // flatMap breaks the collections above down into one big set of tuples

    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeUserProducts).groupBy(_.user)

    // Join positive and negative by user
    positivePredictions.join(negativePredictions).values.map {
      case (positiveRatings, negativeRatings) =>
        // AUC may be viewed as the probability that a random positive item scores
        // higher than a random negative one. Here the proportion of all positive-negative
        // pairs that are correctly ranked is computed. The result is equal to the AUC metric.
        var correct = 0L
        var total = 0L
        // For each pairing,
        for (positive <- positiveRatings;
             negative <- negativeRatings) {
          // Count the correctly-ranked pairs
          if (positive.rating > negative.rating) {
            correct += 1
          }
          total += 1
        }
        // Return AUC: fraction of pairs ranked correctly
        correct.toDouble / total
    }.mean() // Return mean AUC over users
  }

  def predictMostListened(sc: SparkContext, train: RDD[Rating])(allData: RDD[(Int,Int)]) = {
    val bListenCount = sc.broadcast(
      train.map(r => (r.product, r.rating)).reduceByKey(_ + _).collectAsMap()
    )
    allData.map { case (user, product) =>
      Rating(user, product, bListenCount.value.getOrElse(product, 0.0))
    }
  }

  def buildArtistAlias(rawArtistAlias: RDD[String]): Map[Int,Int] =
    rawArtistAlias.flatMap { line =>
      val tokens = line.split('\t')
      if (tokens(0).isEmpty) {
        None
      } else {
        Some((tokens(0).toInt, tokens(1).toInt))
      }
    }.collectAsMap()

  def buildRatings(
      rawUserArtistData: RDD[String],
      bArtistAlias: Broadcast[Map[Int,Int]]) = {
    rawUserArtistData.map { line =>
      val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
      val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
      Rating(userID, finalArtistID, count)
    }
  }
}
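With RunAUC loaded, the same AUC helper also supports a simple hyperparameter sweep; a hedged sketch (the candidate values are illustrative, and it assumes the trainData, cvData and bAllItemIDs from the evaluation section are still defined):

for (rank   <- Array(10, 50);
     lambda <- Array(1.0, 0.0001);
     alpha  <- Array(1.0, 40.0)) {
  val model = ALS.trainImplicit(trainData, rank, 10, lambda, alpha)
  val auc = RunAUC.areaUnderCurve(cvData, bAllItemIDs, model.predict)
  println(s"rank=$rank, lambda=$lambda, alpha=$alpha -> AUC=$auc")
}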
Reprinted from: http://mnmxo.baihongyu.com/