#386: Implemented passing of CryptoKeyReader.#387
#386: Implemented passing of CryptoKeyReader.#387objecttrouve wants to merge 4 commits intostreamnative:masterfrom
CryptoKeyReader.#387Conversation
As requested [here](streamnative#386), it's now possible to pass a `CryptoKeyReader` (and encryption keys) to `FlinkPulsarSource` and `FlinkPulsarSink`. Used builder pattern for easy extensibility without breaking or excessively overloading public c'tors. Added integration test. (Maybe it can be moved to one of the other tests to avoid overhead.)
|
@jianyun8023 @syhily could you help take a look? |
|
Hi @objecttrouve, tks for your contribution. Using a builder pattern on internal Fetcher don't sounds like a Good design. The detailed review advice would be given in weekend. |
|
|
||
| private final PulsarSerializationSchema<T> serializationSchema; | ||
|
|
||
| public FlinkPulsarSink(final Builder<T> builder) { |
There was a problem hiding this comment.
I don't think this should be public for a Builder pattern.
There was a problem hiding this comment.
Absolutely. Must have missed it. Will fix.
|
|
||
| public FlinkPulsarSink(final Builder<T> builder) { | ||
| super( | ||
| new FlinkPulsarSinkBase.Config<T>() |
There was a problem hiding this comment.
This could be boilerplate. Why not simply use ctor? Since it's a builder pattern. The end user would never touch this ctor.
There was a problem hiding this comment.
The end user would never touch this ctor.
I wasn't sure about which classes an end user could touch. Therefore, I tried hard not to change the public c'tors plus not to add more than absolutely necessary. But if the class is just for internal use, right, it's unnecessary boilerplate. Will change it.
|
|
||
| super(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, semantic); | ||
| this.serializationSchema = serializationSchema; | ||
| this(new Builder<T>() |
There was a problem hiding this comment.
Refactor old ctor doen't look like a good choice.
There was a problem hiding this comment.
The goal was basically to delegate as much as possible to a single c'tor that contains the interesting logic, instead of repeating variants of the logic inside the c'tor all over the place.
But to be honest, I don't mind very much. So I'll just revert to the old c'tor.
| if (clientConf == null){ | ||
| throw new IllegalStateException("Client conf must be set."); | ||
| } | ||
| if ((cryptoKeyReader == null) != (encryptionKeys.isEmpty())){ |
There was a problem hiding this comment.
cryptoKeyReader requires extra serialization check. Use Preconditions.checkState(InstantiationUtil.isSerializable(cryptoKeyReader))
| @Slf4j | ||
| abstract class FlinkPulsarSinkBase<T> extends TwoPhaseCommitSinkFunction<T, FlinkPulsarSinkBase.PulsarTransactionState<T>, Void> implements CheckpointedFunction { | ||
|
|
||
| public static class Config<T> { |
There was a problem hiding this comment.
Why we need builder for this base class?
There was a problem hiding this comment.
Much the same reasoning as implemented throughout. A builder or config object makes it easier to add parameters in the future, without breaking an API or resulting in more and more overloaded c'tors. I have to admit I'm not sure which classes exactly were intended strictly for private purposes, so I just applied the pattern all over the place.
(Knowing that builders and config objects imply a lot of boilerplate.)
If overloading c'tors or changing the API of the class is OK, I'm completely fine with it though. So I'll change this piece, too.
| PulsarSerializationSchema<T> serializationSchema, | ||
| MessageRouter messageRouter, | ||
| PulsarSinkSemantic semantic) { | ||
| public FlinkPulsarSinkBase(final Config<T> config) { |
There was a problem hiding this comment.
I think we may just keep this ctor.
There was a problem hiding this comment.
Much the same as above, I'll change it.
| if (clientConf == null){ | ||
| throw new IllegalStateException("Client conf mustn't be null. Either provide a client conf or a service URL plus properties."); | ||
| } | ||
| return new FlinkPulsarSource<>(this); |
There was a problem hiding this comment.
This cryptoKeyReader doesn't require encryptionKeys here. Right?
There was a problem hiding this comment.
We still need to check the serialization for encryptionKeys here.
There was a problem hiding this comment.
No, I think it doesn't require the encryptionKeys. But I'll double check.
OK, will check serialization.
There was a problem hiding this comment.
In the source, no encryption keys are required. Added serialization check.
| streamingRuntime.getMetricGroup().addGroup(PULSAR_SOURCE_METRICS_GROUP), | ||
| useMetrics | ||
| ); | ||
| return new PulsarFetcher.Builder() |
There was a problem hiding this comment.
Since the PulsarFetcher is an internal API. Plz just keep the ctor.
|
@syhily, thanks for the feedback! I'll make the requested changes in my next free time slot. |
Turned out builder pattern wasn't necessary here. There's one path now for the CryptoKeyReader (plus the encryption keys) and possibly others where it's null (and the keys are empty).
|
@syhily, I made the changes as I understood your requests. Please let me know if there's anything else. Also, if I should squash. (Left multiple commits for better relatability to your remarks. For the time being.) |
|
Thanks for your contribution. The PR looks reasonable on my side. I will merge this on the weekend. |
Thanks @syhily! |
Implemented passing a
CryptoKeyReader(and encryption keys) toFlinkPulsarSourceandFlinkPulsarSink, as requested here.Used builder pattern for easy extensibility without breaking or excessively overloading public c'tors.
Added test. (Maybe it can be moved to one of the other tests to avoid overhead.)