-
Notifications
You must be signed in to change notification settings - Fork 67
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add lwt aware stream module for IO #218
Changes from 2 commits
7fe3586
472f5ed
61abf03
afa25e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
open Lwt.Syntax | ||
|
||
module Input = struct | ||
type 'a t = unit -> 'a option Lwt.t | ||
|
||
let create f = f | ||
let read f = f () | ||
|
||
let rec iter f t = | ||
let* res = read t in | ||
match res with | ||
| None -> Lwt.return_unit | ||
| Some item -> | ||
let* () = f item in | ||
iter f t | ||
;; | ||
|
||
let singleton item = | ||
let finished = ref false in | ||
fun () -> | ||
if !finished | ||
then Lwt.return_none | ||
else ( | ||
finished := true; | ||
Lwt.return (Some item)) | ||
;; | ||
end | ||
|
||
module Output = struct | ||
type 'a t = 'a option -> unit Lwt.t | ||
|
||
let create f = | ||
let closed = ref false in | ||
fun v -> | ||
if !closed | ||
then Lwt.return_unit | ||
else ( | ||
match v with | ||
| None -> | ||
closed := true; | ||
f None | ||
| v -> f v) | ||
;; | ||
|
||
let write i f = f i | ||
end | ||
|
||
let transfer input output = | ||
let rec loop () = | ||
let* item = Input.read input in | ||
let* () = Output.write item output in | ||
match item with | ||
| None -> Lwt.return_unit | ||
| Some _ -> loop () | ||
in | ||
loop () | ||
;; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
module Input : sig | ||
type 'a t | ||
|
||
val create : (unit -> 'a option Lwt.t) -> 'a t | ||
val singleton : 'a -> 'a t | ||
val read : 'a t -> 'a option Lwt.t | ||
val iter : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When working with Httpaf bodies, we'll have places where we use Input.iter (fun item -> Httpaf.Body.write_string item; Lwt.return_unit) stream;; I think it might make sense to add a second flavor of iter that works with val iter : ('a -> unit) -> 'a t -> unit Lwt.t
val iter_s : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think this is bound to allocate a huge amount of memory though. We need to return a promise that's determined when the buffer hits max size and needs to be flushed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
For now let's see if we can make do with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Hmm that's true. On a related note we don't read this config today, but we should respect the body buffer size limits set by the user in the server config handed to Rock. Working with promises when writing to httpaf's body should straightforward. (Reading is a different story though). |
||
end | ||
|
||
module Output : sig | ||
type 'a t | ||
|
||
val create : ('a option -> unit Lwt.t) -> 'a t | ||
val write : 'a option -> 'a t -> unit Lwt.t | ||
end | ||
|
||
val transfer : 'a Input.t -> 'a Output.t -> unit Lwt.t | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd try to match the API here: https://github.com/ocaml/ocaml-lsp/blob/master/fiber-unix/src/fiber_stream.mli#L33 It's already the one I'm familiar with (which I copied from the original Haskell source) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea. I'll add in the Lwt version of fiber_stream module. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
more useful is a
of_list
primitive. it covers this use case as well.