From 22718c55c519b5297ca5310d52bcba5f2e7e23f1 Mon Sep 17 00:00:00 2001 From: Ranadeep Singh Date: Fri, 27 Feb 2026 07:04:12 -0800 Subject: [PATCH 1/6] Ensure non duplicate column names --- .../synapse/ml/lightgbm/LightGBMBase.scala | 35 +++++++- .../split1/VerifyLightGBMCommon.scala | 88 +++++++++++++++++++ 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala index 79a17931c7c..3aa6afe1a94 100644 --- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala +++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala @@ -208,12 +208,45 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams] val colNames = attributes.indices.map(_.toString).toArray attributes.foreach(attr => attr.index.foreach(index => colNames(index) = attr.name.getOrElse(index.toString))) - Some(colNames) + // Ensure unique feature names to avoid LightGBM error: + // "Feature (Column_) appears more than one time" + // This can occur in Spark 3.5+ due to changes in AttributeGroup metadata handling + Some(ensureUniqueFeatureNames(colNames)) } ) } } + /** Ensures all feature names are unique by appending indices to duplicates. + * This is necessary because LightGBM requires unique feature names, and in Spark 3.5+ + * the AttributeGroup metadata handling may produce duplicate names (e.g., "Column_0", "Column_0"). + * + * @param names The array of feature names that may contain duplicates. + * @return An array of unique feature names. + */ + private def ensureUniqueFeatureNames(names: Array[String]): Array[String] = { + val nameCounts = scala.collection.mutable.Map[String, Int]() + val uniqueNames = names.map { name => + val count = nameCounts.getOrElse(name, 0) + nameCounts(name) = count + 1 + if (count > 0) { + s"${name}_$count" + } else { + name + } + } + + // Check if any names were duplicated and log a warning + val duplicates = nameCounts.filter(_._2 > 1).keys.toSeq + if (duplicates.nonEmpty) { + logWarning(s"Duplicate feature names detected and renamed: ${duplicates.mkString(", ")}. " + + "This may occur in Spark 3.5+ due to changes in metadata handling. " + + "Consider setting the 'slotNames' parameter explicitly to avoid this.") + } + + uniqueNames + } + private def validateSlotNames(featuresSchema: StructField): Unit = { val metadata = AttributeGroup.fromStructField(featuresSchema) if (metadata.attributes.isDefined) { diff --git a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split1/VerifyLightGBMCommon.scala b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split1/VerifyLightGBMCommon.scala index c4169cd5bd6..437bca9c6b0 100644 --- a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split1/VerifyLightGBMCommon.scala +++ b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split1/VerifyLightGBMCommon.scala @@ -294,4 +294,92 @@ class VerifyLightGBMCommon extends TestBase with LightGBMTestUtils { (conv(up.last) + conv(down.head)) / fromInt(2) } } + + test("Verify duplicate feature names are handled correctly (Spark 3.5+ fix)") { + // This test verifies the fix for the error: + // "Dataset create from samples call failed in LightGBM with error: Feature (Column_) appears more than one time" + // This issue occurs in Spark 3.5+ due to changes in AttributeGroup metadata handling. + + import org.apache.spark.ml.attribute.{AttributeGroup, NumericAttribute} + import org.apache.spark.ml.linalg.Vectors + import org.apache.spark.sql.types.{DoubleType, StructField, StructType} + + // Create a DataFrame with a features column that has duplicate attribute names + // This simulates what can happen in Spark 3.5+ when VectorAssembler creates metadata + val data = Seq( + (0.0, Vectors.dense(1.0, 2.0, 3.0, 4.0)), + (1.0, Vectors.dense(2.0, 3.0, 4.0, 5.0)), + (0.0, Vectors.dense(3.0, 4.0, 5.0, 6.0)), + (1.0, Vectors.dense(4.0, 5.0, 6.0, 7.0)) + ) + + val df = spark.createDataFrame(data).toDF(labelCol, featuresCol) + + // Create attributes with duplicate names (simulating Spark 3.5+ behavior) + val attrs = Array( + NumericAttribute.defaultAttr.withName("Column_").withIndex(0), + NumericAttribute.defaultAttr.withName("Column_").withIndex(1), // Duplicate name + NumericAttribute.defaultAttr.withName("Column_").withIndex(2), // Duplicate name + NumericAttribute.defaultAttr.withName("unique_col").withIndex(3) + ) + val attrGroup = new AttributeGroup(featuresCol, attrs.map(_.asInstanceOf[org.apache.spark.ml.attribute.Attribute])) + + // Apply the metadata with duplicate names to the features column + val dfWithDuplicateNames = df.withColumn( + featuresCol, + df(featuresCol).as(featuresCol, attrGroup.toMetadata()) + ) + + // Create and fit the model - this should NOT throw the error anymore + val model = new LightGBMClassifier() + .setFeaturesCol(featuresCol) + .setLabelCol(labelCol) + .setDefaultListenPort(getAndIncrementPort()) + .setNumLeaves(5) + .setNumIterations(5) + .setObjective("binary") + .setExecutionMode("streaming") + + // This should succeed without throwing "Feature (Column_) appears more than one time" + val trainedModel = model.fit(dfWithDuplicateNames) + val predictions = trainedModel.transform(dfWithDuplicateNames) + + // Verify predictions were generated + assert(predictions.count() == 4) + assert(predictions.select("prediction").collect().length == 4) + } + + test("Verify explicit slotNames parameter overrides metadata") { + // This test verifies that explicitly setting slotNames takes precedence + // over metadata-derived names + + import org.apache.spark.ml.linalg.Vectors + + val data = Seq( + (0.0, Vectors.dense(1.0, 2.0, 3.0)), + (1.0, Vectors.dense(2.0, 3.0, 4.0)), + (0.0, Vectors.dense(3.0, 4.0, 5.0)), + (1.0, Vectors.dense(4.0, 5.0, 6.0)) + ) + + val df = spark.createDataFrame(data).toDF(labelCol, featuresCol) + + // Create model with explicit slotNames + val explicitNames = Array("feature_a", "feature_b", "feature_c") + val model = new LightGBMClassifier() + .setFeaturesCol(featuresCol) + .setLabelCol(labelCol) + .setDefaultListenPort(getAndIncrementPort()) + .setNumLeaves(5) + .setNumIterations(5) + .setObjective("binary") + .setSlotNames(explicitNames) + .setExecutionMode("streaming") + + // Fit and verify + val trainedModel = model.fit(df) + val predictions = trainedModel.transform(df) + + assert(predictions.count() == 4) + } } From 959b1bfa1689c2f4f2d2e1c9b03aa5d4bd003686 Mon Sep 17 00:00:00 2001 From: Ranadeep Singh Date: Fri, 27 Feb 2026 09:33:33 -0800 Subject: [PATCH 2/6] LightGBM ensureUniqueFeatureNames --- .../synapse/ml/lightgbm/LightGBMBase.scala | 32 ++++++++++++++----- .../dataset/ReferenceDatasetUtils.scala | 15 +++++++-- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala index 3aa6afe1a94..c4fc496932b 100644 --- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala +++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala @@ -217,29 +217,39 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams] } } - /** Ensures all feature names are unique by appending indices to duplicates. - * This is necessary because LightGBM requires unique feature names, and in Spark 3.5+ - * the AttributeGroup metadata handling may produce duplicate names (e.g., "Column_0", "Column_0"). + /** + * Ensures all feature names are unique by appending indices to duplicates. + * This is necessary because Spark 3.5+ can generate duplicate feature names + * in AttributeGroup metadata, which causes LightGBM to fail with: + * "Feature (Column_) appears more than one time" * * @param names The array of feature names that may contain duplicates. - * @return An array of unique feature names. + * @return An array with unique feature names. */ private def ensureUniqueFeatureNames(names: Array[String]): Array[String] = { val nameCounts = scala.collection.mutable.Map[String, Int]() + val seenNames = scala.collection.mutable.Set[String]() val uniqueNames = names.map { name => val count = nameCounts.getOrElse(name, 0) nameCounts(name) = count + 1 if (count > 0) { - s"${name}_$count" + // Find a unique suffix + var newName = s"${name}_$count" + while (seenNames.contains(newName)) { + nameCounts(name) = nameCounts(name) + 1 + newName = s"${name}_${nameCounts(name)}" + } + seenNames.add(newName) + newName } else { + seenNames.add(name) name } } - // Check if any names were duplicated and log a warning val duplicates = nameCounts.filter(_._2 > 1).keys.toSeq if (duplicates.nonEmpty) { - logWarning(s"Duplicate feature names detected and renamed: ${duplicates.mkString(", ")}. " + + log.warn(s"Duplicate feature names detected and renamed: ${duplicates.mkString(", ")}. " + "This may occur in Spark 3.5+ due to changes in metadata handling. " + "Consider setting the 'slotNames' parameter explicitly to avoid this.") } @@ -455,7 +465,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams] val (serializedReferenceDataset: Option[Array[Byte]], partitionCounts: Option[Array[Long]]) = if (isStreamingMode) { val (referenceDataset, partitionCounts) = - calculateRowStatistics(trainingData, trainParams, numCols, measures) + calculateRowStatistics(trainingData, trainParams, numCols, featuresSchema, measures) // Save the reference Dataset so it's available to client and other batches if (getReferenceDataset.isEmpty) { @@ -536,12 +546,14 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams] * @param dataframe The dataset to train on. * @param trainingParams The training parameters. * @param numCols The number of feature columns. + * @param featuresSchema The schema of the features column. * @param measures Instrumentation measures. * @return The serialized Dataset reference and an array of partition counts. */ private def calculateRowStatistics(dataframe: DataFrame, trainingParams: BaseTrainParams, numCols: Int, + featuresSchema: StructField, measures: InstrumentationMeasures): (Array[Byte], Array[Long]) = { measures.markRowStatisticsStart() @@ -556,6 +568,9 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams] trainingParams.generalParams.categoricalFeatures, trainingParams.executionParams.numThreads) + // Get feature names to set on the reference dataset (ensures unique names for Spark 3.5+) + val featureNames = getSlotNamesWithMetadata(featuresSchema) + // Either get a reference dataset (as bytes) from params, or calculate it val precalculatedDataset = getReferenceDataset val serializedReference = if (precalculatedDataset.nonEmpty) { @@ -574,6 +589,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams] totalNumRows, numCols, collectedSampleData, + featureNames, measures, log) } diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/dataset/ReferenceDatasetUtils.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/dataset/ReferenceDatasetUtils.scala index 63743835f93..c0cb2483e62 100644 --- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/dataset/ReferenceDatasetUtils.scala +++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/dataset/ReferenceDatasetUtils.scala @@ -16,6 +16,7 @@ object ReferenceDatasetUtils { numRows: Long, numCols: Int, sampledRowData: Array[Row], + featureNames: Option[Array[String]], measures: InstrumentationMeasures, log: Logger): Array[Byte] = { log.info(s"Creating reference training dataset with ${sampledRowData.length} samples and config: $datasetParams") @@ -46,9 +47,19 @@ object ReferenceDatasetUtils { datasetParams, datasetVoidPtr), "Dataset create from samples") - - // 2. Serialize the raw dataset to a native buffer val datasetHandle: SWIGTYPE_p_void = lightgbmlib.voidpp_value(datasetVoidPtr) + + // 2. Set feature names BEFORE serialization to avoid duplicate name errors in Spark 3.5+ + // This must happen after dataset creation but before serialization + featureNames.foreach { names => + if (names.nonEmpty) { + log.info(s"Setting ${names.length} feature names on reference dataset") + LightGBMUtils.validate(lightgbmlib.LGBM_DatasetSetFeatureNames(datasetHandle, names, numCols), + "Dataset set feature names") + } + } + + // 3. Serialize the raw dataset to a native buffer LightGBMUtils.validate(lightgbmlib.LGBM_DatasetSerializeReferenceToBinary( datasetHandle, bufferHandlePtr, From b60d04b1cb73a4dddd52b9d37d58b21f6625008c Mon Sep 17 00:00:00 2001 From: Ranadeep Singh Date: Fri, 27 Feb 2026 09:48:16 -0800 Subject: [PATCH 3/6] Fix scala style issues --- .../synapse/ml/lightgbm/LightGBMBase.scala | 11 +-- .../dataset/ReferenceDatasetUtils.scala | 96 ++++++++++--------- 2 files changed, 55 insertions(+), 52 deletions(-) diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala index c4fc496932b..5a9636ae99a 100644 --- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala +++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala @@ -233,12 +233,11 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams] val count = nameCounts.getOrElse(name, 0) nameCounts(name) = count + 1 if (count > 0) { - // Find a unique suffix - var newName = s"${name}_$count" - while (seenNames.contains(newName)) { - nameCounts(name) = nameCounts(name) + 1 - newName = s"${name}_${nameCounts(name)}" - } + // Find a unique suffix using Stream to avoid while loop + val newName = Iterator.from(count) + .map(i => s"${name}_$i") + .find(n => !seenNames.contains(n)) + .get // Safe because Iterator.from is infinite seenNames.add(newName) newName } else { diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/dataset/ReferenceDatasetUtils.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/dataset/ReferenceDatasetUtils.scala index c0cb2483e62..d3df64a7d10 100644 --- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/dataset/ReferenceDatasetUtils.scala +++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/dataset/ReferenceDatasetUtils.scala @@ -10,6 +10,7 @@ import org.apache.spark.sql._ import org.slf4j.Logger +// scalastyle:off method.length object ReferenceDatasetUtils { def createReferenceDatasetFromSample(datasetParams: String, featuresCol: String, @@ -21,65 +22,68 @@ object ReferenceDatasetUtils { log: Logger): Array[Byte] = { log.info(s"Creating reference training dataset with ${sampledRowData.length} samples and config: $datasetParams") - // Pre-create allocated native pointers so it's easy to clean them up - val datasetVoidPtr = lightgbmlib.voidpp_handle() val lenPtr = lightgbmlib.new_intp() val bufferHandlePtr = lightgbmlib.voidpp_handle() - val sampledData = SampledData(sampledRowData.length, numCols) + try { - // create properly formatted sampled data measures.markSamplingStart() sampledRowData.zipWithIndex.foreach({case (row, index) => sampledData.pushRow(row, index, featuresCol)}) measures.markSamplingStop() - // Create dataset from samples - // 1. Generate the dataset for features - val datasetVoidPtr = lightgbmlib.voidpp_handle() - LightGBMUtils.validate(lightgbmlib.LGBM_DatasetCreateFromSampledColumn( - sampledData.getSampleData, - sampledData.getSampleIndices, - numCols, - sampledData.getRowCounts, - sampledData.numRows, - 1, // Used for allocation and must be > 0, but we don't use this reference set for data collection - numRows, - datasetParams, - datasetVoidPtr), "Dataset create from samples") - - val datasetHandle: SWIGTYPE_p_void = lightgbmlib.voidpp_value(datasetVoidPtr) - - // 2. Set feature names BEFORE serialization to avoid duplicate name errors in Spark 3.5+ - // This must happen after dataset creation but before serialization - featureNames.foreach { names => - if (names.nonEmpty) { - log.info(s"Setting ${names.length} feature names on reference dataset") - LightGBMUtils.validate(lightgbmlib.LGBM_DatasetSetFeatureNames(datasetHandle, names, numCols), - "Dataset set feature names") - } - } - - // 3. Serialize the raw dataset to a native buffer - LightGBMUtils.validate(lightgbmlib.LGBM_DatasetSerializeReferenceToBinary( - datasetHandle, - bufferHandlePtr, - lenPtr), "Serialize ref") - val bufferLen: Int = lightgbmlib.intp_value(lenPtr) - log.info(s"Created serialized reference dataset of length $bufferLen") - - // The dataset is now serialized to a buffer, so we don't need original - LightGBMUtils.validate(lightgbmlib.LGBM_DatasetFree(datasetHandle), "Free Dataset") - - // This will also free the buffer - toByteArray(bufferHandlePtr, bufferLen) - } - finally { + val datasetHandle = createDatasetFromSamples(sampledData, numCols, numRows, datasetParams) + setFeatureNamesIfProvided(datasetHandle, featureNames, numCols, log) + serializeAndCleanup(datasetHandle, bufferHandlePtr, lenPtr, log) + } finally { sampledData.delete() - lightgbmlib.delete_voidpp(datasetVoidPtr) lightgbmlib.delete_voidpp(bufferHandlePtr) lightgbmlib.delete_intp(lenPtr) } } + // scalastyle:on method.length + + private def createDatasetFromSamples(sampledData: SampledData, + numCols: Int, + numRows: Long, + datasetParams: String): SWIGTYPE_p_void = { + val datasetVoidPtr = lightgbmlib.voidpp_handle() + LightGBMUtils.validate(lightgbmlib.LGBM_DatasetCreateFromSampledColumn( + sampledData.getSampleData, + sampledData.getSampleIndices, + numCols, + sampledData.getRowCounts, + sampledData.numRows, + 1, // Used for allocation and must be > 0, but we don't use this reference set for data collection + numRows, + datasetParams, + datasetVoidPtr), "Dataset create from samples") + lightgbmlib.voidpp_value(datasetVoidPtr) + } + + private def setFeatureNamesIfProvided(datasetHandle: SWIGTYPE_p_void, + featureNames: Option[Array[String]], + numCols: Int, + log: Logger): Unit = { + featureNames.foreach { names => + if (names.nonEmpty) { + log.info(s"Setting ${names.length} feature names on reference dataset") + LightGBMUtils.validate(lightgbmlib.LGBM_DatasetSetFeatureNames(datasetHandle, names, numCols), + "Dataset set feature names") + } + } + } + + private def serializeAndCleanup(datasetHandle: SWIGTYPE_p_void, + bufferHandlePtr: SWIGTYPE_p_p_void, + lenPtr: SWIGTYPE_p_int, + log: Logger): Array[Byte] = { + LightGBMUtils.validate(lightgbmlib.LGBM_DatasetSerializeReferenceToBinary( + datasetHandle, bufferHandlePtr, lenPtr), "Serialize ref") + val bufferLen: Int = lightgbmlib.intp_value(lenPtr) + log.info(s"Created serialized reference dataset of length $bufferLen") + LightGBMUtils.validate(lightgbmlib.LGBM_DatasetFree(datasetHandle), "Free Dataset") + toByteArray(bufferHandlePtr, bufferLen) + } def getInitializedReferenceDataset(ctx: PartitionTaskContext): LightGBMDataset = { // The definition is broadcast from Spark, so retrieve it From 59bfcb71dac4a2915d71ede6533c25a8058545e2 Mon Sep 17 00:00:00 2001 From: Ranadeep Singh Date: Fri, 27 Feb 2026 11:58:56 -0800 Subject: [PATCH 4/6] More LightGBM fixes --- .../synapse/ml/lightgbm/LightGBMBase.scala | 117 +++++++++++------- .../split1/VerifyLightGBMCommon.scala | 29 +++++ .../split2/LightGBMRankerTestData.scala | 1 + .../split2/LightGBMRegressorTestData.scala | 1 + 4 files changed, 106 insertions(+), 42 deletions(-) diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala index 5a9636ae99a..66a5b6e1a29 100644 --- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala +++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala @@ -18,6 +18,7 @@ import org.apache.spark.ml.{ComplexParamsWritable, Estimator, Model} import org.apache.spark.sql._ import org.apache.spark.sql.types._ +import scala.annotation.tailrec import scala.collection.immutable.HashSet import scala.language.existentials import scala.math.min @@ -199,7 +200,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams] private def getSlotNamesWithMetadata(featuresSchema: StructField): Option[Array[String]] = { if (getSlotNames.nonEmpty) { - Some(getSlotNames) + Some(ensureUniqueFeatureNames(getSlotNames)) } else { AttributeGroup.fromStructField(featuresSchema).attributes.flatMap(attributes => if (attributes.isEmpty) { @@ -274,6 +275,25 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams] } } + private def shouldRetryWithBulk(error: Throwable): Boolean = { + @tailrec + def hasRetryableMessage(current: Throwable): Boolean = { + if (current == null) { + false + } else { + val message = Option(current.getMessage).getOrElse("").toLowerCase + val duplicateFeatureNames = message.contains("appears more than one time") + val datasetCreateFailure = message.contains("dataset create") + if (duplicateFeatureNames && datasetCreateFailure) { + true + } else { + hasRetryableMessage(current.getCause) + } + } + } + hasRetryableMessage(error) + } + /** * Constructs the DartModeParams * @@ -436,56 +456,69 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams] * @return The LightGBM Model from the trained LightGBM Booster. */ private def trainOneDataBatch(dataset: Dataset[_], batchIndex: Int, batchCount: Int): TrainedModel = { - val measures = new InstrumentationMeasures() - setBatchPerformanceMeasure(batchIndex, measures) + try { + val measures = new InstrumentationMeasures() + setBatchPerformanceMeasure(batchIndex, measures) - val numTasksPerExecutor = ClusterUtil.getNumTasksPerExecutor(dataset.sparkSession, log) - val numTasks = determineNumTasks(dataset, getNumTasks, numTasksPerExecutor) - val sc = dataset.sparkSession.sparkContext + val numTasksPerExecutor = ClusterUtil.getNumTasksPerExecutor(dataset.sparkSession, log) + val numTasks = determineNumTasks(dataset, getNumTasks, numTasksPerExecutor) + val sc = dataset.sparkSession.sparkContext - val df = prepareDataframe(dataset, numTasks) + val df = prepareDataframe(dataset, numTasks) - val (trainingData, validationData) = - if (get(validationIndicatorCol).isDefined && dataset.columns.contains(getValidationIndicatorCol)) - (df.filter(x => !x.getBoolean(x.fieldIndex(getValidationIndicatorCol))), - Some(sc.broadcast(collectValidationData(df, measures)))) - else (df, None) + val (trainingData, validationData) = + if (get(validationIndicatorCol).isDefined && dataset.columns.contains(getValidationIndicatorCol)) + (df.filter(x => !x.getBoolean(x.fieldIndex(getValidationIndicatorCol))), + Some(sc.broadcast(collectValidationData(df, measures)))) + else (df, None) - val preprocessedDF = preprocessData(trainingData) + val preprocessedDF = preprocessData(trainingData) - val (numCols, numInitScoreClasses) = calculateColumnStatistics(preprocessedDF, measures) + val (numCols, numInitScoreClasses) = calculateColumnStatistics(preprocessedDF, measures) - val featuresSchema = dataset.schema(getFeaturesCol) - val generalTrainParams: BaseTrainParams = getTrainParams(numTasks, featuresSchema, numTasksPerExecutor) - val trainParams = addCustomTrainParams(generalTrainParams, dataset) - log.info(s"LightGBM batch $batchIndex of $batchCount, parameters: ${trainParams.toString()}") + val featuresSchema = dataset.schema(getFeaturesCol) + val generalTrainParams: BaseTrainParams = getTrainParams(numTasks, featuresSchema, numTasksPerExecutor) + val trainParams = addCustomTrainParams(generalTrainParams, dataset) + log.info(s"LightGBM batch $batchIndex of $batchCount, parameters: ${trainParams.toString()}") - val isStreamingMode = getDataTransferMode == LightGBMConstants.StreamingDataTransferMode - val (serializedReferenceDataset: Option[Array[Byte]], partitionCounts: Option[Array[Long]]) = - if (isStreamingMode) { - val (referenceDataset, partitionCounts) = - calculateRowStatistics(trainingData, trainParams, numCols, featuresSchema, measures) + val isStreamingMode = getDataTransferMode == LightGBMConstants.StreamingDataTransferMode + val (serializedReferenceDataset: Option[Array[Byte]], partitionCounts: Option[Array[Long]]) = + if (isStreamingMode) { + val (referenceDataset, partitionCounts) = + calculateRowStatistics(trainingData, trainParams, numCols, featuresSchema, measures) - // Save the reference Dataset so it's available to client and other batches - if (getReferenceDataset.isEmpty) { - log.info(s"Saving reference dataset of length: ${referenceDataset.length}") - setReferenceDataset(referenceDataset) + // Save the reference Dataset so it's available to client and other batches + if (getReferenceDataset.isEmpty) { + log.info(s"Saving reference dataset of length: ${referenceDataset.length}") + setReferenceDataset(referenceDataset) + } + (Some(referenceDataset), Some(partitionCounts)) + } else (None, None) + + validateSlotNames(featuresSchema) + executeTraining(preprocessedDF, + validationData, + serializedReferenceDataset, + partitionCounts, + trainParams, + numCols, + numInitScoreClasses, + batchIndex, + numTasks, + numTasksPerExecutor, + measures) + } catch { + case ex if getDataTransferMode == LightGBMConstants.StreamingDataTransferMode && shouldRetryWithBulk(ex) => + log.warn("Detected duplicate feature names while creating LightGBM dataset in streaming mode. " + + "Retrying this batch with dataTransferMode=bulk.") + val originalMode = getDataTransferMode + setDataTransferMode(LightGBMConstants.BulkDataTransferMode) + try { + trainOneDataBatch(dataset, batchIndex, batchCount) + } finally { + setDataTransferMode(originalMode) } - (Some(referenceDataset), Some(partitionCounts)) - } else (None, None) - - validateSlotNames(featuresSchema) - executeTraining(preprocessedDF, - validationData, - serializedReferenceDataset, - partitionCounts, - trainParams, - numCols, - numInitScoreClasses, - batchIndex, - numTasks, - numTasksPerExecutor, - measures) + } } private def determineNumTasks(dataset: Dataset[_], configNumTasks: Int, numTasksPerExecutor: Int) = { diff --git a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split1/VerifyLightGBMCommon.scala b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split1/VerifyLightGBMCommon.scala index 437bca9c6b0..8b7f2af94e6 100644 --- a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split1/VerifyLightGBMCommon.scala +++ b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split1/VerifyLightGBMCommon.scala @@ -382,4 +382,33 @@ class VerifyLightGBMCommon extends TestBase with LightGBMTestUtils { assert(predictions.count() == 4) } + + test("Verify duplicate explicit slotNames are made unique") { + import org.apache.spark.ml.linalg.Vectors + + val data = Seq( + (0.0, Vectors.dense(1.0, 2.0, 3.0)), + (1.0, Vectors.dense(2.0, 3.0, 4.0)), + (0.0, Vectors.dense(3.0, 4.0, 5.0)), + (1.0, Vectors.dense(4.0, 5.0, 6.0)) + ) + + val df = spark.createDataFrame(data).toDF(labelCol, featuresCol) + + val duplicateNames = Array("Column_", "Column_", "Column_") + val model = new LightGBMClassifier() + .setFeaturesCol(featuresCol) + .setLabelCol(labelCol) + .setDefaultListenPort(getAndIncrementPort()) + .setNumLeaves(5) + .setNumIterations(5) + .setObjective("binary") + .setSlotNames(duplicateNames) + .setExecutionMode("streaming") + + val trainedModel = model.fit(df) + val predictions = trainedModel.transform(df) + + assert(predictions.count() == 4) + } } diff --git a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split2/LightGBMRankerTestData.scala b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split2/LightGBMRankerTestData.scala index e66d8b3f8cc..196e0861bbb 100644 --- a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split2/LightGBMRankerTestData.scala +++ b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split2/LightGBMRankerTestData.scala @@ -52,6 +52,7 @@ abstract class LightGBMRankerTestData extends Benchmarks with EstimatorFuzzing[L .setDefaultListenPort(getAndIncrementPort()) .setRepartitionByGroupingColumn(false) .setDataTransferMode(dataTransferMode) + .setUseBarrierExecutionMode(true) .setNumLeaves(5) .setNumIterations(10) } diff --git a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split2/LightGBMRegressorTestData.scala b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split2/LightGBMRegressorTestData.scala index be28beccf0a..0b7ce23ee1c 100644 --- a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split2/LightGBMRegressorTestData.scala +++ b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split2/LightGBMRegressorTestData.scala @@ -30,6 +30,7 @@ abstract class LightGBMRegressorTestData extends Benchmarks .setFeaturesCol(featuresCol) .setDefaultListenPort(getAndIncrementPort()) .setDataTransferMode(dataTransferMode) + .setUseBarrierExecutionMode(true) .setNumLeaves(5) .setNumIterations(10) } From 57774cdfa46525ec60893ffa32a8971d17a6b9a7 Mon Sep 17 00:00:00 2001 From: Ranadeep Singh Date: Fri, 27 Feb 2026 14:40:01 -0800 Subject: [PATCH 5/6] breakout test to be smaller --- .../synapse/ml/lightgbm/LightGBMBase.scala | 112 +++++++++--------- 1 file changed, 58 insertions(+), 54 deletions(-) diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala index 66a5b6e1a29..dc240ea237c 100644 --- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala +++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala @@ -452,61 +452,12 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams] * creates a driver thread, and runs mapPartitions on the dataset. * * @param dataset The dataset to train on. - * @param batchIndex In running in batch training mode, gets the batch number. - * @return The LightGBM Model from the trained LightGBM Booster. - */ + * @param batchIndex In running in batch training mode, gets the batch number. + * @return The LightGBM Model from the trained LightGBM Booster. + */ private def trainOneDataBatch(dataset: Dataset[_], batchIndex: Int, batchCount: Int): TrainedModel = { try { - val measures = new InstrumentationMeasures() - setBatchPerformanceMeasure(batchIndex, measures) - - val numTasksPerExecutor = ClusterUtil.getNumTasksPerExecutor(dataset.sparkSession, log) - val numTasks = determineNumTasks(dataset, getNumTasks, numTasksPerExecutor) - val sc = dataset.sparkSession.sparkContext - - val df = prepareDataframe(dataset, numTasks) - - val (trainingData, validationData) = - if (get(validationIndicatorCol).isDefined && dataset.columns.contains(getValidationIndicatorCol)) - (df.filter(x => !x.getBoolean(x.fieldIndex(getValidationIndicatorCol))), - Some(sc.broadcast(collectValidationData(df, measures)))) - else (df, None) - - val preprocessedDF = preprocessData(trainingData) - - val (numCols, numInitScoreClasses) = calculateColumnStatistics(preprocessedDF, measures) - - val featuresSchema = dataset.schema(getFeaturesCol) - val generalTrainParams: BaseTrainParams = getTrainParams(numTasks, featuresSchema, numTasksPerExecutor) - val trainParams = addCustomTrainParams(generalTrainParams, dataset) - log.info(s"LightGBM batch $batchIndex of $batchCount, parameters: ${trainParams.toString()}") - - val isStreamingMode = getDataTransferMode == LightGBMConstants.StreamingDataTransferMode - val (serializedReferenceDataset: Option[Array[Byte]], partitionCounts: Option[Array[Long]]) = - if (isStreamingMode) { - val (referenceDataset, partitionCounts) = - calculateRowStatistics(trainingData, trainParams, numCols, featuresSchema, measures) - - // Save the reference Dataset so it's available to client and other batches - if (getReferenceDataset.isEmpty) { - log.info(s"Saving reference dataset of length: ${referenceDataset.length}") - setReferenceDataset(referenceDataset) - } - (Some(referenceDataset), Some(partitionCounts)) - } else (None, None) - - validateSlotNames(featuresSchema) - executeTraining(preprocessedDF, - validationData, - serializedReferenceDataset, - partitionCounts, - trainParams, - numCols, - numInitScoreClasses, - batchIndex, - numTasks, - numTasksPerExecutor, - measures) + trainOneDataBatchInternal(dataset, batchIndex, batchCount) } catch { case ex if getDataTransferMode == LightGBMConstants.StreamingDataTransferMode && shouldRetryWithBulk(ex) => log.warn("Detected duplicate feature names while creating LightGBM dataset in streaming mode. " + @@ -514,13 +465,66 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams] val originalMode = getDataTransferMode setDataTransferMode(LightGBMConstants.BulkDataTransferMode) try { - trainOneDataBatch(dataset, batchIndex, batchCount) + trainOneDataBatchInternal(dataset, batchIndex, batchCount) } finally { setDataTransferMode(originalMode) } } } + private def trainOneDataBatchInternal(dataset: Dataset[_], batchIndex: Int, batchCount: Int): TrainedModel = { + val measures = new InstrumentationMeasures() + setBatchPerformanceMeasure(batchIndex, measures) + + val numTasksPerExecutor = ClusterUtil.getNumTasksPerExecutor(dataset.sparkSession, log) + val numTasks = determineNumTasks(dataset, getNumTasks, numTasksPerExecutor) + val sc = dataset.sparkSession.sparkContext + + val df = prepareDataframe(dataset, numTasks) + + val (trainingData, validationData) = + if (get(validationIndicatorCol).isDefined && dataset.columns.contains(getValidationIndicatorCol)) + (df.filter(x => !x.getBoolean(x.fieldIndex(getValidationIndicatorCol))), + Some(sc.broadcast(collectValidationData(df, measures)))) + else (df, None) + + val preprocessedDF = preprocessData(trainingData) + + val (numCols, numInitScoreClasses) = calculateColumnStatistics(preprocessedDF, measures) + + val featuresSchema = dataset.schema(getFeaturesCol) + val generalTrainParams: BaseTrainParams = getTrainParams(numTasks, featuresSchema, numTasksPerExecutor) + val trainParams = addCustomTrainParams(generalTrainParams, dataset) + log.info(s"LightGBM batch $batchIndex of $batchCount, parameters: ${trainParams.toString()}") + + val isStreamingMode = getDataTransferMode == LightGBMConstants.StreamingDataTransferMode + val (serializedReferenceDataset: Option[Array[Byte]], partitionCounts: Option[Array[Long]]) = + if (isStreamingMode) { + val (referenceDataset, partitionCounts) = + calculateRowStatistics(trainingData, trainParams, numCols, featuresSchema, measures) + + // Save the reference Dataset so it's available to client and other batches + if (getReferenceDataset.isEmpty) { + log.info(s"Saving reference dataset of length: ${referenceDataset.length}") + setReferenceDataset(referenceDataset) + } + (Some(referenceDataset), Some(partitionCounts)) + } else (None, None) + + validateSlotNames(featuresSchema) + executeTraining(preprocessedDF, + validationData, + serializedReferenceDataset, + partitionCounts, + trainParams, + numCols, + numInitScoreClasses, + batchIndex, + numTasks, + numTasksPerExecutor, + measures) + } + private def determineNumTasks(dataset: Dataset[_], configNumTasks: Int, numTasksPerExecutor: Int) = { // By default, we try to intelligently calculate the number of executors, but user can override this with numTasks if (configNumTasks > 0) configNumTasks From 4cb326befd5dec9a5518e25ab334268457565000 Mon Sep 17 00:00:00 2001 From: Ranadeep Singh Date: Fri, 27 Feb 2026 17:00:05 -0800 Subject: [PATCH 6/6] Fix slot names --- .../synapse/ml/lightgbm/LightGBMBase.scala | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala index dc240ea237c..b98e8dbe7c2 100644 --- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala +++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala @@ -258,21 +258,18 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams] } private def validateSlotNames(featuresSchema: StructField): Unit = { - val metadata = AttributeGroup.fromStructField(featuresSchema) - if (metadata.attributes.isDefined) { - val slotNamesOpt = getSlotNamesWithMetadata(featuresSchema) - val pattern = new Regex("[\",:\\[\\]{}]") - slotNamesOpt.foreach(slotNames => { - val badSlotNames = slotNames.flatMap(slotName => - if (pattern.findFirstIn(slotName).isEmpty) None else Option(slotName)) - if (!badSlotNames.isEmpty) { - throw new IllegalArgumentException( - s"Invalid slot names detected in features column: ${badSlotNames.mkString(",")}" + - " \n Special characters \" , : \\ [ ] { } will cause unexpected behavior in LGBM unless changed." + - " This error can be fixed by renaming the problematic columns prior to vector assembly.") - } - }) - } + val slotNamesOpt = getSlotNamesWithMetadata(featuresSchema) + val pattern = new Regex("[\",:\\[\\]{}]") + slotNamesOpt.foreach(slotNames => { + val badSlotNames = slotNames.flatMap(slotName => + if (pattern.findFirstIn(slotName).isEmpty) None else Option(slotName)) + if (!badSlotNames.isEmpty) { + throw new IllegalArgumentException( + s"Invalid slot names detected in features column: ${badSlotNames.mkString(",")}" + + " \n Special characters \" , : \\ [ ] { } will cause unexpected behavior in LGBM unless changed." + + " This error can be fixed by renaming the problematic columns prior to vector assembly.") + } + }) } private def shouldRetryWithBulk(error: Throwable): Boolean = {