Transaction only committed after stream is drained #2132

Closed
wb14123 opened this issue Nov 10, 2024 · 19 comments · Fixed by #2137
Labels: Bug

@wb14123 (Contributor) commented Nov 10, 2024

When using doobie with an fs2 stream and a transaction, the transaction is only committed after the stream is drained, including any downstream operations.

For example:

val q = quote {
   ...
}
stream(q).transact(xa).map { _ =>
  doThings() // some time consuming operation
}

The transaction is only committed (and the connection returned to the pool) after doThings has finished.

Is this expected behavior?

@jatcwang (Collaborator)

What version of doobie are you using? Looking at the history, I believe this issue should have been fixed a while ago (c6daae8).

Can you try to replicate the issue for me with 1.0.0-RC5?

@wb14123 (Contributor, Author) commented Nov 11, 2024

I'm using 1.0.0-RC4, which should already include that commit. I tried 1.0.0-RC5, but it's still the same. I'm using fs2 3.9.3, by the way.

@jatcwang (Collaborator)

Oh that's no good. Can you provide a minimal reproduction for this issue?

wb14123 added a commit to wb14123/doobie that referenced this issue Nov 12, 2024
@wb14123 (Contributor, Author) commented Nov 12, 2024

Sure, I created a unit test that reproduces it. Just run sbt "project hikari" "test:testOnly *PGConcurrentSuite" on my branch: https://github.com/wb14123/doobie/tree/stream-leak

Output:

doobie.postgres.PGConcurrentSuite:                                                            
==> X doobie.postgres.PGConcurrentSuite.Connection returned before stream is drained  3.803s java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 2001ms (total=1, active=1, idle=0, waiting=0)
    at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:686)     
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:179)                                                                                                                  
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:144)                                                                                                                  
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:127)            
    at doobie.util.transactor$Transactor$FromDataSourceUnapplied.$anonfun$apply$14(transactor.scala:333)        
    at delay @ doobie.util.transactor$Transactor$FromDataSourceUnapplied.$anonfun$apply$13(transactor.scala:333)                                                                             
    at fromAutoCloseable @ doobie.util.transactor$Transactor$FromDataSourceUnapplied.$anonfun$apply$13(transactor.scala:333)                                                                 
    at fromAutoCloseable @ doobie.util.transactor$Transactor$FromDataSourceUnapplied.$anonfun$apply$13(transactor.scala:333)                                                                 
    at use @ fs2.Compiler$Target.compile(Compiler.scala:158)                   
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)       
    at handleErrorWith @ fs2.Compiler$Target.handleErrorWith(Compiler.scala:161)         
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at get @ fs2.internal.Scope.openScope(Scope.scala:275)                            
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)                              
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)                     
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)                                                                                                                             
    at flatMap @ fs2.Pull$.goEval$1(Pull.scala:1089)                                                                                                                                         
    at get @ fs2.internal.Scope.openScope(Scope.scala:275)                                                                                                                                   
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)                              
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)                              
[error] Failed: Total 1, Failed 1, Errors 0, Passed 0                                         
[error] Failed tests:                                                                         
[error]         doobie.postgres.PGConcurrentSuite                                                                                                                                            
[error] (Test / testOnly) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 18 s, completed Nov 11, 2024, 7:37:38 PM  

@wb14123 (Contributor, Author) commented Nov 13, 2024

I'm not familiar with how scopes work in fs2 streams. Is a scope supposed to close its resource early? In my test:

import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._

val res1 = Resource.make(IO(println("res1 acquire")))(_ => IO(println("res1 close")))
fs2.Stream.resource(res1)
  .scope
  .evalMap{_ => IO(println("start sleep")) >> IO.sleep(10.seconds) >> IO(println("sleep finished"))}
  .compile.drain.unsafeRunSync()

The output is:

res1 acquire
start sleep
sleep finished
res1 close

So the resource waits for the downstream operations to finish before it is released?
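The ordering in that output follows from fs2 being pull-based: a scope's finalizer can only run once the stream inside it has been pulled to the end, and evalMap only pulls the next element after it has finished with the current one, so the release lands after all downstream work. A minimal sketch of the same experiment, assuming cats-effect 3 and fs2 3.x, that records events in a Ref instead of printing them. The Resource is only a stand-in for doobie's transaction (acquire = begin, release = commit and return the connection), and ScopeOrdering is a hypothetical name.

```scala
import cats.effect.{IO, Ref, Resource}
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

object ScopeOrdering {
  // Runs the experiment and returns the recorded events in order.
  def run(): List[String] = {
    val program = for {
      log <- Ref.of[IO, Vector[String]](Vector.empty)
      record = (s: String) => log.update(_ :+ s)
      // Stand-in for the transaction: release = commit + return the connection.
      tx = Resource.make(record("acquire"))(_ => record("release"))
      _ <- Stream
             .resource(tx)
             .flatMap(_ => Stream.emits(List(1, 2, 3))) // the query results
             .evalMap(i => IO.sleep(10.millis) >> record(s"consumed $i")) // slow downstream
             .compile
             .drain
      events <- log.get
    } yield events.toList
    program.unsafeRunSync()
  }
}
```

ScopeOrdering.run() should return acquire, consumed 1, consumed 2, consumed 3, release: the resource is held for the whole downstream pipeline, which is the behavior reported in this issue.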

@jatcwang (Collaborator) commented Nov 13, 2024

Thanks for the minimized example. That is a bit surprising indeed.

A stream closes its resources when it is exhausted, so one way I know to get the behaviour you want is:

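    import cats.effect.{IO, Resource}
    import cats.effect.unsafe.implicits.global
    import fs2.Stream
    import scala.concurrent.duration._

    val res1 = Resource.make(IO(println("res1 acquire")))(_ => IO(println("res1 close")))
    (
      Stream.resource(res1) ++
      Stream.eval(IO(println("start sleep")) >> IO.sleep(1.seconds) >> IO(println("sleep finished")))
    )
      .compile.drain.unsafeRunSync()

Output:

res1 acquire
res1 close
start sleep
sleep finished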

https://scastie.scala-lang.org/gnqt4MwzRTGssAAkQJ3KCg

But that's as far as I remember without digging deeper (the fs2 docs don't seem to offer any guidance on this). This definitely needs fixing on doobie's end if possible.

@wb14123 (Contributor, Author) commented Nov 13, 2024

Yeah, my current workaround is to just convert the stream into a list, but that essentially makes streaming queries useless. My use case is mostly sending the result stream to a client in a web app, something like this:

val resultStream = ... //
resultStream.evalMap(elem => sendToClient(elem))....

I thought this would help performance since it works like a data pipe: sending to the client starts before all the results have been fetched from the database. But if the transaction stays open while sendToClient runs, a slow network connection on the client side keeps the transaction from committing and the connection from returning to the pool, which is very bad. (The sendToClient step is not always explicit. For example, I'm using fs2-grpc and http4s, which can take a stream as a response; I guess they do something similar internally to the example above.)

I think lots of people use doobie like this, especially with Typelevel stack libraries like fs2-grpc and http4s. Hopefully it can be fixed so I don't need to replace all the streams in my code with an IO of a list. Do you think it's something that needs to be reported to the fs2 library?
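The list workaround trades memory for an early commit: the whole result is materialized while the transaction is open, and only then handed to the slow consumer. A minimal sketch of that ordering without a database, assuming cats-effect 3 and fs2 3.x. The Resource stands in for the transaction and the sleep stands in for a hypothetical sendToClient; in doobie terms the first step would roughly correspond to compiling the ConnectionIO stream to a list before calling transact. ListWorkaround is a hypothetical name.

```scala
import cats.effect.{IO, Ref, Resource}
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

object ListWorkaround {
  def run(): List[String] = {
    val program = for {
      log <- Ref.of[IO, Vector[String]](Vector.empty)
      record = (s: String) => log.update(_ :+ s)
      // Stand-in for the transaction: release = commit + return the connection.
      tx = Resource.make(record("acquire"))(_ => record("release"))
      // Materialize the whole result while the "transaction" is open...
      rows <- Stream.resource(tx).flatMap(_ => Stream.emits(List(1, 2, 3))).compile.toList
      // ...then send it to the (slow) client after the resource has been released.
      _ <- Stream
             .emits(rows)
             .evalMap(i => IO.sleep(10.millis) >> record(s"sent $i"))
             .compile
             .drain
      events <- log.get
    } yield events.toList
    program.unsafeRunSync()
  }
}
```

Here the release is recorded before any sent event, at the cost of holding the full result in memory, which is exactly what streaming was meant to avoid.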

@jatcwang (Collaborator) commented Nov 13, 2024

I agree. I don't think there's a bug in fs2, but it'd be good to have some documentation on resource scopes.

Trawling through the fs2 chat history (e.g. https://discord.com/channels/632277896739946517/632310980449402880/966754369930420294) may help understanding what's going on and what needs to be done to fix it.

@wb14123 (Contributor, Author) commented Nov 13, 2024

Yeah, I think on fs2's side it's reasonable to close the resource only after the downstream operations are done, because fs2 doesn't know whether the resource is still needed downstream.

I think what we can do is introduce a buffer in between, something like this:

resultStream -> bufferStream -> downstreamOps

If downstreamOps is slower than resultStream, the buffer fills up so that resultStream can still be drained. Once resultStream is done, the transaction is committed and the connection is returned, while downstream can keep reading the results from the buffer at its own pace.

I'm not sure how to implement it with fs2 yet, but it should be possible.
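One way to express the buffer idea with existing fs2 building blocks, sketched under stated assumptions rather than as what #2137 does: drain resultStream in the background with concurrently, pushing each element into a queue, and let downstreamOps read from the queue. As far as I can tell, the background stream is compiled on its own, so its resources (here a stand-in for the transaction) are released as soon as it has been fully pushed into the queue. Assumes cats-effect 3 and fs2 3.x; buffered and BufferSketch are hypothetical names, and the queue is unbounded, so a very slow client can make the entire result accumulate in memory.

```scala
import cats.effect.{IO, Ref, Resource}
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

object BufferSketch {
  // Hypothetical helper (not a doobie or fs2 API): run `source` in the background,
  // pushing every element into an unbounded queue, and let downstream read from it.
  // `source` can complete, and release its resources, however slow downstream is.
  def buffered[A](source: Stream[IO, A]): Stream[IO, A] =
    Stream.eval(Queue.unbounded[IO, Option[A]]).flatMap { queue =>
      Stream
        .repeatEval(queue.take)
        .unNoneTerminate // None marks the end of `source`
        .concurrently(source.noneTerminate.evalMap(queue.offer))
    }

  def run(): List[String] = {
    val program = for {
      log <- Ref.of[IO, Vector[String]](Vector.empty)
      record = (s: String) => log.update(_ :+ s)
      // Stand-in for the transaction: release = commit + return the connection.
      tx = Resource.make(record("acquire"))(_ => record("release"))
      resultStream = Stream.resource(tx).flatMap(_ => Stream.emits(List(1, 2, 3)))
      _ <- buffered(resultStream)
             .evalMap(i => IO.sleep(20.millis) >> record(s"consumed $i")) // slow downstreamOps
             .compile
             .drain
      events <- log.get
    } yield events.toList
    program.unsafeRunSync()
  }
}
```

With a bounded queue instead, resultStream would block once the buffer is full, so the early commit would only happen when the rest of the result fits in the buffer.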

@wb14123 (Contributor, Author) commented Nov 13, 2024

@jatcwang I created a prototype of this approach in #2137, with a new transBuffer method that takes a buffer size parameter. Do you think we should make this approach the default (changing the existing behavior instead of creating a new method)? The buffer size could be moved to a global config somewhere.

@jatcwang (Collaborator)

Thanks for the PR! Thinking about it, I agree that's the only logical way this can work, since fs2 Stream is pull-based. I do wonder whether fs2 has an operator for this kind of "buffering". I see various buffer* methods, but I'm not sure whether they run the stream's finalizer eagerly when it is exhausted.

@wb14123 (Contributor, Author) commented Nov 15, 2024

From the docs and the example here, it seems the buffer method waits for N elements before emitting them downstream. That is not the behavior we want here.

@jatcwang (Collaborator)

There is groupWithin(..).unchunks, but we'll need to test that it has the desired behaviour.

@wb14123 (Contributor, Author) commented Nov 15, 2024

I think groupWithin is basically the same as buffer: both hold back emission until N elements are in the buffer. Here we don't want the buffer to hold back emission; we want it to store elements while downstream operations are slow, so that the query result can be drained.

@wb14123 (Contributor, Author) commented Nov 16, 2024

@jatcwang let me know if you think this is the right approach and where the buffer size configuration should go. I can then refine the PR to make it the default stream method.

@jatcwang (Collaborator)

@wb14123 I think the idea is sound thanks! I'm checking with the fs2 folks too in case we're reinventing the wheel or there's a better way :) https://discord.com/channels/632277896739946517/632310980449402880/1307638686468407319

@jatcwang (Collaborator)

@wb14123 It seems prefetchN implements the same logic we have here. In doobie's current stream implementation, we emit one chunk per fetch from JDBC (sized by fetchSize).

Assuming prefetchN does exactly what we want here, I think we can:

  • Modify transact to use prefetchN(1)
  • Add transactNoPrefetch with the old transact logic
  • Add transactPrefetchN?
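A rough model of the first bullet, assuming cats-effect 3 and fs2 3.x and no database: the Resource stands in for the transaction and the single emitted chunk stands in for one JDBC fetch. prefetchN(1) runs the upstream concurrently and buffers up to one chunk, so here the upstream can finish, and release, while the slow consumer is still working. This is not doobie's actual transact code; PrefetchModel is a hypothetical name.

```scala
import cats.effect.{IO, Ref, Resource}
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

object PrefetchModel {
  def run(): List[String] = {
    val program = for {
      log <- Ref.of[IO, Vector[String]](Vector.empty)
      record = (s: String) => log.update(_ :+ s)
      // Stand-in for the transaction: release = commit + return the connection.
      tx = Resource.make(record("acquire"))(_ => record("release"))
      // A single chunk, standing in for one JDBC fetch of `fetchSize` rows.
      resultStream = Stream.resource(tx).flatMap(_ => Stream.emits(List(1, 2, 3)))
      _ <- resultStream
             .prefetchN(1) // upstream runs concurrently and fills a one-chunk buffer
             .evalMap(i => IO.sleep(20.millis) >> record(s"consumed $i")) // slow downstream
             .compile
             .drain
      events <- log.get
    } yield events.toList
    program.unsafeRunSync()
  }
}
```

As far as I understand prefetchN, the upstream still blocks whenever the buffer is full, so with a result spanning many chunks the commit can only happen once the remaining result fits in the buffer.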

@wb14123 (Contributor, Author) commented Nov 17, 2024

Looking at the code, yeah, it seems to implement basically the same logic. That's cool!

Modify transact to use prefetchN(1)

Yeah this default behavior makes sense to me.

Add transactNoPrefetch with the old transact logic
Add transactPrefetchN?

I think these two are a little redundant. It's kind of unclear what n means in prefetchN, so maybe just provide transactNoPrefetch and let users apply their own prefetchN if they want?
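If users apply their own prefetch to a stream that has none, the buffer size decides how early the commit can happen. A sketch assuming cats-effect 3 and fs2 3.x, with a Resource standing in for the transaction and five single-element chunks standing in for five JDBC fetches; PrefetchSize and its run parameter are hypothetical and only compare buffer sizes.

```scala
import cats.effect.{IO, Ref, Resource}
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

object PrefetchSize {
  // `prefetch` is the number of chunks the buffer may hold.
  def run(prefetch: Int): List[String] = {
    val program = for {
      log <- Ref.of[IO, Vector[String]](Vector.empty)
      record = (s: String) => log.update(_ :+ s)
      // Stand-in for the transaction: release = commit + return the connection.
      tx = Resource.make(record("acquire"))(_ => record("release"))
      // Five single-element chunks, standing in for five fetches.
      fetches = Stream.emits(List(1, 2, 3, 4, 5)).chunkLimit(1).flatMap(c => Stream.chunk(c))
      resultStream = Stream.resource(tx).flatMap(_ => fetches)
      _ <- resultStream
             .prefetchN(prefetch)
             .evalMap(i => IO.sleep(20.millis) >> record(s"consumed $i")) // slow downstream
             .compile
             .drain
      events <- log.get
    } yield events.toList
    program.unsafeRunSync()
  }
}
```

With run(16) every chunk fits in the buffer, so the release should be recorded well before the last consumed event; with run(1) the upstream keeps blocking on the full buffer, and the release is expected to come later, once the consumer has nearly caught up.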

wb14123 added a commit to wb14123/doobie that referenced this issue Nov 17, 2024
Resolves typelevel#2132.

Use fs2 Stream's `prefetchN` to buffer query results so that slow
downstream operations don't slow down transaction commit.

The buffered result size is `chunkSize` (equal to `fetchSize`) by
default. If you want a different buffer size, use `transactNoPrefetch` to
get a stream without `prefetchN` and append whatever prefetch operation
you want.

prototype stream with buffer to avoid block on transaction commit

use prefetchN
@wb14123 (Contributor, Author) commented Nov 17, 2024

I updated the PR. Maybe we can continue the discussion there.
