Skip to content

Commit 8e4c362

Browse files
committed
Improve build image and fix connector pg
1 parent 09ac791 commit 8e4c362

8 files changed

Lines changed: 123 additions & 97 deletions

File tree

.github/workflows/crd.yml

Lines changed: 0 additions & 74 deletions
This file was deleted.

.github/workflows/release.yml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,5 +82,12 @@ jobs:
8282
platforms: ${{ steps.platforms.outputs.platforms }}
8383
build-args: |
8484
VERSION=v${{ needs.release.outputs.new_release_version }}
85-
cache-from: type=gha
86-
cache-to: type=gha,mode=max
85+
# GHA cache + registry cache (сохраняется между runs, переживает eviction)
86+
cache-from: |
87+
type=gha
88+
type=registry,ref=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:buildcache
89+
cache-to: |
90+
type=gha,mode=max
91+
type=registry,ref=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:buildcache,mode=max
92+
# Отключаем attestation для ускорения (опционально)
93+
provenance: false

Dockerfile

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# syntax=docker/dockerfile:1.4
12
# Build stage. Контекст — каталог dataflow (docker build из dataflow/ или -f dataflow/Dockerfile dataflow).
23
FROM golang:1.25-alpine AS builder
34

@@ -13,11 +14,13 @@ COPY . .
1314

1415
ARG VERSION=dev
1516
ARG TARGETARCH=amd64
17+
# -trimpath убирает пути из бинарника, -ldflags -s -w уменьшает размер
1618
RUN --mount=type=cache,target=/go/pkg/mod \
1719
--mount=type=cache,target=/root/.cache/go-build \
18-
CGO_ENABLED=0 GOOS=linux GOARCH=$TARGETARCH sh -c "\
19-
go build -o manager -ldflags \"-X github.com/dataflow-operator/dataflow/internal/version.Version=${VERSION}\" main.go && \
20-
go build -o processor cmd/processor/main.go"
20+
CGO_ENABLED=0 GOOS=linux GOARCH=$TARGETARCH go build -trimpath -ldflags "-s -w -X github.com/dataflow-operator/dataflow/internal/version.Version=${VERSION}" \
21+
-o manager main.go && \
22+
CGO_ENABLED=0 GOOS=linux GOARCH=$TARGETARCH go build -trimpath -ldflags "-s -w" \
23+
-o processor ./cmd/processor
2124
# Final stage
2225
FROM alpine:3.19
2326

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Kafka → PostgreSQL (rawMode) для staging
2+
# Namespace: dataflow-staging (совпадает с staging-postgresql, staging-kafka)
3+
# Kafka: staging-kafka:9092 (как в conductor). Bitnami: staging-kafka-kafka-bootstrap:9092
4+
apiVersion: dataflow.dataflow.io/v1
5+
kind: DataFlow
6+
metadata:
7+
name: kafka-to-postgres-raw
8+
namespace: dataflow-staging
9+
spec:
10+
source:
11+
type: kafka
12+
config:
13+
brokers:
14+
- staging-kafka:9092
15+
topic: kafka-to-postgres-raw
16+
consumerGroup: dataflow-raw-group
17+
sink:
18+
type: postgresql
19+
config:
20+
connectionString: "postgres://postgres:dataflow@staging-postgresql:5432/dataflow?sslmode=disable"
21+
table: kafka-to-postgres-raw-events
22+
autoCreateTable: true
23+
rawMode: true

internal/connectors/base.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,3 +169,16 @@ func ParseTableRef(table string) (schema, name string) {
169169
}
170170
return "public", table
171171
}
172+
173+
// quotePostgreSQLIdentifier quotes a PostgreSQL identifier (table, column, index name).
174+
// Required when identifier contains hyphens, spaces, or other special chars.
175+
func quotePostgreSQLIdentifier(id string) string {
176+
return `"` + strings.ReplaceAll(id, `"`, `""`) + `"`
177+
}
178+
179+
// QuotePostgreSQLTableRef returns a properly quoted schema.table ref for use in SQL.
180+
// E.g. "kafka-to-postgres-raw-events" -> "public"."kafka-to-postgres-raw-events"
181+
func QuotePostgreSQLTableRef(table string) string {
182+
schema, name := ParseTableRef(table)
183+
return quotePostgreSQLIdentifier(schema) + "." + quotePostgreSQLIdentifier(name)
184+
}

internal/connectors/base_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,3 +155,40 @@ func TestParseTableRef(t *testing.T) {
155155
})
156156
}
157157
}
158+
159+
func TestQuotePostgreSQLIdentifier(t *testing.T) {
160+
tests := []struct {
161+
id string
162+
want string
163+
}{
164+
{"users", `"users"`},
165+
{"kafka-to-postgres-raw-events", `"kafka-to-postgres-raw-events"`},
166+
{"table-name", `"table-name"`},
167+
{`col"umn`, `"col""umn"`},
168+
{"simple", `"simple"`},
169+
}
170+
for _, tt := range tests {
171+
t.Run(tt.id, func(t *testing.T) {
172+
got := quotePostgreSQLIdentifier(tt.id)
173+
assert.Equal(t, tt.want, got)
174+
})
175+
}
176+
}
177+
178+
func TestQuotePostgreSQLTableRef(t *testing.T) {
179+
tests := []struct {
180+
table string
181+
want string
182+
}{
183+
{"users", `"public"."users"`},
184+
{"kafka-to-postgres-raw-events", `"public"."kafka-to-postgres-raw-events"`},
185+
{"public.events", `"public"."events"`},
186+
{"myschema.my-table", `"myschema"."my-table"`},
187+
}
188+
for _, tt := range tests {
189+
t.Run(tt.table, func(t *testing.T) {
190+
got := QuotePostgreSQLTableRef(tt.table)
191+
assert.Equal(t, tt.want, got)
192+
})
193+
}
194+
}

internal/connectors/postgresql.go

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -152,13 +152,14 @@ func (p *PostgreSQLSourceConnector) ensureSourceTable(ctx context.Context) error
152152
return nil
153153
}
154154
p.logger.Info("Creating source table", "table", p.config.Table)
155+
quotedTable := QuotePostgreSQLTableRef(p.config.Table)
155156
createQuery := fmt.Sprintf(`
156157
CREATE TABLE IF NOT EXISTS %s (
157158
id SERIAL PRIMARY KEY,
158159
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
159160
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
160161
)
161-
`, p.config.Table)
162+
`, quotedTable)
162163
_, err = p.conn.Exec(ctx, createQuery)
163164
if err != nil {
164165
return fmt.Errorf("failed to create source table: %w", err)
@@ -348,12 +349,13 @@ func (p *PostgreSQLSourceConnector) buildReadQuery() string {
348349
p.checkpointMu.Lock()
349350
lastRead := p.lastReadChangeTime
350351
p.checkpointMu.Unlock()
352+
quotedTable := QuotePostgreSQLTableRef(p.config.Table)
351353
if lastRead != nil {
352354
// RFC3339Nano preserves sub-second precision to avoid re-reading or skipping rows at boundaries
353355
return fmt.Sprintf("SELECT * FROM %s WHERE %s > '%s' ORDER BY %s, id",
354-
p.config.Table, orderExpr, lastRead.UTC().Format(time.RFC3339Nano), orderExpr)
356+
quotedTable, orderExpr, lastRead.UTC().Format(time.RFC3339Nano), orderExpr)
355357
}
356-
return fmt.Sprintf("SELECT * FROM %s ORDER BY %s, id", p.config.Table, orderExpr)
358+
return fmt.Sprintf("SELECT * FROM %s ORDER BY %s, id", quotedTable, orderExpr)
357359
}
358360

359361
// extractChangeTime returns the change tracking timestamp for the row (for checkpoint).
@@ -529,6 +531,7 @@ func (p *PostgreSQLSinkConnector) ensureTable(ctx context.Context) error {
529531
}
530532

531533
p.logger.Info("Creating table (raw mode)", "table", p.config.Table)
534+
quotedTable := QuotePostgreSQLTableRef(p.config.Table)
532535
createQuery := fmt.Sprintf(`
533536
CREATE TABLE IF NOT EXISTS %s (
534537
id SERIAL PRIMARY KEY,
@@ -538,7 +541,7 @@ func (p *PostgreSQLSinkConnector) ensureTable(ctx context.Context) error {
538541
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
539542
deleted_at TIMESTAMP
540543
)
541-
`, p.config.Table)
544+
`, quotedTable)
542545

543546
_, err = p.conn.Exec(ctx, createQuery)
544547
if err != nil {
@@ -549,7 +552,9 @@ func (p *PostgreSQLSinkConnector) ensureTable(ctx context.Context) error {
549552
p.hasJSONBCached = nil // raw mode table has value, not data column
550553
p.logger.Info("Table created successfully", "table", p.config.Table)
551554

552-
indexQuery := fmt.Sprintf(`CREATE INDEX IF NOT EXISTS idx_%s_value ON %s USING GIN (value)`, p.config.Table, p.config.Table)
555+
_, tableName := ParseTableRef(p.config.Table)
556+
indexName := quotePostgreSQLIdentifier("idx_" + tableName + "_value")
557+
indexQuery := fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s ON %s USING GIN (value)`, indexName, quotedTable)
553558
_, err = p.conn.Exec(ctx, indexQuery)
554559
if err != nil {
555560
p.logger.Info("Failed to create index (non-critical)", "table", p.config.Table, "error", err)
@@ -590,7 +595,7 @@ func (p *PostgreSQLSinkConnector) ensureTableFromMessage(ctx context.Context, da
590595
for _, col := range columns {
591596
val := data[col]
592597
pgType := inferPostgreSQLType(val)
593-
def := fmt.Sprintf(`"%s" %s`, col, pgType)
598+
def := fmt.Sprintf(`%s %s`, quotePostgreSQLIdentifier(col), pgType)
594599
if col == "id" {
595600
def += " PRIMARY KEY"
596601
}
@@ -607,7 +612,8 @@ func (p *PostgreSQLSinkConnector) ensureTableFromMessage(ctx context.Context, da
607612
}
608613

609614
p.logger.Info("Creating table from message structure", "table", p.config.Table, "columns", columns)
610-
createQuery := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (%s)`, p.config.Table, joinStrings(colDefs, ", "))
615+
quotedTable := QuotePostgreSQLTableRef(p.config.Table)
616+
createQuery := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (%s)`, quotedTable, joinStrings(colDefs, ", "))
611617
_, err = p.conn.Exec(ctx, createQuery)
612618
if err != nil {
613619
return fmt.Errorf("failed to create table from message: %w", err)
@@ -906,7 +912,8 @@ func (p *PostgreSQLSinkConnector) trySoftDelete(msg *types.Message, batch *pgx.B
906912
if p.config.ConflictKey != nil && *p.config.ConflictKey != "" {
907913
conflictKey = *p.config.ConflictKey
908914
}
909-
query := fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s = $1", p.config.Table, *p.config.SoftDeleteColumn, conflictKey)
915+
quotedTable := QuotePostgreSQLTableRef(p.config.Table)
916+
query := fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s = $1", quotedTable, quotePostgreSQLIdentifier(*p.config.SoftDeleteColumn), quotePostgreSQLIdentifier(conflictKey))
910917
batch.Queue(query, idVal)
911918
*batchMessages = append(*batchMessages, msg)
912919
*count++
@@ -959,21 +966,23 @@ func (p *PostgreSQLSinkConnector) buildInsertForMessage(ctx context.Context, dat
959966
}
960967
metaJSON, _ = json.Marshal(meta)
961968
}
969+
quotedTable := QuotePostgreSQLTableRef(p.config.Table)
962970
if upsertMode {
963-
query = fmt.Sprintf("INSERT INTO %s (value, _metadata) VALUES ($1::jsonb, $2::jsonb) ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value, _metadata = EXCLUDED._metadata", p.config.Table)
971+
query = fmt.Sprintf("INSERT INTO %s (value, _metadata) VALUES ($1::jsonb, $2::jsonb) ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value, _metadata = EXCLUDED._metadata", quotedTable)
964972
} else {
965-
query = fmt.Sprintf("INSERT INTO %s (value, _metadata) VALUES ($1::jsonb, $2::jsonb)", p.config.Table)
973+
query = fmt.Sprintf("INSERT INTO %s (value, _metadata) VALUES ($1::jsonb, $2::jsonb)", quotedTable)
966974
}
967975
values = []interface{}{string(valueJSON), string(metaJSON)}
968976
return query, values, nil
969977
}
970978

971979
hasJSONB, e := p.hasJSONBColumn(ctx)
972980
if e == nil && hasJSONB {
981+
quotedTable := QuotePostgreSQLTableRef(p.config.Table)
973982
if upsertMode {
974-
query = fmt.Sprintf("INSERT INTO %s (data) VALUES ($1::jsonb) ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data", p.config.Table)
983+
query = fmt.Sprintf("INSERT INTO %s (data) VALUES ($1::jsonb) ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data", quotedTable)
975984
} else {
976-
query = fmt.Sprintf("INSERT INTO %s (data) VALUES ($1::jsonb) ON CONFLICT DO NOTHING", p.config.Table)
985+
query = fmt.Sprintf("INSERT INTO %s (data) VALUES ($1::jsonb) ON CONFLICT DO NOTHING", quotedTable)
977986
}
978987
jsonData, _ := json.Marshal(data)
979988
values = []interface{}{string(jsonData)}
@@ -996,14 +1005,15 @@ func (p *PostgreSQLSinkConnector) buildInsertForMessage(ctx context.Context, dat
9961005
if len(columns) == 0 {
9971006
return "", nil, fmt.Errorf("empty message, no columns to insert")
9981007
}
999-
columnList := fmt.Sprintf(`"%s"`, columns[0])
1008+
columnList := quotePostgreSQLIdentifier(columns[0])
10001009
for i := 1; i < len(columns); i++ {
1001-
columnList += fmt.Sprintf(`, "%s"`, columns[i])
1010+
columnList += ", " + quotePostgreSQLIdentifier(columns[i])
10021011
}
10031012
placeholderList := "$1"
10041013
for i := 2; i <= len(placeholders); i++ {
10051014
placeholderList += fmt.Sprintf(", $%d", i)
10061015
}
1016+
quotedTable := QuotePostgreSQLTableRef(p.config.Table)
10071017
if upsertMode {
10081018
conflictKey := "id"
10091019
if p.config.ConflictKey != nil && *p.config.ConflictKey != "" {
@@ -1012,20 +1022,20 @@ func (p *PostgreSQLSinkConnector) buildInsertForMessage(ctx context.Context, dat
10121022
updateClauses := make([]string, 0)
10131023
for _, col := range columns {
10141024
if col != conflictKey {
1015-
updateClauses = append(updateClauses, fmt.Sprintf(`"%s" = EXCLUDED."%s"`, col, col))
1025+
updateClauses = append(updateClauses, fmt.Sprintf(`%s = EXCLUDED.%s`, quotePostgreSQLIdentifier(col), quotePostgreSQLIdentifier(col)))
10161026
}
10171027
}
10181028
if len(updateClauses) == 0 {
1019-
query = fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO NOTHING", p.config.Table, columnList, placeholderList, conflictKey)
1029+
query = fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO NOTHING", quotedTable, columnList, placeholderList, quotePostgreSQLIdentifier(conflictKey))
10201030
} else {
10211031
updateClause := updateClauses[0]
10221032
for i := 1; i < len(updateClauses); i++ {
10231033
updateClause += ", " + updateClauses[i]
10241034
}
1025-
query = fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s", p.config.Table, columnList, placeholderList, conflictKey, updateClause)
1035+
query = fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s", quotedTable, columnList, placeholderList, quotePostgreSQLIdentifier(conflictKey), updateClause)
10261036
}
10271037
} else {
1028-
query = fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT DO NOTHING", p.config.Table, columnList, placeholderList)
1038+
query = fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT DO NOTHING", quotedTable, columnList, placeholderList)
10291039
}
10301040
values = colValues
10311041
return query, values, nil

internal/connectors/postgresql_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ func TestPostgreSQLSourceConnector_buildReadQuery(t *testing.T) {
7575
lastReadChangeTime: ptrTime(time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)),
7676
wantContains: []string{`WHERE "modified_at" > '2024-01-15T10:00:00`, `ORDER BY "modified_at", id`},
7777
},
78+
{
79+
name: "table with hyphens is properly quoted",
80+
config: &v1.PostgreSQLSourceSpec{
81+
Table: "kafka-to-postgres-raw-events",
82+
},
83+
wantContains: []string{`"public"."kafka-to-postgres-raw-events"`, "ORDER BY"},
84+
},
7885
}
7986
for _, tt := range tests {
8087
t.Run(tt.name, func(t *testing.T) {

0 commit comments

Comments
 (0)