通过 HTTP 从浏览器向服务器流式传输数据的方法

IT技术 javascript http browser xmlhttprequest
2021-02-01 21:17:15

是否有任何类似 XHR 的浏览器 API 可用于通过 HTTP 将二进制文件流式传输到服务器?

我想随着时间的推移发出 HTTP PUT 请求并以编程方式创建数据。我不想一次创建所有这些数据,因为它可能存在于内存中。一些伪代码来说明我的意思:

var dataGenerator = new DataGenerator(); // Generates 8KB UInt8Array every second
var streamToWriteTo;
http.put('/example', function (requestStream) {
  streamToWriteTo = requestStream;
});

dataGenerator.on('data', function (chunk) {
  if (!streamToWriteTo) {
    return;
  }
  streamToWriteTo.write(chunk);
});

我目前有一个网络套接字解决方案,但更喜欢常规 HTTP 以便与一些现有的服务器端代码更好地互操作。

编辑:我可以使用前沿浏览器 API。我正在查看 Fetch API,因为它支持 ArrayBuffers、DataViews、Files 等请求主体。如果我能以某种方式伪造这些对象之一,以便我可以将 Fetch API 与动态数据一起使用,那对我来说就行了。我尝试创建一个 Proxy 对象,看看是否有任何方法被调用,我可以修补。不幸的是,浏览器(至少在 Chrome 中)似乎正在读取本机代码而不是 JS 领域。但是,如果我错了,请纠正我。

6个回答

我不知道如何使用纯 HTML5 API 做到这一点,但一种可能的解决方法是使用 Chrome 应用程序作为后台服务为网页提供附加功能。如果您已经愿意使用开发浏览器并启用实验性功能,那么这似乎只是比这更进一步的一步。

Chrome 应用程序可以调用chrome.sockets.tcpAPI,您可以在该 API 上实现您想要的任何协议,包括 HTTP 和 HTTPS。这将提供实现流式传输的灵活性。

常规网页可以使用chrome.runtimeAPI与应用程序交换消息,只要应用程序声明此用法即可这将允许您的网页对您的应用程序进行异步调用。

我写了这个简单的应用程序作为概念证明:

清单文件.json

{
  "manifest_version" : 2,

  "name" : "Streaming Upload Test",
  "version" : "0.1",

  "app": {
    "background": {
      "scripts": ["background.js"]
    }
  },

  "externally_connectable": {
    "matches": ["*://localhost/*"]
  },

  "sockets": {
    "tcp": {
      "connect": "*:*"
    }
  },

  "permissions": [
  ]
}

背景.js

var mapSocketToPort = {};

chrome.sockets.tcp.onReceive.addListener(function(info) {
  var port = mapSocketToPort[info.socketId];
  port.postMessage(new TextDecoder('utf-8').decode(info.data));
});

chrome.sockets.tcp.onReceiveError.addListener(function(info) {
  chrome.sockets.tcp.close(info.socketId);
  var port = mapSocketToPort[info.socketId];
  port.postMessage();
  port.disconnect();
  delete mapSocketToPort[info.socketId];
});

// Promisify socket API for easier operation sequencing.
// TODO: Check for error and reject.
function socketCreate() {
  return new Promise(function(resolve, reject) {
    chrome.sockets.tcp.create({ persistent: true }, resolve);
  });
}

function socketConnect(s, host, port) {
  return new Promise(function(resolve, reject) {
    chrome.sockets.tcp.connect(s, host, port, resolve);
  });
}

function socketSend(s, data) {
  return new Promise(function(resolve, reject) {
    chrome.sockets.tcp.send(s, data, resolve);
  });
}

chrome.runtime.onConnectExternal.addListener(function(port) {
  port.onMessage.addListener(function(msg) {
    if (!port.state) {
      port.state = msg;

      port.chain = socketCreate().then(function(info) {
        port.socket = info.socketId;
        mapSocketToPort[port.socket] = port;
        return socketConnect(port.socket, 'httpbin.org', 80);
      }).then(function() {
        // TODO: Layer TLS if needed.
      }).then(function() {
        // TODO: Build headers from the request.
        // TODO: Use Transfer-Encoding: chunked.
        var headers =
            'PUT /put HTTP/1.0\r\n' +
            'Host: httpbin.org\r\n' +
            'Content-Length: 17\r\n' +
            '\r\n';
        return socketSend(port.socket, new TextEncoder('utf-8').encode(headers).buffer);
      });
    }
    else {
      if (msg) {
        port.chain = port.chain.then(function() {
          // TODO: Use chunked encoding.
          return socketSend(port.socket, new TextEncoder('utf-8').encode(msg).buffer);
        });
      }
    }
  });
});

此应用程序没有用户界面。它侦听连接并向http://httpbin.org/puthttpbin是一个有用的测试站点,但请注意它不支持分块编码发出硬编码的 PUT 请求PUT 数据(当前硬编码为 17 个八位字节)从客户端(根据需要使用尽可能少的消息)并发送到服务器。来自服务器的响应流回客户端。

这只是一个概念证明。一个真正的应用程序可能应该:

  • 连接到任何主机和端口。
  • 使用传输编码:分块。
  • 表示流数据结束。
  • 处理套接字错误。
  • 支持 TLS(例如使用Forge

这是一个示例网页,它使用应用程序即服务执行流式上传(17 个八位字节)(请注意,您必须配置自己的应用程序 ID):

<pre id="result"></pre>
<script>
 var MY_CHROME_APP_ID = 'omlafihmmjpklmnlcfkghehxcomggohk';

 function streamingUpload(url, options) {
   // Open a connection to the Chrome App. The argument must be the 
   var port = chrome.runtime.connect(MY_CHROME_APP_ID);

   port.onMessage.addListener(function(msg) {
     if (msg)
       document.getElementById("result").textContent += msg;
     else
       port.disconnect();
   });

   // Send arguments (must be JSON-serializable).
   port.postMessage({
     url: url,
     options: options
   });

   // Return a function to call with body data.
   return function(data) {
     port.postMessage(data);
   };
 }

 // Start an upload.
 var f = streamingUpload('https://httpbin.org/put', { method: 'PUT' });

 // Stream data a character at a time.
 'how now brown cow'.split('').forEach(f);
</script>

当我在安装了应用程序的 Chrome 浏览器中加载此网页时,httpbin 返回:

HTTP/1.1 200 OK
Server: nginx
Date: Sun, 19 Jun 2016 16:54:23 GMT
Content-Type: application/json
Content-Length: 240
Connection: close
Access-Control-Allow-Origin: *
Access-Control-Allow-Credentials: true

{
  "args": {}, 
  "data": "how now brown cow", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Content-Length": "17", 
    "Host": "httpbin.org"
  }, 
  "json": null, 
  "origin": "[redacted]", 
  "url": "http://httpbin.org/put"
}
这就是我的一天!
2021-03-25 21:17:15



我目前正在寻找完全相同的东西(通过 Ajax 进行上游)。我目前发现的,看起来好像我们正在搜索浏览器功能设计的前沿;-)

XMLHttpRequest 定义在第 4 步bodyinit告诉我们,它的内容提取是(或可以是)readablestream

我仍在搜索(作为非网络开发人员)有关如何创建这样的东西并将数据馈送到“可读流”的“另一端”(即应该是“可写流”,但我还没有没找到)。

也许你更擅长搜索,如果你找到了实现这些设计计划的方法,可以在这里发布。

^5
斯文

该功能的测试可以在这里运行w3c-test.org/fetch/api/basic/request-upload.any.html(截至今天仍然失败)
2021-03-23 21:17:15
这实际上行不通。Chrome 无法发送 ReadableStream。它将其转换为字符串并发送[object Object].
2021-04-02 21:17:15
更新... Chrome 不再将 ReadableStream 转换为字符串,但它也不发送任何数据。我为这个特定的方法创建了一个附带问题... stackoverflow.com/questions/40939857/fetch-with-readablestream
2021-04-07 21:17:15
对于对当前讨论感兴趣的任何人,请查看此处: groups.google.com/a/chromium.org/forum/#!topic/...
2021-04-07 21:17:15

一种利用ReadableStream流式传输任意数据的方法;RTCDataChannel以以下形式发送和/或接收任意数据Uint8ArrayTextEncoder要创建8000存储在 a 中的随机数据字节Uint8ArrayTextDecoder解码Uint8ArrayRTCDataChannelto返回的字符串以进行演示,note 也可以在这里使用FileReader .readAsArrayBuffer.readAsText

标记和脚本代码是从 的示例中修改而来的MDN - WebRTC: Simple RTCDataChannel sample,包括adapter.js包含RTCPeerConnection帮助程序的示例创建您自己的可读流

另请注意,当传输的总字节数达到 时,示例流将被取消8000 * 864000

(function init() {
  var interval, reader, stream, curr, len = 0,
    totalBytes = 8000 * 8,
    data = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789",
    randomData = function randomData() {
      var encoder = new TextEncoder();
      var currentStream = "";
      for (var i = 0; i < 8000; i++) {
        currentStream += data[Math.floor(Math.random() * data.length)]
      }
      return encoder.encode(currentStream)
    },
    // optionally reconnect to stream if cancelled
    reconnect = function reconnect() {
      connectButton.disabled = false;
      startup()
    };

  // Define "global" variables

  var connectButton = null;
  var disconnectButton = null;
  var messageInputBox = null;
  var receiveBox = null;

  var localConnection = null; // RTCPeerConnection for our "local" connection
  // adjust this to remote address; or use `ServiceWorker` `onfetch`; other
  var remoteConnection = null; // RTCPeerConnection for the "remote"

  var sendChannel = null; // RTCDataChannel for the local (sender)
  var receiveChannel = null; // RTCDataChannel for the remote (receiver)

  // Functions

  // Set things up, connect event listeners, etc.

  function startup() {
    connectButton = document.getElementById("connectButton");
    disconnectButton = document.getElementById("disconnectButton");
    messageInputBox = document.getElementById("message");
    receiveBox = document.getElementById("receivebox");

    // Set event listeners for user interface widgets

    connectButton.addEventListener("click", connectPeers, false);
    disconnectButton.addEventListener("click", disconnectPeers, false);
  }

  // Connect the two peers. Normally you look for and connect to a remote
  // machine here, but we"re just connecting two local objects, so we can
  // bypass that step.

  function connectPeers() {
    // Create the local connection and its event listeners
    if (len < totalBytes) {
      localConnection = new RTCPeerConnection();

      // Create the data channel and establish its event listeners
      sendChannel = localConnection.createDataChannel("sendChannel");
      sendChannel.onopen = handleSendChannelStatusChange;
      sendChannel.onclose = handleSendChannelStatusChange;

      // Create the remote connection and its event listeners

      remoteConnection = new RTCPeerConnection();
      remoteConnection.ondatachannel = receiveChannelCallback;

      // Set up the ICE candidates for the two peers

      localConnection.onicecandidate = e => 
        !e.candidate || remoteConnection.addIceCandidate(e.candidate)
      .catch(handleAddCandidateError);

      remoteConnection.onicecandidate = e => 
        !e.candidate || localConnection.addIceCandidate(e.candidate)
      .catch(handleAddCandidateError);

      // Now create an offer to connect; this starts the process

      localConnection.createOffer()
      .then(offer => localConnection.setLocalDescription(offer))
      .then(() => remoteConnection
                 .setRemoteDescription(localConnection.localDescription)
       )
      .then(() => remoteConnection.createAnswer())
      .then(answer => remoteConnection
                      .setLocalDescription(answer)
       )
      .then(() => localConnection
                 .setRemoteDescription(remoteConnection.localDescription)
      )
      // start streaming connection
      .then(sendMessage)
      .catch(handleCreateDescriptionError);
    } else {

      alert("total bytes streamed:" + len)
    }

  }

  // Handle errors attempting to create a description;
  // this can happen both when creating an offer and when
  // creating an answer. In this simple example, we handle
  // both the same way.

  function handleCreateDescriptionError(error) {
    console.log("Unable to create an offer: " + error.toString());
  }

  // Handle successful addition of the ICE candidate
  // on the "local" end of the connection.

  function handleLocalAddCandidateSuccess() {
    connectButton.disabled = true;
  }

  // Handle successful addition of the ICE candidate
  // on the "remote" end of the connection.

  function handleRemoteAddCandidateSuccess() {
    disconnectButton.disabled = false;
  }

  // Handle an error that occurs during addition of ICE candidate.

  function handleAddCandidateError() {
    console.log("Oh noes! addICECandidate failed!");
  }

  // Handles clicks on the "Send" button by transmitting
  // a message to the remote peer.

  function sendMessage() {

    stream = new ReadableStream({
      start(controller) {
          interval = setInterval(() => {
            if (sendChannel) {
              curr = randomData();
              len += curr.byteLength;
              // queue current stream
              controller.enqueue([curr, len, sendChannel.send(curr)]);

              if (len >= totalBytes) {
                controller.close();
                clearInterval(interval);
              }
            }
          }, 1000);
        },
        pull(controller) {
          // do stuff during stream
          // call `releaseLock()` if `diconnect` button clicked
          if (!sendChannel) reader.releaseLock();
        },
        cancel(reason) {
          clearInterval(interval);
          console.log(reason);
        }
    });

    reader = stream.getReader({
      mode: "byob"
    });

    reader.read().then(function process(result) {
        if (result.done && len >= totalBytes) {
          console.log("Stream done!");
          connectButton.disabled = false;
          if (len < totalBytes) reconnect();
          return;
        }

        if (!result.done && result.value) {
          var [currentStream, totalStreamLength] = [...result.value];
        }

        if (result.done && len < totalBytes) {
          throw new Error("stream cancelled")
        }

        console.log("currentStream:", currentStream
                   , "totalStremalength:", totalStreamLength
                   , "result:", result);
        return reader.read().then(process);
      })
      .catch(function(err) {
        console.log("catch stream cancellation:", err);
        if (len < totalBytes) reconnect()
      });

    reader.closed.then(function() {
      console.log("stream closed")
    })

  }

  // Handle status changes on the local end of the data
  // channel; this is the end doing the sending of data
  // in this example.

  function handleSendChannelStatusChange(event) {
    if (sendChannel) {
      var state = sendChannel.readyState;

      if (state === "open") {
        disconnectButton.disabled = false;
        connectButton.disabled = true;
      } else {
        connectButton.disabled = false;
        disconnectButton.disabled = true;
      }
    }
  }

  // Called when the connection opens and the data
  // channel is ready to be connected to the remote.

  function receiveChannelCallback(event) {
    receiveChannel = event.channel;
    receiveChannel.onmessage = handleReceiveMessage;
    receiveChannel.onopen = handleReceiveChannelStatusChange;
    receiveChannel.onclose = handleReceiveChannelStatusChange;
  }

  // Handle onmessage events for the receiving channel.
  // These are the data messages sent by the sending channel.

  function handleReceiveMessage(event) {
    var decoder = new TextDecoder();
    var data = decoder.decode(event.data);
    var el = document.createElement("p");
    var txtNode = document.createTextNode(data);

    el.appendChild(txtNode);
    receiveBox.appendChild(el);
  }

  // Handle status changes on the receiver"s channel.

  function handleReceiveChannelStatusChange(event) {
    if (receiveChannel) {
      console.log("Receive channel's status has changed to " +
        receiveChannel.readyState);
    }

    // Here you would do stuff that needs to be done
    // when the channel"s status changes.
  }

  // Close the connection, including data channels if they"re open.
  // Also update the UI to reflect the disconnected status.

  function disconnectPeers() {

    // Close the RTCDataChannels if they"re open.

    sendChannel.close();
    receiveChannel.close();

    // Close the RTCPeerConnections

    localConnection.close();
    remoteConnection.close();

    sendChannel = null;
    receiveChannel = null;
    localConnection = null;
    remoteConnection = null;

    // Update user interface elements


    disconnectButton.disabled = true;
    // cancel stream on `click` of `disconnect` button, 
    // pass `reason` for cancellation as parameter
    reader.cancel("stream cancelled");
  }

  // Set up an event listener which will run the startup
  // function once the page is done loading.

  window.addEventListener("load", startup, false);
})();

plnkr http://plnkr.co/edit/cln6uxgMZwE2EQCfNXFO?p=preview

有趣的。请注意,您的答案缺少一个关键步骤,即必须在 Chrome 中启用实验标志才能使用ReadableStream. 此外,断开连接按钮会引发错误:Cannot read property 'cancel' of undefined但是,OP 的问题需要 HTTP(PUT/POST),而您的解决方案使用 WebRTC。是你无法获得fetchReadableStream工作,你打算
2021-03-16 21:17:15
@ tony19另一个选择可能是使用browserify转换nodejs等同PUTPOST可写流功能和执行文件可用的浏览器。不确定文件有多长。
2021-03-30 21:17:15
@tony19 “请注意,您的答案缺少一个关键步骤,因为必须在 Chrome 中启用实验性标志”公平点。OP 似乎熟悉浏览器技术 “我可以使用最前沿的浏览器 API”,虽然是的,但尝试将--enable-experimental-web-platform-features标志设置为铬。“OP 的问题需要 HTTP (PUT/POST),”没有从问题中收集到,评论只有PUTPOST必须使用“但我有一个边缘情况我需要解决”“无法读取未定义的属性‘取消’”错误不会在这里发生。不确定是否可以fetch单独使用
2021-04-05 21:17:15
2021-04-09 21:17:15
真的吗?标题: “通过 HTTP 到服务器”,正文:通过 HTTP到服务器”,“我想发出 HTTP PUT 请求”,“更喜欢常规 HTTP 以获得更好的互操作性”。对我来说,这一切都意味着PUTPOST:-) 但我猜他会考虑 WebRTC,因为他提到了“出血边缘 API”。
2021-04-10 21:17:15

我认为简短的回答是否定的。在撰写此回复时(2021 年 11 月),这在任何主要浏览器中都不可用。

长答案是:

我认为您正在使用 Fetch API 寻找正确的位置。ReadableStream 目前是 Request 构造函数的 body 属性的有效类型:https :
//developer.mozilla.org/en-US/docs/Web/API/Request/Request#parameters

但是,遗憾的是,如果您查看浏览器支持矩阵:
https : //developer.mozilla.org/en-US/docs/Web/API/Request/Request#browser_compatibility,
您可以看到“在请求正文中发送 ReadableStream”仍然是对于所有主要浏览器都不是。尽管它目前在某些浏览器(包括 Chrome)中以实验模式可用。

这里有一个关于如何在实验模式下进行的很好的教程:https :
//web.dev/fetch-upload-streaming/

看看帖子的日期和在这个功能上所做的工作,我认为很明显这项技术正在停滞不前,我们可能不会很快看到它。因此,遗憾的是,WebSockets 可能仍然是我们为数不多的几个不错的选择之一(用于无界流传输):https :
//developer.mozilla.org/en-US/docs/Web/API/WebSockets_API

谢谢@brad,知道这很有趣。我想 HTTP/1.1 中使用的分块数据编码存在一些阻力。
2021-03-15 21:17:15
嘿,流请求正文现在实际上是在 Chrome 原始试用版中!!不幸的是,作者决定禁止我们在 HTTP/1.1 上使用它。仅限 HTTP/2。
2021-04-06 21:17:15

您可以使用Promise, setTimeout, 递归。另请参阅REST 中的 PUT 与 POST

var count = 0, total = 0, timer = null, d = 500, stop = false, p = void 0
, request = function request () {
              return new XMLHttpRequest()
            };
function sendData() {
  p = Promise.resolve(generateSomeBinaryData()).then(function(data) { 
    var currentRequest = request();
    currentRequest.open("POST", "http://example.com");
    currentRequest.onload = function () {
      ++count; // increment `count`
      total += data.byteLength; // increment total bytes posted to server
    }

    currentRequest.onloadend = function () {
      if (stop) { // stop recursion
        throw new Error("aborted") // `throw` error to `.catch()`
      } else {
        timer = setTimeout(sendData, d); // recursively call `sendData`
      }
    }
    currentRequest.send(data); // `data`: `Uint8Array`; `TypedArray`
    return currentRequest; // return `currentRequest` 
  });
  return p // return `Promise` : `p`
}

var curr = sendData();

curr.then(function(current) {
  console.log(current) // current post request
})
.catch(function(err) {
  console.log(e) // handle aborted `request`; errors
});
@guest271314,我不相信WritableStream在任何浏览器中都可用。ReadableStream直到最近才在 Chrome 和 Opera 中可用。但是 Node 确实实现了一个WritableStream.
2021-03-19 21:17:15
感谢您对此进行破解。但是,您的代码会为每个块发出一个新的 HTTP 请求。我需要将所有内容保存在单个 HTTP 请求中。我在问题中做了一个新的例子,所以我们不会被数据的生成方式所困扰。这里的问题是如何在请求进行中时将数据保持在单个请求的请求正文中。我想知道是否有一种方法可以实现像 ArrayBufferView 这样的接口,以便 XHR 可以以这种方式使用它。或者,甚至以某种方式修改 Blob。
2021-04-03 21:17:15
@guest271314 嗯。您提供的所有链接似乎都表明响应是 aReadableStream并且没有提及有关请求正文的任何​​内容。是否有我需要查看的特定部分?无论如何,我期待您的实施。
2021-04-06 21:17:15
@guest271314 我现在很困惑。我以为@Brad想数据流一台服务器,那么又怎么会fetchReadableXXX帮助?
2021-04-09 21:17:15
@Brad 您是否正在尝试创建Writable Streams另请参阅创建您自己的可读流
2021-04-11 21:17:15