Skip to content

Commit

Permalink
world/entity.go: Document new synchronisation stuff and cleanups.
Browse files Browse the repository at this point in the history
  • Loading branch information
Sandertv committed Dec 23, 2024
1 parent 09399ad commit c270627
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 36 deletions.
68 changes: 57 additions & 11 deletions server/world/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ func (e *EntityHandle) UUID() uuid.UUID {
return e.id
}

// Close closes the EntityHandle. Any subsequent call to ExecWorld will return
// immediately without the transaction function being called.
func (e *EntityHandle) Close(tx *Tx) {
e.setAndUnlockWorld(closeWorld, tx)
}

// ExecWorld obtains the EntityHandle's World in a thread-safe way and opens a
// transaction in it when it does. If the EntityHandle has not been added to a
// world, ExecWorld will block until the EntityHandle is added to a World and
Expand All @@ -147,18 +153,40 @@ func (e *EntityHandle) ExecWorld(f func(tx *Tx, e Entity)) (run bool) {
return e.execWorld(f, false)
}

func (e *EntityHandle) execWorld(f func(tx *Tx, e Entity), weak bool) (run bool) {
// execWorld uses a sync.Cond to synchronise access to the handler's world. We
// are dealing with a rather complicated synchronisation pattern here.
// The summary is as follows: The goal for ExecWorld is to block
func (e *EntityHandle) execWorld(f func(tx *Tx, e Entity), weak bool) bool {
e.cond.L.Lock()
for e.w == nil || (!weak && e.weakTxActive) {
// Wait suspends the current goroutine and unlocks e.cond.L, until
// e.cond.Broadcast() is called. After this, one of the goroutines
// waiting will acquire a lock of e.cond.L again. This means that only
// one goroutine will run the code after this simultaneously.
e.cond.Wait()
}
// If a goroutine manages to exit the for loop, it will have acquired a lock
// on e.cond.L. This also means that e.w can be assumed to not be nil here.
// Because of the lock on e.cond.L, no other transaction will be able to
// change e.w until we finish. e.worldless is set to true in
// e.unsetAndLockWorld(), where the entity's world is removed.
e.worldless.Store(false)
if e.w == closeWorld {
// EntityHandle was closed.
// EntityHandle was closed. No need to continue.
e.cond.L.Unlock()
return false
}

// We now arrive at the more complicated part. When we call e.w.Exec(), our
// transaction must await earlier transactions in the world. If one of those
// earlier transactions tries to change e.w (through e.unsetAndLockWorld()
// or e.setAndUnlockWorld()), it must lock e.cond.L. This would lead to a
// deadlock, because we already have e.cond.L locked here.
// We work around this with so-called "weak transactions". This is a
// transaction that may be invalidated before it is executed. In this case,
// this invalidation happens by setting e.worldless to true. If the
// transaction turns out to be invalidated (ret == false), we simply try
// again, this time with e.execWorld(f, true) to make this goroutine bypass
// any goroutines still awaiting e.cond.
ret := e.weakExec(func(tx *Tx) {
e.lockedTx.Store(tx)
f(tx, e.mustEntity(tx))
Expand All @@ -167,33 +195,51 @@ func (e *EntityHandle) execWorld(f func(tx *Tx, e Entity), weak bool) (run bool)
e.cond.L.Unlock()

if !ret {
// Our weak transaction was suspended. We try again, this time with
// e.execWorld(f, true) to make this goroutine bypass any goroutines
// still awaiting e.cond.
return e.execWorld(f, true)
}
return true
}

// weakExec performs a "weak transaction". It adds a transaction to the world
// that is invalidated when e.worldless is set to true. In this case, weakExec
// returns false. If the weak transaction is successfully executed, it returns
// true, and any calls to ExecWorld waiting on e.cond are awakened. The goal of
// weakExec is to suspend the current goroutine and unlock e.cond.L while
// waiting for previous transactions to finish.
func (e *EntityHandle) weakExec(f ExecFunc) bool {
e.weakTxActive = true

// We create a weak transaction and start a for loop to listen for the
// length of the channel. This might look weird, but the crucial part here
// is the call to e.cond.Wait(), which unlocks e.cond.L. This is required
// to prevent a deadlock if an earlier transaction tries to change e.w.
c := e.w.weakExec(e.worldless, e.cond, f)
for len(c) == 0 {
for len(c) == 0 && e.w != closeWorld {
// Calling e.cond.Wait() here will free the lock on e.cond.L until our
// transaction finishes. e.w.weakExec() ensures that e.cond.Broadcast()
// is called once the transaction finished/is suspended, so we can
// continue after that.
e.cond.Wait()
}
if success := <-c; !success {
// If the EntityHandle was closed (e.w == closeWorld), we treat the
// transaction as successful, because all transactions must be cancelled.
if e.w != closeWorld && !<-c {
// Weak transaction was suspended. Return false and try again.
return false
}
// After setting e.weakTxActive back to false, we must Broadcast to make
// sure any goroutines waiting in e.execWorld as a result of the
// e.weakTxActive condition can continue.
e.weakTxActive = false
e.cond.Broadcast()
return true
}

var closeWorld = &World{}

// Close closes the EntityHandle. Any subsequent call to ExecWorld will return
// immediately without the transaction function being called.
func (e *EntityHandle) Close(tx *Tx) {
e.setAndUnlockWorld(closeWorld, tx)
}

func (e *EntityHandle) unsetAndLockWorld(tx *Tx) {
// If the entity is in a tx created using ExecWorld, e.cond.L will already
// be locked. Don't try to lock again in that case.
Expand Down
48 changes: 48 additions & 0 deletions server/world/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"github.com/df-mc/dragonfly/server/block/cube"
"github.com/go-gl/mathgl/mgl64"
"iter"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -226,3 +228,49 @@ func (tx *Tx) World() *World {
func (tx *Tx) close() {
tx.closed = true
}

// normalTransaction is added to the transaction queue for transactions created
// using World.Exec().
type normalTransaction struct {
c chan struct{}
f func(tx *Tx)
}

// Run creates a *Tx, calls ntx.f, closes the transaction and finally closes
// ntx.c.
func (ntx normalTransaction) Run(w *World) {
tx := &Tx{w: w}
ntx.f(tx)
tx.close()
close(ntx.c)
}

// weakTransaction is a transaction that may be cancelled by setting its invalid
// bool to false before the transaction is run.
type weakTransaction struct {
wait chan struct{}

Check failure on line 251 in server/world/tx.go

View workflow job for this annotation

GitHub Actions / Build

field wait is unused (U1000)
c chan bool
f func(tx *Tx)
invalid *atomic.Bool
cond *sync.Cond
}

// Run runs the transaction, first checking if its invalid bool is false and
// creating a *Tx if so. Afterwards, a bool indicating if the transaction was
// run is added to wtx.c. Finally, wtx.cond.Broadcast() is called.
func (wtx weakTransaction) Run(w *World) {
valid := !wtx.invalid.Load()
if valid {
tx := &Tx{w: w}
wtx.f(tx)
tx.close()
}
// We have to acquire a lock on wtx.cond.L here to make sure cond.Wait()
// has been called before we call cond.Broadcast(). If not, we might
// broadcast before cond.Wait() and cause a permanent suspension.
wtx.cond.L.Lock()
defer wtx.cond.L.Unlock()

wtx.c <- valid
wtx.cond.Broadcast()
}
35 changes: 10 additions & 25 deletions server/world/world.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,10 @@ type World struct {
viewers map[*Loader]Viewer
}

// transaction holds a transaction function and the channel to be closed once
// complete.
type transaction struct {
c chan bool
f func(tx *Tx)
invalid *atomic.Bool
cond *sync.Cond
// transaction is a type that may be added to the transaction queue of a World.
// Its Run method is called when the transaction is taken out of the queue.
type transaction interface {
Run(w *World)
}

// New creates a new initialised world. The world may be used right away, but
Expand Down Expand Up @@ -115,15 +112,15 @@ type ExecFunc func(tx *Tx)

// Exec performs a synchronised transaction f on a World. Exec returns a channel
// that is closed once the transaction is complete.
func (w *World) Exec(f ExecFunc) <-chan bool {
c := make(chan bool, 1)
w.queue <- transaction{c: c, f: f}
func (w *World) Exec(f ExecFunc) <-chan struct{} {
c := make(chan struct{})
w.queue <- normalTransaction{c: c, f: f}
return c
}

func (w *World) weakExec(invalid *atomic.Bool, cond *sync.Cond, f ExecFunc) <-chan bool {
c := make(chan bool, 1)
w.queue <- transaction{c: c, f: f, invalid: invalid, cond: cond}
w.queue <- weakTransaction{c: c, f: f, invalid: invalid, cond: cond}
return c
}

Expand All @@ -132,20 +129,8 @@ func (w *World) weakExec(invalid *atomic.Bool, cond *sync.Cond, f ExecFunc) <-ch
func (w *World) handleTransactions() {
for {
select {
case queuedTx := <-w.queue:
if queuedTx.invalid != nil && queuedTx.invalid.Load() {
queuedTx.c <- false
queuedTx.cond.Broadcast()
continue
}
tx := &Tx{w: w}
queuedTx.f(tx)
tx.close()

queuedTx.c <- true
if queuedTx.cond != nil {
queuedTx.cond.Broadcast()
}
case tx := <-w.queue:
tx.Run(w)
case <-w.closing:
w.running.Done()
return
Expand Down

0 comments on commit c270627

Please sign in to comment.