Skip to content

Commit

Permalink
fix: stop logging about command topic creation on startup if exists (…
Browse files Browse the repository at this point in the history
…MINOR) (#4709)

* fix: stop logging about command topic creation on startup if exists

fixes: #4708

* Changed the code to only log about `Creating topic _confluent-ksql-4_command_topic with replication factor of 1 which is less than 2` if its actually creating to the command topic, otherwise log different message, (UX improvement)
* Fixed the code to not log `Corrected retention.ms on ksql internal topic. topic:_confluent-ksql-4_command_topic, retention.ms:9223372036854775807` when retention already correctly set, (bug fix).
  The bug on the retention logging was that the code that checks to see if a change needs applying was comparing `Long.MAX_VALUE` with a String holding the same value.  Changed code to compare like for like.
  • Loading branch information
big-andy-coates authored Mar 9, 2020
1 parent 31a9d9d commit f4cec0a
Show file tree
Hide file tree
Showing 5 changed files with 702 additions and 641 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,19 @@ public Map<String, String> getTopicConfig(final String topicName) {
public boolean addTopicConfig(final String topicName, final Map<String, ?> overrides) {
final ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);

final Map<String, String> stringConfigs = toStringConfigs(overrides);

try {
final Map<String, String> existingConfig = topicConfig(topicName, false);

final boolean changed = overrides.entrySet().stream()
final boolean changed = stringConfigs.entrySet().stream()
.anyMatch(e -> !Objects.equals(existingConfig.get(e.getKey()), e.getValue()));
if (!changed) {
return false;
}

final Set<AlterConfigOp> entries = overrides.entrySet().stream()
.map(e -> new ConfigEntry(e.getKey(), e.getValue().toString()))
final Set<AlterConfigOp> entries = stringConfigs.entrySet().stream()
.map(e -> new ConfigEntry(e.getKey(), e.getValue()))
.map(ce -> new AlterConfigOp(ce, AlterConfigOp.OpType.SET))
.collect(Collectors.toSet());

Expand All @@ -228,7 +230,7 @@ public boolean addTopicConfig(final String topicName, final Map<String, ?> overr

return true;
} catch (final UnsupportedVersionException e) {
return addTopicConfigLegacy(topicName, overrides);
return addTopicConfigLegacy(topicName, stringConfigs);
} catch (final Exception e) {
throw new KafkaResponseGetFailedException(
"Failed to set config for Kafka Topic " + topicName, e);
Expand Down Expand Up @@ -332,8 +334,10 @@ private void validateTopicProperties(
topic, requiredNumPartition, requiredNumReplicas);
}

private Map<String, String> topicConfig(final String topicName,
final boolean includeDefaults) {
private Map<String, String> topicConfig(
final String topicName,
final boolean includeDefaults
) {
final ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
final List<ConfigResource> request = Collections.singletonList(resource);

Expand All @@ -342,8 +346,7 @@ private Map<String, String> topicConfig(final String topicName,
() -> adminClient.get().describeConfigs(request).all().get(),
ExecutorUtil.RetryBehaviour.ON_RETRYABLE).get(resource);
return config.entries().stream()
.filter(e -> includeDefaults
|| e.source().equals(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG))
.filter(e -> includeDefaults || !e.isDefault())
.collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value));
} catch (final Exception e) {
throw new KafkaResponseGetFailedException(
Expand All @@ -354,12 +357,15 @@ private Map<String, String> topicConfig(final String topicName,
// 'alterConfigs' deprecated, but new `incrementalAlterConfigs` only available on Kafka v2.3+
// So we need to continue to support older brokers until our min requirements reaches v2.3
@SuppressWarnings({"deprecation", "RedundantSuppression"})
private boolean addTopicConfigLegacy(final String topicName, final Map<String, ?> overrides) {
private boolean addTopicConfigLegacy(
final String topicName,
final Map<String, String> overrides
) {
final ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);

try {
final Map<String, String> existingConfig = topicConfig(topicName, false);
existingConfig.putAll(toStringConfigs(overrides));
existingConfig.putAll(overrides);

final Set<ConfigEntry> entries = existingConfig.entrySet().stream()
.map(e -> new ConfigEntry(e.getKey(), e.getValue()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;
package io.confluent.ksql.services;

import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -25,8 +25,6 @@
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.KafkaTopicClientImpl;
import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster;
import io.confluent.ksql.topic.TopicProperties;
import java.util.Collections;
Expand Down
Loading

0 comments on commit f4cec0a

Please sign in to comment.