Source code

Revision control

Copy as Markdown

Other Tools

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "AsyncPlatformPipes.h"
#include "base/eintr_wrapper.h"
#include "base/message_loop.h"
#include "mozilla/EventTargetCapability.h"
#include "mozilla/UniquePtr.h"
#include "nsStreamUtils.h"
#include "nsThreadUtils.h"
#include "nsXULAppAPI.h"
#ifndef XP_WIN
# include <errno.h>
# include <fcntl.h>
# include <sys/stat.h>
# include <unistd.h>
#endif
namespace mozilla {
namespace platform_pipe_detail {
class PlatformPipeLink
#ifdef XP_WIN
: public MessageLoopForIO::IOHandler
#else
: public MessageLoopForIO::Watcher
#endif
{
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(PlatformPipeLink)
public:
PlatformPipeLink(UniqueFileHandle aHandle, uint32_t aBufferSize);
void CloseLocked(nsresult aStatus) MOZ_REQUIRES(mMutex);
// Dispatch notification of mCallback to another thread.
//
// NOTE: We intentionally do not notify directly on the current thread, even
// if no event target is provided, as we don't want user code running on the
// IPC I/O thread as it is very latency sensitive.
void DispatchNotify() MOZ_REQUIRES(mMutex);
void AdvanceIO() MOZ_EXCLUDES(mMutex) MOZ_REQUIRES(mIOThread);
#ifdef XP_WIN
void OnIOCompleted(MessageLoopForIO::IOContext* aContext,
DWORD aBytesTransferred, DWORD aError) override;
#else
void OnFileCanReadWithoutBlocking(int fd) override;
void OnFileCanWriteWithoutBlocking(int fd) override;
#endif
const EventTargetCapability<nsISerialEventTarget> mIOThread;
Mutex mMutex{"PlatformPipeReader"};
UniqueFileHandle mHandle MOZ_GUARDED_BY(mIOThread);
const UniquePtr<char[]> mBuffer;
const uint32_t mBufferSize;
bool mProcessingSegment MOZ_GUARDED_BY(mMutex) = false;
nsresult mStatus MOZ_GUARDED_BY(mMutex) = NS_OK;
// `mOffset` is the start of the readable region in the buffer, and
// `mAvailable` is its size.
uint32_t mOffset MOZ_GUARDED_BY(mMutex) = 0;
uint32_t mAvailable MOZ_GUARDED_BY(mMutex) = 0;
bool mCallbackClosureOnly MOZ_GUARDED_BY(mMutex) = false;
nsCOMPtr<nsIRunnable> mCallback MOZ_GUARDED_BY(mMutex);
nsCOMPtr<nsIEventTarget> mCallbackTarget MOZ_GUARDED_BY(mMutex);
// A reference keeping `this` alive while I/O is in-flight.
// This is particularly important on Windows where mBuffer needs to be kept
// alive while Overlapped IO is ongoing.
RefPtr<PlatformPipeLink> mPending MOZ_GUARDED_BY(mIOThread);
#ifdef XP_WIN
MessageLoopForIO::IOContext mIOContext MOZ_GUARDED_BY(mIOThread) = {};
#else
MessageLoopForIO::FileDescriptorWatcher mWatcher MOZ_GUARDED_BY(mIOThread);
#endif
private:
~PlatformPipeLink() = default;
};
PlatformPipeLink::PlatformPipeLink(UniqueFileHandle aHandle,
uint32_t aBufferSize)
: mIOThread(XRE_GetAsyncIOEventTarget()),
mHandle(std::move(aHandle)),
mBuffer(MakeUnique<char[]>(aBufferSize)),
mBufferSize(aBufferSize) {
MOZ_ASSERT(aBufferSize > 1, "invalid buffer size");
MOZ_ASSERT(mHandle, "invalid handle");
#if defined(DEBUG) && !defined(XP_WIN)
struct stat st{};
MOZ_ASSERT(fstat(mHandle.get(), &st) == 0 && !S_ISREG(st.st_mode),
"PlatformPipeLink does not support regular files");
MOZ_ASSERT(fcntl(mHandle.get(), F_GETFL) & O_NONBLOCK,
"PlatformPipeLink requires non-blocking file descriptors");
#endif
}
void PlatformPipeLink::CloseLocked(nsresult aStatus) {
if (NS_FAILED(mStatus)) {
return;
}
mStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus;
DispatchNotify();
mIOThread.Dispatch(NewRunnableMethod("PlatformPipeLink::AdvanceIO", this,
&PlatformPipeLink::AdvanceIO));
}
void PlatformPipeLink::DispatchNotify() {
nsCOMPtr<nsIRunnable> callback = mCallback.forget();
nsCOMPtr<nsIEventTarget> target = mCallbackTarget.forget();
if (!callback) {
return;
}
if (target) {
target->Dispatch(callback.forget());
} else {
NS_DispatchBackgroundTask(callback.forget());
}
}
void PlatformPipeLink::AdvanceIO() {
if (!mHandle) {
return;
}
MutexAutoLock lock(mMutex);
if (NS_FAILED(mStatus)) {
if (mPending) {
#ifdef XP_WIN
// If we still have pending I/O, cancel it. We don't clear mPending,
// as we need to keep our buffers alive until the cancelled I/O
// completes.
CancelIo(mHandle.get());
#else
// On posix, immediately cancel our pending I/O by clearing the
// watcher.
mWatcher.StopWatchingFileDescriptor();
mPending = nullptr;
#endif
}
// The `close` operation can be slow and perform blocking I/O, so we want to
// avoid performing it on the IPC I/O thread. Instead, we dispatch it to a
// blocking background task.
NS_DispatchBackgroundTask(
NS_NewRunnableFunction(
"PlatformPipeLink::CloseHandle",
[handle = std::move(mHandle)]() mutable { handle = nullptr; }),
NS_DISPATCH_EVENT_MAY_BLOCK);
return;
}
// We still have outstanding I/O, or our buffer already has data waiting to
// be consumed. Either way, don't start new I/O.
if (mPending || mAvailable) {
return;
}
#ifdef XP_WIN
// On Windows, we need to register the IO handler the first time we're on the
// I/O thread.
if (!mIOContext.handler) {
MessageLoopForIO::current()->RegisterIOHandler(mHandle.get(), this);
mIOContext.handler = this;
}
BOOL ok = ReadFile(mHandle.get(), mBuffer.get(), mBufferSize, nullptr,
&mIOContext.overlapped);
if (!ok) {
DWORD error = GetLastError();
if (error == ERROR_IO_PENDING) {
mPending = this;
return;
}
if (error == ERROR_BROKEN_PIPE || error == ERROR_HANDLE_EOF) {
CloseLocked(NS_BASE_STREAM_CLOSED);
} else {
CloseLocked(NS_ERROR_FAILURE);
}
return;
}
mPending = this;
#else
ssize_t rv = HANDLE_EINTR(read(mHandle.get(), mBuffer.get(), mBufferSize));
if (rv > 0) {
mOffset = 0;
mAvailable = static_cast<uint32_t>(rv);
if (!mCallbackClosureOnly) {
DispatchNotify();
}
return;
}
if (rv == 0) {
CloseLocked(NS_BASE_STREAM_CLOSED);
return;
}
if (errno == EAGAIN
# if EWOULDBLOCK != EAGAIN
|| errno == EWOULDBLOCK
# endif
) {
if (MessageLoopForIO::current()->WatchFileDescriptor(
mHandle.get(), false, MessageLoopForIO::WATCH_READ, &mWatcher,
this)) {
mPending = this;
return;
}
}
CloseLocked(NS_ERROR_FAILURE);
#endif
}
#ifdef XP_WIN
void PlatformPipeLink::OnIOCompleted(MessageLoopForIO::IOContext* aContext,
DWORD aBytesTransferred, DWORD aError) {
mIOThread.AssertOnCurrentThread();
if (aContext != &mIOContext) {
return;
}
RefPtr<PlatformPipeLink> pending = mPending.forget();
if (!pending) {
return;
}
MutexAutoLock lock(mMutex);
if (NS_FAILED(mStatus)) {
return;
}
if (aError != ERROR_SUCCESS) {
if (aError == ERROR_BROKEN_PIPE || aError == ERROR_HANDLE_EOF ||
aError == ERROR_OPERATION_ABORTED) {
CloseLocked(NS_BASE_STREAM_CLOSED);
} else {
CloseLocked(NS_ERROR_FAILURE);
}
return;
}
if (aBytesTransferred == 0) {
CloseLocked(NS_BASE_STREAM_CLOSED);
return;
}
mOffset = 0;
mAvailable = aBytesTransferred;
if (!mCallbackClosureOnly) {
DispatchNotify();
}
}
#else
void PlatformPipeLink::OnFileCanReadWithoutBlocking(int fd) {
mIOThread.AssertOnCurrentThread();
RefPtr<PlatformPipeLink> pending = mPending.forget();
AdvanceIO();
}
void PlatformPipeLink::OnFileCanWriteWithoutBlocking(int fd) {
MOZ_ASSERT_UNREACHABLE();
}
#endif
} // namespace platform_pipe_detail
//-----------------------------------------------------------------------------
// PlatformPipeReader
//-----------------------------------------------------------------------------
NS_IMPL_ISUPPORTS(PlatformPipeReader, nsIInputStream, nsIAsyncInputStream)
PlatformPipeReader::PlatformPipeReader(UniqueFileHandle aHandle,
uint32_t aBufferSize)
: mLink(new platform_pipe_detail::PlatformPipeLink(std::move(aHandle),
aBufferSize)) {}
PlatformPipeReader::~PlatformPipeReader() { Close(); }
NS_IMETHODIMP PlatformPipeReader::Close() {
return CloseWithStatus(NS_BASE_STREAM_CLOSED);
}
NS_IMETHODIMP PlatformPipeReader::Available(uint64_t* aAvailable) {
MutexAutoLock lock(mLink->mMutex);
if (NS_FAILED(mLink->mStatus)) {
return mLink->mStatus;
}
*aAvailable = mLink->mAvailable;
return NS_OK;
}
NS_IMETHODIMP PlatformPipeReader::StreamStatus() {
MutexAutoLock lock(mLink->mMutex);
return mLink->mStatus;
}
NS_IMETHODIMP PlatformPipeReader::Read(char* aBuf, uint32_t aCount,
uint32_t* aReadCount) {
return ReadSegments(NS_CopySegmentToBuffer, aBuf, aCount, aReadCount);
}
NS_IMETHODIMP PlatformPipeReader::ReadSegments(nsWriteSegmentFun aWriter,
void* aClosure, uint32_t aCount,
uint32_t* aReadCount) {
*aReadCount = 0;
MutexAutoLock lock(mLink->mMutex);
if (NS_FAILED(mLink->mStatus)) {
return mLink->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mLink->mStatus;
}
if (!mLink->mAvailable) {
return NS_BASE_STREAM_WOULD_BLOCK;
}
MOZ_RELEASE_ASSERT(!mLink->mProcessingSegment,
"Only one thread may be processing a segment at a time");
char* start = mLink->mBuffer.get() + mLink->mOffset;
uint32_t length = std::min(aCount, mLink->mAvailable);
mLink->mProcessingSegment = true;
{
MutexAutoUnlock unlock(mLink->mMutex);
nsresult rv = aWriter(this, aClosure, start, 0, length, aReadCount);
if (NS_FAILED(rv)) {
*aReadCount = 0;
}
MOZ_RELEASE_ASSERT(*aReadCount <= length);
}
mLink->mProcessingSegment = false;
mLink->mOffset += *aReadCount;
mLink->mAvailable -= *aReadCount;
// If a closure-only callback is armed, the caller isn't listening for new
// data, so only the IO thread is able to notice the peer closing. Re-kick
// AdvanceIO once the buffer has drained so a subsequent close is observed.
if (!mLink->mAvailable && mLink->mCallback && mLink->mCallbackClosureOnly) {
mLink->mIOThread.Dispatch(
NewRunnableMethod("PlatformPipeLink::AdvanceIO", mLink,
&platform_pipe_detail::PlatformPipeLink::AdvanceIO));
}
return NS_OK;
}
NS_IMETHODIMP PlatformPipeReader::IsNonBlocking(bool* aNonBlocking) {
*aNonBlocking = true;
return NS_OK;
}
NS_IMETHODIMP PlatformPipeReader::CloseWithStatus(nsresult aStatus) {
MutexAutoLock lock(mLink->mMutex);
MOZ_RELEASE_ASSERT(!mLink->mProcessingSegment,
"Cannot close pipe during ReadSegments callback");
mLink->CloseLocked(aStatus);
return NS_OK;
}
NS_IMETHODIMP PlatformPipeReader::AsyncWait(nsIInputStreamCallback* aCallback,
uint32_t aFlags,
uint32_t aRequestedCount,
nsIEventTarget* aTarget) {
MutexAutoLock lock(mLink->mMutex);
if (!aCallback) {
mLink->mCallback = nullptr;
mLink->mCallbackTarget = nullptr;
return NS_OK;
}
mLink->mCallback = NS_NewRunnableFunction(
"PlatformPipeReader::AsyncWait",
[self = RefPtr{this}, callback = RefPtr{aCallback}] {
callback->OnInputStreamReady(self);
});
mLink->mCallbackTarget = aTarget;
mLink->mCallbackClosureOnly = aFlags & WAIT_CLOSURE_ONLY;
if (NS_FAILED(mLink->mStatus) ||
(!mLink->mCallbackClosureOnly && mLink->mAvailable)) {
mLink->DispatchNotify();
} else {
mLink->mIOThread.Dispatch(
NewRunnableMethod("PlatformPipeLink::AdvanceIO", mLink,
&platform_pipe_detail::PlatformPipeLink::AdvanceIO));
}
return NS_OK;
}
} // namespace mozilla