Transaction only committed after stream is drained #2132
Comments
What version of doobie are you using? I believe that issue should've been fixed for a while now looking at the history. Can you try to replicate the issue for me with 1.0.0-RC5? |
I'm using 1.0.0-RC4, which should already include the commit. I tried 1.0.0-RC5 but the behavior is still the same. I'm using fs2 3.9.3, btw. |
Oh that's no good. Can you provide a minimal reproduction for this issue? |
Sure, I created a unit test here. Just run Output:
|
I'm not familiar with how
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._

val res1 = Resource.make(IO(println("res1 acquire")))(_ => IO(println("res1 close")))
fs2.Stream.resource(res1)
  .scope
  // slow downstream step: the resource stays open while this runs
  .evalMap { _ => IO(println("start sleep")) >> IO.sleep(10.seconds) >> IO(println("sleep finished")) }
  .compile.drain.unsafeRunSync()
The output is:
So the resource waits for downstream operations to finish before it is released? |
Thanks for the minimized example. That is a bit surprising indeed. Streams close their resources when they are exhausted, so one way I know to get the behaviour you want is:
https://scastie.scala-lang.org/gnqt4MwzRTGssAAkQJ3KCg But that's the extent of what I remember digging deeper (the fs2 docs don't seem to offer any guidance on this). This definitely needs fixing on doobie's end if possible. |
Yeah, my current workaround is to just convert the stream into a list. But that essentially makes stream queries useless. My use cases are mostly about sending the result stream to a client in a web app. Something like this:
I thought this should help performance since it works like a data pipe: sending to the client starts before all the results from the db are fetched. But if it's blocking on I think lots of people use doobie like this, especially with the Typelevel stack like fs2-grpc and http4s. Hopefully it can be fixed so I don't need to replace all the streams in my code with an IO list. Do you think this is something that needs to be reported to the fs2 library? |
I agree. I don't think there's a bug in fs2, but it'd be good to have some documentation on resource scopes. Trawling through the fs2 chat history (e.g. https://discord.com/channels/632277896739946517/632310980449402880/966754369930420294) may help in understanding what's going on and what needs to be done to fix it. |
Yeah, I think on fs2's side it's reasonable to close the resource only after the stream operations are done, because it doesn't know whether the resource is still needed downstream. I think what can be done is to introduce a buffer in between. Something like this:
If the Not really sure how to implement it with fs2 yet, but it should be possible. |
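For readers following along, the "buffer in between" idea can be sketched with an fs2 `Channel`. This is only an illustration under assumptions, not the code from the prototype PR: the names `BufferSketch`, `buffered`, and `maxChunks` are made up here. The source stream is drained in the background into a bounded channel, so its resources can be released as soon as the source is exhausted, while the consumer reads from the channel at its own pace.

```scala
import cats.effect.IO
import fs2.{Chunk, Stream}
import fs2.concurrent.Channel

object BufferSketch {
  // Hypothetical helper: decouples `source` from its consumer through a
  // bounded channel holding at most `maxChunks` chunks.
  def buffered[A](source: Stream[IO, A], maxChunks: Int): Stream[IO, A] =
    Stream.eval(Channel.bounded[IO, Chunk[A]](maxChunks)).flatMap { chan =>
      chan.stream
        // flatten the buffered chunks back into single elements
        .flatMap(c => Stream.chunk(c))
        // the source runs in the background; `sendAll` closes the channel
        // once the source is exhausted, which also ends `chan.stream`
        .concurrently(source.chunks.through(chan.sendAll))
    }
}
```

If the source produces more than `maxChunks` chunks ahead of the consumer, the background side blocks on the full channel, so the resource is only released early when the remaining results fit in the buffer.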
@jatcwang I created a prototype for this approach in #2137. I created a new |
Thanks for the PR! Yes thinking about it I agree that's the only logical way this can work, due to fs2 Stream being pull-based. I do wonder whether fs2 has an operator for this kind of "buffering". I see various |
From the doc and example here, it seems the buffer method waits for N elements before emitting them downstream. This behavior is not what we want here. |
There is |
I think |
@jatcwang let me know when you think this is the right approach and where to put the buffer size configuration. I can refine the PR to make it the default stream method. |
@wb14123 I think the idea is sound thanks! I'm checking with the fs2 folks too in case we're reinventing the wheel or there's a better way :) https://discord.com/channels/632277896739946517/632310980449402880/1307638686468407319 |
@wb14123 Seems like Assuming
|
Looking at the code, yeah, it seems to implement basically the same logic. That's cool!
Yeah this default behavior makes sense to me.
I think these two are a little bit redundant. It's kind of unclear what |
Resolves typelevel#2132. Use fs2 Stream's `prefetchN` to buffer query results so that slow downstream operations don't delay the transaction commit. The buffered result size is `chunkSize` (equal to `fetchSize`) by default. If you want a different buffer size, you can use `transactNoPrefetch` to get a stream without `prefetchN` and append whatever prefetch operations you want.
prototype stream with buffer to avoid block on transaction commit
use prefetchN
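A small fs2-only sketch of the effect described in that PR summary, assuming cats-effect 3 and fs2 3. It uses no doobie: a logged `Resource` stands in for the transaction, with "release" playing the role of the commit, and `PrefetchSketch` and `run` are illustrative names only. With `prefetch = true` the source runs ahead of the slow consumer, so "release" should be logged before the "processed" entries. With `prefetch = false` it is logged last.

```scala
import cats.effect.{IO, Resource}
import fs2.Stream
import scala.concurrent.duration._

object PrefetchSketch {
  // Runs a three-element stream backed by a logged resource and
  // returns the recorded events in order.
  def run(prefetch: Boolean): IO[List[String]] =
    IO.ref(Vector.empty[String]).flatMap { log =>
      def record(msg: String): IO[Unit] = log.update(_ :+ msg)

      // Stand-in for a transaction: "release" plays the role of the commit.
      val tx: Resource[IO, Unit] =
        Resource.make(record("acquire"))(_ => record("release"))

      val rows: Stream[IO, Int] =
        Stream.resource(tx).flatMap(_ => Stream.emits(List(1, 2, 3)).covary[IO])

      // prefetchN(1) buffers up to one chunk ahead of the consumer.
      val upstream: Stream[IO, Int] = if (prefetch) rows.prefetchN(1) else rows

      upstream
        // slow downstream step
        .evalMap(n => IO.sleep(200.millis) *> record(s"processed $n"))
        .compile
        .drain
        .flatMap(_ => log.get.map(_.toList))
    }
}
```

To try it, import `cats.effect.unsafe.implicits.global` and compare `PrefetchSketch.run(prefetch = true).unsafeRunSync()` with `PrefetchSketch.run(prefetch = false).unsafeRunSync()`.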
I updated the PR. Maybe we can continue the discussion there. |
When using doobie with fs2 stream and transaction, it only commits the transaction after the stream is drained, including any downstream operations.
For example:
The transaction is only committed (and the connection returned to the pool) after doThings is finished. Is this expected behavior?