diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index 56130667..e9813e3d 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -29,9 +29,12 @@ #include #include #include +#include +#include #include #include +#include #include namespace pulsar { @@ -68,6 +71,20 @@ class PULSAR_PUBLIC Client { */ Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration); + /** + * Create a Pulsar client object using the specified ServiceInfoProvider. + * + * The ServiceInfoProvider is responsible for providing the service information (such as service URL) + * dynamically. For example, if it detects a primary Pulsar service is down, it can switch to a secondary + * service and update the client with the new service information. + * + * The Client instance takes ownership of the given ServiceInfoProvider. The provider will be destroyed + * as part of the client's shutdown lifecycle, for example when `Client::close()` or + * `Client::closeAsync()` is called, ensuring that its lifetime is properly managed. + */ + static Client create(std::unique_ptr serviceInfoProvider, + const ClientConfiguration& clientConfiguration); + /** * Create a producer with default configuration * @@ -414,6 +431,13 @@ class PULSAR_PUBLIC Client { void getSchemaInfoAsync(const std::string& topic, int64_t version, std::function callback); + /** + * Get the current service information of the client. + * + * @return the current service information + */ + ServiceInfo getServiceInfo() const; + private: Client(const std::shared_ptr&); diff --git a/include/pulsar/ClientConfiguration.h b/include/pulsar/ClientConfiguration.h index 98ccff7f..b37b7c6a 100644 --- a/include/pulsar/ClientConfiguration.h +++ b/include/pulsar/ClientConfiguration.h @@ -70,15 +70,12 @@ class PULSAR_PUBLIC ClientConfiguration { /** * Set the authentication method to be used with the broker * + * You can get the configured authentication data in `ServiceInfo` returned by `Client::getServiceInfo`. + * * @param authentication the authentication data to use */ ClientConfiguration& setAuth(const AuthenticationPtr& authentication); - /** - * @return the authentication data - */ - Authentication& getAuth() const; - /** * Set timeout on client operations (subscribe, create producer, close, unsubscribe) * Default is 30 seconds. @@ -202,20 +199,6 @@ class PULSAR_PUBLIC ClientConfiguration { */ ClientConfiguration& setLogger(LoggerFactory* loggerFactory); - /** - * Configure whether to use the TLS encryption on the connections. - * - * The default value is false. - * - * @param useTls - */ - ClientConfiguration& setUseTls(bool useTls); - - /** - * @return whether the TLS encryption is used on the connections - */ - bool isUseTls() const; - /** * Set the path to the TLS private key file. * @@ -243,15 +226,13 @@ class PULSAR_PUBLIC ClientConfiguration { /** * Set the path to the trusted TLS certificate file. * + * You can get the configured trusted TLS certificate file path in `ServiceInfo` returned by + * `Client::getServiceInfo`. + * * @param tlsTrustCertsFilePath */ ClientConfiguration& setTlsTrustCertsFilePath(const std::string& tlsTrustCertsFilePath); - /** - * @return the path to the trusted TLS certificate file - */ - const std::string& getTlsTrustCertsFilePath() const; - /** * Configure whether the Pulsar client accepts untrusted TLS certificates from brokers. * diff --git a/include/pulsar/ServiceInfo.h b/include/pulsar/ServiceInfo.h new file mode 100644 index 00000000..1f63ce38 --- /dev/null +++ b/include/pulsar/ServiceInfo.h @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef PULSAR_SERVICE_INFO_H_ +#define PULSAR_SERVICE_INFO_H_ + +#include + +#include +#include + +namespace pulsar { + +/** + * ServiceInfo encapsulates the information of a Pulsar service, which is used by the client to connect to the + * service. It includes the service URL, authentication information, and TLS configuration. + */ +class PULSAR_PUBLIC ServiceInfo final { + public: + ServiceInfo(std::string serviceUrl, AuthenticationPtr authentication = AuthFactory::Disabled(), + std::optional tlsTrustCertsFilePath = std::nullopt); + + auto& serviceUrl() const noexcept { return serviceUrl_; } + auto useTls() const noexcept { return useTls_; } + auto& authentication() const noexcept { return authentication_; } + auto& tlsTrustCertsFilePath() const noexcept { return tlsTrustCertsFilePath_; } + + bool operator==(const ServiceInfo& other) const noexcept { + return serviceUrl_ == other.serviceUrl_ && useTls_ == other.useTls_ && + authentication_ == other.authentication_ && + tlsTrustCertsFilePath_ == other.tlsTrustCertsFilePath_; + } + + private: + std::string serviceUrl_; + bool useTls_; + AuthenticationPtr authentication_; + std::optional tlsTrustCertsFilePath_; +}; + +} // namespace pulsar +#endif diff --git a/include/pulsar/ServiceInfoProvider.h b/include/pulsar/ServiceInfoProvider.h new file mode 100644 index 00000000..1b518da5 --- /dev/null +++ b/include/pulsar/ServiceInfoProvider.h @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef PULSAR_SERVICE_INFO_PROVIDER_H_ +#define PULSAR_SERVICE_INFO_PROVIDER_H_ + +#include + +#include + +namespace pulsar { + +class PULSAR_PUBLIC ServiceInfoProvider { + public: + /** + * The destructor will be called when `Client::close()` is invoked, and the provider should stop any + * ongoing work and release the resources in the destructor. + */ + virtual ~ServiceInfoProvider() = default; + + /** + * Get the initial `ServiceInfo` connection for the client. + * This method is called **only once** internally in `Client::create()` to get the initial `ServiceInfo` + * for the client to connect to the Pulsar service, typically before {@link initialize} is invoked. + * Since it's only called once, it's legal to return a moved `ServiceInfo` object to avoid unnecessary + * copying. + */ + virtual ServiceInfo initialServiceInfo() = 0; + + /** + * Initialize the ServiceInfoProvider. + * + * After the client has obtained the initial `ServiceInfo` via {@link initialServiceInfo}, this method is + * called to allow the provider to start any background work (for example, service discovery or watching + * configuration changes) and to report subsequent updates to the service information. + * + * @param onServiceInfoUpdate the callback to deliver updated `ServiceInfo` values to the client after + * the initial connection has been established + * + * Implementations may choose not to invoke `onServiceInfoUpdate` if the `ServiceInfo` never changes. + */ + virtual void initialize(std::function onServiceInfoUpdate) = 0; +}; + +}; // namespace pulsar + +#endif diff --git a/include/pulsar/c/client_configuration.h b/include/pulsar/c/client_configuration.h index 9e154530..1be7c1f4 100644 --- a/include/pulsar/c/client_configuration.h +++ b/include/pulsar/c/client_configuration.h @@ -147,16 +147,9 @@ PULSAR_PUBLIC void pulsar_client_configuration_set_logger(pulsar_client_configur PULSAR_PUBLIC void pulsar_client_configuration_set_logger_t(pulsar_client_configuration_t *conf, pulsar_logger_t logger); -PULSAR_PUBLIC void pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t *conf, int useTls); - -PULSAR_PUBLIC int pulsar_client_configuration_is_use_tls(pulsar_client_configuration_t *conf); - PULSAR_PUBLIC void pulsar_client_configuration_set_tls_trust_certs_file_path( pulsar_client_configuration_t *conf, const char *tlsTrustCertsFilePath); -PULSAR_PUBLIC const char *pulsar_client_configuration_get_tls_trust_certs_file_path( - pulsar_client_configuration_t *conf); - PULSAR_PUBLIC void pulsar_client_configuration_set_tls_allow_insecure_connection( pulsar_client_configuration_t *conf, int allowInsecure); diff --git a/lib/AtomicSharedPtr.h b/lib/AtomicSharedPtr.h new file mode 100644 index 00000000..30dea074 --- /dev/null +++ b/lib/AtomicSharedPtr.h @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +namespace pulsar { + +// C++17 does not have std::atomic>, so we have to manually implement it. +template +class AtomicSharedPtr { + public: + using Pointer = std::shared_ptr; + + AtomicSharedPtr() = default; + explicit AtomicSharedPtr(T value) : ptr_(std::make_shared(std::move(value))) {} + + auto load() const { return std::atomic_load(&ptr_); } + + void store(Pointer&& newPtr) { std::atomic_store(&ptr_, std::move(newPtr)); } + + private: + std::shared_ptr ptr_; +}; + +} // namespace pulsar diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h index 948c7f1f..35dcb163 100644 --- a/lib/BinaryProtoLookupService.h +++ b/lib/BinaryProtoLookupService.h @@ -22,6 +22,7 @@ #include #include #include +#include #include @@ -38,9 +39,9 @@ using GetSchemaPromisePtr = std::shared_ptr>; class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { public: - BinaryProtoLookupService(const std::string& serviceUrl, ConnectionPool& pool, + BinaryProtoLookupService(const ServiceInfo& serviceInfo, ConnectionPool& pool, const ClientConfiguration& clientConfiguration) - : serviceNameResolver_(serviceUrl), + : serviceNameResolver_(serviceInfo.serviceUrl()), cnxPool_(pool), listenerName_(clientConfiguration.getListenerName()), maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()) {} diff --git a/lib/Client.cc b/lib/Client.cc index 39a5948a..48e92dda 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -17,6 +17,7 @@ * under the License. */ #include +#include #include #include @@ -33,13 +34,17 @@ DECLARE_LOG_OBJECT() namespace pulsar { -Client::Client(const std::shared_ptr& impl) : impl_(impl) {} +Client::Client(const std::shared_ptr& impl) : impl_(impl) { impl_->initialize(); } -Client::Client(const std::string& serviceUrl) - : impl_(std::make_shared(serviceUrl, ClientConfiguration())) {} +Client::Client(const std::string& serviceUrl) : Client(serviceUrl, ClientConfiguration()) {} Client::Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration) - : impl_(std::make_shared(serviceUrl, clientConfiguration)) {} + : Client(std::make_shared(serviceUrl, clientConfiguration)) {} + +Client Client::create(std::unique_ptr serviceInfoProvider, + const ClientConfiguration& clientConfiguration) { + return Client(std::make_shared(std::move(serviceInfoProvider), clientConfiguration)); +} Result Client::createProducer(const std::string& topic, Producer& producer) { return createProducer(topic, ProducerConfiguration(), producer); @@ -193,8 +198,10 @@ uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers(); void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, std::function callback) { - impl_->getLookup() - ->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "") + impl_->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "") .addListener(std::move(callback)); } + +ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); } + } // namespace pulsar diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc index b99c5d25..c59dd43b 100644 --- a/lib/ClientConfiguration.cc +++ b/lib/ClientConfiguration.cc @@ -57,8 +57,6 @@ ClientConfiguration& ClientConfiguration::setAuth(const AuthenticationPtr& authe return *this; } -Authentication& ClientConfiguration::getAuth() const { return *impl_->authenticationPtr; } - const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return impl_->authenticationPtr; } ClientConfiguration& ClientConfiguration::setOperationTimeoutSeconds(int timeout) { @@ -94,13 +92,6 @@ ClientConfiguration& ClientConfiguration::setMessageListenerThreads(int threads) int ClientConfiguration::getMessageListenerThreads() const { return impl_->messageListenerThreads; } -ClientConfiguration& ClientConfiguration::setUseTls(bool useTls) { - impl_->useTls = useTls; - return *this; -} - -bool ClientConfiguration::isUseTls() const { return impl_->useTls; } - ClientConfiguration& ClientConfiguration::setValidateHostName(bool validateHostName) { impl_->validateHostName = validateHostName; return *this; @@ -131,10 +122,6 @@ ClientConfiguration& ClientConfiguration::setTlsTrustCertsFilePath(const std::st return *this; } -const std::string& ClientConfiguration::getTlsTrustCertsFilePath() const { - return impl_->tlsTrustCertsFilePath; -} - ClientConfiguration& ClientConfiguration::setTlsAllowInsecureConnection(bool allowInsecure) { impl_->tlsAllowInsecureConnection = allowInsecure; return *this; diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h index e7a83a19..45b2aa3b 100644 --- a/lib/ClientConfigurationImpl.h +++ b/lib/ClientConfigurationImpl.h @@ -20,8 +20,10 @@ #define LIB_CLIENTCONFIGURATIONIMPL_H_ #include +#include #include +#include namespace pulsar { @@ -53,6 +55,11 @@ struct ClientConfigurationImpl { ClientConfiguration::ProxyProtocol proxyProtocol; std::unique_ptr takeLogger() { return std::move(loggerFactory); } + + ServiceInfo toServiceInfo(const std::string& serviceUrl) const { + return {serviceUrl, authenticationPtr, + tlsTrustCertsFilePath.empty() ? std::nullopt : std::make_optional(tlsTrustCertsFilePath)}; + } }; } // namespace pulsar diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index c373c25c..cc7e1f67 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -19,7 +19,9 @@ #include "ClientConnection.h" #include +#include #include +#include #include #include @@ -37,6 +39,8 @@ #include "ProducerImpl.h" #include "PulsarApi.pb.h" #include "ResultUtils.h" +#include "ServiceNameResolver.h" +#include "ServiceURI.h" #include "Url.h" #include "auth/AuthOauth2.h" #include "auth/InitialAuthData.h" @@ -179,12 +183,11 @@ static bool file_exists(const std::string& path) { std::atomic ClientConnection::maxMessageSize_{Commands::DefaultMaxMessageSize}; ClientConnection::ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress, - const ExecutorServicePtr& executor, + const ServiceInfo& serviceInfo, const ExecutorServicePtr& executor, const ClientConfiguration& clientConfiguration, - const AuthenticationPtr& authentication, const std::string& clientVersion, - ConnectionPool& pool, size_t poolIndex) + const std::string& clientVersion, ConnectionPool& pool, size_t poolIndex) : operationsTimeout_(ClientImpl::getOperationTimeout(clientConfiguration)), - authentication_(authentication), + authentication_(serviceInfo.authentication()), serverProtocolVersion_(proto::ProtocolVersion_MIN), executor_(executor), resolver_(executor_->createTcpResolver()), @@ -210,15 +213,14 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: return; } - auto oauth2Auth = std::dynamic_pointer_cast(authentication_); - if (oauth2Auth) { + if (auto oauth2Auth = std::dynamic_pointer_cast(authentication_)) { // Configure the TLS trust certs file for Oauth2 auto authData = std::dynamic_pointer_cast( - std::make_shared(clientConfiguration.getTlsTrustCertsFilePath())); + std::make_shared(serviceInfo.tlsTrustCertsFilePath().value_or(""))); oauth2Auth->getAuthData(authData); } - if (clientConfiguration.isUseTls()) { + if (serviceInfo.useTls()) { ASIO::ssl::context ctx(ASIO::ssl::context::sslv23_client); ctx.set_options(ASIO::ssl::context::default_workarounds | ASIO::ssl::context::no_sslv2 | ASIO::ssl::context::no_sslv3 | ASIO::ssl::context::no_tlsv1 | @@ -239,8 +241,8 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: } else { ctx.set_verify_mode(ASIO::ssl::context::verify_peer); - const auto& trustCertFilePath = clientConfiguration.getTlsTrustCertsFilePath(); - if (!trustCertFilePath.empty()) { + if (serviceInfo.tlsTrustCertsFilePath()) { + const auto& trustCertFilePath = *serviceInfo.tlsTrustCertsFilePath(); if (file_exists(trustCertFilePath)) { ctx.load_verify_file(trustCertFilePath); } else { @@ -1247,7 +1249,7 @@ void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec, startConsumerStatsTimer(consumerStatsRequests); } -const std::future& ClientConnection::close(Result result) { +const std::future& ClientConnection::close(Result result, bool switchCluster) { Lock lock(mutex_); if (closeFuture_) { connectPromise_.setFailed(result); @@ -1332,6 +1334,9 @@ const std::future& ClientConnection::close(Result result) { for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) { auto consumer = it->second.lock(); if (consumer) { + if (switchCluster) { + consumer->onClusterSwitching(); + } consumer->handleDisconnection(result, self); } } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index b9880ee2..75e4bca8 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -20,6 +20,7 @@ #define _PULSAR_CLIENT_CONNECTION_HEADER_ #include +#include #include #include @@ -145,8 +146,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this& close(Result result = ResultConnectError); + const std::future& close(Result result = ResultConnectError, bool switchCluster = false); bool isClosed() const; diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index eec3b34a..b84c14c4 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -24,7 +24,9 @@ #include #include #include +#include #include +#include #include #include "BinaryProtoLookupService.h" @@ -32,6 +34,7 @@ #include "Commands.h" #include "ConsumerImpl.h" #include "ConsumerInterceptors.h" +#include "DefaultServiceInfoProvider.h" #include "ExecutorService.h" #include "HTTPLookupService.h" #include "LogUtils.h" @@ -74,20 +77,17 @@ std::string generateRandomName() { return randomName; } -typedef std::unique_lock Lock; - typedef std::vector StringList; -static LookupServicePtr defaultLookupServiceFactory(const std::string& serviceUrl, +static LookupServicePtr defaultLookupServiceFactory(const ServiceInfo& serviceInfo, const ClientConfiguration& clientConfiguration, - ConnectionPool& pool, const AuthenticationPtr& auth) { - if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) { + ConnectionPool& pool) { + if (ServiceNameResolver::useHttp(ServiceURI(serviceInfo.serviceUrl()))) { LOG_DEBUG("Using HTTP Lookup"); - return std::make_shared(serviceUrl, std::cref(clientConfiguration), - std::cref(auth)); + return std::make_shared(std::cref(serviceInfo), std::cref(clientConfiguration)); } else { LOG_DEBUG("Using Binary Lookup"); - return std::make_shared(serviceUrl, std::ref(pool), + return std::make_shared(std::cref(serviceInfo), std::ref(pool), std::cref(clientConfiguration)); } } @@ -97,17 +97,28 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, LookupServiceFactory&& lookupServiceFactory) - : mutex_(), + : ClientImpl(std::make_unique(std::cref(serviceUrl), + std::cref(*clientConfiguration.impl_)), + clientConfiguration, std::move(lookupServiceFactory)) {} + +ClientImpl::ClientImpl(std::unique_ptr serviceInfoProvider, + const ClientConfiguration& clientConfiguration) + : ClientImpl(std::move(serviceInfoProvider), clientConfiguration, &defaultLookupServiceFactory) {} + +ClientImpl::ClientImpl(std::unique_ptr serviceInfoProvider, + const ClientConfiguration& clientConfiguration, + LookupServiceFactory&& lookupServiceFactory) + : serviceInfoProvider_(std::move(serviceInfoProvider)), state_(Open), - clientConfiguration_(ClientConfiguration(clientConfiguration) - .setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl)))), + clientConfiguration_(clientConfiguration), + serviceInfo_(serviceInfoProvider_->initialServiceInfo()), memoryLimitController_(clientConfiguration.getMemoryLimit()), ioExecutorProvider_(std::make_shared(clientConfiguration_.getIOThreads())), listenerExecutorProvider_( std::make_shared(clientConfiguration_.getMessageListenerThreads())), partitionListenerExecutorProvider_( std::make_shared(clientConfiguration_.getMessageListenerThreads())), - pool_(clientConfiguration_, ioExecutorProvider_, clientConfiguration_.getAuthPtr(), + pool_(serviceInfo_, clientConfiguration_, ioExecutorProvider_, ClientImpl::getClientVersion(clientConfiguration)), producerIdGenerator_(0), consumerIdGenerator_(0), @@ -119,14 +130,24 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& if (loggerFactory) { LogUtils::setLoggerFactory(std::move(loggerFactory)); } - lookupServicePtr_ = createLookup(serviceUrl); + + lookupServicePtr_ = createLookup(*serviceInfo_.load()); } ClientImpl::~ClientImpl() { shutdown(); } -LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) { +void ClientImpl::initialize() { + auto weakSelf = weak_from_this(); + serviceInfoProvider_->initialize([weakSelf](ServiceInfo serviceInfo) { + if (auto self = weakSelf.lock()) { + self->updateServiceInfo(std::move(serviceInfo)); + } + }); +} + +LookupServicePtr ClientImpl::createLookup(ServiceInfo serviceInfo) { auto lookupServicePtr = RetryableLookupService::create( - lookupServiceFactory_(serviceUrl, clientConfiguration_, pool_, clientConfiguration_.getAuthPtr()), + lookupServiceFactory_(std::move(serviceInfo), clientConfiguration_, pool_), clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_); return lookupServicePtr; } @@ -144,19 +165,26 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() { } LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) { + std::shared_lock readLock(mutex_); if (redirectedClusterURI.empty()) { return lookupServicePtr_; } - Lock lock(mutex_); - auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI); - if (it == redirectedClusterLookupServicePtrs_.end()) { - auto lookup = createLookup(redirectedClusterURI); - redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, lookup); - return lookup; + if (auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI); + it != redirectedClusterLookupServicePtrs_.end()) { + return it->second; } + readLock.unlock(); - return it->second; + std::unique_lock writeLock(mutex_); + // Double check in case another thread acquires the lock and inserts a pair first + if (auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI); + it != redirectedClusterLookupServicePtrs_.end()) { + return it->second; + } + auto lookup = createRedirectedLookup(redirectedClusterURI); + redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, lookup); + return lookup; } void ClientImpl::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf, @@ -166,7 +194,7 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon } TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Producer()); @@ -180,7 +208,7 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon if (autoDownloadSchema) { auto self = shared_from_this(); - lookupServicePtr_->getSchema(topicName).addListener( + getSchema(topicName).addListener( [self, topicName, callback](Result res, const SchemaInfo& topicSchema) { if (res != ResultOk) { callback(res, Producer()); @@ -188,12 +216,12 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon } ProducerConfiguration conf; conf.setSchema(topicSchema); - self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + self->getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); }); } else { - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); } @@ -253,7 +281,7 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st const ReaderConfiguration& conf, const ReaderCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Reader()); @@ -266,7 +294,7 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st } MessageId msgId(startMessageId); - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, msgId, conf, callback)); } @@ -275,7 +303,7 @@ void ClientImpl::createTableViewAsync(const std::string& topic, const TableViewC const TableViewCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, TableView()); @@ -341,7 +369,7 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const const SubscribeCallback& callback) { TopicNamePtr topicNamePtr = TopicName::get(regexPattern); - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Consumer()); @@ -379,7 +407,7 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const return; } - lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode) + getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode) .addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(), std::placeholders::_1, std::placeholders::_2, regexPattern, mode, subscriptionName, conf, callback)); @@ -401,9 +429,8 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace auto interceptors = std::make_shared(conf.getInterceptors()); - consumer = std::make_shared(shared_from_this(), regexPattern, mode, - *matchTopics, subscriptionName, conf, - lookupServicePtr_, interceptors); + consumer = std::make_shared( + shared_from_this(), regexPattern, mode, *matchTopics, subscriptionName, conf, interceptors); consumer->getConsumerCreatedFuture().addListener( std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, @@ -426,7 +453,7 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, auto it = std::unique(topics.begin(), topics.end()); auto newSize = std::distance(topics.begin(), it); topics.resize(newSize); - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Consumer()); @@ -450,7 +477,7 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, auto interceptors = std::make_shared(conf.getInterceptors()); ConsumerImplBasePtr consumer = std::make_shared( - shared_from_this(), topics, subscriptionName, topicNamePtr, conf, lookupServicePtr_, interceptors); + shared_from_this(), topics, subscriptionName, topicNamePtr, conf, interceptors); consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, @@ -462,7 +489,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub const ConsumerConfiguration& conf, const SubscribeCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, Consumer()); @@ -480,7 +507,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub } } - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleSubscribe, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, subscriptionName, conf, callback)); } @@ -503,9 +530,9 @@ void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& parti callback(ResultInvalidConfiguration, Consumer()); return; } - consumer = std::make_shared( - shared_from_this(), topicName, partitionMetadata->getPartitions(), subscriptionName, conf, - lookupServicePtr_, interceptors); + consumer = std::make_shared(shared_from_this(), topicName, + partitionMetadata->getPartitions(), + subscriptionName, conf, interceptors); } else { auto consumerImpl = std::make_shared(shared_from_this(), topicName->toString(), subscriptionName, conf, @@ -647,7 +674,7 @@ void ClientImpl::handleGetPartitions(Result result, const LookupDataResultPtr& p void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetPartitionsCallback& callback) { TopicNamePtr topicName; { - Lock lock(mutex_); + std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); callback(ResultAlreadyClosed, StringList()); @@ -658,13 +685,16 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetP return; } } - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleGetPartitions, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, topicName, callback)); + getPartitionMetadataAsync(topicName).addListener(std::bind(&ClientImpl::handleGetPartitions, + shared_from_this(), std::placeholders::_1, + std::placeholders::_2, topicName, callback)); } void ClientImpl::closeAsync(const CloseCallback& callback) { + serviceInfoProvider_.reset(); + std::unique_lock lock(mutex_); if (state_ != Open) { + lock.unlock(); if (callback) { callback(ResultAlreadyClosed); } @@ -678,6 +708,8 @@ void ClientImpl::closeAsync(const CloseCallback& callback) { for (const auto& it : redirectedClusterLookupServicePtrs_) { it.second->close(); } + redirectedClusterLookupServicePtrs_.clear(); + lock.unlock(); auto producers = producers_.move(); auto consumers = consumers_.move(); @@ -726,7 +758,7 @@ void ClientImpl::handleClose(Result result, const SharedInt& numberOfOpenHandler --(*numberOfOpenHandlers); } if (*numberOfOpenHandlers == 0) { - Lock lock(mutex_); + std::unique_lock lock(mutex_); if (state_ == Closed) { LOG_DEBUG("Client is already shutting down, possible race condition in handleClose"); return; @@ -776,7 +808,9 @@ void ClientImpl::shutdown() { << " consumers have been shutdown."); } + std::shared_lock lock(mutex_); lookupServicePtr_->close(); + lock.unlock(); if (!pool_.close()) { // pool_ has already been closed. It means shutdown() has been called before. return; @@ -805,15 +839,9 @@ void ClientImpl::shutdown() { lookupCount_ = 0; } -uint64_t ClientImpl::newProducerId() { - Lock lock(mutex_); - return producerIdGenerator_++; -} +uint64_t ClientImpl::newProducerId() { return producerIdGenerator_++; } -uint64_t ClientImpl::newConsumerId() { - Lock lock(mutex_); - return consumerIdGenerator_++; -} +uint64_t ClientImpl::newConsumerId() { return consumerIdGenerator_++; } uint64_t ClientImpl::newRequestId() { return (*requestIdGenerator_)++; } @@ -854,4 +882,28 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati return clientConfiguration.impl_->operationTimeout; } +void ClientImpl::updateServiceInfo(ServiceInfo&& serviceInfo) { + std::unique_lock lock{mutex_}; + if (state_ != Open) { + LOG_ERROR("Client is not open, cannot update service info"); + return; + } + + serviceInfo_.store(std::make_shared(serviceInfo)); + pool_.closeAllConnectionsForNewCluster(); + if (lookupServicePtr_) { + lookupServicePtr_->close(); + } + lookupServicePtr_ = createLookup(serviceInfo); + + for (auto&& it : redirectedClusterLookupServicePtrs_) { + it.second->close(); + } + redirectedClusterLookupServicePtrs_.clear(); + useProxy_ = false; + lookupCount_ = 0; +} + +ServiceInfo ClientImpl::getServiceInfo() const { return *(serviceInfo_.load()); } + } /* namespace pulsar */ diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 0b4d5969..7772b15b 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -20,14 +20,20 @@ #define LIB_CLIENTIMPL_H_ #include +#include +#include #include #include #include +#include +#include +#include "AtomicSharedPtr.h" #include "ConnectionPool.h" #include "Future.h" #include "LookupDataResult.h" +#include "LookupService.h" #include "MemoryLimitController.h" #include "ProtoApiEnums.h" #include "SynchronizedHashMap.h" @@ -52,10 +58,8 @@ typedef std::weak_ptr ConsumerImplBaseWeakPtr; class ClientConnection; using ClientConnectionPtr = std::shared_ptr; -class LookupService; -using LookupServicePtr = std::shared_ptr; -using LookupServiceFactory = std::function; +using LookupServiceFactory = std::function; class ProducerImplBase; using ProducerImplBaseWeakPtr = std::weak_ptr; @@ -71,12 +75,17 @@ std::string generateRandomName(); class ClientImpl : public std::enable_shared_from_this { public: + ClientImpl(std::unique_ptr serviceInfoProvider, + const ClientConfiguration& clientConfiguration); + ClientImpl(std::unique_ptr serviceInfoProvider, + const ClientConfiguration& clientConfiguration, LookupServiceFactory&& lookupServiceFactory); ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration); // only for tests ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, LookupServiceFactory&& lookupServiceFactory); + void initialize(); virtual ~ClientImpl(); /** @@ -128,7 +137,6 @@ class ClientImpl : public std::enable_shared_from_this { ExecutorServiceProviderPtr getIOExecutorProvider(); ExecutorServiceProviderPtr getListenerExecutorProvider(); ExecutorServiceProviderPtr getPartitionListenerExecutorProvider(); - LookupServicePtr getLookup(const std::string& redirectedClusterURI = ""); void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); } @@ -139,6 +147,26 @@ class ClientImpl : public std::enable_shared_from_this { ConnectionPool& getConnectionPool() noexcept { return pool_; } uint64_t getLookupCount() { return lookupCount_; } + void updateServiceInfo(ServiceInfo&& serviceInfo); + ServiceInfo getServiceInfo() const; + + // Since the underlying `lookupServicePtr_` can be modified by `updateServiceInfo`, we should not expose + // it to other classes, otherwise the update might not be visible. + auto getPartitionMetadataAsync(const TopicNamePtr& topicName) { + std::shared_lock lock(mutex_); + return lookupServicePtr_->getPartitionMetadataAsync(topicName); + } + + auto getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) { + std::shared_lock lock(mutex_); + return lookupServicePtr_->getTopicsOfNamespaceAsync(nsName, mode); + } + + auto getSchema(const TopicNamePtr& topicName, const std::string& version = "") { + std::shared_lock lock(mutex_); + return lookupServicePtr_->getSchema(topicName, version); + } + static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration); friend class PulsarFriend; @@ -177,7 +205,17 @@ class ClientImpl : public std::enable_shared_from_this { const std::string& getPhysicalAddress(const std::string& redirectedClusterURI, const std::string& logicalAddress); - LookupServicePtr createLookup(const std::string& serviceUrl); + // This overload is only used for blue-green migration, where only the service URL is modified, the other + // parameters remain the same + LookupServicePtr createRedirectedLookup(const std::string& redirectedUrl) { + auto serviceInfo = serviceInfo_.load(); + return createLookup( + ServiceInfo{redirectedUrl, serviceInfo->authentication(), serviceInfo->tlsTrustCertsFilePath()}); + } + + LookupServicePtr createLookup(ServiceInfo serviceInfo); + + LookupServicePtr getLookup(const std::string& redirectedClusterURI); static std::string getClientVersion(const ClientConfiguration& clientConfiguration); @@ -188,10 +226,12 @@ class ClientImpl : public std::enable_shared_from_this { Closed }; - std::mutex mutex_; + std::unique_ptr serviceInfoProvider_; + mutable std::shared_mutex mutex_; State state_; ClientConfiguration clientConfiguration_; + AtomicSharedPtr serviceInfo_; MemoryLimitController memoryLimitController_; ExecutorServiceProviderPtr ioExecutorProvider_; @@ -202,8 +242,8 @@ class ClientImpl : public std::enable_shared_from_this { std::unordered_map redirectedClusterLookupServicePtrs_; ConnectionPool pool_; - uint64_t producerIdGenerator_; - uint64_t consumerIdGenerator_; + std::atomic_uint64_t producerIdGenerator_; + std::atomic_uint64_t consumerIdGenerator_; std::shared_ptr> requestIdGenerator_{std::make_shared>(0)}; SynchronizedHashMap producers_; diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc index c814cf85..6465ff77 100644 --- a/lib/ConnectionPool.cc +++ b/lib/ConnectionPool.cc @@ -38,12 +38,13 @@ DECLARE_LOG_OBJECT() namespace pulsar { -ConnectionPool::ConnectionPool(const ClientConfiguration& conf, +ConnectionPool::ConnectionPool(const AtomicSharedPtr& serviceInfo, + const ClientConfiguration& conf, const ExecutorServiceProviderPtr& executorProvider, - const AuthenticationPtr& authentication, const std::string& clientVersion) - : clientConfiguration_(conf), + const std::string& clientVersion) + : serviceInfo_(serviceInfo), + clientConfiguration_(conf), executorProvider_(executorProvider), - authentication_(authentication), clientVersion_(clientVersion), randomDistribution_(0, conf.getConnectionsPerBroker() - 1), randomEngine_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {} @@ -54,19 +55,8 @@ bool ConnectionPool::close() { return false; } - std::vector connectionsToClose; - // ClientConnection::close() will remove the connection from the pool, which is not allowed when iterating - // over a map, so we store the connections to close in a vector first and don't iterate the pool when - // closing the connections. - std::unique_lock lock(mutex_); - connectionsToClose.reserve(pool_.size()); - for (auto&& kv : pool_) { - connectionsToClose.emplace_back(kv.second); - } - pool_.clear(); - lock.unlock(); - - for (auto&& cnx : connectionsToClose) { + for (auto&& kv : releaseConnections()) { + auto& cnx = kv.second; if (cnx) { // Close with a fatal error to not let client retry auto& future = cnx->close(ResultAlreadyClosed); @@ -94,6 +84,12 @@ bool ConnectionPool::close() { return true; } +void ConnectionPool::closeAllConnectionsForNewCluster() { + for (auto&& kv : releaseConnections()) { + kv.second->close(ResultDisconnected, true); + } +} + static const std::string getKey(const std::string& logicalAddress, const std::string& physicalAddress, size_t keySuffix) { std::stringstream ss; @@ -134,9 +130,9 @@ Future ConnectionPool::getConnectionAsync(const // No valid or pending connection found in the pool, creating a new one ClientConnectionPtr cnx; try { - cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(keySuffix), - clientConfiguration_, authentication_, clientVersion_, *this, - keySuffix)); + cnx.reset(new ClientConnection(logicalAddress, physicalAddress, *serviceInfo_.load(), + executorProvider_->get(keySuffix), clientConfiguration_, + clientVersion_, *this, keySuffix)); } catch (Result result) { Promise promise; promise.setFailed(result); diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h index 0e3a6d0a..f828ac69 100644 --- a/lib/ConnectionPool.h +++ b/lib/ConnectionPool.h @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -31,6 +32,7 @@ #include #include "Future.h" +#include "lib/AtomicSharedPtr.h" namespace pulsar { class ClientConnection; @@ -41,8 +43,8 @@ using ExecutorServiceProviderPtr = std::shared_ptr; class PULSAR_PUBLIC ConnectionPool { public: - ConnectionPool(const ClientConfiguration& conf, const ExecutorServiceProviderPtr& executorProvider, - const AuthenticationPtr& authentication, const std::string& clientVersion); + ConnectionPool(const AtomicSharedPtr& serviceInfo, const ClientConfiguration& conf, + const ExecutorServiceProviderPtr& executorProvider, const std::string& clientVersion); /** * Close the connection pool. @@ -51,6 +53,12 @@ class PULSAR_PUBLIC ConnectionPool { */ bool close(); + /** + * Close all existing connections and notify the connection that a new cluster will be used. + * Unlike close(), the pool remains open for new connections. + */ + void closeAllConnectionsForNewCluster(); + void remove(const std::string& logicalAddress, const std::string& physicalAddress, size_t keySuffix, ClientConnection* value); @@ -90,9 +98,9 @@ class PULSAR_PUBLIC ConnectionPool { size_t generateRandomIndex() { return randomDistribution_(randomEngine_); } private: + const AtomicSharedPtr& serviceInfo_; ClientConfiguration clientConfiguration_; ExecutorServiceProviderPtr executorProvider_; - AuthenticationPtr authentication_; typedef std::map> PoolMap; PoolMap pool_; const std::string clientVersion_; @@ -102,6 +110,13 @@ class PULSAR_PUBLIC ConnectionPool { std::uniform_int_distribution<> randomDistribution_; std::mt19937 randomEngine_; + auto releaseConnections() { + decltype(pool_) pool; + std::lock_guard lock{mutex_}; + pool.swap(pool_); + return pool; + } + friend class PulsarFriend; }; } // namespace pulsar diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 757b6e84..026f146d 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -125,7 +125,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const std::string& topic negativeAcksTracker_(std::make_shared(client, *this, conf)), ackGroupingTrackerPtr_(newAckGroupingTracker(topic, conf, client)), readCompacted_(conf.isReadCompacted()), - startMessageId_(pulsar::getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())), + startMessageIdFromConfig_(pulsar::getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())), + startMessageId_(startMessageIdFromConfig_), maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()), autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()), expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()), @@ -1134,6 +1135,20 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) { } } +void ConsumerImpl::onClusterSwitching() { + { + LockGuard lock{mutex_}; + incomingMessages_.clear(); + startMessageId_ = startMessageIdFromConfig_; + lastDequedMessageId_ = MessageId::earliest(); + lastMessageIdInBroker_ = MessageId::earliest(); + seekStatus_ = SeekStatus::NOT_STARTED; + lastSeekArg_.reset(); + } + setRedirectedClusterURI(""); + ackGroupingTrackerPtr_->flushAndClean(); +} + /** * Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that * was diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 0da82a2d..6f287aa2 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -162,6 +162,8 @@ class ConsumerImpl : public ConsumerImplBase { void doImmediateAck(const MessageId& msgId, const ResultCallback& callback, CommandAck_AckType ackType); void doImmediateAck(const std::set& msgIds, const ResultCallback& callback); + void onClusterSwitching(); + protected: // overrided methods from HandlerBase Future connectionOpened(const ClientConnectionPtr& cnx) override; @@ -266,6 +268,11 @@ class ConsumerImpl : public ConsumerImplBase { MessageId lastDequedMessageId_{MessageId::earliest()}; MessageId lastMessageIdInBroker_{MessageId::earliest()}; + + // When the consumer switches to a new cluster, we should reset `startMessageId_` to the original value, + // otherwise, the message id of the old cluster might be passed in the Subscribe request on the new + // cluster. + const optional startMessageIdFromConfig_; optional startMessageId_; SeekStatus seekStatus_{SeekStatus::NOT_STARTED}; diff --git a/lib/DefaultServiceInfoProvider.h b/lib/DefaultServiceInfoProvider.h new file mode 100644 index 00000000..6479bf9a --- /dev/null +++ b/lib/DefaultServiceInfoProvider.h @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include + +#include + +#include "ClientConfigurationImpl.h" + +namespace pulsar { + +class DefaultServiceInfoProvider : public ServiceInfoProvider { + public: + DefaultServiceInfoProvider(const std::string& serviceUrl, const ClientConfigurationImpl& config) + : serviceInfo_(config.toServiceInfo(serviceUrl)) {} + + ServiceInfo initialServiceInfo() override { return std::move(serviceInfo_); } + + void initialize(std::function onServiceInfoUpdate) override {} + + private: + ServiceInfo serviceInfo_; +}; + +} // namespace pulsar diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc index 79d9e944..0be9713c 100644 --- a/lib/HTTPLookupService.cc +++ b/lib/HTTPLookupService.cc @@ -47,18 +47,17 @@ const static std::string ADMIN_PATH_V2 = "/admin/v2/"; const static std::string PARTITION_METHOD_NAME = "partitions"; const static int NUMBER_OF_LOOKUP_THREADS = 1; -HTTPLookupService::HTTPLookupService(const std::string &serviceUrl, - const ClientConfiguration &clientConfiguration, - const AuthenticationPtr &authData) +HTTPLookupService::HTTPLookupService(const ServiceInfo &serviceInfo, + const ClientConfiguration &clientConfiguration) : executorProvider_(std::make_shared(NUMBER_OF_LOOKUP_THREADS)), - serviceNameResolver_(serviceUrl), - authenticationPtr_(authData), + serviceNameResolver_(serviceInfo.serviceUrl()), + authenticationPtr_(serviceInfo.authentication()), lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()), maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()), tlsPrivateFilePath_(clientConfiguration.getTlsPrivateKeyFilePath()), tlsCertificateFilePath_(clientConfiguration.getTlsCertificateFilePath()), - tlsTrustCertsFilePath_(clientConfiguration.getTlsTrustCertsFilePath()), - isUseTls_(clientConfiguration.isUseTls()), + tlsTrustCertsFilePath_(serviceInfo.tlsTrustCertsFilePath().value_or("")), + isUseTls_(serviceInfo.useTls()), tlsAllowInsecure_(clientConfiguration.isTlsAllowInsecureConnection()), tlsValidateHostname_(clientConfiguration.isValidateHostName()) {} diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h index d17edd53..61a06155 100644 --- a/lib/HTTPLookupService.h +++ b/lib/HTTPLookupService.h @@ -19,6 +19,8 @@ #ifndef PULSAR_CPP_HTTPLOOKUPSERVICE_H #define PULSAR_CPP_HTTPLOOKUPSERVICE_H +#include + #include #include "ClientImpl.h" @@ -67,7 +69,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t Result sendHTTPRequest(const std::string& completeUrl, std::string& responseData, long& responseCode); public: - HTTPLookupService(const std::string&, const ClientConfiguration&, const AuthenticationPtr&); + HTTPLookupService(const ServiceInfo& serviceInfo, const ClientConfiguration& config); LookupResultFuture getBroker(const TopicName& topicName) override; diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 0799eb63..a5699781 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -44,20 +44,19 @@ using std::chrono::seconds; MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(const ClientImplPtr& client, const TopicNamePtr& topicName, int numPartitions, const std::string& subscriptionName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode subscriptionMode, const optional& startMessageId) : MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf, - lookupServicePtr, interceptors, subscriptionMode, startMessageId) { + interceptors, subscriptionMode, startMessageId) { topicsPartitions_[topicName->toString()] = numPartitions; } MultiTopicsConsumerImpl::MultiTopicsConsumerImpl( const ClientImplPtr& client, const std::vector& topics, const std::string& subscriptionName, const TopicNamePtr& topicName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, - Commands::SubscriptionMode subscriptionMode, const optional& startMessageId) + const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode subscriptionMode, + const optional& startMessageId) : ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics", Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf, client->getListenerExecutorProvider()->get()), @@ -66,7 +65,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl( conf_(conf), incomingMessages_(conf.getReceiverQueueSize()), messageListener_(conf.getMessageListener()), - lookupServicePtr_(lookupServicePtr), numberTopicPartitions_(std::make_shared>(0)), topics_(topics), subscriptionMode_(subscriptionMode), @@ -93,7 +91,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl( if (partitionsUpdateInterval > 0) { partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer(); partitionsUpdateInterval_ = seconds(partitionsUpdateInterval); - lookupServicePtr_ = client->getLookup(); } state_ = Pending; @@ -185,7 +182,12 @@ Future MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s auto entry = topicsPartitions_.find(topic); if (entry == topicsPartitions_.end()) { lock.unlock(); - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + auto client = client_.lock(); + if (!client) { + topicPromise->setFailed(ResultAlreadyClosed); + return topicPromise->getFuture(); + } + client->getPartitionMetadataAsync(topicName).addListener( [this, topicName, topicPromise](Result result, const LookupDataResultPtr& lookupDataResult) { if (result != ResultOk) { LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- " @@ -1003,7 +1005,11 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() { auto topicName = TopicName::get(item.first); auto currentNumPartitions = item.second; auto weakSelf = weak_from_this(); - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + auto client = client_.lock(); + if (!client) { + return; + } + client->getPartitionMetadataAsync(topicName).addListener( [this, weakSelf, topicName, currentNumPartitions](Result result, const LookupDataResultPtr& lookupDataResult) { auto self = weakSelf.lock(); diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index dc628652..38a44cdf 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -46,23 +46,19 @@ class MultiTopicsBrokerConsumerStatsImpl; using MultiTopicsBrokerConsumerStatsPtr = std::shared_ptr; class UnAckedMessageTrackerInterface; using UnAckedMessageTrackerPtr = std::shared_ptr; -class LookupService; -using LookupServicePtr = std::shared_ptr; class MultiTopicsConsumerImpl; class MultiTopicsConsumerImpl : public ConsumerImplBase { public: MultiTopicsConsumerImpl(const ClientImplPtr& client, const TopicNamePtr& topicName, int numPartitions, const std::string& subscriptionName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, const optional& startMessageId = optional{}); MultiTopicsConsumerImpl(const ClientImplPtr& client, const std::vector& topics, const std::string& subscriptionName, const TopicNamePtr& topicName, - const ConsumerConfiguration& conf, const LookupServicePtr& lookupServicePtr_, - const ConsumerInterceptorsPtr& interceptors, + const ConsumerConfiguration& conf, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, const optional& startMessageId = optional{}); @@ -119,7 +115,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { MessageListener messageListener_; DeadlineTimerPtr partitionsUpdateTimer_; TimeDuration partitionsUpdateInterval_; - LookupServicePtr lookupServicePtr_; std::shared_ptr> numberTopicPartitions_; std::atomic failedResult{ResultOk}; Promise multiTopicsConsumerCreatedPromise_; diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index 4a923666..1aa5c87b 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -23,7 +23,6 @@ #include "ClientImpl.h" #include "ExecutorService.h" #include "LogUtils.h" -#include "LookupService.h" #include "ProducerImpl.h" #include "RoundRobinMessageRouter.h" #include "SinglePartitionMessageRouter.h" @@ -59,7 +58,6 @@ PartitionedProducerImpl::PartitionedProducerImpl(const ClientImplPtr& client, co listenerExecutor_ = client->getListenerExecutorProvider()->get(); partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer(); partitionsUpdateInterval_ = std::chrono::seconds(partitionsUpdateInterval); - lookupServicePtr_ = client->getLookup(); } } @@ -433,7 +431,11 @@ void PartitionedProducerImpl::runPartitionUpdateTask() { void PartitionedProducerImpl::getPartitionMetadata() { using namespace std::placeholders; auto weakSelf = weak_from_this(); - lookupServicePtr_->getPartitionMetadataAsync(topicName_) + auto client = client_.lock(); + if (!client) { + return; + } + client->getPartitionMetadataAsync(topicName_) .addListener([weakSelf](Result result, const LookupDataResultPtr& lookupDataResult) { auto self = weakSelf.lock(); if (self) { diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h index 40f2d34d..94ba7179 100644 --- a/lib/PartitionedProducerImpl.h +++ b/lib/PartitionedProducerImpl.h @@ -38,8 +38,6 @@ using ClientImplPtr = std::shared_ptr; using ClientImplWeakPtr = std::weak_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; -class LookupService; -using LookupServicePtr = std::shared_ptr; class ProducerImpl; using ProducerImplPtr = std::shared_ptr; class TopicName; @@ -133,7 +131,6 @@ class PartitionedProducerImpl : public ProducerImplBase, ExecutorServicePtr listenerExecutor_; DeadlineTimerPtr partitionsUpdateTimer_; TimeDuration partitionsUpdateInterval_; - LookupServicePtr lookupServicePtr_; ProducerInterceptorsPtr interceptors_; diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc index fd48feed..4b5aab73 100644 --- a/lib/PatternMultiTopicsConsumerImpl.cc +++ b/lib/PatternMultiTopicsConsumerImpl.cc @@ -21,7 +21,6 @@ #include "ClientImpl.h" #include "ExecutorService.h" #include "LogUtils.h" -#include "LookupService.h" DECLARE_LOG_OBJECT() @@ -32,10 +31,8 @@ using std::chrono::seconds; PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl( const ClientImplPtr& client, const std::string& pattern, CommandGetTopicsOfNamespace_Mode getTopicsMode, const std::vector& topics, const std::string& subscriptionName, - const ConsumerConfiguration& conf, const LookupServicePtr& lookupServicePtr_, - const ConsumerInterceptorsPtr& interceptors) - : MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf, - lookupServicePtr_, interceptors), + const ConsumerConfiguration& conf, const ConsumerInterceptorsPtr& interceptors) + : MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf, interceptors), patternString_(pattern), pattern_(PULSAR_REGEX_NAMESPACE::regex(TopicName::removeDomain(pattern))), getTopicsMode_(getTopicsMode), @@ -84,7 +81,11 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const ASIO_ERROR& er // already get namespace from pattern. assert(namespaceName_); - lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_, getTopicsMode_) + auto client = client_.lock(); + if (!client) { + return; + } + client->getTopicsOfNamespaceAsync(namespaceName_, getTopicsMode_) .addListener(std::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace, this, std::placeholders::_1, std::placeholders::_2)); } diff --git a/lib/PatternMultiTopicsConsumerImpl.h b/lib/PatternMultiTopicsConsumerImpl.h index 63527965..796abcc2 100644 --- a/lib/PatternMultiTopicsConsumerImpl.h +++ b/lib/PatternMultiTopicsConsumerImpl.h @@ -52,7 +52,6 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl { CommandGetTopicsOfNamespace_Mode getTopicsMode, const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, - const LookupServicePtr& lookupServicePtr_, const ConsumerInterceptorsPtr& interceptors); ~PatternMultiTopicsConsumerImpl() override; diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc index 7fa7e8b9..754137c5 100644 --- a/lib/ReaderImpl.cc +++ b/lib/ReaderImpl.cc @@ -90,7 +90,6 @@ void ReaderImpl::start(const MessageId& startMessageId, if (partitions_ > 0) { auto consumerImpl = std::make_shared( client_.lock(), TopicName::get(topic_), partitions_, subscription, consumerConf, - client_.lock()->getLookup(), std::make_shared(std::vector()), Commands::SubscriptionModeNonDurable, startMessageId); consumer_ = consumerImpl; diff --git a/lib/ServiceInfo.cc b/lib/ServiceInfo.cc new file mode 100644 index 00000000..642b39ab --- /dev/null +++ b/lib/ServiceInfo.cc @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include + +#include "ServiceNameResolver.h" +#include "ServiceURI.h" + +namespace pulsar { + +ServiceInfo::ServiceInfo(std::string serviceUrl, AuthenticationPtr authentication, + std::optional tlsTrustCertsFilePath) + : serviceUrl_(std::move(serviceUrl)), + useTls_(ServiceNameResolver::useTls(ServiceURI(serviceUrl_))), + authentication_(std::move(authentication)), + tlsTrustCertsFilePath_(std::move(tlsTrustCertsFilePath)) {} + +} // namespace pulsar diff --git a/lib/c/c_ClientConfiguration.cc b/lib/c/c_ClientConfiguration.cc index 483c74ef..6b11a2aa 100644 --- a/lib/c/c_ClientConfiguration.cc +++ b/lib/c/c_ClientConfiguration.cc @@ -115,14 +115,6 @@ void pulsar_client_configuration_set_logger_t(pulsar_client_configuration_t *con conf->conf.setLogger(new PulsarCLoggerFactory(logger)); } -void pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t *conf, int useTls) { - conf->conf.setUseTls(useTls); -} - -int pulsar_client_configuration_is_use_tls(pulsar_client_configuration_t *conf) { - return conf->conf.isUseTls(); -} - void pulsar_client_configuration_set_validate_hostname(pulsar_client_configuration_t *conf, int validateHostName) { conf->conf.setValidateHostName(validateHostName); @@ -155,10 +147,6 @@ void pulsar_client_configuration_set_tls_trust_certs_file_path(pulsar_client_con conf->conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath); } -const char *pulsar_client_configuration_get_tls_trust_certs_file_path(pulsar_client_configuration_t *conf) { - return conf->conf.getTlsTrustCertsFilePath().c_str(); -} - void pulsar_client_configuration_set_tls_allow_insecure_connection(pulsar_client_configuration_t *conf, int allowInsecure) { conf->conf.setTlsAllowInsecureConnection(allowInsecure); diff --git a/perf/PerfConsumer.cc b/perf/PerfConsumer.cc index 7a707a10..88c4f5da 100644 --- a/perf/PerfConsumer.cc +++ b/perf/PerfConsumer.cc @@ -57,7 +57,6 @@ static int64_t currentTimeMillis() { struct Arguments { std::string authParams; std::string authPlugin; - bool isUseTls; bool isTlsAllowInsecureConnection; std::string tlsTrustCertsFilePath; std::string topic; @@ -155,7 +154,6 @@ void handleSubscribe(Result result, Consumer consumer, Latch latch) { void startPerfConsumer(const Arguments& args) { ClientConfiguration conf; - conf.setUseTls(args.isUseTls); conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection); if (!args.tlsTrustCertsFilePath.empty()) { std::string tlsTrustCertsFilePath(args.tlsTrustCertsFilePath); @@ -262,9 +260,6 @@ int main(int argc, char** argv) { ("auth-plugin,a", po::value(&args.authPlugin)->default_value(""), "Authentication plugin class library path") // - ("use-tls,b", po::value(&args.isUseTls)->default_value(false), - "Whether tls connection is used") // - ("allow-insecure,d", po::value(&args.isTlsAllowInsecureConnection)->default_value(true), "Whether insecure tls connection is allowed") // diff --git a/perf/PerfProducer.cc b/perf/PerfProducer.cc index 17e70cdf..784c651a 100644 --- a/perf/PerfProducer.cc +++ b/perf/PerfProducer.cc @@ -47,7 +47,6 @@ typedef std::shared_ptr RateLimiterPtr; struct Arguments { std::string authParams; std::string authPlugin; - bool isUseTls; bool isTlsAllowInsecureConnection; std::string tlsTrustCertsFilePath; std::string topic; @@ -223,9 +222,6 @@ int main(int argc, char** argv) { ("auth-plugin,a", po::value(&args.authPlugin)->default_value(""), "Authentication plugin class library path") // - ("use-tls,b", po::value(&args.isUseTls)->default_value(false), - "Whether tls connection is used") // - ("allow-insecure,d", po::value(&args.isTlsAllowInsecureConnection)->default_value(true), "Whether insecure tls connection is allowed") // @@ -366,7 +362,6 @@ int main(int argc, char** argv) { pulsar::ClientConfiguration conf; conf.setConnectionsPerBroker(args.connectionsPerBroker); conf.setMemoryLimit(args.memoryLimitMb * 1024 * 1024); - conf.setUseTls(args.isUseTls); conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection); if (!args.tlsTrustCertsFilePath.empty()) { std::string tlsTrustCertsFilePath(args.tlsTrustCertsFilePath); diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc index 84e8572d..4bfd8085 100644 --- a/tests/AuthTokenTest.cc +++ b/tests/AuthTokenTest.cc @@ -42,7 +42,7 @@ static const std::string serviceUrlHttp = "http://localhost:8080"; static const std::string tokenPath = TOKEN_PATH; -static std::string getToken() { +std::string getToken() { std::ifstream file(tokenPath); std::string str((std::istreambuf_iterator(file)), std::istreambuf_iterator()); return str; diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc index d3c6e612..df2c1617 100644 --- a/tests/BasicEndToEndTest.cc +++ b/tests/BasicEndToEndTest.cc @@ -700,7 +700,8 @@ TEST(BasicEndToEndTest, testConfigurationFile) { ClientConfiguration config2 = config1; AuthenticationDataPtr authData; - ASSERT_EQ(ResultOk, config1.getAuth().getAuthData(authData)); + Client client(lookupUrl, config1); + ASSERT_EQ(ResultOk, client.getServiceInfo().authentication()->getAuthData(authData)); ASSERT_EQ(100, config2.getOperationTimeoutSeconds()); ASSERT_EQ(10, config2.getIOThreads()); ASSERT_EQ(1, config2.getMessageListenerThreads()); diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 92aa8204..53cb76e1 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -30,6 +31,7 @@ #include "HttpHelper.h" #include "PulsarFriend.h" #include "PulsarWrapper.h" +#include "lib/AtomicSharedPtr.h" #include "lib/BinaryProtoLookupService.h" #include "lib/ClientConnection.h" #include "lib/ConnectionPool.h" @@ -79,11 +81,12 @@ using namespace pulsar; TEST(LookupServiceTest, basicLookup) { ExecutorServiceProviderPtr service = std::make_shared(1); - AuthenticationPtr authData = AuthFactory::Disabled(); std::string url = "pulsar://localhost:6650"; ClientConfiguration conf; ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared(1)); - ConnectionPool pool_(conf, ioExecutorProvider_, authData, ""); + AtomicSharedPtr serviceInfo; + serviceInfo.store(std::make_shared(url)); + ConnectionPool pool_(serviceInfo, conf, ioExecutorProvider_, ""); BinaryProtoLookupService lookupService(url, pool_, conf); TopicNamePtr topicName = TopicName::get("topic"); @@ -146,24 +149,30 @@ static void testMultiAddresses(LookupService& lookupService) { } TEST(LookupServiceTest, testMultiAddresses) { - ConnectionPool pool({}, std::make_shared(1), AuthFactory::Disabled(), ""); + AtomicSharedPtr serviceInfo; + serviceInfo.store(std::make_shared(binaryLookupUrl)); + ConnectionPool pool(serviceInfo, {}, std::make_shared(1), ""); ClientConfiguration conf; - BinaryProtoLookupService binaryLookupService("pulsar://localhost,localhost:9999", pool, conf); + BinaryProtoLookupService binaryLookupService(ServiceInfo{"pulsar://localhost,localhost:9999"}, pool, + conf); testMultiAddresses(binaryLookupService); // HTTPLookupService calls shared_from_this() internally, we must create a shared pointer to test auto httpLookupServicePtr = std::make_shared( - "http://localhost,localhost:9999", ClientConfiguration{}, AuthFactory::Disabled()); + ServiceInfo{"http://localhost,localhost:9999"}, ClientConfiguration{}); testMultiAddresses(*httpLookupServicePtr); } TEST(LookupServiceTest, testRetry) { auto executorProvider = std::make_shared(1); - ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), ""); + AtomicSharedPtr serviceInfo; + serviceInfo.store(std::make_shared(binaryLookupUrl)); + ConnectionPool pool(serviceInfo, {}, executorProvider, ""); ClientConfiguration conf; - auto lookupService = RetryableLookupService::create( - std::make_shared("pulsar://localhost:9999,localhost", pool, conf), - std::chrono::seconds(30), executorProvider); + auto lookupService = + RetryableLookupService::create(std::make_shared( + ServiceInfo{"pulsar://localhost:9999,localhost"}, pool, conf), + std::chrono::seconds(30), executorProvider); ServiceNameResolver& serviceNameResolver = lookupService->getServiceNameResolver(); PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0); @@ -192,13 +201,17 @@ TEST(LookupServiceTest, testRetry) { TEST(LookupServiceTest, testTimeout) { auto executorProvider = std::make_shared(1); - ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), ""); + AtomicSharedPtr serviceInfo; + serviceInfo.store(std::make_shared(binaryLookupUrl)); + ConnectionPool pool(serviceInfo, {}, executorProvider, ""); ClientConfiguration conf; constexpr int timeoutInSeconds = 2; auto lookupService = RetryableLookupService::create( - std::make_shared("pulsar://localhost:9990,localhost:9902,localhost:9904", - pool, conf), + std::make_shared( + ServiceInfo{"pulsar://localhost:9990,localhost:9902,localhost:9904", AuthFactory::Disabled(), + std::nullopt}, + pool, conf), std::chrono::seconds(timeoutInSeconds), executorProvider); auto topicNamePtr = TopicName::get("lookup-service-test-retry"); @@ -259,7 +272,7 @@ TEST_P(LookupServiceTest, basicGetNamespaceTopics) { ASSERT_EQ(ResultOk, result); // 2. verify getTopicsOfNamespace by regex mode. - auto lookupServicePtr = PulsarFriend::getClientImplPtr(client_)->getLookup(); + auto lookupServicePtr = PulsarFriend::getClientImplPtr(client_); auto verifyGetTopics = [&](CommandGetTopicsOfNamespace_Mode mode, const std::set& expectedTopics) { Future getTopicsFuture = @@ -292,11 +305,8 @@ TEST_P(LookupServiceTest, testGetSchema) { Producer producer; ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer)); - auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); - auto lookup = clientImplPtr->getLookup(); - SchemaInfo schemaInfo; - auto future = lookup->getSchema(TopicName::get(topic)); + auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultOk, future.get(schemaInfo)); ASSERT_EQ(jsonSchema, schemaInfo.getSchema()); ASSERT_EQ(SchemaType::JSON, schemaInfo.getSchemaType()); @@ -310,11 +320,8 @@ TEST_P(LookupServiceTest, testGetSchemaNotFound) { Producer producer; ASSERT_EQ(ResultOk, client_.createProducer(topic, producer)); - auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); - auto lookup = clientImplPtr->getLookup(); - SchemaInfo schemaInfo; - auto future = lookup->getSchema(TopicName::get(topic)); + auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo)); } @@ -335,11 +342,8 @@ TEST_P(LookupServiceTest, testGetKeyValueSchema) { Producer producer; ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer)); - auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); - auto lookup = clientImplPtr->getLookup(); - SchemaInfo schemaInfo; - auto future = lookup->getSchema(TopicName::get(topic)); + auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultOk, future.get(schemaInfo)); ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo.getSchema()); ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo.getSchemaType()); @@ -464,9 +468,9 @@ INSTANTIATE_TEST_SUITE_P(Pulsar, LookupServiceTest, ::testing::Values(binaryLook class BinaryProtoLookupServiceRedirectTestHelper : public BinaryProtoLookupService { public: - BinaryProtoLookupServiceRedirectTestHelper(const std::string& serviceUrl, ConnectionPool& pool, + BinaryProtoLookupServiceRedirectTestHelper(const ServiceInfo& serviceInfo, ConnectionPool& pool, const ClientConfiguration& clientConfiguration) - : BinaryProtoLookupService(serviceUrl, pool, clientConfiguration) {} + : BinaryProtoLookupService(serviceInfo, pool, clientConfiguration) {} LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic, size_t redirectCount) { @@ -476,13 +480,14 @@ class BinaryProtoLookupServiceRedirectTestHelper : public BinaryProtoLookupServi TEST(LookupServiceTest, testRedirectionLimit) { const auto redirect_limit = 5; - AuthenticationPtr authData = AuthFactory::Disabled(); ClientConfiguration conf; conf.setMaxLookupRedirects(redirect_limit); ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared(1)); - ConnectionPool pool_(conf, ioExecutorProvider_, authData, ""); - string url = "pulsar://localhost:6650"; - BinaryProtoLookupServiceRedirectTestHelper lookupService(url, pool_, conf); + AtomicSharedPtr serviceInfo; + serviceInfo.store(std::make_shared(binaryLookupUrl)); + ConnectionPool pool_(serviceInfo, conf, ioExecutorProvider_, ""); + const ServiceInfo lookupServiceInfo{"pulsar://localhost:6650"}; + BinaryProtoLookupServiceRedirectTestHelper lookupService(lookupServiceInfo, pool_, conf); const auto topicNamePtr = TopicName::get("topic"); for (auto idx = 0; idx < redirect_limit + 5; ++idx) { @@ -493,8 +498,8 @@ TEST(LookupServiceTest, testRedirectionLimit) { if (idx <= redirect_limit) { ASSERT_EQ(ResultOk, result); - ASSERT_EQ(url, lookupResult.logicalAddress); - ASSERT_EQ(url, lookupResult.physicalAddress); + ASSERT_EQ(lookupServiceInfo.serviceUrl(), lookupResult.logicalAddress); + ASSERT_EQ(lookupServiceInfo.serviceUrl(), lookupResult.physicalAddress); } else { ASSERT_EQ(ResultTooManyLookupRequestException, result); } @@ -522,12 +527,12 @@ class MockLookupService : public BinaryProtoLookupService { }; TEST(LookupServiceTest, testAfterClientShutdown) { - auto client = std::make_shared("pulsar://localhost:6650", ClientConfiguration{}, - [](const std::string& serviceUrl, const ClientConfiguration&, - ConnectionPool& pool, const AuthenticationPtr&) { - return std::make_shared( - serviceUrl, pool, ClientConfiguration{}); - }); + auto client = std::make_shared( + "pulsar://localhost:6650", ClientConfiguration{}, + [](const ServiceInfo& serviceInfo, const ClientConfiguration&, ConnectionPool& pool) { + return std::make_shared(serviceInfo, pool, ClientConfiguration{}); + }); + std::promise promise; client->subscribeAsync("lookup-service-test-after-client-shutdown", "sub", ConsumerConfiguration{}, [&promise](Result result, const Consumer&) { promise.set_value(result); }); @@ -545,10 +550,12 @@ TEST(LookupServiceTest, testAfterClientShutdown) { TEST(LookupServiceTest, testRetryAfterDestroyed) { auto executorProvider = std::make_shared(1); - ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), ""); + AtomicSharedPtr serviceInfo; + serviceInfo.store(std::make_shared(binaryLookupUrl)); + ConnectionPool pool(serviceInfo, {}, executorProvider, ""); - auto internalLookupService = - std::make_shared("pulsar://localhost:6650", pool, ClientConfiguration{}); + auto internalLookupService = std::make_shared(ServiceInfo{"pulsar://localhost:6650"}, + pool, ClientConfiguration{}); auto lookupService = RetryableLookupService::create(internalLookupService, std::chrono::seconds(30), executorProvider); diff --git a/tests/ServiceInfoProviderTest.cc b/tests/ServiceInfoProviderTest.cc new file mode 100644 index 00000000..82f5f6f7 --- /dev/null +++ b/tests/ServiceInfoProviderTest.cc @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include +#include +#include +#include +#include + +#include "PulsarFriend.h" +#include "WaitUtils.h" +#include "lib/LogUtils.h" + +DECLARE_LOG_OBJECT() + +using namespace pulsar; +using namespace std::chrono_literals; + +class ServiceInfoHolder { + public: + ServiceInfoHolder(ServiceInfo info) : serviceInfo_(std::move(info)) {} + + std::optional getUpdatedValue() { + std::lock_guard lock(mutex_); + if (!owned_) { + return std::nullopt; + } + owned_ = false; + return std::move(serviceInfo_); + } + + void updateValue(ServiceInfo info) { + std::lock_guard lock(mutex_); + serviceInfo_ = std::move(info); + owned_ = true; + } + + private: + ServiceInfo serviceInfo_; + bool owned_{true}; + + mutable std::mutex mutex_; +}; + +class TestServiceInfoProvider : public ServiceInfoProvider { + public: + TestServiceInfoProvider(ServiceInfoHolder &serviceInfo) : serviceInfo_(serviceInfo) {} + + ServiceInfo initialServiceInfo() override { return serviceInfo_.getUpdatedValue().value(); } + + void initialize(std::function onServiceInfoUpdate) override { + thread_ = std::thread([this, onServiceInfoUpdate] { + while (running_) { + auto updatedValue = serviceInfo_.getUpdatedValue(); + if (updatedValue) { + onServiceInfoUpdate(std::move(*updatedValue)); + } + // Use a tight wait loop for tests + std::this_thread::sleep_for(10ms); + } + }); + } + + ~TestServiceInfoProvider() override { + running_ = false; + if (thread_.joinable()) { + thread_.join(); + } + } + + private: + std::thread thread_; + ServiceInfoHolder &serviceInfo_; + std::atomic_bool running_{true}; + mutable std::mutex mutex_; +}; + +TEST(ServiceInfoProviderTest, testSwitchCluster) { + extern std::string getToken(); // from tests/AuthTokenTest.cc + // Access "private/auth" namespace in cluster 1 + ServiceInfo info1{"pulsar://localhost:6650", AuthToken::createWithToken(getToken())}; + // Access "private/auth" namespace in cluster 2 + ServiceInfo info2{"pulsar+ssl://localhost:6653", + AuthTls::create(TEST_CONF_DIR "/client-cert.pem", TEST_CONF_DIR "/client-key.pem"), + TEST_CONF_DIR "/hn-verification/cacert.pem"}; + // Access "public/default" namespace in cluster 1, which doesn't require authentication + ServiceInfo info3{"pulsar://localhost:6650"}; + + ServiceInfoHolder serviceInfo{info1}; + auto client = Client::create(std::make_unique(serviceInfo), {}); + + const auto topicRequiredAuth = "private/auth/testUpdateConnectionInfo-" + std::to_string(time(nullptr)); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicRequiredAuth, producer)); + + Reader reader; + ASSERT_EQ(ResultOk, client.createReader(topicRequiredAuth, MessageId::earliest(), {}, reader)); + + auto sendAndReceive = [&](const std::string &value) { + MessageId msgId; + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(value).build(), msgId)); + LOG_INFO("Sent " << value << " to " << msgId); + + Message msg; + ASSERT_EQ(ResultOk, reader.readNext(msg, 3000)); + LOG_INFO("Read " << msg.getDataAsString() << " from " << msgId); + ASSERT_EQ(value, msg.getDataAsString()); + }; + + sendAndReceive("msg-0"); + + // Switch to cluster 2 (started by ./build-support/start-mim-test-service-inside-container.sh) + ASSERT_FALSE(PulsarFriend::getConnections(client).empty()); + serviceInfo.updateValue(info2); + ASSERT_TRUE(waitUntil(1s, [&] { + return PulsarFriend::getConnections(client).empty() && client.getServiceInfo() == info2; + })); + + // Now the same will access the same topic in cluster 2 + sendAndReceive("msg-1"); + + // Switch back to cluster 1 without any authentication, the previous authentication info configured for + // cluster 2 will be cleared. + ASSERT_FALSE(PulsarFriend::getConnections(client).empty()); + serviceInfo.updateValue(info3); + ASSERT_TRUE(waitUntil(1s, [&] { + return PulsarFriend::getConnections(client).empty() && client.getServiceInfo() == info3; + })); + + const auto topicNoAuth = "testUpdateConnectionInfo-" + std::to_string(time(nullptr)); + producer.close(); + ASSERT_EQ(ResultOk, client.createProducer(topicNoAuth, producer)); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-2").build())); + + client.close(); + + // Verify messages sent to cluster 1 and cluster 2 can be consumed successfully with correct + // authentication info. + auto verify = [](Client &client, const std::string &topic, const std::string &value) { + Reader reader; + ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader)); + Message msg; + ASSERT_EQ(ResultOk, reader.readNext(msg, 3000)); + ASSERT_EQ(value, msg.getDataAsString()); + }; + Client client1{info1.serviceUrl(), ClientConfiguration().setAuth(info1.authentication())}; + verify(client1, topicRequiredAuth, "msg-0"); + client1.close(); + + Client client2{info2.serviceUrl(), ClientConfiguration() + .setAuth(info2.authentication()) + .setTlsTrustCertsFilePath(*info2.tlsTrustCertsFilePath())}; + verify(client2, topicRequiredAuth, "msg-1"); + client2.close(); + + Client client3{info3.serviceUrl()}; + verify(client3, topicNoAuth, "msg-2"); + client3.close(); +}