fix(updater): request stream usage (#3746)

This commit is contained in:
Lucas Fernandes Nogueira
2022-03-22 08:31:41 -07:00
committed by GitHub
parent b8ea013fd8
commit 7b24448137
4 changed files with 45 additions and 51 deletions

View File

@@ -1,5 +0,0 @@
---
"tauri": patch
---
Added `bytes_stream` method to `tauri::api::http::Response`.

View File

@@ -137,10 +137,10 @@ updater = [
"fs-extract-api"
]
__updater-docs = [ "minisign-verify", "base64", "http-api", "dialog-ask" ]
http-api = [ "attohttpc", "bytes" ]
http-api = [ "attohttpc" ]
shell-open-api = [ "open", "regex", "tauri-macros/shell-scope" ]
fs-extract-api = [ "zip" ]
reqwest-client = [ "reqwest" ]
reqwest-client = [ "reqwest", "bytes" ]
process-command-api = [ "shared_child", "os_pipe", "memchr" ]
dialog = [ "rfd" ]
notification = [ "notify-rust" ]

View File

@@ -353,32 +353,6 @@ pub struct Response(ResponseType, reqwest::Response);
#[derive(Debug)]
pub struct Response(ResponseType, attohttpc::Response, Url);
#[cfg(not(feature = "reqwest-client"))]
struct AttohttpcByteReader(attohttpc::ResponseReader);
#[cfg(not(feature = "reqwest-client"))]
impl futures::Stream for AttohttpcByteReader {
type Item = crate::api::Result<bytes::Bytes>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
_cx: &mut futures::task::Context<'_>,
) -> futures::task::Poll<Option<Self::Item>> {
use std::io::Read;
let mut buf = [0; 256];
match self.0.read(&mut buf) {
Ok(b) => {
if b == 0 {
futures::task::Poll::Ready(None)
} else {
futures::task::Poll::Ready(Some(Ok(buf[0..b].to_vec().into())))
}
}
Err(_) => futures::task::Poll::Ready(None),
}
}
}
impl Response {
/// Get the [`StatusCode`] of this Response.
pub fn status(&self) -> StatusCode {
@@ -400,6 +374,13 @@ impl Response {
Ok(RawResponse { status, data })
}
#[cfg(not(feature = "reqwest-client"))]
#[allow(dead_code)]
pub(crate) fn reader(self) -> attohttpc::ResponseReader {
let (_, _, reader) = self.1.split();
reader
}
/// Convert the response into a Stream of [`bytes::Bytes`] from the body.
///
/// # Examples
@@ -419,17 +400,13 @@ impl Response {
/// # Ok(())
/// # }
/// ```
pub fn bytes_stream(self) -> impl futures::Stream<Item = crate::api::Result<bytes::Bytes>> {
#[cfg(not(feature = "reqwest-client"))]
{
let (_, _, reader) = self.1.split();
AttohttpcByteReader(reader)
}
#[cfg(feature = "reqwest-client")]
{
use futures::StreamExt;
self.1.bytes_stream().map(|res| res.map_err(Into::into))
}
#[cfg(feature = "reqwest-client")]
#[allow(dead_code)]
pub(crate) fn bytes_stream(
self,
) -> impl futures::Stream<Item = crate::api::Result<bytes::Bytes>> {
use futures::StreamExt;
self.1.bytes_stream().map(|res| res.map_err(Into::into))
}
/// Reads the response.

View File

@@ -13,7 +13,6 @@ use crate::{
AppHandle, Manager, Runtime,
};
use base64::decode;
use futures::StreamExt;
use http::StatusCode;
use minisign_verify::{PublicKey, Signature};
use tauri_utils::{platform::current_exe, Env};
@@ -483,12 +482,35 @@ impl<R: Runtime> Update<R> {
.and_then(|value| value.parse().ok());
let mut buffer = Vec::new();
let mut stream = response.bytes_stream();
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
let bytes = chunk.as_ref().to_vec();
on_chunk(bytes.len(), content_length);
buffer.extend(bytes);
#[cfg(feature = "reqwest-client")]
{
use futures::StreamExt;
let mut stream = response.bytes_stream();
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
let bytes = chunk.as_ref().to_vec();
on_chunk(bytes.len(), content_length);
buffer.extend(bytes);
}
}
#[cfg(not(feature = "reqwest-client"))]
{
let mut reader = response.reader();
let mut buf = [0; 16384];
loop {
match reader.read(&mut buf) {
Ok(b) => {
if b == 0 {
break;
} else {
let bytes = buf[0..b].to_vec();
on_chunk(bytes.len(), content_length);
buffer.extend(bytes);
}
}
Err(e) => return Err(e.into()),
}
}
}
// create memory buffer from our archive (Seek + Read)