Skip to content

Commit 68b692e

Browse files
committed
Update postgres monitoring
1 parent ea293ad commit 68b692e

2 files changed

Lines changed: 121 additions & 0 deletions

File tree

internal/connectors/postgresql.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
v1 "github.com/dataflow-operator/dataflow/api/v1"
2929
"github.com/dataflow-operator/dataflow/internal/constants"
3030
"github.com/dataflow-operator/dataflow/internal/logkeys"
31+
"github.com/dataflow-operator/dataflow/internal/metrics"
3132
"github.com/dataflow-operator/dataflow/internal/retry"
3233
"github.com/dataflow-operator/dataflow/internal/types"
3334
"github.com/go-logr/logr"
@@ -40,6 +41,8 @@ type PostgreSQLSourceConnector struct {
4041
config *v1.PostgreSQLSourceSpec
4142
conn *pgx.Conn
4243
logger logr.Logger
44+
namespace string
45+
name string
4346
lastReadChangeTime *time.Time // Track last change time for CDC (ChangeTrackingColumn or COALESCE(updated_at, created_at))
4447
}
4548

@@ -56,6 +59,12 @@ func (p *PostgreSQLSourceConnector) SetLogger(logger logr.Logger) {
5659
p.logger = logger
5760
}
5861

62+
// SetMetadata sets the metadata for metrics
63+
func (p *PostgreSQLSourceConnector) SetMetadata(namespace, name string) {
64+
p.namespace = namespace
65+
p.name = name
66+
}
67+
5968
// Connect establishes connection to PostgreSQL
6069
func (p *PostgreSQLSourceConnector) Connect(ctx context.Context) error {
6170
if !p.guardConnect() {
@@ -66,16 +75,26 @@ func (p *PostgreSQLSourceConnector) Connect(ctx context.Context) error {
6675
p.logger.Info("Connecting to PostgreSQL", "table", p.config.Table)
6776
conn, err := pgx.Connect(ctx, p.config.ConnectionString)
6877
if err != nil {
78+
if p.namespace != "" && p.name != "" {
79+
metrics.RecordConnectorError(p.namespace, p.name, "postgresql", "source", "connect", "connection_error")
80+
}
6981
p.logger.Error(err, "Failed to connect to PostgreSQL", "table", p.config.Table)
7082
return fmt.Errorf("failed to connect to PostgreSQL: %w", err)
7183
}
7284

7385
p.conn = conn
7486
p.logger.Info("Successfully connected to PostgreSQL", "table", p.config.Table)
7587

88+
if p.namespace != "" && p.name != "" {
89+
metrics.SetConnectorConnectionStatus(p.namespace, p.name, "postgresql", "source", true)
90+
}
91+
7692
// Auto-create table if enabled (source)
7793
if p.config.AutoCreateTable != nil && *p.config.AutoCreateTable {
7894
if err := p.ensureSourceTable(ctx); err != nil {
95+
if p.namespace != "" && p.name != "" {
96+
metrics.RecordConnectorError(p.namespace, p.name, "postgresql", "source", "connect", "ensure_table_error")
97+
}
7998
p.logger.Error(err, "Failed to ensure source table exists", "table", p.config.Table)
8099
return fmt.Errorf("failed to ensure source table exists: %w", err)
81100
}
@@ -177,6 +196,9 @@ func (p *PostgreSQLSourceConnector) readRows(ctx context.Context, msgChan chan *
177196
p.logger.V(1).Info("Executing PostgreSQL query", "query", query, "table", p.config.Table)
178197
rows, err := p.conn.Query(ctx, query)
179198
if err != nil {
199+
if p.namespace != "" && p.name != "" {
200+
metrics.RecordConnectorError(p.namespace, p.name, "postgresql", "source", "read", "query_error")
201+
}
180202
p.logger.Error(err, "Failed to execute PostgreSQL query", "query", query, "table", p.config.Table)
181203
return
182204
}
@@ -202,6 +224,9 @@ func (p *PostgreSQLSourceConnector) readRows(ctx context.Context, msgChan chan *
202224
for rows.Next() {
203225
values, err := rows.Values()
204226
if err != nil {
227+
if p.namespace != "" && p.name != "" {
228+
metrics.RecordConnectorError(p.namespace, p.name, "postgresql", "source", "read", "scan_error")
229+
}
205230
p.logger.Error(err, "Failed to read row values", "table", p.config.Table)
206231
rows.Close()
207232
return
@@ -232,6 +257,9 @@ func (p *PostgreSQLSourceConnector) readRows(ctx context.Context, msgChan chan *
232257

233258
jsonData, err := json.Marshal(rowMap)
234259
if err != nil {
260+
if p.namespace != "" && p.name != "" {
261+
metrics.RecordConnectorError(p.namespace, p.name, "postgresql", "source", "read", "marshal_error")
262+
}
235263
p.logger.Error(err, "Failed to marshal row to JSON", "table", p.config.Table)
236264
continue
237265
}
@@ -244,6 +272,9 @@ func (p *PostgreSQLSourceConnector) readRows(ctx context.Context, msgChan chan *
244272
metadata["operation"] = operation
245273
jsonData, err = buildRawModeJSON(rowMap, metadata)
246274
if err != nil {
275+
if p.namespace != "" && p.name != "" {
276+
metrics.RecordConnectorError(p.namespace, p.name, "postgresql", "source", "read", "marshal_error")
277+
}
247278
p.logger.Error(err, "Failed to build raw mode message", "table", p.config.Table)
248279
continue
249280
}
@@ -258,6 +289,9 @@ func (p *PostgreSQLSourceConnector) readRows(ctx context.Context, msgChan chan *
258289

259290
select {
260291
case msgChan <- msg:
292+
if p.namespace != "" && p.name != "" {
293+
metrics.RecordConnectorMessageRead(p.namespace, p.name, "postgresql", "source")
294+
}
261295
case <-ctx.Done():
262296
rows.Close()
263297
return
@@ -332,6 +366,9 @@ func (p *PostgreSQLSourceConnector) Close() error {
332366
}
333367
defer p.Unlock()
334368

369+
if p.namespace != "" && p.name != "" {
370+
metrics.SetConnectorConnectionStatus(p.namespace, p.name, "postgresql", "source", false)
371+
}
335372
p.logger.Info("Closing PostgreSQL source connection", "table", p.config.Table)
336373
if p.conn != nil {
337374
return p.conn.Close(context.Background())
@@ -345,6 +382,8 @@ type PostgreSQLSinkConnector struct {
345382
config *v1.PostgreSQLSinkSpec
346383
conn *pgx.Conn
347384
logger logr.Logger
385+
namespace string
386+
name string
348387
firstWriteOnce sync.Once
349388
// Cache to avoid N queries per message (tableExists + hasJSONB check)
350389
tableExistsCached *bool
@@ -364,6 +403,12 @@ func (p *PostgreSQLSinkConnector) SetLogger(logger logr.Logger) {
364403
p.logger = logger
365404
}
366405

406+
// SetMetadata sets the metadata for metrics
407+
func (p *PostgreSQLSinkConnector) SetMetadata(namespace, name string) {
408+
p.namespace = namespace
409+
p.name = name
410+
}
411+
367412
// Connect establishes connection to PostgreSQL
368413
func (p *PostgreSQLSinkConnector) Connect(ctx context.Context) error {
369414
if !p.guardConnect() {
@@ -374,16 +419,26 @@ func (p *PostgreSQLSinkConnector) Connect(ctx context.Context) error {
374419
p.logger.Info("Connecting to PostgreSQL", "table", p.config.Table)
375420
conn, err := pgx.Connect(ctx, p.config.ConnectionString)
376421
if err != nil {
422+
if p.namespace != "" && p.name != "" {
423+
metrics.RecordConnectorError(p.namespace, p.name, "postgresql", "sink", "connect", "connection_error")
424+
}
377425
p.logger.Error(err, "Failed to connect to PostgreSQL", "table", p.config.Table)
378426
return fmt.Errorf("failed to connect to PostgreSQL: %w", err)
379427
}
380428

381429
p.conn = conn
382430
p.logger.Info("Successfully connected to PostgreSQL", "table", p.config.Table)
383431

432+
if p.namespace != "" && p.name != "" {
433+
metrics.SetConnectorConnectionStatus(p.namespace, p.name, "postgresql", "sink", true)
434+
}
435+
384436
// Auto-create table if enabled and RawMode (structure known at Connect time)
385437
if p.config.AutoCreateTable != nil && *p.config.AutoCreateTable && p.rawMode() {
386438
if err := p.ensureTable(ctx); err != nil {
439+
if p.namespace != "" && p.name != "" {
440+
metrics.RecordConnectorError(p.namespace, p.name, "postgresql", "sink", "connect", "ensure_table_error")
441+
}
387442
p.logger.Error(err, "Failed to ensure table exists", "table", p.config.Table)
388443
return fmt.Errorf("failed to ensure table exists: %w", err)
389444
}
@@ -629,9 +684,16 @@ func (p *PostgreSQLSinkConnector) Write(ctx context.Context, messages <-chan *ty
629684
if err := retry.OnTimeout(batchCtx, retry.DefaultMaxAttempts, retry.DefaultInitialBackoff, func() error {
630685
return p.executeBatch(batchCtx, batch)
631686
}); err != nil {
687+
if p.namespace != "" && p.name != "" {
688+
metrics.RecordConnectorError(p.namespace, p.name, "postgresql", "sink", "write", "batch_error")
689+
}
632690
return err
633691
}
634692
for _, m := range batchMessages {
693+
if p.namespace != "" && p.name != "" {
694+
route := getRouteFromMessage(m)
695+
metrics.RecordConnectorMessageWritten(p.namespace, p.name, "postgresql", "sink", route)
696+
}
635697
if m.Ack != nil {
636698
m.Ack()
637699
}
@@ -691,6 +753,9 @@ func (p *PostgreSQLSinkConnector) Write(ctx context.Context, messages <-chan *ty
691753
}
692754
var data map[string]interface{}
693755
if err := json.Unmarshal(msg.Data, &data); err != nil {
756+
if p.namespace != "" && p.name != "" {
757+
metrics.RecordConnectorError(p.namespace, p.name, "postgresql", "sink", "write", "unmarshal_error")
758+
}
694759
p.logger.Error(err, "Failed to unmarshal message", logkeys.MessageID, types.MessageID(msg), "table", p.config.Table)
695760
continue
696761
}
@@ -702,6 +767,9 @@ func (p *PostgreSQLSinkConnector) Write(ctx context.Context, messages <-chan *ty
702767

703768
query, values, err := p.buildInsertForMessage(ctx, data, msg)
704769
if err != nil {
770+
if p.namespace != "" && p.name != "" {
771+
metrics.RecordConnectorError(p.namespace, p.name, "postgresql", "sink", "write", "build_insert_error")
772+
}
705773
p.logger.Error(err, "Failed to build insert", logkeys.MessageID, types.MessageID(msg), "table", p.config.Table)
706774
continue
707775
}
@@ -754,6 +822,9 @@ func (p *PostgreSQLSinkConnector) Write(ctx context.Context, messages <-chan *ty
754822
}
755823
var data map[string]interface{}
756824
if err := json.Unmarshal(msg.Data, &data); err != nil {
825+
if p.namespace != "" && p.name != "" {
826+
metrics.RecordConnectorError(p.namespace, p.name, "postgresql", "sink", "write", "unmarshal_error")
827+
}
757828
p.logger.Error(err, "Failed to unmarshal message", logkeys.MessageID, types.MessageID(msg), "table", p.config.Table)
758829
continue
759830
}
@@ -765,6 +836,9 @@ func (p *PostgreSQLSinkConnector) Write(ctx context.Context, messages <-chan *ty
765836

766837
query, values, err := p.buildInsertForMessage(ctx, data, msg)
767838
if err != nil {
839+
if p.namespace != "" && p.name != "" {
840+
metrics.RecordConnectorError(p.namespace, p.name, "postgresql", "sink", "write", "build_insert_error")
841+
}
768842
p.logger.Error(err, "Failed to build insert", logkeys.MessageID, types.MessageID(msg), "table", p.config.Table)
769843
continue
770844
}
@@ -963,6 +1037,9 @@ func (p *PostgreSQLSinkConnector) Close() error {
9631037
}
9641038
defer p.Unlock()
9651039

1040+
if p.namespace != "" && p.name != "" {
1041+
metrics.SetConnectorConnectionStatus(p.namespace, p.name, "postgresql", "sink", false)
1042+
}
9661043
p.logger.Info("Closing PostgreSQL sink connection", "table", p.config.Table)
9671044
if p.conn != nil {
9681045
return p.conn.Close(context.Background())

internal/connectors/postgresql_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@ import (
2323
"time"
2424

2525
"github.com/jackc/pgx/v5"
26+
dto "github.com/prometheus/client_model/go"
2627
"github.com/stretchr/testify/assert"
2728
"github.com/stretchr/testify/require"
2829

2930
v1 "github.com/dataflow-operator/dataflow/api/v1"
31+
"github.com/dataflow-operator/dataflow/internal/metrics"
3032
"github.com/dataflow-operator/dataflow/internal/types"
3133
)
3234

@@ -314,3 +316,45 @@ func TestPostgreSQLSinkConnector_rawMode(t *testing.T) {
314316
})
315317
}
316318
}
319+
320+
// TestPostgreSQLConnectors_ImplementSetMetadata verifies both PostgreSQL connectors
321+
// implement the SetMetadata interface used by the processor for metrics.
322+
func TestPostgreSQLConnectors_ImplementSetMetadata(t *testing.T) {
323+
t.Run("source", func(t *testing.T) {
324+
p := NewPostgreSQLSourceConnector(&v1.PostgreSQLSourceSpec{Table: "t"})
325+
_, ok := interface{}(p).(interface{ SetMetadata(string, string) })
326+
assert.True(t, ok, "PostgreSQLSourceConnector must implement SetMetadata")
327+
})
328+
t.Run("sink", func(t *testing.T) {
329+
p := NewPostgreSQLSinkConnector(&v1.PostgreSQLSinkSpec{Table: "t"})
330+
_, ok := interface{}(p).(interface{ SetMetadata(string, string) })
331+
assert.True(t, ok, "PostgreSQLSinkConnector must implement SetMetadata")
332+
})
333+
}
334+
335+
// TestPostgreSQLSourceConnector_SetConnectorConnectionStatus verifies that Close
336+
// updates the connection status metric to disconnected.
337+
func TestPostgreSQLSourceConnector_SetConnectorConnectionStatus(t *testing.T) {
338+
// Use a connection string that will fail - we only care about the error path
339+
// recording SetConnectorConnectionStatus. On success we'd set true; on failure
340+
// we never set it. So we need a successful connect to test SetConnectorConnectionStatus.
341+
// Skip if no real DB - use a connection that fails, then we only test RecordConnectorError.
342+
// For SetConnectorConnectionStatus we need successful Connect. That requires a real DB.
343+
// Instead, test that SetMetadata is used: create connector, set metadata, call Close.
344+
// Close will call SetConnectorConnectionStatus(false) even if Connect was never called
345+
// (guardClose will run, and we'll set status false before closing conn - but conn is nil).
346+
p := NewPostgreSQLSourceConnector(&v1.PostgreSQLSourceSpec{Table: "t"})
347+
p.SetMetadata("pg-status-ns", "pg-status-name")
348+
349+
// Close without Connect - should still call SetConnectorConnectionStatus(false)
350+
err := p.Close()
351+
require.NoError(t, err)
352+
353+
metric, err := metrics.ConnectorConnectionStatus.GetMetricWithLabelValues("pg-status-ns", "pg-status-name", "postgresql", "source")
354+
require.NoError(t, err)
355+
var dtoMetric dto.Metric
356+
require.NoError(t, metric.Write(&dtoMetric))
357+
require.NotNil(t, dtoMetric.Gauge)
358+
require.NotNil(t, dtoMetric.Gauge.Value)
359+
assert.Equal(t, 0.0, *dtoMetric.Gauge.Value, "connection status should be 0 after Close")
360+
}

0 commit comments

Comments
 (0)