Skip to content

Commit 14063ed

Browse files
abevk2023tdcmeehan
authored andcommitted
Add support for loading plugins in the function server
1 parent 892ad68 commit 14063ed

File tree

7 files changed

+415
-39
lines changed

7 files changed

+415
-39
lines changed

presto-function-server/pom.xml

+20
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,26 @@
109109
<artifactId>javax.ws.rs-api</artifactId>
110110
</dependency>
111111

112+
<dependency>
113+
<groupId>com.google.code.findbugs</groupId>
114+
<artifactId>jsr305</artifactId>
115+
</dependency>
116+
117+
<dependency>
118+
<groupId>com.facebook.drift</groupId>
119+
<artifactId>drift-api</artifactId>
120+
</dependency>
121+
122+
<dependency>
123+
<groupId>io.airlift.resolver</groupId>
124+
<artifactId>resolver</artifactId>
125+
</dependency>
126+
127+
<dependency>
128+
<groupId>org.sonatype.aether</groupId>
129+
<artifactId>aether-api</artifactId>
130+
</dependency>
131+
112132
<dependency>
113133
<groupId>com.facebook.presto</groupId>
114134
<artifactId>presto-tests</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.server;
15+
16+
import com.facebook.airlift.log.Logger;
17+
import com.facebook.presto.metadata.FunctionAndTypeManager;
18+
import com.facebook.presto.spi.CoordinatorPlugin;
19+
import com.facebook.presto.spi.Plugin;
20+
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
21+
import com.google.common.collect.ImmutableList;
22+
import com.google.common.collect.Ordering;
23+
import io.airlift.resolver.ArtifactResolver;
24+
import io.airlift.resolver.DefaultArtifact;
25+
import org.sonatype.aether.artifact.Artifact;
26+
27+
import javax.annotation.concurrent.ThreadSafe;
28+
import javax.inject.Inject;
29+
30+
import java.io.File;
31+
import java.io.IOException;
32+
import java.net.URL;
33+
import java.net.URLClassLoader;
34+
import java.util.ArrayList;
35+
import java.util.Arrays;
36+
import java.util.Collections;
37+
import java.util.List;
38+
import java.util.ServiceLoader;
39+
import java.util.Set;
40+
import java.util.concurrent.atomic.AtomicBoolean;
41+
42+
import static com.facebook.presto.metadata.FunctionExtractor.extractFunctions;
43+
import static com.facebook.presto.server.PluginDiscovery.discoverPlugins;
44+
import static com.facebook.presto.server.PluginDiscovery.writePluginServices;
45+
import static java.util.Objects.requireNonNull;
46+
47+
@ThreadSafe
48+
public class FunctionPluginManager
49+
{
50+
// When generating code the AfterBurner module loads classes with *some* classloader.
51+
// When the AfterBurner module is configured not to use the value classloader
52+
// (e.g., AfterBurner().setUseValueClassLoader(false)) AppClassLoader is used for loading those
53+
// classes. Otherwise, the PluginClassLoader is used, which is the default behavior.
54+
// Therefore, in the former case Afterburner won't be able to load the connector classes
55+
// as AppClassLoader doesn't see them, and in the latter case the PluginClassLoader won't be
56+
// able to load the AfterBurner classes themselves. So, our solution is to use the PluginClassLoader
57+
// and whitelist the AfterBurner classes here, so that the PluginClassLoader can load the
58+
// AfterBurner classes.
59+
private static final ImmutableList<String> SPI_PACKAGES = ImmutableList.<String>builder()
60+
.add("com.facebook.presto.spi.")
61+
.add("com.fasterxml.jackson.annotation.")
62+
.add("com.fasterxml.jackson.module.afterburner.")
63+
.add("io.airlift.slice.")
64+
.add("io.airlift.units.")
65+
.add("org.openjdk.jol.")
66+
.add("com.facebook.presto.common")
67+
.add("com.facebook.drift.annotations.")
68+
.add("com.facebook.drift.TException")
69+
.add("com.facebook.drift.TApplicationException")
70+
.build();
71+
72+
// TODO: Add plugins based on FunctionPlugin interface. Currently loading the plugins implemented on Plugin interface to the Function Server for now.
73+
private static final String PLUGIN_SERVICES_FILE = "META-INF/services/" + Plugin.class.getName();
74+
private static final Logger log = Logger.get(FunctionPluginManager.class);
75+
private final ArtifactResolver resolver;
76+
private final File installedPluginsDir;
77+
private final List<String> plugins;
78+
private final AtomicBoolean pluginsLoading = new AtomicBoolean();
79+
private final AtomicBoolean pluginsLoaded = new AtomicBoolean();
80+
private final FunctionAndTypeManager functionAndTypeManager;
81+
@Inject
82+
public FunctionPluginManager(
83+
PluginManagerConfig config,
84+
FunctionAndTypeManager functionAndTypeManager)
85+
{
86+
requireNonNull(config, "config is null");
87+
requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
88+
this.functionAndTypeManager = functionAndTypeManager;
89+
90+
installedPluginsDir = config.getInstalledPluginsDir();
91+
if (config.getPlugins() == null) {
92+
this.plugins = ImmutableList.of();
93+
}
94+
else {
95+
this.plugins = ImmutableList.copyOf(config.getPlugins());
96+
}
97+
this.resolver = new ArtifactResolver(config.getMavenLocalRepository(), config.getMavenRemoteRepository());
98+
}
99+
100+
public void loadPlugins()
101+
throws Exception
102+
{
103+
if (!pluginsLoading.compareAndSet(false, true)) {
104+
return;
105+
}
106+
107+
for (File file : listFiles(installedPluginsDir)) {
108+
if (file.isDirectory()) {
109+
loadPlugin(file.getAbsolutePath());
110+
}
111+
}
112+
113+
for (String plugin : plugins) {
114+
loadPlugin(plugin);
115+
}
116+
117+
pluginsLoaded.set(true);
118+
}
119+
120+
private void loadPlugin(String plugin)
121+
throws Exception
122+
{
123+
log.info("-- Loading plugin %s --", plugin);
124+
URLClassLoader pluginClassLoader = buildClassLoader(plugin);
125+
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(pluginClassLoader)) {
126+
loadPlugin(pluginClassLoader, Plugin.class);
127+
}
128+
log.info("-- Finished loading plugin %s --", plugin);
129+
}
130+
131+
private void loadPlugin(URLClassLoader pluginClassLoader, Class<?> clazz)
132+
{
133+
ServiceLoader<?> serviceLoader = ServiceLoader.load(clazz, pluginClassLoader);
134+
List<?> plugins = ImmutableList.copyOf(serviceLoader);
135+
136+
if (plugins.isEmpty()) {
137+
log.warn("No service providers of type %s", clazz.getName());
138+
}
139+
140+
for (Object plugin : plugins) {
141+
log.info("Installing %s", plugin.getClass().getName());
142+
if (plugin instanceof Plugin) {
143+
installPlugin((Plugin) plugin);
144+
}
145+
else {
146+
log.warn("Unknown plugin type: %s", plugin.getClass().getName());
147+
}
148+
}
149+
}
150+
151+
public void installPlugin(Plugin plugin)
152+
{
153+
for (Class<?> functionClass : plugin.getFunctions()) {
154+
log.info("Registering functions from %s", functionClass.getName());
155+
functionAndTypeManager.registerBuiltInFunctions(extractFunctions(functionClass));
156+
}
157+
}
158+
159+
private URLClassLoader buildClassLoader(String plugin)
160+
throws Exception
161+
{
162+
File file = new File(plugin);
163+
if (file.isFile() && (file.getName().equals("pom.xml") || file.getName().endsWith(".pom"))) {
164+
return buildClassLoaderFromPom(file);
165+
}
166+
if (file.isDirectory()) {
167+
return buildClassLoaderFromDirectory(file);
168+
}
169+
return buildClassLoaderFromCoordinates(plugin);
170+
}
171+
172+
private URLClassLoader buildClassLoaderFromPom(File pomFile)
173+
throws Exception
174+
{
175+
List<Artifact> artifacts = resolver.resolvePom(pomFile);
176+
URLClassLoader classLoader = createClassLoader(artifacts, pomFile.getPath());
177+
178+
Artifact artifact = artifacts.get(0);
179+
180+
processPlugins(artifact, classLoader, PLUGIN_SERVICES_FILE, CoordinatorPlugin.class.getName());
181+
182+
return classLoader;
183+
}
184+
185+
private URLClassLoader buildClassLoaderFromDirectory(File dir)
186+
throws Exception
187+
{
188+
log.debug("Classpath for %s:", dir.getName());
189+
List<URL> urls = new ArrayList<>();
190+
for (File file : listFiles(dir)) {
191+
log.debug(" %s", file);
192+
urls.add(file.toURI().toURL());
193+
}
194+
return createClassLoader(urls);
195+
}
196+
197+
private URLClassLoader buildClassLoaderFromCoordinates(String coordinates)
198+
throws Exception
199+
{
200+
Artifact rootArtifact = new DefaultArtifact(coordinates);
201+
List<Artifact> artifacts = resolver.resolveArtifacts(rootArtifact);
202+
return createClassLoader(artifacts, rootArtifact.toString());
203+
}
204+
205+
private URLClassLoader createClassLoader(List<Artifact> artifacts, String name)
206+
throws IOException
207+
{
208+
log.debug("Classpath for %s:", name);
209+
List<URL> urls = new ArrayList<>();
210+
for (Artifact artifact : sortedArtifacts(artifacts)) {
211+
if (artifact.getFile() == null) {
212+
throw new RuntimeException("Could not resolve artifact: " + artifact);
213+
}
214+
File file = artifact.getFile().getCanonicalFile();
215+
log.debug(" %s", file);
216+
urls.add(file.toURI().toURL());
217+
}
218+
return createClassLoader(urls);
219+
}
220+
221+
private URLClassLoader createClassLoader(List<URL> urls)
222+
{
223+
ClassLoader parent = getClass().getClassLoader();
224+
return new PluginClassLoader(urls, parent, SPI_PACKAGES);
225+
}
226+
227+
private static List<File> listFiles(File installedPluginsDir)
228+
{
229+
if (installedPluginsDir != null && installedPluginsDir.isDirectory()) {
230+
File[] files = installedPluginsDir.listFiles();
231+
if (files != null) {
232+
Arrays.sort(files);
233+
return ImmutableList.copyOf(files);
234+
}
235+
}
236+
return ImmutableList.of();
237+
}
238+
239+
private static List<Artifact> sortedArtifacts(List<Artifact> artifacts)
240+
{
241+
List<Artifact> list = new ArrayList<>(artifacts);
242+
Collections.sort(list, Ordering.natural().nullsLast().onResultOf(Artifact::getFile));
243+
return list;
244+
}
245+
246+
private void processPlugins(Artifact artifact, ClassLoader classLoader, String servicesFile, String className)
247+
throws IOException
248+
{
249+
Set<String> plugins = discoverPlugins(artifact, classLoader, servicesFile, className);
250+
if (!plugins.isEmpty()) {
251+
writePluginServices(plugins, artifact.getFile(), servicesFile);
252+
}
253+
}
254+
}

presto-function-server/src/main/java/com/facebook/presto/server/FunctionServer.java

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public void run()
5757
try {
5858
Bootstrap app = new Bootstrap(modules);
5959
Injector injector = app.initialize();
60+
injector.getInstance(FunctionPluginManager.class).loadPlugins();
6061

6162
HttpServerInfo serverInfo = injector.getInstance(HttpServerInfo.class);
6263
log.info("======== REMOTE FUNCTION SERVER STARTED at: " + serverInfo.getHttpUri() + " =========");

presto-function-server/src/main/java/com/facebook/presto/server/FunctionServerModule.java

+2
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ protected void setup(Binder binder)
6060
binder.bind(ObjectMapper.class).toProvider(JsonObjectMapperProvider.class);
6161
binder.bind(new TypeLiteral<JsonCodec<Map<String, List<JsonBasedUdfFunctionMetadata>>>>() {})
6262
.toInstance(new JsonCodecFactory().mapJsonCodec(String.class, listJsonCodec(JsonBasedUdfFunctionMetadata.class)));
63+
binder.bind(FunctionPluginManager.class).in(Scopes.SINGLETON);
64+
configBinder(binder).bindConfig(PluginManagerConfig.class);
6365
configBinder(binder).bindConfig(FunctionsConfig.class);
6466
configBinder(binder).bindConfig(FeaturesConfig.class);
6567
}

presto-function-server/src/test/java/com/facebook/presto/server/FunctionServerQueryRunner.java

+11-38
Original file line numberDiff line numberDiff line change
@@ -13,43 +13,30 @@
1313
*/
1414
package com.facebook.presto.server;
1515

16-
import com.facebook.airlift.bootstrap.Bootstrap;
17-
import com.facebook.airlift.http.server.HttpServerInfo;
18-
import com.facebook.airlift.http.server.HttpServerModule;
19-
import com.facebook.airlift.jaxrs.JaxrsModule;
2016
import com.facebook.airlift.log.Logger;
2117
import com.facebook.presto.functionNamespace.FunctionNamespaceManagerPlugin;
2218
import com.facebook.presto.functionNamespace.rest.RestBasedFunctionNamespaceManagerFactory;
2319
import com.facebook.presto.tests.DistributedQueryRunner;
2420
import com.facebook.presto.tests.tpch.TpchQueryRunner;
2521
import com.facebook.presto.tests.tpch.TpchQueryRunnerBuilder;
26-
import com.google.common.collect.ImmutableList;
2722
import com.google.common.collect.ImmutableMap;
28-
import com.google.inject.Injector;
29-
import com.google.inject.Module;
3023

31-
import java.util.List;
3224
import java.util.Map;
3325

34-
import static com.facebook.presto.server.PrestoSystemRequirements.verifyJvmRequirements;
35-
import static com.facebook.presto.server.PrestoSystemRequirements.verifySystemTimeIsReasonable;
26+
import static java.lang.String.format;
3627

3728
public class FunctionServerQueryRunner
3829
{
3930
private FunctionServerQueryRunner()
4031
{
4132
}
4233

43-
public static DistributedQueryRunner createQueryRunner()
34+
public static DistributedQueryRunner createQueryRunner(int functionServerPort, Map<String, String> queryRunnerExtraProperties)
4435
throws Exception
4536
{
46-
TestingFunctionServer functionServer = new TestingFunctionServer(ImmutableMap.of("http-server.http.port", "8082"));
47-
4837
DistributedQueryRunner runner = TpchQueryRunnerBuilder.builder()
4938
.setExtraProperties(
50-
ImmutableMap.of(
51-
"http-server.http.port", "8080",
52-
"list-built-in-functions-only", "false"))
39+
queryRunnerExtraProperties)
5340
.build();
5441
runner.installPlugin(new FunctionNamespaceManagerPlugin());
5542
runner.loadFunctionNamespaceManager(
@@ -58,7 +45,7 @@ public static DistributedQueryRunner createQueryRunner()
5845
ImmutableMap.of(
5946
"supported-function-languages", "JAVA",
6047
"function-implementation-type", "REST",
61-
"rest-based-function-manager.rest.url", "http://localhost:8082"));
48+
"rest-based-function-manager.rest.url", format("http://localhost:%s", functionServerPort)));
6249

6350
Thread.sleep(5000);
6451
Logger log = Logger.get(TpchQueryRunner.class);
@@ -67,27 +54,13 @@ public static DistributedQueryRunner createQueryRunner()
6754
return runner;
6855
}
6956

70-
private static class TestingFunctionServer
57+
public static void main(String[] args)
58+
throws Exception
7159
{
72-
public TestingFunctionServer(Map<String, String> requiredConfigurationProperties)
73-
{
74-
verifyJvmRequirements();
75-
verifySystemTimeIsReasonable();
76-
77-
Logger log = Logger.get(FunctionServer.class);
78-
79-
List<Module> modules = ImmutableList.of(
80-
new FunctionServerModule(),
81-
new HttpServerModule(),
82-
new JaxrsModule());
83-
84-
Bootstrap app = new Bootstrap(modules);
85-
Injector injector = app
86-
.setRequiredConfigurationProperties(requiredConfigurationProperties)
87-
.initialize();
88-
89-
HttpServerInfo serverInfo = injector.getInstance(HttpServerInfo.class);
90-
log.info("======== REMOTE FUNCTION SERVER STARTED at: " + serverInfo.getHttpUri() + " =========");
91-
}
60+
int functionServerPort = 8082;
61+
TestingFunctionServer functionServer = new TestingFunctionServer(functionServerPort);
62+
createQueryRunner(
63+
functionServerPort,
64+
ImmutableMap.of("http-server.http.port", "8080", "list-built-in-functions-only", "false"));
9265
}
9366
}

0 commit comments

Comments
 (0)