Skip to content
2 changes: 1 addition & 1 deletion engine.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"engineFactory": "org.template.complementarypurchase.ComplementaryPurchaseEngine",
"datasource": {
"params" : {
"appName": "INVALID_APP_NAME"
"appName": "techselector_v3"
}
},
"algorithms": [
Expand Down
38 changes: 36 additions & 2 deletions src/main/scala/DataSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import org.apache.spark.rdd.RDD

import grizzled.slf4j.Logger

/** Settings that control k-fold evaluation.
  *
  * @param kFold    number of folds the events are split into by `readEval`
  * @param queryNum value passed as `num` of every evaluation [[Query]]
  */
case class DataSourceEvalParams(kFold: Int, queryNum: Int)

/** Parameters of the data source.
  *
  * @param appName    name of the app whose events are read
  * @param evalParams evaluation settings; must be defined when `readEval` is
  *                   called. Defaults to `None` so that code written against
  *                   the former one-argument constructor keeps compiling.
  */
case class DataSourceParams(
  appName: String,
  evalParams: Option[DataSourceEvalParams] = None
) extends Params

class DataSource(val dsp: DataSourceParams)
extends PDataSource[TrainingData,
EmptyEvaluationInfo, Query, EmptyActualResult] {
EmptyEvaluationInfo, Query, ActualResult] {

@transient lazy val logger = Logger[this.type]

Expand Down Expand Up @@ -46,6 +47,39 @@ class DataSource(val dsp: DataSourceParams)

new TrainingData(buyEvents)
}
/** Builds the k-fold evaluation sets from the buy events.
  *
  * Every buy event gets a unique id; fold `idx` uses the events whose id
  * satisfies `id % kFold == idx` for testing and all remaining events for
  * training. Within the test events, the items bought by the same user are
  * treated as "bought together": each such item becomes a one-item query and
  * the user's other test items are the expected (actual) result.
  *
  * @param sc the Spark context used to read the events
  * @return one (training data, evaluation info, query/actual pairs) triple per fold
  * @throws IllegalArgumentException if `evalParams` is missing or `kFold` is not positive
  */
override
def readEval(sc: SparkContext)
  : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
  require(dsp.evalParams.nonEmpty, "Must specify evalParams")
  val evalParams = dsp.evalParams.get // safe: guarded by the require above

  val kFold = evalParams.kFold
  require(kFold > 0, "kFold must be greater than 0")

  // NOTE(review): assumes TrainingData exposes the events it was built from
  // as `buyEvents` (readTraining ends with `new TrainingData(buyEvents)`) — confirm.
  val indexedEvents: RDD[(BuyEvent, Long)] = readTraining(sc).buyEvents.zipWithUniqueId
  indexedEvents.cache

  (0 until kFold).map { idx =>
    val trainingEvents = indexedEvents.filter(_._2 % kFold != idx).map(_._1)
    val testingEvents = indexedEvents.filter(_._2 % kFold == idx).map(_._1)

    // NOTE(review): groups by user only; if the algorithm defines baskets by a
    // time window, the events should additionally be split by `t` — confirm.
    val queriesWithActuals: RDD[(Query, ActualResult)] = testingEvents
      .groupBy(_.user)
      .flatMap { case (_, events) =>
        val bought: Set[String] = events.map(_.item).toSet
        // A user with a single test item has no complementary purchase to predict.
        if (bought.size < 2) {
          Seq.empty
        } else {
          bought.toSeq.map { item =>
            val complements = (bought - item).toArray.map(Item(_))
            (Query(Set(item), evalParams.queryNum), ActualResult(complements))
          }
        }
      }

    (new TrainingData(trainingEvents), new EmptyEvaluationInfo(), queriesWithActuals)
  }
}
}

case class BuyEvent(user: String, item: String, t: Long)
Expand Down
9 changes: 8 additions & 1 deletion src/main/scala/Engine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package org.template.complementarypurchase
import org.apache.predictionio.controller.EngineFactory
import org.apache.predictionio.controller.Engine

/** Query sent to the engine.
  *
  * @param items identifiers of the items the query is issued for
  * @param num   requested number of results (presumably an upper bound —
  *              confirm against the algorithm)
  */
case class Query(items: Set[String], num: Int)
  extends Serializable

/** Engine response: the rules returned for a query.
  *
  * @param rules the returned rules
  */
case class PredictedResult(
  rules: Array[Rule]
) extends Serializable

/** Ground truth used during evaluation.
  *
  * @param items the items that count as correct answers for a query
  */
case class ActualResult(
  items: Array[Item]
) extends Serializable
//case class ItemScore(item: String, score: Double) extends Serializable
/** A rule pairing a condition item set with scored items.
  *
  * @param cond       the item set forming the condition of the rule
  * @param itemScores the scored items attached to the condition
  */
case class Rule(
  cond: Set[String],
  itemScores: Array[ItemScore]
) extends Serializable
Expand All @@ -17,6 +20,10 @@ case class ItemScore(
item: String, support: Double, confidence: Double, lift: Double
) extends Serializable

/** Wrapper around a single item identifier.
  *
  * @param item the item identifier
  */
case class Item(item: String)

object ComplementaryPurchaseEngine extends EngineFactory {
def apply() = {
new Engine(
Expand Down
65 changes: 65 additions & 0 deletions src/main/scala/Evaluation.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.template.complementarypurchase

import org.apache.predictionio.controller.EmptyEvaluationInfo
import org.apache.predictionio.controller.EngineParams
import org.apache.predictionio.controller.EngineParamsGenerator
import org.apache.predictionio.controller.Evaluation
import org.apache.predictionio.controller.MetricEvaluator
import org.apache.predictionio.controller.OptionAverageMetric

// Usage:
// $ pio eval org.template.complementarypurchase.ComplementaryPurchaseEvaluation \
// org.template.complementarypurchase.EngineParamsList

/** Precision@K for complementary-purchase predictions.
  *
  * The positives of a query are all items listed in its [[ActualResult]];
  * the predictions are the items of all returned rules, in the order the
  * engine returned them.
  *
  * @param k number of top predictions taken into account; must be positive
  */
case class PrecisionAtK(k: Int)
  extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
  require(k > 0, "k must be greater than 0")

  override def header = s"Precision@K (k=$k)"

  /** Scores one query.
    *
    * @param q the query that was sent to the engine
    * @param p the engine's prediction for `q`
    * @param a the items that count as correct for `q`
    * @return `None` when there are no positive items (precision is undefined
    *         and the query is left out of the average), otherwise the number
    *         of hits among the top `k` predicted items divided by
    *         `min(k, number of positives)`
    */
  override
  def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
    val positives: Set[String] = a.items.map(_.item).toSet

    if (positives.isEmpty) {
      None
    } else {
      // Flatten the rules in the order returned; an item suggested by several
      // rules is counted once, at its first position.
      val topPredicted: Array[String] =
        p.rules.flatMap(_.itemScores).map(_.item).distinct.take(k)
      val tpCount: Int = topPredicted.count(positives.contains)
      Some(tpCount.toDouble / math.min(k, positives.size))
    }
  }
}

/** Default evaluation: scores the engine with a single Precision@K metric (k = 10). */
object ComplementaryPurchaseEvaluation extends Evaluation {
  engineEvaluator = (ComplementaryPurchaseEngine(), MetricEvaluator(metric = PrecisionAtK(k = 10)))
}

/** Evaluation that reports Precision@K for several cut-offs.
  *
  * Precision@3 is the primary metric (the one used to rank engine params);
  * Precision@K for every value in `ks` is reported alongside it.
  */
object ComprehensiveRecommendationEvaluation extends Evaluation {

  /** Cut-offs for which Precision@K is additionally reported. */
  val ks = Seq(1, 3, 10)

  engineEvaluator = (
    ComplementaryPurchaseEngine(),
    MetricEvaluator(
      metric = PrecisionAtK(k = 3),
      otherMetrics = ks.map(k => PrecisionAtK(k = k))
    )
  )
}

/** Shared base for engine-params generators.
  *
  * Provides `baseEP`, engine params whose data source reads the
  * "techselector_v3" app and evaluates with 5 folds and a query size of 10.
  */
trait BaseEngineParamsList extends EngineParamsGenerator {
  protected val baseEP = {
    val evaluationSettings = DataSourceEvalParams(kFold = 5, queryNum = 10)
    val dataSource = DataSourceParams(
      appName = "techselector_v3",
      evalParams = Some(evaluationSettings))
    EngineParams(dataSourceParams = dataSource)
  }
}