-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlib.rs
132 lines (114 loc) · 3.96 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#![cfg_attr(feature = "nightly", deny(missing_docs))]
#![cfg_attr(test, deny(warnings))]
#![feature(pin, futures_api, async_await, await_macro, arbitrary_self_types)]
use bytes::Bytes;
use futures::{
future,
prelude::*,
stream::{self, StreamObj},
task::Waker,
Poll,
};
use std::marker::Unpin;
use std::pin::Pin;
/// The raw body of an http request or response
///
/// A body is a stream of `Bytes` values, which are shared
/// handles to byte buffers
/// Both `Body` and `Bytes` values can be easily created from
/// standard owned byte buffer types
/// like `Vec<u8>` or `String`, using the `From` trait
pub struct Body {
stream: StreamObj<'static, Result<Bytes, std::io::Error>>,
}
impl Body {
/// create an empty body
pub fn empty() -> Self {
Body::from_stream(stream::empty())
}
/// Create a body from a stream of `Bytes`
pub fn from_stream<S>(s: S) -> Self
where
S: Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static,
{
Self {
stream: StreamObj::new(Box::new(s)),
}
}
/// Reads the stream into a new `Vec`.
pub async fn into_vec(mut self) -> std::io::Result<Vec<u8>> {
let mut bytes = Vec::new();
while let Some(chunk) = await!(self.next()) {
bytes.extend(chunk?);
}
Ok(bytes)
}
}
impl<T: Into<Bytes> + Send> From<T> for Body {
fn from(x: T) -> Self {
Self::from_stream(stream::once(future::ok(x.into())))
}
}
impl Unpin for Body {}
impl Stream for Body {
type Item = Result<Bytes, std::io::Error>;
fn poll_next(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.stream).poll_next(waker)
}
}
/// An HTTP request with a streaming body.
pub type Request = http::Request<Body>;
/// An HTTP response with a streaming body
pub type Response = http::Response<Body>;
/// An async HTTP service
///
/// An instance represents a service as a while. The associated
/// `Conn` type represents a particular connection, and may carry
/// connection-specific state
pub trait HttpService: Send + Sync + 'static {
/// An individual connection
///
/// This associated type is used to establish and hold any per-connection
/// state needed by the service
type Connection: Send + 'static;
/// A future for setting up an individual connection
///
/// This method is called each time the server receives a new connection request,
/// but before actually exchanging any data with the client
///
/// Returning an error will result in the server immediately dropping
/// the connection
type ConnectionFuture: Send + 'static + TryFuture<Ok = Self::Connection>;
/// Initiate a new connection.
///
/// This method is given access to the global service (`&self`),
/// which may provide handles to connection pools, thread pools, or other global data
fn connect(&self) -> Self::ConnectionFuture;
/// The async computation for producing the response
///
/// Returning an error will result in the server immediately dropping
/// the connection. It is usually preferable to instead return an HTTP respnse
/// with an error status code.
type Fut: Send + 'static + TryFuture<Ok = Response>;
/// Begin handling a single request
///
/// The handler is given shared access to the service itself, and mutable access
/// to the state for the connection where the request is taking place.
fn respond(&self, conn: &mut Self::Connection, req: Request) -> Self::Fut;
}
impl<F, Fut> HttpService for F
where
F: Send + Sync + 'static + Fn(Request) -> Fut,
Fut: Send + 'static + TryFuture<Ok = Response>,
Fut::Error: Send,
{
type Connection = ();
type ConnectionFuture = future::Ready<Result<(), Fut::Error>>;
fn connect(&self) -> Self::ConnectionFuture {
future::ok(())
}
type Fut = Fut;
fn respond(&self, _: &mut (), req: Request) -> Self::Fut {
(self)(req)
}
}