Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ tests:
| `weaver test --coverage` | Show test coverage report |
| `weaver apply <pipeline>` | Execute the pipeline |
| `weaver apply --env prod` | Execute with environment profile |
| `weaver scaffold connector <name>` | Generate a connector project (ready to build) |
| `weaver scaffold transform <name>` | Generate a transform plugin project |
| `weaver scaffold registry <name>` | Generate a private connector registry |
| `weaver install <artifact>` | Install a connector plugin |
| `weaver list connectors` | List available source and sink connectors |
| `weaver list transforms` | List available transformation types |
| `weaver list plugins` | List installed plugin JARs |

## Connectors

Expand Down Expand Up @@ -251,6 +259,57 @@ class MyConnector extends SourceConnector {

Package as JAR, drop in `plugins/`, and it's automatically discovered via ServiceLoader.

## Tutorials

Ready-to-run example pipelines in [`docs/tutorials/`](docs/tutorials/):

| Tutorial | Description | Concepts |
|----------|-------------|----------|
| [01 — CSV to Parquet](docs/tutorials/01-csv-to-parquet.yaml) | Read CSV, clean with SQL, validate, write Parquet | File I/O, SQL, DataQuality |
| [02 — Multi-source Join](docs/tutorials/02-multi-source-join.yaml) | Join two sources with parallel transforms | Parallel DAG, aggregation |
| [03 — RAG Pipeline](docs/tutorials/03-rag-pipeline.yaml) | Chunk documents for vector search | Chunking, RAG preparation |
| [04 — LLM Classification](docs/tutorials/04-llm-classification.yaml) | Classify tickets with Gemini/Ollama | LLMTransform, local models |
| [05 — Production ETL](docs/tutorials/05-production-etl.yaml) | Full pipeline: PostgreSQL → DeltaLake | Connections, profiles, merge |

```bash
# Run a tutorial
weaver validate docs/tutorials/01-csv-to-parquet.yaml
weaver plan docs/tutorials/01-csv-to-parquet.yaml
weaver apply docs/tutorials/01-csv-to-parquet.yaml
```

## Plugin Registry

Install connector plugins without recompiling:

```bash
# Install from Maven Central
weaver install connector-kafka
weaver install com.example:my-connector:1.0.0

# Install local JAR
weaver install /path/to/connector.jar

# List what's available
weaver list connectors
weaver list transforms
weaver list plugins
```

### Build Your Own Connector

```bash
# Scaffold a complete connector project (build.sbt, source code, tests, ServiceLoader)
weaver scaffold connector my-redis-connector
weaver scaffold connector my-api-connector --type both # source + sink
weaver scaffold transform my-custom-transform

# For your company: create a private registry
weaver scaffold registry my-company-connectors
```

Plugins are loaded from `~/.weaver/plugins/` automatically. See the [Connector SDK](docs/connector-sdk/CONNECTOR_SDK.md) for the full guide.

## Architecture

```
Expand Down
43 changes: 42 additions & 1 deletion cli/src/main/scala/com/dataweaver/cli/WeaverCLI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ object WeaverCLI {
showCoverage: Boolean = false,
description: Option[String] = None,
interactive: Boolean = false,
projectName: Option[String] = None
projectName: Option[String] = None,
artifact: Option[String] = None,
listCategory: Option[String] = None,
scaffoldType: Option[String] = None,
scaffoldName: Option[String] = None,
connectorKind: String = "source"
)

def main(args: Array[String]): Unit = {
Expand Down Expand Up @@ -45,6 +50,36 @@ object WeaverCLI {
.action((x, c) => c.copy(description = Some(x)))
.text("Natural language description of the pipeline")
),
cmd("scaffold")
.action((_, c) => c.copy(command = "scaffold"))
.text("Generate a connector, transform, or private registry project")
.children(
arg[String]("<type>")
.action((x, c) => c.copy(scaffoldType = Some(x)))
.text("What to scaffold: connector, transform, registry"),
arg[String]("<name>")
.action((x, c) => c.copy(scaffoldName = Some(x)))
.text("Project name"),
opt[String]("type")
.action((x, c) => c.copy(connectorKind = x))
.text("Connector kind: source (default), sink, or both")
),
cmd("install")
.action((_, c) => c.copy(command = "install"))
.text("Install a connector plugin from Maven Central or local JAR")
.children(
arg[String]("<artifact>")
.action((x, c) => c.copy(artifact = Some(x)))
.text("Artifact: connector-kafka, groupId:artifactId:version, or /path/to.jar")
),
cmd("list")
.action((_, c) => c.copy(command = "list"))
.text("List available connectors, transforms, or installed plugins")
.children(
arg[String]("<category>")
.action((x, c) => c.copy(listCategory = Some(x)))
.text("Category: connectors, transforms, plugins, all")
),
cmd("doctor")
.action((_, c) => c.copy(command = "doctor"))
.text("Full system diagnostic")
Expand Down Expand Up @@ -122,6 +157,12 @@ object WeaverCLI {
case "init" =>
if (config.interactive) InteractiveWizard.run()
else InitCommand.run(config.projectName.getOrElse("my-project"))
case "scaffold" =>
ScaffoldCommand.run(config.scaffoldType.get, config.scaffoldName.get, config.connectorKind)
case "install" =>
InstallCommand.run(config.artifact.get)
case "list" =>
ListCommand.run(config.listCategory.getOrElse("all"))
case "generate" =>
GenerateCommand.run(config.description.get)
case "doctor" =>
Expand Down
131 changes: 131 additions & 0 deletions cli/src/main/scala/com/dataweaver/cli/commands/InstallCommand.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.dataweaver.cli.commands

import com.dataweaver.core.plugin.PluginLoader
import org.apache.log4j.LogManager

import java.io.{File, FileOutputStream}
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

/** Installs connector plugins from Maven Central or from a local JAR file.
  *
  * Usage:
  *   weaver install connector-kafka                        # shorthand
  *   weaver install com.dataweaver:connector-kafka:1.0.0   # full Maven coords
  *   weaver install com.dataweaver:connector-kafka         # latest published release
  *   weaver install /path/to/connector.jar                 # local JAR
  */
object InstallCommand {
  import java.nio.file.{Files, StandardCopyOption}

  private val logger = LogManager.getLogger(getClass)
  private val httpClient = HttpClient.newHttpClient()

  private val pluginDir = sys.props.getOrElse("user.home", ".") + "/.weaver/plugins"

  /** Root of the Maven Central repository layout. */
  private val mavenCentralBase = "https://repo1.maven.org/maven2"

  /** Marker produced by [[parseMavenCoords]] when the coordinates carry no version. */
  private val latestVersion = "LATEST"

  // maven-metadata.xml publishes the newest versions in <release> / <latest> elements.
  private val releaseTag = """<release>\s*([^<\s]+)\s*</release>""".r
  private val latestTag = """<latest>\s*([^<\s]+)\s*</latest>""".r

  /** Known shorthand mappings for official connectors. */
  private val shorthands: Map[String, (String, String, String)] = Map(
    "connector-kafka" -> ("com.dataweaver", "data-weaver-connector-kafka", "0.2.0"),
    "connector-mongodb" -> ("com.dataweaver", "data-weaver-connector-mongodb", "0.2.0"),
    "connector-elasticsearch" -> ("com.dataweaver", "data-weaver-connector-elasticsearch", "0.2.0"),
    "connector-redis" -> ("com.dataweaver", "data-weaver-connector-redis", "0.2.0"),
    "connector-s3" -> ("com.dataweaver", "data-weaver-connector-s3", "0.2.0"),
    "connector-neo4j" -> ("com.dataweaver", "data-weaver-connector-neo4j", "0.2.0")
  )

  /** Installs one plugin into the plugin directory.
    *
    * @param artifact a shorthand name, `groupId:artifactId[:version]` coordinates,
    *                 or a path to a local `.jar` file
    * @throws IllegalArgumentException if `artifact` is neither a known shorthand nor valid
    *                                  coordinates, or names a `.jar` that does not exist
    */
  def run(artifact: String): Unit = {
    PluginLoader.ensurePluginDirs()

    // Anything ending in ".jar" is a file path; never reinterpret it as Maven coordinates.
    if (artifact.endsWith(".jar")) installLocalJar(artifact)
    else installFromMaven(artifact)
  }

  /** Lists the file names of the JARs in the plugin directory, sorted by name.
    *
    * @return the JAR file names, or an empty list when the directory is missing or unreadable
    */
  def listInstalled(): List[String] =
    // listFiles() returns null for a missing directory, a non-directory, or an I/O error.
    Option(new File(pluginDir).listFiles())
      .toList
      .flatMap(_.toList)
      .filter(f => f.isFile && f.getName.endsWith(".jar"))
      .map(_.getName)
      .sorted

  /** Resolves a shorthand or Maven coordinates to a concrete version and installs it. */
  private def installFromMaven(artifact: String): Unit = {
    val (groupId, artifactId, requested) =
      shorthands.getOrElse(artifact, parseMavenCoords(artifact))

    // "LATEST" is not a directory in the repository layout; it must be looked up first.
    val resolved: Either[String, String] =
      if (requested == latestVersion) resolveLatestVersion(groupId, artifactId)
      else Right(requested)

    resolved match {
      case Right(version) =>
        installVersion(groupId, artifactId, version)
      case Left(error) =>
        System.err.println(s" Failed to resolve latest version of $groupId:$artifactId: $error")
    }
  }

  /** Downloads one concrete artifact version unless its JAR is already present. */
  private def installVersion(groupId: String, artifactId: String, version: String): Unit = {
    println(s"Installing $groupId:$artifactId:$version...")

    val jarUrl = mavenCentralUrl(groupId, artifactId, version)
    val targetFile = new File(s"$pluginDir/$artifactId-$version.jar")

    if (targetFile.exists()) {
      println(s" Already installed: ${targetFile.getName}")
    } else {
      downloadJar(jarUrl, targetFile) match {
        case Right(_) =>
          println(s" Downloaded to: ${targetFile.getAbsolutePath}")
          println(s" Connector will be available on next weaver command.")
          println()
        case Left(error) =>
          System.err.println(s" Failed to download: $error")
          System.err.println()
          System.err.println(" If this is a local JAR, use:")
          System.err.println(s" weaver install /path/to/$artifactId.jar")
          System.err.println()
          System.err.println(" Or copy manually:")
          System.err.println(s" cp /path/to/$artifactId.jar $pluginDir/")
      }
    }
  }

  /** Copies a local JAR into the plugin directory, replacing a same-named file.
    *
    * @throws IllegalArgumentException if `path` is not an existing regular file
    */
  private def installLocalJar(path: String): Unit = {
    val source = new File(path)
    if (!source.isFile) {
      throw new IllegalArgumentException(s"Local JAR not found: '$path'")
    }
    val target = new File(s"$pluginDir/${source.getName}")

    Files.copy(source.toPath, target.toPath, StandardCopyOption.REPLACE_EXISTING)

    println(s" Installed: ${target.getName}")
    println(s" Location: ${target.getAbsolutePath}")
  }

  /** Splits `groupId:artifactId[:version]`; a missing version becomes the "LATEST" marker.
    *
    * @throws IllegalArgumentException if `coords` does not have two or three segments
    */
  private def parseMavenCoords(coords: String): (String, String, String) = {
    coords.split(":") match {
      case Array(g, a, v) => (g, a, v)
      case Array(g, a) => (g, a, latestVersion)
      case _ =>
        throw new IllegalArgumentException(
          s"Invalid artifact format: '$coords'. Expected: groupId:artifactId:version")
    }
  }

  /** Builds the JAR URL for a concrete version in the Maven Central directory layout. */
  private def mavenCentralUrl(groupId: String, artifactId: String, version: String): String = {
    val groupPath = groupId.replace('.', '/')
    s"$mavenCentralBase/$groupPath/$artifactId/$version/$artifactId-$version.jar"
  }

  /** Reads the artifact's `maven-metadata.xml` and returns its release (or latest) version. */
  private def resolveLatestVersion(groupId: String, artifactId: String): Either[String, String] =
    attempt {
      val url = s"$mavenCentralBase/${groupId.replace('.', '/')}/$artifactId/maven-metadata.xml"
      val request = HttpRequest.newBuilder().uri(URI.create(url)).GET().build()
      val response = httpClient.send(request, HttpResponse.BodyHandlers.ofString())

      if (response.statusCode() != 200) {
        Left(s"HTTP ${response.statusCode()} from $url")
      } else {
        val metadata = response.body()
        releaseTag.findFirstMatchIn(metadata)
          .orElse(latestTag.findFirstMatchIn(metadata))
          .map(_.group(1))
          .toRight(s"no <release> or <latest> version in $url")
      }
    }

  /** Downloads `url` into `target`.
    *
    * The bytes are written to a sibling `.part` file and moved into place only after the
    * transfer completes, so an interrupted download never leaves a truncated JAR that a later
    * run would report as already installed.
    *
    * @return `Right(())` on success, otherwise `Left` with a description of the failure
    */
  private def downloadJar(url: String, target: File): Either[String, Unit] = attempt {
    val request = HttpRequest.newBuilder()
      .uri(URI.create(url))
      .GET()
      .build()

    val response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream())
    val body = response.body()
    try {
      if (response.statusCode() != 200) {
        Left(s"HTTP ${response.statusCode()} from $url")
      } else {
        writeViaPartFile(body, target)
        Right(())
      }
    } finally {
      // Close the stream on every path, including non-200 responses.
      body.close()
    }
  }

  /** Streams `in` into `<target>.part`, then moves that file onto `target`. */
  private def writeViaPartFile(in: java.io.InputStream, target: File): Unit = {
    // The ".part" suffix keeps an unfinished download out of listInstalled().
    val partial = new File(target.getPath + ".part")
    try {
      val fos = new FileOutputStream(partial)
      try {
        in.transferTo(fos)
      } finally {
        fos.close()
      }
      Files.move(partial.toPath, target.toPath, StandardCopyOption.REPLACE_EXISTING)
    } finally {
      // No-op after a successful move; removes the leftover after a failure.
      Files.deleteIfExists(partial.toPath)
    }
  }

  /** Runs `action`, converting non-fatal exceptions into a `Left` describing them. */
  private def attempt[A](action: => Either[String, A]): Either[String, A] =
    try action
    catch {
      case e: InterruptedException =>
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt()
        Left(describe(e))
      case scala.util.control.NonFatal(e) =>
        Left(describe(e))
    }

  /** Returns the exception message, or its class name when there is no message. */
  private def describe(e: Throwable): String =
    Option(e.getMessage).filter(_.nonEmpty).getOrElse(e.getClass.getName)
}
62 changes: 62 additions & 0 deletions cli/src/main/scala/com/dataweaver/cli/commands/ListCommand.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.dataweaver.cli.commands

import com.dataweaver.core.plugin.{PluginLoader, PluginRegistry}

/** Prints the registered connectors and transforms, and the installed plugin JARs. */
object ListCommand {

  /** Horizontal rule printed beneath every section title. */
  private val rule = " " + "\u2500" * 40

  /** Prints the section(s) selected by `category`.
    *
    * @param category one of `connectors`, `transforms`, `plugins` or `all`; any other value
    *                 is reported on stderr
    */
  def run(category: String): Unit = {
    // External plugins are loaded before anything is listed.
    PluginLoader.loadExternalPlugins()

    category match {
      case "all" =>
        listConnectors()
        println()
        listTransforms()
        println()
        listPlugins()
      case "connectors" => listConnectors()
      case "transforms" => listTransforms()
      case "plugins" => listPlugins()
      case unknown =>
        System.err.println(s"Unknown category: '$unknown'. Use: connectors, transforms, plugins, all")
    }
  }

  /** Prints a blank line, the title, the rule, then one bullet per entry (or the placeholder). */
  private def printSection[A](title: String, entries: List[A], placeholder: String): Unit = {
    println()
    println(title)
    println(rule)
    entries match {
      case Nil => println(placeholder)
      case items => items.foreach(item => println(s" \u2022 $item"))
    }
  }

  /** Prints the source connectors followed by the sink connectors, each sorted. */
  private def listConnectors(): Unit = {
    printSection(" Source Connectors:", PluginRegistry.availableSources.toList.sorted, " (none loaded)")
    printSection(" Sink Connectors:", PluginRegistry.availableSinks.toList.sorted, " (none loaded)")
  }

  /** Prints the sorted transform types. */
  private def listTransforms(): Unit =
    printSection(" Transform Types:", PluginRegistry.availableTransforms.toList.sorted, " (none loaded)")

  /** Prints the installed plugin JARs, or an install hint when there are none. */
  private def listPlugins(): Unit = {
    val jars = InstallCommand.listInstalled()
    printSection(" Installed Plugins:", jars, " (none)")
    if (jars.isEmpty) {
      println()
      println(" Install plugins with:")
      println(" weaver install connector-kafka")
      println(" weaver install /path/to/connector.jar")
    }
  }
}
Loading
Loading