Source code

Revision control

Other Tools

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "mozilla/SnappyUncompressInputStream.h"
#include <algorithm>
#include "nsIAsyncInputStream.h"
#include "nsStreamUtils.h"
#include "snappy/snappy.h"
namespace mozilla {
NS_IMPL_ISUPPORTS(SnappyUncompressInputStream, nsIInputStream);
// Putting kCompressedBufferLength inside a function avoids a static
// constructor.
static size_t CompressedBufferLength() {
static size_t kCompressedBufferLength =
detail::SnappyFrameUtils::MaxCompressedBufferLength(snappy::kBlockSize);
MOZ_ASSERT(kCompressedBufferLength > 0);
return kCompressedBufferLength;
}
SnappyUncompressInputStream::SnappyUncompressInputStream(
nsIInputStream* aBaseStream)
: mBaseStream(aBaseStream),
mUncompressedBytes(0),
mNextByte(0),
mNextChunkType(Unknown),
mNextChunkDataLength(0),
mNeedFirstStreamIdentifier(true) {
// This implementation only supports sync base streams. Verify this in debug
// builds. Note, this is a bit complicated because the streams we support
// advertise different capabilities:
// - nsFileInputStream - blocking and sync
// - nsStringInputStream - non-blocking and sync
// - nsPipeInputStream - can be blocking, but provides async interface
#ifdef DEBUG
bool baseNonBlocking;
nsresult rv = mBaseStream->IsNonBlocking(&baseNonBlocking);
MOZ_ASSERT(NS_SUCCEEDED(rv));
if (baseNonBlocking) {
nsCOMPtr<nsIAsyncInputStream> async = do_QueryInterface(mBaseStream);
MOZ_ASSERT(!async);
}
#endif
}
NS_IMETHODIMP
SnappyUncompressInputStream::Close() {
if (!mBaseStream) {
return NS_OK;
}
mBaseStream->Close();
mBaseStream = nullptr;
mUncompressedBuffer = nullptr;
mCompressedBuffer = nullptr;
return NS_OK;
}
NS_IMETHODIMP
SnappyUncompressInputStream::Available(uint64_t* aLengthOut) {
if (!mBaseStream) {
return NS_BASE_STREAM_CLOSED;
}
// If we have uncompressed bytes, then we are done.
*aLengthOut = UncompressedLength();
if (*aLengthOut > 0) {
return NS_OK;
}
// Otherwise, attempt to uncompress bytes until we get something or the
// underlying stream is drained. We loop here because some chunks can
// be StreamIdentifiers, padding, etc with no data.
uint32_t bytesRead;
do {
nsresult rv = ParseNextChunk(&bytesRead);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
*aLengthOut = UncompressedLength();
} while (*aLengthOut == 0 && bytesRead);
return NS_OK;
}
NS_IMETHODIMP
SnappyUncompressInputStream::Read(char* aBuf, uint32_t aCount,
uint32_t* aBytesReadOut) {
return ReadSegments(NS_CopySegmentToBuffer, aBuf, aCount, aBytesReadOut);
}
NS_IMETHODIMP
SnappyUncompressInputStream::ReadSegments(nsWriteSegmentFun aWriter,
void* aClosure, uint32_t aCount,
uint32_t* aBytesReadOut) {
*aBytesReadOut = 0;
if (!mBaseStream) {
return NS_BASE_STREAM_CLOSED;
}
nsresult rv;
// Do not try to use the base stream's ReadSegements here. Its very
// unlikely we will get a single buffer that contains all of the compressed
// data and therefore would have to copy into our own buffer anyways.
// Instead, focus on making efficient use of the Read() interface.
while (aCount > 0) {
// We have some decompressed data in our buffer. Provide it to the
// callers writer function.
if (mUncompressedBytes > 0) {
MOZ_ASSERT(mUncompressedBuffer);
uint32_t remaining = UncompressedLength();
uint32_t numToWrite = std::min(aCount, remaining);
uint32_t numWritten;
rv = aWriter(this, aClosure, &mUncompressedBuffer[mNextByte],
*aBytesReadOut, numToWrite, &numWritten);
// As defined in nsIInputputStream.idl, do not pass writer func errors.
if (NS_FAILED(rv)) {
return NS_OK;
}
// End-of-file
if (numWritten == 0) {
return NS_OK;
}
*aBytesReadOut += numWritten;
mNextByte += numWritten;
MOZ_ASSERT(mNextByte <= mUncompressedBytes);
if (mNextByte == mUncompressedBytes) {
mNextByte = 0;
mUncompressedBytes = 0;
}
aCount -= numWritten;
continue;
}
// Otherwise uncompress the next chunk and loop. Any resulting data
// will set mUncompressedBytes which we check at the top of the loop.
uint32_t bytesRead;
rv = ParseNextChunk(&bytesRead);
if (NS_FAILED(rv)) {
return rv;
}
// If we couldn't read anything and there is no more data to provide
// to the caller, then this is eof.
if (bytesRead == 0 && mUncompressedBytes == 0) {
return NS_OK;
}
}
return NS_OK;
}
NS_IMETHODIMP
SnappyUncompressInputStream::IsNonBlocking(bool* aNonBlockingOut) {
*aNonBlockingOut = false;
return NS_OK;
}
SnappyUncompressInputStream::~SnappyUncompressInputStream() { Close(); }
nsresult SnappyUncompressInputStream::ParseNextChunk(uint32_t* aBytesReadOut) {
// There must not be any uncompressed data already in mUncompressedBuffer.
MOZ_ASSERT(mUncompressedBytes == 0);
MOZ_ASSERT(mNextByte == 0);
nsresult rv;
*aBytesReadOut = 0;
// Lazily create our two buffers so we can report OOM during stream
// operation. These allocations only happens once. The buffers are reused
// until the stream is closed.
if (!mUncompressedBuffer) {
mUncompressedBuffer.reset(new (fallible) char[snappy::kBlockSize]);
if (NS_WARN_IF(!mUncompressedBuffer)) {
return NS_ERROR_OUT_OF_MEMORY;
}
}
if (!mCompressedBuffer) {
mCompressedBuffer.reset(new (fallible) char[CompressedBufferLength()]);
if (NS_WARN_IF(!mCompressedBuffer)) {
return NS_ERROR_OUT_OF_MEMORY;
}
}
// We have no decompressed data and we also have not seen the start of stream
// yet. Read and validate the StreamIdentifier chunk. Also read the next
// header to determine the size of the first real data chunk.
if (mNeedFirstStreamIdentifier) {
const uint32_t firstReadLength =
kHeaderLength + kStreamIdentifierDataLength + kHeaderLength;
MOZ_ASSERT(firstReadLength <= CompressedBufferLength());
rv = ReadAll(mCompressedBuffer.get(), firstReadLength, firstReadLength,
aBytesReadOut);
if (NS_WARN_IF(NS_FAILED(rv)) || *aBytesReadOut == 0) {
return rv;
}
rv = ParseHeader(mCompressedBuffer.get(), kHeaderLength, &mNextChunkType,
&mNextChunkDataLength);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
if (NS_WARN_IF(mNextChunkType != StreamIdentifier ||
mNextChunkDataLength != kStreamIdentifierDataLength)) {
return NS_ERROR_CORRUPTED_CONTENT;
}
size_t offset = kHeaderLength;
mNeedFirstStreamIdentifier = false;
size_t numRead;
size_t numWritten;
rv = ParseData(mUncompressedBuffer.get(), snappy::kBlockSize,
mNextChunkType, &mCompressedBuffer[offset],
mNextChunkDataLength, &numWritten, &numRead);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
MOZ_ASSERT(numWritten == 0);
MOZ_ASSERT(numRead == mNextChunkDataLength);
offset += numRead;
rv = ParseHeader(&mCompressedBuffer[offset], *aBytesReadOut - offset,
&mNextChunkType, &mNextChunkDataLength);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
return NS_OK;
}
// We have no compressed data and we don't know how big the next chunk is.
// This happens when we get an EOF pause in the middle of a stream and also
// at the end of the stream. Simply read the next header and return. The
// chunk body will be read on the next entry into this method.
if (mNextChunkType == Unknown) {
rv = ReadAll(mCompressedBuffer.get(), kHeaderLength, kHeaderLength,
aBytesReadOut);
if (NS_WARN_IF(NS_FAILED(rv)) || *aBytesReadOut == 0) {
return rv;
}
rv = ParseHeader(mCompressedBuffer.get(), kHeaderLength, &mNextChunkType,
&mNextChunkDataLength);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
return NS_OK;
}
// We have no decompressed data, but we do know the size of the next chunk.
// Read at least that much from the base stream.
uint32_t readLength = mNextChunkDataLength;
MOZ_ASSERT(readLength <= CompressedBufferLength());
// However, if there is enough data in the base stream, also read the next
// chunk header. This helps optimize the stream by avoiding many small reads.
uint64_t avail;
rv = mBaseStream->Available(&avail);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
if (avail >= (readLength + kHeaderLength)) {
readLength += kHeaderLength;
MOZ_ASSERT(readLength <= CompressedBufferLength());
}
rv = ReadAll(mCompressedBuffer.get(), readLength, mNextChunkDataLength,
aBytesReadOut);
if (NS_WARN_IF(NS_FAILED(rv)) || *aBytesReadOut == 0) {
return rv;
}
size_t numRead;
size_t numWritten;
rv = ParseData(mUncompressedBuffer.get(), snappy::kBlockSize, mNextChunkType,
mCompressedBuffer.get(), mNextChunkDataLength, &numWritten,
&numRead);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
MOZ_ASSERT(numRead == mNextChunkDataLength);
mUncompressedBytes = numWritten;
// If we were unable to directly read the next chunk header, then clear
// our internal state. We will have to perform a small read to get the
// header the next time we enter this method.
if (*aBytesReadOut <= mNextChunkDataLength) {
mNextChunkType = Unknown;
mNextChunkDataLength = 0;
return NS_OK;
}
// We got the next chunk header. Parse it so that we are ready to for the
// next call into this method.
rv = ParseHeader(&mCompressedBuffer[numRead], *aBytesReadOut - numRead,
&mNextChunkType, &mNextChunkDataLength);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
return NS_OK;
}
nsresult SnappyUncompressInputStream::ReadAll(char* aBuf, uint32_t aCount,
uint32_t aMinValidCount,
uint32_t* aBytesReadOut) {
MOZ_ASSERT(aCount >= aMinValidCount);
*aBytesReadOut = 0;
if (!mBaseStream) {
return NS_BASE_STREAM_CLOSED;
}
uint32_t offset = 0;
while (aCount > 0) {
uint32_t bytesRead = 0;
nsresult rv = mBaseStream->Read(aBuf + offset, aCount, &bytesRead);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
// EOF, but don't immediately return. We need to validate min read bytes
// below.
if (bytesRead == 0) {
break;
}
*aBytesReadOut += bytesRead;
offset += bytesRead;
aCount -= bytesRead;
}
// Reading zero bytes is not an error. Its the expected EOF condition.
// Only compare to the minimum valid count if we read at least one byte.
if (*aBytesReadOut != 0 && *aBytesReadOut < aMinValidCount) {
return NS_ERROR_CORRUPTED_CONTENT;
}
return NS_OK;
}
size_t SnappyUncompressInputStream::UncompressedLength() const {
MOZ_ASSERT(mNextByte <= mUncompressedBytes);
return mUncompressedBytes - mNextByte;
}
} // namespace mozilla