29 changes: 29 additions & 0 deletions Dockerfile
@@ -0,0 +1,29 @@
FROM eclipse-temurin:17-jre-jammy AS base

# Install Spark
ENV SPARK_VERSION=4.0.2
ENV SPARK_HOME=/opt/spark
ENV PATH=$SPARK_HOME/bin:$PATH

RUN apt-get update && apt-get install -y curl procps && \
curl -fsSL "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz" | \
tar -xz -C /opt && \
mv /opt/spark-${SPARK_VERSION}-bin-hadoop3 $SPARK_HOME && \
apt-get clean && rm -rf /var/lib/apt/lists/*

# Create app directory
WORKDIR /app

# Copy the assembled JAR
COPY cli/target/scala-2.13/data-weaver.jar /app/data-weaver.jar

# Copy entrypoint script
COPY scripts/docker-entrypoint.sh /app/entrypoint.sh
RUN chmod +x /app/entrypoint.sh

# Default environment
ENV WEAVER_HOME=/app
ENV JAVA_OPTS="-Xmx1g"

ENTRYPOINT ["/app/entrypoint.sh"]
CMD ["--help"]
6 changes: 6 additions & 0 deletions airflow-operator/data_weaver_airflow/__init__.py
@@ -0,0 +1,6 @@
"""Data Weaver Airflow Operator - Execute Data Weaver pipelines from Airflow DAGs."""

from data_weaver_airflow.operator import DataWeaverOperator

__all__ = ["DataWeaverOperator"]
__version__ = "0.2.0"
86 changes: 86 additions & 0 deletions airflow-operator/data_weaver_airflow/operator.py
@@ -0,0 +1,86 @@
"""Data Weaver Airflow Operator.

Usage in DAG:
from data_weaver_airflow import DataWeaverOperator

etl_task = DataWeaverOperator(
task_id="customer_etl",
pipeline="pipelines/customer_etl.yaml",
env="prod",
weaver_jar="/opt/weaver/data-weaver.jar",
dag=dag,
)
"""

import os
import subprocess
from typing import Optional

from airflow.models import BaseOperator


class DataWeaverOperator(BaseOperator):
"""Execute a Data Weaver pipeline.

Args:
pipeline: Path to the pipeline YAML file
env: Environment profile (dev, prod)
command: Weaver command (default: apply)
weaver_jar: Path to data-weaver.jar
java_opts: JVM options (default: -Xmx1g)
spark_master: Spark master URL (default: local[*])
extra_args: Additional CLI arguments
"""

template_fields = ("pipeline", "env", "extra_args")

def __init__(
self,
pipeline: str,
env: Optional[str] = None,
command: str = "apply",
weaver_jar: str = "/opt/weaver/data-weaver.jar",
java_opts: str = "-Xmx1g",
spark_master: Optional[str] = None,
extra_args: str = "",
**kwargs,
):
super().__init__(**kwargs)
self.pipeline = pipeline
self.env = env
self.command = command
self.weaver_jar = weaver_jar
self.java_opts = java_opts
self.spark_master = spark_master
self.extra_args = extra_args

def execute(self, context):
cmd = ["java", self.java_opts, "-jar", self.weaver_jar, self.command, self.pipeline]

if self.env:
cmd.extend(["--env", self.env])

if self.extra_args:
cmd.extend(self.extra_args.split())

self.log.info(f"Executing Data Weaver: {' '.join(cmd)}")

env = {}
if self.spark_master:
env["SPARK_MASTER"] = self.spark_master

result = subprocess.run(
cmd,
capture_output=True,
text=True,
            env={**os.environ, **env},
)

self.log.info(f"stdout: {result.stdout}")

if result.returncode != 0:
self.log.error(f"stderr: {result.stderr}")
raise RuntimeError(
f"Data Weaver pipeline failed with exit code {result.returncode}: {result.stderr}"
)

return result.stdout
15 changes: 15 additions & 0 deletions airflow-operator/pyproject.toml
@@ -0,0 +1,15 @@
[build-system]
requires = ["setuptools>=64", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "data-weaver-airflow"
version = "0.2.0"
description = "Airflow operator for Data Weaver pipelines"
requires-python = ">=3.9"
dependencies = [
"apache-airflow>=2.7.0",
]

[project.urls]
Repository = "https://github.com/netsirius/data-weaver"
3 changes: 2 additions & 1 deletion build.sbt
@@ -44,7 +44,8 @@ lazy val connectors = (project in file("connectors"))
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"com.mysql" % "mysql-connector-j" % "9.1.0",
"org.json4s" %% "json4s-jackson" % "4.0.7",
"org.postgresql" % "postgresql" % "42.7.4"
"org.postgresql" % "postgresql" % "42.7.4",
"org.apache.kafka" % "kafka-clients" % "3.7.0" % "provided"
)
)

109 changes: 109 additions & 0 deletions cli/src/main/scala/com/dataweaver/cli/commands/SubmitCommand.scala
@@ -0,0 +1,109 @@
package com.dataweaver.cli.commands

import org.apache.log4j.LogManager

import scala.sys.process._
import scala.util.{Failure, Success, Try}

/** Submits a Data Weaver pipeline to a remote Spark cluster.
* Wraps spark-submit for various deployment targets.
*
* Targets: standalone, k8s, emr, dataproc
*/
object SubmitCommand {
private val logger = LogManager.getLogger(getClass)

def run(
pipelinePath: String,
target: String,
env: Option[String] = None,
config: Map[String, String] = Map.empty
): Unit = {
val jarPath = config.getOrElse("jar", findJar())
val envFlag = env.map(e => s"--env $e").getOrElse("")

target match {
case "standalone" => submitStandalone(jarPath, pipelinePath, envFlag, config)
case "k8s" => submitK8s(jarPath, pipelinePath, envFlag, config)
case "emr" => submitEMR(jarPath, pipelinePath, envFlag, config)
case "dataproc" => submitDataproc(jarPath, pipelinePath, envFlag, config)
case other => System.err.println(s"Unknown submit target: $other. Use: standalone, k8s, emr, dataproc")
}
}

private def submitStandalone(
jar: String, pipeline: String, envFlag: String, config: Map[String, String]
): Unit = {
val master = config.getOrElse("master", "spark://localhost:7077")
val cmd = s"""spark-submit --master $master --class com.dataweaver.cli.WeaverCLI $jar apply $pipeline $envFlag"""
executeCommand(cmd)
}

private def submitK8s(
jar: String, pipeline: String, envFlag: String, config: Map[String, String]
): Unit = {
val master = config.getOrElse("master", throw new IllegalArgumentException("k8s requires --master"))
val image = config.getOrElse("image", "data-weaver:latest")
val namespace = config.getOrElse("namespace", "default")

val cmd = s"""spark-submit
--master $master
--deploy-mode cluster
--conf spark.kubernetes.container.image=$image
--conf spark.kubernetes.namespace=$namespace
--class com.dataweaver.cli.WeaverCLI
local:///app/data-weaver.jar apply $pipeline $envFlag""".replaceAll("\\s+", " ").trim

executeCommand(cmd)
}

private def submitEMR(
jar: String, pipeline: String, envFlag: String, config: Map[String, String]
): Unit = {
val clusterId = config.getOrElse("cluster-id",
throw new IllegalArgumentException("EMR requires --cluster-id"))

val cmd = s"""aws emr add-steps
--cluster-id $clusterId
      --steps Type=Spark,Name=DataWeaver,Args=[--class,com.dataweaver.cli.WeaverCLI,$jar,apply,$pipeline${if (envFlag.nonEmpty) s",${envFlag.replace(" ", ",")}" else ""}],ActionOnFailure=CONTINUE""".replaceAll("\\s+", " ").trim

executeCommand(cmd)
}

private def submitDataproc(
jar: String, pipeline: String, envFlag: String, config: Map[String, String]
): Unit = {
val cluster = config.getOrElse("cluster",
throw new IllegalArgumentException("Dataproc requires --cluster"))
val region = config.getOrElse("region", "us-central1")

val cmd = s"""gcloud dataproc jobs submit spark
--cluster=$cluster --region=$region
--class=com.dataweaver.cli.WeaverCLI
--jars=$jar
-- apply $pipeline $envFlag""".replaceAll("\\s+", " ").trim

executeCommand(cmd)
}

private def executeCommand(cmd: String): Unit = {
println(s"Executing: $cmd")
Try(cmd.!) match {
case Success(0) => println("Submission completed successfully.")
case Success(code) => System.err.println(s"Submission failed with exit code: $code")
case Failure(e) => System.err.println(s"Submission error: ${e.getMessage}")
}
}

private def findJar(): String = {
val candidates = List(
"cli/target/scala-2.13/data-weaver.jar",
"data-weaver.jar",
sys.env.getOrElse("WEAVER_JAR", "")
).filter(_.nonEmpty)

candidates.find(p => new java.io.File(p).exists())
.getOrElse(throw new IllegalArgumentException(
"Cannot find data-weaver.jar. Build it with: sbt 'cli/assembly'"))
}
}
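For context, a hedged sketch of invoking SubmitCommand directly with the config keys it reads above. The endpoints, pipeline path, and namespace are illustrative, and the real flag parsing lives in the CLI layer, which is not part of this diff:

import com.dataweaver.cli.commands.SubmitCommand

object SubmitExamples {
  def main(args: Array[String]): Unit = {
    // Standalone cluster; falls back to the default master spark://localhost:7077
    SubmitCommand.run("pipelines/customer_etl.yaml", target = "standalone")

    // Kubernetes: master, image, and namespace are passed through the config map
    SubmitCommand.run(
      "pipelines/customer_etl.yaml",
      target = "k8s",
      env = Some("prod"),
      config = Map(
        "master" -> "k8s://https://my-cluster:6443", // illustrative API server endpoint
        "image" -> "data-weaver:latest",
        "namespace" -> "pipelines"
      )
    )
  }
}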
@@ -2,3 +2,5 @@ com.dataweaver.connectors.sinks.BigQuerySinkConnector
com.dataweaver.connectors.sinks.TestSinkConnector
com.dataweaver.connectors.sinks.FileSinkConnector
com.dataweaver.connectors.sinks.DeltaLakeSinkConnector
com.dataweaver.connectors.sinks.KafkaSinkConnector
com.dataweaver.connectors.sinks.ElasticsearchSinkConnector
@@ -2,3 +2,7 @@ com.dataweaver.connectors.sources.SQLSourceConnector
com.dataweaver.connectors.sources.TestSourceConnector
com.dataweaver.connectors.sources.PostgreSQLSourceConnector
com.dataweaver.connectors.sources.FileSourceConnector
com.dataweaver.connectors.sources.KafkaSourceConnector
com.dataweaver.connectors.sources.MongoDBSourceConnector
com.dataweaver.connectors.sources.RESTSourceConnector
com.dataweaver.connectors.sources.BigQuerySourceConnector
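For context, a minimal sketch of how these registrations are typically consumed, assuming the core plugin registry resolves them with java.util.ServiceLoader (the META-INF/services layout implies this, but the registry code is not shown in this diff) and that SinkConnector exposes connectorType as ElasticsearchSinkConnector does below:

import java.util.ServiceLoader
import scala.jdk.CollectionConverters._

import com.dataweaver.core.plugin.SinkConnector

// Discover every sink connector registered on the classpath and index it by its type name.
val sinksByType: Map[String, SinkConnector] =
  ServiceLoader.load(classOf[SinkConnector]).asScala.map(c => c.connectorType -> c).toMap

// e.g. sinksByType("Elasticsearch") would resolve the new ElasticsearchSinkConnector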
@@ -0,0 +1,101 @@
package com.dataweaver.connectors.sinks

import com.dataweaver.core.plugin.SinkConnector
import org.apache.log4j.LogManager
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

/** Writes data to Elasticsearch indices.
*
* Config:
* nodes - Elasticsearch node URLs (e.g., "http://localhost:9200")
* index - Index name
* saveMode - "Append" (default) or "Overwrite"
* idColumn - Column to use as document _id (optional)
*
* Uses Spark Elasticsearch connector if available, falls back to REST bulk API.
*/
class ElasticsearchSinkConnector extends SinkConnector {
private val logger = LogManager.getLogger(getClass)
private val httpClient = HttpClient.newHttpClient()

def connectorType: String = "Elasticsearch"

def write(data: DataFrame, pipelineName: String, config: Map[String, String])(implicit
spark: SparkSession
): Unit = {
val nodes = config.getOrElse("nodes",
throw new IllegalArgumentException("Elasticsearch: 'nodes' is required"))
val index = config.getOrElse("index",
throw new IllegalArgumentException("Elasticsearch: 'index' is required"))
val saveMode = config.getOrElse("saveMode", "Append")
val idColumn = config.get("idColumn")

// Try Spark ES connector first
try {
val writer = data.write
.format("org.elasticsearch.spark.sql")
.option("es.nodes", nodes.replace("http://", "").replace("https://", ""))
.option("es.resource", index)
.option("es.nodes.wan.only", "true")

idColumn.foreach(id => writer.option("es.mapping.id", id))

if (saveMode.toLowerCase == "overwrite") {
writer.mode("overwrite").save()
} else {
writer.mode("append").save()
}
} catch {
case _: ClassNotFoundException =>
logger.warn("Spark ES connector not found, using REST bulk API fallback")
writeBulkREST(data, nodes, index, idColumn)
}
}

  /** Fallback: write via Elasticsearch REST bulk API. */
  private def writeBulkREST(df: DataFrame, nodes: String, index: String, idColumn: Option[String]): Unit = {
    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    val rows = df.toJSON.collect()
    val bulkBody = new StringBuilder()

    rows.foreach { json =>
      // If an idColumn is configured, extract its value from the serialized document and use it
      // as the Elasticsearch _id; otherwise let Elasticsearch auto-generate document ids.
      val docId = idColumn.flatMap(c => (parse(json) \ c).toOption.map(_.values.toString))
      val action = docId match {
        case Some(id) => s"""{"index":{"_index":"$index","_id":"$id"}}"""
        case None => s"""{"index":{"_index":"$index"}}"""
      }
      bulkBody.append(action).append("\n").append(json).append("\n")
    }

val request = HttpRequest.newBuilder()
.uri(URI.create(s"$nodes/_bulk"))
.header("Content-Type", "application/x-ndjson")
.POST(HttpRequest.BodyPublishers.ofString(bulkBody.toString()))
.build()

val response = httpClient.send(request, HttpResponse.BodyHandlers.ofString())
if (response.statusCode() >= 400) {
throw new RuntimeException(s"Elasticsearch bulk write failed (${response.statusCode()}): ${response.body().take(500)}")
}

logger.info(s"Wrote ${rows.length} documents to Elasticsearch index '$index'")
}

  override def healthCheck(config: Map[String, String]): Either[String, Long] = {
    config.get("nodes") match {
      case None => Left("nodes not configured")
      case Some(nodes) =>
        try {
          val start = System.currentTimeMillis()
          val request = HttpRequest.newBuilder()
            .uri(URI.create(s"$nodes/_cluster/health"))
            .GET()
            .build()
          val response = httpClient.send(request, HttpResponse.BodyHandlers.ofString())
          val latency = System.currentTimeMillis() - start
          if (response.statusCode() < 400) Right(latency)
          else Left(s"Elasticsearch unhealthy (${response.statusCode()})")
        } catch {
          case e: Exception => Left(s"Elasticsearch unreachable: ${e.getMessage}")
        }
    }
  }
}
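A hedged smoke-test sketch exercising the new sink end to end. The node URL, index name, and sample data are illustrative; if the Spark Elasticsearch connector is not on the classpath, the REST bulk fallback above is used:

import org.apache.spark.sql.SparkSession
import com.dataweaver.connectors.sinks.ElasticsearchSinkConnector

object EsSinkSmokeTest {
  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder()
      .master("local[*]").appName("es-sink-smoke-test").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "alice"), (2, "bob")).toDF("id", "name")
    val sink = new ElasticsearchSinkConnector
    val config = Map(
      "nodes" -> "http://localhost:9200", // illustrative node URL
      "index" -> "weaver_smoke_test",
      "idColumn" -> "id"
    )

    // Check reachability first, then write the sample frame using the connector's config keys.
    sink.healthCheck(config) match {
      case Right(latencyMs) =>
        println(s"Cluster reachable in ${latencyMs}ms")
        sink.write(df, pipelineName = "smoke-test", config = config)
      case Left(err) =>
        println(s"Skipping write: $err")
    }
    spark.stop()
  }
}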