Skip to content

Commit e9390d0

Browse files
committed
Update CRD structure
1 parent 7059503 commit e9390d0

17 files changed

Lines changed: 1468 additions & 4306 deletions

api/v1/dataflow_types.go

Lines changed: 378 additions & 58 deletions
Large diffs are not rendered by default.

api/v1/dataflow_validation.go

Lines changed: 161 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package v1
1818

1919
import (
20+
"encoding/json"
21+
2022
corev1 "k8s.io/api/core/v1"
2123
"k8s.io/apimachinery/pkg/util/validation/field"
2224
)
@@ -64,36 +66,62 @@ func validateSource(s *SourceSpec, f *field.Path) field.ErrorList {
6466
all = append(all, field.NotSupported(f.Child("type"), s.Type, []string{"kafka", "postgresql", "trino", "clickhouse", "nessie"}))
6567
return all
6668
}
69+
hasConfig := s.Config != nil && len(s.Config.Raw) > 0
6770
switch s.Type {
6871
case "kafka":
69-
if s.Kafka == nil {
70-
all = append(all, field.Required(f.Child("kafka"), "kafka source configuration is required"))
72+
if hasConfig {
73+
var cfg KafkaSourceSpec
74+
if err := json.Unmarshal(s.Config.Raw, &cfg); err != nil {
75+
all = append(all, field.Invalid(f.Child("config"), string(s.Config.Raw), "invalid kafka config: "+err.Error()))
76+
} else {
77+
all = append(all, validateKafkaSource(&cfg, f.Child("config"))...)
78+
}
7179
} else {
72-
all = append(all, validateKafkaSource(s.Kafka, f.Child("kafka"))...)
80+
all = append(all, field.Required(f.Child("config"), "kafka source configuration is required"))
7381
}
7482
case "postgresql":
75-
if s.PostgreSQL == nil {
76-
all = append(all, field.Required(f.Child("postgresql"), "postgresql source configuration is required"))
83+
if hasConfig {
84+
var cfg PostgreSQLSourceSpec
85+
if err := json.Unmarshal(s.Config.Raw, &cfg); err != nil {
86+
all = append(all, field.Invalid(f.Child("config"), string(s.Config.Raw), "invalid postgresql config: "+err.Error()))
87+
} else {
88+
all = append(all, validatePostgreSQLSource(&cfg, f.Child("config"))...)
89+
}
7790
} else {
78-
all = append(all, validatePostgreSQLSource(s.PostgreSQL, f.Child("postgresql"))...)
91+
all = append(all, field.Required(f.Child("config"), "postgresql source configuration is required"))
7992
}
8093
case "trino":
81-
if s.Trino == nil {
82-
all = append(all, field.Required(f.Child("trino"), "trino source configuration is required"))
94+
if hasConfig {
95+
var cfg TrinoSourceSpec
96+
if err := json.Unmarshal(s.Config.Raw, &cfg); err != nil {
97+
all = append(all, field.Invalid(f.Child("config"), string(s.Config.Raw), "invalid trino config: "+err.Error()))
98+
} else {
99+
all = append(all, validateTrinoSource(&cfg, f.Child("config"))...)
100+
}
83101
} else {
84-
all = append(all, validateTrinoSource(s.Trino, f.Child("trino"))...)
102+
all = append(all, field.Required(f.Child("config"), "trino source configuration is required"))
85103
}
86104
case "clickhouse":
87-
if s.ClickHouse == nil {
88-
all = append(all, field.Required(f.Child("clickhouse"), "clickhouse source configuration is required"))
105+
if hasConfig {
106+
var cfg ClickHouseSourceSpec
107+
if err := json.Unmarshal(s.Config.Raw, &cfg); err != nil {
108+
all = append(all, field.Invalid(f.Child("config"), string(s.Config.Raw), "invalid clickhouse config: "+err.Error()))
109+
} else {
110+
all = append(all, validateClickHouseSource(&cfg, f.Child("config"))...)
111+
}
89112
} else {
90-
all = append(all, validateClickHouseSource(s.ClickHouse, f.Child("clickhouse"))...)
113+
all = append(all, field.Required(f.Child("config"), "clickhouse source configuration is required"))
91114
}
92115
case "nessie":
93-
if s.Nessie == nil {
94-
all = append(all, field.Required(f.Child("nessie"), "nessie source configuration is required"))
116+
if hasConfig {
117+
var cfg NessieSourceSpec
118+
if err := json.Unmarshal(s.Config.Raw, &cfg); err != nil {
119+
all = append(all, field.Invalid(f.Child("config"), string(s.Config.Raw), "invalid nessie config: "+err.Error()))
120+
} else {
121+
all = append(all, validateNessieSource(&cfg, f.Child("config"))...)
122+
}
95123
} else {
96-
all = append(all, validateNessieSource(s.Nessie, f.Child("nessie"))...)
124+
all = append(all, field.Required(f.Child("config"), "nessie source configuration is required"))
97125
}
98126
}
99127
return all
@@ -184,36 +212,62 @@ func validateSink(s *SinkSpec, f *field.Path) field.ErrorList {
184212
all = append(all, field.NotSupported(f.Child("type"), s.Type, []string{"kafka", "postgresql", "trino", "clickhouse", "nessie"}))
185213
return all
186214
}
215+
hasConfig := s.Config != nil && len(s.Config.Raw) > 0
187216
switch s.Type {
188217
case "kafka":
189-
if s.Kafka == nil {
190-
all = append(all, field.Required(f.Child("kafka"), "kafka sink configuration is required"))
218+
if hasConfig {
219+
var cfg KafkaSinkSpec
220+
if err := json.Unmarshal(s.Config.Raw, &cfg); err != nil {
221+
all = append(all, field.Invalid(f.Child("config"), string(s.Config.Raw), "invalid kafka config: "+err.Error()))
222+
} else {
223+
all = append(all, validateKafkaSink(&cfg, f.Child("config"))...)
224+
}
191225
} else {
192-
all = append(all, validateKafkaSink(s.Kafka, f.Child("kafka"))...)
226+
all = append(all, field.Required(f.Child("config"), "kafka sink configuration is required"))
193227
}
194228
case "postgresql":
195-
if s.PostgreSQL == nil {
196-
all = append(all, field.Required(f.Child("postgresql"), "postgresql sink configuration is required"))
229+
if hasConfig {
230+
var cfg PostgreSQLSinkSpec
231+
if err := json.Unmarshal(s.Config.Raw, &cfg); err != nil {
232+
all = append(all, field.Invalid(f.Child("config"), string(s.Config.Raw), "invalid postgresql config: "+err.Error()))
233+
} else {
234+
all = append(all, validatePostgreSQLSink(&cfg, f.Child("config"))...)
235+
}
197236
} else {
198-
all = append(all, validatePostgreSQLSink(s.PostgreSQL, f.Child("postgresql"))...)
237+
all = append(all, field.Required(f.Child("config"), "postgresql sink configuration is required"))
199238
}
200239
case "trino":
201-
if s.Trino == nil {
202-
all = append(all, field.Required(f.Child("trino"), "trino sink configuration is required"))
240+
if hasConfig {
241+
var cfg TrinoSinkSpec
242+
if err := json.Unmarshal(s.Config.Raw, &cfg); err != nil {
243+
all = append(all, field.Invalid(f.Child("config"), string(s.Config.Raw), "invalid trino config: "+err.Error()))
244+
} else {
245+
all = append(all, validateTrinoSink(&cfg, f.Child("config"))...)
246+
}
203247
} else {
204-
all = append(all, validateTrinoSink(s.Trino, f.Child("trino"))...)
248+
all = append(all, field.Required(f.Child("config"), "trino sink configuration is required"))
205249
}
206250
case "clickhouse":
207-
if s.ClickHouse == nil {
208-
all = append(all, field.Required(f.Child("clickhouse"), "clickhouse sink configuration is required"))
251+
if hasConfig {
252+
var cfg ClickHouseSinkSpec
253+
if err := json.Unmarshal(s.Config.Raw, &cfg); err != nil {
254+
all = append(all, field.Invalid(f.Child("config"), string(s.Config.Raw), "invalid clickhouse config: "+err.Error()))
255+
} else {
256+
all = append(all, validateClickHouseSink(&cfg, f.Child("config"))...)
257+
}
209258
} else {
210-
all = append(all, validateClickHouseSink(s.ClickHouse, f.Child("clickhouse"))...)
259+
all = append(all, field.Required(f.Child("config"), "clickhouse sink configuration is required"))
211260
}
212261
case "nessie":
213-
if s.Nessie == nil {
214-
all = append(all, field.Required(f.Child("nessie"), "nessie sink configuration is required"))
262+
if hasConfig {
263+
var cfg NessieSinkSpec
264+
if err := json.Unmarshal(s.Config.Raw, &cfg); err != nil {
265+
all = append(all, field.Invalid(f.Child("config"), string(s.Config.Raw), "invalid nessie config: "+err.Error()))
266+
} else {
267+
all = append(all, validateNessieSink(&cfg, f.Child("config"))...)
268+
}
215269
} else {
216-
all = append(all, validateNessieSink(s.Nessie, f.Child("nessie"))...)
270+
all = append(all, field.Required(f.Child("config"), "nessie sink configuration is required"))
217271
}
218272
}
219273
return all
@@ -413,59 +467,105 @@ func validateTransformations(transformations []TransformationSpec, f *field.Path
413467
[]string{"timestamp", "flatten", "filter", "mask", "router", "select", "remove", "snakeCase", "camelCase"}))
414468
continue
415469
}
470+
hasConfig := t.Config != nil && len(t.Config.Raw) > 0
416471
switch t.Type {
417472
case "timestamp":
418-
if t.Timestamp == nil {
419-
all = append(all, field.Required(idx.Child("timestamp"), "timestamp transformation configuration is required"))
473+
if hasConfig {
474+
var cfg TimestampTransformation
475+
if err := json.Unmarshal(t.Config.Raw, &cfg); err != nil {
476+
all = append(all, field.Invalid(idx.Child("config"), string(t.Config.Raw), "invalid timestamp config: "+err.Error()))
477+
}
478+
} else {
479+
all = append(all, field.Required(idx.Child("config"), "timestamp transformation configuration is required"))
420480
}
421481
case "flatten":
422-
if t.Flatten == nil {
423-
all = append(all, field.Required(idx.Child("flatten"), "flatten transformation configuration is required"))
424-
} else if t.Flatten.Field == "" {
425-
all = append(all, field.Required(idx.Child("flatten", "field"), "field is required"))
482+
if hasConfig {
483+
var cfg FlattenTransformation
484+
if err := json.Unmarshal(t.Config.Raw, &cfg); err != nil {
485+
all = append(all, field.Invalid(idx.Child("config"), string(t.Config.Raw), "invalid flatten config: "+err.Error()))
486+
} else if cfg.Field == "" {
487+
all = append(all, field.Required(idx.Child("config", "field"), "field is required"))
488+
}
489+
} else {
490+
all = append(all, field.Required(idx.Child("config"), "flatten transformation configuration is required"))
426491
}
427492
case "filter":
428-
if t.Filter == nil {
429-
all = append(all, field.Required(idx.Child("filter"), "filter transformation configuration is required"))
430-
} else if t.Filter.Condition == "" {
431-
all = append(all, field.Required(idx.Child("filter", "condition"), "condition is required"))
493+
if hasConfig {
494+
var cfg FilterTransformation
495+
if err := json.Unmarshal(t.Config.Raw, &cfg); err != nil {
496+
all = append(all, field.Invalid(idx.Child("config"), string(t.Config.Raw), "invalid filter config: "+err.Error()))
497+
} else if cfg.Condition == "" {
498+
all = append(all, field.Required(idx.Child("config", "condition"), "condition is required"))
499+
}
500+
} else {
501+
all = append(all, field.Required(idx.Child("config"), "filter transformation configuration is required"))
432502
}
433503
case "mask":
434-
if t.Mask == nil {
435-
all = append(all, field.Required(idx.Child("mask"), "mask transformation configuration is required"))
436-
} else if len(t.Mask.Fields) == 0 {
437-
all = append(all, field.Required(idx.Child("mask", "fields"), "at least one field is required"))
504+
if hasConfig {
505+
var cfg MaskTransformation
506+
if err := json.Unmarshal(t.Config.Raw, &cfg); err != nil {
507+
all = append(all, field.Invalid(idx.Child("config"), string(t.Config.Raw), "invalid mask config: "+err.Error()))
508+
} else if len(cfg.Fields) == 0 {
509+
all = append(all, field.Required(idx.Child("config", "fields"), "at least one field is required"))
510+
}
511+
} else {
512+
all = append(all, field.Required(idx.Child("config"), "mask transformation configuration is required"))
438513
}
439514
case "router":
440-
if t.Router == nil {
441-
all = append(all, field.Required(idx.Child("router"), "router transformation configuration is required"))
515+
routerCfg, _ := t.GetRouterConfig()
516+
if routerCfg == nil {
517+
all = append(all, field.Required(idx.Child("config"), "router transformation configuration is required (config or router)"))
442518
} else {
443-
for j, route := range t.Router.Routes {
519+
routesPath := idx.Child("router", "routes")
520+
if hasConfig {
521+
routesPath = idx.Child("config", "routes")
522+
}
523+
for j, route := range routerCfg.Routes {
444524
if route.Condition == "" {
445-
all = append(all, field.Required(idx.Child("router", "routes").Index(j).Child("condition"), "condition is required"))
525+
all = append(all, field.Required(routesPath.Index(j).Child("condition"), "condition is required"))
446526
}
447-
all = append(all, validateSink(&route.Sink, idx.Child("router", "routes").Index(j).Child("sink"))...)
527+
all = append(all, validateSink(&route.Sink, routesPath.Index(j).Child("sink"))...)
448528
}
449529
}
450530
case "select":
451-
if t.Select == nil {
452-
all = append(all, field.Required(idx.Child("select"), "select transformation configuration is required"))
453-
} else if len(t.Select.Fields) == 0 {
454-
all = append(all, field.Required(idx.Child("select", "fields"), "at least one field is required"))
531+
if hasConfig {
532+
var cfg SelectTransformation
533+
if err := json.Unmarshal(t.Config.Raw, &cfg); err != nil {
534+
all = append(all, field.Invalid(idx.Child("config"), string(t.Config.Raw), "invalid select config: "+err.Error()))
535+
} else if len(cfg.Fields) == 0 {
536+
all = append(all, field.Required(idx.Child("config", "fields"), "at least one field is required"))
537+
}
538+
} else {
539+
all = append(all, field.Required(idx.Child("config"), "select transformation configuration is required"))
455540
}
456541
case "remove":
457-
if t.Remove == nil {
458-
all = append(all, field.Required(idx.Child("remove"), "remove transformation configuration is required"))
459-
} else if len(t.Remove.Fields) == 0 {
460-
all = append(all, field.Required(idx.Child("remove", "fields"), "at least one field is required"))
542+
if hasConfig {
543+
var cfg RemoveTransformation
544+
if err := json.Unmarshal(t.Config.Raw, &cfg); err != nil {
545+
all = append(all, field.Invalid(idx.Child("config"), string(t.Config.Raw), "invalid remove config: "+err.Error()))
546+
} else if len(cfg.Fields) == 0 {
547+
all = append(all, field.Required(idx.Child("config", "fields"), "at least one field is required"))
548+
}
549+
} else {
550+
all = append(all, field.Required(idx.Child("config"), "remove transformation configuration is required"))
461551
}
462552
case "snakeCase":
463-
if t.SnakeCase == nil {
464-
all = append(all, field.Required(idx.Child("snakeCase"), "snakeCase transformation configuration is required"))
553+
if hasConfig {
554+
var cfg SnakeCaseTransformation
555+
if err := json.Unmarshal(t.Config.Raw, &cfg); err != nil {
556+
all = append(all, field.Invalid(idx.Child("config"), string(t.Config.Raw), "invalid snakeCase config: "+err.Error()))
557+
}
558+
} else {
559+
all = append(all, field.Required(idx.Child("config"), "snakeCase transformation configuration is required"))
465560
}
466561
case "camelCase":
467-
if t.CamelCase == nil {
468-
all = append(all, field.Required(idx.Child("camelCase"), "camelCase transformation configuration is required"))
562+
if hasConfig {
563+
var cfg CamelCaseTransformation
564+
if err := json.Unmarshal(t.Config.Raw, &cfg); err != nil {
565+
all = append(all, field.Invalid(idx.Child("config"), string(t.Config.Raw), "invalid camelCase config: "+err.Error()))
566+
}
567+
} else {
568+
all = append(all, field.Required(idx.Child("config"), "camelCase transformation configuration is required"))
469569
}
470570
}
471571
}

api/v1/dataflow_webhook_test.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,27 @@ package v1
1818

1919
import (
2020
"context"
21+
"encoding/json"
2122
"testing"
23+
24+
"k8s.io/apimachinery/pkg/runtime"
2225
)
2326

27+
func mustConfig(v interface{}) *runtime.RawExtension {
28+
b, _ := json.Marshal(v)
29+
return &runtime.RawExtension{Raw: b}
30+
}
31+
2432
func TestDataFlow_ValidateCreate_Valid(t *testing.T) {
2533
df := &DataFlow{}
2634
df.Spec = DataFlowSpec{
2735
Source: SourceSpec{
28-
Type: "kafka",
29-
Kafka: &KafkaSourceSpec{Brokers: []string{"b"}, Topic: "t"},
36+
Type: "kafka",
37+
Config: mustConfig(KafkaSourceSpec{Brokers: []string{"b"}, Topic: "t"}),
3038
},
3139
Sink: SinkSpec{
32-
Type: "kafka",
33-
Kafka: &KafkaSinkSpec{Brokers: []string{"b"}, Topic: "t"},
40+
Type: "kafka",
41+
Config: mustConfig(KafkaSinkSpec{Brokers: []string{"b"}, Topic: "t"}),
3442
},
3543
}
3644
warnings, err := df.ValidateCreate(context.Background(), df)
@@ -45,10 +53,10 @@ func TestDataFlow_ValidateCreate_Valid(t *testing.T) {
4553
func TestDataFlow_ValidateCreate_Invalid(t *testing.T) {
4654
df := &DataFlow{}
4755
df.Spec = DataFlowSpec{
48-
Source: SourceSpec{Type: "kafka"}, // Kafka config nil
56+
Source: SourceSpec{Type: "kafka"}, // Config nil
4957
Sink: SinkSpec{
50-
Type: "kafka",
51-
Kafka: &KafkaSinkSpec{Brokers: []string{"b"}, Topic: "t"},
58+
Type: "kafka",
59+
Config: mustConfig(KafkaSinkSpec{Brokers: []string{"b"}, Topic: "t"}),
5260
},
5361
}
5462
_, err := df.ValidateCreate(context.Background(), df)
@@ -61,12 +69,12 @@ func TestDataFlow_ValidateUpdate_Valid(t *testing.T) {
6169
df := &DataFlow{}
6270
df.Spec = DataFlowSpec{
6371
Source: SourceSpec{
64-
Type: "kafka",
65-
Kafka: &KafkaSourceSpec{Brokers: []string{"b"}, Topic: "t"},
72+
Type: "kafka",
73+
Config: mustConfig(KafkaSourceSpec{Brokers: []string{"b"}, Topic: "t"}),
6674
},
6775
Sink: SinkSpec{
68-
Type: "kafka",
69-
Kafka: &KafkaSinkSpec{Brokers: []string{"b"}, Topic: "t"},
76+
Type: "kafka",
77+
Config: mustConfig(KafkaSinkSpec{Brokers: []string{"b"}, Topic: "t"}),
7078
},
7179
}
7280
old := &DataFlow{}

0 commit comments

Comments
 (0)