From e1088e76bcea807a75bf0c34fb402055af17e336 Mon Sep 17 00:00:00 2001 From: Kavin <20838718+FireMasterK@users.noreply.github.com> Date: Thu, 31 Aug 2023 16:24:42 +0100 Subject: [PATCH] Refactor and make fully async with the usage of channels --- reqwest-jni/src/lib.rs | 157 +++++++++++++++++++++++++++-------------- 1 file changed, 104 insertions(+), 53 deletions(-) diff --git a/reqwest-jni/src/lib.rs b/reqwest-jni/src/lib.rs index 04b3bd5..0ca2703 100644 --- a/reqwest-jni/src/lib.rs +++ b/reqwest-jni/src/lib.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::sync::OnceLock; +use std::sync::{Arc, OnceLock}; use std::time::Duration; use jni::objects::{JByteArray, JClass, JMap, JObject, JString}; @@ -7,6 +7,7 @@ use jni::sys::jobject; use jni::JNIEnv; use reqwest::{Client, Method, Url}; use tokio::runtime::Runtime; +use tokio::sync::mpsc; static RUNTIME: OnceLock = OnceLock::new(); static CLIENT: OnceLock = OnceLock::new(); @@ -119,22 +120,116 @@ pub extern "system" fn Java_rocks_kavin_reqwest4j_ReqwestUtils_fetch( // `JNIEnv` cannot be sent between threads safely let jvm = env.get_java_vm().unwrap(); + let jvm = Arc::new(jvm); // create CompletableFuture let _future = env .new_object("java/util/concurrent/CompletableFuture", "()V", &[]) .unwrap(); let future = env.new_global_ref(&_future).unwrap(); + let future = Arc::new(future); let runtime = RUNTIME.get().unwrap(); - runtime.spawn_blocking(move || { + // Create a channel for communication between tasks + let (tx, mut rx) = mpsc::channel(1); + let (err_tx, mut err_rx) = mpsc::channel(1); + + runtime.spawn(async move { // send request - let response = runtime.block_on(async { request.send().await }); + let response = request.send().await; - let mut env = jvm.attach_current_thread().unwrap(); + match response { + Ok(response) => { + // Send response to the processing task + tx.send(response).await.unwrap(); + } + Err(error) => { + // Send error to the error handling task + err_tx.send(error).await.unwrap(); + } + } + }); + + { + let jvm = Arc::clone(&jvm); + let future = Arc::clone(&future); + runtime.spawn(async move { + // Receive the response from the first task + let response = rx.recv().await; + + if response.is_none() { + return; + } + + let response = response.unwrap(); + + // get response + let status = response.status().as_u16() as i32; + + let final_url = response.url().to_string(); + + let response_headers = response.headers().clone(); + + let body = response.bytes().await.unwrap_or_default().to_vec(); + + let mut env = jvm.attach_current_thread().unwrap(); + + let final_url = env.new_string(final_url).unwrap(); + + let body = env.byte_array_from_slice(&body).unwrap(); + + let headers = env.new_object("java/util/HashMap", "()V", &[]).unwrap(); + let headers: JMap = JMap::from_env(&mut env, &headers).unwrap(); + + response_headers.iter().for_each(|(key, value)| { + let key = env.new_string(key.as_str()).unwrap(); + let value = env.new_string(value.to_str().unwrap()).unwrap(); + headers + .put(&mut env, &JObject::from(key), &JObject::from(value)) + .unwrap(); + }); + + // return response to CompletableFuture + let response = env + .new_object( + "rocks/kavin/reqwest4j/Response", + "(ILjava/util/Map;[BLjava/lang/String;)V", + &[ + status.into(), + (&headers).into(), + (&body).into(), + (&final_url).into(), + ], + ) + .unwrap(); + + let future = future.as_obj(); + env.call_method( + future, + "complete", + "(Ljava/lang/Object;)Z", + &[(&response).into()], + ) + .unwrap(); + }); + } + + { + let jvm = Arc::clone(&jvm); + let future = Arc::clone(&future); + runtime.spawn(async move { + // Receive the error from the first task + let error = err_rx.recv().await; + + if error.is_none() { + return; + } + + let error = error.unwrap(); + + let mut env = jvm.attach_current_thread().unwrap(); - if let Err(error) = response { let error = error.to_string(); let error = env.new_string(error).unwrap(); // create Exception @@ -145,6 +240,9 @@ pub extern "system" fn Java_rocks_kavin_reqwest4j_ReqwestUtils_fetch( &[(&error).into()], ) .unwrap(); + + let future = future.as_obj(); + // pass error to CompletableFuture env.call_method( future, @@ -153,55 +251,8 @@ pub extern "system" fn Java_rocks_kavin_reqwest4j_ReqwestUtils_fetch( &[(&exception).into()], ) .unwrap(); - return; - } - - let response = response.unwrap(); - - // get response - let status = response.status().as_u16() as i32; - - let headers = env.new_object("java/util/HashMap", "()V", &[]).unwrap(); - let headers: JMap = JMap::from_env(&mut env, &headers).unwrap(); - - response.headers().iter().for_each(|(key, value)| { - let key = env.new_string(key.as_str()).unwrap(); - let value = env.new_string(value.to_str().unwrap()).unwrap(); - headers - .put(&mut env, &JObject::from(key), &JObject::from(value)) - .unwrap(); }); - - let final_url = response.url().to_string(); - let final_url = env.new_string(final_url).unwrap(); - - let body = runtime.block_on(async { response.bytes().await.unwrap_or_default().to_vec() }); - - let body = env.byte_array_from_slice(&body).unwrap(); - - // return response to CompletableFuture - let response = env - .new_object( - "rocks/kavin/reqwest4j/Response", - "(ILjava/util/Map;[BLjava/lang/String;)V", - &[ - status.into(), - (&headers).into(), - (&body).into(), - (&final_url).into(), - ], - ) - .unwrap(); - - let future = future.as_obj(); - env.call_method( - future, - "complete", - "(Ljava/lang/Object;)Z", - &[(&response).into()], - ) - .unwrap(); - }); + } _future.into_raw() }