FunctionStream SDK is a powerful Python library for building and deploying serverless functions that process messages from Apache Pulsar. It provides a simple yet flexible framework for creating event-driven applications with robust error handling, metrics collection, and resource management.
- Easy Function Development: Simple API for creating serverless functions
- Message Processing: Built-in support for Apache Pulsar message processing
- Metrics Collection: Automatic collection of performance metrics
- Resource Management: Efficient handling of connections and resources
- Configuration Management: Flexible configuration through YAML files
- Error Handling: Comprehensive error handling and logging
pip install function-stream

- Create a function that processes messages:
import asyncio

from function_stream import FSFunction

async def my_process_function(request_data: dict) -> dict:
    # Process the request data (process_data is a placeholder for your own logic)
    result = process_data(request_data)
    return {"result": result}

async def main():
    # Initialize and run the function
    function = FSFunction(
        process_funcs={
            'my_module': my_process_function
        }
    )
    await function.start()

# await is only valid inside a coroutine, so run main() in an event loop
asyncio.run(main())

- Create a configuration file (config.yaml):
pulsar:
  service_url: "pulsar://localhost:6650"
  authPlugin: ""  # Optional
  authParams: ""  # Optional
module: "my_module"
subscriptionName: "my-subscription"
requestSource:
  - pulsar:
      topic: "input-topic"
sink:
  pulsar:
    topic: "output-topic"

- Define your function package (package.yaml):
name: my_function
type: pulsar
modules:
  my_module:
    name: my_process
    description: "Process incoming messages"
    inputSchema:
      type: object
      properties:
        data:
          type: string
      required:
        - data
    outputSchema:
      type: object
      properties:
        result:
          type: string

FSFunction is the main class for creating serverless functions. It handles:
- Message consumption and processing
- Response generation
- Resource management
- Metrics collection
- Error handling
The SDK uses YAML configuration files to define:
- Pulsar connection settings
- Module selection
- Topic subscriptions
- Input/output topics
- Custom configuration parameters
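
As a rough illustration of how module selection could connect the configuration to your code, the sketch below looks up a handler by the `module` value and awaits it. This is not the SDK's internal implementation: the `dispatch` helper is hypothetical, and the inline `config` dictionary stands in for a parsed config.yaml.

```python
import asyncio

# Stand-in for the parsed contents of config.yaml (assumption: loaded elsewhere).
config = {
    "module": "my_module",
    "subscriptionName": "my-subscription",
}

async def my_process_function(request_data: dict) -> dict:
    # Upper-case the input as a trivial processing step.
    return {"result": request_data["data"].upper()}

# Same shape as the process_funcs mapping passed to FSFunction.
process_funcs = {"my_module": my_process_function}

async def dispatch(config: dict, request_data: dict) -> dict:
    """Hypothetical helper: run the handler named by config['module']."""
    module = config["module"]
    if module not in process_funcs:
        raise ValueError(f"No process function registered for module {module!r}")
    return await process_funcs[module](request_data)

if __name__ == "__main__":
    print(asyncio.run(dispatch(config, {"data": "hello"})))
```

Failing fast on an unknown module name surfaces a mismatch between config.yaml and `process_funcs` at startup instead of at the first message.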
The SDK provides built-in metrics collection for:
- Request processing time
- Success/failure rates
- Message throughput
- Resource utilization
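
To make these metrics concrete, here is a minimal sketch of how processing time and success/failure counts could be recorded around an async process function. The `with_metrics` decorator and the `metrics` dictionary are hypothetical names used only for illustration; the SDK collects its own metrics automatically and may do so quite differently.

```python
import asyncio
import time
from functools import wraps

# Hypothetical in-memory counters, not the SDK's metrics store.
metrics = {"success": 0, "failure": 0, "total_seconds": 0.0}

def with_metrics(func):
    """Wrap an async process function to record duration and outcome."""
    @wraps(func)
    async def wrapper(request_data: dict) -> dict:
        start = time.perf_counter()
        try:
            result = await func(request_data)
            metrics["success"] += 1
            return result
        except Exception:
            metrics["failure"] += 1
            raise
        finally:
            # Runs on both success and failure, so every call is timed.
            metrics["total_seconds"] += time.perf_counter() - start
    return wrapper

@with_metrics
async def my_process_function(request_data: dict) -> dict:
    return {"result": request_data["data"]}

async def demo() -> None:
    await my_process_function({"data": "ok"})
    try:
        await my_process_function({})  # missing key, counted as a failure
    except KeyError:
        pass

if __name__ == "__main__":
    asyncio.run(demo())
    print(metrics)
```

Success rate and average latency can then be derived from the counters, for example `metrics["success"] / (metrics["success"] + metrics["failure"])`.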
Check out the examples directory for complete examples:
- string_function.py: A simple string processing function
- test_string_function.py: Test client for the string function
- config.yaml: Example configuration
- package.yaml: Example package definition
- Error Handling
  - Always handle exceptions in your process functions
  - Use proper logging for debugging
  - Implement graceful shutdown
- Resource Management
  - Close resources properly
  - Use context managers when possible
  - Monitor resource usage
- Configuration
  - Use environment variables for sensitive data
  - Validate configuration values
  - Document configuration options
- Testing
  - Write unit tests for your functions
  - Test error scenarios
  - Validate input/output schemas
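
Several of these practices can be combined in one small sketch: a process function that validates its input, logs failures, and never lets an exception escape, plus a test that covers the error scenarios. The names `validate_input` and `safe_process` are hypothetical, the hand-written check is a simplified stand-in for real schema validation, and returning an `error` field is an assumption, since how errors should be reported back through the SDK is not specified here.

```python
import asyncio
import logging

logger = logging.getLogger("my_function")

def validate_input(request_data: dict) -> None:
    """Minimal check mirroring the example inputSchema: 'data' is a required string."""
    if not isinstance(request_data.get("data"), str):
        raise ValueError("'data' is required and must be a string")

async def safe_process(request_data: dict) -> dict:
    try:
        validate_input(request_data)
        return {"result": request_data["data"].strip()}
    except ValueError as exc:
        # Expected, caller-induced failure: log at warning level.
        logger.warning("Rejected request: %s", exc)
        return {"error": str(exc)}  # assumption: errors are reported in the response
    except Exception:
        # Unexpected failure: log the traceback, hide details from the caller.
        logger.exception("Unexpected failure while processing request")
        return {"error": "internal error"}

def test_safe_process() -> None:
    # Happy path
    assert asyncio.run(safe_process({"data": " hi "})) == {"result": "hi"}
    # Error scenarios: missing field and wrong type
    assert "error" in asyncio.run(safe_process({}))
    assert "error" in asyncio.run(safe_process({"data": 42}))

if __name__ == "__main__":
    test_safe_process()
```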
- Python 3.7+
- Apache Pulsar
- pip
# Create virtual environment
python -m venv venv
source venv/bin/activate # Linux/Mac
# or
.\venv\Scripts\activate # Windows
# Install dependencies
pip install -r requirements.txt
# Install the package in development mode
python -m pip install -e .

make test

For support, please open an issue in the GitHub repository or contact the maintainers.