-
Notifications
You must be signed in to change notification settings - Fork 353
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement blocking unnamed_socket #4072
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,12 @@ struct AnonSocket { | |
/// This flag is set to `true` if the peer's `readbuf` is non-empty at the time | ||
/// of closure. | ||
peer_lost_data: Cell<bool>, | ||
/// A list of thread ids blocked because the buffer was empty. | ||
/// Once another thread writes some bytes, these threads will be unblocked. | ||
blocked_read_tid: RefCell<Vec<ThreadId>>, | ||
/// A list of thread ids blocked because the buffer was full. | ||
/// Once another thread reads some bytes, these threads will be unblocked. | ||
blocked_write_tid: RefCell<Vec<ThreadId>>, | ||
is_nonblock: bool, | ||
} | ||
|
||
|
@@ -83,7 +89,7 @@ impl FileDescription for AnonSocket { | |
|
||
fn read<'tcx>( | ||
&self, | ||
_self_ref: &FileDescriptionRef, | ||
self_ref: &FileDescriptionRef, | ||
_communicate_allowed: bool, | ||
ptr: Pointer, | ||
len: usize, | ||
|
@@ -100,33 +106,21 @@ impl FileDescription for AnonSocket { | |
// corresponding ErrorKind variant. | ||
throw_unsup_format!("reading from the write end of a pipe"); | ||
}; | ||
if readbuf.borrow().buf.is_empty() { | ||
if self.peer_fd().upgrade().is_none() { | ||
// Socketpair with no peer and empty buffer. | ||
// 0 bytes successfully read indicates end-of-file. | ||
return ecx.return_read_success(ptr, &[], 0, dest); | ||
} else { | ||
if self.is_nonblock { | ||
// Non-blocking socketpair with writer and empty buffer. | ||
// https://linux.die.net/man/2/read | ||
// EAGAIN or EWOULDBLOCK can be returned for socket, | ||
// POSIX.1-2001 allows either error to be returned for this case. | ||
// Since there is no ErrorKind for EAGAIN, WouldBlock is used. | ||
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); | ||
} else { | ||
// Blocking socketpair with writer and empty buffer. | ||
// FIXME: blocking is currently not supported | ||
throw_unsup_format!("socketpair/pipe/pipe2 read: blocking isn't supported yet"); | ||
} | ||
} | ||
|
||
if readbuf.borrow().buf.is_empty() && self.is_nonblock { | ||
// Non-blocking socketpair with writer and empty buffer. | ||
// https://linux.die.net/man/2/read | ||
// EAGAIN or EWOULDBLOCK can be returned for socket, | ||
// POSIX.1-2001 allows either error to be returned for this case. | ||
// Since there is no ErrorKind for EAGAIN, WouldBlock is used. | ||
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); | ||
} | ||
// TODO: We might need to decide what to do if peer_fd is closed when read is blocked. | ||
oli-obk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
anonsocket_read(self, self.peer_fd().upgrade(), len, ptr, dest, ecx) | ||
tiif marked this conversation as resolved.
Show resolved
Hide resolved
|
||
anonsocket_read(self_ref.downgrade(), len, ptr, dest.clone(), ecx) | ||
} | ||
|
||
fn write<'tcx>( | ||
&self, | ||
_self_ref: &FileDescriptionRef, | ||
self_ref: &FileDescriptionRef, | ||
_communicate_allowed: bool, | ||
ptr: Pointer, | ||
len: usize, | ||
|
@@ -153,16 +147,11 @@ impl FileDescription for AnonSocket { | |
}; | ||
let available_space = | ||
MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len()); | ||
Comment on lines
148
to
149
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please do not duplicate this logic (that same line now exists twice in this file, always a bad sign). |
||
if available_space == 0 { | ||
if self.is_nonblock { | ||
// Non-blocking socketpair with a full buffer. | ||
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); | ||
} else { | ||
// Blocking socketpair with a full buffer. | ||
throw_unsup_format!("socketpair/pipe/pipe2 write: blocking isn't supported yet"); | ||
} | ||
if available_space == 0 && self.is_nonblock { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should also be moved into anonsocket_write I think, again to keep all the logic in the same place. |
||
// Non-blocking socketpair with a full buffer. | ||
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); | ||
} | ||
anonsocket_write(available_space, &peer_fd, ptr, len, dest, ecx) | ||
anonsocket_write(self_ref.downgrade(), ptr, len, dest.clone(), ecx) | ||
} | ||
|
||
fn as_unix(&self) -> &dyn UnixFileDescription { | ||
|
@@ -172,81 +161,161 @@ impl FileDescription for AnonSocket { | |
|
||
/// Write to AnonSocket based on the space available and return the written byte size. | ||
fn anonsocket_write<'tcx>( | ||
available_space: usize, | ||
peer_fd: &FileDescriptionRef, | ||
weak_self_ref: WeakFileDescriptionRef, | ||
ptr: Pointer, | ||
len: usize, | ||
dest: &MPlaceTy<'tcx>, | ||
dest: MPlaceTy<'tcx>, | ||
ecx: &mut MiriInterpCx<'tcx>, | ||
) -> InterpResult<'tcx> { | ||
let Some(self_ref) = weak_self_ref.upgrade() else { | ||
// FIXME: We should raise a deadlock error if the self_ref upgrade failed. | ||
throw_unsup_format!("This will be a deadlock error in future") | ||
}; | ||
Comment on lines
+170
to
+173
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this PR is big enough, so I decided to temporarily make this throws unsupported format. I wanted to write a test for this, and potentially tweak the diagnostic a bit if it is too confusing (because of the reason stated here). But I certainly won't mind putting it in this PR if anyone prefers. |
||
let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap(); | ||
let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() else { | ||
// If the upgrade from Weak to Rc fails, it indicates that all read ends have been | ||
// closed. | ||
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, &dest); | ||
Comment on lines
+175
to
+178
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it really necessary to duplicate this check (it is both here and in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea we can always do the check in |
||
}; | ||
let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else { | ||
// FIXME: This should return EBADF, but there's no nice way to do that as there's no | ||
// corresponding ErrorKind variant. | ||
throw_unsup_format!("writing to the reading end of a pipe") | ||
}; | ||
let mut writebuf = writebuf.borrow_mut(); | ||
|
||
// Remember this clock so `read` can synchronize with us. | ||
ecx.release_clock(|clock| { | ||
writebuf.clock.join(clock); | ||
}); | ||
// Do full write / partial write based on the space available. | ||
let actual_write_size = len.min(available_space); | ||
let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?; | ||
writebuf.buf.extend(&bytes[..actual_write_size]); | ||
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len()); | ||
|
||
if available_space == 0 { | ||
// Blocking socketpair with a full buffer. | ||
let dest = dest.clone(); | ||
self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread()); | ||
ecx.block_thread( | ||
BlockReason::UnnamedSocket, | ||
None, | ||
callback!( | ||
@capture<'tcx> { | ||
weak_self_ref: WeakFileDescriptionRef, | ||
ptr: Pointer, | ||
len: usize, | ||
dest: MPlaceTy<'tcx>, | ||
} | ||
@unblock = |this| { | ||
anonsocket_write(weak_self_ref, ptr, len, dest, this) | ||
} | ||
), | ||
); | ||
} else { | ||
let mut writebuf = writebuf.borrow_mut(); | ||
// Remember this clock so `read` can synchronize with us. | ||
ecx.release_clock(|clock| { | ||
writebuf.clock.join(clock); | ||
}); | ||
// Do full write / partial write based on the space available. | ||
let actual_write_size = len.min(available_space); | ||
let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?; | ||
writebuf.buf.extend(&bytes[..actual_write_size]); | ||
|
||
// Need to stop accessing peer_fd so that it can be notified. | ||
drop(writebuf); | ||
// Need to stop accessing peer_fd so that it can be notified. | ||
drop(writebuf); | ||
|
||
// Notification should be provided for peer fd as it became readable. | ||
// The kernel does this even if the fd was already readable before, so we follow suit. | ||
ecx.check_and_update_readiness(peer_fd)?; | ||
// Notification should be provided for peer fd as it became readable. | ||
// The kernel does this even if the fd was already readable before, so we follow suit. | ||
ecx.check_and_update_readiness(&peer_fd)?; | ||
let peer_anonsocket = peer_fd.downcast::<AnonSocket>().unwrap(); | ||
// Unblock all threads that are currently blocked on peer_fd's read. | ||
Comment on lines
+224
to
+225
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
let waiting_threads = std::mem::take(&mut *peer_anonsocket.blocked_read_tid.borrow_mut()); | ||
// FIXME: We can randomize the order of unblocking. | ||
for thread_id in waiting_threads { | ||
ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?; | ||
} | ||
|
||
ecx.return_write_success(actual_write_size, dest) | ||
return ecx.return_write_success(actual_write_size, &dest); | ||
} | ||
interp_ok(()) | ||
} | ||
|
||
/// Read from AnonSocket and return the number of bytes read. | ||
fn anonsocket_read<'tcx>( | ||
anonsocket: &AnonSocket, | ||
peer_fd: Option<FileDescriptionRef>, | ||
weak_self_ref: WeakFileDescriptionRef, | ||
len: usize, | ||
ptr: Pointer, | ||
dest: &MPlaceTy<'tcx>, | ||
dest: MPlaceTy<'tcx>, | ||
ecx: &mut MiriInterpCx<'tcx>, | ||
) -> InterpResult<'tcx> { | ||
let mut bytes = vec![0; len]; | ||
let Some(self_ref) = weak_self_ref.upgrade() else { | ||
// FIXME: We should raise a deadlock error if the self_ref upgrade failed. | ||
throw_unsup_format!("This will be a deadlock error in future") | ||
}; | ||
let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap(); | ||
|
||
let Some(readbuf) = &anonsocket.readbuf else { | ||
let Some(readbuf) = &self_anonsocket.readbuf else { | ||
// FIXME: This should return EBADF, but there's no nice way to do that as there's no | ||
// corresponding ErrorKind variant. | ||
throw_unsup_format!("reading from the write end of a pipe") | ||
}; | ||
let mut readbuf = readbuf.borrow_mut(); | ||
|
||
// Synchronize with all previous writes to this buffer. | ||
// FIXME: this over-synchronizes; a more precise approach would be to | ||
// only sync with the writes whose data we will read. | ||
ecx.acquire_clock(&readbuf.clock); | ||
|
||
// Do full read / partial read based on the space available. | ||
// Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior. | ||
let actual_read_size = readbuf.buf.read(&mut bytes[..]).unwrap(); | ||
|
||
// Need to drop before others can access the readbuf again. | ||
drop(readbuf); | ||
|
||
// A notification should be provided for the peer file description even when it can | ||
// only write 1 byte. This implementation is not compliant with the actual Linux kernel | ||
// implementation. For optimization reasons, the kernel will only mark the file description | ||
// as "writable" when it can write more than a certain number of bytes. Since we | ||
// don't know what that *certain number* is, we will provide a notification every time | ||
// a read is successful. This might result in our epoll emulation providing more | ||
// notifications than the real system. | ||
if let Some(peer_fd) = peer_fd { | ||
ecx.check_and_update_readiness(&peer_fd)?; | ||
} | ||
|
||
ecx.return_read_success(ptr, &bytes, actual_read_size, dest) | ||
if readbuf.borrow_mut().buf.is_empty() { | ||
if self_anonsocket.peer_fd().upgrade().is_none() { | ||
// Socketpair with no peer and empty buffer. | ||
// 0 bytes successfully read indicates end-of-file. | ||
return ecx.return_read_success(ptr, &[], 0, &dest); | ||
} else { | ||
// Blocking socketpair with writer and empty buffer. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How do we know it is blocking? If that's a precondition, please add an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh yea, I should add a check here. The general idea is in #4072 (comment), since |
||
let weak_self_ref = weak_self_ref.clone(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you have to clone this reference? Seems like you should move it. |
||
self_anonsocket.blocked_read_tid.borrow_mut().push(ecx.active_thread()); | ||
ecx.block_thread( | ||
BlockReason::UnnamedSocket, | ||
None, | ||
callback!( | ||
@capture<'tcx> { | ||
weak_self_ref: WeakFileDescriptionRef, | ||
len: usize, | ||
ptr: Pointer, | ||
dest: MPlaceTy<'tcx>, | ||
} | ||
@unblock = |this| { | ||
anonsocket_read(weak_self_ref, len, ptr, dest, this) | ||
} | ||
), | ||
); | ||
} | ||
} else { | ||
let mut bytes = vec![0; len]; | ||
let mut readbuf = readbuf.borrow_mut(); | ||
// Synchronize with all previous writes to this buffer. | ||
// FIXME: this over-synchronizes; a more precise approach would be to | ||
// only sync with the writes whose data we will read. | ||
ecx.acquire_clock(&readbuf.clock); | ||
|
||
// Do full read / partial read based on the space available. | ||
// Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior. | ||
let actual_read_size = readbuf.buf.read(&mut bytes[..]).unwrap(); | ||
|
||
// Need to drop before others can access the readbuf again. | ||
drop(readbuf); | ||
|
||
// A notification should be provided for the peer file description even when it can | ||
// only write 1 byte. This implementation is not compliant with the actual Linux kernel | ||
// implementation. For optimization reasons, the kernel will only mark the file description | ||
// as "writable" when it can write more than a certain number of bytes. Since we | ||
// don't know what that *certain number* is, we will provide a notification every time | ||
// a read is successful. This might result in our epoll emulation providing more | ||
// notifications than the real system. | ||
if let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() { | ||
ecx.check_and_update_readiness(&peer_fd)?; | ||
let peer_anonsocket = peer_fd.downcast::<AnonSocket>().unwrap(); | ||
// Unblock all threads that are currently blocked on peer_fd's write. | ||
Comment on lines
+306
to
+307
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
let waiting_threads = | ||
std::mem::take(&mut *peer_anonsocket.blocked_write_tid.borrow_mut()); | ||
// FIXME: We can randomize the order of unblocking. | ||
for thread_id in waiting_threads { | ||
ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?; | ||
} | ||
}; | ||
|
||
return ecx.return_read_success(ptr, &bytes, actual_read_size, &dest); | ||
} | ||
interp_ok(()) | ||
} | ||
|
||
impl UnixFileDescription for AnonSocket { | ||
|
@@ -360,12 +429,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { | |
readbuf: Some(RefCell::new(Buffer::new())), | ||
peer_fd: OnceCell::new(), | ||
peer_lost_data: Cell::new(false), | ||
blocked_read_tid: RefCell::new(Vec::new()), | ||
blocked_write_tid: RefCell::new(Vec::new()), | ||
is_nonblock: is_sock_nonblock, | ||
}); | ||
let fd1 = fds.new_ref(AnonSocket { | ||
readbuf: Some(RefCell::new(Buffer::new())), | ||
peer_fd: OnceCell::new(), | ||
peer_lost_data: Cell::new(false), | ||
blocked_read_tid: RefCell::new(Vec::new()), | ||
blocked_write_tid: RefCell::new(Vec::new()), | ||
is_nonblock: is_sock_nonblock, | ||
}); | ||
|
||
|
@@ -424,12 +497,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { | |
readbuf: Some(RefCell::new(Buffer::new())), | ||
peer_fd: OnceCell::new(), | ||
peer_lost_data: Cell::new(false), | ||
blocked_read_tid: RefCell::new(Vec::new()), | ||
blocked_write_tid: RefCell::new(Vec::new()), | ||
is_nonblock, | ||
}); | ||
let fd1 = fds.new_ref(AnonSocket { | ||
readbuf: None, | ||
peer_fd: OnceCell::new(), | ||
peer_lost_data: Cell::new(false), | ||
blocked_read_tid: RefCell::new(Vec::new()), | ||
blocked_write_tid: RefCell::new(Vec::new()), | ||
is_nonblock, | ||
}); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
//@ignore-target: windows # No libc socketpair on Windows | ||
//~^ERROR: deadlocked | ||
//~^^ERROR: deadlocked | ||
// test_race depends on a deterministic schedule. | ||
//@compile-flags: -Zmiri-preemption-rate=0 | ||
//@error-in-other-file: deadlock | ||
|
||
use std::thread; | ||
|
||
// Test the behaviour of a thread being blocked on read, get unblocked, then blocked again. | ||
|
||
// The expected execution is | ||
// 1. Thread 1 blocks. | ||
// 2. Thread 2 blocks. | ||
// 3. Thread 3 unblocks both thread 1 and thread 2. | ||
// 4. Thread 1 reads. | ||
// 5. Thread 2's `read` can never complete -> deadlocked. | ||
|
||
fn main() { | ||
let mut fds = [-1, -1]; | ||
let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; | ||
assert_eq!(res, 0); | ||
let thread1 = thread::spawn(move || { | ||
// Let this thread block on read. | ||
let mut buf: [u8; 3] = [0; 3]; | ||
let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; | ||
assert_eq!(res, 3); | ||
assert_eq!(&buf, "abc".as_bytes()); | ||
}); | ||
let thread2 = thread::spawn(move || { | ||
// Let this thread block on read. | ||
let mut buf: [u8; 3] = [0; 3]; | ||
let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; | ||
//~^ERROR: deadlocked | ||
assert_eq!(res, 3); | ||
assert_eq!(&buf, "abc".as_bytes()); | ||
}); | ||
let thread3 = thread::spawn(move || { | ||
// Unblock thread1 by writing something. | ||
let data = "abc".as_bytes().as_ptr(); | ||
let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; | ||
assert_eq!(res, 3); | ||
}); | ||
thread1.join().unwrap(); | ||
thread2.join().unwrap(); | ||
thread3.join().unwrap(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The separation of concerns between read and anonsocket_read is a bit unclear. Why is this one case handled here and everything else handled below?
I think the code would be easier to follow if you moved this logic into anonsocket_read as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My idea is keeping the
non-blocking && error
logic inread
andwrite
. Inanonsocket_read
andanonsocket_write
, we either go through theblock_thread
path or the actualread/write
path.For now,
if readbuf.borrow().buf.is_empty() && self.is_nonblock
check will never be hit by blocking fd (the one withoutSOCK_NONBLOCK flag
) after unblocked, so I just kept them inread/write
.It is fine to move it in
anonsocket_read
too if it is clearer.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Splitting the logic across two functions leads to a very odd (and not documented!) precondition for
anonsocket_read
. It also means I have to scroll up and down to look at the entire logic. The logic really isn't that complicated so IMO it'd be better to have it in a single function where one can easily see the entire thing at a single glance.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes sense, I can combine them together.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually the new logic here is just wrong. When the buffer is empty, we have to first check
self_anonsocket.peer_fd().upgrade()
and then checkis_nonblock
.In the future please just keep the logic the way it is rather than trying to break it apart into two functions. :)