diff --git a/sdks/java/io/rrio/build.gradle b/sdks/java/io/rrio/build.gradle index d65df370e0ca..32fbd9d22e38 100644 --- a/sdks/java/io/rrio/build.gradle +++ b/sdks/java/io/rrio/build.gradle @@ -25,9 +25,10 @@ description = "Apache Beam :: SDKS :: Java :: IO :: RequestResponseIO (RRIO)" ext.summary = "Support to read from and write to Web APIs" dependencies { - implementation project(path: ":sdks:java:core", configuration: "shadow") - implementation library.java.joda_time - implementation library.java.vendored_guava_32_1_2_jre + // TODO(damondouglas): revert to implementation after project is more fully developed + permitUnusedDeclared project(path: ":sdks:java:core", configuration: "shadow") + permitUnusedDeclared library.java.joda_time + permitUnusedDeclared library.java.vendored_guava_32_1_2_jre testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation library.java.junit diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/Caller.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/Caller.java new file mode 100644 index 000000000000..32b514c43a15 --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/Caller.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponseio; + +import java.io.Serializable; + +/** {@link Caller} interfaces user custom code intended for API calls. */ +public interface Caller extends Serializable { + + /** Calls a Web API with the {@link RequestT} and returns a {@link ResponseT}. */ + ResponseT call(RequestT request) throws UserCodeExecutionException; +} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/SetupTeardown.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/SetupTeardown.java new file mode 100644 index 000000000000..2bdc8113d98e --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/SetupTeardown.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponseio; + +import java.io.Serializable; + +/** + * Provided by user and called within {@link org.apache.beam.sdk.transforms.DoFn.Setup} and @{link + * org.apache.beam.sdk.transforms.DoFn.Teardown} lifecycle methods of {@link Call}'s {@link + * org.apache.beam.sdk.transforms.DoFn}. + */ +public interface SetupTeardown extends Serializable { + + /** Called during the {@link org.apache.beam.sdk.transforms.DoFn}'s setup lifecycle method. */ + void setup() throws UserCodeExecutionException; + + /** Called during the {@link org.apache.beam.sdk.transforms.DoFn}'s teardown lifecycle method. */ + void teardown() throws UserCodeExecutionException; +} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeExecutionException.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeExecutionException.java new file mode 100644 index 000000000000..3a4c002f52e8 --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeExecutionException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponseio; + +/** Base {@link Exception} for signaling errors in user custom code. */ +public class UserCodeExecutionException extends Exception { + public UserCodeExecutionException(String message) { + super(message); + } + + public UserCodeExecutionException(String message, Throwable cause) { + super(message, cause); + } + + public UserCodeExecutionException(Throwable cause) { + super(cause); + } + + public UserCodeExecutionException( + String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeQuotaException.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeQuotaException.java new file mode 100644 index 000000000000..f16f078927f8 --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeQuotaException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponseio; + +/** + * Extends {@link UserCodeQuotaException} to allow the user custom code to specifically signal a + * Quota or API overuse related error. + */ +public class UserCodeQuotaException extends UserCodeExecutionException { + + public UserCodeQuotaException(String message) { + super(message); + } + + public UserCodeQuotaException(String message, Throwable cause) { + super(message, cause); + } + + public UserCodeQuotaException(Throwable cause) { + super(cause); + } + + public UserCodeQuotaException( + String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeTimeoutException.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeTimeoutException.java new file mode 100644 index 000000000000..22b067449858 --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeTimeoutException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponseio; + +/** An extension of {@link UserCodeQuotaException} to specifically signal a user code timeout. */ +public class UserCodeTimeoutException extends UserCodeExecutionException { + + public UserCodeTimeoutException(String message) { + super(message); + } + + public UserCodeTimeoutException(String message, Throwable cause) { + super(message, cause); + } + + public UserCodeTimeoutException(Throwable cause) { + super(cause); + } + + public UserCodeTimeoutException( + String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/package-info.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/package-info.java new file mode 100644 index 000000000000..cd9c11c13f86 --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Package provides Beam I/O transform support for safely reading from and writing to Web APIs. */ +package org.apache.beam.io.requestresponseio; diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/CallerTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/CallerTest.java new file mode 100644 index 000000000000..5258573f4283 --- /dev/null +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/CallerTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.rrio; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import org.apache.beam.io.requestresponseio.Caller; +import org.apache.beam.io.requestresponseio.UserCodeExecutionException; +import org.apache.beam.io.requestresponseio.UserCodeQuotaException; +import org.apache.beam.io.requestresponseio.UserCodeTimeoutException; +import org.apache.beam.sdk.Pipeline.PipelineExecutionException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.SerializableUtils; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link Caller}. */ +@RunWith(JUnit4.class) +public class CallerTest { + + @Rule public TestPipeline pipeline = TestPipeline.create(); + + @Test + public void canSerializeImplementingClasses() { + SerializableUtils.serializeToByteArray(new CallerImpl()); + } + + @Test + public void canSerializeWhenUsedInDoFn() { + pipeline + .apply(Create.of(Instant.now())) + .apply(ParDo.of(new CallerUsingDoFn<>(new CallerImpl()))) + .setCoder(StringUtf8Coder.of()); + + pipeline.run(); + } + + @Test + public void canSignalQuotaException() { + pipeline + .apply(Create.of(1)) + .apply(ParDo.of(new CallerUsingDoFn<>(new CallerThrowsQuotaException()))) + .setCoder(VarIntCoder.of()); + + PipelineExecutionException executionException = + assertThrows(PipelineExecutionException.class, pipeline::run); + assertEquals(UserCodeQuotaException.class, executionException.getCause().getClass()); + } + + @Test + public void canSignalTimeoutException() { + pipeline + .apply(Create.of(1)) + .apply(ParDo.of(new CallerUsingDoFn<>(new CallerThrowsTimeoutException()))) + .setCoder(VarIntCoder.of()); + + PipelineExecutionException executionException = + assertThrows(PipelineExecutionException.class, pipeline::run); + assertEquals(UserCodeTimeoutException.class, executionException.getCause().getClass()); + } + + private static class CallerUsingDoFn extends DoFn { + private final Caller caller; + + private CallerUsingDoFn(Caller caller) { + this.caller = caller; + } + + @ProcessElement + public void process(@Element RequestT request, OutputReceiver receiver) + throws UserCodeExecutionException { + RequestT safeRequest = checkStateNotNull(request); + ResponseT response = caller.call(safeRequest); + receiver.output(response); + } + } + + private static class CallerImpl implements Caller { + + @Override + public String call(Instant request) throws UserCodeExecutionException { + return request.toString(); + } + } + + private static class CallerThrowsQuotaException implements Caller { + + @Override + public Integer call(Integer request) throws UserCodeExecutionException { + throw new UserCodeQuotaException("quota"); + } + } + + private static class CallerThrowsTimeoutException implements Caller { + + @Override + public Integer call(Integer request) throws UserCodeExecutionException { + throw new UserCodeTimeoutException("timeout"); + } + } +} diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/SetupTeardownTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/SetupTeardownTest.java new file mode 100644 index 000000000000..a8c5c45ede5c --- /dev/null +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/SetupTeardownTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.rrio; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import org.apache.beam.io.requestresponseio.SetupTeardown; +import org.apache.beam.io.requestresponseio.UserCodeExecutionException; +import org.apache.beam.io.requestresponseio.UserCodeQuotaException; +import org.apache.beam.io.requestresponseio.UserCodeTimeoutException; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException; +import org.junit.Rule; +import org.junit.Test; + +public class SetupTeardownTest { + @Rule public TestPipeline pipeline = TestPipeline.create(); + + @Test + public void canSerializeImplementingClasses() { + SerializableUtils.serializeToByteArray(new SetupTeardownImpl()); + } + + @Test + public void canSerializeWhenUsedInDoFn() { + pipeline + .apply(Create.of(1)) + .apply(ParDo.of(new SetupTeardownUsingDoFn(new SetupTeardownImpl()))) + .setCoder(VarIntCoder.of()); + + pipeline.run(); + } + + @Test + public void canSignalQuotaException() { + pipeline + .apply(Create.of(1)) + .apply(ParDo.of(new SetupTeardownUsingDoFn(new ThrowsQuotaException()))) + .setCoder(VarIntCoder.of()); + + UncheckedExecutionException exception = + assertThrows(UncheckedExecutionException.class, pipeline::run); + UserCodeException userCodeException = (UserCodeException) exception.getCause(); + assertEquals(UserCodeQuotaException.class, userCodeException.getCause().getClass()); + } + + @Test + public void canSignalTimeoutException() { + pipeline + .apply(Create.of(1)) + .apply(ParDo.of(new SetupTeardownUsingDoFn(new ThrowsTimeoutException()))) + .setCoder(VarIntCoder.of()); + + UncheckedExecutionException exception = + assertThrows(UncheckedExecutionException.class, pipeline::run); + UserCodeException userCodeException = (UserCodeException) exception.getCause(); + assertEquals(UserCodeTimeoutException.class, userCodeException.getCause().getClass()); + } + + private static class SetupTeardownUsingDoFn extends DoFn { + private final SetupTeardown setupTeardown; + + private SetupTeardownUsingDoFn(SetupTeardown setupTeardown) { + this.setupTeardown = setupTeardown; + } + + @Setup + public void setup() throws UserCodeExecutionException { + setupTeardown.setup(); + } + + @Teardown + public void teardown() throws UserCodeExecutionException { + setupTeardown.teardown(); + } + + @ProcessElement + public void process() {} + } + + private static class SetupTeardownImpl implements SetupTeardown { + @Override + public void setup() throws UserCodeExecutionException {} + + @Override + public void teardown() throws UserCodeExecutionException {} + } + + private static class ThrowsQuotaException implements SetupTeardown { + + @Override + public void setup() throws UserCodeExecutionException { + throw new UserCodeQuotaException("quota"); + } + + @Override + public void teardown() throws UserCodeExecutionException {} + } + + private static class ThrowsTimeoutException implements SetupTeardown { + + @Override + public void setup() throws UserCodeExecutionException { + throw new UserCodeTimeoutException("timeout"); + } + + @Override + public void teardown() throws UserCodeExecutionException {} + } +}