diff --git a/README.md b/README.md index 46482d5..32fee39 100644 --- a/README.md +++ b/README.md @@ -131,6 +131,13 @@ tests: | `weaver test --coverage` | Show test coverage report | | `weaver apply ` | Execute the pipeline | | `weaver apply --env prod` | Execute with environment profile | +| `weaver scaffold connector ` | Generate a connector project (ready to build) | +| `weaver scaffold transform ` | Generate a transform plugin project | +| `weaver scaffold registry ` | Generate a private connector registry | +| `weaver install ` | Install a connector plugin | +| `weaver list connectors` | List available source and sink connectors | +| `weaver list transforms` | List available transformation types | +| `weaver list plugins` | List installed plugin JARs | ## Connectors @@ -251,6 +259,57 @@ class MyConnector extends SourceConnector { Package as JAR, drop in `plugins/`, and it's automatically discovered via ServiceLoader. +## Tutorials + +Ready-to-run example pipelines in [`docs/tutorials/`](docs/tutorials/): + +| Tutorial | Description | Concepts | +|----------|-------------|----------| +| [01 — CSV to Parquet](docs/tutorials/01-csv-to-parquet.yaml) | Read CSV, clean with SQL, validate, write Parquet | File I/O, SQL, DataQuality | +| [02 — Multi-source Join](docs/tutorials/02-multi-source-join.yaml) | Join two sources with parallel transforms | Parallel DAG, aggregation | +| [03 — RAG Pipeline](docs/tutorials/03-rag-pipeline.yaml) | Chunk documents for vector search | Chunking, RAG preparation | +| [04 — LLM Classification](docs/tutorials/04-llm-classification.yaml) | Classify tickets with Gemini/Ollama | LLMTransform, local models | +| [05 — Production ETL](docs/tutorials/05-production-etl.yaml) | Full pipeline: PostgreSQL → DeltaLake | Connections, profiles, merge | + +```bash +# Run a tutorial +weaver validate docs/tutorials/01-csv-to-parquet.yaml +weaver plan docs/tutorials/01-csv-to-parquet.yaml +weaver apply 
docs/tutorials/01-csv-to-parquet.yaml +``` + +## Plugin Registry + +Install connector plugins without recompiling: + +```bash +# Install from Maven Central +weaver install connector-kafka +weaver install com.example:my-connector:1.0.0 + +# Install local JAR +weaver install /path/to/connector.jar + +# List what's available +weaver list connectors +weaver list transforms +weaver list plugins +``` + +### Build Your Own Connector + +```bash +# Scaffold a complete connector project (build.sbt, source code, tests, ServiceLoader) +weaver scaffold connector my-redis-connector +weaver scaffold connector my-api-connector --type both # source + sink +weaver scaffold transform my-custom-transform + +# For your company: create a private registry +weaver scaffold registry my-company-connectors +``` + +Plugins are loaded from `~/.weaver/plugins/` automatically. See the [Connector SDK](docs/connector-sdk/CONNECTOR_SDK.md) for the full guide. + ## Architecture ``` diff --git a/cli/src/main/scala/com/dataweaver/cli/WeaverCLI.scala b/cli/src/main/scala/com/dataweaver/cli/WeaverCLI.scala index c537e03..d6afeb6 100644 --- a/cli/src/main/scala/com/dataweaver/cli/WeaverCLI.scala +++ b/cli/src/main/scala/com/dataweaver/cli/WeaverCLI.scala @@ -15,7 +15,12 @@ object WeaverCLI { showCoverage: Boolean = false, description: Option[String] = None, interactive: Boolean = false, - projectName: Option[String] = None + projectName: Option[String] = None, + artifact: Option[String] = None, + listCategory: Option[String] = None, + scaffoldType: Option[String] = None, + scaffoldName: Option[String] = None, + connectorKind: String = "source" ) def main(args: Array[String]): Unit = { @@ -45,6 +50,36 @@ object WeaverCLI { .action((x, c) => c.copy(description = Some(x))) .text("Natural language description of the pipeline") ), + cmd("scaffold") + .action((_, c) => c.copy(command = "scaffold")) + .text("Generate a connector, transform, or private registry project") + .children( + arg[String]("") + 
.action((x, c) => c.copy(scaffoldType = Some(x))) + .text("What to scaffold: connector, transform, registry"), + arg[String]("") + .action((x, c) => c.copy(scaffoldName = Some(x))) + .text("Project name"), + opt[String]("type") + .action((x, c) => c.copy(connectorKind = x)) + .text("Connector kind: source (default), sink, or both") + ), + cmd("install") + .action((_, c) => c.copy(command = "install")) + .text("Install a connector plugin from Maven Central or local JAR") + .children( + arg[String]("") + .action((x, c) => c.copy(artifact = Some(x))) + .text("Artifact: connector-kafka, groupId:artifactId:version, or /path/to.jar") + ), + cmd("list") + .action((_, c) => c.copy(command = "list")) + .text("List available connectors, transforms, or installed plugins") + .children( + arg[String]("") + .action((x, c) => c.copy(listCategory = Some(x))) + .text("Category: connectors, transforms, plugins, all") + ), cmd("doctor") .action((_, c) => c.copy(command = "doctor")) .text("Full system diagnostic") @@ -122,6 +157,12 @@ object WeaverCLI { case "init" => if (config.interactive) InteractiveWizard.run() else InitCommand.run(config.projectName.getOrElse("my-project")) + case "scaffold" => + ScaffoldCommand.run(config.scaffoldType.get, config.scaffoldName.get, config.connectorKind) + case "install" => + InstallCommand.run(config.artifact.get) + case "list" => + ListCommand.run(config.listCategory.getOrElse("all")) case "generate" => GenerateCommand.run(config.description.get) case "doctor" => diff --git a/cli/src/main/scala/com/dataweaver/cli/commands/InstallCommand.scala b/cli/src/main/scala/com/dataweaver/cli/commands/InstallCommand.scala new file mode 100644 index 0000000..ce03273 --- /dev/null +++ b/cli/src/main/scala/com/dataweaver/cli/commands/InstallCommand.scala @@ -0,0 +1,131 @@ +package com.dataweaver.cli.commands + +import com.dataweaver.core.plugin.PluginLoader +import org.apache.log4j.LogManager + +import java.io.{File, FileOutputStream} +import java.net.URI 
+import java.net.http.{HttpClient, HttpRequest, HttpResponse} + +/** Installs connector plugins from Maven Central or a custom registry. + * + * Usage: + * weaver install connector-kafka # shorthand + * weaver install com.dataweaver:connector-kafka:1.0.0 # full Maven coords + * weaver install /path/to/connector.jar # local JAR + */ +object InstallCommand { + private val logger = LogManager.getLogger(getClass) + private val httpClient = HttpClient.newHttpClient() + + private val pluginDir = sys.props.getOrElse("user.home", ".") + "/.weaver/plugins" + + /** Known shorthand mappings for official connectors. */ + private val shorthands = Map( + "connector-kafka" -> ("com.dataweaver", "data-weaver-connector-kafka", "0.2.0"), + "connector-mongodb" -> ("com.dataweaver", "data-weaver-connector-mongodb", "0.2.0"), + "connector-elasticsearch" -> ("com.dataweaver", "data-weaver-connector-elasticsearch", "0.2.0"), + "connector-redis" -> ("com.dataweaver", "data-weaver-connector-redis", "0.2.0"), + "connector-s3" -> ("com.dataweaver", "data-weaver-connector-s3", "0.2.0"), + "connector-neo4j" -> ("com.dataweaver", "data-weaver-connector-neo4j", "0.2.0") + ) + + def run(artifact: String): Unit = { + PluginLoader.ensurePluginDirs() + + // Local JAR file + if (artifact.endsWith(".jar") && new File(artifact).exists()) { + installLocalJar(artifact) + return + } + + // Shorthand or Maven coordinates + val (groupId, artifactId, version) = shorthands.getOrElse(artifact, parseMavenCoords(artifact)) + + println(s"Installing $groupId:$artifactId:$version...") + + val jarUrl = mavenCentralUrl(groupId, artifactId, version) + val targetFile = new File(s"$pluginDir/$artifactId-$version.jar") + + if (targetFile.exists()) { + println(s" Already installed: ${targetFile.getName}") + return + } + + downloadJar(jarUrl, targetFile) match { + case Right(_) => + println(s" Downloaded to: ${targetFile.getAbsolutePath}") + println(s" Connector will be available on next weaver command.") + println() + 
case Left(error) => + System.err.println(s" Failed to download: $error") + System.err.println() + System.err.println(" If this is a local JAR, use:") + System.err.println(s" weaver install /path/to/$artifactId.jar") + System.err.println() + System.err.println(" Or copy manually:") + System.err.println(s" cp /path/to/$artifactId.jar $pluginDir/") + } + } + + /** List installed plugins. */ + def listInstalled(): List[String] = { + val dir = new File(pluginDir) + if (!dir.exists()) return List.empty + dir.listFiles() + .filter(f => f.isFile && f.getName.endsWith(".jar")) + .map(_.getName) + .toList + } + + private def installLocalJar(path: String): Unit = { + val source = new File(path) + val target = new File(s"$pluginDir/${source.getName}") + + java.nio.file.Files.copy(source.toPath, target.toPath, + java.nio.file.StandardCopyOption.REPLACE_EXISTING) + + println(s" Installed: ${target.getName}") + println(s" Location: ${target.getAbsolutePath}") + } + + private def parseMavenCoords(coords: String): (String, String, String) = { + coords.split(":") match { + case Array(g, a, v) => (g, a, v) + case Array(g, a) => (g, a, "LATEST") + case _ => + throw new IllegalArgumentException( + s"Invalid artifact format: '$coords'. 
Expected: groupId:artifactId:version") + } + } + + private def mavenCentralUrl(groupId: String, artifactId: String, version: String): String = { + val groupPath = groupId.replace('.', '/') + s"https://repo1.maven.org/maven2/$groupPath/$artifactId/$version/$artifactId-$version.jar" + } + + private def downloadJar(url: String, target: File): Either[String, Unit] = { + try { + val request = HttpRequest.newBuilder() + .uri(URI.create(url)) + .GET() + .build() + + val response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream()) + + if (response.statusCode() != 200) { + return Left(s"HTTP ${response.statusCode()} from $url") + } + + val fos = new FileOutputStream(target) + try { + response.body().transferTo(fos) + Right(()) + } finally { + fos.close() + } + } catch { + case e: Exception => Left(e.getMessage) + } + } +} diff --git a/cli/src/main/scala/com/dataweaver/cli/commands/ListCommand.scala b/cli/src/main/scala/com/dataweaver/cli/commands/ListCommand.scala new file mode 100644 index 0000000..fadacb0 --- /dev/null +++ b/cli/src/main/scala/com/dataweaver/cli/commands/ListCommand.scala @@ -0,0 +1,62 @@ +package com.dataweaver.cli.commands + +import com.dataweaver.core.plugin.{PluginLoader, PluginRegistry} + +/** Lists available connectors, transforms, and installed plugins. */ +object ListCommand { + + def run(category: String): Unit = { + // Load external plugins first + PluginLoader.loadExternalPlugins() + + category match { + case "connectors" => listConnectors() + case "transforms" => listTransforms() + case "plugins" => listPlugins() + case "all" => listConnectors(); println(); listTransforms(); println(); listPlugins() + case other => + System.err.println(s"Unknown category: '$other'. 
Use: connectors, transforms, plugins, all") + } + } + + private def listConnectors(): Unit = { + println() + println(" Source Connectors:") + println(" " + "\u2500" * 40) + val sources = PluginRegistry.availableSources.toList.sorted + if (sources.isEmpty) println(" (none loaded)") + else sources.foreach(s => println(s" \u2022 $s")) + + println() + println(" Sink Connectors:") + println(" " + "\u2500" * 40) + val sinks = PluginRegistry.availableSinks.toList.sorted + if (sinks.isEmpty) println(" (none loaded)") + else sinks.foreach(s => println(s" \u2022 $s")) + } + + private def listTransforms(): Unit = { + println() + println(" Transform Types:") + println(" " + "\u2500" * 40) + val transforms = PluginRegistry.availableTransforms.toList.sorted + if (transforms.isEmpty) println(" (none loaded)") + else transforms.foreach(t => println(s" \u2022 $t")) + } + + private def listPlugins(): Unit = { + val installed = InstallCommand.listInstalled() + println() + println(" Installed Plugins:") + println(" " + "\u2500" * 40) + if (installed.isEmpty) { + println(" (none)") + println() + println(" Install plugins with:") + println(" weaver install connector-kafka") + println(" weaver install /path/to/connector.jar") + } else { + installed.foreach(p => println(s" \u2022 $p")) + } + } +} diff --git a/cli/src/main/scala/com/dataweaver/cli/commands/ScaffoldCommand.scala b/cli/src/main/scala/com/dataweaver/cli/commands/ScaffoldCommand.scala new file mode 100644 index 0000000..9d007e6 --- /dev/null +++ b/cli/src/main/scala/com/dataweaver/cli/commands/ScaffoldCommand.scala @@ -0,0 +1,406 @@ +package com.dataweaver.cli.commands + +import java.io.{File, PrintWriter} +import java.nio.file.{Files, Paths} + +/** Scaffolds connector projects, private registries, and transform plugins. 
+ * + * Usage: + * weaver scaffold connector my-redis-connector + * weaver scaffold connector my-salesforce-connector --type sink + * weaver scaffold connector my-connector --type both + * weaver scaffold transform my-custom-transform + * weaver scaffold registry my-company-registry + */ +object ScaffoldCommand { + + def run(what: String, name: String, connectorKind: String = "source"): Unit = { + what match { + case "connector" => scaffoldConnector(name, connectorKind) + case "transform" => scaffoldTransform(name) + case "registry" => scaffoldRegistry(name) + case other => + System.err.println(s"Unknown scaffold type: '$other'") + System.err.println("Usage: weaver scaffold connector|transform|registry ") + } + } + + /** Scaffold a complete connector project ready to build and install. */ + private def scaffoldConnector(name: String, kind: String): Unit = { + val projectDir = Paths.get(name) + if (Files.exists(projectDir)) { + System.err.println(s"Directory '$name' already exists.") + return + } + + // Derive class names from project name + val className = name.split("[-_]").map(_.capitalize).mkString("") + val packageName = s"com.dataweaver.connectors.${name.replace("-", "").replace("_", "")}" + val packagePath = packageName.replace('.', '/') + + // Create directory structure + val srcDir = projectDir.resolve(s"src/main/scala/$packagePath") + val resourceDir = projectDir.resolve("src/main/resources/META-INF/services") + val testDir = projectDir.resolve(s"src/test/scala/$packagePath") + Files.createDirectories(srcDir) + Files.createDirectories(resourceDir) + Files.createDirectories(testDir) + + // build.sbt + writeFile(projectDir.resolve("build.sbt").toFile, + s"""name := "$name" + |version := "0.1.0" + |scalaVersion := "2.13.14" + | + |libraryDependencies ++= Seq( + | "com.dataweaver" %% "data-weaver-core" % "0.2.0" % "provided", + | "org.apache.spark" %% "spark-sql" % "4.0.2" % "provided", + | "org.scalatest" %% "scalatest" % "3.2.19" % Test + |) + | + |// 
Package as fat JAR for installation + |assembly / assemblyJarName := s"$${name.value}-$${version.value}.jar" + |""".stripMargin) + + // Source connector + if (kind == "source" || kind == "both") { + writeFile(srcDir.resolve(s"${className}SourceConnector.scala").toFile, + s"""package $packageName + | + |import com.dataweaver.core.plugin.SourceConnector + |import org.apache.spark.sql.{DataFrame, SparkSession} + | + |/** TODO: Implement your source connector. + | * + | * Pipeline YAML usage: + | * {{{ + | * dataSources: + | * - id: my_source + | * type: $className + | * connection: my-connection + | * config: + | * option1: value1 + | * }}} + | */ + |class ${className}SourceConnector extends SourceConnector { + | + | def connectorType: String = "$className" + | + | def read(config: Map[String, String])(implicit spark: SparkSession): DataFrame = { + | // TODO: Implement data reading logic + | // config contains all key-value pairs from the YAML config section + | // plus "id" and "query" from the dataSource definition + | // + | // Example: + | // val host = config.getOrElse("host", + | // throw new IllegalArgumentException("$className: 'host' is required")) + | // spark.read.format("jdbc").option("url", s"jdbc:mydb://$$host").load() + | + | throw new UnsupportedOperationException("TODO: implement read()") + | } + | + | override def healthCheck(config: Map[String, String]): Either[String, Long] = { + | // TODO: Implement connectivity test for 'weaver doctor' + | // Return Right(latencyMs) on success, Left(errorMessage) on failure + | // + | // Example: + | // val start = System.currentTimeMillis() + | // // ... test connection ... 
+ | // Right(System.currentTimeMillis() - start) + | + | Left("Health check not yet implemented") + | } + |} + |""".stripMargin) + + writeFile(resourceDir.resolve("com.dataweaver.core.plugin.SourceConnector").toFile, + s"$packageName.${className}SourceConnector\n") + + writeFile(testDir.resolve(s"${className}SourceConnectorTest.scala").toFile, + s"""package $packageName + | + |import org.scalatest.flatspec.AnyFlatSpec + |import org.scalatest.matchers.should.Matchers + | + |class ${className}SourceConnectorTest extends AnyFlatSpec with Matchers { + | + | "${className}SourceConnector" should "have correct connector type" in { + | val connector = new ${className}SourceConnector() + | connector.connectorType shouldBe "$className" + | } + | + | // TODO: Add integration tests with a real or mocked data source + |} + |""".stripMargin) + } + + // Sink connector + if (kind == "sink" || kind == "both") { + writeFile(srcDir.resolve(s"${className}SinkConnector.scala").toFile, + s"""package $packageName + | + |import com.dataweaver.core.plugin.SinkConnector + |import org.apache.spark.sql.{DataFrame, SparkSession} + | + |/** TODO: Implement your sink connector. 
+ | * + | * Pipeline YAML usage: + | * {{{ + | * sinks: + | * - id: my_sink + | * type: $className + | * source: transform_id + | * config: + | * option1: value1 + | * }}} + | */ + |class ${className}SinkConnector extends SinkConnector { + | + | def connectorType: String = "$className" + | + | def write(data: DataFrame, pipelineName: String, config: Map[String, String])(implicit + | spark: SparkSession + | ): Unit = { + | // TODO: Implement data writing logic + | // config contains all key-value pairs from the YAML config section + | // + | // Example: + | // val path = config.getOrElse("path", + | // throw new IllegalArgumentException("$className: 'path' is required")) + | // data.write.format("myformat").save(path) + | + | throw new UnsupportedOperationException("TODO: implement write()") + | } + |} + |""".stripMargin) + + writeFile(resourceDir.resolve("com.dataweaver.core.plugin.SinkConnector").toFile, + s"$packageName.${className}SinkConnector\n") + } + + // README + writeFile(projectDir.resolve("README.md").toFile, + s"""# $name + | + |Data Weaver connector for $className. + | + |## Build + | + |```bash + |sbt assembly + |``` + | + |## Install + | + |```bash + |# Option 1: weaver install + |weaver install target/scala-2.13/$name-0.1.0.jar + | + |# Option 2: manual copy + |cp target/scala-2.13/$name-0.1.0.jar ~/.weaver/plugins/ + |``` + | + |## Usage + | + |```yaml + |dataSources: + | - id: my_data + | type: $className + | config: + | option1: value1 + |``` + | + |## Development + | + |1. Implement the connector in `src/main/scala/` + |2. Run tests: `sbt test` + |3. Build JAR: `sbt assembly` + |4. Install: `weaver install target/scala-2.13/$name-0.1.0.jar` + |5. 
Verify: `weaver list connectors` should show "$className" + |""".stripMargin) + + // .gitignore + writeFile(projectDir.resolve(".gitignore").toFile, + """**/target/ + |.idea/ + |.bsp/ + |*.class + |.DS_Store + |""".stripMargin) + + println() + println(s" Connector project '$name' created!") + println() + println(" Structure:") + println(s" $name/") + println(s" ├── build.sbt") + if (kind == "source" || kind == "both") + println(s" ├── src/main/scala/.../ ${className}SourceConnector.scala") + if (kind == "sink" || kind == "both") + println(s" ├── src/main/scala/.../ ${className}SinkConnector.scala") + println(s" ├── src/main/resources/ META-INF/services (auto-registered)") + println(s" ├── src/test/scala/.../ Tests") + println(s" └── README.md") + println() + println(" Next steps:") + println(s" cd $name") + println(s" # Edit the TODO sections in the connector code") + println(s" sbt test") + println(s" sbt assembly") + println(s" weaver install target/scala-2.13/$name-0.1.0.jar") + println() + } + + /** Scaffold a custom transform plugin project. 
*/ + private def scaffoldTransform(name: String): Unit = { + val projectDir = Paths.get(name) + if (Files.exists(projectDir)) { + System.err.println(s"Directory '$name' already exists.") + return + } + + val className = name.split("[-_]").map(_.capitalize).mkString("") + val packageName = s"com.dataweaver.transforms.${name.replace("-", "").replace("_", "")}" + val packagePath = packageName.replace('.', '/') + + val srcDir = projectDir.resolve(s"src/main/scala/$packagePath") + val resourceDir = projectDir.resolve("src/main/resources/META-INF/services") + Files.createDirectories(srcDir) + Files.createDirectories(resourceDir) + + writeFile(projectDir.resolve("build.sbt").toFile, + s"""name := "$name" + |version := "0.1.0" + |scalaVersion := "2.13.14" + | + |libraryDependencies ++= Seq( + | "com.dataweaver" %% "data-weaver-core" % "0.2.0" % "provided", + | "org.apache.spark" %% "spark-sql" % "4.0.2" % "provided", + | "org.scalatest" %% "scalatest" % "3.2.19" % Test + |) + |""".stripMargin) + + writeFile(srcDir.resolve(s"${className}Plugin.scala").toFile, + s"""package $packageName + | + |import com.dataweaver.core.plugin.{TransformConfig, TransformPlugin} + |import org.apache.spark.sql.{DataFrame, SparkSession} + | + |class ${className}Plugin extends TransformPlugin { + | def transformType: String = "$className" + | + | def transform(inputs: Map[String, DataFrame], config: TransformConfig)(implicit + | spark: SparkSession + | ): DataFrame = { + | val df = inputs.values.head + | // TODO: implement transformation + | df + | } + |} + |""".stripMargin) + + writeFile(resourceDir.resolve("com.dataweaver.core.plugin.TransformPlugin").toFile, + s"$packageName.${className}Plugin\n") + + println(s" Transform project '$name' created!") + println(s" Next: cd $name && edit src/.../${className}Plugin.scala") + } + + /** Scaffold a private connector registry for an organization. 
*/ + private def scaffoldRegistry(name: String): Unit = { + val projectDir = Paths.get(name) + if (Files.exists(projectDir)) { + System.err.println(s"Directory '$name' already exists.") + return + } + + Files.createDirectories(projectDir.resolve("connectors")) + + writeFile(projectDir.resolve("README.md").toFile, + s"""# $name — Private Connector Registry + | + |Private Data Weaver connector registry for your organization. + | + |## Structure + | + |``` + |$name/ + |├── connectors/ # Connector JARs + |│ ├── connector-a-1.0.0.jar + |│ └── connector-b-2.1.0.jar + |├── registry.yaml # Connector catalog + |└── install.sh # Team installer script + |``` + | + |## Publishing a connector + | + |1. Build your connector: `cd my-connector && sbt assembly` + |2. Copy JAR to `connectors/` + |3. Update `registry.yaml` + |4. Push to your Git remote + | + |## Installing for your team + | + |```bash + |# Clone the registry + |git clone git@github.com:your-org/$name.git + | + |# Install all connectors + |./$name/install.sh + | + |# Or install individual connectors + |weaver install $name/connectors/connector-a-1.0.0.jar + |``` + |""".stripMargin) + + writeFile(projectDir.resolve("registry.yaml").toFile, + s"""# $name — Connector Registry + |# List your organization's private connectors here + | + |connectors: + | # - name: my-connector + | # version: 1.0.0 + | # jar: connectors/my-connector-1.0.0.jar + | # description: Custom connector for internal system + | # type: source # source | sink | both + |""".stripMargin) + + writeFile(projectDir.resolve("install.sh").toFile, + s"""#!/bin/bash + |# Install all connectors from this registry + |set -e + |SCRIPT_DIR="$$( cd "$$( dirname "$${BASH_SOURCE[0]}" )" && pwd )" + | + |echo "Installing connectors from $name..." + |for jar in "$$SCRIPT_DIR"/connectors/*.jar; do + | if [ -f "$$jar" ]; then + | echo " Installing: $$(basename $$jar)" + | weaver install "$$jar" + | fi + |done + |echo "Done!" 
+ |""".stripMargin) + + new File(projectDir.resolve("install.sh").toString).setExecutable(true) + + println() + println(s" Private registry '$name' created!") + println() + println(" Structure:") + println(s" $name/") + println(s" ├── connectors/ # Place connector JARs here") + println(s" ├── registry.yaml # Connector catalog") + println(s" ├── install.sh # Team installer script") + println(s" └── README.md") + println() + println(" Share with your team via Git:") + println(s" cd $name && git init && git add -A && git commit -m 'init registry'") + println() + } + + private def writeFile(file: File, content: String): Unit = { + val writer = new PrintWriter(file) + try writer.write(content) + finally writer.close() + } +} diff --git a/core/src/main/scala/com/dataweaver/core/plugin/PluginLoader.scala b/core/src/main/scala/com/dataweaver/core/plugin/PluginLoader.scala new file mode 100644 index 0000000..1f5e3a5 --- /dev/null +++ b/core/src/main/scala/com/dataweaver/core/plugin/PluginLoader.scala @@ -0,0 +1,76 @@ +package com.dataweaver.core.plugin + +import org.apache.log4j.LogManager + +import java.io.File +import java.net.{URL, URLClassLoader} +import java.util.ServiceLoader +import scala.jdk.CollectionConverters._ +import scala.util.Try + +/** Loads external plugin JARs from the plugins directory. + * Scans ~/.weaver/plugins/ (and project-local plugins/) for JARs, + * adds them to the classpath, and discovers connectors via ServiceLoader. + */ +object PluginLoader { + private val logger = LogManager.getLogger(getClass) + + /** Default plugin directories, checked in order. */ + val pluginDirs: List[String] = List( + sys.props.getOrElse("user.home", ".") + "/.weaver/plugins", + "plugins" + ) + + /** Load all plugin JARs from plugin directories and register discovered connectors. 
*/ + def loadExternalPlugins(): Unit = { + val jarFiles = pluginDirs.flatMap(findJars).distinct + + if (jarFiles.isEmpty) { + logger.info("No external plugin JARs found") + return + } + + logger.info(s"Found ${jarFiles.size} plugin JAR(s)") + + val urls = jarFiles.map(f => f.toURI.toURL).toArray + val classLoader = new URLClassLoader(urls, getClass.getClassLoader) + + // Discover and register sources + val sources = ServiceLoader.load(classOf[SourceConnector], classLoader) + sources.asScala.foreach { connector => + logger.info(s" Loaded source: ${connector.connectorType}") + PluginRegistry.registerSource(connector) + } + + // Discover and register sinks + val sinks = ServiceLoader.load(classOf[SinkConnector], classLoader) + sinks.asScala.foreach { connector => + logger.info(s" Loaded sink: ${connector.connectorType}") + PluginRegistry.registerSink(connector) + } + + // Discover and register transforms + val transforms = ServiceLoader.load(classOf[TransformPlugin], classLoader) + transforms.asScala.foreach { plugin => + logger.info(s" Loaded transform: ${plugin.transformType}") + PluginRegistry.registerTransform(plugin) + } + } + + /** Find all .jar files in a directory. */ + private def findJars(dirPath: String): List[File] = { + val dir = new File(dirPath) + if (!dir.exists() || !dir.isDirectory) return List.empty + dir.listFiles() + .filter(f => f.isFile && f.getName.endsWith(".jar")) + .toList + } + + /** Ensure plugin directories exist. */ + def ensurePluginDirs(): Unit = { + pluginDirs.foreach { path => + val dir = new File(path) + if (!dir.exists()) dir.mkdirs() + } + } +} diff --git a/docs/connector-sdk/CONNECTOR_SDK.md b/docs/connector-sdk/CONNECTOR_SDK.md index 06e2f25..b0d64d7 100644 --- a/docs/connector-sdk/CONNECTOR_SDK.md +++ b/docs/connector-sdk/CONNECTOR_SDK.md @@ -59,11 +59,31 @@ class MySinkConnector extends SinkConnector { ### 3. 
Register via ServiceLoader -Create `src/main/resources/META-INF/services/com.dataweaver.core.plugin.SourceConnector`: +Data Weaver discovers connectors automatically using Java's ServiceLoader mechanism. You need to create a plain text file **inside your connector project** that tells ServiceLoader which classes implement the connector interface. + +Create this file in your project at `src/main/resources/META-INF/services/com.dataweaver.core.plugin.SourceConnector`: + ``` com.example.connector.MySourceConnector ``` +Your project structure should look like this: +``` +my-connector/ +├── src/ +│ └── main/ +│ ├── scala/com/example/connector/ +│ │ └── MySourceConnector.scala +│ └── resources/META-INF/services/ +│ └── com.dataweaver.core.plugin.SourceConnector ← text file +└── build.sbt +``` + +The file name IS the interface name. The file content is the full class name of your implementation (one per line if you have multiple). When you package your project as a JAR and place it in `~/.weaver/plugins/`, Data Weaver finds and loads your connector automatically at startup. + +For sink connectors, create `com.dataweaver.core.plugin.SinkConnector` instead. +For transforms, create `com.dataweaver.core.plugin.TransformPlugin`. + ### 4. 
Package and install ```bash diff --git a/docs/tutorials/04-llm-classification.yaml b/docs/tutorials/04-llm-classification.yaml new file mode 100644 index 0000000..5a354d6 --- /dev/null +++ b/docs/tutorials/04-llm-classification.yaml @@ -0,0 +1,69 @@ +# Tutorial 4: LLM-powered classification +# Reads support tickets from CSV and classifies them using an LLM +# +# Requires: ANTHROPIC_API_KEY, OPENAI_API_KEY, or GEMINI_API_KEY +# Or use a local Ollama model (no API key needed): +# Change provider to "local" and model to "llama3" +# +# Run: weaver apply docs/tutorials/04-llm-classification.yaml + +name: LLMClassification +tag: tutorial +engine: local + +dataSources: + - id: tickets + type: File + config: + path: docs/tutorials/data/sample_tickets.csv + format: csv + +transformations: + - id: classified + type: LLMTransform + sources: + - tickets + config: + provider: gemini + model: gemini-2.0-flash + prompt: | + Classify this support ticket: + Title: {title} + Description: {description} + + Return JSON with: + - category: bug, feature_request, question, or complaint + - priority: low, medium, or high + - sentiment: positive, neutral, or negative + inputColumns: title,description + outputSchema: category:string|priority:string|sentiment:string + batchSize: "1" + retryOnError: "3" + + - id: quality + type: DataQuality + sources: + - classified + checks: + - row_count > 0 + onFail: warn + +sinks: + - id: report + type: File + source: quality + config: + path: output/classified_tickets + format: json + saveMode: Overwrite + coalesce: "1" + +profiles: + local: + engine: local + transformations.classified.config.provider: local + transformations.classified.config.model: llama3 + +tests: + - name: "tickets classified" + assert: report.row_count > 0 diff --git a/docs/tutorials/05-production-etl.yaml b/docs/tutorials/05-production-etl.yaml new file mode 100644 index 0000000..2e90852 --- /dev/null +++ b/docs/tutorials/05-production-etl.yaml @@ -0,0 +1,127 @@ +# Tutorial 5: 
Production ETL pipeline +# Complete production-ready pipeline: PostgreSQL → transform → quality → DeltaLake +# Demonstrates: connections, variables, profiles, quality gates, tests +# +# Setup: +# 1. Copy .env.example to .env and fill in PostgreSQL credentials +# 2. Create connections.yaml with your database connection +# +# Run: +# weaver doctor docs/tutorials/05-production-etl.yaml # check everything +# weaver plan docs/tutorials/05-production-etl.yaml # dry-run +# weaver apply docs/tutorials/05-production-etl.yaml --env dev # local +# weaver apply docs/tutorials/05-production-etl.yaml --env prod # cluster + +name: ProductionETL +tag: daily +engine: auto + +dataSources: + - id: customers + type: PostgreSQL + connection: pg-prod + config: + query: > + SELECT id, name, email, status, created_at, updated_at + FROM customers + WHERE updated_at > '${date.yesterday}' + + - id: transactions + type: File + config: + path: ${env.DATA_LAKE_PATH}/transactions/ + format: parquet + +transformations: + # These two run in PARALLEL (independent sources) + - id: active_customers + type: SQL + sources: + - customers + query: > + SELECT id, name, email, created_at + FROM customers + WHERE status = 'active' + + - id: daily_revenue + type: SQL + sources: + - transactions + query: > + SELECT customer_id, SUM(amount) as total_spent, COUNT(*) as tx_count + FROM transactions + WHERE tx_date >= '${date.yesterday}' + GROUP BY customer_id + + # This waits for both above to complete + - id: enriched + type: SQL + sources: + - active_customers + - daily_revenue + query: > + SELECT + c.id, c.name, c.email, + COALESCE(r.total_spent, 0) as daily_spend, + COALESCE(r.tx_count, 0) as daily_transactions, + CASE + WHEN r.total_spent > 1000 THEN 'high_value' + WHEN r.total_spent > 100 THEN 'medium_value' + ELSE 'standard' + END as segment + FROM active_customers c + LEFT JOIN daily_revenue r ON c.id = r.customer_id + + # Quality gate — blocks pipeline if checks fail + - id: validated + type: 
DataQuality + sources: + - enriched + checks: + - row_count > 0 + - missing_count(email) = 0 + - duplicate_count(id) = 0 + onFail: abort + +sinks: + - id: warehouse + type: DeltaLake + source: validated + config: + path: ${env.WAREHOUSE_PATH}/customers_daily + saveMode: merge + mergeKey: id + + - id: report + type: File + source: validated + config: + path: output/daily_customer_report + format: json + saveMode: Overwrite + coalesce: "1" + +profiles: + dev: + engine: local + dataSources.customers.type: File + dataSources.customers.config.path: docs/tutorials/data/sample_customers.csv + dataSources.customers.config.format: csv + dataSources.transactions.config.path: docs/tutorials/data/sample_orders.csv + dataSources.transactions.config.format: csv + sinks.warehouse.type: File + sinks.warehouse.config.path: output/dev_warehouse + sinks.warehouse.config.format: parquet + sinks.warehouse.config.saveMode: Overwrite + prod: + engine: spark + spark.executor.memory: 4g + spark.executor.cores: "4" + +tests: + - name: "has enriched data" + assert: warehouse.row_count > 0 + - name: "no missing emails" + assert: warehouse.missing_count(email) = 0 + - name: "no duplicate customers" + assert: warehouse.duplicate_count(id) = 0 diff --git a/docs/tutorials/data/sample_tickets.csv b/docs/tutorials/data/sample_tickets.csv new file mode 100644 index 0000000..59e31e4 --- /dev/null +++ b/docs/tutorials/data/sample_tickets.csv @@ -0,0 +1,6 @@ +id,title,description,status,created_at +1,Login not working,I cannot log in to my account since this morning. I get a 500 error.,open,2026-04-01 +2,Add dark mode,It would be great to have a dark mode option in the settings page.,open,2026-04-02 +3,How to export data?,I need to export my data to CSV but I cannot find the option anywhere.,open,2026-04-03 +4,Slow page load,The dashboard takes 30 seconds to load. This is unacceptable for a paid product.,open,2026-04-04 +5,Great new feature,Just wanted to say the new reporting feature is amazing. 
Keep up the good work!,closed,2026-04-05