diff --git a/engine.json b/engine.json index efc7575..3b74b8d 100644 --- a/engine.json +++ b/engine.json @@ -4,7 +4,7 @@ "engineFactory": "org.template.complementarypurchase.ComplementaryPurchaseEngine", "datasource": { "params" : { - "appName": "INVALID_APP_NAME" + "appName": "techselector_v3" } }, "algorithms": [ diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala index cc313f8..725acec 100644 --- a/src/main/scala/DataSource.scala +++ b/src/main/scala/DataSource.scala @@ -13,11 +13,12 @@ import org.apache.spark.rdd.RDD import grizzled.slf4j.Logger -case class DataSourceParams(appName: String) extends Params +case class DataSourceEvalParams(kFold: Int, queryNum: Int) +case class DataSourceParams(appName: String, evalParams: Option[DataSourceEvalParams]) extends Params class DataSource(val dsp: DataSourceParams) extends PDataSource[TrainingData, - EmptyEvaluationInfo, Query, EmptyActualResult] { + EmptyEvaluationInfo, Query, ActualResult] { @transient lazy val logger = Logger[this.type] @@ -46,6 +47,39 @@ class DataSource(val dsp: DataSourceParams) new TrainingData(buyEvents) } + override + def readEval(sc: SparkContext) + : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = { + require(!dsp.evalParams.isEmpty, "Must specify evalParams") + val evalParams = dsp.evalParams.get + + val kFold = evalParams.kFold + // val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId + //ratings.cache + + (0 until kFold).map { idx => { + + // val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1) no ratings, so commented + // val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1) no ratings, so commented + + // I guess here we have to compare predicted with actual, but I don't know exactly how. 
+ + // I think the business logic should go here. For that: + // Quoting Kenneth: instead of filtering by highly rated items, + // as long as the items were bought together with the given input item, + // you treat them as positive actual results and compare them with the predicted result. + // Not sure how to implement this. Some hints? NOTE(review): trainingRatings and testingRatings are used below but are only assigned in the commented-out lines above, and no Rating type is declared in the visible changes, so unless they are defined outside this hunk the method will not compile as written. Query is declared with items: Set[String] and ActualResult with items: Array[Item], so the Query(user, ...) and ActualResult(ratings.toArray) calls below need matching argument types - TODO confirm and fix. + + val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user) + + (new TrainingData(trainingRatings), + new EmptyEvaluationInfo(), + testingUsers.map { + case (user, ratings) => (Query(user, evalParams.queryNum), ActualResult(ratings.toArray)) + } + ) + }} + } } case class BuyEvent(user: String, item: String, t: Long) diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala index 6c6faa3..7a1a32d 100644 --- a/src/main/scala/Engine.scala +++ b/src/main/scala/Engine.scala @@ -3,12 +3,15 @@ package org.template.complementarypurchase import org.apache.predictionio.controller.EngineFactory import org.apache.predictionio.controller.Engine -case class Query(items: Set[String], num: Int) +case class Query( + items: Set[String], num: Int) extends Serializable case class PredictedResult(rules: Array[Rule]) extends Serializable +case class ActualResult(items: Array[Item]) + extends Serializable //case class ItemScore(item: String, score: Double) extends Serializable case class Rule(cond: Set[String], itemScores: Array[ItemScore]) extends Serializable @@ -17,6 +20,10 @@ case class ItemScore( item: String, support: Double, confidence: Double, lift: Double ) extends Serializable +case class Item( + item: String +) + object ComplementaryPurchaseEngine extends EngineFactory { def apply() = { new Engine( diff --git a/src/main/scala/Evaluation.scala b/src/main/scala/Evaluation.scala new file mode 100644 index 0000000..5ee7f09 --- /dev/null +++ b/src/main/scala/Evaluation.scala @@ -0,0 +1,65 @@ +package org.template.complementarypurchase + +import org.apache.predictionio.controller.Evaluation +import 
org.apache.predictionio.controller.EngineParamsGenerator +import org.apache.predictionio.controller.EngineParams +import org.apache.predictionio.controller.MetricEvaluator +import org.apache.predictionio.controller.OptionAverageMetric + +// Usage: +// $ pio eval org.template.complementarypurchase.ComplementaryPurchaseEvaluation \ +// org.template.complementarypurchase.EngineParamsList + +case class PrecisionAtK(k: Int) + extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] { + require(k > 0, "k must be greater than 0") + + override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)" + + override + def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = { + //val positives: Set[String] = a.ratings.filter(_.rating >= ratingThreshold).map(_.item).toSet + + // Here we should filter the items to keep only the ones that were bought + // together with the input item, but what should the syntax look like? NOTE(review): positives (used below) is only assigned in commented-out code, ratingThreshold (used in header) is not a parameter of PrecisionAtK, and PredictedResult is declared with rules: Array[Rule] rather than itemScores, so this metric will not compile as written. The usage comment above names EngineParamsList, but this file only defines the BaseEngineParamsList trait - TODO confirm and fix. + //val positives: Set[String] a.items,filter + + // If there are no positive results, Precision is undefined. We don't consider this case in the + // metrics, hence we return None. 
+ if (positives.size == 0) { + None + } else { + val tpCount: Int = p.itemScores.take(k).filter(is => positives(is.item)).size + Some(tpCount.toDouble / math.min(k, positives.size)) + } + } +} + +object ComplementaryPurchaseEvaluation extends Evaluation { + engineEvaluator = ( + ComplementaryPurchaseEngine(), + MetricEvaluator( + metric = PrecisionAtK(k = 10) + ) + ) +} + +object ComprehensiveRecommendationEvaluation extends Evaluation { + + // val ratingThresholds = Seq(0.0, 2.0, 4.0) + val ks = Seq(1, 3, 10) + + engineEvaluator = ( + ComplementaryPurchaseEngine(), + MetricEvaluator( + metric = PrecisionAtK(k = 3) + ) + ) +} + +trait BaseEngineParamsList extends EngineParamsGenerator { + protected val baseEP = EngineParams( + dataSourceParams = DataSourceParams( + appName = "techselector_v3", + evalParams = Some(DataSourceEvalParams(kFold = 5, queryNum = 10)))) +}