diff --git a/Cargo.lock b/Cargo.lock index 301afe54b66e..e3b784fb2a34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5011,6 +5011,7 @@ dependencies = [ "test-programs-artifacts", "tokio", "tokio-rustls", + "tokio-util", "tracing", "tracing-subscriber", "wasm-compose", diff --git a/Cargo.toml b/Cargo.toml index 2b663f089362..f90402f82809 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -413,6 +413,7 @@ itertools = "0.14.0" base64 = "0.22.1" termcolor = "1.4.1" flate2 = "1.0.30" +tokio-util = "0.7.4" # ============================================================================= # diff --git a/crates/test-programs/src/bin/p3_http_outbound_request_content_length.rs b/crates/test-programs/src/bin/p3_http_outbound_request_content_length.rs index 8734d76f5403..cf01e1cde496 100644 --- a/crates/test-programs/src/bin/p3_http_outbound_request_content_length.rs +++ b/crates/test-programs/src/bin/p3_http_outbound_request_content_length.rs @@ -44,51 +44,51 @@ fn make_request() -> ( impl test_programs::p3::exports::wasi::cli::run::Guest for Component { async fn run() -> Result<(), ()> { + println!("writing enough"); { let (request, mut contents_tx, trailers_tx, transmit) = make_request(); - let (transmit, handle) = join!(async { transmit.await }, async { - let res = handler::handle(request) - .await - .context("failed to send request")?; - println!("writing enough"); - let remaining = contents_tx.write_all(b"long enough".to_vec()).await; - assert!( - remaining.is_empty(), - "{}", - String::from_utf8_lossy(&remaining) - ); - drop(contents_tx); - trailers_tx - .write(Ok(None)) - .await - .context("failed to finish body")?; - anyhow::Ok(res) - }); + let (handle, (), ()) = join!( + async { + let res = handler::handle(request) + .await + .context("failed to send request")?; + anyhow::Ok(res) + }, + async { + let remaining = contents_tx.write_all(b"long enough".to_vec()).await; + assert!( + remaining.is_empty(), + "{}", + String::from_utf8_lossy(&remaining) + ); + trailers_tx.write(Ok(None)).await.unwrap(); + drop(contents_tx); + }, + async { + transmit.await.unwrap(); + }, + ); let res = handle.unwrap(); drop(res); - transmit.expect("failed to transmit request"); } + println!("writing too little"); { let (request, mut contents_tx, trailers_tx, transmit) = make_request(); - let (transmit, handle) = join!(async { transmit.await }, async { - let res = handler::handle(request) - .await - .context("failed to send request")?; - println!("writing too little"); - let remaining = contents_tx.write_all(b"msg".to_vec()).await; - assert!( - remaining.is_empty(), - "{}", - String::from_utf8_lossy(&remaining) - ); - drop(contents_tx); - trailers_tx - .write(Ok(None)) - .await - .context("failed to finish body")?; - anyhow::Ok(res) - }); + let (handle, transmit, ()) = join!( + async { handler::handle(request).await }, + async { transmit.await }, + async { + let remaining = contents_tx.write_all(b"msg".to_vec()).await; + assert!( + remaining.is_empty(), + "{}", + String::from_utf8_lossy(&remaining) + ); + drop(contents_tx); + trailers_tx.write(Ok(None)).await.unwrap(); + }, + ); let res = handle.unwrap(); drop(res); let err = transmit.expect_err("request transmission should have failed"); @@ -98,23 +98,24 @@ impl test_programs::p3::exports::wasi::cli::run::Guest for Component { ); } + println!("writing too much"); { let (request, mut contents_tx, trailers_tx, transmit) = make_request(); - let (transmit, handle) = join!(async { transmit.await }, async { - let res = handler::handle(request) - .await - .context("failed to send request")?; - println!("writing too much"); - let remaining = contents_tx.write_all(b"more than 11 bytes".to_vec()).await; - assert!( - remaining.is_empty(), - "{}", - String::from_utf8_lossy(&remaining) - ); - drop(contents_tx); - _ = trailers_tx.write(Ok(None)).await; - anyhow::Ok(res) - }); + let (handle, transmit, ()) = join!( + async { handler::handle(request).await }, + async { transmit.await }, + async { + let remaining = contents_tx.write_all(b"more than 11 bytes".to_vec()).await; + assert!( + remaining.is_empty(), + "{}", + String::from_utf8_lossy(&remaining) + ); + drop(contents_tx); + _ = trailers_tx.write(Ok(None)).await; + }, + ); + let res = handle.unwrap(); drop(res); let err = transmit.expect_err("request transmission should have failed"); diff --git a/crates/wasi-http/Cargo.toml b/crates/wasi-http/Cargo.toml index 1ef15de184ed..8b40de30ba49 100644 --- a/crates/wasi-http/Cargo.toml +++ b/crates/wasi-http/Cargo.toml @@ -14,7 +14,7 @@ workspace = true [features] default = ["default-send-request"] default-send-request = ["dep:tokio-rustls", "dep:rustls", "dep:webpki-roots"] -p3 = ["wasmtime-wasi/p3"] +p3 = ["wasmtime-wasi/p3", "dep:tokio-util"] [dependencies] anyhow = { workspace = true } @@ -27,6 +27,7 @@ tokio = { workspace = true, features = [ "rt-multi-thread", "time", ] } +tokio-util = { workspace = true, optional = true } http = { workspace = true } http-body = { workspace = true } http-body-util = { workspace = true } diff --git a/crates/wasi-http/src/p3/bindings.rs b/crates/wasi-http/src/p3/bindings.rs index 8e36aeafb35b..01eaa0aedacc 100644 --- a/crates/wasi-http/src/p3/bindings.rs +++ b/crates/wasi-http/src/p3/bindings.rs @@ -7,12 +7,31 @@ mod generated { world: "wasi:http/proxy", imports: { "wasi:http/handler/[async]handle": async | store | trappable | tracing, + "wasi:http/types/[method]request.consume-body": async | store | trappable | tracing, + "wasi:http/types/[method]response.consume-body": async | store | trappable | tracing, "wasi:http/types/[static]request.new": async | store | trappable | tracing, "wasi:http/types/[static]response.new": async | store | trappable | tracing, default: trappable | tracing, }, exports: { default: async | store }, + with: { + "wasi:http/types/fields": with::Fields, + "wasi:http/types/request": crate::p3::Request, + "wasi:http/types/request-options": with::RequestOptions, + "wasi:http/types/response": crate::p3::Response, + }, + trappable_error_type: { + "wasi:http/types/error-code" => crate::p3::HttpError, + }, }); + + mod with { + /// The concrete type behind a `wasi:http/types/fields` resource. + pub type Fields = crate::p3::MaybeMutable; + + /// The concrete type behind a `wasi:http/types/request-options` resource. + pub type RequestOptions = crate::p3::MaybeMutable; + } } pub use self::generated::wasi::*; diff --git a/crates/wasi-http/src/p3/body.rs b/crates/wasi-http/src/p3/body.rs new file mode 100644 index 000000000000..7a568e5172b2 --- /dev/null +++ b/crates/wasi-http/src/p3/body.rs @@ -0,0 +1,257 @@ +use crate::p3::bindings::http::types::{ErrorCode, Trailers}; +use crate::p3::{WasiHttp, WasiHttpCtxView}; +use anyhow::Context as _; +use bytes::Bytes; +use core::pin::Pin; +use core::task::{Context, Poll, ready}; +use http::HeaderMap; +use http_body_util::combinators::BoxBody; +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot}; +use tokio_util::sync::PollSender; +use wasmtime::component::{ + Accessor, FutureConsumer, FutureReader, Resource, Source, StreamConsumer, StreamReader, + StreamResult, +}; +use wasmtime::{AsContextMut, StoreContextMut}; + +/// The concrete type behind a `wasi:http/types/body` resource. +pub(crate) enum Body { + /// Body constructed by the guest + Guest { + /// The body stream + contents_rx: Option>, + /// Future, on which guest will write result and optional trailers + trailers_rx: FutureReader>, ErrorCode>>, + /// Channel, on which transmission result will be written + result_tx: oneshot::Sender> + Send>>, + }, + /// Body constructed by the host. + Host { + body: BoxBody, + /// Channel, on which transmission result will be written + result_tx: oneshot::Sender> + Send>>, + }, + /// Body is consumed. + Consumed, +} + +pub(crate) struct GuestBodyConsumer { + pub(crate) tx: PollSender, +} + +impl StreamConsumer for GuestBodyConsumer { + type Item = u8; + + fn poll_consume( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + store: StoreContextMut, + src: Source, + finish: bool, + ) -> Poll> { + match self.tx.poll_reserve(cx) { + Poll::Ready(Ok(())) => { + let mut src = src.as_direct(store); + let buf = Bytes::copy_from_slice(src.remaining()); + let n = buf.len(); + match self.tx.send_item(buf) { + Ok(()) => { + src.mark_read(n); + Poll::Ready(Ok(StreamResult::Completed)) + } + Err(..) => Poll::Ready(Ok(StreamResult::Dropped)), + } + } + Poll::Ready(Err(..)) => Poll::Ready(Ok(StreamResult::Dropped)), + Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)), + Poll::Pending => Poll::Pending, + } + } +} + +pub(crate) struct GuestBody { + pub(crate) contents_rx: Option>, + pub(crate) trailers_rx: + Option>, ErrorCode>>>, +} + +impl GuestBody { + pub fn new( + mut store: impl AsContextMut, + contents_rx: Option>, + trailers_rx: FutureReader>, ErrorCode>>, + getter: for<'a> fn(&'a mut T) -> WasiHttpCtxView<'a>, + ) -> Self { + let (trailers_http_tx, trailers_http_rx) = oneshot::channel(); + trailers_rx.pipe( + &mut store, + GuestTrailerConsumer { + tx: trailers_http_tx, + getter, + }, + ); + let contents_rx = contents_rx.map(|rx| { + let (http_tx, http_rx) = mpsc::channel(1); + rx.pipe( + store, + GuestBodyConsumer { + tx: PollSender::new(http_tx), + }, + ); + http_rx + }); + Self { + trailers_rx: Some(trailers_http_rx), + contents_rx, + } + } +} + +impl http_body::Body for GuestBody { + type Data = Bytes; + type Error = ErrorCode; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + if let Some(contents_rx) = self.contents_rx.as_mut() { + while let Some(buf) = ready!(contents_rx.poll_recv(cx)) { + return Poll::Ready(Some(Ok(http_body::Frame::data(buf)))); + } + self.contents_rx = None; + } + + let Some(trailers_rx) = self.trailers_rx.as_mut() else { + return Poll::Ready(None); + }; + + let res = ready!(Pin::new(trailers_rx).poll(cx)); + self.trailers_rx = None; + match res { + Ok(Ok(Some(trailers))) => Poll::Ready(Some(Ok(http_body::Frame::trailers( + Arc::unwrap_or_clone(trailers), + )))), + Ok(Ok(None)) => Poll::Ready(None), + Ok(Err(err)) => Poll::Ready(Some(Err(err))), + Err(..) => Poll::Ready(None), + } + } + + fn is_end_stream(&self) -> bool { + if let Some(contents_rx) = self.contents_rx.as_ref() { + if !contents_rx.is_empty() || !contents_rx.is_closed() { + return false; + } + } + if let Some(trailers_rx) = self.trailers_rx.as_ref() { + if !trailers_rx.is_terminated() { + return false; + } + } + return true; + } + + fn size_hint(&self) -> http_body::SizeHint { + // TODO: use content-length + http_body::SizeHint::default() + } +} + +pub(crate) struct ConsumedBody; + +impl http_body::Body for ConsumedBody { + type Data = Bytes; + type Error = ErrorCode; + + fn poll_frame( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + Poll::Ready(Some(Err(ErrorCode::InternalError(Some( + "body consumed".into(), + ))))) + } + + fn is_end_stream(&self) -> bool { + true + } + + fn size_hint(&self) -> http_body::SizeHint { + http_body::SizeHint::with_exact(0) + } +} + +pub(crate) struct GuestTrailerConsumer { + pub(crate) tx: oneshot::Sender>, ErrorCode>>, + pub(crate) getter: for<'a> fn(&'a mut T) -> WasiHttpCtxView<'a>, +} + +impl FutureConsumer for GuestTrailerConsumer +where + D: 'static, +{ + type Item = Result>, ErrorCode>; + + async fn consume(self, store: &Accessor, res: Self::Item) -> wasmtime::Result<()> { + match res { + Ok(Some(trailers)) => store + .with_getter::(self.getter) + .with(|mut store| { + let WasiHttpCtxView { table, .. } = store.get(); + let trailers = table + .delete(trailers) + .context("failed to delete trailers")?; + _ = self.tx.send(Ok(Some(Arc::from(trailers)))); + Ok(()) + }), + Ok(None) => { + _ = self.tx.send(Ok(None)); + Ok(()) + } + Err(err) => { + _ = self.tx.send(Err(err)); + Ok(()) + } + } + } +} + +pub(crate) struct IncomingResponseBody { + pub incoming: hyper::body::Incoming, + pub timeout: tokio::time::Interval, +} + +impl http_body::Body for IncomingResponseBody { + type Data = ::Data; + type Error = ErrorCode; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + match Pin::new(&mut self.as_mut().incoming).poll_frame(cx) { + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(Err(err))) => { + Poll::Ready(Some(Err(ErrorCode::from_hyper_response_error(err)))) + } + Poll::Ready(Some(Ok(frame))) => { + self.timeout.reset(); + Poll::Ready(Some(Ok(frame))) + } + Poll::Pending => { + ready!(self.timeout.poll_tick(cx)); + Poll::Ready(Some(Err(ErrorCode::ConnectionReadTimeout))) + } + } + } + + fn is_end_stream(&self) -> bool { + self.incoming.is_end_stream() + } + + fn size_hint(&self) -> http_body::SizeHint { + self.incoming.size_hint() + } +} diff --git a/crates/wasi-http/src/p3/conv.rs b/crates/wasi-http/src/p3/conv.rs index e5a053beac31..64c5a385370d 100644 --- a/crates/wasi-http/src/p3/conv.rs +++ b/crates/wasi-http/src/p3/conv.rs @@ -1,5 +1,7 @@ use crate::p3::bindings::http::types::{ErrorCode, Method, Scheme}; use core::convert::Infallible; +use core::error::Error as _; +use tracing::warn; impl From for ErrorCode { fn from(x: Infallible) -> Self { @@ -7,6 +9,40 @@ impl From for ErrorCode { } } +impl ErrorCode { + /// Translate a [`hyper::Error`] to a wasi-http [ErrorCode] in the context of a request. + pub fn from_hyper_request_error(err: hyper::Error) -> Self { + // If there's a source, we might be able to extract a wasi-http error from it. + if let Some(cause) = err.source() { + if let Some(err) = cause.downcast_ref::() { + return err.clone(); + } + } + + warn!("hyper request error: {err:?}"); + + Self::HttpProtocolError + } + + /// Translate a [`hyper::Error`] to a wasi-http [ErrorCode] in the context of a response. + pub fn from_hyper_response_error(err: hyper::Error) -> Self { + if err.is_timeout() { + return ErrorCode::HttpResponseTimeout; + } + + // If there's a source, we might be able to extract a wasi-http error from it. + if let Some(cause) = err.source() { + if let Some(err) = cause.downcast_ref::() { + return err.clone(); + } + } + + warn!("hyper response error: {err:?}"); + + ErrorCode::HttpProtocolError + } +} + impl From for Method { fn from(method: http::Method) -> Self { Self::from(&method) diff --git a/crates/wasi-http/src/p3/host/handler.rs b/crates/wasi-http/src/p3/host/handler.rs index d0d4814883d6..0c73bf52ce2e 100644 --- a/crates/wasi-http/src/p3/host/handler.rs +++ b/crates/wasi-http/src/p3/host/handler.rs @@ -1,15 +1,118 @@ use crate::p3::bindings::http::handler::{Host, HostWithStore}; use crate::p3::bindings::http::types::{ErrorCode, Request, Response}; -use crate::p3::{WasiHttp, WasiHttpCtxView}; -use anyhow::bail; +use crate::p3::body::{Body, ConsumedBody, GuestBody}; +use crate::p3::host::{delete_request, push_response}; +use crate::p3::{HttpError, HttpResult, WasiHttp, WasiHttpCtxView}; +use http::header::HOST; +use http::{HeaderValue, Uri}; +use http_body_util::BodyExt as _; +use std::sync::Arc; +use tokio::sync::oneshot; +use tracing::debug; use wasmtime::component::{Accessor, Resource}; impl HostWithStore for WasiHttp { - async fn handle( - store: &Accessor, - request: Resource, - ) -> wasmtime::Result, ErrorCode>> { - bail!("TODO") + async fn handle( + store: &Accessor, + req: Resource, + ) -> HttpResult> { + let getter = store.getter(); + let (res_result_tx, res_result_rx) = oneshot::channel(); + let (fut, req_result_tx) = store.with(|mut store| { + let WasiHttpCtxView { ctx, table } = store.get(); + let Request { + method, + scheme, + authority, + path_with_query, + headers, + options, + body, + } = delete_request(table, req).map_err(HttpError::trap)?; + let mut headers = Arc::unwrap_or_clone(headers); + if ctx.set_host_header() { + let host = if let Some(authority) = authority.as_ref() { + HeaderValue::try_from(authority.as_str()) + .map_err(|err| ErrorCode::InternalError(Some(err.to_string())))? + } else { + HeaderValue::from_static("") + }; + headers.insert(HOST, host); + } + + let scheme = match scheme { + None => ctx.default_scheme().ok_or(ErrorCode::HttpProtocolError)?, + Some(scheme) if ctx.is_supported_scheme(&scheme) => scheme, + Some(..) => return Err(ErrorCode::HttpProtocolError.into()), + }; + let mut uri = Uri::builder().scheme(scheme); + if let Some(authority) = authority { + uri = uri.authority(authority) + }; + if let Some(path_with_query) = path_with_query { + uri = uri.path_and_query(path_with_query) + }; + let uri = uri.build().map_err(|err| { + debug!(?err, "failed to build request URI"); + ErrorCode::HttpRequestUriInvalid + })?; + + let mut req = http::Request::builder(); + *req.headers_mut().unwrap() = headers; + let (body, result_tx) = match body { + Body::Guest { + contents_rx, + trailers_rx, + result_tx, + } => ( + GuestBody::new(&mut store, contents_rx, trailers_rx, getter).boxed(), + Some(result_tx), + ), + Body::Host { body, result_tx } => (body, Some(result_tx)), + Body::Consumed => (ConsumedBody.boxed(), None), + }; + let req = req + .method(method) + .uri(uri) + .body(body) + .map_err(|err| ErrorCode::InternalError(Some(err.to_string())))?; + HttpResult::Ok(( + store.get().ctx.send_request( + req, + options.as_deref().copied(), + Box::new(async { + let Ok(fut) = res_result_rx.await else { + return Ok(()); + }; + Box::into_pin(fut).await + }), + ), + result_tx, + )) + })?; + let (res, io) = Box::into_pin(fut).await?; + if let Some(req_result_tx) = req_result_tx { + if let Err(io) = req_result_tx.send(io) { + Box::into_pin(io).await?; + } + } else { + Box::into_pin(io).await?; + } + let ( + http::response::Parts { + status, headers, .. + }, + body, + ) = res.into_parts(); + let res = Response { + status, + headers: Arc::new(headers), + body: Body::Host { + body, + result_tx: res_result_tx, + }, + }; + store.with(|mut store| push_response(store.get().table, res).map_err(HttpError::trap)) } } diff --git a/crates/wasi-http/src/p3/host/mod.rs b/crates/wasi-http/src/p3/host/mod.rs index 9d6627020f26..702650a72865 100644 --- a/crates/wasi-http/src/p3/host/mod.rs +++ b/crates/wasi-http/src/p3/host/mod.rs @@ -1,2 +1,89 @@ +use crate::p3::bindings::http::types::{Fields, Request, Response}; +use anyhow::Context as _; +use wasmtime::component::{Resource, ResourceTable}; + mod handler; mod types; + +fn get_fields<'a>( + table: &'a ResourceTable, + fields: &Resource, +) -> wasmtime::Result<&'a Fields> { + table + .get(&fields) + .context("failed to get fields from table") +} + +fn get_fields_mut<'a>( + table: &'a mut ResourceTable, + fields: &Resource, +) -> wasmtime::Result<&'a mut Fields> { + table + .get_mut(&fields) + .context("failed to get fields from table") +} + +fn push_fields(table: &mut ResourceTable, fields: Fields) -> wasmtime::Result> { + table.push(fields).context("failed to push fields to table") +} + +fn delete_fields(table: &mut ResourceTable, fields: Resource) -> wasmtime::Result { + table + .delete(fields) + .context("failed to delete fields from table") +} + +fn get_request<'a>( + table: &'a ResourceTable, + req: &Resource, +) -> wasmtime::Result<&'a Request> { + table.get(req).context("failed to get request from table") +} + +fn get_request_mut<'a>( + table: &'a mut ResourceTable, + req: &Resource, +) -> wasmtime::Result<&'a mut Request> { + table + .get_mut(req) + .context("failed to get request from table") +} + +fn push_request(table: &mut ResourceTable, req: Request) -> wasmtime::Result> { + table.push(req).context("failed to push request to table") +} + +fn delete_request(table: &mut ResourceTable, req: Resource) -> wasmtime::Result { + table + .delete(req) + .context("failed to delete request from table") +} + +fn get_response<'a>( + table: &'a ResourceTable, + res: &Resource, +) -> wasmtime::Result<&'a Response> { + table.get(res).context("failed to get response from table") +} + +fn get_response_mut<'a>( + table: &'a mut ResourceTable, + res: &Resource, +) -> wasmtime::Result<&'a mut Response> { + table + .get_mut(res) + .context("failed to get response from table") +} + +fn push_response(table: &mut ResourceTable, res: Response) -> wasmtime::Result> { + table.push(res).context("failed to push response to table") +} + +fn delete_response( + table: &mut ResourceTable, + res: Resource, +) -> wasmtime::Result { + table + .delete(res) + .context("failed to delete response from table") +} diff --git a/crates/wasi-http/src/p3/host/types.rs b/crates/wasi-http/src/p3/host/types.rs index 7eef3cade3a3..533bec028719 100644 --- a/crates/wasi-http/src/p3/host/types.rs +++ b/crates/wasi-http/src/p3/host/types.rs @@ -1,290 +1,815 @@ +use super::{ + delete_fields, delete_request, delete_response, get_fields, get_fields_mut, get_request, + get_request_mut, get_response, get_response_mut, push_fields, push_request, push_response, +}; use crate::p3::bindings::clocks::monotonic_clock::Duration; use crate::p3::bindings::http::types::{ ErrorCode, FieldName, FieldValue, Fields, HeaderError, Headers, Host, HostFields, HostRequest, HostRequestOptions, HostRequestWithStore, HostResponse, HostResponseWithStore, Method, Request, RequestOptions, RequestOptionsError, Response, Scheme, StatusCode, Trailers, }; -use crate::p3::{WasiHttp, WasiHttpCtxView}; -use anyhow::bail; -use wasmtime::component::{Accessor, FutureReader, Resource, StreamReader}; +use crate::p3::body::Body; +use crate::p3::{HttpError, WasiHttp, WasiHttpCtxView}; +use anyhow::Context as _; +use bytes::Bytes; +use core::mem; +use core::num::NonZeroUsize; +use core::pin::Pin; +use core::task::Context; +use core::task::Poll; +use http::header::CONTENT_LENGTH; +use http_body::Body as _; +use http_body_util::combinators::BoxBody; +use std::io::Cursor; +use std::sync::Arc; +use tokio::sync::oneshot; +use wasmtime::StoreContextMut; +use wasmtime::component::{ + Accessor, Destination, FutureProducer, FutureReader, Resource, StreamProducer, StreamReader, + StreamResult, +}; +use wasmtime_wasi::ResourceTable; +use wasmtime_wasi::p3::{FutureOneshotProducer, StreamEmptyProducer}; + +fn get_request_options<'a>( + table: &'a ResourceTable, + opts: &Resource, +) -> wasmtime::Result<&'a RequestOptions> { + table + .get(opts) + .context("failed to get request options from table") +} + +fn get_request_options_mut<'a>( + table: &'a mut ResourceTable, + opts: &Resource, +) -> wasmtime::Result<&'a mut RequestOptions> { + table + .get_mut(opts) + .context("failed to get request options from table") +} + +fn push_request_options( + table: &mut ResourceTable, + opts: RequestOptions, +) -> wasmtime::Result> { + table + .push(opts) + .context("failed to push request options to table") +} + +fn delete_request_options( + table: &mut ResourceTable, + opts: Resource, +) -> wasmtime::Result { + table + .delete(opts) + .context("failed to delete request options from table") +} + +fn parse_header_value( + name: &http::HeaderName, + value: impl AsRef<[u8]>, +) -> Result { + if name == CONTENT_LENGTH { + let s = str::from_utf8(value.as_ref()).or(Err(HeaderError::InvalidSyntax))?; + let v: u64 = s.parse().or(Err(HeaderError::InvalidSyntax))?; + Ok(v.into()) + } else { + http::HeaderValue::from_bytes(value.as_ref()).or(Err(HeaderError::InvalidSyntax)) + } +} + +struct GuestBodyResultProducer( + oneshot::Receiver> + Send>>, +); + +impl FutureProducer for GuestBodyResultProducer { + type Item = Result<(), ErrorCode>; + + async fn produce(self, _: &Accessor) -> wasmtime::Result { + let Ok(fut) = self.0.await else { + return Ok(Ok(())); + }; + Ok(Box::into_pin(fut).await) + } +} + +struct HostBodyStreamProducer { + body: BoxBody, + trailers: Option>, ErrorCode>>>, + getter: for<'a> fn(&'a mut T) -> WasiHttpCtxView<'a>, +} + +impl Drop for HostBodyStreamProducer { + fn drop(&mut self) { + self.close(Ok(None)) + } +} + +impl HostBodyStreamProducer { + fn close(&mut self, res: Result>, ErrorCode>) { + if let Some(tx) = self.trailers.take() { + _ = tx.send(res); + } + } +} + +impl StreamProducer for HostBodyStreamProducer +where + D: 'static, +{ + type Item = u8; + type Buffer = Cursor; + + fn poll_produce<'a>( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut store: StoreContextMut<'a, D>, + mut dst: Destination<'a, Self::Item, Self::Buffer>, + finish: bool, + ) -> Poll> { + let res = 'result: { + let cap = match dst.remaining(&mut store).map(NonZeroUsize::new) { + Some(Some(cap)) => Some(cap), + Some(None) => { + if self.body.is_end_stream() { + break 'result Ok(None); + } else { + return Poll::Ready(Ok(StreamResult::Completed)); + } + } + None => None, + }; + match Pin::new(&mut self.body).poll_frame(cx) { + Poll::Ready(Some(Ok(frame))) => { + match frame.into_data().map_err(http_body::Frame::into_trailers) { + Ok(mut frame) => { + if let Some(cap) = cap { + let n = frame.len(); + let cap = cap.into(); + if n > cap { + dst.set_buffer(Cursor::new(frame.split_off(cap))); + let mut dst = dst.as_direct(store, cap); + dst.remaining().copy_from_slice(&frame); + dst.mark_written(cap); + } else { + let mut dst = dst.as_direct(store, n); + dst.remaining()[..n].copy_from_slice(&frame); + dst.mark_written(n); + } + } else { + dst.set_buffer(Cursor::new(frame)); + } + return Poll::Ready(Ok(StreamResult::Completed)); + } + Err(Ok(trailers)) => { + let trailers = push_fields( + (self.getter)(store.data_mut()).table, + Fields::new_mutable(trailers), + )?; + break 'result Ok(Some(trailers)); + } + Err(Err(..)) => break 'result Err(ErrorCode::HttpProtocolError), + } + } + Poll::Ready(Some(Err(err))) => break 'result Err(err), + Poll::Ready(None) => break 'result Ok(None), + Poll::Pending if finish => return Poll::Ready(Ok(StreamResult::Cancelled)), + Poll::Pending => return Poll::Pending, + } + }; + self.close(res); + Poll::Ready(Ok(StreamResult::Dropped)) + } +} impl HostFields for WasiHttpCtxView<'_> { fn new(&mut self) -> wasmtime::Result> { - bail!("TODO") + push_fields(self.table, Fields::new_mutable_default()) } fn from_list( &mut self, entries: Vec<(FieldName, FieldValue)>, ) -> wasmtime::Result, HeaderError>> { - bail!("TODO") + let mut fields = http::HeaderMap::default(); + for (name, value) in entries { + let Ok(name) = name.parse() else { + return Ok(Err(HeaderError::InvalidSyntax)); + }; + if self.ctx.is_forbidden_header(&name) { + return Ok(Err(HeaderError::Forbidden)); + } + match parse_header_value(&name, value) { + Ok(value) => { + fields.append(name, value); + } + Err(err) => return Ok(Err(err)), + } + } + let fields = push_fields(self.table, Fields::new_mutable(fields))?; + Ok(Ok(fields)) } fn get( &mut self, - self_: Resource, + fields: Resource, name: FieldName, ) -> wasmtime::Result> { - bail!("TODO") + let fields = get_fields(self.table, &fields)?; + Ok(fields + .get_all(name) + .into_iter() + .map(|val| val.as_bytes().into()) + .collect()) } - fn has(&mut self, self_: Resource, name: FieldName) -> wasmtime::Result { - bail!("TODO") + fn has(&mut self, fields: Resource, name: FieldName) -> wasmtime::Result { + let fields = get_fields(self.table, &fields)?; + Ok(fields.contains_key(name)) } fn set( &mut self, - self_: Resource, + fields: Resource, name: FieldName, value: Vec, ) -> wasmtime::Result> { - bail!("TODO") + let Ok(name) = name.parse() else { + return Ok(Err(HeaderError::InvalidSyntax)); + }; + if self.ctx.is_forbidden_header(&name) { + return Ok(Err(HeaderError::Forbidden)); + } + let mut values = Vec::with_capacity(value.len()); + for value in value { + match parse_header_value(&name, value) { + Ok(value) => { + values.push(value); + } + Err(err) => return Ok(Err(err)), + } + } + let fields = get_fields_mut(self.table, &fields)?; + let Some(fields) = fields.get_mut() else { + return Ok(Err(HeaderError::Immutable)); + }; + fields.remove(&name); + for value in values { + fields.append(&name, value); + } + Ok(Ok(())) } fn delete( &mut self, - self_: Resource, + fields: Resource, name: FieldName, ) -> wasmtime::Result> { - bail!("TODO") + let header = match http::HeaderName::from_bytes(name.as_bytes()) { + Ok(header) => header, + Err(_) => return Ok(Err(HeaderError::InvalidSyntax)), + }; + if self.ctx.is_forbidden_header(&header) { + return Ok(Err(HeaderError::Forbidden)); + } + let fields = get_fields_mut(self.table, &fields)?; + let Some(fields) = fields.get_mut() else { + return Ok(Err(HeaderError::Immutable)); + }; + fields.remove(&name); + Ok(Ok(())) } fn get_and_delete( &mut self, - self_: Resource, + fields: Resource, name: FieldName, ) -> wasmtime::Result, HeaderError>> { - bail!("TODO") + let Ok(header) = http::header::HeaderName::from_bytes(name.as_bytes()) else { + return Ok(Err(HeaderError::InvalidSyntax)); + }; + if self.ctx.is_forbidden_header(&header) { + return Ok(Err(HeaderError::Forbidden)); + } + let fields = get_fields_mut(self.table, &fields)?; + let Some(fields) = fields.get_mut() else { + return Ok(Err(HeaderError::Immutable)); + }; + let http::header::Entry::Occupied(entry) = fields.entry(header) else { + return Ok(Ok(vec![])); + }; + let (.., values) = entry.remove_entry_mult(); + Ok(Ok(values.map(|header| header.as_bytes().into()).collect())) } fn append( &mut self, - self_: Resource, + fields: Resource, name: FieldName, value: FieldValue, ) -> wasmtime::Result> { - bail!("TODO") + let Ok(name) = name.parse() else { + return Ok(Err(HeaderError::InvalidSyntax)); + }; + if self.ctx.is_forbidden_header(&name) { + return Ok(Err(HeaderError::Forbidden)); + } + let value = match parse_header_value(&name, value) { + Ok(value) => value, + Err(err) => return Ok(Err(err)), + }; + let fields = get_fields_mut(self.table, &fields)?; + let Some(fields) = fields.get_mut() else { + return Ok(Err(HeaderError::Immutable)); + }; + fields.append(name, value); + Ok(Ok(())) } fn copy_all( &mut self, - self_: Resource, + fields: Resource, ) -> wasmtime::Result> { - bail!("TODO") + let fields = get_fields(self.table, &fields)?; + let fields = fields + .iter() + .map(|(name, value)| (name.as_str().into(), value.as_bytes().into())) + .collect(); + Ok(fields) } - fn clone(&mut self, self_: Resource) -> wasmtime::Result> { - bail!("TODO") + fn clone(&mut self, fields: Resource) -> wasmtime::Result> { + let fields = get_fields(self.table, &fields)?; + push_fields(self.table, Fields::new_mutable(Arc::clone(fields))) } - fn drop(&mut self, rep: Resource) -> wasmtime::Result<()> { - bail!("TODO") + fn drop(&mut self, fields: Resource) -> wasmtime::Result<()> { + delete_fields(self.table, fields)?; + Ok(()) } } impl HostRequestWithStore for WasiHttp { - async fn new( - store: &Accessor, + async fn new( + store: &Accessor, headers: Resource, contents: Option>, trailers: FutureReader>, ErrorCode>>, options: Option>, ) -> wasmtime::Result<(Resource, FutureReader>)> { - bail!("TODO") + let instance = store.instance(); + store.with(|mut store| { + let (result_tx, result_rx) = oneshot::channel(); + let WasiHttpCtxView { table, .. } = store.get(); + let headers = delete_fields(table, headers)?; + let options = options + .map(|options| delete_request_options(table, options)) + .transpose()?; + let body = Body::Guest { + contents_rx: contents, + trailers_rx: trailers, + result_tx, + }; + let req = Request { + method: http::Method::GET, + scheme: None, + authority: None, + path_with_query: None, + headers: headers.into(), + options: options.map(Into::into), + body, + }; + let req = push_request(table, req)?; + Ok(( + req, + FutureReader::new(instance, &mut store, GuestBodyResultProducer(result_rx)), + )) + }) + } + + async fn consume_body( + store: &Accessor, + req: Resource, + ) -> wasmtime::Result< + Result< + ( + StreamReader, + FutureReader>, ErrorCode>>, + ), + (), + >, + > { + let getter = store.getter(); + store.with(|mut store| { + let req = get_request_mut(store.get().table, &req)?; + match mem::replace(&mut req.body, Body::Consumed) { + Body::Guest { + contents_rx: Some(contents_rx), + trailers_rx, + result_tx, + } => { + // TODO: Use a result specified by the caller + // https://github.com/WebAssembly/wasi-http/issues/176 + _ = result_tx.send(Box::new(async { Ok(()) })); + Ok(Ok((contents_rx, trailers_rx))) + } + Body::Guest { + contents_rx: None, + trailers_rx, + result_tx, + } => { + let instance = store.instance(); + // TODO: Use a result specified by the caller + // https://github.com/WebAssembly/wasi-http/issues/176 + _ = result_tx.send(Box::new(async { Ok(()) })); + Ok(Ok(( + StreamReader::new(instance, &mut store, StreamEmptyProducer::default()), + trailers_rx, + ))) + } + Body::Host { body, result_tx } => { + let instance = store.instance(); + // TODO: Use a result specified by the caller + // https://github.com/WebAssembly/wasi-http/issues/176 + _ = result_tx.send(Box::new(async { Ok(()) })); + let (trailers_tx, trailers_rx) = oneshot::channel(); + Ok(Ok(( + StreamReader::new( + instance, + &mut store, + HostBodyStreamProducer { + body, + trailers: Some(trailers_tx), + getter, + }, + ), + FutureReader::new( + instance, + &mut store, + FutureOneshotProducer::from(trailers_rx), + ), + ))) + } + Body::Consumed => Ok(Err(())), + } + }) } } impl HostRequest for WasiHttpCtxView<'_> { - fn get_method(&mut self, self_: Resource) -> wasmtime::Result { - bail!("TODO") + fn get_method(&mut self, req: Resource) -> wasmtime::Result { + let Request { method, .. } = get_request(self.table, &req)?; + Ok(method.into()) } fn set_method( &mut self, - self_: Resource, + req: Resource, method: Method, ) -> wasmtime::Result> { - bail!("TODO") + let req = get_request_mut(self.table, &req)?; + let Ok(method) = method.try_into() else { + return Ok(Err(())); + }; + req.method = method; + Ok(Ok(())) } - fn get_path_with_query( - &mut self, - self_: Resource, - ) -> wasmtime::Result> { - bail!("TODO") + fn get_path_with_query(&mut self, req: Resource) -> wasmtime::Result> { + let Request { + path_with_query, .. + } = get_request(self.table, &req)?; + Ok(path_with_query.as_ref().map(|pq| pq.as_str().into())) } fn set_path_with_query( &mut self, - self_: Resource, + req: Resource, path_with_query: Option, ) -> wasmtime::Result> { - bail!("TODO") + let req = get_request_mut(self.table, &req)?; + let Some(path_with_query) = path_with_query else { + req.path_with_query = None; + return Ok(Ok(())); + }; + let Ok(path_with_query) = path_with_query.try_into() else { + return Ok(Err(())); + }; + req.path_with_query = Some(path_with_query); + Ok(Ok(())) } - fn get_scheme(&mut self, self_: Resource) -> wasmtime::Result> { - bail!("TODO") + fn get_scheme(&mut self, req: Resource) -> wasmtime::Result> { + let Request { scheme, .. } = get_request(self.table, &req)?; + Ok(scheme.as_ref().map(Into::into)) } fn set_scheme( &mut self, - self_: Resource, + req: Resource, scheme: Option, ) -> wasmtime::Result> { - bail!("TODO") + let req = get_request_mut(self.table, &req)?; + let Some(scheme) = scheme else { + req.scheme = None; + return Ok(Ok(())); + }; + let Ok(scheme) = scheme.try_into() else { + return Ok(Err(())); + }; + req.scheme = Some(scheme); + Ok(Ok(())) } - fn get_authority(&mut self, self_: Resource) -> wasmtime::Result> { - bail!("TODO") + fn get_authority(&mut self, req: Resource) -> wasmtime::Result> { + let Request { authority, .. } = get_request(self.table, &req)?; + Ok(authority.as_ref().map(|auth| auth.as_str().into())) } fn set_authority( &mut self, - self_: Resource, + req: Resource, authority: Option, ) -> wasmtime::Result> { - bail!("TODO") + let req = get_request_mut(self.table, &req)?; + let Some(authority) = authority else { + req.authority = None; + return Ok(Ok(())); + }; + let has_port = authority.contains(':'); + let Ok(authority) = http::uri::Authority::try_from(authority) else { + return Ok(Err(())); + }; + if has_port && authority.port_u16().is_none() { + return Ok(Err(())); + } + req.authority = Some(authority); + Ok(Ok(())) } fn get_options( &mut self, - self_: Resource, + req: Resource, ) -> wasmtime::Result>> { - bail!("TODO") + let Request { options, .. } = get_request(self.table, &req)?; + if let Some(options) = options { + let options = push_request_options( + self.table, + RequestOptions::new_immutable(Arc::clone(options)), + )?; + Ok(Some(options)) + } else { + Ok(None) + } } - fn get_headers(&mut self, self_: Resource) -> wasmtime::Result> { - bail!("TODO") - } - - fn consume_body( - &mut self, - self_: Resource, - ) -> wasmtime::Result< - Result< - ( - StreamReader, - FutureReader>, ErrorCode>>, - ), - (), - >, - > { - bail!("TODO") + fn get_headers(&mut self, req: Resource) -> wasmtime::Result> { + let Request { headers, .. } = get_request(self.table, &req)?; + push_fields(self.table, Fields::new_immutable(Arc::clone(headers))) } - fn drop(&mut self, rep: Resource) -> wasmtime::Result<()> { - bail!("TODO") + fn drop(&mut self, req: Resource) -> wasmtime::Result<()> { + delete_request(self.table, req)?; + Ok(()) } } impl HostRequestOptions for WasiHttpCtxView<'_> { fn new(&mut self) -> wasmtime::Result> { - bail!("TODO") + push_request_options(self.table, RequestOptions::new_mutable_default()) } fn get_connect_timeout( &mut self, - self_: Resource, + opts: Resource, ) -> wasmtime::Result> { - bail!("TODO") + let opts = get_request_options(self.table, &opts)?; + let Some(connect_timeout) = opts.connect_timeout else { + return Ok(None); + }; + let ns = connect_timeout.as_nanos(); + let ns = ns + .try_into() + .context("connect timeout duration nanoseconds do not fit in u64")?; + Ok(Some(ns)) } fn set_connect_timeout( &mut self, - self_: Resource, + opts: Resource, duration: Option, ) -> wasmtime::Result> { - bail!("TODO") + let opts = get_request_options_mut(self.table, &opts)?; + let Some(opts) = opts.get_mut() else { + return Ok(Err(RequestOptionsError::Immutable)); + }; + opts.connect_timeout = duration.map(core::time::Duration::from_nanos); + Ok(Ok(())) } fn get_first_byte_timeout( &mut self, - self_: Resource, + opts: Resource, ) -> wasmtime::Result> { - bail!("TODO") + let opts = get_request_options(self.table, &opts)?; + let Some(first_byte_timeout) = opts.first_byte_timeout else { + return Ok(None); + }; + let ns = first_byte_timeout.as_nanos(); + let ns = ns + .try_into() + .context("first byte timeout duration nanoseconds do not fit in u64")?; + Ok(Some(ns)) } fn set_first_byte_timeout( &mut self, - self_: Resource, + opts: Resource, duration: Option, ) -> wasmtime::Result> { - bail!("TODO") + let opts = get_request_options_mut(self.table, &opts)?; + let Some(opts) = opts.get_mut() else { + return Ok(Err(RequestOptionsError::Immutable)); + }; + opts.first_byte_timeout = duration.map(core::time::Duration::from_nanos); + Ok(Ok(())) } fn get_between_bytes_timeout( &mut self, - self_: Resource, + opts: Resource, ) -> wasmtime::Result> { - bail!("TODO") + let opts = get_request_options(self.table, &opts)?; + let Some(between_bytes_timeout) = opts.between_bytes_timeout else { + return Ok(None); + }; + let ns = between_bytes_timeout.as_nanos(); + let ns = ns + .try_into() + .context("between bytes timeout duration nanoseconds do not fit in u64")?; + Ok(Some(ns)) } fn set_between_bytes_timeout( &mut self, - self_: Resource, + opts: Resource, duration: Option, ) -> wasmtime::Result> { - bail!("TODO") + let opts = get_request_options_mut(self.table, &opts)?; + let Some(opts) = opts.get_mut() else { + return Ok(Err(RequestOptionsError::Immutable)); + }; + opts.between_bytes_timeout = duration.map(core::time::Duration::from_nanos); + Ok(Ok(())) } fn clone( &mut self, - self_: Resource, + opts: Resource, ) -> wasmtime::Result> { - bail!("TODO") + let opts = get_request_options(self.table, &opts)?; + push_request_options(self.table, RequestOptions::new_mutable(Arc::clone(opts))) } - fn drop(&mut self, rep: Resource) -> wasmtime::Result<()> { - bail!("TODO") + fn drop(&mut self, opts: Resource) -> wasmtime::Result<()> { + delete_request_options(self.table, opts)?; + Ok(()) } } impl HostResponseWithStore for WasiHttp { - async fn new( - store: &Accessor, + async fn new( + store: &Accessor, headers: Resource, contents: Option>, trailers: FutureReader>, ErrorCode>>, ) -> wasmtime::Result<(Resource, FutureReader>)> { - bail!("TODO") + let instance = store.instance(); + store.with(|mut store| { + let (result_tx, result_rx) = oneshot::channel(); + let WasiHttpCtxView { table, .. } = store.get(); + let headers = delete_fields(table, headers)?; + let body = Body::Guest { + contents_rx: contents, + trailers_rx: trailers, + result_tx, + }; + let res = Response { + status: http::StatusCode::OK, + headers: headers.into(), + body, + }; + let res = push_response(table, res)?; + Ok(( + res, + FutureReader::new(instance, &mut store, GuestBodyResultProducer(result_rx)), + )) + }) + } + + async fn consume_body( + store: &Accessor, + res: Resource, + ) -> wasmtime::Result< + Result< + ( + StreamReader, + FutureReader>, ErrorCode>>, + ), + (), + >, + > { + let getter = store.getter(); + store.with(|mut store| { + let res = get_response_mut(store.get().table, &res)?; + match mem::replace(&mut res.body, Body::Consumed) { + Body::Guest { + contents_rx: Some(contents_rx), + trailers_rx, + result_tx, + } => { + // TODO: Use a result specified by the caller + // https://github.com/WebAssembly/wasi-http/issues/176 + _ = result_tx.send(Box::new(async { Ok(()) })); + Ok(Ok((contents_rx, trailers_rx))) + } + Body::Guest { + contents_rx: None, + trailers_rx, + result_tx, + } => { + let instance = store.instance(); + // TODO: Use a result specified by the caller + // https://github.com/WebAssembly/wasi-http/issues/176 + _ = result_tx.send(Box::new(async { Ok(()) })); + Ok(Ok(( + StreamReader::new(instance, &mut store, StreamEmptyProducer::default()), + trailers_rx, + ))) + } + Body::Host { body, result_tx } => { + let instance = store.instance(); + // TODO: Use a result specified by the caller + // https://github.com/WebAssembly/wasi-http/issues/176 + _ = result_tx.send(Box::new(async { Ok(()) })); + let (trailers_tx, trailers_rx) = oneshot::channel(); + Ok(Ok(( + StreamReader::new( + instance, + &mut store, + HostBodyStreamProducer { + body, + trailers: Some(trailers_tx), + getter, + }, + ), + FutureReader::new( + instance, + &mut store, + FutureOneshotProducer::from(trailers_rx), + ), + ))) + } + Body::Consumed => Ok(Err(())), + } + }) } } impl HostResponse for WasiHttpCtxView<'_> { - fn get_status_code(&mut self, self_: Resource) -> wasmtime::Result { - bail!("TODO") + fn get_status_code(&mut self, res: Resource) -> wasmtime::Result { + let res = get_response(self.table, &res)?; + Ok(res.status.into()) } fn set_status_code( &mut self, - self_: Resource, + res: Resource, status_code: StatusCode, ) -> wasmtime::Result> { - bail!("TODO") + let res = get_response_mut(self.table, &res)?; + let Ok(status) = http::StatusCode::from_u16(status_code) else { + return Ok(Err(())); + }; + res.status = status; + Ok(Ok(())) } - fn get_headers(&mut self, self_: Resource) -> wasmtime::Result> { - bail!("TODO") + fn get_headers(&mut self, res: Resource) -> wasmtime::Result> { + let Response { headers, .. } = get_response(self.table, &res)?; + push_fields(self.table, Fields::new_immutable(Arc::clone(headers))) } - fn consume_body( - &mut self, - self_: Resource, - ) -> wasmtime::Result< - Result< - ( - StreamReader, - FutureReader>, ErrorCode>>, - ), - (), - >, - > { - bail!("TODO") + fn drop(&mut self, res: Resource) -> wasmtime::Result<()> { + delete_response(self.table, res)?; + Ok(()) } +} - fn drop(&mut self, rep: Resource) -> wasmtime::Result<()> { - bail!("TODO") +impl Host for WasiHttpCtxView<'_> { + fn convert_error_code(&mut self, error: HttpError) -> wasmtime::Result { + error.downcast() } } - -impl Host for WasiHttpCtxView<'_> {} diff --git a/crates/wasi-http/src/p3/mod.rs b/crates/wasi-http/src/p3/mod.rs index 4339aa6afb71..a0d4ce3f0471 100644 --- a/crates/wasi-http/src/p3/mod.rs +++ b/crates/wasi-http/src/p3/mod.rs @@ -9,12 +9,32 @@ //! Documentation of this module may be incorrect or out-of-sync with the implementation. pub mod bindings; +pub mod body; mod conv; -#[expect(unused, reason = "work in progress")] // TODO: implement mod host; +mod proxy; +mod request; +mod response; +#[cfg(feature = "default-send-request")] +pub use request::default_send_request; +pub use request::{Request, RequestOptions}; +pub use response::Response; + +use crate::p3::bindings::http::types::ErrorCode; +use crate::types::DEFAULT_FORBIDDEN_HEADERS; use bindings::http::{handler, types}; +use bytes::Bytes; +use core::ops::Deref; +use http::HeaderName; +use http::uri::Scheme; +use http_body_util::combinators::BoxBody; +use std::sync::Arc; use wasmtime::component::{HasData, Linker, ResourceTable}; +use wasmtime_wasi::TrappableError; + +pub type HttpResult = Result; +pub type HttpError = TrappableError; pub(crate) struct WasiHttp; @@ -22,11 +42,85 @@ impl HasData for WasiHttp { type Data<'a> = WasiHttpCtxView<'a>; } +pub trait WasiHttpCtx: Send { + /// Whether a given header should be considered forbidden and not allowed. + fn is_forbidden_header(&mut self, name: &HeaderName) -> bool { + DEFAULT_FORBIDDEN_HEADERS.contains(name) + } + + /// Whether a given scheme should be considered supported. + /// + /// `handle` will return [ErrorCode::HttpProtocolError] for unsupported schemes. + fn is_supported_scheme(&mut self, scheme: &Scheme) -> bool { + *scheme == Scheme::HTTP || *scheme == Scheme::HTTPS + } + + /// Whether to set `host` header in the request passed to `send_request`. + fn set_host_header(&mut self) -> bool { + true + } + + /// Scheme to default to, when not set by the guest. + /// + /// If [None], `handle` will return [ErrorCode::HttpProtocolError] + /// for requests missing a scheme. + fn default_scheme(&mut self) -> Option { + Some(Scheme::HTTPS) + } + + /// Send an outgoing request. + #[cfg(feature = "default-send-request")] + fn send_request( + &mut self, + request: http::Request>, + options: Option, + fut: Box> + Send>, + ) -> Box< + dyn Future< + Output = HttpResult<( + http::Response>, + Box> + Send>, + )>, + > + Send, + > { + _ = fut; + Box::new(async move { + use http_body_util::BodyExt; + + let (res, io) = default_send_request(request, options).await?; + Ok(( + res.map(BodyExt::boxed), + Box::new(io) as Box + Send>, + )) + }) + } + + /// Send an outgoing request. + #[cfg(not(feature = "default-send-request"))] + fn send_request( + &mut self, + request: http::Request>, + options: Option, + fut: Box> + Send>, + ) -> Box< + dyn Future< + Output = HttpResult<( + http::Response>, + Box> + Send>, + )>, + > + Send, + >; +} + +#[cfg(feature = "default-send-request")] #[derive(Clone, Default)] -pub struct WasiHttpCtx {} +pub struct DefaultWasiHttpCtx; + +#[cfg(feature = "default-send-request")] +impl WasiHttpCtx for DefaultWasiHttpCtx {} pub struct WasiHttpCtxView<'a> { - pub ctx: &'a mut WasiHttpCtx, + pub ctx: &'a mut dyn WasiHttpCtx, pub table: &'a mut ResourceTable, } @@ -45,7 +139,7 @@ pub trait WasiHttpView: Send { /// ``` /// use wasmtime::{Engine, Result, Store, Config}; /// use wasmtime::component::{Linker, ResourceTable}; -/// use wasmtime_wasi_http::p3::{WasiHttpCtx, WasiHttpCtxView, WasiHttpView}; +/// use wasmtime_wasi_http::p3::{DefaultWasiHttpCtx, WasiHttpCtxView, WasiHttpView}; /// /// fn main() -> Result<()> { /// let mut config = Config::new(); @@ -69,7 +163,7 @@ pub trait WasiHttpView: Send { /// /// #[derive(Default)] /// struct MyState { -/// http: WasiHttpCtx, +/// http: DefaultWasiHttpCtx, /// table: ResourceTable, /// } /// @@ -90,3 +184,66 @@ where types::add_to_linker::<_, WasiHttp>(linker, T::http)?; Ok(()) } + +/// An [Arc], which may be immutable. +pub enum MaybeMutable { + Mutable(Arc), + Immutable(Arc), +} + +impl From> for Arc { + fn from(v: MaybeMutable) -> Self { + v.into_arc() + } +} + +impl Deref for MaybeMutable { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + self.as_arc() + } +} + +impl MaybeMutable { + pub fn new_mutable(v: impl Into>) -> Self { + Self::Mutable(v.into()) + } + + pub fn new_mutable_default() -> Self + where + T: Default, + { + Self::new_mutable(T::default()) + } + + pub fn new_immutable(v: impl Into>) -> Self { + Self::Immutable(v.into()) + } + + fn as_arc(&self) -> &Arc { + match self { + Self::Mutable(v) | Self::Immutable(v) => v, + } + } + + fn into_arc(self) -> Arc { + match self { + Self::Mutable(v) | Self::Immutable(v) => v, + } + } + + pub fn get(&self) -> &T { + self + } + + pub fn get_mut(&mut self) -> Option<&mut T> + where + T: Clone, + { + match self { + Self::Mutable(v) => Some(Arc::make_mut(v)), + Self::Immutable(..) => None, + } + } +} diff --git a/crates/wasi-http/src/p3/proxy.rs b/crates/wasi-http/src/p3/proxy.rs new file mode 100644 index 000000000000..75029e2ae769 --- /dev/null +++ b/crates/wasi-http/src/p3/proxy.rs @@ -0,0 +1,37 @@ +use crate::p3::WasiHttpView; +use crate::p3::bindings::Proxy; +use crate::p3::bindings::http::types::{ErrorCode, Request, Response}; +use anyhow::Context as _; +use wasmtime::component::Accessor; + +impl Proxy { + /// Call `handle` on [Proxy] getting a [Future] back. + pub async fn handle( + &self, + store: &Accessor, + req: impl Into, + ) -> wasmtime::Result> { + let req = store.with(|mut store| { + store + .data_mut() + .http() + .table + .push(req.into()) + .context("failed to push request to table") + })?; + match self.wasi_http_handler().call_handle(store, req).await? { + Ok(res) => { + let res = store.with(|mut store| { + store + .data_mut() + .http() + .table + .delete(res) + .context("failed to delete response from table") + })?; + Ok(Ok(res)) + } + Err(err) => Ok(Err(err)), + } + } +} diff --git a/crates/wasi-http/src/p3/request.rs b/crates/wasi-http/src/p3/request.rs new file mode 100644 index 000000000000..413d5fae836c --- /dev/null +++ b/crates/wasi-http/src/p3/request.rs @@ -0,0 +1,289 @@ +use crate::p3::bindings::http::types::ErrorCode; +use crate::p3::body::Body; +use bytes::Bytes; +use core::time::Duration; +use http::uri::{Authority, PathAndQuery, Scheme}; +use http::{HeaderMap, Method}; +use http_body_util::BodyExt as _; +use http_body_util::combinators::BoxBody; +use std::sync::Arc; +use tokio::sync::oneshot; + +#[derive(Copy, Clone, Debug, Default)] +pub struct RequestOptions { + /// How long to wait for a connection to be established. + pub connect_timeout: Option, + /// How long to wait for the first byte of the response body. + pub first_byte_timeout: Option, + /// How long to wait between frames of the response body. + pub between_bytes_timeout: Option, +} + +/// The concrete type behind a `wasi:http/types/request` resource. +pub struct Request { + /// The method of the request. + pub method: Method, + /// The scheme of the request. + pub scheme: Option, + /// The authority of the request. + pub authority: Option, + /// The path and query of the request. + pub path_with_query: Option, + /// The request headers. + pub headers: Arc, + /// Request options. + pub options: Option>, + /// Request body. + pub(crate) body: Body, +} + +impl Request { + /// Construct a new [Request] + pub fn new( + method: Method, + scheme: Option, + authority: Option, + path_with_query: Option, + headers: impl Into>, + options: Option>, + body: impl Into>, + ) -> ( + Self, + impl Future> + Send + 'static, + ) { + let (tx, rx) = oneshot::channel(); + ( + Self { + method, + scheme, + authority, + path_with_query, + headers: headers.into(), + options, + body: Body::Host { + body: body.into(), + result_tx: tx, + }, + }, + async { + let Ok(fut) = rx.await else { return Ok(()) }; + Box::into_pin(fut).await + }, + ) + } + + /// Construct a new [Request] from [http::Request]. + pub fn from_http( + req: http::Request, + ) -> ( + Self, + impl Future> + Send + 'static, + ) + where + T: http_body::Body + Send + Sync + 'static, + T::Error: Into, + { + let ( + http::request::Parts { + method, + uri, + headers, + .. + }, + body, + ) = req.into_parts(); + let http::uri::Parts { + scheme, + authority, + path_and_query, + .. + } = uri.into_parts(); + Self::new( + method, + scheme, + authority, + path_and_query, + headers, + None, + body.map_err(Into::into).boxed(), + ) + } +} + +/// The default implementation of how an outgoing request is sent. +/// +/// This implementation is used by the `wasi:http/outgoing-handler` interface +/// default implementation. +#[cfg(feature = "default-send-request")] +pub async fn default_send_request( + mut req: http::Request + Send + 'static>, + options: Option, +) -> Result< + ( + http::Response>, + impl Future> + Send, + ), + ErrorCode, +> { + use core::future::poll_fn; + use core::pin::{Pin, pin}; + use core::task::{Poll, ready}; + use tokio::io::{AsyncRead, AsyncWrite}; + use tokio::net::TcpStream; + + trait TokioStream: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static { + fn boxed(self) -> Box + where + Self: Sized, + { + Box::new(self) + } + } + impl TokioStream for T where T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static {} + + fn dns_error(rcode: String, info_code: u16) -> ErrorCode { + ErrorCode::DnsError(crate::p3::bindings::http::types::DnsErrorPayload { + rcode: Some(rcode), + info_code: Some(info_code), + }) + } + + let uri = req.uri(); + let authority = uri.authority().ok_or(ErrorCode::HttpRequestUriInvalid)?; + let use_tls = uri.scheme() == Some(&Scheme::HTTPS); + let authority = if authority.port().is_some() { + authority.to_string() + } else { + let port = if use_tls { 443 } else { 80 }; + format!("{authority}:{port}") + }; + + let connect_timeout = options + .and_then( + |RequestOptions { + connect_timeout, .. + }| connect_timeout, + ) + .unwrap_or(Duration::from_secs(600)); + + let first_byte_timeout = options + .and_then( + |RequestOptions { + first_byte_timeout, .. + }| first_byte_timeout, + ) + .unwrap_or(Duration::from_secs(600)); + + let between_bytes_timeout = options + .and_then( + |RequestOptions { + between_bytes_timeout, + .. + }| between_bytes_timeout, + ) + .unwrap_or(Duration::from_secs(600)); + + let stream = match tokio::time::timeout(connect_timeout, TcpStream::connect(&authority)).await { + Ok(Ok(stream)) => stream, + Ok(Err(err)) if err.kind() == std::io::ErrorKind::AddrNotAvailable => { + return Err(dns_error("address not available".to_string(), 0)); + } + Ok(Err(err)) + if err + .to_string() + .starts_with("failed to lookup address information") => + { + return Err(dns_error("address not available".to_string(), 0)); + } + Ok(Err(..)) => return Err(ErrorCode::ConnectionRefused), + Err(..) => return Err(ErrorCode::ConnectionTimeout), + }; + let stream = if use_tls { + use rustls::pki_types::ServerName; + + // derived from https://github.com/rustls/rustls/blob/main/examples/src/bin/simpleclient.rs + let root_cert_store = rustls::RootCertStore { + roots: webpki_roots::TLS_SERVER_ROOTS.into(), + }; + let config = rustls::ClientConfig::builder() + .with_root_certificates(root_cert_store) + .with_no_client_auth(); + let connector = tokio_rustls::TlsConnector::from(std::sync::Arc::new(config)); + let mut parts = authority.split(":"); + let host = parts.next().unwrap_or(&authority); + let domain = ServerName::try_from(host) + .map_err(|e| { + tracing::warn!("dns lookup error: {e:?}"); + dns_error("invalid dns name".to_string(), 0) + })? + .to_owned(); + let stream = connector.connect(domain, stream).await.map_err(|e| { + tracing::warn!("tls protocol error: {e:?}"); + ErrorCode::TlsProtocolError + })?; + stream.boxed() + } else { + stream.boxed() + }; + let (mut sender, conn) = tokio::time::timeout( + connect_timeout, + // TODO: we should plumb the builder through the http context, and use it here + hyper::client::conn::http1::Builder::new().handshake(crate::io::TokioIo::new(stream)), + ) + .await + .map_err(|_| ErrorCode::ConnectionTimeout)? + .map_err(ErrorCode::from_hyper_request_error)?; + + // at this point, the request contains the scheme and the authority, but + // the http packet should only include those if addressing a proxy, so + // remove them here, since SendRequest::send_request does not do it for us + *req.uri_mut() = http::Uri::builder() + .path_and_query( + req.uri() + .path_and_query() + .map(|p| p.as_str()) + .unwrap_or("/"), + ) + .build() + .expect("comes from valid request"); + + let send = async move { + use crate::p3::body::IncomingResponseBody; + + let res = tokio::time::timeout(first_byte_timeout, sender.send_request(req)) + .await + .map_err(|_| ErrorCode::ConnectionReadTimeout)? + .map_err(ErrorCode::from_hyper_request_error)?; + let mut timeout = tokio::time::interval(between_bytes_timeout); + timeout.reset(); + Ok(res.map(|incoming| IncomingResponseBody { incoming, timeout })) + }; + let mut send = pin!(send); + let mut conn = Some(conn); + let res = poll_fn(|cx| match send.as_mut().poll(cx) { + Poll::Ready(Ok(res)) => Poll::Ready(Ok(res)), + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Pending => { + if let Some(fut) = conn.as_mut() { + let res = ready!(Pin::new(fut).poll(cx)); + conn = None; + match res { + Ok(()) => match ready!(send.as_mut().poll(cx)) { + Ok(res) => Poll::Ready(Ok(res)), + Err(err) => Poll::Ready(Err(err)), + }, + Err(err) => Poll::Ready(Err(ErrorCode::from_hyper_request_error(err))), + } + } else { + Poll::Pending + } + } + }) + .await?; + Ok((res, async move { + let Some(conn) = conn.take() else { + return Ok(()); + }; + conn.await.map_err(ErrorCode::from_hyper_response_error) + })) +} diff --git a/crates/wasi-http/src/p3/response.rs b/crates/wasi-http/src/p3/response.rs new file mode 100644 index 000000000000..d2549542a568 --- /dev/null +++ b/crates/wasi-http/src/p3/response.rs @@ -0,0 +1,90 @@ +use crate::p3::WasiHttpView; +use crate::p3::bindings::http::types::ErrorCode; +use crate::p3::body::{Body, ConsumedBody, GuestBody}; +use bytes::Bytes; +use http::{HeaderMap, StatusCode}; +use http_body_util::BodyExt as _; +use http_body_util::combinators::BoxBody; +use std::sync::Arc; +use tokio::sync::oneshot; +use wasmtime::AsContextMut; + +/// The concrete type behind a `wasi:http/types/response` resource. +pub struct Response { + /// The status of the response. + pub status: StatusCode, + /// The headers of the response. + pub headers: Arc, + /// Response body. + pub(crate) body: Body, +} + +impl TryFrom for http::Response { + type Error = http::Error; + + fn try_from( + Response { + status, + headers, + body, + }: Response, + ) -> Result { + let mut res = http::Response::builder().status(status); + *res.headers_mut().unwrap() = Arc::unwrap_or_clone(headers); + res.body(body) + } +} + +impl Response { + /// Construct a new [Response] + pub fn new( + status: StatusCode, + headers: impl Into>, + body: impl Into>, + ) -> ( + Self, + impl Future> + Send + 'static, + ) { + let (tx, rx) = oneshot::channel(); + ( + Self { + status, + headers: headers.into(), + body: Body::Host { + body: body.into(), + result_tx: tx, + }, + }, + async { + let Ok(fut) = rx.await else { return Ok(()) }; + Box::into_pin(fut).await + }, + ) + } + + /// Convert [Response] into [http::Response]. + pub fn into_http( + self, + store: impl AsContextMut, + fut: impl Future> + Send + 'static, + ) -> http::Result>> { + let response = http::Response::try_from(self)?; + let (response, body) = response.into_parts(); + let body = match body { + Body::Guest { + contents_rx, + trailers_rx, + result_tx, + } => { + _ = result_tx.send(Box::new(fut)); + GuestBody::new(store, contents_rx, trailers_rx, T::http).boxed() + } + Body::Host { body, result_tx } => { + _ = result_tx.send(Box::new(fut)); + body + } + Body::Consumed => ConsumedBody.boxed(), + }; + Ok(http::Response::from_parts(response, body)) + } +} diff --git a/crates/wasi-http/tests/all/main.rs b/crates/wasi-http/tests/all/main.rs index b7e39588bd0d..6db5cd0742a2 100644 --- a/crates/wasi-http/tests/all/main.rs +++ b/crates/wasi-http/tests/all/main.rs @@ -18,10 +18,10 @@ mod body { use hyper::body::Bytes; pub fn full(bytes: Bytes) -> BoxBody { - BoxBody::new(Full::new(bytes).map_err(|_| unreachable!())) + BoxBody::new(Full::new(bytes).map_err(|x| match x {})) } pub fn empty() -> BoxBody { - BoxBody::new(Empty::new().map_err(|_| unreachable!())) + BoxBody::new(Empty::new().map_err(|x| match x {})) } } diff --git a/crates/wasi-http/tests/all/p3/incoming.rs b/crates/wasi-http/tests/all/p3/incoming.rs deleted file mode 100644 index e6ffb347122f..000000000000 --- a/crates/wasi-http/tests/all/p3/incoming.rs +++ /dev/null @@ -1,52 +0,0 @@ -use bytes::Bytes; -use http_body::Body; -use http_body_util::{Collected, Empty}; -use wasmtime::Store; -use wasmtime::component::{Component, Linker}; -use wasmtime_wasi_http::p3::bindings::Proxy; -use wasmtime_wasi_http::p3::bindings::http::types::ErrorCode; - -use super::Ctx; - -#[expect(unused, reason = "unimplemented")] // TODO: implement -pub async fn run_wasi_http + 'static>( - component_filename: &str, - req: http::Request + Send + Sync + 'static>, -) -> anyhow::Result>, Option>> { - let engine = test_programs_artifacts::engine(|config| { - config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); - config.async_support(true); - config.wasm_component_model_async(true); - }); - let component = Component::from_file(&engine, component_filename)?; - - let mut store = Store::new(&engine, Ctx::default()); - - let mut linker = Linker::new(&engine); - wasmtime_wasi::p2::add_to_linker_async(&mut linker)?; - wasmtime_wasi_http::p3::add_to_linker(&mut linker)?; - let instance = linker.instantiate_async(&mut store, &component).await?; - let proxy = Proxy::new(&mut store, &instance)?; - todo!("not implemented yet") -} - -#[ignore = "unimplemented"] // TODO: implement -#[test_log::test(tokio::test)] -async fn wasi_http_proxy_tests() -> anyhow::Result<()> { - let req = http::Request::builder() - .uri("http://example.com:8080/test-path") - .method(http::Method::GET); - - let resp = run_wasi_http( - test_programs_artifacts::P3_API_PROXY_COMPONENT, - req.body(Empty::new())?, - ) - .await?; - - match resp { - Ok(resp) => println!("response: {resp:?}"), - Err(e) => panic!("Error given in response: {e:?}"), - }; - - Ok(()) -} diff --git a/crates/wasi-http/tests/all/p3/mod.rs b/crates/wasi-http/tests/all/p3/mod.rs index fe4f38ccbefc..09cbe200bb3a 100644 --- a/crates/wasi-http/tests/all/p3/mod.rs +++ b/crates/wasi-http/tests/all/p3/mod.rs @@ -1,16 +1,42 @@ +use crate::http_server::Server; +use anyhow::Result; +use anyhow::{Context as _, anyhow}; +use bytes::Bytes; +use flate2::Compression; +use flate2::write::{DeflateDecoder, DeflateEncoder}; +use futures::SinkExt; +use http::HeaderValue; +use http_body::Body; +use http_body_util::{BodyExt as _, Collected, Empty}; +use std::io::Write; +use std::path::Path; +use test_programs_artifacts::*; +use tokio::{fs, try_join}; +use wasm_compose::composer::ComponentComposer; +use wasm_compose::config::{Config, Dependency, Instantiation, InstantiationArg}; use wasmtime::Store; use wasmtime::component::{Component, Linker, ResourceTable}; +use wasmtime_wasi::p3::bindings::Command; use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView}; -use wasmtime_wasi_http::p3::{WasiHttpCtx, WasiHttpCtxView, WasiHttpView}; +use wasmtime_wasi_http::p3::bindings::Proxy; +use wasmtime_wasi_http::p3::bindings::http::types::ErrorCode; +use wasmtime_wasi_http::p3::{Request, WasiHttpCtx, WasiHttpCtxView, WasiHttpView}; +use wasmtime_wasi_http::types::DEFAULT_FORBIDDEN_HEADERS; -mod incoming; -mod outgoing; -mod proxy; +foreach_p3_http!(assert_test_exists); + +struct TestHttpCtx; + +impl WasiHttpCtx for TestHttpCtx { + fn is_forbidden_header(&mut self, name: &http::header::HeaderName) -> bool { + name.as_str() == "custom-forbidden-header" || DEFAULT_FORBIDDEN_HEADERS.contains(name) + } +} struct Ctx { table: ResourceTable, wasi: WasiCtx, - http: WasiHttpCtx, + http: TestHttpCtx, } impl Default for Ctx { @@ -18,7 +44,7 @@ impl Default for Ctx { Self { table: ResourceTable::default(), wasi: WasiCtxBuilder::new().inherit_stdio().build(), - http: WasiHttpCtx::default(), + http: TestHttpCtx, } } } @@ -40,3 +66,373 @@ impl WasiHttpView for Ctx { } } } + +async fn run_cli(path: &str, server: &Server) -> anyhow::Result<()> { + let engine = test_programs_artifacts::engine(|config| { + config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); + config.async_support(true); + config.wasm_component_model_async(true); + }); + let component = Component::from_file(&engine, path)?; + let mut store = Store::new( + &engine, + Ctx { + wasi: wasmtime_wasi::WasiCtx::builder() + .env("HTTP_SERVER", server.addr()) + .build(), + ..Ctx::default() + }, + ); + let mut linker = Linker::new(&engine); + wasmtime_wasi::p2::add_to_linker_async(&mut linker) + .context("failed to link `wasi:cli@0.2.x`")?; + wasmtime_wasi::p3::add_to_linker(&mut linker).context("failed to link `wasi:cli@0.3.x`")?; + wasmtime_wasi_http::p3::add_to_linker(&mut linker) + .context("failed to link `wasi:http@0.3.x`")?; + let instance = linker.instantiate_async(&mut store, &component).await?; + let command = Command::new(&mut store, &instance)?; + instance + .run_concurrent(store, async |store| { + command.wasi_cli_run().call_run(store).await + }) + .await + .context("failed to call `wasi:cli/run#run`")? + .context("guest trapped")? + .map_err(|()| anyhow!("`wasi:cli/run#run` failed")) +} + +async fn run_http + 'static>( + component_filename: &str, + req: http::Request + Send + Sync + 'static>, +) -> anyhow::Result>, Option>> { + let engine = test_programs_artifacts::engine(|config| { + config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); + config.async_support(true); + config.wasm_component_model_async(true); + }); + let component = Component::from_file(&engine, component_filename)?; + + let mut store = Store::new(&engine, Ctx::default()); + + let mut linker = Linker::new(&engine); + wasmtime_wasi::p2::add_to_linker_async(&mut linker) + .context("failed to link `wasi:cli@0.2.x`")?; + wasmtime_wasi::p3::add_to_linker(&mut linker).context("failed to link `wasi:cli@0.3.x`")?; + wasmtime_wasi_http::p3::add_to_linker(&mut linker) + .context("failed to link `wasi:http@0.3.x`")?; + let instance = linker.instantiate_async(&mut store, &component).await?; + let proxy = Proxy::new(&mut store, &instance)?; + let (req, io) = Request::from_http(req); + let (res, ()) = instance + .run_concurrent(&mut store, async |store| { + try_join!( + async { + let res = match proxy.handle(store, req).await? { + Ok(res) => res, + Err(err) => return Ok(Err(Some(err))), + }; + let res = store.with(|store| res.into_http(store, async { Ok(()) }))?; + let (parts, body) = res.into_parts(); + let body = body.collect().await.context("failed to collect body")?; + Ok(Ok(http::Response::from_parts(parts, body))) + }, + async { io.await.context("failed to consume request body") } + ) + }) + .await??; + Ok(res) +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_outbound_request_get() -> anyhow::Result<()> { + let server = Server::http1(1)?; + run_cli(P3_HTTP_OUTBOUND_REQUEST_GET_COMPONENT, &server).await +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_outbound_request_timeout() -> anyhow::Result<()> { + let server = Server::http1(1)?; + run_cli(P3_HTTP_OUTBOUND_REQUEST_TIMEOUT_COMPONENT, &server).await +} + +#[ignore = "unimplemented"] // TODO: implement +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_outbound_request_post() -> anyhow::Result<()> { + let server = Server::http1(1)?; + run_cli(P3_HTTP_OUTBOUND_REQUEST_POST_COMPONENT, &server).await +} + +#[ignore = "unimplemented"] // TODO: implement +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_outbound_request_large_post() -> anyhow::Result<()> { + let server = Server::http1(1)?; + run_cli(P3_HTTP_OUTBOUND_REQUEST_LARGE_POST_COMPONENT, &server).await +} + +#[ignore = "unimplemented"] // TODO: implement +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_outbound_request_put() -> anyhow::Result<()> { + let server = Server::http1(1)?; + run_cli(P3_HTTP_OUTBOUND_REQUEST_PUT_COMPONENT, &server).await +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_outbound_request_invalid_version() -> anyhow::Result<()> { + let server = Server::http2(1)?; + run_cli(P3_HTTP_OUTBOUND_REQUEST_INVALID_VERSION_COMPONENT, &server).await +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_outbound_request_invalid_header() -> anyhow::Result<()> { + let server = Server::http2(1)?; + run_cli(P3_HTTP_OUTBOUND_REQUEST_INVALID_HEADER_COMPONENT, &server).await +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_outbound_request_unknown_method() -> anyhow::Result<()> { + let server = Server::http1(1)?; + run_cli(P3_HTTP_OUTBOUND_REQUEST_UNKNOWN_METHOD_COMPONENT, &server).await +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_outbound_request_unsupported_scheme() -> anyhow::Result<()> { + let server = Server::http1(1)?; + run_cli( + P3_HTTP_OUTBOUND_REQUEST_UNSUPPORTED_SCHEME_COMPONENT, + &server, + ) + .await +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_outbound_request_invalid_port() -> anyhow::Result<()> { + let server = Server::http1(1)?; + run_cli(P3_HTTP_OUTBOUND_REQUEST_INVALID_PORT_COMPONENT, &server).await +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_outbound_request_invalid_dnsname() -> anyhow::Result<()> { + let server = Server::http1(1)?; + run_cli(P3_HTTP_OUTBOUND_REQUEST_INVALID_DNSNAME_COMPONENT, &server).await +} + +#[ignore = "unimplemented"] // TODO: implement +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_outbound_request_response_build() -> anyhow::Result<()> { + let server = Server::http1(1)?; + run_cli(P3_HTTP_OUTBOUND_REQUEST_RESPONSE_BUILD_COMPONENT, &server).await +} + +#[ignore = "unimplemented"] // FIXME(#11631) +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_outbound_request_content_length() -> anyhow::Result<()> { + let server = Server::http1(3)?; + run_cli(P3_HTTP_OUTBOUND_REQUEST_CONTENT_LENGTH_COMPONENT, &server).await +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_outbound_request_missing_path_and_query() -> anyhow::Result<()> { + let server = Server::http1(1)?; + run_cli( + P3_HTTP_OUTBOUND_REQUEST_MISSING_PATH_AND_QUERY_COMPONENT, + &server, + ) + .await +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn wasi_http_proxy_tests() -> anyhow::Result<()> { + let req = http::Request::builder() + .uri("http://example.com:8080/test-path") + .method(http::Method::GET); + + let res = run_http(P3_API_PROXY_COMPONENT, req.body(Empty::new())?).await?; + + match res { + Ok(res) => println!("response: {res:?}"), + Err(err) => panic!("Error given in response: {err:?}"), + }; + + Ok(()) +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_echo() -> Result<()> { + test_http_echo(P3_HTTP_ECHO_COMPONENT, false).await +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_middleware() -> Result<()> { + let tempdir = tempfile::tempdir()?; + let echo = &fs::read(P3_HTTP_ECHO_COMPONENT).await?; + let middleware = &fs::read(P3_HTTP_MIDDLEWARE_COMPONENT).await?; + + let path = tempdir.path().join("temp.wasm"); + fs::write(&path, compose(middleware, echo).await?).await?; + test_http_echo(&path.to_str().unwrap(), true).await +} + +async fn compose(a: &[u8], b: &[u8]) -> Result> { + let dir = tempfile::tempdir()?; + + let a_file = dir.path().join("a.wasm"); + fs::write(&a_file, a).await?; + + let b_file = dir.path().join("b.wasm"); + fs::write(&b_file, b).await?; + + ComponentComposer::new( + &a_file, + &wasm_compose::config::Config { + dir: dir.path().to_owned(), + definitions: vec![b_file.to_owned()], + ..Default::default() + }, + ) + .compose() +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_middleware_with_chain() -> Result<()> { + let dir = tempfile::tempdir()?; + let path = dir.path().join("temp.wasm"); + + fs::copy(P3_HTTP_ECHO_COMPONENT, &dir.path().join("chain-http.wasm")).await?; + + let bytes = ComponentComposer::new( + Path::new(P3_HTTP_MIDDLEWARE_WITH_CHAIN_COMPONENT), + &Config { + dir: dir.path().to_owned(), + definitions: Vec::new(), + search_paths: Vec::new(), + skip_validation: false, + import_components: false, + disallow_imports: false, + dependencies: [( + "local:local/chain-http".to_owned(), + Dependency { + path: P3_HTTP_ECHO_COMPONENT.into(), + }, + )] + .into_iter() + .collect(), + instantiations: [( + "root".to_owned(), + Instantiation { + dependency: Some("local:local/chain-http".to_owned()), + arguments: [( + "local:local/chain-http".to_owned(), + InstantiationArg { + instance: "local:local/chain-http".into(), + export: Some("wasi:http/handler@0.3.0-rc-2025-08-15".into()), + }, + )] + .into_iter() + .collect(), + }, + )] + .into_iter() + .collect(), + }, + ) + .compose()?; + fs::write(&path, &bytes).await?; + + test_http_echo(&path.to_str().unwrap(), true).await +} + +async fn test_http_echo(component: &str, use_compression: bool) -> Result<()> { + let body = b"And the mome raths outgrabe"; + + // Prepare the raw body, optionally compressed if that's what we're + // testing. + let raw_body = if use_compression { + let mut encoder = DeflateEncoder::new(Vec::new(), Compression::fast()); + encoder.write_all(body).unwrap(); + Bytes::from(encoder.finish().unwrap()) + } else { + Bytes::copy_from_slice(body) + }; + + // Prepare the http_body body, modeled here as a channel with the body + // chunk above buffered up followed by some trailers. Note that trailers + // are always here to test that code paths throughout the components. + let (mut body_tx, body_rx) = futures::channel::mpsc::channel::>(2); + body_tx + .send(Ok(http_body::Frame::data(raw_body))) + .await + .unwrap(); + body_tx + .send(Ok(http_body::Frame::trailers({ + let mut trailers = http::HeaderMap::new(); + assert!( + trailers + .insert("fizz", http::HeaderValue::from_static("buzz")) + .is_none() + ); + trailers + }))) + .await + .unwrap(); + + // Build the `http::Request`, optionally specifying compression-related + // headers. + let mut request = http::Request::builder() + .uri("http://localhost/") + .method(http::Method::GET) + .header("foo", "bar"); + if use_compression { + request = request + .header("content-encoding", "deflate") + .header("accept-encoding", "nonexistent-encoding, deflate"); + } + + // Send this request to wasm and assert that success comes back. + // + // Note that this will read the entire body internally and wait for + // everything to get collected before proceeding to below. + let response = run_http( + component, + request.body(http_body_util::StreamBody::new(body_rx))?, + ) + .await? + .unwrap(); + assert!(response.status().as_u16() == 200); + + // Our input header should be echo'd back. + assert_eq!( + response.headers().get("foo"), + Some(&HeaderValue::from_static("bar")) + ); + + // The compression headers should be set if `use_compression` was turned + // on. + if use_compression { + assert_eq!( + response.headers().get("content-encoding"), + Some(&HeaderValue::from_static("deflate")) + ); + assert!(response.headers().get("content-length").is_none()); + } + + // Trailers should be echo'd back as well. + let trailers = response.body().trailers().expect("trailers missing"); + assert_eq!( + trailers.get("fizz"), + Some(&HeaderValue::from_static("buzz")) + ); + + // And our body should match our original input body as well. + let (_, collected_body) = response.into_parts(); + let collected_body = collected_body.to_bytes(); + + let response_body = if use_compression { + let mut decoder = DeflateDecoder::new(Vec::new()); + decoder.write_all(&collected_body)?; + decoder.finish()? + } else { + collected_body.to_vec() + }; + assert_eq!(response_body, body.as_slice()); + Ok(()) +} diff --git a/crates/wasi-http/tests/all/p3/outgoing.rs b/crates/wasi-http/tests/all/p3/outgoing.rs deleted file mode 100644 index e0762fd48ba3..000000000000 --- a/crates/wasi-http/tests/all/p3/outgoing.rs +++ /dev/null @@ -1,148 +0,0 @@ -use super::*; -use crate::http_server::Server; -use anyhow::{Context as _, anyhow}; -use test_programs_artifacts::*; -use wasmtime_wasi::p3::bindings::Command; - -foreach_p3_http!(assert_test_exists); - -use super::proxy::{p3_http_echo, p3_http_middleware, p3_http_middleware_with_chain}; - -async fn run(path: &str, server: &Server) -> anyhow::Result<()> { - let engine = test_programs_artifacts::engine(|config| { - config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); - config.async_support(true); - config.wasm_component_model_async(true); - }); - let component = Component::from_file(&engine, path)?; - let mut store = Store::new( - &engine, - Ctx { - wasi: wasmtime_wasi::WasiCtx::builder() - .env("HTTP_SERVER", server.addr()) - .build(), - ..Ctx::default() - }, - ); - let mut linker = Linker::new(&engine); - wasmtime_wasi::p2::add_to_linker_async(&mut linker) - .context("failed to link `wasi:cli@0.2.x`")?; - wasmtime_wasi::p3::add_to_linker(&mut linker).context("failed to link `wasi:cli@0.3.x`")?; - wasmtime_wasi_http::p3::add_to_linker(&mut linker)?; - let instance = linker.instantiate_async(&mut store, &component).await?; - let command = Command::new(&mut store, &instance)?; - instance - .run_concurrent(store, async |store| { - command.wasi_cli_run().call_run(store).await - }) - .await - .context("failed to call `wasi:cli/run#run`")? - .context("guest trapped")? - .map_err(|()| anyhow!("`wasi:cli/run#run` failed")) -} - -#[ignore = "unimplemented"] // TODO: implement -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn p3_http_outbound_request_get() -> anyhow::Result<()> { - let server = Server::http1(1)?; - run(P3_HTTP_OUTBOUND_REQUEST_GET_COMPONENT, &server).await -} - -#[ignore = "unimplemented"] // TODO: implement -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn p3_http_outbound_request_timeout() -> anyhow::Result<()> { - let server = Server::http1(1)?; - run(P3_HTTP_OUTBOUND_REQUEST_TIMEOUT_COMPONENT, &server).await -} - -#[ignore = "unimplemented"] // TODO: implement -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn p3_http_outbound_request_post() -> anyhow::Result<()> { - let server = Server::http1(1)?; - run(P3_HTTP_OUTBOUND_REQUEST_POST_COMPONENT, &server).await -} - -#[ignore = "unimplemented"] // TODO: implement -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn p3_http_outbound_request_large_post() -> anyhow::Result<()> { - let server = Server::http1(1)?; - run(P3_HTTP_OUTBOUND_REQUEST_LARGE_POST_COMPONENT, &server).await -} - -#[ignore = "unimplemented"] // TODO: implement -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn p3_http_outbound_request_put() -> anyhow::Result<()> { - let server = Server::http1(1)?; - run(P3_HTTP_OUTBOUND_REQUEST_PUT_COMPONENT, &server).await -} - -#[ignore = "unimplemented"] // TODO: implement -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn p3_http_outbound_request_invalid_version() -> anyhow::Result<()> { - let server = Server::http2(1)?; - run(P3_HTTP_OUTBOUND_REQUEST_INVALID_VERSION_COMPONENT, &server).await -} - -#[ignore = "unimplemented"] // TODO: implement -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn p3_http_outbound_request_invalid_header() -> anyhow::Result<()> { - let server = Server::http2(1)?; - run(P3_HTTP_OUTBOUND_REQUEST_INVALID_HEADER_COMPONENT, &server).await -} - -#[ignore = "unimplemented"] // TODO: implement -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn p3_http_outbound_request_unknown_method() -> anyhow::Result<()> { - let server = Server::http1(1)?; - run(P3_HTTP_OUTBOUND_REQUEST_UNKNOWN_METHOD_COMPONENT, &server).await -} - -#[ignore = "unimplemented"] // TODO: implement -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn p3_http_outbound_request_unsupported_scheme() -> anyhow::Result<()> { - let server = Server::http1(1)?; - run( - P3_HTTP_OUTBOUND_REQUEST_UNSUPPORTED_SCHEME_COMPONENT, - &server, - ) - .await -} - -#[ignore = "unimplemented"] // TODO: implement -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn p3_http_outbound_request_invalid_port() -> anyhow::Result<()> { - let server = Server::http1(1)?; - run(P3_HTTP_OUTBOUND_REQUEST_INVALID_PORT_COMPONENT, &server).await -} - -#[ignore = "unimplemented"] // TODO: implement -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn p3_http_outbound_request_invalid_dnsname() -> anyhow::Result<()> { - let server = Server::http1(1)?; - run(P3_HTTP_OUTBOUND_REQUEST_INVALID_DNSNAME_COMPONENT, &server).await -} - -#[ignore = "unimplemented"] // TODO: implement -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn p3_http_outbound_request_response_build() -> anyhow::Result<()> { - let server = Server::http1(1)?; - run(P3_HTTP_OUTBOUND_REQUEST_RESPONSE_BUILD_COMPONENT, &server).await -} - -#[ignore = "unimplemented"] // TODO: implement -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn p3_http_outbound_request_content_length() -> anyhow::Result<()> { - let server = Server::http1(3)?; - run(P3_HTTP_OUTBOUND_REQUEST_CONTENT_LENGTH_COMPONENT, &server).await -} - -#[ignore = "unimplemented"] // TODO: implement -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn p3_http_outbound_request_missing_path_and_query() -> anyhow::Result<()> { - let server = Server::http1(1)?; - run( - P3_HTTP_OUTBOUND_REQUEST_MISSING_PATH_AND_QUERY_COMPONENT, - &server, - ) - .await -} diff --git a/crates/wasi-http/tests/all/p3/proxy.rs b/crates/wasi-http/tests/all/p3/proxy.rs deleted file mode 100644 index 66daad9015d8..000000000000 --- a/crates/wasi-http/tests/all/p3/proxy.rs +++ /dev/null @@ -1,204 +0,0 @@ -use crate::p3::incoming::run_wasi_http; -use anyhow::Result; -use bytes::Bytes; -use flate2::{ - Compression, - write::{DeflateDecoder, DeflateEncoder}, -}; -use futures::SinkExt; -use http::HeaderValue; -use std::io::Write; -use std::path::Path; -use tokio::fs; -use wasm_compose::{ - composer::ComponentComposer, - config::{Config, Dependency, Instantiation, InstantiationArg}, -}; -use wasmtime_wasi_http::p3::bindings::http::types::ErrorCode; - -#[ignore = "unimplemented"] // TODO: implement -#[tokio::test] -pub async fn p3_http_echo() -> Result<()> { - test_http_echo(test_programs_artifacts::P3_HTTP_ECHO_COMPONENT, false).await -} - -#[ignore = "unimplemented"] // TODO: implement -#[tokio::test] -pub async fn p3_http_middleware() -> Result<()> { - let tempdir = tempfile::tempdir()?; - let echo = &fs::read(test_programs_artifacts::P3_HTTP_ECHO_COMPONENT).await?; - let middleware = &fs::read(test_programs_artifacts::P3_HTTP_MIDDLEWARE_COMPONENT).await?; - - let path = tempdir.path().join("temp.wasm"); - fs::write(&path, compose(middleware, echo).await?).await?; - test_http_echo(&path.to_str().unwrap(), true).await -} - -async fn compose(a: &[u8], b: &[u8]) -> Result> { - let dir = tempfile::tempdir()?; - - let a_file = dir.path().join("a.wasm"); - fs::write(&a_file, a).await?; - - let b_file = dir.path().join("b.wasm"); - fs::write(&b_file, b).await?; - - ComponentComposer::new( - &a_file, - &wasm_compose::config::Config { - dir: dir.path().to_owned(), - definitions: vec![b_file.to_owned()], - ..Default::default() - }, - ) - .compose() -} - -#[ignore = "unimplemented"] // TODO: implement -#[tokio::test] -pub async fn p3_http_middleware_with_chain() -> Result<()> { - let dir = tempfile::tempdir()?; - let path = dir.path().join("temp.wasm"); - - fs::copy( - test_programs_artifacts::P3_HTTP_ECHO_COMPONENT, - &dir.path().join("chain-http.wasm"), - ) - .await?; - - let bytes = ComponentComposer::new( - Path::new(test_programs_artifacts::P3_HTTP_MIDDLEWARE_WITH_CHAIN_COMPONENT), - &Config { - dir: dir.path().to_owned(), - definitions: Vec::new(), - search_paths: Vec::new(), - skip_validation: false, - import_components: false, - disallow_imports: false, - dependencies: [( - "local:local/chain-http".to_owned(), - Dependency { - path: test_programs_artifacts::P3_HTTP_ECHO_COMPONENT.into(), - }, - )] - .into_iter() - .collect(), - instantiations: [( - "root".to_owned(), - Instantiation { - dependency: Some("local:local/chain-http".to_owned()), - arguments: [( - "local:local/chain-http".to_owned(), - InstantiationArg { - instance: "local:local/chain-http".into(), - export: Some("wasi:http/handler@0.3.0-draft".into()), - }, - )] - .into_iter() - .collect(), - }, - )] - .into_iter() - .collect(), - }, - ) - .compose()?; - fs::write(&path, &bytes).await?; - - test_http_echo(&path.to_str().unwrap(), true).await -} - -async fn test_http_echo(component: &str, use_compression: bool) -> Result<()> { - let body = b"And the mome raths outgrabe"; - - // Prepare the raw body, optionally compressed if that's what we're - // testing. - let raw_body = if use_compression { - let mut encoder = DeflateEncoder::new(Vec::new(), Compression::fast()); - encoder.write_all(body).unwrap(); - Bytes::from(encoder.finish().unwrap()) - } else { - Bytes::copy_from_slice(body) - }; - - // Prepare the http_body body, modeled here as a channel with the body - // chunk above buffered up followed by some trailers. Note that trailers - // are always here to test that code paths throughout the components. - let (mut body_tx, body_rx) = futures::channel::mpsc::channel::>(2); - body_tx - .send(Ok(http_body::Frame::data(raw_body))) - .await - .unwrap(); - body_tx - .send(Ok(http_body::Frame::trailers({ - let mut trailers = http::HeaderMap::new(); - assert!( - trailers - .insert("fizz", http::HeaderValue::from_static("buzz")) - .is_none() - ); - trailers - }))) - .await - .unwrap(); - - // Build the `http::Request`, optionally specifying compression-related - // headers. - let mut request = http::Request::builder() - .uri("http://localhost/") - .method(http::Method::GET) - .header("foo", "bar"); - if use_compression { - request = request - .header("content-encoding", "deflate") - .header("accept-encoding", "nonexistent-encoding, deflate"); - } - - // Send this request to wasm and assert that success comes back. - // - // Note that this will read the entire body internally and wait for - // everything to get collected before proceeding to below. - let response = run_wasi_http( - component, - request.body(http_body_util::StreamBody::new(body_rx))?, - ) - .await? - .unwrap(); - assert!(response.status().as_u16() == 200); - - // Our input header should be echo'd back. - assert_eq!( - response.headers().get("foo"), - Some(&HeaderValue::from_static("bar")) - ); - - // The compression headers should be set if `use_compression` was turned - // on. - if use_compression { - assert_eq!( - response.headers().get("content-encoding"), - Some(&HeaderValue::from_static("deflate")) - ); - assert!(response.headers().get("content-length").is_none()); - } - - // Trailers should be echo'd back as well. - assert_eq!( - response.body().trailers().unwrap().get("fizz"), - Some(&HeaderValue::from_static("buzz")) - ); - - // And our body should match our original input body as well. - let (_, collected_body) = response.into_parts(); - let collected_body = collected_body.to_bytes(); - - let response_body = if use_compression { - let mut decoder = DeflateDecoder::new(Vec::new()); - decoder.write_all(&collected_body)?; - decoder.finish()? - } else { - collected_body.to_vec() - }; - assert_eq!(response_body, body.as_slice()); - Ok(()) -} diff --git a/crates/wasi/src/p3/mod.rs b/crates/wasi/src/p3/mod.rs index f3bab1d63217..2283e887da48 100644 --- a/crates/wasi/src/p3/mod.rs +++ b/crates/wasi/src/p3/mod.rs @@ -30,7 +30,7 @@ use wasmtime::component::{ // Default buffer capacity to use for reads of byte-sized values. const DEFAULT_BUFFER_CAPACITY: usize = 8192; -struct StreamEmptyProducer(PhantomData T>); +pub struct StreamEmptyProducer(PhantomData T>); impl Default for StreamEmptyProducer { fn default() -> Self { @@ -66,7 +66,13 @@ where } } -struct FutureOneshotProducer(oneshot::Receiver); +pub struct FutureOneshotProducer(oneshot::Receiver); + +impl From> for FutureOneshotProducer { + fn from(rx: oneshot::Receiver) -> Self { + Self(rx) + } +} impl FutureProducer for FutureOneshotProducer where