diff --git a/src/main/java/io/pixelsdb/pixels/sink/freshness/FreshnessClient.java b/src/main/java/io/pixelsdb/pixels/sink/freshness/FreshnessClient.java index 30f06ce..7de92cd 100644 --- a/src/main/java/io/pixelsdb/pixels/sink/freshness/FreshnessClient.java +++ b/src/main/java/io/pixelsdb/pixels/sink/freshness/FreshnessClient.java @@ -141,10 +141,13 @@ protected Connection createNewConnection(long queryTimestamp) throws SQLExceptio Properties properties = new Properties(); properties.setProperty("user", trinoUser); - String catalogName = "pixels"; - String sessionPropValue = String.format("%s.query_snapshot_timestamp:%d", catalogName, queryTimestamp); - properties.setProperty("sessionProperties", sessionPropValue); + String catalog = parseCatalogFromUrl(trinoJdbcUrl); + if (catalog != null && catalog.equalsIgnoreCase("pixels")) + { + String sessionPropValue = String.format("%s.query_snapshot_timestamp:%d", catalog, queryTimestamp); + properties.setProperty("sessionProperties", sessionPropValue); + } return DriverManager.getConnection(trinoJdbcUrl, properties); } @@ -227,7 +230,7 @@ public void stop() private void submitQueryTask() { - if (monitoredTables.isEmpty()) + if (!config.isSinkMonitorFreshnessEmbedStatic() && monitoredTables.isEmpty()) { LOGGER.debug("No tables configured for freshness monitoring. Skipping submission cycle."); return; @@ -398,4 +401,34 @@ private List getStaticList() { return config.getSinkMonitorFreshnessEmbedTableList(); } -} \ No newline at end of file + + private String parseCatalogFromUrl(String url) + { + if (url == null || !url.startsWith("jdbc:trino://")) + { + return null; + } + String withoutProtocol = url.substring("jdbc:trino://".length()); + int slashIndex = withoutProtocol.indexOf('/'); + if (slashIndex == -1) + { + return null; + } + String remaining = withoutProtocol.substring(slashIndex + 1); + + int nextSlash = remaining.indexOf('/'); + int questionMark = remaining.indexOf('?'); + + int endIndex = remaining.length(); + if (nextSlash != -1) + { + endIndex = nextSlash; + } + if (questionMark != -1 && questionMark < endIndex) + { + endIndex = questionMark; + } + + return remaining.substring(0, endIndex); + } +} diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/flink/PixelsPollingServiceImpl.java b/src/main/java/io/pixelsdb/pixels/sink/writer/flink/PixelsPollingServiceImpl.java index 0db65f3..5c1ea37 100644 --- a/src/main/java/io/pixelsdb/pixels/sink/writer/flink/PixelsPollingServiceImpl.java +++ b/src/main/java/io/pixelsdb/pixels/sink/writer/flink/PixelsPollingServiceImpl.java @@ -29,6 +29,7 @@ import io.pixelsdb.pixels.sink.util.FlushRateLimiter; import io.pixelsdb.pixels.sink.writer.flink.FlinkPollingWriter; import io.pixelsdb.pixels.sink.util.DataTransform; +import io.pixelsdb.pixels.sink.freshness.FreshnessClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +71,7 @@ public PixelsPollingServiceImpl(FlinkPollingWriter writer) { public void pollEvents(SinkProto.PollRequest request, StreamObserver responseObserver) { SchemaTableName schemaTableName = new SchemaTableName(request.getSchemaName(), request.getTableName()); LOGGER.debug("Received poll request for table '{}'", schemaTableName); + FreshnessClient.getInstance().addMonitoredTable(request.getTableName()); List records = new ArrayList<>(pollBatchSize); try {