The Databricks MCP Toolkit is built on a multi-layered architecture that provides a robust, scalable interface between AI agents and Databricks services through the Model Context Protocol (MCP).
Location: src/server/databricks_mcp_server.py
This layer implements the MCP specification and exposes Databricks functionality as standardized MCP tools.
Responsibilities:
- MCP protocol compliance
- Tool registration and discovery
- Request/response marshaling
- Error handling and standardization
Key Components:
- DatabricksMCPServer: Main server class implementing MCP handlers
- Tool definitions with JSON schemas (see the sketch below)
- Protocol-level error handling
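For illustration, a tool definition exposed through MCP discovery could look like the following. The list_clusters schema here is a simplified sketch following the MCP tool shape used later in this document, not the toolkit's exact definition:

```python
# Hypothetical tool definition following the MCP tool schema
# ("name", "description", "inputSchema"); the parameter is illustrative.
LIST_CLUSTERS_TOOL = {
    "name": "list_clusters",
    "description": "List all clusters in the Databricks workspace",
    "inputSchema": {
        "type": "object",
        "properties": {
            "status_filter": {
                "type": "string",
                "description": "Optional cluster state to filter on",
            }
        },
        "required": [],
    },
}
```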
Location: src/api/
Provides high-level abstractions over Databricks REST APIs with consistent error handling and async support.
Modules:
```
src/api/
├── clusters.py           # Cluster lifecycle management
├── jobs.py               # Job and run operations
├── notebooks.py          # Notebook management
├── sql.py                # SQL execution
├── command_execution.py  # Command execution contexts
└── dbfs.py               # File system operations
```
Design Patterns:
- Async/await for non-blocking I/O
- Consistent error handling with custom exceptions
- Type hints for better IDE support
- Retry logic for transient failures
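A minimal sketch of the async-plus-retry pattern, assuming a hypothetical DatabricksAPIError exception (the names are illustrative, not the toolkit's exact API):

```python
import asyncio

import httpx

class DatabricksAPIError(Exception):
    """Hypothetical custom exception for failed Databricks API calls."""

async def get_json(client: httpx.AsyncClient, url: str, retries: int = 3) -> dict:
    """GET a Databricks REST endpoint, retrying transient failures with backoff."""
    for attempt in range(1, retries + 1):
        try:
            response = await client.get(url)
            response.raise_for_status()
            return response.json()
        except httpx.TransportError as exc:
            # Network-level error: back off exponentially, then retry.
            if attempt == retries:
                raise DatabricksAPIError(f"request failed after {retries} attempts") from exc
            await asyncio.sleep(2 ** attempt)
        except httpx.HTTPStatusError as exc:
            # Retry transient 5xx responses; fail fast on 4xx (auth, permissions).
            if exc.response.status_code >= 500 and attempt < retries:
                await asyncio.sleep(2 ** attempt)
                continue
            raise DatabricksAPIError(f"API returned {exc.response.status_code}") from exc
```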
Location: src/core/
Provides foundational services used across the application.
Components:
- auth.py: Authentication and token management
- config.py: Configuration management with Pydantic
- utils.py: HTTP client, logging, and utilities
Features:
- Environment-based configuration
- Connection pooling for HTTP requests
- Structured logging (sketched below)
- Token validation and refresh
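Structured logging, for example, might be wired up along these lines. This is a standard-library sketch; the toolkit's actual setup may differ:

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Emit each log record as single-line JSON for log aggregation."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.getLogger("databricks_mcp").addHandler(handler)
```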
Location: src/cli/
Optional command-line interface for direct server interaction.
Capabilities:
- Server startup and shutdown
- Configuration validation
- Health checks
- Tool invocation
A typical tool invocation flows through the layers as follows:

```
AI Agent Request
    ↓
MCP Protocol Handler
    ↓
Parameter Validation
    ↓
API Client Method
    ↓
HTTP Request (with retry)
    ↓
Databricks REST API
    ↓
Response Processing
    ↓
Result Marshaling
    ↓
MCP Response to Agent
```
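The middle of that pipeline can be pictured as a dispatch function. The names below are hypothetical; the actual handler lives in DatabricksMCPServer:

```python
from typing import Any, Awaitable, Callable, Dict

# Hypothetical registry mapping tool names to async API client methods.
TOOL_REGISTRY: Dict[str, Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]] = {}

async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Dispatch one MCP tool call: validate, invoke the API client, marshal the result."""
    if name not in TOOL_REGISTRY:
        raise ValueError(f"unknown tool: {name}")   # surfaces as a protocol-level error
    result = await TOOL_REGISTRY[name](arguments)   # API client method (HTTP with retry)
    return {"content": result}                      # marshaled into the MCP response
```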
Command execution contexts follow a create/execute/poll/destroy lifecycle:

```
Create Context
    ↓
Initialize on Cluster
    ↓
Execute Command(s) ←──┐
    ↓                 │
Get Status (polling)  │
    ↓                 │
More Commands? ───────┘
    ↓
Destroy Context
```
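A sketch of this lifecycle against the Databricks command execution API; the 1.2 endpoints and field names below are assumptions for illustration, which the toolkit wraps in src/api/command_execution.py:

```python
import asyncio
from typing import Any, Dict, List

import httpx

async def run_commands(client: httpx.AsyncClient, cluster_id: str,
                       commands: List[str]) -> List[Dict[str, Any]]:
    """Create a context, execute commands with status polling, then destroy it.

    Assumes `client` was created with the workspace base_url and auth headers.
    """
    ctx = (await client.post("/api/1.2/contexts/create",
                             json={"clusterId": cluster_id, "language": "python"})).json()
    results: List[Dict[str, Any]] = []
    try:
        for command in commands:
            cmd = (await client.post("/api/1.2/commands/execute",
                                     json={"clusterId": cluster_id, "contextId": ctx["id"],
                                           "language": "python", "command": command})).json()
            # Poll until the command reaches a terminal state.
            while True:
                status = (await client.get("/api/1.2/commands/status",
                                           params={"clusterId": cluster_id,
                                                   "contextId": ctx["id"],
                                                   "commandId": cmd["id"]})).json()
                if status["status"] in ("Finished", "Error", "Cancelled"):
                    break
                await asyncio.sleep(1)
            results.append(status)
    finally:
        # Mirror the diagram: the context is always destroyed.
        await client.post("/api/1.2/contexts/destroy",
                          json={"clusterId": cluster_id, "contextId": ctx["id"]})
    return results
```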
Async-first design rationale: Databricks operations are I/O-bound, so non-blocking async I/O is ideal for performance.
Implementation:
```python
from typing import Any, Dict

import httpx

async def execute_sql(params: Dict[str, Any]) -> Dict[str, Any]:
    async with httpx.AsyncClient() as client:
        response = await client.post(...)  # endpoint and payload elided here
        # process_response is defined elsewhere in the module
        return await process_response(response)
```

Error handling approach: each layer has specific error-handling responsibilities.
Layers:
- MCP Layer: Protocol-level errors (malformed requests)
- API Layer: Service-level errors (authentication, permissions)
- HTTP Layer: Network-level errors (timeouts, retries)
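One way to keep these responsibilities separate is a small exception hierarchy, sketched here with hypothetical names:

```python
class DatabricksMCPError(Exception):
    """Base class for all toolkit errors."""

class ProtocolError(DatabricksMCPError):
    """MCP layer: malformed or invalid tool requests."""

class ServiceError(DatabricksMCPError):
    """API layer: authentication, permission, or resource errors."""

class NetworkError(DatabricksMCPError):
    """HTTP layer: timeouts and connection failures after retries are exhausted."""
```

Catching DatabricksMCPError at the MCP boundary lets the server map any lower-layer failure to a standardized protocol error response.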
Configuration strategy: environment-based with validation.
```python
from typing import Optional

# With pydantic v2, BaseSettings moves to the pydantic-settings package.
from pydantic import BaseSettings, SecretStr

class DatabricksConfig(BaseSettings):
    host: str
    token: SecretStr
    warehouse_id: Optional[str] = None

    class Config:
        # Values load from DATABRICKS_HOST, DATABRICKS_TOKEN, etc.
        env_prefix = "DATABRICKS_"
        case_sensitive = False
```

Type safety approach: comprehensive type hints throughout the codebase.
Benefits:
- Static error detection with mypy, before runtime
- Better IDE support
- Self-documenting code
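For example, mypy catches a mistyped call before the code ever runs (the function below is illustrative):

```python
from typing import Any, Dict

async def get_cluster(cluster_id: str) -> Dict[str, Any]:
    """Fetch cluster metadata (body elided for the example)."""
    ...

# await get_cluster(1234)
# mypy: error: Argument 1 to "get_cluster" has incompatible type "int"; expected "str"
```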
HTTP client maintains persistent connections:

```python
import httpx

client = httpx.AsyncClient(
    limits=httpx.Limits(max_keepalive_connections=20),
    timeout=httpx.Timeout(60.0),
)
```

Multiple tools can execute concurrently:

```python
results = await asyncio.gather(
    server.call_tool('list_clusters', ...),
    server.call_tool('list_jobs', ...),
    server.call_tool('execute_sql', ...),
)
```

Context managers ensure proper cleanup:
```python
async with create_execution_context(...) as context:
    await execute_commands(context)
# Context automatically destroyed on exit
```

Authentication flow:

```
Environment Variables
    ↓
Configuration Loader
    ↓
Token Validator
    ↓
Request Authenticator
    ↓
Databricks API
```
Token handling:
- Storage: Environment variables only, never in code
- Transmission: HTTPS only, token in Authorization header
- Validation: Token format and expiration checks
- Scope: Minimum required permissions
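A minimal sketch of loading and validating the token at startup. The 'dapi' prefix check reflects the typical Databricks personal access token format; the helper name is an illustrative assumption:

```python
import os
import warnings

from pydantic import SecretStr

def load_token() -> SecretStr:
    """Read the API token from the environment only; never from source code."""
    raw = os.environ.get("DATABRICKS_TOKEN", "")
    if not raw:
        raise RuntimeError("DATABRICKS_TOKEN is not set")
    # PATs conventionally start with 'dapi'; warn rather than fail,
    # since OAuth and service-principal tokens look different.
    if not raw.startswith("dapi"):
        warnings.warn("token does not look like a Databricks personal access token")
    return SecretStr(raw)
```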
To add a new tool:
- Define the tool schema in DatabricksMCPServer
- Implement the API method in the appropriate src/api/ module
- Register the tool handler in the server
- Add tests in tests/
Example:
```python
from typing import Any, Dict

# In src/api/new_feature.py
async def new_operation(params: Dict[str, Any]) -> Dict[str, Any]:
    """Implement new Databricks operation."""
    pass

# In src/server/databricks_mcp_server.py
@server.list_tools()
async def handle_list_tools():
    return [{
        "name": "new_operation",
        "description": "Description of operation",
        "inputSchema": {...}  # JSON schema for the tool's parameters
    }]
```

Create a new module in src/api/:
```python
# src/api/new_service.py
from src.core.config import get_config
from src.core.utils import get_http_client

async def service_operation(**kwargs):
    """Implement service-specific logic."""
    config = get_config()
    client = get_http_client()
    # Implementation
```

Test suite layout:

```
tests/
├── test_mcp_server.py  # MCP protocol compliance
├── test_clusters.py    # Cluster API tests
├── test_jobs.py        # Job API tests
├── test_sql.py         # SQL execution tests
└── conftest.py         # Shared fixtures
```
Mock external dependencies:

```python
import pytest

@pytest.fixture
def mock_databricks_api(mocker):
    return mocker.patch('httpx.AsyncClient.post')

@pytest.mark.asyncio
async def test_list_clusters(mock_databricks_api):
    mock_databricks_api.return_value.json.return_value = {...}
    result = await list_clusters()
    assert result is not None
```

Key metrics to monitor:
- Tool Invocation Latency: Time from MCP request to response
- API Call Duration: Time for Databricks API calls
- Connection Pool Utilization: Active vs. idle connections
- Error Rate: Failed requests per total requests
Instrumentation should capture:
- MCP tool execution time
- HTTP request/response time
- Retry attempts and success rate
- Memory usage for large result sets
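For example, tool execution time can be captured with a small decorator. This is a sketch; the toolkit's actual instrumentation may differ:

```python
import functools
import logging
import time

logger = logging.getLogger("databricks_mcp.metrics")

def timed(tool_name: str):
    """Log the wall-clock duration of an async tool handler."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                elapsed_ms = (time.perf_counter() - start) * 1000
                logger.info("tool=%s duration_ms=%.1f", tool_name, elapsed_ms)
        return wrapper
    return decorator
```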
Planned enhancements:
- Caching Layer: Cache frequently accessed data (cluster lists, job definitions)
- Batch Operations: Support for bulk operations where applicable
- Webhook Integration: Event-driven notifications for long-running operations
- Metrics Export: Prometheus-compatible metrics endpoint
- Rate Limit Management: Intelligent request throttling
Following semantic versioning:
- Patch: Bug fixes, no API changes
- Minor: New tools, backward compatible
- Major: Breaking changes to existing tools
Core runtime dependencies:
- mcp: MCP protocol implementation
- httpx: Async HTTP client
- pydantic: Data validation and settings
- databricks-sdk: Official Databricks SDK (for reference)
Development dependencies:
- pytest: Testing framework
- pytest-asyncio: Async test support
- black: Code formatting
- pylint: Static analysis
- mypy: Type checking
Run the server directly:

```
python -m src.main
```

Or deploy with Docker:

```dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY . .
RUN pip install -e .
CMD ["python", "-m", "src.main"]
```

Embed in larger applications:

```python
from src.server.databricks_mcp_server import DatabricksMCPServer

async def main():
    server = DatabricksMCPServer()
    # Use server in application
```

Document Version: 1.0
Last Updated: January 2025
Maintainer: Kush Patel