Refactor and make fully async with the usage of channels

This commit is contained in:
Kavin 2023-08-31 16:24:42 +01:00
parent f6cdafe9f5
commit e1088e76bc
No known key found for this signature in database
GPG key ID: 6E4598CA5C92C41F

View file

@ -1,5 +1,5 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::OnceLock; use std::sync::{Arc, OnceLock};
use std::time::Duration; use std::time::Duration;
use jni::objects::{JByteArray, JClass, JMap, JObject, JString}; use jni::objects::{JByteArray, JClass, JMap, JObject, JString};
@ -7,6 +7,7 @@ use jni::sys::jobject;
use jni::JNIEnv; use jni::JNIEnv;
use reqwest::{Client, Method, Url}; use reqwest::{Client, Method, Url};
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use tokio::sync::mpsc;
static RUNTIME: OnceLock<Runtime> = OnceLock::new(); static RUNTIME: OnceLock<Runtime> = OnceLock::new();
static CLIENT: OnceLock<Client> = OnceLock::new(); static CLIENT: OnceLock<Client> = OnceLock::new();
@ -119,22 +120,116 @@ pub extern "system" fn Java_rocks_kavin_reqwest4j_ReqwestUtils_fetch(
// `JNIEnv` cannot be sent between threads safely // `JNIEnv` cannot be sent between threads safely
let jvm = env.get_java_vm().unwrap(); let jvm = env.get_java_vm().unwrap();
let jvm = Arc::new(jvm);
// create CompletableFuture // create CompletableFuture
let _future = env let _future = env
.new_object("java/util/concurrent/CompletableFuture", "()V", &[]) .new_object("java/util/concurrent/CompletableFuture", "()V", &[])
.unwrap(); .unwrap();
let future = env.new_global_ref(&_future).unwrap(); let future = env.new_global_ref(&_future).unwrap();
let future = Arc::new(future);
let runtime = RUNTIME.get().unwrap(); let runtime = RUNTIME.get().unwrap();
runtime.spawn_blocking(move || { // Create a channel for communication between tasks
let (tx, mut rx) = mpsc::channel(1);
let (err_tx, mut err_rx) = mpsc::channel(1);
runtime.spawn(async move {
// send request // send request
let response = runtime.block_on(async { request.send().await }); let response = request.send().await;
let mut env = jvm.attach_current_thread().unwrap(); match response {
Ok(response) => {
// Send response to the processing task
tx.send(response).await.unwrap();
}
Err(error) => {
// Send error to the error handling task
err_tx.send(error).await.unwrap();
}
}
});
{
let jvm = Arc::clone(&jvm);
let future = Arc::clone(&future);
runtime.spawn(async move {
// Receive the response from the first task
let response = rx.recv().await;
if response.is_none() {
return;
}
let response = response.unwrap();
// get response
let status = response.status().as_u16() as i32;
let final_url = response.url().to_string();
let response_headers = response.headers().clone();
let body = response.bytes().await.unwrap_or_default().to_vec();
let mut env = jvm.attach_current_thread().unwrap();
let final_url = env.new_string(final_url).unwrap();
let body = env.byte_array_from_slice(&body).unwrap();
let headers = env.new_object("java/util/HashMap", "()V", &[]).unwrap();
let headers: JMap = JMap::from_env(&mut env, &headers).unwrap();
response_headers.iter().for_each(|(key, value)| {
let key = env.new_string(key.as_str()).unwrap();
let value = env.new_string(value.to_str().unwrap()).unwrap();
headers
.put(&mut env, &JObject::from(key), &JObject::from(value))
.unwrap();
});
// return response to CompletableFuture
let response = env
.new_object(
"rocks/kavin/reqwest4j/Response",
"(ILjava/util/Map;[BLjava/lang/String;)V",
&[
status.into(),
(&headers).into(),
(&body).into(),
(&final_url).into(),
],
)
.unwrap();
let future = future.as_obj();
env.call_method(
future,
"complete",
"(Ljava/lang/Object;)Z",
&[(&response).into()],
)
.unwrap();
});
}
{
let jvm = Arc::clone(&jvm);
let future = Arc::clone(&future);
runtime.spawn(async move {
// Receive the error from the first task
let error = err_rx.recv().await;
if error.is_none() {
return;
}
let error = error.unwrap();
let mut env = jvm.attach_current_thread().unwrap();
if let Err(error) = response {
let error = error.to_string(); let error = error.to_string();
let error = env.new_string(error).unwrap(); let error = env.new_string(error).unwrap();
// create Exception // create Exception
@ -145,6 +240,9 @@ pub extern "system" fn Java_rocks_kavin_reqwest4j_ReqwestUtils_fetch(
&[(&error).into()], &[(&error).into()],
) )
.unwrap(); .unwrap();
let future = future.as_obj();
// pass error to CompletableFuture // pass error to CompletableFuture
env.call_method( env.call_method(
future, future,
@ -153,55 +251,8 @@ pub extern "system" fn Java_rocks_kavin_reqwest4j_ReqwestUtils_fetch(
&[(&exception).into()], &[(&exception).into()],
) )
.unwrap(); .unwrap();
return;
}
let response = response.unwrap();
// get response
let status = response.status().as_u16() as i32;
let headers = env.new_object("java/util/HashMap", "()V", &[]).unwrap();
let headers: JMap = JMap::from_env(&mut env, &headers).unwrap();
response.headers().iter().for_each(|(key, value)| {
let key = env.new_string(key.as_str()).unwrap();
let value = env.new_string(value.to_str().unwrap()).unwrap();
headers
.put(&mut env, &JObject::from(key), &JObject::from(value))
.unwrap();
}); });
}
let final_url = response.url().to_string();
let final_url = env.new_string(final_url).unwrap();
let body = runtime.block_on(async { response.bytes().await.unwrap_or_default().to_vec() });
let body = env.byte_array_from_slice(&body).unwrap();
// return response to CompletableFuture
let response = env
.new_object(
"rocks/kavin/reqwest4j/Response",
"(ILjava/util/Map;[BLjava/lang/String;)V",
&[
status.into(),
(&headers).into(),
(&body).into(),
(&final_url).into(),
],
)
.unwrap();
let future = future.as_obj();
env.call_method(
future,
"complete",
"(Ljava/lang/Object;)Z",
&[(&response).into()],
)
.unwrap();
});
_future.into_raw() _future.into_raw()
} }