Skip to content
Snippets Groups Projects
Commit bcdd259c authored by Yanbo Liang's avatar Yanbo Liang
Browse files

[SPARK-15509][FOLLOW-UP][ML][SPARKR] R MLlib algorithms should support input...

[SPARK-15509][FOLLOW-UP][ML][SPARKR] R MLlib algorithms should support input columns "features" and "label"

## What changes were proposed in this pull request?
#13584 resolved the issue of features and label columns conflict with ```RFormula``` default ones when loading libsvm data, but it still left some issues should be resolved:
1, It’s not necessary to check and rename label column.
Since we have considerations on the design of ```RFormula```, it can handle the case of label column already exists(with restriction of the existing label column should be numeric/boolean type). So it’s not necessary to change the column name to avoid conflict. If the label column is not numeric/boolean type, ```RFormula``` will throw exception.

2, We should rename features column name to new one if there is conflict, but appending a random value is enough since it was used internally only. We done similar work when implementing ```SQLTransformer```.

3, We should set correct new features column for the estimators. Take ```GLM``` as example:
```GLM``` estimator should set features column with the changed one(rFormula.getFeaturesCol) rather than the default “features”. Although it’s same when training model, but it involves problems when predicting. The following is the prediction result of GLM before this PR:
![image](https://cloud.githubusercontent.com/assets/1962026/18308227/84c3c452-74a8-11e6-9caa-9d6d846cc957.png)
We should drop the internal used feature column name, otherwise, it will appear on the prediction DataFrame which will confused users. And this behavior is same as other scenarios which does not exist column name conflict.
After this PR:
![image](https://cloud.githubusercontent.com/assets/1962026/18308240/92082a04-74a8-11e6-9226-801f52b856d9.png)

## How was this patch tested?
Existing unit tests.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #14993 from yanboliang/spark-15509.
parent 1fec3ce4
No related branches found
No related tags found
No related merge requests found
......@@ -99,6 +99,7 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg
val aft = new AFTSurvivalRegression()
.setCensorCol(censorCol)
.setFitIntercept(rFormula.hasIntercept)
.setFeaturesCol(rFormula.getFeaturesCol)
val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, aft))
......
......@@ -85,6 +85,7 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapp
.setK(k)
.setMaxIter(maxIter)
.setTol(tol)
.setFeaturesCol(rFormula.getFeaturesCol)
val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, gm))
......
......@@ -89,6 +89,7 @@ private[r] object GeneralizedLinearRegressionWrapper
.setMaxIter(maxIter)
.setWeightCol(weightCol)
.setRegParam(regParam)
.setFeaturesCol(rFormula.getFeaturesCol)
val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, glr))
.fit(data)
......
......@@ -75,6 +75,7 @@ private[r] object IsotonicRegressionWrapper
.setIsotonic(isotonic)
.setFeatureIndex(featureIndex)
.setWeightCol(weightCol)
.setFeaturesCol(rFormula.getFeaturesCol)
val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, isotonicRegression))
......
......@@ -86,6 +86,7 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] {
.setK(k)
.setMaxIter(maxIter)
.setInitMode(initMode)
.setFeaturesCol(rFormula.getFeaturesCol)
val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, kMeans))
......
......@@ -73,6 +73,7 @@ private[r] object NaiveBayesWrapper extends MLReadable[NaiveBayesWrapper] {
val naiveBayes = new NaiveBayes()
.setSmoothing(smoothing)
.setModelType("bernoulli")
.setFeaturesCol(rFormula.getFeaturesCol)
.setPredictionCol(PREDICTED_LABEL_INDEX_COL)
val idxToStr = new IndexToString()
.setInputCol(PREDICTED_LABEL_INDEX_COL)
......
......@@ -19,14 +19,15 @@ package org.apache.spark.ml.r
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.Dataset
object RWrapperUtils extends Logging {
/**
* DataFrame column check.
* When loading data, default columns "features" and "label" will be added. And these two names
* would conflict with RFormula default feature and label column names.
* When loading libsvm data, default columns "features" and "label" will be added.
* And "features" would conflict with RFormula default feature column names.
* Here is to change the column name to avoid "column already exists" error.
*
* @param rFormula RFormula instance
......@@ -34,38 +35,11 @@ object RWrapperUtils extends Logging {
* @return Unit
*/
def checkDataColumns(rFormula: RFormula, data: Dataset[_]): Unit = {
if (data.schema.fieldNames.contains(rFormula.getLabelCol)) {
val newLabelName = convertToUniqueName(rFormula.getLabelCol, data.schema.fieldNames)
logWarning(
s"data containing ${rFormula.getLabelCol} column, using new name $newLabelName instead")
rFormula.setLabelCol(newLabelName)
}
if (data.schema.fieldNames.contains(rFormula.getFeaturesCol)) {
val newFeaturesName = convertToUniqueName(rFormula.getFeaturesCol, data.schema.fieldNames)
val newFeaturesName = s"${Identifiable.randomUID(rFormula.getFeaturesCol)}"
logWarning(s"data containing ${rFormula.getFeaturesCol} column, " +
s"using new name $newFeaturesName instead")
rFormula.setFeaturesCol(newFeaturesName)
}
}
/**
* Convert conflicting name to be an unique name.
* Appending a sequence number, like originalName_output1
* and incrementing until it is not already there
*
* @param originalName Original name
* @param fieldNames Array of field names in existing schema
* @return String
*/
def convertToUniqueName(originalName: String, fieldNames: Array[String]): String = {
var counter = 1
var newName = originalName + "_output"
while (fieldNames.contains(newName)) {
newName = originalName + "_output" + counter
counter += 1
}
newName
}
}
......@@ -35,22 +35,14 @@ class RWrapperUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
// after checking, model build is ok
RWrapperUtils.checkDataColumns(rFormula, data)
assert(rFormula.getLabelCol == "label_output")
assert(rFormula.getFeaturesCol == "features_output")
assert(rFormula.getLabelCol == "label")
assert(rFormula.getFeaturesCol.startsWith("features_"))
val model = rFormula.fit(data)
assert(model.isInstanceOf[RFormulaModel])
assert(model.getLabelCol == "label_output")
assert(model.getFeaturesCol == "features_output")
}
test("generate unique name by appending a sequence number") {
val originalName = "label"
val fieldNames = Array("label_output", "label_output1", "label_output2")
val newName = RWrapperUtils.convertToUniqueName(originalName, fieldNames)
assert(newName === "label_output3")
assert(model.getLabelCol == "label")
assert(model.getFeaturesCol.startsWith("features_"))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment