Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Dec 11, 2024
1 parent 8d8bdbf commit 39081e2
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class StreamingOperationBase extends CrtResource {

StreamingOperationBase(MqttRequestResponseClient rrClient, StreamingOperationOptions options) {
acquireNativeHandle(streamingOperationNew(
this,
rrClient.getNativeHandle(),
options
));
Expand Down Expand Up @@ -82,7 +81,7 @@ public void close() {
* native methods
******************************************************************************/

private static native long streamingOperationNew(StreamingOperationBase streamingOperation, long rrClientHandle, StreamingOperationOptions options);
private static native long streamingOperationNew(long rrClientHandle, StreamingOperationOptions options);

private static native void streamingOperationOpen(long streamingOperationHandle);

Expand Down
157 changes: 148 additions & 9 deletions src/native/mqtt_request_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -558,28 +558,162 @@ JNIEXPORT void JNICALL
s_aws_request_response_operation_jni_owned_parameters_clean_up(&request_params, env);
}

JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_iot_StreamingOperationBase_streamingOperationNew(
struct aws_streaming_operation_binding {
struct aws_allocator *allocator;

JavaVM *jvm;

struct aws_mqtt_rr_client_operation *stream;

jobject java_incoming_publish_event_callback;
jobject java_subscription_status_event_callback;
};

static void s_aws_streaming_operation_binding_destroy(struct aws_streaming_operation_binding *binding) {
if (!binding) {
return;
}

// tearing down a stream is asynchronous and should have been done earlier if needed
AWS_FATAL_ASSERT(binding->stream == NULL);

/********** JNI ENV ACQUIRE **********/
JavaVM *jvm = binding->jvm;
JNIEnv *env = aws_jni_acquire_thread_env(jvm);
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "JNI env no longer resolvable; JVM likely shutting down");
goto done;
}

if (binding->java_incoming_publish_event_callback) {
(*env)->DeleteGlobalRef(env, binding->java_incoming_publish_event_callback);
}

if (binding->java_subscription_status_event_callback) {
(*env)->DeleteGlobalRef(env, binding->java_subscription_status_event_callback);
}

done:

aws_mem_release(binding->allocator, binding);
}

static void s_aws_mqtt_streaming_operation_subscription_status_callback(
enum aws_rr_streaming_subscription_event_type status,
int error_code,
void *user_data) {

struct aws_streaming_operation_binding *binding = user_data;
if (!binding->java_incoming_publish_event_callback) {
return;
}

??;
}

static void s_aws_mqtt_streaming_operation_incoming_publish_callback(struct aws_byte_cursor payload, void *user_data) {

struct aws_streaming_operation_binding *binding = user_data;
if (!binding->java_incoming_publish_event_callback) {
return;
}

??;
}

static void s_aws_mqtt_streaming_operation_terminated_callback(void *user_data) {
struct aws_streaming_operation_binding *binding = user_data;

s_aws_streaming_operation_binding_destroy(binding);
}

JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_iot_StreamingOperationBase_streamingOperationNew(
JNIEnv *env,
jclass jni_class,
jobject java_streaming_operation,
jlong jni_mqtt_request_response_client_handle,
jobject java_options) {

(void)env;
(void)jni_class;
(void)java_streaming_operation;
(void)jni_mqtt_request_response_client_handle;
(void)java_options;

aws_cache_jni_ids(env);

struct aws_crt_mqtt_request_response_client_binding *rr_client_binding = (struct aws_crt_mqtt_request_response_client_binding *)jni_mqtt_request_response_client_handle;
if (rr_client_binding == NULL) {
aws_jni_throw_runtime_exception(env, "streamingOperationNew: null request-response client binding");
return (jlong)NULL;
}

struct aws_allocator *allocator = aws_jni_get_allocator();
struct aws_streaming_operation_binding *binding = aws_mem_calloc(allocator, 1, sizeof(struct aws_streaming_operation_binding));
binding->allocator = allocator;

jint jvmresult = (*env)->GetJavaVM(env, &binding->jvm);
if (jvmresult != 0) {
aws_jni_throw_runtime_exception(env, "streamingOperationNew: failed to get JVM");
goto error;
}

struct aws_mqtt_streaming_operation_options stream_options;
AWS_ZERO_STRUCT(stream_options);
stream_options.subscription_status_callback = ??;
stream_options.incoming_publish_callback = ??;
stream_options.terminated_callback = ??;
stream_options.user_data = binding;

jstring java_topic = (jstring)(*env)->GetObjectField(
env, java_options, streaming_operation_options_properties.topic_field_id);
if (!java_topic) {
aws_jni_throw_runtime_exception(env, "streamingOperationNew - topic is null");
goto error;
}
stream_options.topic_filter = aws_jni_byte_cursor_from_jstring_acquire(env, java_topic);

jobject java_incoming_publish_event_callback = (*env)->GetObjectField(
env, java_options, streaming_operation_options_properties.incoming_publish_event_callback_field_id);
if (java_incoming_publish_event_callback) {
binding->java_incoming_publish_event_callback = (*env)->NewGlobalRef(env, java_incoming_publish_event_callback);
}

jobject java_subscription_status_event_callback = (*env)->GetObjectField(
env, java_options, streaming_operation_options_properties.subscription_status_event_callback_field_id);
if (java_subscription_status_event_callback) {
binding->java_subscription_status_event_callback = (*env)->NewGlobalRef(env, java_subscription_status_event_callback);
}

binding->stream = aws_mqtt_request_response_client_create_streaming_operation(rr_client_binding->client, &stream_options);

aws_jni_byte_cursor_from_jstring_release(env, java_topic, stream_options.topic_filter);

if (!binding->stream) {
aws_jni_throw_runtime_exception(env, "streamingOperationNew - failed to create native stream");
goto error;
}

return (jlong)binding;

error:

AWS_FATAL_ASSERT(binding && !binding->stream);

s_aws_streaming_operation_binding_destroy(binding);

return (jlong)NULL;
}

JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_iot_StreamingOperationBase_streamingOperationOpen(
JNIEnv *env,
jclass jni_class,
jlong jni_streaming_operation_handle) {

(void)env;
(void)jni_class;
(void)jni_streaming_operation_handle;

struct aws_streaming_operation_binding *binding = (struct aws_streaming_operation_binding *)jni_streaming_operation_handle;
struct aws_mqtt_rr_client_operation *stream = binding->stream;

if (aws_mqtt_rr_client_operation_activate(stream)) {
aws_jni_throw_runtime_exception(env, "streamingOperationOpen - failed to open stream");
}
}

JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_iot_StreamingOperationBase_streamingOperationDestroy(
Expand All @@ -589,7 +723,12 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_iot_StreamingOperationBas

(void)env;
(void)jni_class;
(void)jni_streaming_operation_handle;

struct aws_streaming_operation_binding *binding = (struct aws_streaming_operation_binding *)jni_streaming_operation_handle;
struct aws_mqtt_rr_client_operation *stream = binding->stream;

binding->stream = NULL;
aws_mqtt_rr_client_operation_release(stream);
}

#if UINTPTR_MAX == 0xffffffff
Expand Down

0 comments on commit 39081e2

Please sign in to comment.