From 8f330e284d4e0a8988c116c089f522c62773e43a Mon Sep 17 00:00:00 2001 From: Alexey Grachev Date: Sat, 13 Jul 2019 09:25:08 +0200 Subject: [PATCH 01/10] WIP: adapting existing file structure for the implementation of the evaluation logic --- src/main/scala/DataSource.scala | 5 +++-- src/main/scala/Engine.scala | 9 ++++++++- src/main/scala/Evaluation.scala | 27 +++++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 3 deletions(-) create mode 100644 src/main/scala/Evaluation.scala diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala index cc313f8..50ecd25 100644 --- a/src/main/scala/DataSource.scala +++ b/src/main/scala/DataSource.scala @@ -13,11 +13,12 @@ import org.apache.spark.rdd.RDD import grizzled.slf4j.Logger -case class DataSourceParams(appName: String) extends Params +case class DataSourceEvalParams(kFold: Int, queryNum: Int) +case class DataSourceParams(appName: String, evalParams: Option[DataSourceEvalParams]) extends Params class DataSource(val dsp: DataSourceParams) extends PDataSource[TrainingData, - EmptyEvaluationInfo, Query, EmptyActualResult] { + EmptyEvaluationInfo, Query, ActualResult] { @transient lazy val logger = Logger[this.type] diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala index 6c6faa3..7a1a32d 100644 --- a/src/main/scala/Engine.scala +++ b/src/main/scala/Engine.scala @@ -3,12 +3,15 @@ package org.template.complementarypurchase import org.apache.predictionio.controller.EngineFactory import org.apache.predictionio.controller.Engine -case class Query(items: Set[String], num: Int) +case class Query( + items: Set[String], num: Int) extends Serializable case class PredictedResult(rules: Array[Rule]) extends Serializable +case class ActualResult(items: Array[Item]) + extends Serializable //case class ItemScore(item: String, score: Double) extends Serializable case class Rule(cond: Set[String], itemScores: Array[ItemScore]) extends Serializable @@ -17,6 +20,10 @@ case class ItemScore( 
item: String, support: Double, confidence: Double, lift: Double ) extends Serializable +case class Item( + item: String +) + object ComplementaryPurchaseEngine extends EngineFactory { def apply() = { new Engine( diff --git a/src/main/scala/Evaluation.scala b/src/main/scala/Evaluation.scala new file mode 100644 index 0000000..4c67898 --- /dev/null +++ b/src/main/scala/Evaluation.scala @@ -0,0 +1,27 @@ +package org.template.complementarypurchase + +import org.apache.predictionio.controller.EngineParamsGenerator +import org.apache.predictionio.controller.EngineParams + +trait BaseEngineParamsList extends EngineParamsGenerator { + protected val baseEP = EngineParams( + dataSourceParams = DataSourceParams( + appName = "techselector_v3", + evalParams = Some(DataSourceEvalParams(kFold = 5, queryNum = 10)))) +} + +case class Precision(label: Double) + extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] { + def calculate(query: Query, predicted: PredictedResult, actual: ActualResult) + : Option[Double] = { + if (predicted.label == label) { + if (predicted.label == actual.label) { + Some(1.0) // True positive + } else { + Some(0.0) // False positive + } + } else { + None // Unrelated case for calcuating precision + } + } +} \ No newline at end of file From 2554364953950a54fc4914316d1b86acc60df514 Mon Sep 17 00:00:00 2001 From: Alexey Grachev Date: Sat, 13 Jul 2019 09:53:46 +0200 Subject: [PATCH 02/10] WIP: adding ComplementaryPurchaseEvaluation object to Evaluation.scala; changing algorithm to precision at k like in recommender engine --- src/main/scala/Evaluation.scala | 52 +++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/src/main/scala/Evaluation.scala b/src/main/scala/Evaluation.scala index 4c67898..039a956 100644 --- a/src/main/scala/Evaluation.scala +++ b/src/main/scala/Evaluation.scala @@ -2,26 +2,46 @@ package org.template.complementarypurchase import 
org.apache.predictionio.controller.EngineParamsGenerator import org.apache.predictionio.controller.EngineParams +// Usage: +// $ pio eval org.template.complementarypurchase.ComplementaryPurchaseEvaluation \ +// org.template.complementarypurchase.EngineParamsList +case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0) + extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] { + require(k > 0, "k must be greater than 0") + + override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)" + + override + def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = { + val positives: Set[String] = a.ratings.filter(_.rating >= ratingThreshold).map(_.item).toSet + + // If there is no positive results, Precision is undefined. We don't consider this case in the + // metrics, hence we return None. + if (positives.size == 0) { + None + } else { + val tpCount: Int = p.itemScores.take(k).filter(is => positives(is.item)).size + Some(tpCount.toDouble / math.min(k, positives.size)) + } + } +} +object ComplementaryPurchaseEvaluation extends Evaluation { + engineEvaluator = ( + RecommendationEngine(), + MetricEvaluator( + metric = PrecisionAtK(k = 10, ratingThreshold = 4.0), + otherMetrics = Seq( + PositiveCount(ratingThreshold = 4.0), + PrecisionAtK(k = 10, ratingThreshold = 2.0), + PositiveCount(ratingThreshold = 2.0), + PrecisionAtK(k = 10, ratingThreshold = 1.0), + PositiveCount(ratingThreshold = 1.0) + ))) +} trait BaseEngineParamsList extends EngineParamsGenerator { protected val baseEP = EngineParams( dataSourceParams = DataSourceParams( appName = "techselector_v3", evalParams = Some(DataSourceEvalParams(kFold = 5, queryNum = 10)))) } - -case class Precision(label: Double) - extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] { - def calculate(query: Query, predicted: PredictedResult, actual: ActualResult) - : Option[Double] = { - if (predicted.label == label) { - if 
(predicted.label == actual.label) { - Some(1.0) // True positive - } else { - Some(0.0) // False positive - } - } else { - None // Unrelated case for calcuating precision - } - } -} \ No newline at end of file From da5c6faaaa59ff8239717ee3e36d464374fc87de Mon Sep 17 00:00:00 2001 From: Alexey Grachev Date: Sat, 13 Jul 2019 10:43:03 +0200 Subject: [PATCH 03/10] WIP: adding readEval custom logic to DataSource.scala, like in recommender template --- src/main/scala/DataSource.scala | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala index 50ecd25..001d892 100644 --- a/src/main/scala/DataSource.scala +++ b/src/main/scala/DataSource.scala @@ -49,6 +49,32 @@ class DataSource(val dsp: DataSourceParams) } } +override + def readEval(sc: SparkContext) + : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = { + require(!dsp.evalParams.isEmpty, "Must specify evalParams") + val evalParams = dsp.evalParams.get + + val kFold = evalParams.kFold + // val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId + ratings.cache + + (0 until kFold).map { idx => { + // I guess here we have to compare predicted with actual, but i dont know exactly how. 
+ // val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1) no ratings, so commented + // val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1) no ratings, so commented + + val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user) + + (new TrainingData(trainingRatings), + new EmptyEvaluationInfo(), + testingUsers.map { + case (user, ratings) => (Query(user, evalParams.queryNum), ActualResult(ratings.toArray)) + } + ) + }} + } + case class BuyEvent(user: String, item: String, t: Long) class TrainingData( From 13f47b1da96dfaabb7b0464f8733a559ec832c03 Mon Sep 17 00:00:00 2001 From: Alexey Grachev Date: Sat, 13 Jul 2019 11:12:25 +0200 Subject: [PATCH 04/10] WIP: added comments in DataSource.scala --- src/main/scala/DataSource.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala index 001d892..a21b34a 100644 --- a/src/main/scala/DataSource.scala +++ b/src/main/scala/DataSource.scala @@ -57,13 +57,19 @@ override val kFold = evalParams.kFold // val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId - ratings.cache + //ratings.cache (0 until kFold).map { idx => { // I guess here we have to compare predicted with actual, but i dont know exactly how. // val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1) no ratings, so commented // val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1) no ratings, so commented + // I think here should be the business logic, for that: + // Quote Kenneth: instead of filter by high rating item, + // as long as the items bought together given the input item, + //you treat it as positive actual results and compare with the predicted result. + // Not sure how to imlement this. Some hints? 
+ val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user) (new TrainingData(trainingRatings), From 9bb277b3da3bda041106aa749e1fb9b22600ab06 Mon Sep 17 00:00:00 2001 From: Alexey Grachev Date: Sat, 13 Jul 2019 11:15:28 +0200 Subject: [PATCH 05/10] WIP: fixed folding in DataSource.scala --- src/main/scala/DataSource.scala | 63 ++++++++++++++++----------------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala index a21b34a..1c1168a 100644 --- a/src/main/scala/DataSource.scala +++ b/src/main/scala/DataSource.scala @@ -47,40 +47,39 @@ class DataSource(val dsp: DataSourceParams) new TrainingData(buyEvents) } + override + def readEval(sc: SparkContext) + : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = { + require(!dsp.evalParams.isEmpty, "Must specify evalParams") + val evalParams = dsp.evalParams.get + + val kFold = evalParams.kFold + // val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId + //ratings.cache + + (0 until kFold).map { idx => { + // I guess here we have to compare predicted with actual, but i dont know exactly how. + // val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1) no ratings, so commented + // val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1) no ratings, so commented + + // I think here should be the business logic, for that: + // Quote Kenneth: instead of filter by high rating item, + // as long as the items bought together given the input item, + //you treat it as positive actual results and compare with the predicted result. + // Not sure how to imlement this. Some hints? 
+ + val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user) + + (new TrainingData(trainingRatings), + new EmptyEvaluationInfo(), + testingUsers.map { + case (user, ratings) => (Query(user, evalParams.queryNum), ActualResult(ratings.toArray)) + } + ) + }} + } } -override - def readEval(sc: SparkContext) - : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = { - require(!dsp.evalParams.isEmpty, "Must specify evalParams") - val evalParams = dsp.evalParams.get - - val kFold = evalParams.kFold - // val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId - //ratings.cache - - (0 until kFold).map { idx => { - // I guess here we have to compare predicted with actual, but i dont know exactly how. - // val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1) no ratings, so commented - // val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1) no ratings, so commented - - // I think here should be the business logic, for that: - // Quote Kenneth: instead of filter by high rating item, - // as long as the items bought together given the input item, - //you treat it as positive actual results and compare with the predicted result. - // Not sure how to imlement this. Some hints? 
- - val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user) - - (new TrainingData(trainingRatings), - new EmptyEvaluationInfo(), - testingUsers.map { - case (user, ratings) => (Query(user, evalParams.queryNum), ActualResult(ratings.toArray)) - } - ) - }} - } - case class BuyEvent(user: String, item: String, t: Long) class TrainingData( From 05969cbdaf5f2d9ad185e86f6332bc5588008d88 Mon Sep 17 00:00:00 2001 From: Alexey Grachev Date: Sat, 13 Jul 2019 11:21:55 +0200 Subject: [PATCH 06/10] WIP: removed ratingThreshold parameter from the Evaluation logic --- src/main/scala/Evaluation.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/scala/Evaluation.scala b/src/main/scala/Evaluation.scala index 039a956..b4b0bec 100644 --- a/src/main/scala/Evaluation.scala +++ b/src/main/scala/Evaluation.scala @@ -30,13 +30,13 @@ object ComplementaryPurchaseEvaluation extends Evaluation { engineEvaluator = ( RecommendationEngine(), MetricEvaluator( - metric = PrecisionAtK(k = 10, ratingThreshold = 4.0), + metric = PrecisionAtK(k = 10), otherMetrics = Seq( - PositiveCount(ratingThreshold = 4.0), - PrecisionAtK(k = 10, ratingThreshold = 2.0), - PositiveCount(ratingThreshold = 2.0), - PrecisionAtK(k = 10, ratingThreshold = 1.0), - PositiveCount(ratingThreshold = 1.0) + PositiveCount(), + PrecisionAtK(k = 10), + PositiveCount(), + PrecisionAtK(k = 10), + PositiveCount() ))) } trait BaseEngineParamsList extends EngineParamsGenerator { From 2900bdc90c4d7c3891c0c23af2089eb71157d4cf Mon Sep 17 00:00:00 2001 From: Alexey Grachev Date: Sat, 13 Jul 2019 11:52:54 +0200 Subject: [PATCH 07/10] WIP: added engineEvaluator object and namespace Evaluation imported --- engine.json | 2 +- src/main/scala/Evaluation.scala | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/engine.json b/engine.json index efc7575..3b74b8d 100644 --- a/engine.json +++ b/engine.json @@ -4,7 +4,7 @@ "engineFactory":
"org.template.complementarypurchase.ComplementaryPurchaseEngine", "datasource": { "params" : { - "appName": "INVALID_APP_NAME" + "appName": "techselector_v3" } }, "algorithms": [ diff --git a/src/main/scala/Evaluation.scala b/src/main/scala/Evaluation.scala index b4b0bec..50ef41d 100644 --- a/src/main/scala/Evaluation.scala +++ b/src/main/scala/Evaluation.scala @@ -1,5 +1,6 @@ package org.template.complementarypurchase +import org.apache.predictionio.controller.Evaluation import org.apache.predictionio.controller.EngineParamsGenerator import org.apache.predictionio.controller.EngineParams // Usage: @@ -39,6 +40,17 @@ object ComplementaryPurchaseEvaluation extends Evaluation { PositiveCount() ))) } +object ComprehensiveRecommendationEvaluation extends Evaluation { + + // val ratingThresholds = Seq(0.0, 2.0, 4.0) + val ks = Seq(1, 3, 10) + + engineEvaluator = ( + RecommendationEngine(), + MetricEvaluator( + metric = PrecisionAtK(k = 3) + )) +} trait BaseEngineParamsList extends EngineParamsGenerator { protected val baseEP = EngineParams( dataSourceParams = DataSourceParams( From 13f62ac164b66633596b633ba8067ca7298eed3c Mon Sep 17 00:00:00 2001 From: Alexey Grachev Date: Sat, 13 Jul 2019 12:04:08 +0200 Subject: [PATCH 08/10] WIP: namespace OptionAverageMetric in Evaluation.scala imported --- src/main/scala/Evaluation.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/scala/Evaluation.scala b/src/main/scala/Evaluation.scala index 50ef41d..edb712b 100644 --- a/src/main/scala/Evaluation.scala +++ b/src/main/scala/Evaluation.scala @@ -3,6 +3,9 @@ package org.template.complementarypurchase import org.apache.predictionio.controller.Evaluation import org.apache.predictionio.controller.EngineParamsGenerator import org.apache.predictionio.controller.EngineParams +import org.apache.predictionio.controller.MetricEvaluator +import org.apache.predictionio.controller.OptionAverageMetric + // Usage: // $ pio eval
org.template.complementarypurchase.ComplementaryPurchaseEvaluation \ // org.template.complementarypurchase.EngineParamsList From 3368932e61ea2ab8e56796286a38a23b4fc16bf0 Mon Sep 17 00:00:00 2001 From: Alexey Grachev Date: Sat, 13 Jul 2019 12:17:13 +0200 Subject: [PATCH 09/10] WIP: renamed RecommendationEngine to ComplementaryPurchaseEngine in Evaluation.scala --- src/main/scala/Evaluation.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/Evaluation.scala b/src/main/scala/Evaluation.scala index edb712b..3320a37 100644 --- a/src/main/scala/Evaluation.scala +++ b/src/main/scala/Evaluation.scala @@ -32,7 +32,7 @@ case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0) } object ComplementaryPurchaseEvaluation extends Evaluation { engineEvaluator = ( - RecommendationEngine(), + ComplementaryPurchaseEngine(), MetricEvaluator( metric = PrecisionAtK(k = 10), otherMetrics = Seq( @@ -49,7 +49,7 @@ object ComprehensiveRecommendationEvaluation extends Evaluation { val ks = Seq(1, 3, 10) engineEvaluator = ( - RecommendationEngine(), + ComplementaryPurchaseEngine(), MetricEvaluator( metric = PrecisionAtK(k = 3) )) From f9bf33a7b859976dcad934237aab794daf476465 Mon Sep 17 00:00:00 2001 From: Alexey Grachev Date: Sat, 13 Jul 2019 12:37:31 +0200 Subject: [PATCH 10/10] WIP: added comments in Evaluation --- src/main/scala/DataSource.scala | 4 +++- src/main/scala/Evaluation.scala | 25 ++++++++++++++----------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala index 1c1168a..725acec 100644 --- a/src/main/scala/DataSource.scala +++ b/src/main/scala/DataSource.scala @@ -58,10 +58,12 @@ class DataSource(val dsp: DataSourceParams) //ratings.cache (0 until kFold).map { idx => { - // I guess here we have to compare predicted with actual, but i dont know exactly how. 
+ // val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1) no ratings, so commented // val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1) no ratings, so commented + // I guess here we have to compare predicted with actual, but i dont know exactly how. + // I think here should be the business logic, for that: // Quote Kenneth: instead of filter by high rating item, // as long as the items bought together given the input item, diff --git a/src/main/scala/Evaluation.scala b/src/main/scala/Evaluation.scala index 3320a37..5ee7f09 100644 --- a/src/main/scala/Evaluation.scala +++ b/src/main/scala/Evaluation.scala @@ -10,7 +10,7 @@ import org.apache.predictionio.controller.OptionAverageMetric // $ pio eval org.template.complementarypurchase.ComplementaryPurchaseEvaluation \ // org.template.complementarypurchase.EngineParamsList -case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0) +case class PrecisionAtK(k: Int) extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] { require(k > 0, "k must be greater than 0") @@ -18,7 +18,11 @@ case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0) override def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = { - val positives: Set[String] = a.ratings.filter(_.rating >= ratingThreshold).map(_.item).toSet + //val positives: Set[String] = a.ratings.filter(_.rating >= ratingThreshold).map(_.item).toSet + + //Here we should filter by items to find only the ones, which were bought + //together with the input item, but how should the syntax look like? + //val positives: Set[String] a.items,filter // If there is no positive results, Precision is undefined. We don't consider this case in the // metrics, hence we return None. 
@@ -30,19 +34,16 @@ case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0) } } } + object ComplementaryPurchaseEvaluation extends Evaluation { engineEvaluator = ( ComplementaryPurchaseEngine(), MetricEvaluator( - metric = PrecisionAtK(k = 10), - otherMetrics = Seq( - PositiveCount(), - PrecisionAtK(k = 10), - PositiveCount(), - PrecisionAtK(k = 10), - PositiveCount() - ))) + metric = PrecisionAtK(k = 10) + ) + ) } + object ComprehensiveRecommendationEvaluation extends Evaluation { // val ratingThresholds = Seq(0.0, 2.0, 4.0) @@ -52,8 +53,10 @@ object ComprehensiveRecommendationEvaluation extends Evaluation { ComplementaryPurchaseEngine(), MetricEvaluator( metric = PrecisionAtK(k = 3) - )) + ) + ) } + trait BaseEngineParamsList extends EngineParamsGenerator { protected val baseEP = EngineParams( dataSourceParams = DataSourceParams(