build.sbt (2 additions & 1 deletion)
@@ -44,7 +44,8 @@ lazy val connectors = (project in file("connectors"))
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"com.mysql" % "mysql-connector-j" % "9.1.0",
"org.json4s" %% "json4s-jackson" % "4.0.7",
"org.postgresql" % "postgresql" % "42.7.4"
"org.postgresql" % "postgresql" % "42.7.4",
"org.apache.kafka" % "kafka-clients" % "3.7.0" % "provided"
)
)
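The Kafka client library is the only new build dependency (marked provided). The Elasticsearch, MongoDB, and BigQuery connectors introduced below likewise expect their Spark connector packages to be supplied at runtime; a minimal sketch of what those optional dependencies might look like, with illustrative coordinates and versions that are not part of this change:

// Hypothetical optional dependencies for the new connectors; artifact coordinates and
// versions are illustrative and would need to be verified before use.
lazy val optionalConnectorDeps = Seq(
  "org.elasticsearch" %% "elasticsearch-spark-30" % "8.11.4" % "provided",                 // "org.elasticsearch.spark.sql" data source
  "org.mongodb.spark" %% "mongo-spark-connector" % "10.2.1" % "provided",                  // "mongodb" data source
  "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.36.1" % "provided"   // "bigquery" data source
)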

@@ -2,3 +2,5 @@ com.dataweaver.connectors.sinks.BigQuerySinkConnector
com.dataweaver.connectors.sinks.TestSinkConnector
com.dataweaver.connectors.sinks.FileSinkConnector
com.dataweaver.connectors.sinks.DeltaLakeSinkConnector
com.dataweaver.connectors.sinks.KafkaSinkConnector
com.dataweaver.connectors.sinks.ElasticsearchSinkConnector
@@ -2,3 +2,7 @@ com.dataweaver.connectors.sources.SQLSourceConnector
com.dataweaver.connectors.sources.TestSourceConnector
com.dataweaver.connectors.sources.PostgreSQLSourceConnector
com.dataweaver.connectors.sources.FileSourceConnector
com.dataweaver.connectors.sources.KafkaSourceConnector
com.dataweaver.connectors.sources.MongoDBSourceConnector
com.dataweaver.connectors.sources.RESTSourceConnector
com.dataweaver.connectors.sources.BigQuerySourceConnector
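Each registry file lists one fully qualified connector class per line. As an illustration of how such a registry might be consumed (a sketch only, not DataWeaver's actual plugin mechanism; it assumes each connector exposes a public no-arg constructor):

import scala.io.Source
import com.dataweaver.core.plugin.SourceConnector

// Sketch: read one fully qualified class name per line from a registry resource and
// instantiate each connector reflectively. Assumes a public no-arg constructor.
def loadSourceConnectors(registryResource: String): Seq[SourceConnector] = {
  val stream = getClass.getClassLoader.getResourceAsStream(registryResource)
  val source = Source.fromInputStream(stream)
  try {
    source.getLines()
      .map(_.trim)
      .filter(line => line.nonEmpty && !line.startsWith("#"))
      .map(name => Class.forName(name).getDeclaredConstructor().newInstance().asInstanceOf[SourceConnector])
      .toList
  } finally source.close()
}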
@@ -0,0 +1,101 @@
package com.dataweaver.connectors.sinks

import com.dataweaver.core.plugin.SinkConnector
import org.apache.log4j.LogManager
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

/** Writes data to Elasticsearch indices.
*
* Config:
* nodes - Elasticsearch node URLs (e.g., "http://localhost:9200")
* index - Index name
* saveMode - "Append" (default) or "Overwrite"
* idColumn - Column to use as document _id (optional)
*
* Uses the Spark Elasticsearch connector when it is on the classpath; otherwise falls back
* to the REST bulk API (the fallback always indexes documents, i.e. behaves like Append).
*/
class ElasticsearchSinkConnector extends SinkConnector {
private val logger = LogManager.getLogger(getClass)
private val httpClient = HttpClient.newHttpClient()

def connectorType: String = "Elasticsearch"

def write(data: DataFrame, pipelineName: String, config: Map[String, String])(implicit
spark: SparkSession
): Unit = {
val nodes = config.getOrElse("nodes",
throw new IllegalArgumentException("Elasticsearch: 'nodes' is required"))
val index = config.getOrElse("index",
throw new IllegalArgumentException("Elasticsearch: 'index' is required"))
val saveMode = config.getOrElse("saveMode", "Append")
val idColumn = config.get("idColumn")

// Try Spark ES connector first
try {
val writer = data.write
.format("org.elasticsearch.spark.sql")
.option("es.nodes", nodes.replace("http://", "").replace("https://", ""))
.option("es.resource", index)
.option("es.nodes.wan.only", "true")

idColumn.foreach(id => writer.option("es.mapping.id", id))

if (saveMode.toLowerCase == "overwrite") {
writer.mode("overwrite").save()
} else {
writer.mode("append").save()
}
} catch {
case _: ClassNotFoundException =>
logger.warn("Spark ES connector not found, using REST bulk API fallback")
writeBulkREST(data, nodes, index, idColumn)
}
}

/** Fallback: write via Elasticsearch REST bulk API. */
  private def writeBulkREST(df: DataFrame, nodes: String, index: String, idColumn: Option[String]): Unit = {
    import org.apache.spark.sql.functions.{col, struct, to_json}

    // Pair each JSON document with its optional _id so the bulk action line can reference it.
    val rows: Array[(Option[String], String)] = idColumn match {
      case Some(idCol) =>
        df.select(col(idCol).cast("string"), to_json(struct(df.columns.map(col): _*)))
          .collect()
          .map(r => (Option(r.getString(0)), r.getString(1)))
      case None =>
        df.toJSON.collect().map(json => (Option.empty[String], json))
    }

    val bulkBody = new StringBuilder()
    rows.foreach { case (id, json) =>
      val action = id match {
        case Some(docId) => s"""{"index":{"_index":"$index","_id":"$docId"}}"""
        case None        => s"""{"index":{"_index":"$index"}}"""
      }
      bulkBody.append(action).append("\n").append(json).append("\n")
    }

    // The bulk request targets a single node; use the first entry if several were configured.
    val endpoint = nodes.split(",").head.trim.stripSuffix("/")
    val request = HttpRequest.newBuilder()
      .uri(URI.create(s"$endpoint/_bulk"))
      .header("Content-Type", "application/x-ndjson")
      .POST(HttpRequest.BodyPublishers.ofString(bulkBody.toString()))
      .build()

val response = httpClient.send(request, HttpResponse.BodyHandlers.ofString())
if (response.statusCode() >= 400) {
throw new RuntimeException(s"Elasticsearch bulk write failed (${response.statusCode()}): ${response.body().take(500)}")
}

logger.info(s"Wrote ${rows.length} documents to Elasticsearch index '$index'")
}

override def healthCheck(config: Map[String, String]): Either[String, Long] = {
val nodes = config.getOrElse("nodes", return Left("nodes not configured"))
try {
val start = System.currentTimeMillis()
      val endpoint = nodes.split(",").head.trim.stripSuffix("/")
      val request = HttpRequest.newBuilder()
        .uri(URI.create(s"$endpoint/_cluster/health"))
.GET()
.build()
val response = httpClient.send(request, HttpResponse.BodyHandlers.ofString())
val latency = System.currentTimeMillis() - start
if (response.statusCode() < 400) Right(latency)
else Left(s"Elasticsearch unhealthy (${response.statusCode()})")
} catch {
case e: Exception => Left(s"Elasticsearch unreachable: ${e.getMessage}")
}
}
}
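For reference, a hedged usage sketch of the new sink (the DataFrame, pipeline name, config values, and the in-scope SparkSession `spark` are illustrative, not taken from this PR):

// Illustrative wiring only; eventsDf and all config values are assumptions.
val esSink = new ElasticsearchSinkConnector()
esSink.write(
  eventsDf,
  "EventsToElasticsearch",
  Map(
    "nodes"    -> "http://localhost:9200",
    "index"    -> "events",
    "idColumn" -> "event_id" // optional: column used as the document _id
  )
)(spark)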
@@ -0,0 +1,71 @@
package com.dataweaver.connectors.sinks

import com.dataweaver.core.plugin.SinkConnector
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

/** Writes data to Apache Kafka topics.
* Supports both batch and streaming write modes.
*
* Config:
* brokers - Kafka broker list
* topic - Topic to write to
* mode - "batch" (default) or "streaming"
* keyColumn - Column to use as Kafka key (optional)
* valueColumn - Column to use as Kafka value (default: an existing 'value' column, else all columns serialized as JSON)
* checkpointLocation - Required for streaming mode
* triggerInterval - Streaming trigger interval (e.g., "10 seconds")
*/
class KafkaSinkConnector extends SinkConnector {
def connectorType: String = "Kafka"

def write(data: DataFrame, pipelineName: String, config: Map[String, String])(implicit
spark: SparkSession
): Unit = {
val brokers = config.getOrElse("brokers",
throw new IllegalArgumentException("Kafka sink: 'brokers' is required"))
val topic = config.getOrElse("topic",
throw new IllegalArgumentException("Kafka sink: 'topic' is required"))
val mode = config.getOrElse("mode", "batch")

val keyCol = config.get("keyColumn")
val valueCol = config.getOrElse("valueColumn", "value")

// Prepare DataFrame with key/value columns for Kafka
val kafkaDf = if (data.columns.contains("value")) {
data
} else {
import org.apache.spark.sql.functions.{struct, to_json, col}
val valueExpr = to_json(struct(data.columns.map(col): _*))
val withValue = data.withColumn("value", valueExpr)
keyCol match {
case Some(k) => withValue.withColumnRenamed(k, "key")
case None => withValue
}
}

mode match {
case "streaming" =>
val checkpoint = config.getOrElse("checkpointLocation",
s"/tmp/weaver-checkpoint/$pipelineName")
val trigger = config.getOrElse("triggerInterval", "10 seconds")

kafkaDf.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("topic", topic)
.option("checkpointLocation", checkpoint)
.trigger(Trigger.ProcessingTime(trigger))
.start()
.awaitTermination()

      case _ => // batch; kafkaDf already holds string-typed key/value columns
        kafkaDf.write
          .format("kafka")
          .option("kafka.bootstrap.servers", brokers)
          .option("topic", topic)
          .save()
}
}
}
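A hedged sketch of driving the sink in streaming mode (broker, topic, column names, and the streaming DataFrame are placeholders):

// Illustrative only; ordersStreamDf must be a streaming DataFrame when mode=streaming.
val kafkaSink = new KafkaSinkConnector()
kafkaSink.write(
  ordersStreamDf,
  "OrdersToKafka",
  Map(
    "brokers"            -> "localhost:9092",
    "topic"              -> "orders-out",
    "mode"               -> "streaming",
    "keyColumn"          -> "order_id",
    "checkpointLocation" -> "/tmp/weaver-checkpoint/OrdersToKafka",
    "triggerInterval"    -> "10 seconds"
  )
)(spark)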
@@ -0,0 +1,43 @@
package com.dataweaver.connectors.sources

import com.dataweaver.core.plugin.SourceConnector
import org.apache.spark.sql.{DataFrame, SparkSession}

/** Reads data from BigQuery tables or queries.
*
* Config:
* projectId - GCP project ID
* dataset - BigQuery dataset name
* table - Table name (for direct table reads)
* query - SQL query (alternative to table, uses BigQuery SQL)
* temporaryGcsBucket - GCS bucket for temporary data (required)
*/
class BigQuerySourceConnector extends SourceConnector {
def connectorType: String = "BigQuery"

def read(config: Map[String, String])(implicit spark: SparkSession): DataFrame = {
val temporaryGcsBucket = config.getOrElse("temporaryGcsBucket",
throw new IllegalArgumentException("BigQuery source: 'temporaryGcsBucket' is required"))

val query = config.get("query")
val table = for {
project <- config.get("projectId")
dataset <- config.get("dataset")
tbl <- config.get("table")
} yield s"$project.$dataset.$tbl"

val reader = spark.read
.format("bigquery")
.option("temporaryGcsBucket", temporaryGcsBucket)

    (query, table) match {
      case (Some(q), _) =>
        // Note: when reading via a SQL query, the spark-bigquery connector typically also
        // requires viewsEnabled=true and a materializationDataset to be configured.
        reader.option("query", q).load()
case (_, Some(t)) =>
reader.option("table", t).load()
case _ =>
throw new IllegalArgumentException(
"BigQuery source requires either 'query' or 'projectId'+'dataset'+'table'")
}
}
}
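A hedged sketch of both read modes (project, dataset, table, and bucket names are placeholders):

// Illustrative only; all identifiers below are placeholders.
val bq = new BigQuerySourceConnector()

// Direct table read
val ordersDf = bq.read(Map(
  "projectId"          -> "my-gcp-project",
  "dataset"            -> "sales",
  "table"              -> "orders",
  "temporaryGcsBucket" -> "my-temp-bucket"
))(spark)

// Query read; takes precedence when both 'query' and table settings are present
val dailyTotalsDf = bq.read(Map(
  "query"              -> "SELECT order_date, SUM(amount) AS total FROM sales.orders GROUP BY order_date",
  "temporaryGcsBucket" -> "my-temp-bucket"
))(spark)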
@@ -0,0 +1,72 @@
package com.dataweaver.connectors.sources

import com.dataweaver.core.plugin.SourceConnector
import org.apache.spark.sql.{DataFrame, SparkSession}

/** Reads data from Apache Kafka topics.
* Supports both batch and streaming modes.
*
* Config:
* brokers - Kafka broker list (e.g., "localhost:9092")
* topic - Topic to read from
* startingOffsets - "earliest", "latest", or JSON offsets (default: "latest")
* mode - "batch" (default) or "streaming"
* maxOffsetsPerTrigger - max records per micro-batch in streaming (optional)
*/
class KafkaSourceConnector extends SourceConnector {
def connectorType: String = "Kafka"

def read(config: Map[String, String])(implicit spark: SparkSession): DataFrame = {
val brokers = config.getOrElse("brokers",
throw new IllegalArgumentException("Kafka: 'brokers' is required"))
val topic = config.getOrElse("topic",
throw new IllegalArgumentException("Kafka: 'topic' is required"))
val startingOffsets = config.getOrElse("startingOffsets", "latest")
val mode = config.getOrElse("mode", "batch")

mode match {
case "streaming" =>
val reader = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topic)
.option("startingOffsets", startingOffsets)

config.get("maxOffsetsPerTrigger").foreach(v => reader.option("maxOffsetsPerTrigger", v))

reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp")

case _ => // batch
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topic)
.option("startingOffsets", startingOffsets)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp")
}
}

override def healthCheck(config: Map[String, String]): Either[String, Long] = {
val brokers = config.getOrElse("brokers", return Left("brokers not configured"))
try {
import java.util.Properties
val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("request.timeout.ms", "5000")
props.put("default.api.timeout.ms", "5000")

      val consumer = new org.apache.kafka.clients.consumer.KafkaConsumer[String, String](props,
        new org.apache.kafka.common.serialization.StringDeserializer(),
        new org.apache.kafka.common.serialization.StringDeserializer())
      try {
        val start = System.currentTimeMillis()
        consumer.listTopics()
        Right(System.currentTimeMillis() - start)
      } finally consumer.close() // close the probe consumer even if listTopics fails
} catch {
case e: Exception => Left(s"Kafka connection failed: ${e.getMessage}")
}
}
}
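Since the source exposes the Kafka value as a plain string, a typical consumer parses it back into columns; a sketch with an assumed message schema (broker, topic, and field names are placeholders):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Illustrative only; the schema and connection values are assumptions.
val rawDf = new KafkaSourceConnector().read(Map(
  "brokers"         -> "localhost:9092",
  "topic"           -> "events",
  "startingOffsets" -> "earliest"
))(spark)

val eventSchema = StructType(Seq(
  StructField("event_id", StringType),
  StructField("amount", DoubleType)
))

// Parse the string 'value' column produced by the connector into typed columns.
val eventsDf = rawDf
  .select(from_json(rawDf("value"), eventSchema).as("event"))
  .select("event.*")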
@@ -0,0 +1,55 @@
package com.dataweaver.connectors.sources

import com.dataweaver.core.plugin.SourceConnector
import org.apache.spark.sql.{DataFrame, SparkSession}

/** Reads data from MongoDB collections via Spark MongoDB connector.
*
* Config:
* uri - MongoDB connection URI (e.g., "mongodb://host:27017")
* database - Database name
* collection - Collection name
* pipeline - Optional aggregation pipeline JSON (e.g., "[{$match: {active: true}}]")
*
* Requires: org.mongodb.spark:mongo-spark-connector on the Spark classpath.
*/
class MongoDBSourceConnector extends SourceConnector {
def connectorType: String = "MongoDB"

def read(config: Map[String, String])(implicit spark: SparkSession): DataFrame = {
val uri = config.getOrElse("uri",
throw new IllegalArgumentException("MongoDB: 'uri' is required"))
val database = config.getOrElse("database",
throw new IllegalArgumentException("MongoDB: 'database' is required"))
val collection = config.getOrElse("collection",
throw new IllegalArgumentException("MongoDB: 'collection' is required"))

val reader = spark.read
.format("mongodb")
.option("connection.uri", uri)
.option("database", database)
.option("collection", collection)

config.get("pipeline").foreach(p => reader.option("aggregation.pipeline", p))

reader.load()
}

override def healthCheck(config: Map[String, String]): Either[String, Long] = {
val uri = config.getOrElse("uri", return Left("uri not configured"))
try {
val start = System.currentTimeMillis()
      // Simple TCP check on the MongoDB port; strip the scheme, optional credentials,
      // database/options, and take the first host if several are listed.
      val hostPort = uri.replaceFirst("^mongodb(\\+srv)?://", "").split("@").last.split("/")(0).split(",")(0)
      val parts = hostPort.split(":")
      val host = parts(0)
      val port = if (parts.length > 1) parts(1).toInt else 27017
val socket = new java.net.Socket()
socket.connect(new java.net.InetSocketAddress(host, port), 5000)
val latency = System.currentTimeMillis() - start
socket.close()
Right(latency)
} catch {
case e: Exception => Left(s"MongoDB connection failed: ${e.getMessage}")
}
}
}
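A hedged usage sketch with a server-side aggregation pipeline (URI, database, collection, and filter are placeholders):

// Illustrative only; all values are placeholders.
val activeUsersDf = new MongoDBSourceConnector().read(Map(
  "uri"        -> "mongodb://localhost:27017",
  "database"   -> "appdb",
  "collection" -> "users",
  "pipeline"   -> """[{"$match": {"active": true}}]"""
))(spark)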