Advanced Analytics with Spark · 3 Recommender Engines
Published: 2019-06-25


Recommendation algorithm workflow

Preliminaries

wget http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
cd /Users/erichan/garden/spark-1.6.0-bin-hadoop2.6/bin
./spark-shell --master local --driver-memory 6g

1 Preparing the Data

val data = "/Users/erichan/AliDrive/ml_spark/data/profiledata_06-May-2005"

// ALS requires the IDs to be numeric
val rawUserArtistData = sc.textFile(data + "/user_artist_data.txt", 10)
rawUserArtistData.first
// res3: String = 1092764  1000311
// rawUserArtistData.map(_.split(' ')(0).toDouble).stats()
// res10: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1947573.265353, stdev: 496000.544975, max: 2443548.000000, min: 90.000000)
// rawUserArtistData.map(_.split(' ')(1).toDouble).stats()
// res11: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1718704.093757, stdev: 2539389.040171, max: 10794401.000000, min: 1.000000)

val rawArtistData = sc.textFile(data + "/artist_data.txt")
// rawArtistData.first
// res12: String = 1134999	06Crazy Life

val artistByID = rawArtistData.flatMap { line =>
  val (id, name) = line.span(_ != '\t')
  if (name.isEmpty) {
    None
  } else {
    try {
      Some((id.toInt, name.trim))
    } catch {
      case e: NumberFormatException => None
    }
  }
}

val rawArtistAlias = sc.textFile(data + "/artist_alias.txt")
val artistAlias = rawArtistAlias.flatMap { line =>
  val tokens = line.split('\t')
  if (tokens(0).isEmpty) {
    None
  } else {
    Some((tokens(0).toInt, tokens(1).toInt))
  }
}.collectAsMap()

// artistByID.lookup(1000010).head
// res14: String = Aerosmith
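A quick aside on the span call above: span splits the line at the first character failing the predicate, so the tab stays attached to the name side, which is why name.trim is needed. A minimal illustration, using the res12 sample line shown above:

val line = "1134999\t06Crazy Life"
val (id, name) = line.span(_ != '\t')
// id = "1134999", name = "\t06Crazy Life"
id.toInt   // 1134999
name.trim  // "06Crazy Life"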

2 Building the Model

import org.apache.spark.mllib.recommendation._

val bArtistAlias = sc.broadcast(artistAlias)

val trainData = rawUserArtistData.map { line =>
  val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
  val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
  Rating(userID, finalArtistID, count)
}.cache()

val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
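The positional arguments to ALS.trainImplicit are easy to misread, so here is the same call with each argument labeled (this is the MLlib 1.x signature used above):

val model = ALS.trainImplicit(
  trainData, // RDD[Rating] of (user, artist, play count) triples
  10,        // rank: number of latent factors
  5,         // iterations of alternating least squares
  0.01,      // lambda: regularization parameter
  1.0)       // alpha: confidence weight placed on observed interactions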

3 Spot-Checking Recommendations

val rawArtistsForUser = rawUserArtistData.map(_.split(' ')).filter {
  case Array(user, _, _) => user.toInt == 2093760
}

val existingProducts = rawArtistsForUser.map {
  case Array(_, artist, _) => artist.toInt
}.collect().toSet

artistByID.filter {
  case (id, name) => existingProducts.contains(id)
}.values.collect().foreach(println)

val recommendations = model.recommendProducts(2093760, 5)
recommendations.foreach(println)

val recommendedProductIDs = recommendations.map(_.product).toSet

artistByID.filter {
  case (id, name) => recommendedProductIDs.contains(id)
}.values.collect().foreach(println)
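Printing names and scores side by side makes the spot check easier to read. A minimal sketch reusing the artistByID and recommendations values defined above (recsByID is an illustrative name, not from the original):

val recsByID = recommendations.map(r => (r.product, r.rating)).toMap
artistByID.filter { case (id, _) => recsByID.contains(id) }
  .map { case (id, name) => (name, recsByID(id)) }
  .collect().sortBy(-_._2).foreach(println)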

4 Evaluation

:load /Users/erichan/sourcecode/book/aas/ch03-recommender/src/main/scala/RunAUC.scala

val bArtistAlias = sc.broadcast(RunAUC.buildArtistAlias(rawArtistAlias))
val allData = RunAUC.buildRatings(rawUserArtistData, bArtistAlias)
val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
trainData.cache()
cvData.cache()

val allItemIDs = allData.map(_.product).distinct().collect()
val bAllItemIDs = sc.broadcast(allItemIDs)

val mostListenedAUC = RunAUC.areaUnderCurve(cvData, bAllItemIDs, RunAUC.predictMostListened(sc, trainData))
println(mostListenedAUC)
// 0.9395286660878177

trainData.unpersist()
cvData.unpersist()
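The 0.9395 above is the mean AUC of a naive baseline that simply ranks every artist by global play count. To check whether the matrix factorization beats it, the same areaUnderCurve helper can score an ALS model trained on the 90% split. A minimal sketch using the same hyperparameters as section 2 (cvModel and cvModelAUC are illustrative names; the exact value depends on the random split):

val cvModel = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
val cvModelAUC = RunAUC.areaUnderCurve(cvData, bAllItemIDs, cvModel.predict)
println(cvModelAUC)

The same pattern extends to a small grid search: loop over candidate values of rank, lambda, and alpha, retrain, and keep the combination with the highest mean AUC.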

5 Making Recommendations

val someUsers = allData.map(_.user).distinct().take(100)
val someRecommendations = someUsers.map(userID => model.recommendProducts(userID, 5))
someRecommendations.map(
  recs => recs.head.user + " -> " + recs.map(_.product).mkString(", ")
).foreach(println)
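The printout above shows raw artist IDs. To show names instead, one option is to collect just the (id, name) pairs that actually appear in the recommendations into a local map first; a sketch, assuming that set is small enough to collect (recommendedIDs and idToName are illustrative names):

val recommendedIDs = someRecommendations.flatMap(_.map(_.product)).toSet
val idToName = artistByID.filter { case (id, _) => recommendedIDs.contains(id) }.collectAsMap()
someRecommendations.map(recs =>
  recs.head.user + " -> " +
  recs.map(r => idToName.getOrElse(r.product, r.product.toString)).mkString(", ")
).foreach(println)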

Appendix

RunAUC.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.recommendation._
import org.apache.spark.rdd.RDD

import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

/**
  * Created by erichan
  * on 16/1/26.
  */
object RunAUC {
  def areaUnderCurve(
      positiveData: RDD[Rating],
      bAllItemIDs: Broadcast[Array[Int]],
      predictFunction: (RDD[(Int,Int)] => RDD[Rating])) = {
    // What this actually computes is AUC, per user. The result is actually something
    // that might be called "mean AUC".

    // Take held-out data as the "positive", and map to tuples
    val positiveUserProducts = positiveData.map(r => (r.user, r.product))
    // Make predictions for each of them, including a numeric score, and gather by user
    val positivePredictions = predictFunction(positiveUserProducts).groupBy(_.user)

    // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    // small AUC problems, and it would be inefficient, when a direct computation is available.

    // Create a set of "negative" products for each user. These are randomly chosen
    // from among all of the other items, excluding those that are "positive" for the user.
    val negativeUserProducts = positiveUserProducts.groupByKey().mapPartitions {
      // mapPartitions operates on many (user,positive-items) pairs at once
      userIDAndPosItemIDs => {
        // Init an RNG and the item IDs set once for partition
        val random = new Random()
        val allItemIDs = bAllItemIDs.value
        userIDAndPosItemIDs.map { case (userID, posItemIDs) =>
          val posItemIDSet = posItemIDs.toSet
          val negative = new ArrayBuffer[Int]()
          var i = 0
          // Keep about as many negative examples per user as positive.
          // Duplicates are OK
          while (i < allItemIDs.size && negative.size < posItemIDSet.size) {
            val itemID = allItemIDs(random.nextInt(allItemIDs.size))
            if (!posItemIDSet.contains(itemID)) {
              negative += itemID
            }
            i += 1
          }
          // Result is a collection of (user,negative-item) tuples
          negative.map(itemID => (userID, itemID))
        }
      }
    }.flatMap(t => t)
    // flatMap breaks the collections above down into one big set of tuples

    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeUserProducts).groupBy(_.user)

    // Join positive and negative by user
    positivePredictions.join(negativePredictions).values.map {
      case (positiveRatings, negativeRatings) =>
        // AUC may be viewed as the probability that a random positive item scores
        // higher than a random negative one. Here the proportion of all positive-negative
        // pairs that are correctly ranked is computed. The result is equal to the AUC metric.
        var correct = 0L
        var total = 0L
        // For each pairing,
        for (positive <- positiveRatings;
             negative <- negativeRatings) {
          // Count the correctly-ranked pairs
          if (positive.rating > negative.rating) {
            correct += 1
          }
          total += 1
        }
        // Return AUC: fraction of pairs ranked correctly
        correct.toDouble / total
    }.mean() // Return mean AUC over users
  }

  def predictMostListened(sc: SparkContext, train: RDD[Rating])(allData: RDD[(Int,Int)]) = {
    val bListenCount =
      sc.broadcast(train.map(r => (r.product, r.rating)).reduceByKey(_ + _).collectAsMap())
    allData.map { case (user, product) =>
      Rating(user, product, bListenCount.value.getOrElse(product, 0.0))
    }
  }

  def buildArtistAlias(rawArtistAlias: RDD[String]): Map[Int,Int] =
    rawArtistAlias.flatMap { line =>
      val tokens = line.split('\t')
      if (tokens(0).isEmpty) {
        None
      } else {
        Some((tokens(0).toInt, tokens(1).toInt))
      }
    }.collectAsMap()

  def buildRatings(
      rawUserArtistData: RDD[String],
      bArtistAlias: Broadcast[Map[Int,Int]]) = {
    rawUserArtistData.map { line =>
      val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
      val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
      Rating(userID, finalArtistID, count)
    }
  }
}
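As a sanity check on the pairwise AUC definition used in areaUnderCurve, here is its inner counting loop on made-up scores, with no Spark involved (all values are illustrative):

val positiveScores = Seq(0.9, 0.4)
val negativeScores = Seq(0.5, 0.1)
val pairs = for (p <- positiveScores; n <- negativeScores) yield p > n
val auc = pairs.count(identity).toDouble / pairs.size
// 3 of the 4 pairs are ranked correctly, so auc = 0.75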

