diff --git a/reqwest-jni/src/lib.rs b/reqwest-jni/src/lib.rs index 0ca2703..0ccaede 100644 --- a/reqwest-jni/src/lib.rs +++ b/reqwest-jni/src/lib.rs @@ -7,7 +7,6 @@ use jni::sys::jobject; use jni::JNIEnv; use reqwest::{Client, Method, Url}; use tokio::runtime::Runtime; -use tokio::sync::mpsc; static RUNTIME: OnceLock = OnceLock::new(); static CLIENT: OnceLock = OnceLock::new(); @@ -131,126 +130,98 @@ pub extern "system" fn Java_rocks_kavin_reqwest4j_ReqwestUtils_fetch( let runtime = RUNTIME.get().unwrap(); - // Create a channel for communication between tasks - let (tx, mut rx) = mpsc::channel(1); - let (err_tx, mut err_rx) = mpsc::channel(1); - - runtime.spawn(async move { - // send request - let response = request.send().await; - - match response { - Ok(response) => { - // Send response to the processing task - tx.send(response).await.unwrap(); - } - Err(error) => { - // Send error to the error handling task - err_tx.send(error).await.unwrap(); - } - } - }); - + // send request in a async task { let jvm = Arc::clone(&jvm); let future = Arc::clone(&future); + runtime.spawn(async move { - // Receive the response from the first task - let response = rx.recv().await; + // send request + let response = request.send().await; - if response.is_none() { - return; + match response { + Ok(response) => { + // get response + let status = response.status().as_u16() as i32; + + let final_url = response.url().to_string(); + + let response_headers = response.headers().clone(); + + let body = response.bytes().await.unwrap_or_default().to_vec(); + + // send response in a blocking task + runtime.spawn_blocking(move || { + let mut env = jvm.attach_current_thread().unwrap(); + + let final_url = env.new_string(final_url).unwrap(); + + let body = env.byte_array_from_slice(&body).unwrap(); + + let headers = env.new_object("java/util/HashMap", "()V", &[]).unwrap(); + let headers: JMap = JMap::from_env(&mut env, &headers).unwrap(); + + response_headers.iter().for_each(|(key, value)| { + let key = env.new_string(key.as_str()).unwrap(); + let value = env.new_string(value.to_str().unwrap()).unwrap(); + headers + .put(&mut env, &JObject::from(key), &JObject::from(value)) + .unwrap(); + }); + + // return response to CompletableFuture + let response = env + .new_object( + "rocks/kavin/reqwest4j/Response", + "(ILjava/util/Map;[BLjava/lang/String;)V", + &[ + status.into(), + (&headers).into(), + (&body).into(), + (&final_url).into(), + ], + ) + .unwrap(); + + let future = future.as_obj(); + env.call_method( + future, + "complete", + "(Ljava/lang/Object;)Z", + &[(&response).into()], + ) + .unwrap(); + }); + } + Err(error) => { + // send error in a blocking task + runtime.spawn_blocking(move || { + let mut env = jvm.attach_current_thread().unwrap(); + + let error = error.to_string(); + let error = env.new_string(error).unwrap(); + // create Exception + let exception = env + .new_object( + "java/lang/Exception", + "(Ljava/lang/String;)V", + &[(&error).into()], + ) + .unwrap(); + + let future = future.as_obj(); + + // pass error to CompletableFuture + env.call_method( + future, + "completeExceptionally", + "(Ljava/lang/Throwable;)Z", + &[(&exception).into()], + ) + .unwrap(); + }); + } } - - let response = response.unwrap(); - - // get response - let status = response.status().as_u16() as i32; - - let final_url = response.url().to_string(); - - let response_headers = response.headers().clone(); - - let body = response.bytes().await.unwrap_or_default().to_vec(); - - let mut env = jvm.attach_current_thread().unwrap(); - - let final_url = env.new_string(final_url).unwrap(); - - let body = env.byte_array_from_slice(&body).unwrap(); - - let headers = env.new_object("java/util/HashMap", "()V", &[]).unwrap(); - let headers: JMap = JMap::from_env(&mut env, &headers).unwrap(); - - response_headers.iter().for_each(|(key, value)| { - let key = env.new_string(key.as_str()).unwrap(); - let value = env.new_string(value.to_str().unwrap()).unwrap(); - headers - .put(&mut env, &JObject::from(key), &JObject::from(value)) - .unwrap(); - }); - - // return response to CompletableFuture - let response = env - .new_object( - "rocks/kavin/reqwest4j/Response", - "(ILjava/util/Map;[BLjava/lang/String;)V", - &[ - status.into(), - (&headers).into(), - (&body).into(), - (&final_url).into(), - ], - ) - .unwrap(); - - let future = future.as_obj(); - env.call_method( - future, - "complete", - "(Ljava/lang/Object;)Z", - &[(&response).into()], - ) - .unwrap(); - }); - } - - { - let jvm = Arc::clone(&jvm); - let future = Arc::clone(&future); - runtime.spawn(async move { - // Receive the error from the first task - let error = err_rx.recv().await; - - if error.is_none() { - return; - } - - let error = error.unwrap(); - - let mut env = jvm.attach_current_thread().unwrap(); - - let error = error.to_string(); - let error = env.new_string(error).unwrap(); - // create Exception - let exception = env - .new_object( - "java/lang/Exception", - "(Ljava/lang/String;)V", - &[(&error).into()], - ) - .unwrap(); - - let future = future.as_obj(); - - // pass error to CompletableFuture - env.call_method( - future, - "completeExceptionally", - "(Ljava/lang/Throwable;)Z", - &[(&exception).into()], - ) - .unwrap(); }); }