Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions cli/src/main/scala/com/dataweaver/cli/WeaverCLI.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package com.dataweaver.cli

import com.dataweaver.cli.commands._
import com.dataweaver.core.config.ProfileApplier
import com.dataweaver.core.engine.EngineSelector
import org.apache.spark.sql.SparkSession
import com.dataweaver.cli.wizard.InteractiveWizard
import scopt.OParser

object WeaverCLI {
Expand All @@ -14,7 +12,10 @@ object WeaverCLI {
env: Option[String] = None,
inspectId: Option[String] = None,
autoGenerate: Boolean = false,
showCoverage: Boolean = false
showCoverage: Boolean = false,
description: Option[String] = None,
interactive: Boolean = false,
projectName: Option[String] = None
)

def main(args: Array[String]): Unit = {
Expand All @@ -24,6 +25,26 @@ object WeaverCLI {
OParser.sequence(
programName("weaver"),
head("Data Weaver", "0.2.0"),
cmd("init")
.action((_, c) => c.copy(command = "init"))
.text("Initialize a new project or generate pipeline interactively")
.children(
arg[String]("<project-name>")
.optional()
.action((x, c) => c.copy(projectName = Some(x)))
.text("Project name"),
opt[Unit]("interactive")
.action((_, c) => c.copy(interactive = true))
.text("Step-by-step pipeline wizard (no LLM required)")
),
cmd("generate")
.action((_, c) => c.copy(command = "generate"))
.text("Generate pipeline YAML from natural language description")
.children(
arg[String]("<description>")
.action((x, c) => c.copy(description = Some(x)))
.text("Natural language description of the pipeline")
),
cmd("doctor")
.action((_, c) => c.copy(command = "doctor"))
.text("Full system diagnostic")
Expand Down Expand Up @@ -98,6 +119,11 @@ object WeaverCLI {
OParser.parse(parser, args, Config()) match {
case Some(config) =>
config.command match {
case "init" =>
if (config.interactive) InteractiveWizard.run()
else InitCommand.run(config.projectName.getOrElse("my-project"))
case "generate" =>
GenerateCommand.run(config.description.get)
case "doctor" =>
val result = DoctorCommand.run(config.pipeline.get)
if (!result.overallHealthy) sys.exit(1)
Expand Down
108 changes: 108 additions & 0 deletions cli/src/main/scala/com/dataweaver/cli/ai/LLMClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.dataweaver.cli.ai

import org.apache.log4j.LogManager

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import scala.util.{Failure, Success, Try}

/** HTTP client for LLM text-generation APIs (Claude, OpenAI).
  *
  * Uses `java.net.http` and hand-written JSON string encoding/decoding, so no external HTTP or
  * JSON library is required. Response handling is a lightweight scan, not a full JSON parser: it
  * returns the first string-valued occurrence of the provider's text field.
  */
object LLMClient {
  private val logger = LogManager.getLogger(getClass)
  private val httpClient = HttpClient.newHttpClient()

  /** Upper bound on generated tokens sent with every request. */
  private val MaxTokens = 4096

  /** Maximum number of response characters echoed back inside an error message. */
  private val ErrorPreviewLength = 500

  /** Call an LLM API and return the generated text.
    *
    * @param prompt The user prompt
    * @param config AI configuration (provider, apiKey, model)
    * @return Right(generated text) or Left(error message)
    */
  def generate(prompt: String, config: WeaverConfig.AIConfig): Either[String, String] =
    if (config.apiKey.isEmpty) {
      Left(
        s"No API key configured for '${config.provider}'. " +
          s"Set it via ~/.weaver/config.yaml or environment variable " +
          s"(ANTHROPIC_API_KEY for Claude, OPENAI_API_KEY for OpenAI)")
    } else {
      config.provider match {
        case "claude" => callClaude(prompt, config)
        case "openai" => callOpenAI(prompt, config)
        case other    => Left(s"Unknown AI provider '$other'. Supported: claude, openai")
      }
    }

  /** POST the prompt to the Anthropic Messages endpoint and extract the generated text. */
  private def callClaude(prompt: String, config: WeaverConfig.AIConfig): Either[String, String] = {
    // Both the model name and the prompt go through escapeJson so that quotes, backslashes or
    // control characters in either value cannot break (or inject into) the request body.
    val body =
      s"""{"model": ${escapeJson(config.model)}, "max_tokens": $MaxTokens, "messages": [{"role": "user", "content": ${escapeJson(prompt)}}]}"""

    val request = HttpRequest.newBuilder()
      .uri(URI.create("https://api.anthropic.com/v1/messages"))
      .header("Content-Type", "application/json")
      .header("x-api-key", config.apiKey)
      .header("anthropic-version", "2023-06-01")
      .POST(HttpRequest.BodyPublishers.ofString(body))
      .build()

    executeRequest(request).flatMap(extractClaudeResponse)
  }

  /** POST the prompt to the OpenAI chat-completions endpoint and extract the generated text. */
  private def callOpenAI(prompt: String, config: WeaverConfig.AIConfig): Either[String, String] = {
    val body =
      s"""{"model": ${escapeJson(config.model)}, "messages": [{"role": "user", "content": ${escapeJson(prompt)}}], "max_tokens": $MaxTokens}"""

    val request = HttpRequest.newBuilder()
      .uri(URI.create("https://api.openai.com/v1/chat/completions"))
      .header("Content-Type", "application/json")
      .header("Authorization", s"Bearer ${config.apiKey}")
      .POST(HttpRequest.BodyPublishers.ofString(body))
      .build()

    executeRequest(request).flatMap(extractOpenAIResponse)
  }

  /** Send the request synchronously.
    *
    * @return Right(response body) for a 2xx status, otherwise Left with the status code and a
    *         truncated body, or Left with the failure description if the request itself failed
    */
  private def executeRequest(request: HttpRequest): Either[String, String] =
    Try(httpClient.send(request, HttpResponse.BodyHandlers.ofString())) match {
      case Success(response) if response.statusCode() >= 200 && response.statusCode() < 300 =>
        Right(response.body())
      case Success(response) =>
        Left(s"API error (${response.statusCode()}): ${response.body().take(ErrorPreviewLength)}")
      case Failure(e) =>
        // Some exceptions carry no message; fall back to toString instead of printing "null".
        Left(s"Request failed: ${Option(e.getMessage).getOrElse(e.toString)}")
    }

  /** Extract text content from Claude API response JSON. */
  private def extractClaudeResponse(json: String): Either[String, String] =
    extractStringField(json, "text")
      .toRight(s"Cannot parse Claude response: ${json.take(ErrorPreviewLength)}")

  /** Extract text content from OpenAI API response JSON. */
  private def extractOpenAIResponse(json: String): Either[String, String] =
    extractStringField(json, "content")
      .toRight(s"Cannot parse OpenAI response: ${json.take(ErrorPreviewLength)}")

  /** Find the first `"field": "..."` pair whose string value is properly terminated and return
    * the decoded value.
    *
    * The value is scanned by hand rather than with a regex alternation such as
    * `(?:[^"\\]|\\.)*`: that construct recurses once per character in `java.util.regex` and can
    * overflow the stack on multi-kilobyte completions.
    */
  private def extractStringField(json: String, field: String): Option[String] = {
    val opener = ("\"" + java.util.regex.Pattern.quote(field) + "\"\\s*:\\s*\"").r

    // Index of the unescaped closing quote at or after `i`, or -1 if the string never ends.
    @scala.annotation.tailrec
    def closingQuote(i: Int): Int =
      if (i >= json.length) -1
      else
        json.charAt(i) match {
          case '"'  => i
          case '\\' => closingQuote(i + 2) // skip the escaped character as well
          case _    => closingQuote(i + 1)
        }

    opener
      .findAllMatchIn(json)
      .map { m =>
        val end = closingQuote(m.end)
        if (end < 0) None else Some(unescapeJson(json.substring(m.end, end)))
      }
      .collectFirst { case Some(text) => text }
  }

  /** Encode `s` as a JSON string literal, including the surrounding double quotes.
    * Quotes, backslashes and every control character below U+0020 are escaped.
    */
  private def escapeJson(s: String): String = {
    val out = new StringBuilder(s.length + 2)
    out.append('"')
    s.foreach {
      case '"'          => out.append("\\\"")
      case '\\'         => out.append("\\\\")
      case '\n'         => out.append("\\n")
      case '\r'         => out.append("\\r")
      case '\t'         => out.append("\\t")
      case '\b'         => out.append("\\b")
      case '\f'         => out.append("\\f")
      case c if c < ' ' => out.append("\\u%04x".format(c.toInt))
      case c            => out.append(c)
    }
    out.append('"')
    out.toString
  }

  /** Decode the contents of a JSON string literal (without the surrounding quotes).
    *
    * Escapes are decoded in a single left-to-right pass so that an escaped backslash followed by
    * `n`, `r` or `t` is not mistaken for a newline/CR/tab escape. Unknown escapes and malformed
    * `\u` sequences are kept verbatim.
    */
  private def unescapeJson(s: String): String = {
    val out = new StringBuilder(s.length)
    var i = 0
    while (i < s.length) {
      val c = s.charAt(i)
      if (c != '\\' || i + 1 >= s.length) {
        out.append(c)
        i += 1
      } else {
        val escaped = s.charAt(i + 1)
        val hex = if (escaped == 'u') s.slice(i + 2, i + 6) else ""
        if (hex.length == 4 && hex.forall(ch => Character.digit(ch, 16) >= 0)) {
          out.append(Integer.parseInt(hex, 16).toChar)
          i += 6
        } else {
          escaped match {
            case 'n'              => out.append('\n')
            case 'r'              => out.append('\r')
            case 't'              => out.append('\t')
            case 'b'              => out.append('\b')
            case 'f'              => out.append('\f')
            case '"' | '\\' | '/' => out.append(escaped)
            case other            => out.append('\\').append(other)
          }
          i += 2
        }
      }
    }
    out.toString
  }
}
92 changes: 92 additions & 0 deletions cli/src/main/scala/com/dataweaver/cli/ai/PromptBuilder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.dataweaver.cli.ai

import scala.io.Source
import scala.util.Try

/** Builds schema-grounded prompts for LLM pipeline generation.
  *
  * The generation prompt embeds the pipeline JSON Schema so the LLM is steered towards
  * structurally valid YAML.
  */
object PromptBuilder {

  /** Classpath location of the pipeline JSON Schema. */
  private val SchemaResource = "schemas/pipeline.schema.json"

  /** Minimal inline schema used when the schema can be read neither from the classpath nor from
    * the source tree.
    */
  private val FallbackSchema =
    """{"type":"object","required":["name"],"properties":{"name":{"type":"string"},"dataSources":{"type":"array"},"transformations":{"type":"array"},"sinks":{"type":"array"}}}"""

  /** Build a prompt for generating a pipeline YAML from a natural language description.
    *
    * @param description User's natural language description of the pipeline
    * @return Complete prompt with schema grounding
    */
  def buildGeneratePrompt(description: String): String = {
    val schema = loadSchema()

    // `$$` is the interpolator escape for a literal `$`, so rule 9 renders as ${env.VAR}.
    s"""You are a data pipeline generator for Data Weaver, a declarative ETL framework.

Generate a valid YAML pipeline based on this description:
"$description"

IMPORTANT RULES:
1. Output ONLY the YAML content, no markdown fences, no explanation
2. The YAML must conform to this JSON Schema:

$schema

3. Available source types: PostgreSQL, MySQL, File, Test
4. Available sink types: BigQuery, DeltaLake, File, Test
5. Available transform types: SQL, DataQuality
6. Every sink MUST have a "source" field pointing to a transform id
7. Use SQL transforms with standard SQL queries
8. Add DataQuality checks where appropriate (row_count > 0, missing_count, duplicate_count)
9. Use $${env.VAR} for any credentials or sensitive values
10. Add inline tests to validate the pipeline output
11. Set engine to "auto"

CONNECTOR CONFIG REFERENCE:

PostgreSQL source config: host, port, database, user, password, query
MySQL source config: host, port, db, user, password, query, driver
File source config: path, format (csv|json|parquet|orc), header (true|false)
BigQuery sink config: projectId, datasetName, tableName, temporaryGcsBucket, saveMode
DeltaLake sink config: path, saveMode (Overwrite|Append|merge), mergeKey
File sink config: path, format (csv|json|parquet|orc), saveMode, coalesce, partitionBy

Generate the YAML now:"""
  }

  /** Build a prompt for fixing a pipeline that failed validation.
    *
    * @param yaml   The invalid YAML
    * @param errors Validation error messages, rendered one per line
    * @return Prompt asking the LLM to fix the errors
    */
  def buildFixPrompt(yaml: String, errors: List[String]): String = {
    s"""The following Data Weaver pipeline YAML has validation errors. Fix them.

CURRENT YAML:
$yaml

ERRORS:
${errors.mkString("\n")}

RULES:
1. Output ONLY the corrected YAML, no explanation
2. Fix all listed errors
3. Do not change parts that are already correct
4. Every sink must have a "source" field
5. All source references must exist as dataSource or transformation ids

Generate the corrected YAML now:"""
  }

  /** Load the pipeline JSON Schema.
    *
    * Lookup order: classpath resource, then the file inside the source tree (relative to the
    * working directory), then the minimal inline schema. The schema is decoded explicitly as
    * UTF-8 so the result does not depend on the platform default charset.
    */
  private def loadSchema(): String = {
    def readAndClose(source: Source): String =
      try source.mkString
      finally source.close()

    Try {
      // getResourceAsStream returns null when the resource is absent; Option(...) maps it to None.
      Option(getClass.getClassLoader.getResourceAsStream(SchemaResource)) match {
        case Some(stream) =>
          readAndClose(Source.fromInputStream(stream, "UTF-8"))
        case None =>
          // Fallback: read from file
          readAndClose(Source.fromFile("core/src/main/resources/schemas/pipeline.schema.json", "UTF-8"))
      }
    }.getOrElse(FallbackSchema)
  }
}
80 changes: 80 additions & 0 deletions cli/src/main/scala/com/dataweaver/cli/ai/WeaverConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.dataweaver.cli.ai

import scala.io.Source
import scala.util.Try

/** Loads ~/.weaver/config.yaml for AI settings.
  *
  * Expected format:
  * {{{
  * ai:
  *   provider: claude          # claude | openai
  *   apiKey: ${env.ANTHROPIC_API_KEY}
  *   model: claude-sonnet-4-20250514
  *   maxRetries: 3
  * }}}
  *
  * The file is read with a small line-based scanner, not a YAML parser: any line whose key is
  * `provider`, `apiKey`, `model` or `maxRetries` is honoured regardless of nesting, and the last
  * occurrence wins.
  */
object WeaverConfig {

  case class AIConfig(
      provider: String = "claude",
      apiKey: String = "",
      model: String = "claude-sonnet-4-20250514",
      maxRetries: Int = 3
  )

  private val configPath = sys.props.getOrElse("user.home", ".") + "/.weaver/config.yaml"

  /** Matches a `${env.NAME}` reference and captures NAME. */
  private val EnvReference = """\$\{env\.([^}]+)\}""".r

  /** Matches a YAML inline comment: a `#` at the start of the value or preceded by whitespace. */
  private val InlineComment = """(^|\s)#.*$""".r

  /** Load AI configuration from ~/.weaver/config.yaml.
    * Falls back to defaults and environment variables if the config file cannot be read.
    */
  def loadAIConfig(): AIConfig = {
    val fromFile = loadFromFile(configPath)
    // Resolve API key from env if it's a ${env.X} reference or empty
    fromFile.copy(apiKey = resolveApiKey(fromFile.apiKey, fromFile.provider))
  }

  /** Parse the textual content of a config file into an [[AIConfig]].
    *
    * Unknown keys are ignored. A key whose value is empty, comment-only or (for `maxRetries`)
    * not an integer leaves the current setting untouched. Values may be wrapped in single or
    * double quotes and may be followed by an inline `# comment`.
    *
    * @param content raw file content
    * @return the settings found in `content`, with defaults for everything else
    */
  def parseAIConfig(content: String): AIConfig =
    content.split('\n').map(_.trim).foldLeft(AIConfig()) { (cfg, line) =>
      line.split(":", 2) match {
        case Array("provider", raw) =>
          scalarValue(raw).fold(cfg)(v => cfg.copy(provider = v))
        case Array("apiKey", raw) =>
          scalarValue(raw).fold(cfg)(v => cfg.copy(apiKey = v))
        case Array("model", raw) =>
          scalarValue(raw).fold(cfg)(v => cfg.copy(model = v))
        case Array("maxRetries", raw) =>
          scalarValue(raw).flatMap(v => Try(v.toInt).toOption).fold(cfg)(n => cfg.copy(maxRetries = n))
        case _ =>
          cfg
      }
    }

  /** Normalise the text after `key:`: unwrap a quoted value, otherwise drop an inline comment.
    *
    * @return the cleaned value, or None when nothing is left
    */
  private def scalarValue(raw: String): Option[String] = {
    val trimmed = raw.trim
    // Position of the matching closing quote when the value starts with a quote character.
    val closingQuote = trimmed.headOption
      .collect { case q @ ('"' | '\'') => trimmed.indexOf(q, 1) }
      .filter(_ > 0)

    val value = closingQuote match {
      case Some(end) => trimmed.substring(1, end)
      case None      => InlineComment.replaceFirstIn(trimmed, "").trim
    }
    Some(value).filter(_.nonEmpty)
  }

  /** Read and parse the file at `path`; any I/O failure yields the default configuration. */
  private def loadFromFile(path: String): AIConfig =
    Try {
      val source = Source.fromFile(path)
      // Always release the file handle, even if reading fails midway.
      try source.mkString
      finally source.close()
    }.map(parseAIConfig).getOrElse(AIConfig())

  /** Turn the configured key into the actual API key.
    *
    *  - a `${env.NAME}` reference resolves to that environment variable ("" if unset)
    *  - any other non-empty value is used verbatim
    *  - an empty value falls back to the provider's standard environment variable
    */
  private def resolveApiKey(key: String, provider: String): String =
    EnvReference.findFirstMatchIn(key) match {
      case Some(ref) =>
        sys.env.getOrElse(ref.group(1), "")
      case None if key.nonEmpty =>
        key
      case None =>
        // Fallback to standard env vars
        provider match {
          case "claude" => sys.env.getOrElse("ANTHROPIC_API_KEY", "")
          case "openai" => sys.env.getOrElse("OPENAI_API_KEY", "")
          case _        => ""
        }
    }
}
Loading
Loading