From acea721b66c787934e387844f608d08479163f4d Mon Sep 17 00:00:00 2001 From: Rob Carlan Date: Thu, 19 Feb 2026 16:33:48 -0500 Subject: [PATCH] feat: expose cluster ID via getClusterId() and describeCluster() librdkafka provides rd_kafka_clusterid() to retrieve the cluster ID from broker metadata, but the Node.js binding never exposed it. This change: - Adds NodeGetClusterId NAN_METHOD to Connection (src/connection.cc) which calls RdKafka::Handle::clusterid() - Registers getClusterId on Producer, KafkaConsumer, and AdminClient native prototypes - Adds Client.prototype.getClusterId() JS wrapper (lib/client.js) - Adds admin.describeCluster() to the KafkaJS-compatible admin API (lib/kafkajs/_admin.js), matching the KafkaJS interface - Adds test for describeCluster Closes #28 (partially - adds describeCluster admin operation) Co-Authored-By: Claude Opus 4.6 (1M context) --- lib/client.js | 17 +++++++ lib/kafkajs/_admin.js | 49 +++++++++++++++++++ src/admin.cc | 1 + src/connection.cc | 26 ++++++++++ src/connection.h | 1 + src/kafka-consumer.cc | 1 + src/producer.cc | 1 + .../admin/describe_cluster.spec.js | 47 ++++++++++++++++++ 8 files changed, 143 insertions(+) create mode 100644 test/promisified/admin/describe_cluster.spec.js diff --git a/lib/client.js b/lib/client.js index 5e9527bc..5d506310 100644 --- a/lib/client.js +++ b/lib/client.js @@ -580,6 +580,23 @@ Client.prototype.setSaslCredentials = function(username, password) { this._client.setSaslCredentials(username, password); }; +/** + * Get the cluster ID reported by the broker metadata. + * + * Returns null if the client is not connected or if the cluster ID + * could not be retrieved within the given timeout. + * + * @param {number} timeout - Timeout in milliseconds (default: 200) + * @returns {string|null} - The cluster ID, or null + */ +Client.prototype.getClusterId = function(timeout) { + if (!this.isConnected()) { + return null; + } + + return this._client.getClusterId(timeout) || null; +}; + /** * Wrap a potential RdKafka error. * diff --git a/lib/kafkajs/_admin.js b/lib/kafkajs/_admin.js index e51f7d30..a2579b69 100644 --- a/lib/kafkajs/_admin.js +++ b/lib/kafkajs/_admin.js @@ -513,6 +513,55 @@ class Admin { }); } + /** + * Describe the Kafka cluster. + * + * Returns the cluster ID, the controller broker, and the list + * of brokers in the cluster. + * + * @param {object?} options + * @param {number?} options.timeout - Timeout in ms (default: 5000). + * @returns {Promise<{clusterId: string, controller: number, + * brokers: Array<{nodeId: number, host: string, port: number}>}>} + */ + async describeCluster(options = {}) { + if (this.#state !== AdminState.CONNECTED) { + throw new error.KafkaJSError( + "Admin client is not connected.", + { code: error.ErrorCodes.ERR__STATE } + ); + } + + const timeout = options.timeout ?? 5000; + + return new Promise((resolve, reject) => { + this.#internalClient.getMetadata( + { allTopics: true, timeout }, + (err, metadata) => { + if (err) { + reject(createKafkaJsErrorFromLibRdKafkaError(err)); + } else { + // Metadata has been fetched, so clusterid() returns + // from cache instantly with no blocking. + const clusterId = + this.#internalClient.getClusterId(0) || null; + resolve({ + clusterId, + controller: + metadata.orig_broker_id != null + ? metadata.orig_broker_id : null, + brokers: (metadata.brokers || []).map(b => ({ + nodeId: b.id, + host: b.host, + port: b.port, + })), + }); + } + } + ); + }); + } + /** * Fetch the offsets for topic partition(s) for consumer group(s). * diff --git a/src/admin.cc b/src/admin.cc index 84603c27..7163a95a 100644 --- a/src/admin.cc +++ b/src/admin.cc @@ -136,6 +136,7 @@ void AdminClient::Init(v8::Local exports) { Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect); Nan::SetPrototypeMethod(tpl, "setSaslCredentials", NodeSetSaslCredentials); Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata); + Nan::SetPrototypeMethod(tpl, "getClusterId", NodeGetClusterId); Nan::SetPrototypeMethod(tpl, "setOAuthBearerToken", NodeSetOAuthBearerToken); Nan::SetPrototypeMethod(tpl, "setOAuthBearerTokenFailure", NodeSetOAuthBearerTokenFailure); diff --git a/src/connection.cc b/src/connection.cc index 637feac3..864a00b6 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -700,4 +700,30 @@ NAN_METHOD(Connection::NodeName) { info.GetReturnValue().Set(Nan::New(name).ToLocalChecked()); } +NAN_METHOD(Connection::NodeGetClusterId) { + Connection* obj = ObjectWrap::Unwrap(info.This()); + + int timeout_ms = 200; + if (info[0]->IsNumber()) { + Nan::Maybe maybeTimeout = Nan::To(info[0]); + if (!maybeTimeout.IsNothing()) { + timeout_ms = static_cast(maybeTimeout.FromJust()); + } + } + + if (!obj->IsConnected()) { + info.GetReturnValue().Set(Nan::Null()); + return; + } + + std::string cluster_id = obj->m_client->clusterid(timeout_ms); + if (cluster_id.empty()) { + info.GetReturnValue().Set(Nan::Null()); + return; + } + + info.GetReturnValue().Set( + Nan::New(cluster_id).ToLocalChecked()); +} + } // namespace NodeKafka diff --git a/src/connection.h b/src/connection.h index e7371825..d75bb95e 100644 --- a/src/connection.h +++ b/src/connection.h @@ -110,6 +110,7 @@ class Connection : public Nan::ObjectWrap { static NAN_METHOD(NodeSetOAuthBearerToken); static NAN_METHOD(NodeSetOAuthBearerTokenFailure); static NAN_METHOD(NodeName); + static NAN_METHOD(NodeGetClusterId); }; } // namespace NodeKafka diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc index 4bc778d4..6aaf15a7 100644 --- a/src/kafka-consumer.cc +++ b/src/kafka-consumer.cc @@ -546,6 +546,7 @@ void KafkaConsumer::Init(v8::Local exports) { Nan::SetPrototypeMethod(tpl, "connect", NodeConnect); Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect); Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata); + Nan::SetPrototypeMethod(tpl, "getClusterId", NodeGetClusterId); Nan::SetPrototypeMethod(tpl, "queryWatermarkOffsets", NodeQueryWatermarkOffsets); // NOLINT Nan::SetPrototypeMethod(tpl, "offsetsForTimes", NodeOffsetsForTimes); Nan::SetPrototypeMethod(tpl, "getWatermarkOffsets", NodeGetWatermarkOffsets); diff --git a/src/producer.cc b/src/producer.cc index 92751f4a..5652336e 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -70,6 +70,7 @@ void Producer::Init(v8::Local exports) { Nan::SetPrototypeMethod(tpl, "connect", NodeConnect); Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect); Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata); + Nan::SetPrototypeMethod(tpl, "getClusterId", NodeGetClusterId); Nan::SetPrototypeMethod(tpl, "queryWatermarkOffsets", NodeQueryWatermarkOffsets); // NOLINT Nan::SetPrototypeMethod(tpl, "poll", NodePoll); Nan::SetPrototypeMethod(tpl, "setPollInBackground", NodeSetPollInBackground); diff --git a/test/promisified/admin/describe_cluster.spec.js b/test/promisified/admin/describe_cluster.spec.js new file mode 100644 index 00000000..90cd9123 --- /dev/null +++ b/test/promisified/admin/describe_cluster.spec.js @@ -0,0 +1,47 @@ +jest.setTimeout(30000); + +const { + createAdmin, +} = require('../testhelpers'); + +describe('Admin > describeCluster', () => { + let admin; + + beforeEach(async () => { + admin = createAdmin({}); + }); + + afterEach(async () => { + admin && (await admin.disconnect()); + }); + + it('should fail if not connected', async () => { + await expect(admin.describeCluster()).rejects.toHaveProperty( + 'code', + -172 // ERR__STATE + ); + }); + + it('should describe the cluster', async () => { + await admin.connect(); + + const result = await admin.describeCluster(); + + expect(result).toEqual( + expect.objectContaining({ + clusterId: expect.any(String), + controller: expect.any(Number), + brokers: expect.arrayContaining([ + expect.objectContaining({ + nodeId: expect.any(Number), + host: expect.any(String), + port: expect.any(Number), + }), + ]), + }) + ); + + expect(result.clusterId.length).toBeGreaterThan(0); + expect(result.brokers.length).toBeGreaterThan(0); + }); +});