Skip to content

Commit c6167de

Browse files
committed
Remove old config
1 parent 9a56ff1 commit c6167de

18 files changed

Lines changed: 66 additions & 201 deletions

CHANGELOG.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,29 @@
22

33
## [1.0.4](https://github.com/dataflow-operator/dataflow/compare/v1.0.3...v1.0.4) (2026-03-15)
44

5+
### BREAKING CHANGE
6+
7+
Удалена поддержка legacy-формата конфигурации. Source, sink и transformations теперь используют только формат `type` + `config`.
8+
9+
**Миграция:**
10+
```yaml
11+
# Было (legacy):
12+
source:
13+
type: kafka
14+
kafka:
15+
brokers: [localhost:9092]
16+
topic: my-topic
17+
18+
# Стало:
19+
source:
20+
type: kafka
21+
config:
22+
brokers: [localhost:9092]
23+
topic: my-topic
24+
```
25+
26+
Структура внутри `config` совпадает со структурой внутри `kafka`/`postgresql`/`clickhouse`/`trino`/`nessie` — меняется только ключ верхнего уровня. Аналогично для transformations: legacy-ключи `timestamp`/`flatten`/`filter`/`mask`/`router`/`select`/`remove`/`snakeCase`/`camelCase` заменяются на единый ключ `config` при том же содержимом.
27+
528
## [1.0.3](https://github.com/dataflow-operator/dataflow/compare/v1.0.2...v1.0.3) (2026-03-09)
629

730
## [1.0.2](https://github.com/dataflow-operator/dataflow/compare/v1.0.1...v1.0.2) (2026-03-09)

api/v1/dataflow_types.go

Lines changed: 0 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ type DataFlowSpec struct {
7878
}
7979

8080
// SourceSpec defines the source configuration (type + config).
81-
// Supports both generic format (type+config) and legacy format (type+kafka/postgresql/etc) when unmarshaling.
8281
// +kubebuilder:pruning:PreserveUnknownFields
8382
type SourceSpec struct {
8483
// Type of source: kafka, postgresql, trino, clickhouse, nessie, or plugin type
@@ -91,50 +90,6 @@ type SourceSpec struct {
9190
Config *runtime.RawExtension `json:"config,omitempty"`
9291
}
9392

94-
// sourceSpecRaw is used for unmarshaling legacy format (type + kafka/postgresql/etc).
95-
type sourceSpecRaw struct {
96-
Type string `json:"type"`
97-
Config *runtime.RawExtension `json:"config,omitempty"`
98-
Kafka *KafkaSourceSpec `json:"kafka,omitempty"`
99-
PostgreSQL *PostgreSQLSourceSpec `json:"postgresql,omitempty"`
100-
Trino *TrinoSourceSpec `json:"trino,omitempty"`
101-
ClickHouse *ClickHouseSourceSpec `json:"clickhouse,omitempty"`
102-
Nessie *NessieSourceSpec `json:"nessie,omitempty"`
103-
}
104-
105-
// UnmarshalJSON supports both generic (type+config) and legacy (type+kafka/etc) formats.
106-
func (s *SourceSpec) UnmarshalJSON(data []byte) error {
107-
var r sourceSpecRaw
108-
if err := json.Unmarshal(data, &r); err != nil {
109-
return err
110-
}
111-
s.Type = r.Type
112-
s.Config = r.Config
113-
if s.Config == nil || len(s.Config.Raw) == 0 {
114-
var cfg interface{}
115-
switch r.Type {
116-
case "kafka":
117-
cfg = r.Kafka
118-
case "postgresql":
119-
cfg = r.PostgreSQL
120-
case "trino":
121-
cfg = r.Trino
122-
case "clickhouse":
123-
cfg = r.ClickHouse
124-
case "nessie":
125-
cfg = r.Nessie
126-
}
127-
if cfg != nil {
128-
b, err := json.Marshal(cfg)
129-
if err != nil {
130-
return err
131-
}
132-
s.Config = &runtime.RawExtension{Raw: b}
133-
}
134-
}
135-
return nil
136-
}
137-
13893
// GetKafkaConfig returns Kafka config from Config.
13994
func (s *SourceSpec) GetKafkaConfig() (*KafkaSourceSpec, error) {
14095
if s.Config == nil || len(s.Config.Raw) == 0 {
@@ -505,7 +460,6 @@ type KeycloakConfig struct {
505460
}
506461

507462
// SinkSpec defines the sink configuration (type + config).
508-
// Supports both generic format (type+config) and legacy format (type+kafka/postgresql/etc) when unmarshaling.
509463
// +kubebuilder:pruning:PreserveUnknownFields
510464
type SinkSpec struct {
511465
// Type of sink: kafka, postgresql, trino, clickhouse, nessie, or plugin type
@@ -517,50 +471,6 @@ type SinkSpec struct {
517471
Config *runtime.RawExtension `json:"config,omitempty"`
518472
}
519473

520-
// sinkSpecRaw is used for unmarshaling legacy format.
521-
type sinkSpecRaw struct {
522-
Type string `json:"type"`
523-
Config *runtime.RawExtension `json:"config,omitempty"`
524-
Kafka *KafkaSinkSpec `json:"kafka,omitempty"`
525-
PostgreSQL *PostgreSQLSinkSpec `json:"postgresql,omitempty"`
526-
Trino *TrinoSinkSpec `json:"trino,omitempty"`
527-
ClickHouse *ClickHouseSinkSpec `json:"clickhouse,omitempty"`
528-
Nessie *NessieSinkSpec `json:"nessie,omitempty"`
529-
}
530-
531-
// UnmarshalJSON supports both generic and legacy formats.
532-
func (s *SinkSpec) UnmarshalJSON(data []byte) error {
533-
var r sinkSpecRaw
534-
if err := json.Unmarshal(data, &r); err != nil {
535-
return err
536-
}
537-
s.Type = r.Type
538-
s.Config = r.Config
539-
if s.Config == nil || len(s.Config.Raw) == 0 {
540-
var cfg interface{}
541-
switch r.Type {
542-
case "kafka":
543-
cfg = r.Kafka
544-
case "postgresql":
545-
cfg = r.PostgreSQL
546-
case "trino":
547-
cfg = r.Trino
548-
case "clickhouse":
549-
cfg = r.ClickHouse
550-
case "nessie":
551-
cfg = r.Nessie
552-
}
553-
if cfg != nil {
554-
b, err := json.Marshal(cfg)
555-
if err != nil {
556-
return err
557-
}
558-
s.Config = &runtime.RawExtension{Raw: b}
559-
}
560-
}
561-
return nil
562-
}
563-
564474
// GetKafkaConfig returns Kafka sink config.
565475
func (s *SinkSpec) GetKafkaConfig() (*KafkaSinkSpec, error) {
566476
if s.Config == nil || len(s.Config.Raw) == 0 {
@@ -905,7 +815,6 @@ type SASLConfig struct {
905815
}
906816

907817
// TransformationSpec defines a transformation to apply (type + config).
908-
// Supports both generic format (type+config) and legacy format (type+timestamp/flatten/etc) when unmarshaling.
909818
// +kubebuilder:pruning:PreserveUnknownFields
910819
type TransformationSpec struct {
911820
// Type of transformation: timestamp, flatten, filter, mask, router, select, remove, snakeCase, camelCase
@@ -917,62 +826,6 @@ type TransformationSpec struct {
917826
Config *runtime.RawExtension `json:"config,omitempty"`
918827
}
919828

920-
// transformationSpecRaw is used for unmarshaling legacy format.
921-
type transformationSpecRaw struct {
922-
Type string `json:"type"`
923-
Config *runtime.RawExtension `json:"config,omitempty"`
924-
Timestamp *TimestampTransformation `json:"timestamp,omitempty"`
925-
Flatten *FlattenTransformation `json:"flatten,omitempty"`
926-
Filter *FilterTransformation `json:"filter,omitempty"`
927-
Mask *MaskTransformation `json:"mask,omitempty"`
928-
Router *RouterTransformation `json:"router,omitempty"`
929-
Select *SelectTransformation `json:"select,omitempty"`
930-
Remove *RemoveTransformation `json:"remove,omitempty"`
931-
SnakeCase *SnakeCaseTransformation `json:"snakeCase,omitempty"`
932-
CamelCase *CamelCaseTransformation `json:"camelCase,omitempty"`
933-
}
934-
935-
// UnmarshalJSON supports both generic and legacy formats.
936-
func (t *TransformationSpec) UnmarshalJSON(data []byte) error {
937-
var r transformationSpecRaw
938-
if err := json.Unmarshal(data, &r); err != nil {
939-
return err
940-
}
941-
t.Type = r.Type
942-
t.Config = r.Config
943-
if t.Config == nil || len(t.Config.Raw) == 0 {
944-
var cfg interface{}
945-
switch r.Type {
946-
case "timestamp":
947-
cfg = r.Timestamp
948-
case "flatten":
949-
cfg = r.Flatten
950-
case "filter":
951-
cfg = r.Filter
952-
case "mask":
953-
cfg = r.Mask
954-
case "router":
955-
cfg = r.Router
956-
case "select":
957-
cfg = r.Select
958-
case "remove":
959-
cfg = r.Remove
960-
case "snakeCase":
961-
cfg = r.SnakeCase
962-
case "camelCase":
963-
cfg = r.CamelCase
964-
}
965-
if cfg != nil {
966-
b, err := json.Marshal(cfg)
967-
if err != nil {
968-
return err
969-
}
970-
t.Config = &runtime.RawExtension{Raw: b}
971-
}
972-
}
973-
return nil
974-
}
975-
976829
// GetTimestampConfig returns Timestamp transformation config.
977830
func (t *TransformationSpec) GetTimestampConfig() (*TimestampTransformation, error) {
978831
if t.Config == nil || len(t.Config.Raw) == 0 {

config/crd/bases/dataflow.dataflow.io_dataflows.yaml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,9 +1149,8 @@ spec:
11491149
description: Transformations is a list of transformations to apply
11501150
to messages
11511151
items:
1152-
description: |-
1153-
TransformationSpec defines a transformation to apply (type + config).
1154-
Supports both generic format (type+config) and legacy format (type+timestamp/flatten/etc) when unmarshaling.
1152+
description: TransformationSpec defines a transformation to apply
1153+
(type + config).
11551154
properties:
11561155
config:
11571156
description: Config holds transformation configuration. Structure
@@ -1171,7 +1170,6 @@ spec:
11711170
- sink
11721171
- source
11731172
type: object
1174-
x-kubernetes-preserve-unknown-fields: true
11751173
status:
11761174
description: DataFlowStatus defines the observed state of DataFlow
11771175
properties:

config/samples/clickhouse-to-clickhouse.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ metadata:
55
spec:
66
source:
77
type: clickhouse
8-
clickhouse:
8+
config:
99
connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
1010
table: products
1111
pollInterval: 5
1212
sink:
1313
type: clickhouse
14-
clickhouse:
14+
config:
1515
connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
1616
table: products_clone
1717
batchSize: 100

config/samples/clickhouse-to-clickhouse2.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ metadata:
55
spec:
66
source:
77
type: clickhouse
8-
clickhouse:
8+
config:
99
connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
1010
table: products
1111
pollInterval: 5
1212
sink:
1313
type: clickhouse
14-
clickhouse:
14+
config:
1515
connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
1616
table: products_row
1717
batchSize: 100

config/samples/flatten-example.yaml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,21 @@ metadata:
55
spec:
66
source:
77
type: kafka
8-
kafka:
8+
config:
99
brokers:
1010
- localhost:9092
1111
topic: stock-topic
1212
consumerGroup: dataflow-group
1313
sink:
1414
type: postgresql
15-
postgresql:
15+
config:
1616
connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
1717
table: stock_items
1818
autoCreateTable: true
1919
transformations:
2020
- type: flatten
21-
flatten:
21+
config:
2222
field: rowsStock
2323
- type: timestamp
24-
timestamp:
24+
config:
2525
fieldName: created_at
26-

config/samples/kafka-to-clickhouse.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ metadata:
55
spec:
66
source:
77
type: kafka
8-
kafka:
8+
config:
99
brokers:
1010
- localhost:9092
1111
topic: input-topic
1212
consumerGroup: dataflow-group
1313
sink:
1414
type: clickhouse
15-
clickhouse:
15+
config:
1616
connectionString: "clickhouse://default@clickhouse:9000/default?dial_timeout=10s"
1717
table: output_table
1818
batchSize: 100

config/samples/kafka-to-postgres-secrets.yaml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ metadata:
2828
spec:
2929
source:
3030
type: kafka
31-
kafka:
31+
config:
3232
brokersSecretRef:
3333
name: kafka-credentials
3434
key: brokers
@@ -48,11 +48,10 @@ spec:
4848
key: password
4949
sink:
5050
type: postgresql
51-
postgresql:
51+
config:
5252
connectionStringSecretRef:
5353
name: postgres-credentials
5454
key: connectionString
5555
tableSecretRef:
5656
name: postgres-credentials
5757
key: table
58-

config/samples/kafka-to-postgres-with-errors.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,19 @@ metadata:
55
spec:
66
source:
77
type: kafka
8-
kafka:
8+
config:
99
brokers:
1010
- localhost:9092
1111
topic: input-topic
1212
consumerGroup: dataflow-group
1313
sink:
1414
type: postgresql
15-
postgresql:
15+
config:
1616
connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
1717
table: output_table
1818
errors:
1919
type: kafka
20-
kafka:
20+
config:
2121
brokers:
2222
- localhost:9092
2323
topic: error-topic

config/samples/kafka-to-postgres-with-resources.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ metadata:
55
spec:
66
source:
77
type: kafka
8-
kafka:
8+
config:
99
brokers:
1010
- localhost:9092
1111
topic: input-topic
1212
consumerGroup: dataflow-group
1313
sink:
1414
type: postgresql
15-
postgresql:
15+
config:
1616
connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
1717
table: output_table
1818
# Настройка ресурсов для пода процессора

0 commit comments

Comments
 (0)