Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement blocking unnamed_socket #4072

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ pub enum BlockReason {
Epoll,
/// Blocked on eventfd.
Eventfd,
/// Blocked on unnamed_socket.
UnnamedSocket,
}

/// The state of a thread.
Expand Down
237 changes: 157 additions & 80 deletions src/shims/unix/unnamed_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ struct AnonSocket {
/// This flag is set to `true` if the peer's `readbuf` is non-empty at the time
/// of closure.
peer_lost_data: Cell<bool>,
/// A list of thread ids blocked because the buffer was empty.
/// Once another thread writes some bytes, these threads will be unblocked.
blocked_read_tid: RefCell<Vec<ThreadId>>,
/// A list of thread ids blocked because the buffer was full.
/// Once another thread reads some bytes, these threads will be unblocked.
blocked_write_tid: RefCell<Vec<ThreadId>>,
is_nonblock: bool,
}

Expand Down Expand Up @@ -83,7 +89,7 @@ impl FileDescription for AnonSocket {

fn read<'tcx>(
&self,
_self_ref: &FileDescriptionRef,
self_ref: &FileDescriptionRef,
_communicate_allowed: bool,
ptr: Pointer,
len: usize,
Expand All @@ -100,33 +106,21 @@ impl FileDescription for AnonSocket {
// corresponding ErrorKind variant.
throw_unsup_format!("reading from the write end of a pipe");
};
if readbuf.borrow().buf.is_empty() {
if self.peer_fd().upgrade().is_none() {
// Socketpair with no peer and empty buffer.
// 0 bytes successfully read indicates end-of-file.
return ecx.return_read_success(ptr, &[], 0, dest);
} else {
if self.is_nonblock {
// Non-blocking socketpair with writer and empty buffer.
// https://linux.die.net/man/2/read
// EAGAIN or EWOULDBLOCK can be returned for socket,
// POSIX.1-2001 allows either error to be returned for this case.
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
} else {
// Blocking socketpair with writer and empty buffer.
// FIXME: blocking is currently not supported
throw_unsup_format!("socketpair/pipe/pipe2 read: blocking isn't supported yet");
}
}

if readbuf.borrow().buf.is_empty() && self.is_nonblock {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The separation of concerns between read and anonsocket_read is a bit unclear. Why is this one case handled here and everything else handled below?

I think the code would be easier to follow if you moved this logic into anonsocket_read as well.

Copy link
Contributor Author

@tiif tiif Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea is keeping the non-blocking && error logic in read and write. In anonsocket_read and anonsocket_write, we either go through the block_thread path or the actual read/write path.

For now, if readbuf.borrow().buf.is_empty() && self.is_nonblock check will never be hit by blocking fd (the one without SOCK_NONBLOCK flag) after unblocked, so I just kept them in read/write.

It is fine to move it in anonsocket_read too if it is clearer.

Copy link
Member

@RalfJung RalfJung Dec 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Splitting the logic across two functions leads to a very odd (and not documented!) precondition for anonsocket_read. It also means I have to scroll up and down to look at the entire logic. The logic really isn't that complicated so IMO it'd be better to have it in a single function where one can easily see the entire thing at a single glance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense, I can combine them together.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the new logic here is just wrong. When the buffer is empty, we have to first check self_anonsocket.peer_fd().upgrade() and then check is_nonblock.

In the future please just keep the logic the way it is rather than trying to break it apart into two functions. :)

// Non-blocking socketpair with writer and empty buffer.
// https://linux.die.net/man/2/read
// EAGAIN or EWOULDBLOCK can be returned for socket,
// POSIX.1-2001 allows either error to be returned for this case.
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}
// TODO: We might need to decide what to do if peer_fd is closed when read is blocked.
oli-obk marked this conversation as resolved.
Show resolved Hide resolved
anonsocket_read(self, self.peer_fd().upgrade(), len, ptr, dest, ecx)
tiif marked this conversation as resolved.
Show resolved Hide resolved
anonsocket_read(self_ref.downgrade(), len, ptr, dest.clone(), ecx)
}

fn write<'tcx>(
&self,
_self_ref: &FileDescriptionRef,
self_ref: &FileDescriptionRef,
_communicate_allowed: bool,
ptr: Pointer,
len: usize,
Expand All @@ -153,16 +147,11 @@ impl FileDescription for AnonSocket {
};
let available_space =
MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
Comment on lines 148 to 149
Copy link
Member

@RalfJung RalfJung Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not duplicate this logic (that same line now exists twice in this file, always a bad sign).

if available_space == 0 {
if self.is_nonblock {
// Non-blocking socketpair with a full buffer.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
} else {
// Blocking socketpair with a full buffer.
throw_unsup_format!("socketpair/pipe/pipe2 write: blocking isn't supported yet");
}
if available_space == 0 && self.is_nonblock {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also be moved into anonsocket_write I think, again to keep all the logic in the same place.

// Non-blocking socketpair with a full buffer.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}
anonsocket_write(available_space, &peer_fd, ptr, len, dest, ecx)
anonsocket_write(self_ref.downgrade(), ptr, len, dest.clone(), ecx)
}

fn as_unix(&self) -> &dyn UnixFileDescription {
Expand All @@ -172,81 +161,161 @@ impl FileDescription for AnonSocket {

/// Write to AnonSocket based on the space available and return the written byte size.
fn anonsocket_write<'tcx>(
available_space: usize,
peer_fd: &FileDescriptionRef,
weak_self_ref: WeakFileDescriptionRef,
ptr: Pointer,
len: usize,
dest: &MPlaceTy<'tcx>,
dest: MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(self_ref) = weak_self_ref.upgrade() else {
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
throw_unsup_format!("This will be a deadlock error in future")
};
Comment on lines +170 to +173
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this PR is big enough, so I decided to temporarily make this throws unsupported format.

I wanted to write a test for this, and potentially tweak the diagnostic a bit if it is too confusing (because of the reason stated here).

But I certainly won't mind putting it in this PR if anyone prefers.

let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();
let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() else {
// If the upgrade from Weak to Rc fails, it indicates that all read ends have been
// closed.
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, &dest);
Comment on lines +175 to +178
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really necessary to duplicate this check (it is both here and in write above)? I think that should be avoidable, so please reduce the code duplication.

Copy link
Contributor Author

@tiif tiif Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea we can always do the check in anonsocket_write, and remove the one in write. This was still in write because I wanted to throw error as early as possible.

};
let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("writing to the reading end of a pipe")
};
let mut writebuf = writebuf.borrow_mut();

// Remember this clock so `read` can synchronize with us.
ecx.release_clock(|clock| {
writebuf.clock.join(clock);
});
// Do full write / partial write based on the space available.
let actual_write_size = len.min(available_space);
let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?;
writebuf.buf.extend(&bytes[..actual_write_size]);
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());

if available_space == 0 {
// Blocking socketpair with a full buffer.
let dest = dest.clone();
self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread());
ecx.block_thread(
BlockReason::UnnamedSocket,
None,
callback!(
@capture<'tcx> {
weak_self_ref: WeakFileDescriptionRef,
ptr: Pointer,
len: usize,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
anonsocket_write(weak_self_ref, ptr, len, dest, this)
}
),
);
} else {
let mut writebuf = writebuf.borrow_mut();
// Remember this clock so `read` can synchronize with us.
ecx.release_clock(|clock| {
writebuf.clock.join(clock);
});
// Do full write / partial write based on the space available.
let actual_write_size = len.min(available_space);
let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?;
writebuf.buf.extend(&bytes[..actual_write_size]);

// Need to stop accessing peer_fd so that it can be notified.
drop(writebuf);
// Need to stop accessing peer_fd so that it can be notified.
drop(writebuf);

// Notification should be provided for peer fd as it became readable.
// The kernel does this even if the fd was already readable before, so we follow suit.
ecx.check_and_update_readiness(peer_fd)?;
// Notification should be provided for peer fd as it became readable.
// The kernel does this even if the fd was already readable before, so we follow suit.
ecx.check_and_update_readiness(&peer_fd)?;
let peer_anonsocket = peer_fd.downcast::<AnonSocket>().unwrap();
// Unblock all threads that are currently blocked on peer_fd's read.
Comment on lines +224 to +225
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The let peer_anonsocket is already part of the "unblock all threads", right? So the comment should be moved up by 1 line.

let waiting_threads = std::mem::take(&mut *peer_anonsocket.blocked_read_tid.borrow_mut());
// FIXME: We can randomize the order of unblocking.
for thread_id in waiting_threads {
ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
}

ecx.return_write_success(actual_write_size, dest)
return ecx.return_write_success(actual_write_size, &dest);
}
interp_ok(())
}

/// Read from AnonSocket and return the number of bytes read.
fn anonsocket_read<'tcx>(
anonsocket: &AnonSocket,
peer_fd: Option<FileDescriptionRef>,
weak_self_ref: WeakFileDescriptionRef,
len: usize,
ptr: Pointer,
dest: &MPlaceTy<'tcx>,
dest: MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let mut bytes = vec![0; len];
let Some(self_ref) = weak_self_ref.upgrade() else {
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
throw_unsup_format!("This will be a deadlock error in future")
};
let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();

let Some(readbuf) = &anonsocket.readbuf else {
let Some(readbuf) = &self_anonsocket.readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("reading from the write end of a pipe")
};
let mut readbuf = readbuf.borrow_mut();

// Synchronize with all previous writes to this buffer.
// FIXME: this over-synchronizes; a more precise approach would be to
// only sync with the writes whose data we will read.
ecx.acquire_clock(&readbuf.clock);

// Do full read / partial read based on the space available.
// Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
let actual_read_size = readbuf.buf.read(&mut bytes[..]).unwrap();

// Need to drop before others can access the readbuf again.
drop(readbuf);

// A notification should be provided for the peer file description even when it can
// only write 1 byte. This implementation is not compliant with the actual Linux kernel
// implementation. For optimization reasons, the kernel will only mark the file description
// as "writable" when it can write more than a certain number of bytes. Since we
// don't know what that *certain number* is, we will provide a notification every time
// a read is successful. This might result in our epoll emulation providing more
// notifications than the real system.
if let Some(peer_fd) = peer_fd {
ecx.check_and_update_readiness(&peer_fd)?;
}

ecx.return_read_success(ptr, &bytes, actual_read_size, dest)
if readbuf.borrow_mut().buf.is_empty() {
if self_anonsocket.peer_fd().upgrade().is_none() {
// Socketpair with no peer and empty buffer.
// 0 bytes successfully read indicates end-of-file.
return ecx.return_read_success(ptr, &[], 0, &dest);
} else {
// Blocking socketpair with writer and empty buffer.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we know it is blocking?

If that's a precondition, please add an assert! further up this function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yea, I should add a check here. The general idea is in #4072 (comment), since readbuf.borrow().buf.is_empty() && self.is_nonblock is already handled in read, this is always blocking. It is also impossible to change an fd from blocking type to non-blocking type for now. But it is likely that I will move the checks around, I will add it if it is still necessary.

let weak_self_ref = weak_self_ref.clone();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you have to clone this reference? Seems like you should move it.

self_anonsocket.blocked_read_tid.borrow_mut().push(ecx.active_thread());
ecx.block_thread(
BlockReason::UnnamedSocket,
None,
callback!(
@capture<'tcx> {
weak_self_ref: WeakFileDescriptionRef,
len: usize,
ptr: Pointer,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
anonsocket_read(weak_self_ref, len, ptr, dest, this)
}
),
);
}
} else {
let mut bytes = vec![0; len];
let mut readbuf = readbuf.borrow_mut();
// Synchronize with all previous writes to this buffer.
// FIXME: this over-synchronizes; a more precise approach would be to
// only sync with the writes whose data we will read.
ecx.acquire_clock(&readbuf.clock);

// Do full read / partial read based on the space available.
// Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
let actual_read_size = readbuf.buf.read(&mut bytes[..]).unwrap();

// Need to drop before others can access the readbuf again.
drop(readbuf);

// A notification should be provided for the peer file description even when it can
// only write 1 byte. This implementation is not compliant with the actual Linux kernel
// implementation. For optimization reasons, the kernel will only mark the file description
// as "writable" when it can write more than a certain number of bytes. Since we
// don't know what that *certain number* is, we will provide a notification every time
// a read is successful. This might result in our epoll emulation providing more
// notifications than the real system.
if let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() {
ecx.check_and_update_readiness(&peer_fd)?;
let peer_anonsocket = peer_fd.downcast::<AnonSocket>().unwrap();
// Unblock all threads that are currently blocked on peer_fd's write.
Comment on lines +306 to +307
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The let peer_anonsocket is already part of the "unblock all threads", right? So the comment should be moved up by 1 line.

let waiting_threads =
std::mem::take(&mut *peer_anonsocket.blocked_write_tid.borrow_mut());
// FIXME: We can randomize the order of unblocking.
for thread_id in waiting_threads {
ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
}
};

return ecx.return_read_success(ptr, &bytes, actual_read_size, &dest);
}
interp_ok(())
}

impl UnixFileDescription for AnonSocket {
Expand Down Expand Up @@ -360,12 +429,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
readbuf: Some(RefCell::new(Buffer::new())),
peer_fd: OnceCell::new(),
peer_lost_data: Cell::new(false),
blocked_read_tid: RefCell::new(Vec::new()),
blocked_write_tid: RefCell::new(Vec::new()),
is_nonblock: is_sock_nonblock,
});
let fd1 = fds.new_ref(AnonSocket {
readbuf: Some(RefCell::new(Buffer::new())),
peer_fd: OnceCell::new(),
peer_lost_data: Cell::new(false),
blocked_read_tid: RefCell::new(Vec::new()),
blocked_write_tid: RefCell::new(Vec::new()),
is_nonblock: is_sock_nonblock,
});

Expand Down Expand Up @@ -424,12 +497,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
readbuf: Some(RefCell::new(Buffer::new())),
peer_fd: OnceCell::new(),
peer_lost_data: Cell::new(false),
blocked_read_tid: RefCell::new(Vec::new()),
blocked_write_tid: RefCell::new(Vec::new()),
is_nonblock,
});
let fd1 = fds.new_ref(AnonSocket {
readbuf: None,
peer_fd: OnceCell::new(),
peer_lost_data: Cell::new(false),
blocked_read_tid: RefCell::new(Vec::new()),
blocked_write_tid: RefCell::new(Vec::new()),
is_nonblock,
});

Expand Down
47 changes: 47 additions & 0 deletions tests/fail-dep/libc/socketpair_block_read_twice.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//@ignore-target: windows # No libc socketpair on Windows
//~^ERROR: deadlocked
//~^^ERROR: deadlocked
// test_race depends on a deterministic schedule.
//@compile-flags: -Zmiri-preemption-rate=0
//@error-in-other-file: deadlock

use std::thread;

// Test the behaviour of a thread being blocked on read, get unblocked, then blocked again.

// The expected execution is
// 1. Thread 1 blocks.
// 2. Thread 2 blocks.
// 3. Thread 3 unblocks both thread 1 and thread 2.
// 4. Thread 1 reads.
// 5. Thread 2's `read` can never complete -> deadlocked.

fn main() {
let mut fds = [-1, -1];
let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
assert_eq!(res, 0);
let thread1 = thread::spawn(move || {
// Let this thread block on read.
let mut buf: [u8; 3] = [0; 3];
let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
assert_eq!(res, 3);
assert_eq!(&buf, "abc".as_bytes());
});
let thread2 = thread::spawn(move || {
// Let this thread block on read.
let mut buf: [u8; 3] = [0; 3];
let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
//~^ERROR: deadlocked
assert_eq!(res, 3);
assert_eq!(&buf, "abc".as_bytes());
});
let thread3 = thread::spawn(move || {
// Unblock thread1 by writing something.
let data = "abc".as_bytes().as_ptr();
let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) };
assert_eq!(res, 3);
});
thread1.join().unwrap();
thread2.join().unwrap();
thread3.join().unwrap();
}
Loading
Loading