Skip to content

Commit 077bc8b

Browse files
committed
support go sdk
1 parent f5245ac commit 077bc8b

22 files changed

Lines changed: 1612 additions & 114 deletions

Makefile

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ C_0 := \033[0m
4242
log = @printf "$(C_B)[-]$(C_0) %-15s %s\n" "$(1)" "$(2)"
4343
success = @printf "$(C_G)[✔]$(C_0) %s\n" "$(1)"
4444

45-
.PHONY: all help build build-lite dist dist-lite clean test env env-clean docker docker-run docker-push .check-env .build-wasm
45+
.PHONY: all help build build-lite dist dist-lite clean test env env-clean go-sdk-env go-sdk-build go-sdk-clean docker docker-run docker-push .check-env .build-wasm
4646

4747
all: build
4848

@@ -54,6 +54,9 @@ help:
5454
@echo " dist Package full build (.tar.gz / .zip)"
5555
@echo " dist-lite Package lite build (.tar.gz / .zip)"
5656
@echo " env Setup Python dev environment (.venv)"
57+
@echo " go-sdk-env Setup Go SDK toolchain"
58+
@echo " go-sdk-build Copy WIT, generate bindings, build Go SDK"
59+
@echo " go-sdk-clean Remove generated Go SDK artifacts"
5760
@echo " test Run unit tests"
5861
@echo " clean Cleanup all artifacts"
5962
@echo " docker Build Docker image"
@@ -122,6 +125,16 @@ env:
122125
@./scripts/setup.sh
123126
$(call success,Environment Ready)
124127

128+
go-sdk-build:
129+
$(call log,GO,Building Go SDK)
130+
@$(MAKE) -C go-sdk build
131+
$(call success,Go SDK build complete)
132+
133+
go-sdk-clean:
134+
$(call log,GO,Cleaning Go SDK generated artifacts)
135+
@$(MAKE) -C go-sdk clean
136+
$(call success,Go SDK artifacts removed)
137+
125138
env-clean:
126139
$(call log,CLEAN,Python artifacts)
127140
@./scripts/clean.sh

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ function-stream/
6060
├── conf/ # Default runtime configuration
6161
├── docs/ # Documentation (English & Chinese)
6262
├── examples/ # Sample processors
63+
├── go-sdk/ # Go SDK and generated WIT bindings
6364
├── python/ # Python API, Client, and Runtime (WASM)
6465
├── scripts/ # Build and environment automation scripts
6566
├── Makefile # Unified build system

docs/go-sdk-guide-zh.md

Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
<!--
2+
3+
Licensed to the Apache Software Foundation (ASF) under one
4+
or more contributor license agreements. See the NOTICE file
5+
distributed with this work for additional information
6+
regarding copyright ownership. The ASF licenses this file
7+
to you under the Apache License, Version 2.0 (the
8+
"License"); you may not use this file except in compliance
9+
with the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing,
14+
software distributed under the License is distributed on an
15+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
KIND, either express or implied. See the License for the
17+
specific language governing permissions and limitations
18+
under the License.
19+
20+
-->
21+
22+
# Go SDK 指南
23+
24+
Function Stream 为 Go 开发者提供基于 WebAssembly(WASI P2)的算子开发 SDK。使用 TinyGo 将 Go 代码编译为 WASM 组件,在服务端沙箱中运行,具备 KV 状态存储与数据发射能力。
25+
26+
---
27+
28+
## 一、SDK 核心组件
29+
30+
| 组件 | 定位 | 说明 |
31+
|------|------|------|
32+
| **fssdk** | 主包 | 对外入口,提供 `Driver``Context``Store` 等类型及 `Run(driver)`|
33+
| **api** | 接口定义 | 定义 `Driver``Context``Store``Iterator``ComplexKey` 及错误码。 |
34+
| **impl** | 运行时实现 | 将 Driver 与 WASM 宿主(processor WIT)桥接,内部使用。 |
35+
| **bindings** | WIT 绑定 | 由 wit-bindgen-go 根据 `wit/processor.wit` 生成的 Go 绑定,供 impl 调用。 |
36+
37+
Go 算子**仅依赖 fssdk**:实现 `Driver`(或嵌入 `BaseDriver`),在 `init()` 中调用 `fssdk.Run(&YourProcessor{})` 即可。
38+
39+
---
40+
41+
## 二、Driver 接口与 BaseDriver
42+
43+
### 2.1 Driver 接口
44+
45+
所有 Go 算子必须实现 `fssdk.Driver` 接口。运行时在对应时机调用以下方法:
46+
47+
| 方法 | 触发时机 | 说明 |
48+
|------|----------|------|
49+
| `Init(ctx, config)` | 函数启动时执行一次 | 初始化状态、获取 Store、解析 config。 |
50+
| `Process(ctx, sourceID, data)` | 每收到一条消息时 | 核心处理逻辑:计算、状态读写、`ctx.Emit()`|
51+
| `ProcessWatermark(ctx, sourceID, watermark)` | 收到水位线事件时 | 处理基于时间的窗口或乱序重排,可转发 `EmitWatermark`|
52+
| `TakeCheckpoint(ctx, checkpointID)` | 系统做状态备份时 | 可持久化额外内存状态,保证强一致性。 |
53+
| `CheckHeartbeat(ctx)` | 定期健康检查 | 返回 `false` 会触发算子重启。 |
54+
| `Close(ctx)` | 函数关闭时 | 释放资源、清空引用。 |
55+
| `Exec(ctx, className, modules)` | 扩展能力(可选) | 动态加载模块等,默认可不实现。 |
56+
| `Custom(ctx, payload)` | 自定义 RPC(可选) | 请求/响应自定义字节,默认返回 payload 副本。 |
57+
58+
### 2.2 BaseDriver
59+
60+
`fssdk.BaseDriver` 为上述所有方法提供空实现。嵌入后只需重写你关心的方法:
61+
62+
```go
63+
type MyProcessor struct {
64+
fssdk.BaseDriver
65+
store fssdk.Store
66+
}
67+
68+
func (p *MyProcessor) Init(ctx fssdk.Context, config map[string]string) error {
69+
store, err := ctx.GetOrCreateStore("my-store")
70+
if err != nil {
71+
return err
72+
}
73+
p.store = store
74+
return nil
75+
}
76+
77+
func (p *MyProcessor) Process(ctx fssdk.Context, sourceID uint32, data []byte) error {
78+
// 业务逻辑,读写 p.store,最后 ctx.Emit(...)
79+
return ctx.Emit(0, result)
80+
}
81+
```
82+
83+
### 2.3 注册入口
84+
85+
`init()` 中调用 `fssdk.Run`,传入你的 Driver 实例(通常为单例):
86+
87+
```go
88+
func init() {
89+
fssdk.Run(&MyProcessor{})
90+
}
91+
```
92+
93+
---
94+
95+
## 三、Context 与 Store
96+
97+
### 3.1 Context
98+
99+
`fssdk.Context` 在 Init / Process / ProcessWatermark 等回调中传入,提供:
100+
101+
| 方法 | 说明 |
102+
|------|------|
103+
| `Emit(targetID uint32, data []byte) error` | 将数据发往指定输出通道。 |
104+
| `EmitWatermark(targetID uint32, watermark uint64) error` | 发射水位线。 |
105+
| `GetOrCreateStore(name string) (Store, error)` | 按名称获取或创建 KV Store(基于 RocksDB)。 |
106+
| `Config() map[string]string` | 获取启动时下发的配置(对应 config.yaml 中的 init 等)。 |
107+
| `Close() error` | 关闭 Context,一般由运行时管理。 |
108+
109+
### 3.2 Store(KV 状态)
110+
111+
`fssdk.Store` 提供键值存储与复杂键能力:
112+
113+
**基础 API:**
114+
115+
- `PutState(key, value []byte) error`
116+
- `GetState(key []byte) (value []byte, found bool, err error)`
117+
- `DeleteState(key []byte) error`
118+
- `ListStates(startInclusive, endExclusive []byte) ([][]byte, error)`:按字节序范围列出键。
119+
120+
**ComplexKey(多维键/前缀扫描):**
121+
122+
- `Put(key ComplexKey, value []byte)` / `Get` / `Delete` / `Merge` / `DeletePrefix`
123+
- `ListComplex(...)`:按 keyGroup、key、namespace 及范围列出 UserKey。
124+
- `ScanComplex(keyGroup, key, namespace []byte) (Iterator, error)`:返回迭代器,适用大范围扫描。
125+
126+
`ComplexKey` 结构包含 `KeyGroup``Key``Namespace``UserKey`,用于多维索引与前缀查询。
127+
128+
### 3.3 Iterator
129+
130+
`Store.ScanComplex` 返回的 `fssdk.Iterator` 接口:
131+
132+
- `HasNext() (bool, error)`
133+
- `Next() (key, value []byte, ok bool, err error)`
134+
- `Close() error`(用毕须调用以释放资源)
135+
136+
---
137+
138+
## 四、生产级示例(词频统计)
139+
140+
```go
141+
package main
142+
143+
import (
144+
"encoding/json"
145+
fssdk "github.com/functionstream/function-stream/go-sdk"
146+
"strconv"
147+
"strings"
148+
)
149+
150+
func init() {
151+
fssdk.Run(&CounterProcessor{})
152+
}
153+
154+
type CounterProcessor struct {
155+
fssdk.BaseDriver
156+
store fssdk.Store
157+
counterMap map[string]int64
158+
totalProcessed int64
159+
keyPrefix string
160+
}
161+
162+
func (p *CounterProcessor) Init(ctx fssdk.Context, config map[string]string) error {
163+
store, err := ctx.GetOrCreateStore("counter-store")
164+
if err != nil {
165+
return err
166+
}
167+
p.store = store
168+
p.counterMap = make(map[string]int64)
169+
p.totalProcessed = 0
170+
p.keyPrefix = strings.TrimSpace(config["key_prefix"])
171+
return nil
172+
}
173+
174+
func (p *CounterProcessor) Process(ctx fssdk.Context, sourceID uint32, data []byte) error {
175+
inputStr := strings.TrimSpace(string(data))
176+
if inputStr == "" {
177+
return nil
178+
}
179+
p.totalProcessed++
180+
181+
fullKey := p.keyPrefix + inputStr
182+
existing, found, err := p.store.GetState([]byte(fullKey))
183+
if err != nil {
184+
return err
185+
}
186+
currentCount := int64(0)
187+
if found {
188+
if n, e := strconv.ParseInt(string(existing), 10, 64); e == nil {
189+
currentCount = n
190+
}
191+
}
192+
newCount := currentCount + 1
193+
p.counterMap[inputStr] = newCount
194+
if err = p.store.PutState([]byte(fullKey), []byte(strconv.FormatInt(newCount, 10))); err != nil {
195+
return err
196+
}
197+
198+
out := map[string]interface{}{
199+
"total_processed": p.totalProcessed,
200+
"counter_map": p.counterMap,
201+
}
202+
jsonBytes, _ := json.Marshal(out)
203+
return ctx.Emit(0, jsonBytes)
204+
}
205+
206+
func (p *CounterProcessor) ProcessWatermark(ctx fssdk.Context, sourceID uint32, watermark uint64) error {
207+
return ctx.EmitWatermark(0, watermark)
208+
}
209+
210+
func (p *CounterProcessor) Close(ctx fssdk.Context) error {
211+
p.store = nil
212+
p.counterMap = nil
213+
return nil
214+
}
215+
216+
func main() {}
217+
```
218+
219+
---
220+
221+
## 五、构建与部署
222+
223+
### 5.1 环境要求
224+
225+
- **Go**:1.23+
226+
- **TinyGo**:0.40+(支持 WASI P2)
227+
- **wit-bindgen-go**`go install go.bytecodealliance.org/cmd/wit-bindgen-go@latest`
228+
- **wkg**(可选):用于拉取 WIT 依赖,`cargo install wkg --version 0.10.0`
229+
- **wasm-tools**:用于组件校验等,`cargo install wasm-tools`
230+
231+
### 5.2 生成绑定并构建
232+
233+
在项目根目录下:
234+
235+
```bash
236+
# 安装 Go SDK 工具链(wit-bindgen-go、wkg 等)
237+
make -C go-sdk env
238+
239+
# 生成 WIT 绑定(从 wit/processor.wit 及 deps)
240+
make -C go-sdk bindings
241+
242+
# 运行测试
243+
make -C go-sdk build
244+
```
245+
246+
算子项目(如 `examples/go-processor`)中:
247+
248+
```bash
249+
# 使用 TinyGo 编译为 WASI P2 组件
250+
tinygo build -o build/processor.wasm -target=wasi .
251+
```
252+
253+
具体参数见 `examples/go-processor/build.sh`
254+
255+
### 5.3 注册与运行
256+
257+
将编译得到的 `processor.wasm``config.yaml` 通过 SQL CLI 注册为 function:
258+
259+
```sql
260+
create function with (
261+
'function_path'='/path/to/build/processor.wasm',
262+
'config_path'='/path/to/config.yaml'
263+
);
264+
```
265+
266+
config.yaml 中需配置 `name``type: processor``input-groups``outputs`(如 Kafka)。详见 [Function 配置](function-configuration-zh.md)[examples/go-processor/README.md](../examples/go-processor/README.md)
267+
268+
---
269+
270+
## 六、错误码与异常处理
271+
272+
SDK 通过 `fssdk.SDKError` 返回错误,可用 `errors.As` 获取 `Code`
273+
274+
| 错误码 | 含义 |
275+
|--------|------|
276+
| `ErrRuntimeInvalidDriver` | 传入的 Driver 无效。 |
277+
| `ErrRuntimeNotInitialized` | 运行时未初始化即调用。 |
278+
| `ErrRuntimeClosed` | 运行时已关闭。 |
279+
| `ErrStoreInvalidName` | Store 名称不合法。 |
280+
| `ErrStoreInternal` | Store 内部错误。 |
281+
| `ErrStoreNotFound` | 未找到指定 Store。 |
282+
| `ErrStoreIO` | Store 读写异常。 |
283+
| `ErrResultUnexpected` | 宿主返回了意外结果。 |
284+
285+
处理示例:
286+
287+
```go
288+
if err != nil {
289+
var sdkErr *fssdk.SDKError
290+
if errors.As(err, &sdkErr) {
291+
switch sdkErr.Code {
292+
case fssdk.ErrStoreNotFound:
293+
// 按业务决定是否创建或返回
294+
default:
295+
// 记录并向上返回
296+
}
297+
}
298+
return err
299+
}
300+
```
301+
302+
---
303+
304+
## 七、目录结构参考
305+
306+
```
307+
go-sdk/
308+
├── Makefile # env / wit / bindings / build
309+
├── fssdk.go # 对外入口与类型重导出
310+
├── go.mod / go.sum
311+
├── api/ # 接口与错误码
312+
│ ├── driver.go # Driver、BaseDriver
313+
│ ├── context.go # Context
314+
│ ├── store.go # Store、Iterator、ComplexKey
315+
│ └── errors.go # ErrorCode、SDKError
316+
├── impl/ # 与 WASM 宿主桥接
317+
│ ├── runtime.go
318+
│ ├── context.go
319+
│ └── store.go
320+
├── wit/ # processor.wit 及依赖(可由 make wit 生成)
321+
└── bindings/ # wit-bindgen-go 生成的 Go 代码(make bindings)
322+
```
323+
324+
更多示例与 SQL 操作见 [examples/go-processor/README.md](../examples/go-processor/README.md)[SQL CLI 指南](sql-cli-guide-zh.md)

0 commit comments

Comments
 (0)