Skip to content

Commit 28f1bdb

Browse files
committed
Add support clickhouse
1 parent 96d796a commit 28f1bdb

50 files changed

Lines changed: 2846 additions & 1093 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/build-and-push.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ jobs:
5757
push: true
5858
tags: ${{ steps.meta.outputs.tags }}
5959
labels: ${{ steps.meta.outputs.labels }}
60+
platforms: linux/amd64,linux/arm64
61+
build-args: |
62+
VERSION=${{ github.ref_name }}
6063
cache-from: type=gha
6164
cache-to: type=gha,mode=max
6265

Taskfile.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,17 @@ tasks:
5757
cmds:
5858
- |
5959
KUBEBUILDER_ASSETS="$$({{.LOCALBIN}}/setup-envtest use {{.ENVTEST_K8S_VERSION}} -p path)" \
60-
go test ./internal/... ./test/... -coverprofile cover.out || true
60+
GOTOOLCHAIN=go1.25.0 go test ./internal/... ./test/... -coverprofile cover.out || true
6161
6262
test-unit:
6363
desc: Run unit tests without envtest with statistics
6464
cmds:
65-
- ./scripts/run-tests-with-stats.sh ./...
65+
- GOTOOLCHAIN=go1.25.0 ./scripts/run-tests-with-stats.sh ./...
6666

6767
test-unit-simple:
6868
desc: Run unit tests without envtest (simple output)
6969
cmds:
70-
- go test ./... -v
70+
- GOTOOLCHAIN=go1.25.0 go test ./... -v
7171

7272
test-all:
7373
desc: Run all unit tests (alias for test-unit)
@@ -77,12 +77,12 @@ tasks:
7777
test-integration:
7878
desc: Run integration tests with statistics
7979
cmds:
80-
- ./scripts/run-tests-with-stats.sh ./test/integration/...
80+
- GOTOOLCHAIN=go1.25.0 ./scripts/run-tests-with-stats.sh ./test/integration/...
8181

8282
test-integration-simple:
8383
desc: Run integration tests (simple output)
8484
cmds:
85-
- go test ./test/integration/... -v
85+
- GOTOOLCHAIN=go1.25.0 go test ./test/integration/... -v
8686

8787
# Build tasks
8888
build:

api/v1/dataflow_types.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ type SourceSpec struct {
7070
// Trino source configuration
7171
// +optional
7272
Trino *TrinoSourceSpec `json:"trino,omitempty"`
73+
74+
// ClickHouse source configuration
75+
// +optional
76+
ClickHouse *ClickHouseSourceSpec `json:"clickhouse,omitempty"`
7377
}
7478

7579
// KafkaSourceSpec defines Kafka source configuration
@@ -230,6 +234,31 @@ type TrinoSourceSpec struct {
230234
TableSecretRef *SecretRef `json:"tableSecretRef,omitempty"`
231235
}
232236

237+
// ClickHouseSourceSpec defines ClickHouse source configuration
238+
type ClickHouseSourceSpec struct {
239+
// ConnectionString for ClickHouse database (e.g., clickhouse://host:9000?username=default&password=xxx&database=default)
240+
ConnectionString string `json:"connectionString"`
241+
242+
// Table to read from
243+
Table string `json:"table"`
244+
245+
// Query for custom SQL query (optional, if not provided, reads from table)
246+
// +optional
247+
Query string `json:"query,omitempty"`
248+
249+
// PollInterval in seconds for polling mode
250+
// +optional
251+
PollInterval *int32 `json:"pollInterval,omitempty"`
252+
253+
// ConnectionStringSecretRef references a Kubernetes secret for connection string
254+
// +optional
255+
ConnectionStringSecretRef *SecretRef `json:"connectionStringSecretRef,omitempty"`
256+
257+
// TableSecretRef references a Kubernetes secret for table name
258+
// +optional
259+
TableSecretRef *SecretRef `json:"tableSecretRef,omitempty"`
260+
}
261+
233262
// KeycloakConfig defines Keycloak OAuth2/OIDC authentication configuration
234263
type KeycloakConfig struct {
235264
// ServerURL is the Keycloak server URL (e.g., https://keycloak.example.com/auth)
@@ -304,6 +333,10 @@ type SinkSpec struct {
304333
// Trino sink configuration
305334
// +optional
306335
Trino *TrinoSinkSpec `json:"trino,omitempty"`
336+
337+
// ClickHouse sink configuration
338+
// +optional
339+
ClickHouse *ClickHouseSinkSpec `json:"clickhouse,omitempty"`
307340
}
308341

309342
// KafkaSinkSpec defines Kafka sink configuration
@@ -409,6 +442,35 @@ type TrinoSinkSpec struct {
409442
TableSecretRef *SecretRef `json:"tableSecretRef,omitempty"`
410443
}
411444

445+
// ClickHouseSinkSpec defines ClickHouse sink configuration
446+
type ClickHouseSinkSpec struct {
447+
// ConnectionString for ClickHouse database (e.g., clickhouse://host:9000?username=default&password=xxx&database=default)
448+
ConnectionString string `json:"connectionString"`
449+
450+
// Table to write to
451+
Table string `json:"table"`
452+
453+
// BatchSize for batch inserts
454+
// +optional
455+
BatchSize *int32 `json:"batchSize,omitempty"`
456+
457+
// BatchFlushIntervalSeconds flushes the batch after this many seconds even if BatchSize is not reached (default: 10)
458+
// +optional
459+
BatchFlushIntervalSeconds *int32 `json:"batchFlushIntervalSeconds,omitempty"`
460+
461+
// AutoCreateTable automatically creates the table if it doesn't exist
462+
// +optional
463+
AutoCreateTable *bool `json:"autoCreateTable,omitempty"`
464+
465+
// ConnectionStringSecretRef references a Kubernetes secret for connection string
466+
// +optional
467+
ConnectionStringSecretRef *SecretRef `json:"connectionStringSecretRef,omitempty"`
468+
469+
// TableSecretRef references a Kubernetes secret for table name
470+
// +optional
471+
TableSecretRef *SecretRef `json:"tableSecretRef,omitempty"`
472+
}
473+
412474
// SecretRef references a Kubernetes secret
413475
type SecretRef struct {
414476
// Name of the secret

api/v1/dataflow_validation.go

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ import (
2222
)
2323

2424
// Valid source types (must match connectors/factory.go).
25-
var validSourceTypes = map[string]bool{"kafka": true, "postgresql": true, "trino": true}
25+
var validSourceTypes = map[string]bool{"kafka": true, "postgresql": true, "trino": true, "clickhouse": true}
2626

2727
// Valid sink types (must match connectors/factory.go).
28-
var validSinkTypes = map[string]bool{"kafka": true, "postgresql": true, "trino": true}
28+
var validSinkTypes = map[string]bool{"kafka": true, "postgresql": true, "trino": true, "clickhouse": true}
2929

3030
// Valid transformation types (must match transformers/factory.go).
3131
var validTransformationTypes = map[string]bool{
@@ -61,7 +61,7 @@ func validateSource(s *SourceSpec, f *field.Path) field.ErrorList {
6161
return all
6262
}
6363
if !validSourceTypes[s.Type] {
64-
all = append(all, field.NotSupported(f.Child("type"), s.Type, []string{"kafka", "postgresql", "trino"}))
64+
all = append(all, field.NotSupported(f.Child("type"), s.Type, []string{"kafka", "postgresql", "trino", "clickhouse"}))
6565
return all
6666
}
6767
switch s.Type {
@@ -83,6 +83,12 @@ func validateSource(s *SourceSpec, f *field.Path) field.ErrorList {
8383
} else {
8484
all = append(all, validateTrinoSource(s.Trino, f.Child("trino"))...)
8585
}
86+
case "clickhouse":
87+
if s.ClickHouse == nil {
88+
all = append(all, field.Required(f.Child("clickhouse"), "clickhouse source configuration is required"))
89+
} else {
90+
all = append(all, validateClickHouseSource(s.ClickHouse, f.Child("clickhouse"))...)
91+
}
8692
}
8793
return all
8894
}
@@ -169,7 +175,7 @@ func validateSink(s *SinkSpec, f *field.Path) field.ErrorList {
169175
return all
170176
}
171177
if !validSinkTypes[s.Type] {
172-
all = append(all, field.NotSupported(f.Child("type"), s.Type, []string{"kafka", "postgresql", "trino"}))
178+
all = append(all, field.NotSupported(f.Child("type"), s.Type, []string{"kafka", "postgresql", "trino", "clickhouse"}))
173179
return all
174180
}
175181
switch s.Type {
@@ -191,6 +197,12 @@ func validateSink(s *SinkSpec, f *field.Path) field.ErrorList {
191197
} else {
192198
all = append(all, validateTrinoSink(s.Trino, f.Child("trino"))...)
193199
}
200+
case "clickhouse":
201+
if s.ClickHouse == nil {
202+
all = append(all, field.Required(f.Child("clickhouse"), "clickhouse sink configuration is required"))
203+
} else {
204+
all = append(all, validateClickHouseSink(s.ClickHouse, f.Child("clickhouse"))...)
205+
}
194206
}
195207
return all
196208
}
@@ -266,6 +278,44 @@ func validateTrinoSink(t *TrinoSinkSpec, f *field.Path) field.ErrorList {
266278
return all
267279
}
268280

281+
func validateClickHouseSource(c *ClickHouseSourceSpec, f *field.Path) field.ErrorList {
282+
var all field.ErrorList
283+
hasConn := c.ConnectionString != "" || c.ConnectionStringSecretRef != nil
284+
if !hasConn {
285+
all = append(all, field.Required(f.Child("connectionString"), "connectionString or connectionStringSecretRef is required"))
286+
}
287+
hasTable := c.Table != "" || c.TableSecretRef != nil
288+
if !hasTable {
289+
all = append(all, field.Required(f.Child("table"), "table or tableSecretRef is required"))
290+
}
291+
if c.ConnectionStringSecretRef != nil {
292+
all = append(all, validateSecretRef(c.ConnectionStringSecretRef, f.Child("connectionStringSecretRef"))...)
293+
}
294+
if c.TableSecretRef != nil {
295+
all = append(all, validateSecretRef(c.TableSecretRef, f.Child("tableSecretRef"))...)
296+
}
297+
return all
298+
}
299+
300+
func validateClickHouseSink(c *ClickHouseSinkSpec, f *field.Path) field.ErrorList {
301+
var all field.ErrorList
302+
hasConn := c.ConnectionString != "" || c.ConnectionStringSecretRef != nil
303+
if !hasConn {
304+
all = append(all, field.Required(f.Child("connectionString"), "connectionString or connectionStringSecretRef is required"))
305+
}
306+
hasTable := c.Table != "" || c.TableSecretRef != nil
307+
if !hasTable {
308+
all = append(all, field.Required(f.Child("table"), "table or tableSecretRef is required"))
309+
}
310+
if c.ConnectionStringSecretRef != nil {
311+
all = append(all, validateSecretRef(c.ConnectionStringSecretRef, f.Child("connectionStringSecretRef"))...)
312+
}
313+
if c.TableSecretRef != nil {
314+
all = append(all, validateSecretRef(c.TableSecretRef, f.Child("tableSecretRef"))...)
315+
}
316+
return all
317+
}
318+
269319
func validateSecretRef(r *SecretRef, f *field.Path) field.ErrorList {
270320
var all field.ErrorList
271321
if r == nil {

api/v1/zz_generated.deepcopy.go

Lines changed: 80 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/gui-server/main.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,19 @@ func main() {
4242
flag.StringVar(&bindAddr, "bind-address", ":8080", "The address the GUI server binds to.")
4343
flag.StringVar(&logLevel, "log-level", "info", "Log level: debug, info, warn, error")
4444

45-
// kubeconfig может быть зарегистрирован плагинами auth, используем переменную окружения или стандартный путь
45+
// kubeconfig may be registered by auth plugins; use env var or standard path
4646
flag.Parse()
4747

48-
// Получаем kubeconfig из переменной окружения или используем стандартный путь
48+
// Get kubeconfig from env var or use standard path
4949
kubeconfig := os.Getenv("KUBECONFIG")
5050
if kubeconfig == "" {
51-
// Попробуем получить из флага, если он был зарегистрирован плагином
51+
// Try to get from flag if it was registered by a plugin
5252
if f := flag.Lookup("kubeconfig"); f != nil {
5353
kubeconfig = f.Value.String()
5454
}
5555
}
5656

57-
// Устанавливаем уровень логирования
57+
// Set log level
5858
var level zapcore.Level
5959
switch logLevel {
6060
case "debug":
@@ -74,23 +74,23 @@ func main() {
7474
config.EncoderConfig = zap.NewProductionEncoderConfig()
7575
zapLogger, err := config.Build()
7676
if err != nil {
77-
// Не panic — пишем в stderr и выходим с кодом 2, чтобы сообщение было видно в логах пода
77+
// Don't panic — write to stderr and exit with code 2 so the message is visible in pod logs
7878
os.Stderr.WriteString("gui-server: failed to create logger: " + err.Error() + "\n")
7979
os.Exit(2)
8080
}
8181
ctrl.SetLogger(zapr.NewLogger(zapLogger))
8282

8383
setupLog.Info("Starting GUI server", "bind-address", bindAddr)
8484

85-
// Создаем GUI сервер
85+
// Create GUI server
8686
server, err := gui.NewServer(bindAddr, kubeconfig)
8787
if err != nil {
8888
setupLog.Error(err, "unable to create GUI server")
89-
_ = zapLogger.Sync() // сброс буфера, чтобы сообщение об ошибке попало в логи пода
89+
_ = zapLogger.Sync() // flush buffer so error message appears in pod logs
9090
os.Exit(1)
9191
}
9292

93-
// Запускаем сервер
93+
// Start server
9494
if err := server.Start(); err != nil {
9595
setupLog.Error(err, "unable to start GUI server")
9696
_ = zapLogger.Sync()

0 commit comments

Comments
 (0)