Revision control

1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
* License, v. 2.0. If a copy of the MPL was not distributed with this
3
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5
import Foundation
6
import Shared
7
import Account
8
import XCGLogger
9
import SwiftyJSON
10
11
/// Logger used by everything in this file; taken from `Logger.syncLogger`.
private let log = Logger.syncLogger
12
13
/// Base class for errors that enclose a `StorageResponse`.
///
/// Not an error that indicates a server problem by itself, but merely an
/// error that carries the response it was created with.
open class StorageResponseError<T>: MaybeErrorType, SyncPingFailureFormattable {
    /// The response enclosed by this error.
    public let response: StorageResponse<T>

    /// Wraps `response` in an error.
    public init(_ response: StorageResponse<T>) {
        self.response = response
    }

    /// Generic description; subclasses in this file override it.
    open var description: String {
        return "Error."
    }

    /// Failure reason name for this error: `.httpError`.
    open var failureReasonName: SyncPingFailureReasonName {
        return SyncPingFailureReasonName.httpError
    }
}
30
31
/// Generic request failure that carries no response.
///
/// `Sync15StorageClient.getFailureInfo` returns this when a request produced
/// an error and no more specific error could be derived from the response.
open class RequestError: MaybeErrorType, SyncPingFailureFormattable {
    /// Fixed human-readable description.
    open var description: String {
        return "Request error."
    }

    /// Failure reason name for this error: `.httpError`.
    open var failureReasonName: SyncPingFailureReasonName {
        return SyncPingFailureReasonName.httpError
    }
}
40
41
/// A `StorageResponseError` that additionally records the request, if known.
///
/// `Sync15StorageClient.getFailureInfo` produces this for status codes of 400
/// and above that are neither 404 nor 5xx.
open class BadRequestError<T>: StorageResponseError<T> {
    /// The request associated with the failure, when one is available.
    public let request: URLRequest?

    /// - Parameters:
    ///   - request: The request associated with the failure, or `nil`.
    ///   - response: The response to enclose.
    public init(request: URLRequest?, response: StorageResponse<T>) {
        // Own stored properties must be set before delegating to the superclass.
        self.request = request
        super.init(response)
    }

    /// Fixed human-readable description.
    override open var description: String {
        return "Bad request."
    }
}
53
54
/// A `StorageResponseError` produced by `Sync15StorageClient.getFailureInfo`
/// for status codes of 500 and above.
open class ServerError<T>: StorageResponseError<T> {
    /// Wraps `response`; simply forwards to the superclass initializer.
    override public init(_ response: StorageResponse<T>) {
        super.init(response)
    }

    /// Fixed human-readable description.
    override open var description: String {
        return "Server error."
    }
}
63
64
/// A `StorageResponseError` produced by `Sync15StorageClient.getFailureInfo`
/// for a 404 status code.
open class NotFound<T>: StorageResponseError<T> {
    /// Wraps `response`; simply forwards to the superclass initializer.
    override public init(_ response: StorageResponse<T>) {
        super.init(response)
    }

    /// Description that includes the generic parameter's type.
    override open var description: String {
        return "Not found. (\(T.self))"
    }
}
73
74
/// Error used throughout this file when a response body is missing or cannot
/// be turned into the expected value.
open class RecordParseError: MaybeErrorType, SyncPingFailureFormattable {
    /// Failure reason name for this error: `.otherError`.
    open var failureReasonName: SyncPingFailureReasonName {
        return SyncPingFailureReasonName.otherError
    }

    /// Fixed human-readable description.
    open var description: String {
        return "Failed to parse record."
    }
}
83
84
/// Error returned by `Sync15StorageClient.uploadMetaGlobal` when the payload
/// built from the supplied meta/global is an error JSON value.
open class MalformedMetaGlobalError: MaybeErrorType, SyncPingFailureFormattable {
    /// Failure reason name for this error: `.otherError`.
    open var failureReasonName: SyncPingFailureReasonName {
        return SyncPingFailureReasonName.otherError
    }

    /// Fixed human-readable description.
    open var description: String {
        return "Supplied meta/global for upload did not serialize to valid JSON."
    }
}
93
94
/// Error describing a record whose size is too large.
open class RecordTooLargeError: MaybeErrorType, SyncPingFailureFormattable {
    /// Size of the offending record, in bytes.
    public let size: ByteCount
    /// GUID of the offending record.
    public let guid: GUID

    /// - Parameters:
    ///   - size: Size of the record, in bytes.
    ///   - guid: GUID of the record.
    public init(size: ByteCount, guid: GUID) {
        self.guid = guid
        self.size = size
    }

    /// Description naming the record and its size.
    open var description: String {
        return "Record \(self.guid) too large: \(size) bytes."
    }

    /// Failure reason name for this error: `.otherError`.
    open var failureReasonName: SyncPingFailureReasonName {
        return SyncPingFailureReasonName.otherError
    }
}
111
112
/**
 * Raised when the storage client is refusing to make a request due to a known
 * server backoff.
 *
 * If you want to bypass this, remove the backoff from the BackoffStorage that
 * the storage client is using.
 */
open class ServerInBackoffError: MaybeErrorType, SyncPingFailureFormattable {
    /// Timestamp until which the backoff applies.
    fileprivate let until: Timestamp

    /// - Parameter until: Timestamp until which the backoff applies.
    public init(until: Timestamp) {
        self.until = until
    }

    /// Description that includes `until`, rendered with a short date style and
    /// a medium time style.
    open var description: String {
        let backoffEnd = Date.fromTimestamp(self.until)

        let dateFormatter = DateFormatter()
        dateFormatter.timeStyle = .medium
        dateFormatter.dateStyle = .short

        let s = dateFormatter.string(from: backoffEnd)
        return "Server in backoff until \(s)."
    }

    /// Failure reason name for this error: `.otherError`.
    open var failureReasonName: SyncPingFailureReasonName {
        return SyncPingFailureReasonName.otherError
    }
}
137
138
/// Converts a header value expressed in seconds into milliseconds. Handles decimals.
///
/// - Parameter input: The raw header value: a `String`, a `Double` or an `NSNumber`.
/// - Returns: The value in milliseconds, or `nil` when the input is absent, is of
///   another type, cannot be parsed, or (for numeric input) is negative,
///   non-finite or too large to be represented as an unsigned 64-bit value.
private func optionalSecondsHeader(_ input: AnyObject?) -> Timestamp? {
    guard let input = input else {
        return nil
    }

    // A string that fails to parse falls through to the casts below, which
    // will not match it, so the function ends up returning nil.
    if let val = input as? String, let timestamp = decimalSecondsStringToTimestamp(val) {
        return timestamp
    }

    if let seconds = input as? Double {
        // Oh for a BigDecimal library.
        // `Timestamp(_:)` traps on negative, NaN, infinite or out-of-range
        // values; truncate toward zero (as that initializer does) and convert
        // failably instead, so a bogus header cannot crash the client.
        return Timestamp(exactly: (seconds * 1000).rounded(.towardZero))
    }

    if let seconds = input as? NSNumber {
        // Who knows. Guard the multiplication so a huge value yields nil
        // rather than an overflow trap.
        let (milliseconds, overflowed) = seconds.uint64Value.multipliedReportingOverflow(by: 1000)
        return overflowed ? nil : milliseconds
    }

    return nil
}
162
163
/// Interprets a header value as a signed 64-bit integer.
///
/// - Parameter input: The raw header value: a `String`, a `Double` or an `NSNumber`.
/// - Returns: The integer value, or `nil` when the input is absent, is of another
///   type, cannot be scanned, or is a `Double` that is non-finite or outside the
///   `Int64` range.
private func optionalIntegerHeader(_ input: AnyObject?) -> Int64? {
    guard let input = input else {
        return nil
    }

    if let val = input as? String {
        return Scanner(string: val).scanLongLong()
    }

    if let val = input as? Double {
        // Oh for a BigDecimal library.
        // `Int64(_:)` traps on NaN, infinite or out-of-range values; truncate
        // toward zero (as that initializer does) and convert failably instead.
        return Int64(exactly: val.rounded(.towardZero))
    }

    if let val = input as? NSNumber {
        // Who knows.
        return val.int64Value
    }

    return nil
}
184
185
/// Interprets a header value as an unsigned 64-bit integer.
///
/// - Parameter input: The raw header value: a `String`, a `Double` or an `NSNumber`.
/// - Returns: The integer value, or `nil` when the input is absent, is of another
///   type, cannot be scanned, or is a `Double` that is negative, non-finite or
///   too large to be represented.
private func optionalUIntegerHeader(_ input: AnyObject?) -> Timestamp? {
    guard let input = input else {
        return nil
    }

    if let val = input as? String {
        return Scanner(string: val).scanUnsignedLongLong()
    }

    if let val = input as? Double {
        // Oh for a BigDecimal library.
        // `Timestamp(_:)` traps on negative, NaN, infinite or out-of-range
        // values; truncate toward zero (as that initializer does) and convert
        // failably instead.
        return Timestamp(exactly: val.rounded(.towardZero))
    }

    if let val = input as? NSNumber {
        // Who knows.
        return val.uint64Value
    }

    return nil
}
206
207
/// Sort orders that can be requested when fetching a collection.
///
/// The raw value of each case is the string sent as the `sort` query parameter
/// by `Sync15CollectionClient.getSince`.
public enum SortOption: String {
    /// Sent as `sort=newest`.
    case NewestFirst = "newest"

    /// Sent as `sort=oldest`.
    case OldestFirst = "oldest"

    /// Sent as `sort=index`.
    case Index = "index"
}
212
213
/// Status code and Sync-related header values extracted from an HTTP response.
public struct ResponseMetadata {
    /// HTTP status code of the response.
    public let status: Int
    /// Value of the `X-Weave-Alert` header, if present.
    public let alert: String?
    /// Value of the `X-Weave-Next-Offset` header, if present.
    public let nextOffset: String?
    /// Value of the `X-Weave-Records` header, if present and parseable.
    public let records: UInt64?
    /// Value of the `X-Weave-Quota-Remaining` header, if present and parseable.
    public let quotaRemaining: Int64?
    /// Non-optional. Server timestamp when handling request. Zero when the
    /// `X-Weave-Timestamp` header is missing or unparseable.
    public let timestampMilliseconds: Timestamp
    /// Included for all success responses. Collection or record timestamp
    /// (`X-Last-Modified`).
    public let lastModifiedMilliseconds: Timestamp?
    /// `X-Weave-Backoff`, falling back to `X-Backoff`, converted to milliseconds.
    public let backoffMilliseconds: UInt64?
    /// `Retry-After`, converted to milliseconds.
    public let retryAfterMilliseconds: UInt64?

    /// Extracts the metadata from `response`'s status code and header fields.
    public init(response: HTTPURLResponse) {
        self.init(status: response.statusCode, headers: response.allHeaderFields)
    }

    /// Builds the metadata from a status code and a raw header dictionary.
    ///
    /// - Parameters:
    ///   - status: The HTTP status code.
    ///   - headers: Header fields; keys are matched case-insensitively.
    init(status: Int, headers: [AnyHashable: Any]) {
        // Work around bug https://bugs.swift.org/browse/SR-2429
        // response.allHeaderFields is case sensitive in versions newer than swift 2.
        // This is a 3 year old bug that has not been fixed.
        // Lowercase all of the header keys so we can index the headers map without
        // worrying about case.
        //
        // Two header names that differ only by case collapse to the same key
        // once lowercased. `Dictionary(uniqueKeysWithValues:)` traps on such
        // duplicates, so merge them explicitly and keep the first value seen.
        let lowercasedPairs = headers.map {
            (String(describing: $0.key).lowercased(), String(describing: $0.value))
        }
        let lowercased = Dictionary(lowercasedPairs, uniquingKeysWith: { first, _ in first })

        self.status = status
        alert = lowercased["x-weave-alert"]
        nextOffset = lowercased["x-weave-next-offset"]
        records = optionalUIntegerHeader(lowercased["x-weave-records"] as AnyObject?)
        quotaRemaining = optionalIntegerHeader(lowercased["x-weave-quota-remaining"] as AnyObject?)
        timestampMilliseconds = optionalSecondsHeader(lowercased["x-weave-timestamp"] as AnyObject?) ?? 0
        lastModifiedMilliseconds = optionalSecondsHeader(lowercased["x-last-modified"] as AnyObject?)
        backoffMilliseconds = optionalSecondsHeader(lowercased["x-weave-backoff"] as AnyObject?) ??
            optionalSecondsHeader(lowercased["x-backoff"] as AnyObject?)
        retryAfterMilliseconds = optionalSecondsHeader(lowercased["retry-after"] as AnyObject?)
    }
}
250
251
/// A value obtained from the storage server together with the metadata of the
/// response that delivered it.
public struct StorageResponse<T> {
    /// The parsed value.
    public let value: T
    /// Status code and header-derived metadata of the response.
    public let metadata: ResponseMetadata

    /// Pairs `value` with already-extracted `metadata`.
    init(value: T, metadata: ResponseMetadata) {
        self.metadata = metadata
        self.value = value
    }

    /// Pairs `value` with the metadata extracted from `response`.
    init(value: T, response: HTTPURLResponse) {
        self.init(value: value, metadata: ResponseMetadata(response: response))
    }
}
265
266
/// Token identifying a batch; `POSTResult.fromJSON` reads it from the `batch` key.
public typealias BatchToken = String
267
268
/// A size expressed in bytes.
public typealias ByteCount = Int
269
270
/// The outcome of a POST to a collection: which GUIDs succeeded, which failed
/// (with the accompanying string for each), and an optional batch token.
public struct POSTResult {
    /// GUIDs listed under `success`.
    public let success: [GUID]
    /// GUIDs listed under `failed`, mapped to the string given for each.
    public let failed: [GUID: String]
    /// Value of `batch`, when present.
    public let batchToken: BatchToken?

    /// Memberwise initializer; `batchToken` defaults to `nil`.
    public init(success: [GUID], failed: [GUID: String], batchToken: BatchToken? = nil) {
        self.success = success
        self.failed = failed
        self.batchToken = batchToken
    }

    /// Parses a POST response body.
    ///
    /// - Parameter json: The response body.
    /// - Returns: The parsed result, or `nil` when `json` is an error value,
    ///   when `success` is not an array or `failed` is not a dictionary, or
    ///   when any entry of either is not a string.
    public static func fromJSON(_ json: JSON) -> POSTResult? {
        guard !json.isError() else {
            return nil
        }

        let batchToken = json["batch"].string

        guard let successEntries = json["success"].array,
              let failedEntries = json["failed"].dictionary else {
            return nil
        }

        // That's the basic structure. Now let's transform the contents.
        // The closure records any non-string entry in `failed`; the
        // placeholder it returns in that case is never used, because the
        // flag is checked right after each transformation.
        var failed = false
        let stringOrFail: (JSON) -> String = { entry in
            guard let string = entry.string else {
                failed = true
                return ""
            }
            return string
        }

        let successGUIDs = successEntries.map(stringOrFail)
        guard !failed else {
            return nil
        }

        let failedGUIDs = mapValues(failedEntries, f: stringOrFail)
        guard !failed else {
            return nil
        }

        return POSTResult(success: successGUIDs, failed: failedGUIDs, batchToken: batchToken)
    }
}
307
308
/// Maps an outgoing request to the request that is actually sent; the storage
/// client applies it to every request it builds.
public typealias Authorizer = (URLRequest) -> URLRequest
309
310
/// Storage for the server-requested backoff that the storage client consults
/// before making requests.
///
/// TODO: don't be so naïve. Use a combination of uptime and wall clock time.
public protocol BackoffStorage {
    /// Local timestamp until which the server asked us to back off, if any.
    /// The storage client writes this when a response carries a backoff.
    var serverBackoffUntilLocalTimestamp: Timestamp? { get set }

    /// Clears any stored server backoff.
    func clearServerBackoff()

    /// Reports whether a backoff applies at `now`.
    ///
    /// - Returns: 'until' for convenience; the storage client treats a
    ///   non-nil result as "in backoff".
    func isInBackoff(_ now: Timestamp) -> Timestamp?
}
316
317
/// Client for a Sync 1.5 storage server.
///
/// Requests are passed through the configured `Authorizer` and sent on a
/// private ephemeral `URLSession`. Resource-level operations consult `backoff`
/// before issuing a request and deliver their result through a `Deferred`
/// created with `resultQueue` as its default queue.
///
/// Don't forget to batch downloads.
open class Sync15StorageClient {
    /// Applied to every outgoing request before it is sent.
    fileprivate let authorizer: Authorizer
    /// Root URI of the user's storage, without a trailing slash.
    fileprivate let serverURI: URL

    public static let maxRecordSizeBytes: Int = 262_140       // A shade under 256KB.
    public static let maxPayloadSizeBytes: Int = 1_000_000    // A shade under 1MB.
    public static let maxPayloadItemCount: Int = 100          // Bug 1250747 will raise this.

    /// Where server-requested backoff is read from and written to.
    var backoff: BackoffStorage

    let workQueue: DispatchQueue
    /// Default queue of every `Deferred` this client hands out.
    let resultQueue: DispatchQueue

    /// Creates a client from a token server token, authorizing requests with Hawk.
    ///
    /// - Parameters:
    ///   - token: Supplies the API endpoint and the Hawk id and key.
    ///   - workQueue: Stored in `workQueue`.
    ///   - resultQueue: Default queue for returned `Deferred` values.
    ///   - backoff: Backoff storage to consult and update.
    public init(token: TokenServerToken, workQueue: DispatchQueue, resultQueue: DispatchQueue, backoff: BackoffStorage) {
        self.workQueue = workQueue
        self.resultQueue = resultQueue
        self.backoff = backoff

        // This is a potentially dangerous assumption, but failable initializers up the stack are a giant pain.
        // We want the serverURI to *not* have a trailing slash: to efficiently wipe a user's storage, we delete
        // the user root (like /1.5/1234567) and not an "empty collection" (like /1.5/1234567/); the storage
        // server treats the first like a DROP table and the latter like a DELETE *, and the former is more
        // efficient than the latter.
        let endpoint = token.api_endpoint.hasSuffix("/")
            ? String(token.api_endpoint.dropLast())
            : token.api_endpoint
        self.serverURI = URL(string: endpoint)!

        self.authorizer = { (r: URLRequest) -> URLRequest in
            var req = r
            let helper = HawkHelper(id: token.id, key: token.key.data(using: .utf8, allowLossyConversion: false)!)
            req.setValue(helper.getAuthorizationValueFor(r), forHTTPHeaderField: "Authorization")
            return req
        }
    }

    /// Creates a client with an explicit server URI and authorizer.
    public init(serverURI: URL, authorizer: @escaping Authorizer, workQueue: DispatchQueue, resultQueue: DispatchQueue, backoff: BackoffStorage) {
        self.serverURI = serverURI
        self.authorizer = authorizer
        self.workQueue = workQueue
        self.resultQueue = resultQueue
        self.backoff = backoff
    }

    /// Records a backoff in `backoff` when `response` carries one, preferring
    /// the backoff header value over `Retry-After`.
    func updateBackoffFromResponse<T>(_ response: StorageResponse<T>) {
        // N.B., we would not have made this request if a backoff were set, so
        // we can safely avoid doing the write if there's no backoff in the
        // response.
        // This logic will have to change if we ever invalidate that assumption.
        guard let ms = response.metadata.backoffMilliseconds ?? response.metadata.retryAfterMilliseconds else {
            return
        }

        log.info("Backing off for \(ms)ms.")
        // The value comes from a server header; saturate rather than trap if
        // adding it to the current time would overflow.
        let (until, overflowed) = Date.now().addingReportingOverflow(ms)
        self.backoff.serverBackoffUntilLocalTimestamp = overflowed ? Timestamp.max : until
    }

    /// Classifies the outcome of a request.
    ///
    /// Also updates the stored backoff from the response headers whenever an
    /// HTTP response is available.
    ///
    /// - Parameters:
    ///   - response: The response handed to the completion handler, if any.
    ///   - error: The error handed to the completion handler, if any.
    /// - Returns: `nil` when the request succeeded (an HTTP response with a
    ///   status below 400 and no error); otherwise a `ServerError` (>= 500),
    ///   `NotFound` (404), `BadRequestError` (other >= 400), `RequestError`
    ///   (error without a failure response) or `RecordParseError` (no HTTP
    ///   response and no error).
    func getFailureInfo(_ response: URLResponse?, _ error: Error?) -> MaybeErrorType? {
        func failFromResponse(_ httpResponse: HTTPURLResponse?) -> MaybeErrorType? {
            guard let httpResponse = httpResponse else {
                // TODO: better error.
                log.error("No response")
                return RecordParseError()
            }

            log.debug("Status code: \(httpResponse.statusCode).")
            let storageResponse = StorageResponse(value: httpResponse, metadata: ResponseMetadata(response: httpResponse))
            self.updateBackoffFromResponse(storageResponse)

            if httpResponse.statusCode >= 500 {
                log.debug("ServerError.")
                return ServerError(storageResponse)
            }

            if httpResponse.statusCode == 404 {
                log.debug("NotFound")
                return NotFound(storageResponse)
            }

            if httpResponse.statusCode >= 400 {
                log.debug("BadRequestError.")
                // `url` is optional and `BadRequestError` accepts a nil
                // request, so there is no reason to force-unwrap here.
                let req = httpResponse.url.map { URLRequest(url: $0) }
                return BadRequestError(request: req, response: storageResponse)
            }

            return nil
        }

        let httpResponse = response as? HTTPURLResponse
        guard error != nil else {
            return failFromResponse(httpResponse)
        }

        log.error("Response: \(httpResponse?.statusCode ?? 0). Got error \(error ??? "nil").")

        // If we got a response, try to derive a specific error from it. Without
        // one we don't want to hit the nil-response case above and return a
        // RecordParseError, because a RequestError is more fitting.
        if let httpResponse = httpResponse, let result = failFromResponse(httpResponse) {
            log.error("This was a failure response. Filled specific error type.")
            return result
        }

        log.error("Filling generic RequestError.")
        return RequestError()
    }

    /// Session used for every request made by this client.
    lazy fileprivate var urlSession: URLSession = makeURLSession(userAgent: UserAgent.syncUserAgent, configuration: URLSessionConfiguration.ephemeral)

    typealias URLSessionCompletion = (Data?, URLResponse?, Error?) -> Void

    /// Authorizes `request`, then starts a data task for it that reports to `completion`.
    fileprivate func send(_ request: URLRequest, completion: @escaping URLSessionCompletion) {
        let authorized: URLRequest = self.authorizer(request)
        urlSession.dataTask(with: authorized, completionHandler: completion).resume()
    }

    /// Issues an authorized GET that accepts `application/json`.
    func requestGET(_ url: URL, completion: @escaping URLSessionCompletion) {
        var req = URLRequest(url: url)
        req.httpMethod = URLRequest.Method.get.rawValue
        req.setValue("application/json", forHTTPHeaderField: "Accept")
        send(req, completion: completion)
    }

    /// Issues an authorized DELETE carrying `X-Confirm-Delete: 1`.
    func requestDELETE(_ url: URL, completion: @escaping URLSessionCompletion) {
        var req = URLRequest(url: url)
        req.httpMethod = URLRequest.Method.delete.rawValue
        req.setValue("1", forHTTPHeaderField: "X-Confirm-Delete")
        send(req, completion: completion)
    }

    /// Issues an authorized request with a UTF-8 encoded body.
    ///
    /// - Parameters:
    ///   - url: Destination.
    ///   - method: HTTP method.
    ///   - body: Request body.
    ///   - contentType: Value of the `Content-Type` header.
    ///   - ifUnmodifiedSince: When non-nil, sent as `X-If-Unmodified-Since`
    ///     in decimal seconds.
    ///   - completion: Receives the raw data task result.
    func requestWrite(_ url: URL, method: String, body: String, contentType: String, ifUnmodifiedSince: Timestamp?, completion: @escaping URLSessionCompletion) {
        var req = URLRequest(url: url)
        req.httpMethod = method
        req.setValue(contentType, forHTTPHeaderField: "Content-Type")

        if let ifUnmodifiedSince = ifUnmodifiedSince {
            req.setValue(millisecondsToDecimalSeconds(ifUnmodifiedSince), forHTTPHeaderField: "X-If-Unmodified-Since")
        }

        // Same bytes as `body.data(using: .utf8)`, without the force-unwrap.
        req.httpBody = Data(body.utf8)
        send(req, completion: completion)
    }

    /// PUTs `body` as JSON.
    /// NOTE(review): force-unwraps `body.stringify()`; callers appear to be
    /// expected to pass serializable JSON — confirm.
    func requestPUT(_ url: URL, body: JSON, ifUnmodifiedSince: Timestamp?, completion: @escaping URLSessionCompletion) {
        requestWrite(url, method: URLRequest.Method.put.rawValue, body: body.stringify()!, contentType: "application/json;charset=utf-8", ifUnmodifiedSince: ifUnmodifiedSince, completion: completion)
    }

    /// POSTs `body` as JSON.
    /// NOTE(review): force-unwraps `body.stringify()`; callers appear to be
    /// expected to pass serializable JSON — confirm.
    func requestPOST(_ url: URL, body: JSON, ifUnmodifiedSince: Timestamp?, completion: @escaping URLSessionCompletion) {
        requestWrite(url, method: URLRequest.Method.post.rawValue, body: body.stringify()!, contentType: "application/json;charset=utf-8", ifUnmodifiedSince: ifUnmodifiedSince, completion: completion)
    }

    /// POSTs the given lines joined by newlines, as `application/newlines`.
    func requestPOST(_ url: URL, body: [String], ifUnmodifiedSince: Timestamp?, completion: @escaping URLSessionCompletion) {
        let content = body.joined(separator: "\n")
        requestWrite(url, method: URLRequest.Method.post.rawValue, body: content, contentType: "application/newlines", ifUnmodifiedSince: ifUnmodifiedSince, completion: completion)
    }

    /// POSTs the given JSON values, one serialized value per line.
    /// NOTE(review): force-unwraps each `stringify()` result — confirm inputs
    /// are always serializable.
    func requestPOST(_ url: URL, body: [JSON], ifUnmodifiedSince: Timestamp?, completion: @escaping URLSessionCompletion) {
        requestPOST(url, body: body.map { $0.stringify()! }, ifUnmodifiedSince: ifUnmodifiedSince, completion: completion)
    }

    /**
     * Returns true and fills the provided Deferred if our state shows that we're in backoff.
     * Returns false otherwise.
     */
    fileprivate func checkBackoff<T>(_ deferred: Deferred<Maybe<T>>) -> Bool {
        guard let until = self.backoff.isInBackoff(Date.now()) else {
            return false
        }
        deferred.fill(Maybe<T>(failure: ServerInBackoffError(until: until)))
        return true
    }

    /// Runs a request against `path` and parses its JSON body.
    ///
    /// - Parameters:
    ///   - op: The request function to use (e.g. `requestGET`).
    ///   - path: Path relative to the server URI; `""` addresses the root itself.
    ///   - f: Converts the response JSON into the result value, or returns `nil`
    ///     when it cannot.
    /// - Returns: A `Deferred` filled with the parsed response, the failure
    ///   reported by `getFailureInfo`, a `ServerInBackoffError`, or a
    ///   `RecordParseError` when the body is missing or `f` returns `nil`.
    fileprivate func doOp<T>(_ op: (URL, @escaping URLSessionCompletion) -> Void, path: String, f: @escaping (JSON) -> T?) -> Deferred<Maybe<StorageResponse<T>>> {
        let deferred = Deferred<Maybe<StorageResponse<T>>>(defaultQueue: self.resultQueue)

        if self.checkBackoff(deferred) {
            return deferred
        }

        // Special case "": we want /1.5/1234567 and not /1.5/1234567/. See note about trailing slashes above.
        let url: URL = path == ""
            ? self.serverURI                                   // No trailing slash.
            : self.serverURI.appendingPathComponent(path)

        op(url) { (data, response, error) in
            if let failure = self.getFailureInfo(response, error) {
                deferred.fill(Maybe<StorageResponse<T>>.failure(failure))
                return
            }

            guard let data = data,
                  let v = f(JSON(data)),
                  let httpResponse = response as? HTTPURLResponse else {
                deferred.fill(Maybe(failure: RecordParseError()))
                return
            }

            let storageResponse = StorageResponse<T>(value: v, response: httpResponse)
            deferred.fill(Maybe(success: storageResponse))
        }

        return deferred
    }

    // Sync storage responds with a plain timestamp to a PUT, not with a JSON body.
    /// PUTs `body` to `path` (relative to the server URI); see the URL-based overload.
    fileprivate func putResource<T>(_ path: String, body: JSON, ifUnmodifiedSince: Timestamp?, parser: @escaping (String) -> T?) -> Deferred<Maybe<StorageResponse<T>>> {
        let url = self.serverURI.appendingPathComponent(path)
        return self.putResource(url, body: body, ifUnmodifiedSince: ifUnmodifiedSince, parser: parser)
    }

    /// PUTs `body` to `url` and parses the plain-text response body.
    ///
    /// - Parameters:
    ///   - url: Destination.
    ///   - body: JSON to upload.
    ///   - ifUnmodifiedSince: Forwarded to `requestPUT`.
    ///   - parser: Converts the UTF-8 response body into the result value, or
    ///     returns `nil` when it cannot.
    /// - Returns: A `Deferred` filled with the parsed response, the failure
    ///   reported by `getFailureInfo`, a `ServerInBackoffError`, or a
    ///   `RecordParseError` when the body is missing, is not UTF-8, or is
    ///   rejected by `parser`.
    fileprivate func putResource<T>(_ url: Foundation.URL, body: JSON, ifUnmodifiedSince: Timestamp?, parser: @escaping (String) -> T?) -> Deferred<Maybe<StorageResponse<T>>> {
        let deferred = Deferred<Maybe<StorageResponse<T>>>(defaultQueue: self.resultQueue)
        if self.checkBackoff(deferred) {
            return deferred
        }

        requestPUT(url, body: body, ifUnmodifiedSince: ifUnmodifiedSince) { (data, response, error) in
            if let failure = self.getFailureInfo(response, error) {
                deferred.fill(Maybe<StorageResponse<T>>.failure(failure))
                return
            }

            guard let data = data,
                  let httpResponse = response as? HTTPURLResponse,
                  let str = String(data: data, encoding: .utf8),
                  let v = parser(str) else {
                deferred.fill(Maybe(failure: RecordParseError()))
                return
            }

            let storageResponse = StorageResponse<T>(value: v, response: httpResponse)
            deferred.fill(Maybe(success: storageResponse))
        }

        return deferred
    }

    /// GETs `path` and parses the JSON body with `f`.
    fileprivate func getResource<T>(_ path: String, f: @escaping (JSON) -> T?) -> Deferred<Maybe<StorageResponse<T>>> {
        return doOp(self.requestGET, path: path, f: f)
    }

    /// DELETEs `path` and parses the JSON body with `f`.
    fileprivate func deleteResource<T>(_ path: String, f: @escaping (JSON) -> T?) -> Deferred<Maybe<StorageResponse<T>>> {
        return doOp(self.requestDELETE, path: path, f: f)
    }

    /// Deletes the storage root and returns the response JSON unchanged.
    func wipeStorage() -> Deferred<Maybe<StorageResponse<JSON>>> {
        // In Sync 1.5 it's preferred that we delete the root, not /storage.
        return deleteResource("", f: { $0 })
    }

    /// Fetches `info/collections`.
    func getInfoCollections() -> Deferred<Maybe<StorageResponse<InfoCollections>>> {
        return getResource("info/collections", f: InfoCollections.fromJSON)
    }

    /// Fetches `storage/meta/global` and parses the record inside its envelope.
    func getMetaGlobal() -> Deferred<Maybe<StorageResponse<MetaGlobal>>> {
        return getResource("storage/meta/global") { json in
            // We have an envelope. Parse the meta/global record embedded in the 'payload' string.
            let envelope = EnvelopeJSON(json)
            guard envelope.isValid() else {
                return nil
            }
            return MetaGlobal.fromJSON(JSON(parseJSON: envelope.payload))
        }
    }

    /// Builds the collection client for `crypto`, whose records are encrypted
    /// with the Sync key bundle rather than a bulk key bundle.
    fileprivate func cryptoKeysClient(_ syncKeyBundle: KeyBundle) -> Sync15CollectionClient<KeysPayload> {
        let syncKey = Keys(defaultBundle: syncKeyBundle)
        let encoder = RecordEncoder<KeysPayload>(decode: { KeysPayload($0) }, encode: { $0.json })
        let encrypter = syncKey.encrypter("keys", encoder: encoder)
        return self.clientForCollection("crypto", encrypter: encrypter)
    }

    /// Fetches the `crypto/keys` record, decrypting it with `syncKeyBundle`.
    ///
    /// - Note: `ifUnmodifiedSince` is accepted but not used by this method.
    func getCryptoKeys(_ syncKeyBundle: KeyBundle, ifUnmodifiedSince: Timestamp?) -> Deferred<Maybe<StorageResponse<Record<KeysPayload>>>> {
        return cryptoKeysClient(syncKeyBundle).get("keys")
    }

    /// Uploads `metaGlobal` to `storage/meta/global`.
    ///
    /// - Returns: The timestamp parsed from the response body, or a
    ///   `MalformedMetaGlobalError` when the payload JSON is an error value.
    func uploadMetaGlobal(_ metaGlobal: MetaGlobal, ifUnmodifiedSince: Timestamp?) -> Deferred<Maybe<StorageResponse<Timestamp>>> {
        let payload = metaGlobal.asPayload()
        if payload.json.isError() {
            return Deferred(value: Maybe(failure: MalformedMetaGlobalError()))
        }

        let record = JSON(["payload": payload.json.stringify() ?? JSON.null as Any, "id": "global"])
        return putResource("storage/meta/global", body: record, ifUnmodifiedSince: ifUnmodifiedSince, parser: decimalSecondsStringToTimestamp)
    }

    // The crypto/keys record is a special snowflake: it is encrypted with the Sync key bundle. All other records are
    // encrypted with the bulk key bundle (including possibly a per-collection bulk key) stored in crypto/keys.
    /// Uploads `keys` as the `crypto/keys` record, encrypted with `syncKeyBundle`.
    func uploadCryptoKeys(_ keys: Keys, withSyncKeyBundle syncKeyBundle: KeyBundle, ifUnmodifiedSince: Timestamp?) -> Deferred<Maybe<StorageResponse<Timestamp>>> {
        let record = Record(id: "keys", payload: keys.asPayload())
        return cryptoKeysClient(syncKeyBundle).put(record, ifUnmodifiedSince: ifUnmodifiedSince)
    }

    // It would be convenient to have the storage client manage Keys, but of course we need to use a different set of
    // keys to fetch crypto/keys itself. See uploadCryptoKeys.
    /// Creates a client for `collection` under this server's `storage` path.
    func clientForCollection<T>(_ collection: String, encrypter: RecordEncrypter<T>) -> Sync15CollectionClient<T> {
        let storage = self.serverURI.appendingPathComponent("storage", isDirectory: true)
        return Sync15CollectionClient(client: self, serverURI: storage, collection: collection, encrypter: encrypter)
    }
}
636
637
/// Limits handed to every `Sync15CollectionClient` in this file as its `infoConfig`.
private let DefaultInfoConfiguration = InfoConfiguration(
    maxRequestBytes: 1_048_576,
    maxPostRecords: 100,
    maxPostBytes: 1_048_576,
    maxTotalRecords: 10_000,
    maxTotalBytes: 104_857_600
)
642
643
/**
 * Client for a single collection of a Sync 1.5 storage server. Records are
 * serialized and parsed through the supplied `RecordEncrypter`.
 *
 * We'd love to nest this in the overall storage client, but Swift
 * forbids the nesting of a generic class inside another class.
 */
open class Sync15CollectionClient<T: CleartextPayloadJSON> {
    /// Storage client used to issue requests, check backoff and classify failures.
    fileprivate let client: Sync15StorageClient
    /// Serializes records for upload and supplies the payload factory for download.
    fileprivate let encrypter: RecordEncrypter<T>
    /// URI of the collection itself.
    fileprivate let collectionURI: URL
    fileprivate let collectionQueue = DispatchQueue(label: "com.mozilla.sync.collectionclient", attributes: [])
    /// Upload limits passed to batch clients.
    fileprivate let infoConfig = DefaultInfoConfiguration

    /// - Parameters:
    ///   - client: Storage client used for all requests.
    ///   - serverURI: URI under which the collection lives.
    ///   - collection: Collection name, appended to `serverURI`.
    ///   - encrypter: Serializer and payload factory for this collection's records.
    public init(client: Sync15StorageClient, serverURI: URL, collection: String, encrypter: RecordEncrypter<T>) {
        self.client = client
        self.encrypter = encrypter
        self.collectionURI = serverURI.appendingPathComponent(collection, isDirectory: false)
    }

    /// `maxPostRecords` of the configuration in use.
    var maxBatchPostRecords: Int {
        return infoConfig.maxPostRecords
    }

    /// URI of the record with the given GUID inside this collection.
    fileprivate func uriForRecord(_ guid: String) -> URL {
        return self.collectionURI.appendingPathComponent(guid)
    }

    /// Creates a batch client that serializes with `serializeRecord` and
    /// uploads through `post`.
    ///
    /// - Parameters:
    ///   - ifUnmodifiedSince: Forwarded to the batch client.
    ///   - onCollectionUploaded: Forwarded to the batch client.
    open func newBatch(ifUnmodifiedSince: Timestamp? = nil, onCollectionUploaded: @escaping (POSTResult, Timestamp?) -> DeferredTimestamp) -> Sync15BatchClient<T> {
        return Sync15BatchClient(config: infoConfig,
                                 ifUnmodifiedSince: ifUnmodifiedSince,
                                 serializeRecord: self.serializeRecord,
                                 uploader: self.post,
                                 onCollectionUploaded: onCollectionUploaded)
    }

    // Exposed so we can batch by size.
    /// Serializes `record` to a string, or returns `nil` when the encrypter's
    /// serializer or the stringification fails.
    open func serializeRecord(_ record: Record<T>) -> String? {
        return self.encrypter.serializer(record)?.stringify()
    }

    /// POSTs already-serialized records to the collection.
    ///
    /// - Parameters:
    ///   - lines: Serialized records, one per line.
    ///   - ifUnmodifiedSince: Forwarded to the request.
    ///   - queryParams: Optional query items appended to the collection URI.
    /// - Returns: A `Deferred` filled with the parsed `POSTResult`, the failure
    ///   reported by `getFailureInfo`, a `ServerInBackoffError`, or a
    ///   `RecordParseError` when the body cannot be parsed.
    open func post(_ lines: [String], ifUnmodifiedSince: Timestamp?, queryParams: [URLQueryItem]? = nil) -> Deferred<Maybe<StorageResponse<POSTResult>>> {
        let deferred = Deferred<Maybe<StorageResponse<POSTResult>>>(defaultQueue: client.resultQueue)

        if self.client.checkBackoff(deferred) {
            return deferred
        }

        let requestURI: URL
        if let queryParams = queryParams {
            requestURI = self.collectionURI.withQueryParams(queryParams)
        } else {
            requestURI = self.collectionURI
        }

        client.requestPOST(requestURI, body: lines, ifUnmodifiedSince: ifUnmodifiedSince) { (data, response, error) in
            // Classify transport and HTTP failures first, exactly as `get` and
            // `getSince` do. This also lets backoff headers on POST responses
            // update the stored backoff, and keeps 4xx/5xx responses from
            // being misreported as parse errors.
            if let failure = self.client.getFailureInfo(response, error) {
                deferred.fill(Maybe<StorageResponse<POSTResult>>.failure(failure))
                return
            }

            do {
                let json = try jsonResponse(fromData: data)
                if let result = POSTResult.fromJSON(json), let httpResponse = response as? HTTPURLResponse {
                    let storageResponse = StorageResponse(value: result, response: httpResponse)
                    deferred.fill(Maybe(success: storageResponse))
                    return
                }
                log.warning("Couldn't parse JSON response.")
            } catch {
                log.warning("Couldn't parse JSON response. \(error)")
            }

            deferred.fill(Maybe(failure: RecordParseError()))
        }

        return deferred
    }

    /// Serializes `records` and POSTs the ones that serialized successfully.
    open func post(_ records: [Record<T>], ifUnmodifiedSince: Timestamp?, queryParams: [URLQueryItem]? = nil) -> Deferred<Maybe<StorageResponse<POSTResult>>> {
        // TODO: charset
        // TODO: if any of these fail, we should do _something_. Right now we just ignore them.
        let lines = optFilter(records.map(self.serializeRecord))
        return self.post(lines, ifUnmodifiedSince: ifUnmodifiedSince, queryParams: queryParams)
    }

    /// PUTs a single record at its own URI.
    ///
    /// - Returns: The timestamp parsed from the response body, or a
    ///   `RecordParseError` when the record cannot be serialized.
    open func put(_ record: Record<T>, ifUnmodifiedSince: Timestamp?) -> Deferred<Maybe<StorageResponse<Timestamp>>> {
        guard let body = self.encrypter.serializer(record) else {
            return deferMaybe(RecordParseError())
        }
        return self.client.putResource(uriForRecord(record.id), body: body, ifUnmodifiedSince: ifUnmodifiedSince, parser: decimalSecondsStringToTimestamp)
    }

    /// Fetches the record with the given GUID.
    ///
    /// - Returns: A `Deferred` filled with the parsed record, the failure
    ///   reported by `getFailureInfo`, a `ServerInBackoffError`, or a
    ///   `RecordParseError` when the body cannot be turned into a record.
    open func get(_ guid: String) -> Deferred<Maybe<StorageResponse<Record<T>>>> {
        let deferred = Deferred<Maybe<StorageResponse<Record<T>>>>(defaultQueue: client.resultQueue)

        if self.client.checkBackoff(deferred) {
            return deferred
        }

        client.requestGET(uriForRecord(guid)) { (data, response, error) in
            if let failure = self.client.getFailureInfo(response, error) {
                deferred.fill(Maybe<StorageResponse<Record<T>>>.failure(failure))
                return
            }

            do {
                let envelope = EnvelopeJSON(try jsonResponse(fromData: data))
                if let record = Record<T>.fromEnvelope(envelope, payloadFactory: self.encrypter.factory),
                   let httpResponse = response as? HTTPURLResponse {
                    let storageResponse = StorageResponse(value: record, response: httpResponse)
                    deferred.fill(Maybe(success: storageResponse))
                    return
                }
                log.warning("Couldn't parse JSON response.")
            } catch {
                log.warning("Couldn't parse JSON response. \(error)")
            }

            deferred.fill(Maybe(failure: RecordParseError()))
        }

        return deferred
    }

    /**
     * Unlike every other Sync client, we use the application/json format for fetching
     * multiple requests. The others use application/newlines. We don't want to write
     * another Serializer, and we're loading everything into memory anyway.
     *
     * It is the caller's responsibility to check whether the returned payloads are invalid.
     *
     * Only non-JSON and malformed envelopes will be dropped.
     *
     * - Parameters:
     *   - since: Sent as the `newer` query parameter, in decimal seconds.
     *   - sort: Optional `sort` query parameter.
     *   - limit: Optional `limit` query parameter.
     *   - offset: Optional `offset` query parameter.
     * - Returns: A `Deferred` filled with the parsed records, the failure
     *   reported by `getFailureInfo`, a `ServerInBackoffError`, or a
     *   `RecordParseError` when the body is not a JSON array.
     */
    open func getSince(_ since: Timestamp, sort: SortOption?=nil, limit: Int?=nil, offset: String?=nil) -> Deferred<Maybe<StorageResponse<[Record<T>]>>> {
        let deferred = Deferred<Maybe<StorageResponse<[Record<T>]>>>(defaultQueue: client.resultQueue)

        // Fills the Deferred for us.
        if self.client.checkBackoff(deferred) {
            return deferred
        }

        var params: [URLQueryItem] = [
            URLQueryItem(name: "full", value: "1"),
            URLQueryItem(name: "newer", value: millisecondsToDecimalSeconds(since)),
        ]
        if let offset = offset {
            params.append(URLQueryItem(name: "offset", value: offset))
        }
        if let limit = limit {
            params.append(URLQueryItem(name: "limit", value: "\(limit)"))
        }
        if let sort = sort {
            params.append(URLQueryItem(name: "sort", value: sort.rawValue))
        }

        log.debug("Issuing GET with newer = \(since), offset = \(offset ??? "nil"), sort = \(sort ??? "nil").")
        client.requestGET(self.collectionURI.withQueryParams(params)) { (data, response, error) in
            if let failure = self.client.getFailureInfo(response, error) {
                deferred.fill(Maybe<StorageResponse<[Record<T>]>>.failure(failure))
                return
            }

            do {
                log.verbose("Response is \(response?.debugDescription ?? "").")
                let json = try jsonResponse(fromData: data)
                guard let arr = json.array, let httpResponse = response as? HTTPURLResponse else {
                    log.warning("Non-array response.")
                    deferred.fill(Maybe(failure: RecordParseError()))
                    return
                }

                // Entries that don't yield a record are dropped.
                let records = arr.compactMap { (entry: JSON) -> Record<T>? in
                    return Record<T>.fromEnvelope(EnvelopeJSON(entry), payloadFactory: self.encrypter.factory)
                }
                let storageResponse = StorageResponse(value: records, response: httpResponse)
                deferred.fill(Maybe(success: storageResponse))
                return
            } catch {
                log.warning("Couldn't parse JSON response. \(error)")
            }

            deferred.fill(Maybe(failure: RecordParseError()))
        }

        return deferred
    }
}