Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lwt aware stream module for IO #218

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions rock/src/io_stream.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
open Lwt.Syntax

module Input = struct
type 'a t = unit -> 'a option Lwt.t

let create f = f
let read f = f ()

let rec iter f t =
let* res = read t in
match res with
| None -> Lwt.return_unit
| Some item ->
let* () = f item in
iter f t
;;

let singleton item =
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more useful is a of_list primitive. it covers this use case as well.

let finished = ref false in
fun () ->
if !finished
then Lwt.return_none
else (
finished := true;
Lwt.return (Some item))
;;
end

module Output = struct
type 'a t = 'a option -> unit Lwt.t

let create f =
let closed = ref false in
fun v ->
if !closed
then Lwt.return_unit
else (
match v with
| None ->
closed := true;
f None
| v -> f v)
;;

let write i f = f i
end

let transfer input output =
let rec loop () =
let* item = Input.read input in
let* () = Output.write item output in
match item with
| None -> Lwt.return_unit
| Some _ -> loop ()
in
loop ()
;;
17 changes: 17 additions & 0 deletions rock/src/io_stream.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module Input : sig
type 'a t

val create : (unit -> 'a option Lwt.t) -> 'a t
val singleton : 'a -> 'a t
val read : 'a t -> 'a option Lwt.t
val iter : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When working with Httpaf bodies, we'll have places where we use iter to consume the input stream, but the handler there will be of the form,

Input.iter (fun item -> Httpaf.Body.write_string item; Lwt.return_unit) stream;;

I think it might make sense to add a second flavor of iter that works with ('a -> unit). Maybe we can match the naming scheme from lwt by having

val iter : ('a -> unit) -> 'a t -> unit Lwt.t
val iter_s : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Input.iter (fun item -> Httpaf.Body.write_string item; Lwt.return_unit) stream;;

I think this is bound to allocate a huge amount of memory though. We need to return a promise that's determined when the buffer hits max size and needs to be flushed.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might make sense to add a second flavor of iter that works with ('a -> unit). Maybe we can match the naming scheme from lwt by having

For now let's see if we can make do with iter that returns a unit Lwt.t. I suspect that the other iter will not be very useful in the end.

Copy link
Collaborator Author

@anuragsoni anuragsoni Nov 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is bound to allocate a huge amount of memory though. We need to return a promise that's determined when the buffer hits max size and needs to be flushed.

Hmm that's true. On a related note we don't read this config today, but we should respect the body buffer size limits set by the user in the server config handed to Rock. Working with promises when writing to httpaf's body should straightforward. (Reading is a different story though).

end

module Output : sig
type 'a t

val create : ('a option -> unit Lwt.t) -> 'a t
val write : 'a option -> 'a t -> unit Lwt.t
end

val transfer : 'a Input.t -> 'a Output.t -> unit Lwt.t
Copy link
Owner

@rgrinberg rgrinberg Nov 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd try to match the API here: https://github.com/ocaml/ocaml-lsp/blob/master/fiber-unix/src/fiber_stream.mli#L33

It's already the one I'm familiar with (which I copied from the original Haskell source)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. I'll add in the Lwt version of fiber_stream module.

1 change: 1 addition & 0 deletions rock/src/rock.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module App = App
module Context = Context
module Io_stream = Io_stream
module Body = Body
module Request = Request
module Response = Response
Expand Down
1 change: 1 addition & 0 deletions rock/src/rock.mli
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module App = App
module Context = Context
module Request = Request
module Response = Response
module Io_stream = Io_stream
module Body = Body
module Service = Service
module Filter = Filter
Expand Down