Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky file watcher test #258

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 72 additions & 56 deletions internal/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,8 @@ func TestFileWatcher_WatchFile(t *testing.T) {
name: "all updates notified",
fileReader: newMockReader("test", "original", nil),
genUpdates: func(reader *mockReader) {
reader.setData([]byte("update 1"))
reader.waitForRead()
reader.setData([]byte("update 2"))
reader.waitForRead()
reader.setDataAndWaitForRead([]byte("update 1"))
reader.setDataAndWaitForRead([]byte("update 2"))
},
interval: watcherInterval,
wantCallbacks: 2,
Expand All @@ -63,14 +61,10 @@ func TestFileWatcher_WatchFile(t *testing.T) {
name: "no content changes don't notify",
fileReader: newMockReader("test", "original", nil),
genUpdates: func(reader *mockReader) {
reader.setData([]byte("update 1"))
reader.waitForRead()
reader.setData([]byte("update 2"))
reader.waitForRead()
reader.setData([]byte("update 2"))
reader.waitForRead()
reader.setData([]byte("update 2"))
reader.waitForRead()
reader.setDataAndWaitForRead([]byte("update 1"))
reader.setDataAndWaitForRead([]byte("update 2"))
reader.setDataAndWaitForRead([]byte("update 2"))
reader.setDataAndWaitForRead([]byte("update 2"))
},
interval: watcherInterval,
wantCallbacks: 2,
Expand All @@ -81,11 +75,8 @@ func TestFileWatcher_WatchFile(t *testing.T) {
name: "missed update due to slow interval",
fileReader: newMockReader("test", "original", nil),
genUpdates: func(reader *mockReader) {
reader.setData([]byte("update 1"))
// no waiting for the read to happen and performing next update
// reader.waitForRead()
reader.setData([]byte("update 2"))
reader.waitForRead()
reader.setData([]byte("update 1")) // no waiting for the read to happen and performing next update
reader.setDataAndWaitForRead([]byte("update 2"))
},
interval: watcherInterval,
wantCallbacks: 1,
Expand All @@ -102,15 +93,12 @@ func TestFileWatcher_WatchFile(t *testing.T) {
name: "error reading file after start",
fileReader: newMockReader("test", "original", nil),
genUpdates: func(reader *mockReader) {
reader.setData([]byte("update 1"))
reader.waitForRead()
reader.setErr(errors.New("error reading file"))
reader.waitForRead()
reader.setDataAndWaitForRead([]byte("update 1"))
reader.setErrAdnWaitForRead(errors.New("error reading file"))
// stop error
reader.setErr(nil)
reader.unsetErr()
// even if an error happens, next updates should be notified
reader.setData([]byte("update 2"))
reader.waitForRead()
reader.setDataAndWaitForRead([]byte("update 2"))
},
interval: watcherInterval,
wantCallbacks: 2,
Expand Down Expand Up @@ -158,7 +146,7 @@ func TestFileWatcher_WatchFile(t *testing.T) {
require.False(t, ok)
}

tt.fileReader.waitForRead() // Wait for the first read to happen, the one synchronous
tt.fileReader.waitForRead(got) // Wait for the first read to happen, the one synchronous
require.Equal(t, tt.want, string(got))
require.NoError(t, err)

Expand Down Expand Up @@ -199,7 +187,7 @@ func TestFileWatcher_WatchFile(t *testing.T) {
mu1.Unlock()
})
require.NoError(t, err)
file1.waitForRead() // Wait for the first read to happen
file1.waitForRead([]byte("original1")) // Wait for the first read to happen

file2 := newMockReader("test2", "original2", nil)
got2, err := fw.WatchFile(file2, watcherInterval, func(data []byte) {
Expand All @@ -209,14 +197,11 @@ func TestFileWatcher_WatchFile(t *testing.T) {
mu2.Unlock()
})
require.NoError(t, err)
file2.waitForRead() // Wait for the first read to happen
file2.waitForRead([]byte("original2")) // Wait for the first read to happen

file1.setData([]byte("update 1-1"))
file1.waitForRead()
file2.setData([]byte("update 2-1"))
file2.waitForRead()
file1.setData([]byte("update 1-2"))
file1.waitForRead()
file1.setDataAndWaitForRead([]byte("update 1-1"))
file2.setDataAndWaitForRead([]byte("update 2-1"))
file1.setDataAndWaitForRead([]byte("update 1-2"))

// ensure no more updates are notified before verifying the results
cancel()
Expand Down Expand Up @@ -250,7 +235,7 @@ func TestFileWatcher_WatchFile(t *testing.T) {
muU.Unlock()
})
require.NoError(t, err)
file1.waitForRead() // Wait for the first read to happen
file1.waitForRead([]byte("original")) // Wait for the first read to happen

wg := sync.WaitGroup{}
wg.Add(2) // 2 callbacks to be notified
Expand All @@ -263,12 +248,10 @@ func TestFileWatcher_WatchFile(t *testing.T) {
muO.Unlock()
})
require.NoError(t, err)
file1.waitForRead() // Wait for the first read to happen again
file1.waitForRead([]byte("override")) // Wait for the first read to happen again

file1.setData([]byte("update 1"))
file1.waitForRead()
file1.setData([]byte("update 2"))
file1.waitForRead()
file1.setDataAndWaitForRead([]byte("update 1"))
file1.setDataAndWaitForRead([]byte("update 2"))

// ensure no more updates are notified before verifying the results
cancel()
Expand All @@ -284,57 +267,90 @@ func TestFileWatcher_WatchFile(t *testing.T) {

var _ Reader = (*mockReader)(nil)

type mockReader struct {
id string
err error
type (
mockReader struct {
id string
err error

m sync.Mutex
fileData []byte
m sync.Mutex
fileData []byte

// reads is used to signal that a read happened, it should be buffered to avoid deadlocks.
// It is used to know if a read happened but there's no need to block reads happening if no one is waiting for them.
reads chan struct{}
}
// reads is used to signal that a read happened, it should be buffered to avoid deadlocks.
// It is used to know if a read happened but there's no need to block reads happening if no one is waiting for them.
reads chan msg
}

msg struct {
data []byte
err error
}
)

func newMockReader(id, data string, err error) *mockReader {
return &mockReader{
id: id,
fileData: []byte(data),
err: err,
reads: make(chan struct{}, 50),
reads: make(chan msg, 50),
}
}

// Implement Reader interface
func (m *mockReader) ID() string {
return m.id
}

// Implement Reader interface
func (m *mockReader) Read() ([]byte, error) {
// Notify that a read happened
defer func() { m.reads <- struct{}{} }()

m.m.Lock()
defer m.m.Unlock()

// Notify that a read happened
defer func() { m.reads <- msg{data: m.fileData, err: m.err} }()

if m.err != nil {
return nil, m.err
}

return m.fileData, nil
}

// setData sets the data to be read.
func (m *mockReader) setData(data []byte) {
m.m.Lock()
defer m.m.Unlock()
m.fileData = data
}

func (m *mockReader) setErr(err error) {
// setDataAndWaitForRead sets the data and waits for the read to happen with the given data.
func (m *mockReader) setDataAndWaitForRead(data []byte) {
m.setData(data)
m.waitForRead(data)
}

// waitForRead waits for the given data to be read.
func (m *mockReader) waitForRead(data []byte) {
readData := <-m.reads // wait for at least first read
for string(readData.data) != string(data) {
readData = <-m.reads
}
}

// setErrAndWaitForRead sets the error and waits for the read to happen with the given error.
func (m *mockReader) setErrAdnWaitForRead(err error) {
m.m.Lock()
defer m.m.Unlock()
m.err = err
m.m.Unlock()

readData := <-m.reads // wait for at least first read
for !errors.Is(err, readData.err) {
readData = <-m.reads
}
}

func (m *mockReader) waitForRead() {
<-m.reads
// unsetErr unsets the error to stop the mockReader to fail on Read().
func (m *mockReader) unsetErr() {
m.m.Lock()
defer m.m.Unlock()
m.err = nil
}