Skip to content

Commit 55fe797

Browse files
committed
[Iceberg]Fix jmx metrics overwrite each other between nodes in query runner
1 parent bbf20b5 commit 55fe797

File tree

6 files changed

+43
-11
lines changed

6 files changed

+43
-11
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConnectorFactory.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import com.facebook.presto.spi.connector.ConnectorContext;
1919
import com.facebook.presto.spi.connector.ConnectorFactory;
2020

21+
import javax.management.MBeanServer;
22+
2123
import java.lang.reflect.InvocationTargetException;
2224
import java.util.Map;
2325
import java.util.Optional;
@@ -27,6 +29,13 @@
2729
public class IcebergConnectorFactory
2830
implements ConnectorFactory
2931
{
32+
private final MBeanServer mBeanServer;
33+
34+
public IcebergConnectorFactory(MBeanServer mBeanServer)
35+
{
36+
this.mBeanServer = mBeanServer;
37+
}
38+
3039
@Override
3140
public String getName()
3241
{
@@ -45,8 +54,8 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
4554
ClassLoader classLoader = IcebergConnectorFactory.class.getClassLoader();
4655
try {
4756
return (Connector) classLoader.loadClass(InternalIcebergConnectorFactory.class.getName())
48-
.getMethod("createConnector", String.class, Map.class, ConnectorContext.class, Optional.class)
49-
.invoke(null, catalogName, config, context, Optional.empty());
57+
.getMethod("createConnector", String.class, Map.class, ConnectorContext.class, Optional.class, MBeanServer.class)
58+
.invoke(null, catalogName, config, context, Optional.empty(), mBeanServer);
5059
}
5160
catch (InvocationTargetException e) {
5261
Throwable targetException = e.getTargetException();

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPlugin.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,30 @@
1919
import com.google.common.collect.ImmutableList;
2020
import com.google.common.collect.ImmutableSet;
2121

22+
import javax.management.MBeanServer;
23+
24+
import java.lang.management.ManagementFactory;
2225
import java.util.Set;
2326

2427
public class IcebergPlugin
2528
implements Plugin
2629
{
30+
private final MBeanServer mBeanServer;
31+
32+
public IcebergPlugin()
33+
{
34+
this(ManagementFactory.getPlatformMBeanServer());
35+
}
36+
37+
public IcebergPlugin(MBeanServer mBeanServer)
38+
{
39+
this.mBeanServer = mBeanServer;
40+
}
41+
2742
@Override
2843
public Iterable<ConnectorFactory> getConnectorFactories()
2944
{
30-
return ImmutableList.of(new IcebergConnectorFactory());
45+
return ImmutableList.of(new IcebergConnectorFactory(mBeanServer));
3146
}
3247

3348
@Override

presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757

5858
import javax.management.MBeanServer;
5959

60-
import java.lang.management.ManagementFactory;
6160
import java.util.ArrayList;
6261
import java.util.List;
6362
import java.util.Map;
@@ -72,7 +71,8 @@ public static Connector createConnector(
7271
String catalogName,
7372
Map<String, String> config,
7473
ConnectorContext context,
75-
Optional<ExtendedHiveMetastore> metastore)
74+
Optional<ExtendedHiveMetastore> metastore,
75+
MBeanServer mBeanServer)
7676
{
7777
ClassLoader classLoader = InternalIcebergConnectorFactory.class.getClassLoader();
7878
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
@@ -88,8 +88,7 @@ public static Connector createConnector(
8888
new CachingModule(),
8989
new HiveCommonModule(),
9090
binder -> {
91-
MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
92-
binder.bind(MBeanServer.class).toInstance(new RebindSafeMBeanServer(platformMBeanServer));
91+
binder.bind(MBeanServer.class).toInstance(new RebindSafeMBeanServer(mBeanServer));
9392
binder.bind(NodeVersion.class).toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion()));
9493
binder.bind(NodeManager.class).toInstance(context.getNodeManager());
9594
binder.bind(TypeManager.class).toInstance(context.getTypeManager());

presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
import com.google.common.collect.ImmutableSet;
3939
import io.airlift.tpch.TpchTable;
4040

41+
import javax.management.MBeanServer;
42+
import javax.management.MBeanServerFactory;
43+
4144
import java.io.IOException;
4245
import java.net.URI;
4346
import java.nio.file.Files;
@@ -197,7 +200,13 @@ public static DistributedQueryRunner createIcebergQueryRunner(
197200
queryRunner.installPlugin(new TpcdsPlugin());
198201
queryRunner.createCatalog("tpcds", "tpcds");
199202

200-
queryRunner.installPlugin(new IcebergPlugin());
203+
queryRunner.getServers().forEach(server -> {
204+
MBeanServer mBeanServer = MBeanServerFactory.newMBeanServer();
205+
server.installPlugin(new IcebergPlugin(mBeanServer));
206+
if (addJmxPlugin) {
207+
server.installPlugin(new JmxPlugin(mBeanServer));
208+
}
209+
});
201210

202211
String catalogType = extraConnectorProperties.getOrDefault("iceberg.catalog.type", HIVE.name());
203212
Path icebergDataDirectory = getIcebergDataDirectoryPath(queryRunner.getCoordinator().getDataDirectory(), catalogType, format, addStorageFormatToPath);
@@ -211,7 +220,6 @@ public static DistributedQueryRunner createIcebergQueryRunner(
211220
queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties);
212221

213222
if (addJmxPlugin) {
214-
queryRunner.installPlugin(new JmxPlugin());
215223
queryRunner.createCatalog("jmx", "jmx");
216224
}
217225

presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConnectorFactory.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.google.common.collect.ImmutableMap;
1919
import org.testng.annotations.Test;
2020

21+
import java.lang.management.ManagementFactory;
2122
import java.util.Map;
2223

2324
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -38,7 +39,7 @@ public void testCachingHiveMetastore()
3839

3940
private static void createConnector(Map<String, String> config)
4041
{
41-
ConnectorFactory factory = new IcebergConnectorFactory();
42+
ConnectorFactory factory = new IcebergConnectorFactory(ManagementFactory.getPlatformMBeanServer());
4243
factory.create("iceberg-test", config, new TestingConnectorContext())
4344
.shutdown();
4445
}

presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergParquetMetadataCaching.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ protected QueryRunner createQueryRunner()
4545
PARQUET,
4646
false,
4747
true,
48-
OptionalInt.of(1),
48+
OptionalInt.of(2),
4949
Optional.empty());
5050
}
5151

0 commit comments

Comments
 (0)