diff --git a/index.bs b/index.bs index 1080f8d5..cffdeee9 100644 --- a/index.bs +++ b/index.bs @@ -69,6 +69,7 @@ spec:fetch; type:dfn; for:/; text:fetch spec:url; type:dfn; text:scheme spec:url; type:dfn; text:fragment spec:infra; type:dfn; for:/; text:ASCII case-insensitive +spec:infra; type:dfn; text:list
 url: https://html.spec.whatwg.org/multipage/origin.html#concept-origin; type: dfn; text: origin; for:/
@@ -1474,6 +1475,7 @@ interface WebTransportSendStream : WritableStream {
   attribute WebTransportSendGroup? sendGroup;
   attribute long long sendOrder;
   Promise<WebTransportSendStreamStats> getStats();
+  WebTransportWriter getWriter();
 };
 
@@ -1519,6 +1521,12 @@ The {{WebTransportSendStream}}'s [=transfer steps=] and 1. [=Resolve=] |p| with |stats|. 1. Return |p|. +: getWriter() +:: This method must be implemented in the same manner as {{WritableStream/getWriter}} + inherited from {{WritableStream}}, except in place of creating a + {{WritableStreamDefaultWriter}}, it must instead + [=WebTransportWriter/create=] a {{WebTransportWriter}} with [=this=]. + ## Internal Slots ## {#send-stream-internal-slots} A {{WebTransportSendStream}} has the following internal slots. @@ -1552,6 +1560,11 @@ A {{WebTransportSendStream}} has the following internal slots. `[[SendOrder]]` An optional send order number, defaulting to 0. + + `[[AtomicWriteRequests]]` + An [=ordered set=] of promises, keeping track of the subset of + write requests that are atomic among those queued to be processed by the underlying sink. + @@ -1575,6 +1588,8 @@ To create a :: |sendGroup| : {{WebTransportSendStream/[[SendOrder]]}} :: |sendOrder| + : {{WebTransportSendStream/[[AtomicWriteRequests]]}} + :: An empty [=ordered set=] of promises. 1. Let |writeAlgorithm| be an action that [=writes=] |chunk| to |stream|, given |chunk|. 1. Let |closeAlgorithm| be an action that [=closes=] |stream|. 1. Let |abortAlgorithm| be an action that [=aborts=] |stream| with |reason|, given |reason|. @@ -1602,9 +1617,18 @@ To write |chunk| to a {{WebTransportSend 1. Let |promise| be a new promise. 1. Let |bytes| be a copy of the [=byte sequence=] which |chunk| represents. 1. Set |stream|.{{[[PendingOperation]]}} to |promise|. +1. Let |inFlightWriteRequest| be + |stream|.inFlightWriteRequest. +1. Let |atomic| be true if [=stream=].{{WebTransportSendStream/[[AtomicWriteRequests]]}} + [=list/contains=] |inFlightWriteRequest|, otherwise false. 1. Run the following steps [=in parallel=]: - 1. [=stream/Send=] |bytes| on |stream|.{{WebTransportSendStream/[[InternalStream]]}} and wait for the - operation to complete. + 1. If |atomic| is true and the current [=flow control=] window is too small for |bytes| to be sent + in its entirety, then abort the remaining steps and [=queue a network task=] with |transport| + to run these sub-steps: + 1. Set |stream|.{{[[PendingOperation]]}} to null. + 1. [=Abort all atomic write requests=] on |stream|. + 1. Otherwise, [=stream/send=] |bytes| on |stream|.{{WebTransportSendStream/[[InternalStream]]}} + and wait for the operation to complete. This sending MAY be interleaved with sending of previously queued streams and datagrams, as well as streams and datagrams yet to be queued to be sent over this transport. @@ -1629,20 +1653,22 @@ To write |chunk| to a {{WebTransportSend Note: The definition of fairness here is [=implementation-defined=]. - 1. If the previous step failed, abort the remaining steps. + 1. If the previous step failed due to a network error, abort the remaining steps. Note: We don't reject |promise| here because we handle network errors elsewhere, and those steps reject |stream|.{{[[PendingOperation]]}}. - 1. [=Queue a network task=] with |transport| to run these steps: + 1. Otherwise, [=queue a network task=] with |transport| + to run these steps: 1. Set |stream|.{{[[PendingOperation]]}} to null. + 1. If |stream|.{{WebTransportSendStream/[[AtomicWriteRequests]]}} [=list/contains=] |inFlightWriteRequest|, [=list/remove=] |inFlightWriteRequest|. 1. [=Resolve=] |promise| with undefined. 1. Return |promise|. Note: The [=fulfilled|fulfillment=] of the promise returned from this algorithm (or, -{{WritableStreamDefaultWriter/write|WritableStreamDefaultWriter.write}}) does **NOT** necessarily mean that the chunk is acked by +{{WritableStreamDefaultWriter/write(chunk)}}) does **NOT** necessarily mean that the chunk is acked by the server [[!QUIC]]. It may just mean that the chunk is appended to the buffer. To make sure that -the chunk arrives at the server, use an application-level protocol. +the chunk arrives at the server, the server needs to send an application-level acknowledgment message. @@ -1687,6 +1713,20 @@ To abort a {{WebTransportSendStream}} |s +
+To abort all atomic write requests on a {{WebTransportSendStream}} |stream|, run these steps: + 1. Let |writeRequests| be + |stream|.writeRequests. + 1. Let |requestsToAbort| be [=stream=].{{WebTransportSendStream/[[AtomicWriteRequests]]}}. + 1. If |writeRequests| [=list/contains=] a promise not in |requestsToAbort|, then + [=WritableStream/error=] |stream| with {{AbortError}}, and abort these steps. + 1. [=list/Empty=] [=stream=].{{WebTransportSendStream/[[AtomicWriteRequests]]}}. + 1. [=For each=] |promise| in |requestsToAbort|, [=reject=] |promise| with {{AbortError}}. + 1. [=In parallel=], [=for each=] |promise| in |requestsToAbort|, abort the + [=stream/send|sending=] of bytes associated with |promise|. + +
+ ## STOP_SENDING signal coming from the server ## {#send-stream-STOP_SENDING}
@@ -2116,6 +2156,66 @@ object |transport|, and a |sendOrder|, run these steps.
+# `WebTransportWriter` Interface # {#web-transport-writer-interface} + +{{WebTransportWriter}} is a subclass of {{WritableStreamDefaultWriter}} that +adds one method. + +A {{WebTransportWriter}} is always created by the +[=WebTransportWriter/create=] procedure. + +
+[Exposed=*, SecureContext]
+interface WebTransportWriter : WritableStreamDefaultWriter {
+  Promise<undefined> atomicWrite(optional any chunk);
+};
+
+ +## Methods ## {#web-transport-writer-methods} + +: atomicWrite(chunk) +:: The {{atomicWrite}} method will reject if the |chunk| given to it + could not be sent in its entirety within the [=flow control=] window that + is current at the time of sending. This behavior is designed to satisfy niche + transactional applications sensitive to [=flow control=] deadlocks ([[RFC9308]] + [Section 4.4](https://datatracker.ietf.org/doc/html/rfc9308#section-4.4)). + + Note: {{atomicWrite}} can still reject after sending some data. Though it + provides atomicity with respect to flow control, other errors may occur. + {{atomicWrite}} does not prevent data from being split between packets + or being interleaved with other data. Only the sender learns if + {{atomicWrite}} fails due to lack of available flow control credit. + + Note: Atomic writes can still block if queued behind non-atomic writes. If + the atomic write is rejected, everything queued behind it at that moment + will be rejected as well. Any non-atomic writes rejected in this way will + [=WritableStream/error=] the stream. Applications are therefore encouraged to + always await atomic writes. + + When {{atomicWrite}} is called, the user agent MUST run the following steps: + 1. Let |p| be the result of {{WritableStreamDefaultWriter/write(chunk)}} + on {{WritableStreamDefaultWriter}} with |chunk|. + 1. [=set/Append=] |p| to |stream|.{{WebTransportSendStream/[[AtomicWriteRequests]]}}. + 1. Return the result of [=reacting=] to |p| with the following steps: + 1. If |stream|.{{WebTransportSendStream/[[AtomicWriteRequests]]}} [=list/contains=] |p|, + [=list/remove=] |p|. + 1. If |p| was rejected with reason |r|, then return [=a promise rejected with=] |r|. + 1. Return undefined. + +## Procedures ## {#web-transport-writer-procedures} + +
+ +To create a +{{WebTransportWriter}}, with a {{WebTransportSendStream}} |stream|, run these +steps: +1. Let |writer| be a [=new=] {{WebTransportWriter}}. +1. Run the [new WritableStreamDefaultWriter(stream)](https://streams.spec.whatwg.org/#default-writer-constructor) + constructor steps passing |writer| as this, and |stream| as the constructor argument. +1. Return |writer|. + +
+ # `WebTransportError` Interface # {#web-transport-error-interface} WebTransportError is a subclass of {{DOMException}} that represents @@ -2260,7 +2360,7 @@ converted to an httpErrorCode, and vice versa, as specified in [[!WEB-TRANSPORT- [=stream/Send|sends=] STREAM with FIN bit set - {{WebTransportBidirectionalStream/writable}}.getWriter().{{WritableStreamDefaultWriter/write}}() + {{WebTransportBidirectionalStream/writable}}.getWriter().{{WritableStreamDefaultWriter/write(chunk)}}() [=stream/Send|sends=] STREAM @@ -2784,6 +2884,31 @@ async function receiveText(url, createWritableStreamForTextData) { } +## Sending a transactional chunk on a stream ## {#example-transactional-stream} + +*This section is non-normative.* + +Sending a transactional piece of data on a unidirectional stream, only if it can be done +entirely without blocking on [=flow control=], can be achieved by using the +{{WebTransportSendStream/getWriter}} function and the resulting writer. + +
+async function sendTransactionalData(wt, bytes) {
+  const writable = await wt.createUnidirectionalStream();
+  const writer = writable.getWriter();
+  await writer.ready;
+  try {
+    await writer.atomicWrite(bytes);
+  } catch (e) {
+    if (e.name != "AbortError") throw e;
+    // rejected to avoid blocking on flow control
+    // The writable remains un-errored provided no non-atomic writes are pending
+  } finally {
+    writer.releaseLock();
+  }
+}
+
+ ## Complete example ## {#example-complete} *This section is non-normative.*