Revision control

1
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5
import Foundation
6
import Shared
7
import XCGLogger
8
9
/// Error describing a record that could not be serialized.
///
/// In this file it is produced by `Sync15BatchClient.serialize(_:)` when the
/// client's `serializeRecord` closure returns `nil`.
open class SerializeRecordFailure<T: CleartextPayloadJSON>: MaybeErrorType, SyncPingFailureFormattable {
    /// The record that failed to serialize.
    public let record: Record<T>

    /// - Parameter record: The record that failed to serialize.
    public init(record: Record<T>) {
        self.record = record
    }

    /// Message that embeds the string interpolation of the offending record.
    open var description: String {
        return "Failed to serialize record: \(record)"
    }

    /// Failure category for this error; always `.otherError`.
    open var failureReasonName: SyncPingFailureReasonName {
        return .otherError
    }
}
24
25
private let log = Logger.syncLogger

/// A record that is ready for upload: its GUID, the serialized payload line,
/// and the size of that line in UTF-8 bytes (see `Sync15BatchClient.serialize(_:)`).
private typealias UploadRecord = (guid: GUID, payload: String, sizeBytes: Int)

/// The deferred result of a single upload request.
public typealias DeferredResponse = Deferred<Maybe<StorageResponse<POSTResult>>>

/// Performs one upload.
///
/// - Parameters:
///   - lines: The serialized record payloads to send.
///   - ifUnmodifiedSince: Optional timestamp forwarded with the request.
///   - queryParams: Optional query items (the batch client passes `batch` / `commit` here).
typealias BatchUploadFunction = (_ lines: [String], _ ifUnmodifiedSince: Timestamp?, _ queryParams: [URLQueryItem]?) -> DeferredResponse

/// Query item added to the request that commits a batch.
private let commitParam = URLQueryItem(name: "commit", value: "true")
33
34
/// Internal signal raised while buffering records for upload.
private enum AccumulateRecordError: MaybeErrorType {
    /// The record did not fit. `uploadOp` is the upload that was started to
    /// make room; the caller waits for it and then retries the record.
    case full(uploadOp: DeferredResponse)
    /// Any other accumulation failure.
    case unknown

    /// Human-readable message for each case.
    var description: String {
        switch self {
        case .unknown:
            return "Unknown errored while trying to accumulate records in batch"
        case .full:
            return "Batch or payload is full."
        }
    }
}
47
48
/// Error returned when records destined for a single batch do not all fit in
/// one POST (see `Sync15BatchClient.addRecordsInSingleBatch(_:)`).
open class TooManyRecordsError: MaybeErrorType, SyncPingFailureFormattable {
    /// Failure category for this error; always `.otherError`.
    open var failureReasonName: SyncPingFailureReasonName {
        return .otherError
    }

    /// Fixed human-readable message.
    open var description: String {
        return "Trying to send too many records in a single batch."
    }
}
56
57
/// Error returned by `Sync15BatchClient.endSingleBatch()` when the number of
/// records the server accepted differs from the number that were sent.
open class RecordsFailedToUpload: MaybeErrorType, SyncPingFailureFormattable {
    /// Failure category for this error; always `.otherError`.
    open var failureReasonName: SyncPingFailureReasonName {
        return .otherError
    }

    /// Fixed human-readable message.
    open var description: String {
        return "Some records failed to upload"
    }
}
65
66
/// Buffers serialized records and hands them to `uploader` in chunks that
/// respect the per-POST and per-batch limits in `config`.
///
/// The first upload is always sent with `batch=true`. If the response carries
/// a batch token, later uploads are sent with that token and the batch is
/// finished with a commit request; otherwise each POST stands on its own.
open class Sync15BatchClient<T: CleartextPayloadJSON> {
    /// Timestamp forwarded to `uploader` with every request. Replaced by the
    /// response's last-modified time whenever `moveForward(_:)` runs.
    fileprivate(set) var ifUnmodifiedSince: Timestamp?

    fileprivate let config: InfoConfiguration
    fileprivate let uploader: BatchUploadFunction
    fileprivate let serializeRecord: (Record<T>) -> String?

    /// Token of the batch currently in progress, if the server granted one.
    fileprivate var batchToken: BatchToken?

    // Keep track of the limits of a single batch
    fileprivate var totalBytes: ByteCount = 0
    fileprivate var totalRecords: Int = 0

    // Keep track of the limits of a single POST
    fileprivate var postBytes: ByteCount = 0
    fileprivate var postRecords: Int = 0

    /// Records buffered for the next POST. `postRecords` and `postBytes`
    /// always describe exactly this buffer.
    fileprivate var records = [UploadRecord]()

    /// Invoked (result ignored) after every upload that moves the client forward.
    fileprivate var onCollectionUploaded: (POSTResult, Timestamp?) -> DeferredTimestamp

    /// Builds the `batch` query item carrying `value`.
    fileprivate func batchQueryParamWithValue(_ value: String) -> URLQueryItem {
        return URLQueryItem(name: "batch", value: value)
    }

    /// - Parameters:
    ///   - config: Upload limits (records/bytes per POST and per batch).
    ///   - ifUnmodifiedSince: Initial timestamp to forward with uploads.
    ///   - serializeRecord: Turns a record into its payload line, or `nil` on failure.
    ///   - uploader: Performs a single upload request.
    ///   - onCollectionUploaded: Called with each upload result and its last-modified time.
    init(config: InfoConfiguration, ifUnmodifiedSince: Timestamp? = nil, serializeRecord: @escaping (Record<T>) -> String?,
         uploader: @escaping BatchUploadFunction, onCollectionUploaded: @escaping (POSTResult, Timestamp?) -> DeferredTimestamp) {
        self.config = config
        self.ifUnmodifiedSince = ifUnmodifiedSince
        self.uploader = uploader
        self.serializeRecord = serializeRecord

        self.onCollectionUploaded = onCollectionUploaded
    }

    /// Flushes whatever is still buffered.
    ///
    /// Does nothing when the buffer is empty. With a batch token the buffered
    /// records are sent as part of the commit request; without one they are
    /// sent as a plain POST (no query parameters).
    open func endBatch() -> Success {
        guard !records.isEmpty else {
            return succeed()
        }

        if let token = self.batchToken {
            return commitBatch(token) >>> succeed
        }

        let lines = self.freezePost()
        return self.uploader(lines, self.ifUnmodifiedSince, nil)
            >>== effect(moveForward)
            >>> succeed
    }

    /// Uploads the buffered records as one batch and reports which GUIDs succeeded.
    ///
    /// If in batch mode, will discard the batch if any record fails: when the
    /// server returned a batch token but accepted fewer records than were
    /// sent, the batch is not committed and `RecordsFailedToUpload` is returned.
    open func endSingleBatch() -> Deferred<Maybe<(succeeded: [GUID], lastModified: Timestamp?)>> {
        return self.start() >>== { response in
            let succeeded = response.value.success
            guard let token = self.batchToken else {
                // No batch token: the POST already stands on its own.
                return deferMaybe((succeeded: succeeded, lastModified: response.metadata.lastModifiedMilliseconds))
            }
            guard succeeded.count == self.totalRecords else {
                return deferMaybe(RecordsFailedToUpload())
            }
            return self.commitBatch(token) >>== { commitResp in
                return deferMaybe((succeeded: succeeded, lastModified: commitResp.metadata.lastModifiedMilliseconds))
            }
        }
    }

    /// Serializes `records` and buffers them, uploading as limits are reached.
    ///
    /// - Parameters:
    ///   - records: Records to add; an empty array succeeds immediately.
    ///   - singleBatch: When `true`, records are only buffered (never uploaded
    ///     here) and `TooManyRecordsError` is returned if they do not all fit
    ///     in one POST.
    open func addRecords(_ records: [Record<T>], singleBatch: Bool = false) -> Success {
        guard !records.isEmpty else {
            return succeed()
        }

        // Eagerly serialize the records prior to processing them so we can catch any issues
        // with record sizes before we start uploading to the server.
        let serializeThunks = records.map { record in
            return { self.serialize(record) }
        }

        return accumulate(serializeThunks) >>== {
            let iter = $0.makeIterator()
            if singleBatch {
                return self.addRecordsInSingleBatch(iter)
            } else {
                return self.addRecords(iter)
            }
        }
    }

    /// Adds the next record from `generator`, then recurses on the remainder
    /// once that record has been buffered (and any required upload finished).
    fileprivate func addRecords(_ generator: IndexingIterator<[UploadRecord]>) -> Success {
        var mutGenerator = generator
        guard let record = mutGenerator.next() else {
            return succeed()
        }
        let remaining = mutGenerator
        return accumulateOrUpload(record) >>> { self.addRecords(remaining) }
    }

    /// Buffers every record from `generator` into the current POST, failing
    /// with `TooManyRecordsError` as soon as one does not fit.
    fileprivate func addRecordsInSingleBatch(_ generator: IndexingIterator<[UploadRecord]>) -> Success {
        var mutGenerator = generator
        while let record = mutGenerator.next() {
            guard self.addToPost(record) else {
                return deferMaybe(TooManyRecordsError())
            }
        }
        return succeed()
    }

    /// Buffers `record`; if the buffer or batch is full, waits for the upload
    /// that makes room and then retries the same record.
    fileprivate func accumulateOrUpload(_ record: UploadRecord) -> Success {
        return accumulateRecord(record).bind { result -> Success in
            // Try to add the record to our buffer
            guard let failure = result.failureValue else {
                return succeed()
            }

            // Anything that is not our own "full" signal is a real error and
            // must reach the caller rather than be reported as success.
            guard let accumulateError = failure as? AccumulateRecordError else {
                return deferMaybe(failure)
            }

            switch accumulateError {
            case .full(let uploadOp):
                return uploadOp >>> { self.accumulateOrUpload(record) }
            case .unknown:
                return deferMaybe(accumulateError)
            }
        }
    }

    /// Tries to buffer `record` without uploading.
    ///
    /// Returns `AccumulateRecordError.full` (carrying the upload already
    /// started to make room) when the POST or batch is full, and
    /// `RecordTooLargeError` when the record does not fit even in an empty
    /// POST, since retrying after an upload could then never succeed.
    fileprivate func accumulateRecord(_ record: UploadRecord) -> Success {
        guard let token = self.batchToken else {
            guard addToPost(record) else {
                if records.isEmpty {
                    return deferMaybe(RecordTooLargeError(size: record.sizeBytes, guid: record.guid))
                }
                return deferMaybe(AccumulateRecordError.full(uploadOp: self.start()))
            }
            return succeed()
        }

        guard fitsInBatch(record) else {
            return deferMaybe(AccumulateRecordError.full(uploadOp: self.commitBatch(token)))
        }

        guard addToPost(record) else {
            if records.isEmpty {
                return deferMaybe(RecordTooLargeError(size: record.sizeBytes, guid: record.guid))
            }
            return deferMaybe(AccumulateRecordError.full(uploadOp: self.postInBatch(token)))
        }

        addToBatch(record)
        return succeed()
    }

    /// Serializes `record` into an `UploadRecord`.
    ///
    /// Fails with `SerializeRecordFailure` when `serializeRecord` returns
    /// `nil`, and with `RecordTooLargeError` when the payload's UTF-8 size is
    /// not below `Sync15StorageClient.maxRecordSizeBytes`.
    fileprivate func serialize(_ record: Record<T>) -> Deferred<Maybe<UploadRecord>> {
        guard let line = self.serializeRecord(record) else {
            return deferMaybe(SerializeRecordFailure(record: record))
        }

        let lineSize = line.utf8.count
        guard lineSize < Sync15StorageClient.maxRecordSizeBytes else {
            return deferMaybe(RecordTooLargeError(size: lineSize, guid: record.id))
        }

        return deferMaybe((record.id, line, lineSize))
    }

    /// Appends `record` to the POST buffer if doing so stays within
    /// `config.maxPostRecords` and `config.maxPostBytes`.
    ///
    /// - Returns: `true` if the record was buffered, `false` if it did not fit.
    fileprivate func addToPost(_ record: UploadRecord) -> Bool {
        guard postRecords + 1 <= config.maxPostRecords && postBytes + record.sizeBytes <= config.maxPostBytes else {
            return false
        }
        postRecords += 1
        postBytes += record.sizeBytes
        records.append(record)
        return true
    }

    /// Whether adding `record` keeps the batch within `config.maxTotalRecords`
    /// and `config.maxTotalBytes`.
    fileprivate func fitsInBatch(_ record: UploadRecord) -> Bool {
        return totalRecords + 1 <= config.maxTotalRecords && totalBytes + record.sizeBytes <= config.maxTotalBytes
    }

    /// Counts `record` against the batch totals.
    fileprivate func addToBatch(_ record: UploadRecord) {
        totalRecords += 1
        totalBytes += record.sizeBytes
    }

    /// Uploads the buffered records into the batch identified by `token`
    /// (without committing) and clears the POST buffer.
    fileprivate func postInBatch(_ token: BatchToken) -> DeferredResponse {
        // Push up the current payload to the server and reset
        let lines = self.freezePost()
        return uploader(lines, self.ifUnmodifiedSince, [batchQueryParamWithValue(token)])
    }

    /// Uploads the buffered records with the commit flag for `token`.
    ///
    /// Batch state (token and totals) is reset before the request is issued,
    /// so it is cleared whether or not the upload succeeds.
    fileprivate func commitBatch(_ token: BatchToken) -> DeferredResponse {
        resetBatch()
        let lines = self.freezePost()
        let queryParams = [batchQueryParamWithValue(token), commitParam]
        return uploader(lines, self.ifUnmodifiedSince, queryParams)
            >>== effect(moveForward)
    }

    /// Uploads the buffered records with `batch=true`, and records the batch
    /// token from the response if there is one.
    fileprivate func start() -> DeferredResponse {
        // Capture the counters before freezePost() zeroes them.
        let postRecordCount = self.postRecords
        let postBytesCount = self.postBytes
        let lines = freezePost()
        return self.uploader(lines, self.ifUnmodifiedSince, [batchQueryParamWithValue("true")])
            >>== effect(moveForward)
            >>== { response in
                if let token = response.value.batchToken {
                    self.batchToken = token

                    // Now that we've started a batch, make sure to set the counters for the batch to include
                    // the records we just sent as part of the start call.
                    self.totalRecords = postRecordCount
                    self.totalBytes = postBytesCount
                }

                return deferMaybe(response)
            }
    }

    /// Adopts the response's last-modified time as `ifUnmodifiedSince` and
    /// notifies `onCollectionUploaded`; the callback's deferred result is
    /// deliberately not awaited.
    fileprivate func moveForward(_ response: StorageResponse<POSTResult>) {
        let lastModified = response.metadata.lastModifiedMilliseconds
        self.ifUnmodifiedSince = lastModified
        _ = self.onCollectionUploaded(response.value, lastModified)
    }

    /// Clears the batch totals and forgets the batch token.
    fileprivate func resetBatch() {
        totalBytes = 0
        totalRecords = 0
        self.batchToken = nil
    }

    /// Empties the POST buffer and its counters.
    ///
    /// - Returns: The payload lines of the records that were buffered.
    fileprivate func freezePost() -> [String] {
        let lines = records.map { $0.payload }
        self.records = []
        self.postBytes = 0
        self.postRecords = 0
        return lines
    }
}