Skip to content
This repository was archived by the owner on Dec 20, 2018. It is now read-only.
27 changes: 25 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ In addition to the types listed above, `spark-avro` supports reading of three ty
2. `union(float, double)`
3. `union(something, null)`, where `something` is one of the supported Avro types listed above or is one of the supported `union` types.

At the moment, `spark-avro` ignores docs, aliases and other properties present in the Avro file.

## Supported types for Spark SQL -> Avro conversion

`spark-avro` supports writing of all Spark SQL types into Avro. For most types, the mapping from Spark types to Avro types is straightforward (e.g. IntegerType gets converted to int); however, there are a few special cases which are listed below:
Expand Down Expand Up @@ -183,6 +181,31 @@ val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).avro("/tmp/output")
```

To include extra columns when aliases are specified in the schema, set withAliases to true, as follows:
```
val df = sqlContext.read.format("com.databricks.spark.avro").option("withAliases", "true").load("test.avro")
```
The same can be done using OPTIONS passed to Spark SQL DDL:
```
CREATE TABLE mytable USING com.databricks.spark.avro OPTIONS(path "test.avro", withAliases "true");
```
While saving, 'aliases' metadata in DataFrame schema will be stored in `aliases` of avro file.
For example, 'column1' of DataFrame schema has 'aliases' metadata and returning array of aliases fields.
```
test.schema.apply("column1").metadata.getStringArray("aliases")
res5: Array[String] = Array(string_alias1, string_alias2)
```
While storing Dataframe as Avro, 'string_alias1' and 'string_alias2' will be aliases of 'column1' in Avro file.

#### Extra Metadata:

Avro schema doc:
Avro schema's each field 'doc' will be preserved in DataFrame's schema metadata.
```
scala> test.schema.apply("column_name").metadata.getString("doc")
res3: String = Meaningless string of characters
```

### Java API

```java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,12 @@ private[avro] class AvroOutputWriter(

while (convertersIterator.hasNext) {
val converter = convertersIterator.next()
record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
val fieldName = fieldNamesIterator.next()
if (schema.getField(fieldName) != null) {
record.put(fieldName, converter(rowIterator.next()))
} else {
rowIterator.next()
}
}
record
}
Expand Down
16 changes: 12 additions & 4 deletions src/main/scala/com/databricks/spark/avro/AvroRelation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ package com.databricks.spark.avro

import java.io.FileNotFoundException
import java.util.zip.Deflater

import scala.collection.Iterator
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

import com.google.common.base.Objects
import org.apache.avro.SchemaBuilder
import org.apache.avro.file.{DataFileConstants, DataFileReader, FileReader}
Expand All @@ -48,6 +46,7 @@ private[avro] class AvroRelation(
private val IgnoreFilesWithoutExtensionProperty = "avro.mapred.ignore.inputs.without.extension"
private val recordName = parameters.getOrElse("recordName", "topLevelRecord")
private val recordNamespace = parameters.getOrElse("recordNamespace", "")
lazy val withAliasesFields = parameters.getOrElse("withAliases", "false").toBoolean

/** needs to be lazy so it is not evaluated when saving since no schema exists at that location */
private lazy val avroSchema = paths match {
Expand All @@ -64,7 +63,8 @@ private[avro] class AvroRelation(
*/
override def dataSchema: StructType = maybeDataSchema match {
case Some(structType) => structType
case None => SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
case None =>
SchemaConverters.toSqlType(avroSchema, withAliasesFields).dataType.asInstanceOf[StructType]
}

/**
Expand Down Expand Up @@ -113,6 +113,7 @@ private[avro] class AvroRelation(
* contain `requiredColumns`
*/
override def buildScan(requiredColumns: Array[String], inputs: Array[FileStatus]): RDD[Row] = {
val withalias = withAliasesFields
if (inputs.isEmpty) {
sqlContext.sparkContext.emptyRDD[Row]
} else {
Expand All @@ -130,7 +131,14 @@ private[avro] class AvroRelation(
val firstRecord = records.next()
val superSchema = firstRecord.getSchema // the schema of the actual record
// the fields that are actually required along with their converters

val avroFieldMap = superSchema.getFields.map(f => (f.name, f)).toMap
val aliasFields = if (withalias) {
superSchema.getFields.flatMap(f => f.aliases.map(a => a -> f))
} else {
Nil
}
val allFields = (avroFieldMap ++ aliasFields).toMap

new Iterator[Row] {
private[this] val baseIterator = records
Expand All @@ -145,7 +153,7 @@ private[avro] class AvroRelation(
case (columnName, idx) =>
// Spark SQL should not pass us invalid columns
val field =
avroFieldMap.getOrElse(
allFields.getOrElse(
columnName,
throw new AssertionError(s"Invalid column $columnName"))
val converter = SchemaConverters.createConverterToSQL(field.schema)
Expand Down
45 changes: 40 additions & 5 deletions src/main/scala/com/databricks/spark/avro/SchemaConverters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@
package com.databricks.spark.avro

import java.nio.ByteBuffer
import java.util.ArrayList
import java.util.HashMap
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't remove these.


import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer

import org.apache.avro.generic.GenericData.Fixed
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.SchemaBuilder._
import org.apache.avro.Schema.Type._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

Expand All @@ -34,12 +37,17 @@ import org.apache.spark.sql.types._
*/
private object SchemaConverters {

val METADATA_KEY_DOC = "doc";
val METADATA_KEY_ALIASES = "aliases";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is good, you can put even the "_parent" here as a constant

val METADATA_KEY_PARENT = "_parent";

case class SchemaType(dataType: DataType, nullable: Boolean)

/**
* This function takes an avro schema and returns a sql schema.
*/
private[avro] def toSqlType(avroSchema: Schema): SchemaType = {
private[avro] def toSqlType(avroSchema: Schema, schemaWithAlias: Boolean = false): SchemaType = {
var aliasFields = ListBuffer[StructField]()
avroSchema.getType match {
case INT => SchemaType(IntegerType, nullable = false)
case STRING => SchemaType(StringType, nullable = false)
Expand All @@ -54,9 +62,23 @@ private object SchemaConverters {
case RECORD =>
val fields = avroSchema.getFields.map { f =>
val schemaType = toSqlType(f.schema())
StructField(f.name, schemaType.dataType, schemaType.nullable)
var meta = new MetadataBuilder()
if (f.doc != null) meta.putString(METADATA_KEY_DOC, f.doc)
if (f.aliases() != null && f.aliases().size() > 0) {
val aliasArray = new Array[String](f.aliases().size())
meta.putString(METADATA_KEY_PARENT, f.name)
f.aliases copyToArray(aliasArray)
meta.putStringArray(METADATA_KEY_ALIASES, aliasArray);
if (schemaWithAlias) {
for (aliasFieldName <- aliasArray) {
aliasFields += StructField(aliasFieldName, schemaType.dataType,
schemaType.nullable, meta.build())
}
}
}
StructField(f.name, schemaType.dataType, schemaType.nullable, meta.build())
}

fields.addAll(aliasFields)
SchemaType(StructType(fields), nullable = false)

case ARRAY =>
Expand Down Expand Up @@ -102,9 +124,22 @@ private object SchemaConverters {
schemaBuilder: RecordBuilder[T],
recordNamespace: String): T = {
val fieldsAssembler: FieldAssembler[T] = schemaBuilder.fields()
structType.fields.foreach { field =>
val newField = fieldsAssembler.name(field.name).`type`()

val nonAliasStructFields = structType.fields.filterNot(field =>
field.metadata.contains(METADATA_KEY_ALIASES)
&& field.metadata.contains(METADATA_KEY_PARENT)
&& !field.metadata.getString(METADATA_KEY_PARENT).equals(field.name))

nonAliasStructFields.foreach { field =>
var newFieldBuilder = fieldsAssembler.name(field.name)
if (field.metadata contains (METADATA_KEY_DOC)) {
newFieldBuilder = newFieldBuilder.doc(field.metadata.getString(METADATA_KEY_DOC))
}
if (field.metadata.contains(METADATA_KEY_ALIASES)) {
newFieldBuilder = newFieldBuilder
.aliases(field.metadata.getStringArray(METADATA_KEY_ALIASES): _*)
}
val newField = newFieldBuilder.`type`()
if (field.nullable) {
convertFieldTypeToAvro(field.dataType, newField.nullable(), field.name, recordNamespace)
.noDefault
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/com/databricks/spark/avro/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.databricks.spark

import org.apache.spark.sql.{SQLContext, DataFrameReader, DataFrameWriter, DataFrame}
import org.apache.spark.sql.DataFrame

package object avro {

Expand Down
Binary file modified src/test/resources/test.avro
Binary file not shown.
33 changes: 22 additions & 11 deletions src/test/resources/test.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,32 @@
"fields" : [{
"name" : "string",
"type" : "string",
"doc" : "Meaningless string of characters"
"doc" : "Meaningless string of characters",
"aliases" : ["string_alias1", "string_alias2"]
}, {
"name" : "simple_map",
"type" : {"type": "map", "values": "int"}
"type" : {"type": "map", "values": "int"},
"aliases" : ["map_alias"]
}, {
"name" : "complex_map",
"type" : {"type": "map", "values": {"type": "map", "values": "string"}}
"type" : {"type": "map", "values": {"type": "map", "values": "string"}},
"aliases" : ["complex_map_alias"]
}, {
"name" : "union_string_null",
"type" : ["null", "string"]
"type" : ["null", "string"],
"aliases" : ["union_string_alias"]
}, {
"name" : "union_int_long_null",
"type" : ["int", "long", "null"]
"type" : ["int", "long", "null"],
"aliases" : ["union_int_alias"]
}, {
"name" : "union_float_double",
"type" : ["float", "double"]
"type" : ["float", "double"],
"aliases" : ["union_float_alias1", "union_float_alias2"]
}, {
"name": "fixed3",
"type": {"type": "fixed", "size": 3, "name": "fixed3"}
"type": {"type": "fixed", "size": 3, "name": "fixed3"},
"aliases" : ["fixed3_alias"]
}, {
"name": "fixed2",
"type": {"type": "fixed", "size": 2, "name": "fixed2"}
Expand All @@ -31,7 +38,8 @@
"type": { "type": "enum",
"name": "Suit",
"symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
}
},
"aliases" : ["enum_alias"]
}, {
"name": "record",
"type": {
Expand All @@ -40,14 +48,17 @@
"aliases": ["RecordAlias"],
"fields" : [{
"name": "value_field",
"type": "string"
"type": "string",
"aliases" : ["value_field_alias"]
}]
}
}, {
"name": "array_of_boolean",
"type": {"type": "array", "items": "boolean"}
"type": {"type": "array", "items": "boolean"},
"aliases" : ["array_of_boolean_alias"]
}, {
"name": "bytes",
"type": "bytes"
"type": "bytes",
"aliases" : ["bytes_alias"]
}]
}
76 changes: 74 additions & 2 deletions src/test/scala/com/databricks/spark/avro/AvroSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericRecord, GenericDatumWriter}
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.{AnalysisException, SQLContext, Row}
import org.apache.spark.sql.types._
import org.scalatest.{BeforeAndAfterAll, FunSuite}

Expand Down Expand Up @@ -270,6 +270,28 @@ class AvroSuite extends FunSuite with BeforeAndAfterAll {
assert(sqlContext.sql("SELECT * FROM avroTable").collect().length === 8)
}

test("sql test without alias") {
intercept[AnalysisException] {
sqlContext.sql(
s"""
|CREATE TEMPORARY TABLE avroTable
|USING com.databricks.spark.avro
|OPTIONS (path "$testFile", withAliases "false")
""".stripMargin.replaceAll("\n", " "))
assert(sqlContext.sql("SELECT string_alias1 FROM avroTable").collect().length === 3)
}
}

test("sql test with alias") {
sqlContext.sql(
s"""
|CREATE TEMPORARY TABLE avroTable
|USING com.databricks.spark.avro
|OPTIONS (path "$testFile", withAliases "true")
""".stripMargin.replaceAll("\n", " "))
assert(sqlContext.sql("SELECT string_alias1 FROM avroTable").collect().length === 3)
}

test("conversion to avro and back") {
// Note that test.avro includes a variety of types, some of which are nullable. We expect to
// get the same values back.
Expand Down Expand Up @@ -419,4 +441,54 @@ class AvroSuite extends FunSuite with BeforeAndAfterAll {
assert(newDf.count == 8)
}
}
}

test("test doc in meta") {
TestUtils.withTempDir { tempDir =>
val df = sqlContext.read.avro(testFile)
df.schema.fields(0).metadata.getString(SchemaConverters.METADATA_KEY_DOC)
for (x <- df.schema.fields) {
if (x.name == "title") {
assert("episode title" == x.metadata.getString(SchemaConverters.METADATA_KEY_DOC))
} else if (x.name == "doctor") {
assert("main actor playing the Doctor in episode" ==
x.metadata.getString(SchemaConverters.METADATA_KEY_DOC))
} else if (x.name == "air_date") {
assert("initial date" == x.metadata.getString(SchemaConverters.METADATA_KEY_DOC))
}
}
}
}

test("test aliases in meta") {
TestUtils.withTempDir { tempDir =>
val df = sqlContext.read.avro(testFile)
df.schema("string").metadata.getStringArray(SchemaConverters.METADATA_KEY_ALIASES) === Array("string_alias1", "string_alias1")
}
}

test("test without aliases columns in data frame") {
TestUtils.withTempDir { tempDir =>
val df = sqlContext.read.avro(testFile)
val fieldArray = df.schema.fieldNames;
assert(fieldArray contains "string")
assert(!(fieldArray contains "string_alias1"))
assert(!(fieldArray contains "string_alias2"))
assert(!(fieldArray contains "map_alias"))
assert(!(fieldArray contains "enum_alias"))
assert(!(fieldArray contains "union_int_alias"))
}
}

test("test with aliases columns in data frame") {
TestUtils.withTempDir { tempDir =>
val df = sqlContext.read.format("com.databricks.spark.avro").option("withAliases", "true").load(testFile)
val fieldArray = df.schema.fieldNames
assert(fieldArray contains "string")
assert(fieldArray contains "string_alias1")
assert(fieldArray contains "string_alias2")
assert(fieldArray contains "map_alias")
assert(fieldArray contains "enum_alias")
assert(fieldArray contains "union_int_alias")
}
}
}