#[cfg(feature = "proxy")]
use socks::Socks5Stream;
use std::io::{BufRead, BufReader, Read, Write};
#[cfg(not(jsonrpc_fuzz))]
use std::net::TcpStream;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use std::{error, fmt, io, net, num};
use crate::client::Transport;
use crate::http::DEFAULT_PORT;
#[cfg(feature = "proxy")]
use crate::http::DEFAULT_PROXY_PORT;
use crate::{Request, Response};
/// Upper bound, in bytes (1 GiB), on the response body this transport accepts.
///
/// It is used both as the read limit when the response carries no
/// content-length and as the largest content-length that is accepted.
const FINAL_RESP_ALLOC: u64 = 1024 * 1024 * 1024;
/// Default timeout for regular (non-fuzzing) builds.
#[cfg(not(jsonrpc_fuzz))]
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(15);
/// Default timeout when built with the `jsonrpc_fuzz` cfg.
#[cfg(jsonrpc_fuzz)]
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(1);
/// Minimal HTTP transport that POSTs JSON-RPC requests over a plain TCP stream.
///
/// The connection is cached in `sock` behind an `Arc<Mutex<..>>`, so clones of
/// a transport share the same cached connection.
#[derive(Clone, Debug)]
pub struct SimpleHttpTransport {
/// Socket address of the server requests are sent to.
addr: net::SocketAddr,
/// Path placed in the request line of every POST.
path: String,
/// Timeout used by `fresh_socket` for the TCP connect and as the read/write
/// timeout of direct connections.
timeout: Duration,
/// Complete value of the `Authorization` header, if any.
basic_auth: Option<String>,
/// Address of the SOCKS5 proxy the connection is made through.
#[cfg(feature = "proxy")]
proxy_addr: net::SocketAddr,
/// Optional `(username, password)` pair for the SOCKS5 proxy.
#[cfg(feature = "proxy")]
proxy_auth: Option<(String, String)>,
/// Cached connection; `None` until the first request or after an error.
sock: Arc<Mutex<Option<BufReader<TcpStream>>>>,
}
impl Default for SimpleHttpTransport {
    /// Builds a transport aimed at `127.0.0.1:DEFAULT_PORT` with path `/`,
    /// the default timeout, no authentication and no open connection.
    ///
    /// With the `proxy` feature the proxy defaults to
    /// `127.0.0.1:DEFAULT_PROXY_PORT` without credentials.
    fn default() -> Self {
        let loopback = net::IpAddr::V4(net::Ipv4Addr::new(127, 0, 0, 1));
        let no_connection = Arc::new(Mutex::new(None));

        SimpleHttpTransport {
            addr: net::SocketAddr::new(loopback, DEFAULT_PORT),
            path: String::from("/"),
            timeout: DEFAULT_TIMEOUT,
            basic_auth: None,
            #[cfg(feature = "proxy")]
            proxy_addr: net::SocketAddr::new(loopback, DEFAULT_PROXY_PORT),
            #[cfg(feature = "proxy")]
            proxy_auth: None,
            sock: no_connection,
        }
    }
}
impl SimpleHttpTransport {
pub fn new() -> Self {
SimpleHttpTransport::default()
}
pub fn builder() -> Builder {
Builder::new()
}
pub fn set_url(&mut self, url: &str) -> Result<(), Error> {
let url = check_url(url)?;
self.addr = url.0;
self.path = url.1;
Ok(())
}
pub fn set_url_path(&mut self, path: String) {
self.path = path;
}
fn request<R>(&self, req: impl serde::Serialize) -> Result<R, Error>
where
R: for<'a> serde::de::Deserialize<'a>,
{
match self.try_request(req) {
Ok(response) => Ok(response),
Err(err) => {
*self.sock.lock().expect("poisoned mutex") = None;
Err(err)
}
}
}
#[cfg(feature = "proxy")]
fn fresh_socket(&self) -> Result<TcpStream, Error> {
let stream = if let Some((username, password)) = &self.proxy_auth {
Socks5Stream::connect_with_password(
self.proxy_addr,
self.addr,
username.as_str(),
password.as_str(),
)?
} else {
Socks5Stream::connect(self.proxy_addr, self.addr)?
};
Ok(stream.into_inner())
}
#[cfg(not(feature = "proxy"))]
fn fresh_socket(&self) -> Result<TcpStream, Error> {
let stream = TcpStream::connect_timeout(&self.addr, self.timeout)?;
stream.set_read_timeout(Some(self.timeout))?;
stream.set_write_timeout(Some(self.timeout))?;
Ok(stream)
}
fn try_request<R>(&self, req: impl serde::Serialize) -> Result<R, Error>
where
R: for<'a> serde::de::Deserialize<'a>,
{
let mut sock_lock: MutexGuard<Option<_>> = self.sock.lock().expect("poisoned mutex");
if sock_lock.is_none() {
*sock_lock = Some(BufReader::new(self.fresh_socket()?));
};
let sock: &mut BufReader<_> = sock_lock.as_mut().unwrap();
let body = serde_json::to_vec(&req)?;
let mut request_bytes = Vec::new();
request_bytes.write_all(b"POST ")?;
request_bytes.write_all(self.path.as_bytes())?;
request_bytes.write_all(b" HTTP/1.1\r\n")?;
request_bytes.write_all(b"host: ")?;
request_bytes.write_all(self.addr.to_string().as_bytes())?;
request_bytes.write_all(b"\r\n")?;
request_bytes.write_all(b"Content-Type: application/json\r\n")?;
request_bytes.write_all(b"Content-Length: ")?;
request_bytes.write_all(body.len().to_string().as_bytes())?;
request_bytes.write_all(b"\r\n")?;
if let Some(ref auth) = self.basic_auth {
request_bytes.write_all(b"Authorization: ")?;
request_bytes.write_all(auth.as_ref())?;
request_bytes.write_all(b"\r\n")?;
}
request_bytes.write_all(b"\r\n")?;
request_bytes.write_all(&body)?;
let write_success = sock.get_mut().write_all(request_bytes.as_slice()).is_ok()
&& sock.get_mut().flush().is_ok();
if !write_success {
*sock.get_mut() = self.fresh_socket()?;
sock.get_mut().write_all(request_bytes.as_slice())?;
sock.get_mut().flush()?;
}
let mut header_buf = String::new();
let read_success = sock.read_line(&mut header_buf).is_ok();
if (!read_success || header_buf.is_empty()) && write_success {
*sock.get_mut() = self.fresh_socket()?;
sock.get_mut().write_all(request_bytes.as_slice())?;
sock.get_mut().flush()?;
sock.read_line(&mut header_buf)?;
}
if header_buf.len() < 12 {
return Err(Error::HttpResponseTooShort {
actual: header_buf.len(),
needed: 12,
});
}
if !header_buf.as_bytes()[..12].is_ascii() {
return Err(Error::HttpResponseNonAsciiHello(header_buf.as_bytes()[..12].to_vec()));
}
if !header_buf.starts_with("HTTP/1.1 ") {
return Err(Error::HttpResponseBadHello {
actual: header_buf[0..9].into(),
expected: "HTTP/1.1 ".into(),
});
}
let response_code = match header_buf[9..12].parse::<u16>() {
Ok(n) => n,
Err(e) => return Err(Error::HttpResponseBadStatus(header_buf[9..12].into(), e)),
};
let mut content_length = None;
loop {
header_buf.clear();
sock.read_line(&mut header_buf)?;
if header_buf == "\r\n" {
break;
}
header_buf.make_ascii_lowercase();
const CONTENT_LENGTH: &str = "content-length: ";
if let Some(s) = header_buf.strip_prefix(CONTENT_LENGTH) {
content_length = Some(
s.trim()
.parse::<u64>()
.map_err(|e| Error::HttpResponseBadContentLength(s.into(), e))?,
);
}
const TRANSFER_ENCODING: &str = "transfer-encoding: ";
if let Some(s) = header_buf.strip_prefix(TRANSFER_ENCODING) {
const CHUNKED: &str = "chunked";
if s.trim() == CHUNKED {
return Err(Error::HttpResponseChunked);
}
}
}
if response_code == 401 {
return Err(Error::HttpErrorCode(response_code));
}
let mut reader = match content_length {
None => sock.take(FINAL_RESP_ALLOC),
Some(n) if n > FINAL_RESP_ALLOC => {
return Err(Error::HttpResponseContentLengthTooLarge {
length: n,
max: FINAL_RESP_ALLOC,
});
}
Some(n) => sock.take(n),
};
match serde_json::from_reader(&mut reader) {
Ok(s) => {
if content_length.is_some() {
reader.bytes().count(); }
Ok(s)
}
Err(e) => {
if response_code != 200 {
Err(Error::HttpErrorCode(response_code))
} else {
Err(e.into())
}
}
}
}
}
fn check_url(url: &str) -> Result<(SocketAddr, String), Error> {
let mut fallback_port = DEFAULT_PORT;
let after_scheme = {
let mut split = url.splitn(2, "://");
let s = split.next().unwrap();
match split.next() {
None => s, Some(after) => {
if s == "http" {
fallback_port = 80;
} else if s == "https" {
fallback_port = 443;
} else {
return Err(Error::url(url, "scheme should be http or https"));
}
after
}
}
};
let (before_path, path) = {
if let Some(slash) = after_scheme.find('/') {
(&after_scheme[0..slash], &after_scheme[slash..])
} else {
(after_scheme, "/")
}
};
let after_auth = {
let mut split = before_path.splitn(2, '@');
let s = split.next().unwrap();
split.next().unwrap_or(s)
};
let mut addr = match after_auth.to_socket_addrs() {
Ok(addr) => addr,
Err(_) => {
format!("{}:{}", after_auth, fallback_port).to_socket_addrs()?
}
};
match addr.next() {
Some(a) => Ok((a, path.to_owned())),
None => Err(Error::url(url, "invalid hostname: error extracting socket address")),
}
}
impl Transport for SimpleHttpTransport {
    /// Sends a single request and returns the parsed response.
    fn send_request(&self, req: Request) -> Result<Response, crate::Error> {
        Ok(self.request(req)?)
    }

    /// Sends a batch of requests in one HTTP call and returns the responses.
    fn send_batch(&self, reqs: &[Request]) -> Result<Vec<Response>, crate::Error> {
        Ok(self.request(reqs)?)
    }

    /// Writes the target as `http://<addr><path>`.
    fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // `SocketAddr`'s `Display` brackets IPv6 addresses (`[::1]:8332`), so
        // the port stays unambiguous. Printing `ip()` and `port()` separately
        // produced `::1:8332`. IPv4 output is unchanged.
        write!(f, "http://{}{}", self.addr, self.path)
    }
}
/// Builder for [`SimpleHttpTransport`].
#[derive(Clone, Debug)]
pub struct Builder {
/// The transport being configured; handed out by `build`.
tp: SimpleHttpTransport,
}
impl Builder {
    /// Starts from a transport with default settings.
    pub fn new() -> Builder {
        let tp = SimpleHttpTransport::new();
        Builder {
            tp,
        }
    }

    /// Sets the timeout stored on the transport.
    pub fn timeout(mut self, timeout: Duration) -> Self {
        self.tp.timeout = timeout;
        self
    }

    /// Sets the target address and path from `url`.
    ///
    /// # Errors
    ///
    /// Fails when the transport rejects the URL.
    pub fn url(mut self, url: &str) -> Result<Self, Error> {
        self.tp.set_url(url).map(|()| self)
    }

    /// Configures HTTP basic authentication from `user` and an optional `pass`.
    ///
    /// The header value is `Basic base64("user:pass")`; without a password the
    /// part after the colon is left empty.
    pub fn auth<S: AsRef<str>>(mut self, user: S, pass: Option<S>) -> Self {
        let credentials = match pass {
            Some(ref secret) => [user.as_ref(), secret.as_ref()].join(":"),
            None => [user.as_ref(), ""].join(":"),
        };
        let encoded = base64::encode(credentials.as_bytes());
        self.tp.basic_auth = Some(format!("Basic {}", &encoded));
        self
    }

    /// Configures HTTP basic authentication from a ready-made `cookie` string,
    /// which is base64-encoded as is.
    pub fn cookie_auth<S: AsRef<str>>(mut self, cookie: S) -> Self {
        let encoded = base64::encode(cookie.as_ref().as_bytes());
        self.tp.basic_auth = Some(format!("Basic {}", &encoded));
        self
    }

    /// Sets the proxy address, parsed with the same rules as the target URL
    /// (any path component is ignored).
    ///
    /// # Errors
    ///
    /// Fails when `proxy_addr` cannot be parsed or resolved.
    #[cfg(feature = "proxy")]
    pub fn proxy_addr<S: AsRef<str>>(mut self, proxy_addr: S) -> Result<Self, Error> {
        let (addr, _path) = check_url(proxy_addr.as_ref())?;
        self.tp.proxy_addr = addr;
        Ok(self)
    }

    /// Sets the username and password presented to the proxy.
    #[cfg(feature = "proxy")]
    pub fn proxy_auth<S: AsRef<str>>(mut self, user: S, pass: S) -> Self {
        let username = user.as_ref().to_string();
        let password = pass.as_ref().to_string();
        self.tp.proxy_auth = Some((username, password));
        self
    }

    /// Finishes building and returns the configured transport.
    pub fn build(self) -> SimpleHttpTransport {
        let Builder {
            tp,
        } = self;
        tp
    }
}
impl Default for Builder {
fn default() -> Self {
Builder::new()
}
}
impl crate::Client {
    /// Creates a client that talks to `url` over a [`SimpleHttpTransport`].
    ///
    /// Basic authentication is configured only when `user` is given; `pass`
    /// is ignored otherwise.
    ///
    /// # Errors
    ///
    /// Fails when `url` is rejected by the builder.
    pub fn simple_http(
        url: &str,
        user: Option<String>,
        pass: Option<String>,
    ) -> Result<crate::Client, Error> {
        let with_url = Builder::new().url(url)?;
        let configured = match user {
            Some(user) => with_url.auth(user, pass),
            None => with_url,
        };
        Ok(crate::Client::with_transport(configured.build()))
    }

    /// Creates a client that reaches `url` through the proxy at `proxy_addr`.
    ///
    /// Basic authentication is configured only when `user` is given, and proxy
    /// credentials only when `proxy_auth` is given.
    ///
    /// # Errors
    ///
    /// Fails when `url` or `proxy_addr` is rejected by the builder.
    #[cfg(feature = "proxy")]
    pub fn http_proxy(
        url: &str,
        user: Option<String>,
        pass: Option<String>,
        proxy_addr: &str,
        proxy_auth: Option<(&str, &str)>,
    ) -> Result<crate::Client, Error> {
        let with_url = Builder::new().url(url)?;
        let with_auth = match user {
            Some(user) => with_url.auth(user, pass),
            None => with_url,
        };
        let with_proxy = with_auth.proxy_addr(proxy_addr)?;
        let configured = match proxy_auth {
            Some((proxy_user, proxy_pass)) => with_proxy.proxy_auth(proxy_user, proxy_pass),
            None => with_proxy,
        };
        Ok(crate::Client::with_transport(configured.build()))
    }
}
/// Errors produced by the simple HTTP transport.
#[derive(Debug)]
pub enum Error {
/// The given URL could not be used.
InvalidUrl {
/// The URL as supplied by the caller.
url: String,
/// Why the URL was rejected.
reason: &'static str,
},
/// An I/O error; every `io::Error` is converted into this variant.
SocketError(io::Error),
/// The first response line is shorter than the status line parser needs.
HttpResponseTooShort {
/// Length of the line that was received.
actual: usize,
/// Minimum length required.
needed: usize,
},
/// The start of the response contains non-ASCII bytes (the bytes are attached).
HttpResponseNonAsciiHello(Vec<u8>),
/// The response does not start with the expected HTTP version prefix.
HttpResponseBadHello {
/// What the response started with.
actual: String,
/// What it was expected to start with.
expected: String,
},
/// The status code could not be parsed; carries the offending text and the parse error.
HttpResponseBadStatus(String, num::ParseIntError),
/// The content-length value could not be parsed; carries the offending text and the parse error.
HttpResponseBadContentLength(String, num::ParseIntError),
/// The announced content length exceeds the maximum this transport accepts.
HttpResponseContentLengthTooLarge {
/// Content length announced by the server.
length: u64,
/// Largest content length accepted.
max: u64,
},
/// The server answered with a chunked transfer encoding, which is not supported.
HttpResponseChunked,
/// The server answered with an HTTP status code that is treated as an error.
HttpErrorCode(u16),
/// Fewer bytes were read than the content-length header announced.
/// NOTE(review): not constructed anywhere in the visible part of this file.
IncompleteResponse {
/// Value of the content-length header.
content_length: u64,
/// Number of bytes actually read.
n_read: u64,
},
/// A JSON serialisation or deserialisation error.
Json(serde_json::Error),
}
impl Error {
    /// Shorthand for building an [`Error::InvalidUrl`] from anything that
    /// converts into a `String`.
    fn url<U: Into<String>>(url: U, reason: &'static str) -> Error {
        let url: String = url.into();
        Error::InvalidUrl {
            url,
            reason,
        }
    }
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
use Error::*;
match *self {
InvalidUrl {
ref url,
ref reason,
} => write!(f, "invalid URL '{}': {}", url, reason),
SocketError(ref e) => write!(f, "Couldn't connect to host: {}", e),
HttpResponseTooShort {
ref actual,
ref needed,
} => {
write!(f, "HTTP response too short: length {}, needed {}.", actual, needed)
}
HttpResponseNonAsciiHello(ref bytes) => {
write!(f, "HTTP response started with non-ASCII {:?}", bytes)
}
HttpResponseBadHello {
ref actual,
ref expected,
} => {
write!(f, "HTTP response started with `{}`; expected `{}`.", actual, expected)
}
HttpResponseBadStatus(ref status, ref err) => {
write!(f, "HTTP response had bad status code `{}`: {}.", status, err)
}
HttpResponseBadContentLength(ref len, ref err) => {
write!(f, "HTTP response had bad content length `{}`: {}.", len, err)
}
HttpResponseContentLengthTooLarge {
length,
max,
} => {
write!(f, "HTTP response content length {} exceeds our max {}.", length, max)
}
HttpErrorCode(c) => write!(f, "unexpected HTTP code: {}", c),
IncompleteResponse {
content_length,
n_read,
} => {
write!(
f,
"read {} bytes but HTTP response content-length header was {}.",
n_read, content_length
)
}
Json(ref e) => write!(f, "JSON error: {}", e),
HttpResponseChunked => {
write!(f, "The server replied with a chunked response which is not supported")
}
}
}
}
impl error::Error for Error {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
use self::Error::*;
match *self {
InvalidUrl {
..
}
| HttpResponseTooShort {
..
}
| HttpResponseNonAsciiHello(..)
| HttpResponseBadHello {
..
}
| HttpResponseBadStatus(..)
| HttpResponseBadContentLength(..)
| HttpResponseContentLengthTooLarge {
..
}
| HttpErrorCode(_)
| IncompleteResponse {
..
}
| HttpResponseChunked => None,
SocketError(ref e) => Some(e),
Json(ref e) => Some(e),
}
}
}
impl From<io::Error> for Error {
fn from(e: io::Error) -> Self {
Error::SocketError(e)
}
}
impl From<serde_json::Error> for Error {
fn from(e: serde_json::Error) -> Self {
Error::Json(e)
}
}
impl From<Error> for crate::Error {
fn from(e: Error) -> crate::Error {
match e {
Error::Json(e) => crate::Error::Json(e),
e => crate::Error::Transport(Box::new(e)),
}
}
}
/// Global byte source the stub `TcpStream` reads from when built with the
/// `jsonrpc_fuzz` cfg; `None` makes every read report end-of-stream.
#[cfg(jsonrpc_fuzz)]
pub static FUZZ_TCP_SOCK: Mutex<Option<io::Cursor<Vec<u8>>>> = Mutex::new(None);
/// Stand-in for `std::net::TcpStream` used when built with the `jsonrpc_fuzz`
/// cfg, in which case the real type is not imported.
#[cfg(jsonrpc_fuzz)]
#[derive(Clone, Debug)]
struct TcpStream;
#[cfg(jsonrpc_fuzz)]
mod impls {
    use super::*;

    impl Read for TcpStream {
        /// Reads from the global `FUZZ_TCP_SOCK` cursor; reports end-of-stream
        /// when no cursor is installed.
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            let mut source = FUZZ_TCP_SOCK.lock().unwrap();
            if let Some(cursor) = source.as_mut() {
                io::Read::read(cursor, buf)
            } else {
                Ok(0)
            }
        }
    }

    impl Write for TcpStream {
        /// Discards the data and reports all of it as written.
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            Ok(buf.len())
        }

        /// Nothing is buffered, so flushing always succeeds.
        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    impl TcpStream {
        /// Pretends to connect; always succeeds.
        pub fn connect_timeout(_addr: &SocketAddr, _timeout: Duration) -> io::Result<Self> {
            Ok(TcpStream)
        }

        /// Accepts and ignores the read timeout.
        pub fn set_read_timeout(&self, _timeout: Option<Duration>) -> io::Result<()> {
            Ok(())
        }

        /// Accepts and ignores the write timeout.
        pub fn set_write_timeout(&self, _timeout: Option<Duration>) -> io::Result<()> {
            Ok(())
        }
    }
}
// Unit tests for URL parsing, transport/client construction and the
// reconnect-on-closed-socket behaviour.
#[cfg(test)]
mod tests {
use std::net;
#[cfg(feature = "proxy")]
use std::str::FromStr;
use super::*;
use crate::Client;
// Checks which socket address and path the builder derives from various URL
// spellings, and that malformed URLs are rejected.
#[test]
fn test_urls() {
// All of these spell the same host and port and must resolve identically.
let addr: net::SocketAddr = ("localhost", 22).to_socket_addrs().unwrap().next().unwrap();
let urls = [
"localhost:22",
"http://localhost:22/",
"https://localhost:22/walletname/stuff?it=working",
"http://me:weak@localhost:22/wallet",
];
for u in &urls {
let tp = Builder::new().url(u).unwrap().build();
assert_eq!(tp.addr, addr);
}
// Without an explicit port the scheme decides: http -> 80, https -> 443,
// no scheme -> DEFAULT_PORT.
let addr: net::SocketAddr = ("localhost", 80).to_socket_addrs().unwrap().next().unwrap();
let tp = Builder::new().url("http://localhost/").unwrap().build();
assert_eq!(tp.addr, addr);
let addr: net::SocketAddr = ("localhost", 443).to_socket_addrs().unwrap().next().unwrap();
let tp = Builder::new().url("https://localhost/").unwrap().build();
assert_eq!(tp.addr, addr);
let addr: net::SocketAddr =
("localhost", super::DEFAULT_PORT).to_socket_addrs().unwrap().next().unwrap();
let tp = Builder::new().url("localhost").unwrap().build();
assert_eq!(tp.addr, addr);
// The builder must agree with `check_url` and leave every other setting at
// its default.
let valid_urls = [
"localhost",
"127.0.0.1:8080",
"http://127.0.0.1:8080/",
"http://127.0.0.1:8080/rpc/test",
"https://127.0.0.1/rpc/test",
"http://[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:8300",
"http://[2001:0db8:85a3:0000:0000:8a2e:0370:7334]",
];
for u in &valid_urls {
let (addr, path) = check_url(u).unwrap();
let builder = Builder::new().url(u).unwrap_or_else(|_| panic!("error for: {}", u));
assert_eq!(builder.tp.addr, addr);
assert_eq!(builder.tp.path, path);
assert_eq!(builder.tp.timeout, DEFAULT_TIMEOUT);
assert_eq!(builder.tp.basic_auth, None);
#[cfg(feature = "proxy")]
assert_eq!(builder.tp.proxy_addr, SocketAddr::from_str("127.0.0.1:9050").unwrap());
}
// Each of these must be rejected.
let invalid_urls = [
"127.0.0.1.0:8080",
"httpx://127.0.0.1:8080/",
"ftp://127.0.0.1:8080/rpc/test",
"http://127.0.0./rpc/test",
];
for u in &invalid_urls {
if let Ok(b) = Builder::new().url(u) {
let tp = b.build();
panic!("expected error for url {}, got {:?}", u, tp);
}
}
}
// Smoke test: a transport and a client can be constructed without connecting.
#[test]
fn construct() {
let tp = Builder::new()
.timeout(Duration::from_millis(100))
.url("localhost:22")
.unwrap()
.auth("user", None)
.build();
let _ = Client::with_transport(tp);
let _ = Client::simple_http("localhost:22", None, None).unwrap();
}
// Same smoke test with a proxy address and proxy credentials configured.
#[cfg(feature = "proxy")]
#[test]
fn construct_with_proxy() {
let tp = Builder::new()
.timeout(Duration::from_millis(100))
.url("localhost:22")
.unwrap()
.auth("user", None)
.proxy_addr("127.0.0.1:9050")
.unwrap()
.build();
let _ = Client::with_transport(tp);
let _ = Client::http_proxy(
"localhost:22",
None,
None,
"127.0.0.1:9050",
Some(("user", "password")),
)
.unwrap();
}
// Regression test: the server closes the connection after every response, so
// the second request has to succeed by transparently reconnecting.
#[cfg(all(not(feature = "proxy"), not(jsonrpc_fuzz)))]
#[test]
fn request_to_closed_socket() {
use serde_json::{Number, Value};
use std::net::{Shutdown, TcpListener};
use std::sync::mpsc;
use std::thread;
let (tx, rx) = mpsc::sync_channel(1);
// Tiny HTTP server: answers each connection once, using the connection index
// as the JSON-RPC id, then shuts the stream down.
thread::spawn(move || {
let server = TcpListener::bind("localhost:0").expect("Binding a Tcp Listener");
tx.send(server.local_addr().unwrap().port()).unwrap();
for (request_id, stream) in server.incoming().enumerate() {
let mut stream = stream.unwrap();
let buf_reader = BufReader::new(&mut stream);
// Consume the request head up to the first empty line.
let _http_request: Vec<_> = buf_reader
.lines()
.map(|result| result.unwrap())
.take_while(|line| !line.is_empty())
.collect();
let response = Response {
result: None,
error: None,
id: Value::Number(Number::from(request_id)),
jsonrpc: Some(String::from("2.0")),
};
let response_str = serde_json::to_string(&response).unwrap();
stream.write_all(b"HTTP/1.1 200\r\n").unwrap();
stream.write_all(b"Content-Length: ").unwrap();
stream.write_all(response_str.len().to_string().as_bytes()).unwrap();
stream.write_all(b"\r\n").unwrap();
stream.write_all(b"\r\n").unwrap();
stream.write_all(response_str.as_bytes()).unwrap();
stream.flush().unwrap();
stream.shutdown(Shutdown::Both).unwrap();
}
});
// Give the server thread time to start listening.
thread::sleep(Duration::from_secs(1));
let port = rx.recv().unwrap();
let client =
Client::simple_http(format!("localhost:{}", port).as_str(), None, None).unwrap();
let request = client.build_request("test_request", None);
let result = client.send_request(request).unwrap();
assert_eq!(result.id, Value::Number(Number::from(0)));
// Wait so the server-side shutdown has taken effect before the next request.
thread::sleep(Duration::from_secs(1));
let request = client.build_request("test_request2", None);
let result2 = client.send_request(request)
.expect("This second request should not be an Err like `Err(Transport(HttpResponseTooShort { actual: 0, needed: 12 }))`");
// Id 1 proves the reply came over a second, fresh connection.
assert_eq!(result2.id, Value::Number(Number::from(1)));
}
}