/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

import Foundation
import Shared
import XCGLogger

open class SerializeRecordFailure<T: CleartextPayloadJSON>: MaybeErrorType, SyncPingFailureFormattable {
    public let record: Record<T>

    open var failureReasonName: SyncPingFailureReasonName {
        return .otherError
    }

    open var description: String {
        return "Failed to serialize record: \(record)"
    }

    public init(record: Record<T>) {
        self.record = record
    }
}

private let log = Logger.syncLogger
private typealias UploadRecord = (guid: GUID, payload: String, sizeBytes: Int)
public typealias DeferredResponse = Deferred<Maybe<StorageResponse<POSTResult>>>
typealias BatchUploadFunction = (_ lines: [String], _ ifUnmodifiedSince: Timestamp?, _ queryParams: [URLQueryItem]?) -> Deferred<Maybe<StorageResponse<POSTResult>>>
private let commitParam = URLQueryItem(name: "commit", value: "true")

private enum AccumulateRecordError: MaybeErrorType {
    case full(uploadOp: DeferredResponse)
    case unknown

    var description: String {
        switch self {
        case .full:
            return "Batch or payload is full."
        case .unknown:
            return "Unknown error while trying to accumulate records in batch"
        }
    }
}

open class TooManyRecordsError: MaybeErrorType, SyncPingFailureFormattable {
    open var description: String {
        return "Trying to send too many records in a single batch."
    }

    open var failureReasonName: SyncPingFailureReasonName {
        return .otherError
    }
}

open class RecordsFailedToUpload: MaybeErrorType, SyncPingFailureFormattable {
    open var description: String {
        return "Some records failed to upload"
    }

    open var failureReasonName: SyncPingFailureReasonName {
        return .otherError
    }
}
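
/// Uploads records to a Sync 1.5 storage collection, splitting them into POSTs and
/// batches according to the limits advertised in `InfoConfiguration`
/// (`maxPostRecords`/`maxPostBytes` per POST, `maxTotalRecords`/`maxTotalBytes` per batch).
/// The first POST is sent with `batch=true`; if the server returns a batch token,
/// subsequent POSTs carry `batch=<token>` and the final one adds `commit=true`.
///
/// A minimal usage sketch, assuming `config`, `encrypt`, `post`, and `timestamp` are
/// supplied by the caller (they are not defined in this file):
///
///     let batch = Sync15BatchClient(
///         config: config,                                  // limits from info/configuration
///         ifUnmodifiedSince: timestamp,
///         serializeRecord: { encrypt($0) },                // hypothetical serializer
///         uploader: { lines, ius, params in post(lines, ius, params) },
///         onCollectionUploaded: { result, lastModified in deferMaybe(lastModified ?? 0) })
///     batch.addRecords(records) >>> batch.endBatch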
open class Sync15BatchClient<T: CleartextPayloadJSON> {
    fileprivate(set) var ifUnmodifiedSince: Timestamp?

    fileprivate let config: InfoConfiguration
    fileprivate let uploader: BatchUploadFunction
    fileprivate let serializeRecord: (Record<T>) -> String?

    fileprivate var batchToken: BatchToken?

    // Keep track of the limits of a single batch
    fileprivate var totalBytes: ByteCount = 0
    fileprivate var totalRecords: Int = 0

    // Keep track of the limits of a single POST
    fileprivate var postBytes: ByteCount = 0
    fileprivate var postRecords: Int = 0

    fileprivate var records = [UploadRecord]()

    fileprivate var onCollectionUploaded: (POSTResult, Timestamp?) -> DeferredTimestamp

    fileprivate func batchQueryParamWithValue(_ value: String) -> URLQueryItem {
        return URLQueryItem(name: "batch", value: value)
    }

    init(config: InfoConfiguration, ifUnmodifiedSince: Timestamp? = nil, serializeRecord: @escaping (Record<T>) -> String?,
         uploader: @escaping BatchUploadFunction, onCollectionUploaded: @escaping (POSTResult, Timestamp?) -> DeferredTimestamp) {
        self.config = config
        self.ifUnmodifiedSince = ifUnmodifiedSince
        self.uploader = uploader
        self.serializeRecord = serializeRecord
        self.onCollectionUploaded = onCollectionUploaded
    }
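
    // Flushes anything still buffered: commits the open batch if we hold a batch token,
    // otherwise uploads the remaining records as a single non-batched POST.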
    open func endBatch() -> Success {
        guard !records.isEmpty else {
            return succeed()
        }

        if let token = self.batchToken {
            return commitBatch(token) >>> succeed
        }

        let lines = self.freezePost()
        return self.uploader(lines, self.ifUnmodifiedSince, nil)
            >>== effect(moveForward)
            >>> succeed
    }

    // If in batch mode, the batch will be discarded if any record fails to upload.
    open func endSingleBatch() -> Deferred<Maybe<(succeeded: [GUID], lastModified: Timestamp?)>> {
        return self.start() >>== { response in
            let succeeded = response.value.success
            guard let token = self.batchToken else {
                return deferMaybe((succeeded: succeeded, lastModified: response.metadata.lastModifiedMilliseconds))
            }

            guard succeeded.count == self.totalRecords else {
                return deferMaybe(RecordsFailedToUpload())
            }

            return self.commitBatch(token) >>== { commitResp in
                return deferMaybe((succeeded: succeeded, lastModified: commitResp.metadata.lastModifiedMilliseconds))
            }
        }
    }

    open func addRecords(_ records: [Record<T>], singleBatch: Bool = false) -> Success {
        guard !records.isEmpty else {
            return succeed()
        }

        // Eagerly serialize the records prior to processing them so we can catch any issues
        // with record sizes before we start uploading to the server.
        let serializeThunks = records.map { record in
            return { self.serialize(record) }
        }

        return accumulate(serializeThunks) >>== {
            let iter = $0.makeIterator()
            if singleBatch {
                return self.addRecordsInSingleBatch(iter)
            } else {
                return self.addRecords(iter)
            }
        }
    }
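
    // Uploads serialized records one at a time: accumulate (or upload) the first record,
    // then recurse on the remainder once that upload, if any, has completed.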
    fileprivate func addRecords(_ generator: IndexingIterator<[UploadRecord]>) -> Success {
        var mutGenerator = generator
        while let record = mutGenerator.next() {
            return accumulateOrUpload(record) >>> { self.addRecords(mutGenerator) }
        }
        return succeed()
    }
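
    // Adds every record to the current POST without starting a server-side batch.
    // Fails with TooManyRecordsError if the records exceed a single POST's limits.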
    fileprivate func addRecordsInSingleBatch(_ generator: IndexingIterator<[UploadRecord]>) -> Success {
        var mutGenerator = generator
        while let record = mutGenerator.next() {
            guard self.addToPost(record) else {
                return deferMaybe(TooManyRecordsError())
            }
        }
        return succeed()
    }
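
    // Tries to buffer the record; if the current POST or batch is full, waits for the
    // resulting upload to complete and then retries with the same record.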
    fileprivate func accumulateOrUpload(_ record: UploadRecord) -> Success {
        return accumulateRecord(record).bind { result in
            // Try to add the record to our buffer
            guard let e = result.failureValue as? AccumulateRecordError else {
                return succeed()
            }

            switch e {
            case .full(let uploadOp):
                return uploadOp >>> { self.accumulateOrUpload(record) }
            default:
                return deferMaybe(e)
            }
        }
    }
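
    // Buffers a record against the current POST/batch limits:
    // - no batch token yet: add to the POST, or kick off `start()` if the POST is full;
    // - batch is full: commit the batch along with the pending POST;
    // - POST is full within a batch: upload the POST using the batch token;
    // - otherwise count the record against the batch and succeed.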
    fileprivate func accumulateRecord(_ record: UploadRecord) -> Success {
        guard let token = self.batchToken else {
            guard addToPost(record) else {
                return deferMaybe(AccumulateRecordError.full(uploadOp: self.start()))
            }
            return succeed()
        }

        guard fitsInBatch(record) else {
            return deferMaybe(AccumulateRecordError.full(uploadOp: self.commitBatch(token)))
        }

        guard addToPost(record) else {
            return deferMaybe(AccumulateRecordError.full(uploadOp: self.postInBatch(token)))
        }

        addToBatch(record)
        return succeed()
    }
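
    // Serializes a record to its upload line, rejecting records the server would refuse
    // because they exceed the maximum record size.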
    fileprivate func serialize(_ record: Record<T>) -> Deferred<Maybe<UploadRecord>> {
        guard let line = self.serializeRecord(record) else {
            return deferMaybe(SerializeRecordFailure(record: record))
        }

        let lineSize = line.utf8.count
        guard lineSize < Sync15StorageClient.maxRecordSizeBytes else {
            return deferMaybe(RecordTooLargeError(size: lineSize, guid: record.id))
        }

        return deferMaybe((record.id, line, lineSize))
    }
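
    // Buffers the record into the current POST if it fits within the per-POST record
    // and byte limits from the server's configuration; returns false otherwise.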
    fileprivate func addToPost(_ record: UploadRecord) -> Bool {
        guard postRecords + 1 <= config.maxPostRecords && postBytes + record.sizeBytes <= config.maxPostBytes else {
            return false
        }
        postRecords += 1
        postBytes += record.sizeBytes
        records.append(record)
        return true
    }

    fileprivate func fitsInBatch(_ record: UploadRecord) -> Bool {
        return totalRecords + 1 <= config.maxTotalRecords && totalBytes + record.sizeBytes <= config.maxTotalBytes
    }

    fileprivate func addToBatch(_ record: UploadRecord) {
        totalRecords += 1
        totalBytes += record.sizeBytes
    }

    fileprivate func postInBatch(_ token: BatchToken) -> DeferredResponse {
        // Push up the current payload to the server and reset
        let lines = self.freezePost()
        return uploader(lines, self.ifUnmodifiedSince, [batchQueryParamWithValue(token)])
    }
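
    // Uploads the pending POST with the batch token and `commit=true`, clears the batch
    // state, and advances `ifUnmodifiedSince` to the server's new last-modified timestamp.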
    fileprivate func commitBatch(_ token: BatchToken) -> DeferredResponse {
        resetBatch()

        let lines = self.freezePost()
        let queryParams = [batchQueryParamWithValue(token), commitParam]
        return uploader(lines, self.ifUnmodifiedSince, queryParams)
            >>== effect(moveForward)
    }
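
    // Uploads the current POST with `batch=true`. If the server supports batching it
    // returns a batch token, which we store while seeding the batch counters with the
    // records just sent; otherwise this POST has simply committed those records.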
    fileprivate func start() -> DeferredResponse {
        let postRecordCount = self.postRecords
        let postBytesCount = self.postBytes
        let lines = freezePost()
        return self.uploader(lines, self.ifUnmodifiedSince, [batchQueryParamWithValue("true")])
            >>== effect(moveForward)
            >>== { response in
                if let token = response.value.batchToken {
                    self.batchToken = token

                    // Now that we've started a batch, make sure to set the counters for the batch to include
                    // the records we just sent as part of the start call.
                    self.totalRecords = postRecordCount
                    self.totalBytes = postBytesCount
                }
                return deferMaybe(response)
            }
    }
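
    // Records the server's last-modified timestamp as the next `ifUnmodifiedSince` value
    // for subsequent uploads and notifies the collection-uploaded callback.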
    fileprivate func moveForward(_ response: StorageResponse<POSTResult>) {
        let lastModified = response.metadata.lastModifiedMilliseconds
        self.ifUnmodifiedSince = lastModified
        _ = self.onCollectionUploaded(response.value, lastModified)
    }

    fileprivate func resetBatch() {
        totalBytes = 0
        totalRecords = 0
        self.batchToken = nil
    }
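
    // Returns the payload lines accumulated so far and resets the per-POST buffer and counters.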
    fileprivate func freezePost() -> [String] {
        let lines = records.map { $0.payload }
        self.records = []
        self.postBytes = 0
        self.postRecords = 0
        return lines
    }
}