From 595b0ddd54419c8c12895fede874c58b625f589c Mon Sep 17 00:00:00 2001 From: Varsha Abhinandan Date: Fri, 24 Nov 2017 01:52:03 +0530 Subject: [PATCH 1/4] Support for logical datatypes with spark 2.x --- .../spark/avro/SchemaConverters.scala | 78 ++++++++++++++++-- src/test/resources/users.avro | Bin 0 -> 513 bytes .../com/databricks/spark/avro/AvroSuite.scala | 20 +++++ 3 files changed, 89 insertions(+), 9 deletions(-) create mode 100644 src/test/resources/users.avro diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index 985486c5..cae64246 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -29,6 +29,10 @@ import org.apache.avro.Schema.Type._ import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.types._ +import org.apache.avro.Schema.Type +import java.sql.Timestamp +import java.sql.Date +import scala.collection.JavaConversions._ /** * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice @@ -36,6 +40,13 @@ import org.apache.spark.sql.types._ */ object SchemaConverters { + val LOGICAL_TYPE = "logicalType" + val DECIMAL = "decimal" + val TIMESTAMP = "timestamp"; + val DATE = "date"; + val PRECISION = "precision" + val SCALE = "scale" + class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex) case class SchemaType(dataType: DataType, nullable: Boolean) @@ -46,17 +57,36 @@ object SchemaConverters { def toSqlType(avroSchema: Schema): SchemaType = { avroSchema.getType match { case INT => SchemaType(IntegerType, nullable = false) - case STRING => SchemaType(StringType, nullable = false) + case STRING => { + val logicalType = avroSchema.getJsonProp(LOGICAL_TYPE) + if (logicalType != null && logicalType.asText().equalsIgnoreCase(DECIMAL)) { + val precision = avroSchema.getJsonProp(PRECISION).asInt + val scale = avroSchema.getJsonProp(SCALE).asInt + SchemaType(DecimalType(precision, scale), nullable = false) + } else { + SchemaType(StringType, nullable = false) + } + } case BOOLEAN => SchemaType(BooleanType, nullable = false) case BYTES => SchemaType(BinaryType, nullable = false) case DOUBLE => SchemaType(DoubleType, nullable = false) case FLOAT => SchemaType(FloatType, nullable = false) - case LONG => SchemaType(LongType, nullable = false) + case LONG => { + val logicalType = avroSchema.getJsonProp(LOGICAL_TYPE) + if (logicalType != null && logicalType.asText().equalsIgnoreCase(TIMESTAMP)) { + SchemaType(TimestampType, nullable = false) + } else if (logicalType != null && logicalType.asText().equalsIgnoreCase(DATE)) { + SchemaType(TimestampType, nullable = false) + } else { + SchemaType(LongType, nullable = false) + } + } case FIXED => SchemaType(BinaryType, nullable = false) case ENUM => SchemaType(StringType, nullable = false) case RECORD => val fields = avroSchema.getFields.asScala.map { f => + f.getJsonProps.foreach(x => f.schema().addProp(x._1, x._2)) val schemaType = toSqlType(f.schema()) StructField(f.name, schemaType.dataType, schemaType.nullable) } @@ -80,7 +110,9 @@ object SchemaConverters { // In case of a union with null, eliminate it and make a recursive call val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL) if (remainingUnionTypes.size == 1) { - toSqlType(remainingUnionTypes.head).copy(nullable = true) + val remainingSchema = remainingUnionTypes.head + avroSchema.getJsonProps.foreach(x => remainingSchema.addProp(x._1, x._2)) + toSqlType(remainingSchema).copy(nullable = true) } else { toSqlType(Schema.createUnion(remainingUnionTypes.asJava)).copy(nullable = true) } @@ -148,16 +180,41 @@ object SchemaConverters { val avroType = avroSchema.getType (sqlType, avroType) match { // Avro strings are in Utf8, so we have to call toString on them - case (StringType, STRING) | (StringType, ENUM) => + case (StringType, ENUM) => (item: AnyRef) => if (item == null) null else item.toString - // Byte arrays are reused by avro, so we have to make a copy of them. - case (IntegerType, INT) | (BooleanType, BOOLEAN) | (DoubleType, DOUBLE) | - (FloatType, FLOAT) | (LongType, LONG) => - identity + case (_, STRING) => + (item: AnyRef) => if (item == null) { + null + } else { + val logicalType = avroSchema.getJsonProp(LOGICAL_TYPE) + if (logicalType != null && logicalType.asText().equalsIgnoreCase(DECIMAL)) { + val precision = avroSchema.getJsonProp(PRECISION).asInt + val scale = avroSchema.getJsonProp(SCALE).asInt + Decimal.apply(BigDecimal.apply(item.toString()), precision, scale) + } else { + item.toString + } + } case (TimestampType, LONG) => (item: AnyRef) => new Timestamp(item.asInstanceOf[Long]) case (DateType, LONG) => (item: AnyRef) => new Date(item.asInstanceOf[Long]) + case (_, LONG) => (item: AnyRef) => if (item == null) { + null + } else { + val logicalType = avroSchema.getJsonProp(LOGICAL_TYPE) + if (logicalType != null && logicalType.asText().equalsIgnoreCase(TIMESTAMP)) { + new Timestamp(item.asInstanceOf[Long].longValue()) + } else if (logicalType != null && logicalType.asText().equalsIgnoreCase(DATE)) { + new Timestamp(item.asInstanceOf[Long].longValue()) + } else { + item + } + } + // Byte arrays are reused by avro, so we have to make a copy of them. + case (IntegerType, INT) | (BooleanType, BOOLEAN) | (DoubleType, DOUBLE) | + (FloatType, FLOAT) => + identity case (BinaryType, FIXED) => (item: AnyRef) => if (item == null) { @@ -185,6 +242,7 @@ object SchemaConverters { val sqlField = struct.fields(i) val avroField = avroSchema.getField(sqlField.name) if (avroField != null) { + avroField.getJsonProps.foreach(x => avroField.schema().addProp(x._1, x._2)) val converter = createConverter(avroField.schema(), sqlField.dataType, path :+ sqlField.name) converters(i) = converter @@ -256,7 +314,9 @@ object SchemaConverters { if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) { val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL) if (remainingUnionTypes.size == 1) { - createConverter(remainingUnionTypes.head, sqlType, path) + val remainingSchema = remainingUnionTypes.head + avroSchema.getJsonProps.foreach(x => remainingSchema.addProp(x._1, x._2)) + createConverter(remainingSchema, sqlType, path) } else { createConverter(Schema.createUnion(remainingUnionTypes.asJava), sqlType, path) } diff --git a/src/test/resources/users.avro b/src/test/resources/users.avro new file mode 100644 index 0000000000000000000000000000000000000000..95050de4973cbdc9de10cf70e1dec70a62e28cc9 GIT binary patch literal 513 zcmeZI%3@>^ODrqO*DFrWNX<=r!&$;xw>YB)uh>xv9k^K(` zq$X$PCW4%Y!?hUBfNLlK23BTqW`3TMm60jXdqAfG!^aZIaHxOKOoq6wHnx^w*AwQ( zyuBXczOSZk`8CBooMiy}QxMPtlT7!3$`Xq+ v$}{sa85%AgelrQEY|`ZB$)QY27Um}A#>Qr5rY3r(#%AWGW=wmS(X9ahHAk+v literal 0 HcmV?d00001 diff --git a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala index 0051a0f0..6859542d 100644 --- a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala +++ b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala @@ -36,6 +36,7 @@ import com.databricks.spark.avro.SchemaConverters.IncompatibleSchemaException class AvroSuite extends FunSuite with BeforeAndAfterAll { val episodesFile = "src/test/resources/episodes.avro" val testFile = "src/test/resources/test.avro" + val userFile = "src/test/resources/users.avro" private var spark: SparkSession = _ @@ -773,4 +774,23 @@ class AvroSuite extends FunSuite with BeforeAndAfterAll { assert(readDf.collect().sameElements(writeDf.collect())) } } + + test("Logical Types") { + val df = spark.read.avro(userFile) + val decimals = df.select("decimal").collect().map(x => Decimal.apply(x.getDecimal(0))) + val dec1 = Decimal.apply(BigDecimal.apply("55555.555550000"), 25, 9) + val dec2 = Decimal.apply(BigDecimal.apply("8747336654.536756000"), 25, 9) + + assert(decimals.apply(0).equals(dec1)) + assert(decimals.apply(1).equals(dec2)) + assert(df.schema.apply("decimal").dataType == DecimalType(25,9)) + + val timestamps = df.select("timestamp").collect().map(x => x.getAs[Timestamp](0)) + val t1 = new Timestamp(1460354720000l) + val t2 = new Timestamp(1462842320000l) + + assert(timestamps.apply(0).equals(t1)) + assert(timestamps.apply(1).equals(t2)) + assert(df.schema.apply("timestamp").dataType == TimestampType) + } } From 51aaff2522dc66f32f4690079dd74ee95a896078 Mon Sep 17 00:00:00 2001 From: Varsha Abhinandan Date: Mon, 27 Nov 2017 11:47:40 +0530 Subject: [PATCH 2/4] Removing the support for timestamp and date through logical types --- .../spark/avro/SchemaConverters.scala | 31 +++---------------- .../com/databricks/spark/avro/AvroSuite.scala | 9 +----- 2 files changed, 6 insertions(+), 34 deletions(-) diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index cae64246..4f9313fb 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -71,16 +71,7 @@ object SchemaConverters { case BYTES => SchemaType(BinaryType, nullable = false) case DOUBLE => SchemaType(DoubleType, nullable = false) case FLOAT => SchemaType(FloatType, nullable = false) - case LONG => { - val logicalType = avroSchema.getJsonProp(LOGICAL_TYPE) - if (logicalType != null && logicalType.asText().equalsIgnoreCase(TIMESTAMP)) { - SchemaType(TimestampType, nullable = false) - } else if (logicalType != null && logicalType.asText().equalsIgnoreCase(DATE)) { - SchemaType(TimestampType, nullable = false) - } else { - SchemaType(LongType, nullable = false) - } - } + case LONG => SchemaType(LongType, nullable = false) case FIXED => SchemaType(BinaryType, nullable = false) case ENUM => SchemaType(StringType, nullable = false) @@ -195,26 +186,14 @@ object SchemaConverters { item.toString } } + // Byte arrays are reused by avro, so we have to make a copy of them. + case (IntegerType, INT) | (BooleanType, BOOLEAN) | (DoubleType, DOUBLE) | + (FloatType, FLOAT) | (LongType, LONG) => + identity case (TimestampType, LONG) => (item: AnyRef) => new Timestamp(item.asInstanceOf[Long]) case (DateType, LONG) => (item: AnyRef) => new Date(item.asInstanceOf[Long]) - case (_, LONG) => (item: AnyRef) => if (item == null) { - null - } else { - val logicalType = avroSchema.getJsonProp(LOGICAL_TYPE) - if (logicalType != null && logicalType.asText().equalsIgnoreCase(TIMESTAMP)) { - new Timestamp(item.asInstanceOf[Long].longValue()) - } else if (logicalType != null && logicalType.asText().equalsIgnoreCase(DATE)) { - new Timestamp(item.asInstanceOf[Long].longValue()) - } else { - item - } - } - // Byte arrays are reused by avro, so we have to make a copy of them. - case (IntegerType, INT) | (BooleanType, BOOLEAN) | (DoubleType, DOUBLE) | - (FloatType, FLOAT) => - identity case (BinaryType, FIXED) => (item: AnyRef) => if (item == null) { diff --git a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala index 6859542d..e7c14768 100644 --- a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala +++ b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala @@ -784,13 +784,6 @@ class AvroSuite extends FunSuite with BeforeAndAfterAll { assert(decimals.apply(0).equals(dec1)) assert(decimals.apply(1).equals(dec2)) assert(df.schema.apply("decimal").dataType == DecimalType(25,9)) - - val timestamps = df.select("timestamp").collect().map(x => x.getAs[Timestamp](0)) - val t1 = new Timestamp(1460354720000l) - val t2 = new Timestamp(1462842320000l) - - assert(timestamps.apply(0).equals(t1)) - assert(timestamps.apply(1).equals(t2)) - assert(df.schema.apply("timestamp").dataType == TimestampType) + } } From 258ce7c48943b8ee7f073fbf6a1572e0cea46367 Mon Sep 17 00:00:00 2001 From: Varsha Abhinandan Date: Mon, 27 Nov 2017 11:50:48 +0530 Subject: [PATCH 3/4] Removing unused values --- src/main/scala/com/databricks/spark/avro/SchemaConverters.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index 4f9313fb..5144e90b 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -42,8 +42,6 @@ object SchemaConverters { val LOGICAL_TYPE = "logicalType" val DECIMAL = "decimal" - val TIMESTAMP = "timestamp"; - val DATE = "date"; val PRECISION = "precision" val SCALE = "scale" From 16d539b32100e5bcff49daf8c83bdc7205b4e171 Mon Sep 17 00:00:00 2001 From: Varsha Abhinandan Date: Mon, 27 Nov 2017 11:52:59 +0530 Subject: [PATCH 4/4] Removing unused imports --- src/main/scala/com/databricks/spark/avro/SchemaConverters.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index 5144e90b..854e4b04 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -30,8 +30,6 @@ import org.apache.avro.Schema.Type._ import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.types._ import org.apache.avro.Schema.Type -import java.sql.Timestamp -import java.sql.Date import scala.collection.JavaConversions._ /**