Source code

Revision control

Other Tools

1
#!/usr/bin/env python
2
#
3
# This Source Code Form is subject to the terms of the Mozilla Public
4
# License, v. 2.0. If a copy of the MPL was not distributed with this
5
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7
"""
8
Reads a certificate specification from stdin or a file and outputs a
9
signed x509 certificate with the desired properties.
10
11
The input format is as follows:
12
13
issuer:<issuer distinguished name specification>
14
subject:<subject distinguished name specification>
15
[version:{1,2,3,4}]
16
[validity:<YYYYMMDD-YYYYMMDD|duration in days>]
17
[issuerKey:<key specification>]
18
[subjectKey:<key specification>]
19
[signature:{sha256WithRSAEncryption,sha1WithRSAEncryption,
20
md5WithRSAEncryption,ecdsaWithSHA256,ecdsaWithSHA384,
21
ecdsaWithSHA512}]
22
[serialNumber:<integer in the interval [1, 127]>]
23
[extension:<extension name:<extension-specific data>>]
24
[...]
25
26
Known extensions are:
27
basicConstraints:[cA],[pathLenConstraint]
28
keyUsage:[digitalSignature,nonRepudiation,keyEncipherment,
29
dataEncipherment,keyAgreement,keyCertSign,cRLSign]
30
extKeyUsage:[serverAuth,clientAuth,codeSigning,emailProtection,
31
nsSGC, # Netscape Server Gated Crypto
32
OCSPSigning,timeStamping]
33
subjectAlternativeName:[<dNSName|directoryName|rfc822Name|"ip4:"iPV4Address>,...]
34
authorityInformationAccess:<OCSP URI>
35
certificatePolicies:[<policy OID>,...]
36
nameConstraints:{permitted,excluded}:[<dNSName|directoryName>,...]
37
nsCertType:sslServer
38
TLSFeature:[<TLSFeature>,...]
39
embeddedSCTList:[<key specification>:<YYYYMMDD>,...]
40
delegationUsage:
41
42
Where:
43
[] indicates an optional field or component of a field
44
<> indicates a required component of a field
45
{} indicates a choice of exactly one value among a set of values
46
[a,b,c] indicates a list of potential values, of which zero or more
47
may be used
48
49
For instance, the version field is optional. However, if it is
50
specified, it must have exactly one value from the set {1,2,3,4}.
51
52
Most fields have reasonable default values. By default one shared RSA
53
key is used for all signatures and subject public key information
54
fields. Using "issuerKey:<key specification>" or
55
"subjectKey:<key specification>" causes a different key to be used for
56
signing or as the subject public key information field, respectively.
57
See pykey.py for the list of available specifications.
58
The signature algorithm is sha256WithRSAEncryption by default.
59
60
The validity period may be specified as either concrete notBefore and
61
notAfter values or as a validity period centered around 'now'. For the
62
latter, this will result in a notBefore of 'now' - duration/2 and a
63
notAfter of 'now' + duration/2.
64
65
Issuer and subject distinguished name specifications are of the form
66
'[stringEncoding]/C=XX/O=Example/CN=example.com'. C (country name), ST
67
(state or province name), L (locality name), O (organization name), OU
68
(organizational unit name), CN (common name) and emailAddress (email
69
address) are currently supported. The optional stringEncoding field may
70
be 'utf8String' or 'printableString'. If the given string does not
71
contain a '/', it is assumed to represent a common name. If an empty
72
string is provided, then an empty distinguished name is returned.
73
DirectoryNames also use this format. When specifying a directoryName in
74
a nameConstraints extension, the implicit form may not be used.
75
76
If an extension name has '[critical]' after it, it will be marked as
77
critical. Otherwise (by default), it will not be marked as critical.
78
79
TLSFeature values can either consist of a named value (currently only
80
'OCSPMustStaple' which corresponds to status_request) or a numeric TLS
81
feature value (see rfc7633 for more information).
82
83
If a serial number is not explicitly specified, it is automatically
84
generated based on the contents of the certificate.
85
"""
86
87
from pyasn1.codec.der import decoder
88
from pyasn1.codec.der import encoder
89
from pyasn1.type import constraint, tag, univ, useful
90
from pyasn1_modules import rfc2459
91
from struct import pack
92
import base64
93
import datetime
94
import hashlib
95
import re
96
import socket
97
import sys
98
99
import pyct
100
import pykey
101
102
class Error(Exception):
    """Root of the exception hierarchy defined by this module."""
105
106
107
class UnknownBaseError(Error):
    """Common parent for errors raised on unrecognized input.

    `value` holds the offending input. `category` names the kind of
    input that was not understood; it defaults to 'input' and is
    overridden by subclasses."""

    def __init__(self, value):
        super(UnknownBaseError, self).__init__()
        self.category = 'input'
        self.value = value

    def __str__(self):
        shownValue = repr(self.value)
        return 'Unknown %s type "%s"' % (self.category, shownValue)
117
118
119
class UnknownAlgorithmTypeError(UnknownBaseError):
    """Raised when a signature algorithm name is not recognized."""

    def __init__(self, value):
        super(UnknownAlgorithmTypeError, self).__init__(value)
        self.category = 'algorithm'
125
126
127
class UnknownParameterTypeError(UnknownBaseError):
    """Raised when a specification line names an unknown parameter."""

    def __init__(self, value):
        super(UnknownParameterTypeError, self).__init__(value)
        self.category = 'parameter'
133
134
135
class UnknownExtensionTypeError(UnknownBaseError):
    """Raised when an extension specification is not recognized."""

    def __init__(self, value):
        super(UnknownExtensionTypeError, self).__init__(value)
        self.category = 'extension'
141
142
143
class UnknownKeyPurposeTypeError(UnknownBaseError):
    """Raised when an extended key usage purpose is not recognized."""

    def __init__(self, value):
        super(UnknownKeyPurposeTypeError, self).__init__(value)
        self.category = 'keyPurpose'
149
150
151
class UnknownKeyTargetError(UnknownBaseError):
    """Raised when a key is requested for something other than the
    subject or the issuer."""

    def __init__(self, value):
        super(UnknownKeyTargetError, self).__init__(value)
        self.category = 'key target'
157
158
159
class UnknownVersionError(UnknownBaseError):
    """Raised when the requested certificate version is unsupported."""

    def __init__(self, value):
        super(UnknownVersionError, self).__init__(value)
        self.category = 'version'
165
166
167
class UnknownNameConstraintsSpecificationError(UnknownBaseError):
    """Raised when a nameConstraints specification is not
    recognized."""

    def __init__(self, value):
        super(UnknownNameConstraintsSpecificationError, self).__init__(value)
        self.category = 'nameConstraints specification'
174
175
176
class UnknownDNTypeError(UnknownBaseError):
    """Raised when a distinguished name attribute type is unknown."""

    def __init__(self, value):
        super(UnknownDNTypeError, self).__init__(value)
        self.category = 'DN'
182
183
184
class UnknownNSCertTypeError(UnknownBaseError):
    """Raised when an nsCertType value is not recognized."""

    def __init__(self, value):
        super(UnknownNSCertTypeError, self).__init__(value)
        self.category = 'nsCertType'
190
191
192
class UnknownTLSFeature(UnknownBaseError):
    """Raised when a TLS feature is neither numeric nor a known name."""

    def __init__(self, value):
        super(UnknownTLSFeature, self).__init__(value)
        self.category = 'TLSFeature'
198
199
200
class UnknownDelegatedCredentialError(UnknownBaseError):
    """Raised when a Delegated Credential argument is not
    recognized."""

    def __init__(self, value):
        super(UnknownDelegatedCredentialError, self).__init__(value)
        self.category = 'delegatedCredential'
206
207
208
class InvalidSCTSpecification(Error):
    """Raised when an embedded SCT specification cannot be parsed.

    `value` holds the specification text that was rejected."""

    def __init__(self, value):
        super(InvalidSCTSpecification, self).__init__()
        self.value = value

    def __str__(self):
        # The template uses a '{}' placeholder, so it has to be filled
        # in with str.format. Applying the '%' operator to it raises
        # TypeError ("not all arguments converted") instead of
        # producing the message.
        return repr('invalid SCT specification "{}"'.format(self.value))
217
218
219
class InvalidSerialNumber(Error):
    """Raised when a serial number is rejected.

    `value` holds the rejected input or a description of the
    problem."""

    def __init__(self, value):
        super(InvalidSerialNumber, self).__init__()
        self.value = value

    def __str__(self):
        description = repr(self.value)
        return description
228
229
230
def getASN1Tag(asn1Type):
    """Returns the id of the base tag of the given pyasn1 type."""
    baseTag = asn1Type.tagSet.baseTag
    return baseTag.tagId
234
235
236
def stringToAccessDescription(string):
    """Builds a two-element pyasn1 Sequence (an AccessDescription)
    whose access method is id-ad-ocsp and whose access location is a
    GeneralName holding the given string as a
    uniformResourceIdentifier."""
    location = rfc2459.GeneralName()
    location['uniformResourceIdentifier'] = string
    description = univ.Sequence()
    components = (rfc2459.id_ad_ocsp, location)
    for position, component in enumerate(components):
        description.setComponentByPosition(position, component)
    return description
247
248
249
def stringToDN(string, tag=None):
    """Converts a distinguished name (or directory name) string into
    a pyasn1 Name. The accepted format is described with the issuer
    and subject fields in the module documentation. If `tag` is
    given, the returned Name is implicitly tagged with it."""
    # Attribute name -> (attribute type OID, value class, whether the
    # value class is constructed directly from the string).
    knownTypes = {
        'C': (rfc2459.id_at_countryName,
              rfc2459.X520countryName, True),
        'ST': (rfc2459.id_at_stateOrProvinceName,
               rfc2459.X520StateOrProvinceName, False),
        'L': (rfc2459.id_at_localityName,
              rfc2459.X520LocalityName, False),
        'O': (rfc2459.id_at_organizationName,
              rfc2459.X520OrganizationName, False),
        'OU': (rfc2459.id_at_organizationalUnitName,
               rfc2459.X520OrganizationalUnitName, False),
        'CN': (rfc2459.id_at_commonName,
               rfc2459.X520CommonName, False),
        'emailAddress': (rfc2459.emailAddress,
                         rfc2459.Pkcs9email, True),
    }
    # A non-empty string without any '/' is shorthand for a common name.
    if string and '/' not in string:
        string = '/CN=%s' % string
    pieces = re.split('/(C|ST|L|O|OU|CN|emailAddress)=', string)
    # pieces is [[encoding], <type>, <value>, <type>, <value>, ...]
    encoding = pieces[0] if pieces[0] else 'utf8String'
    rdns = rfc2459.RDNSequence()
    typedValues = zip(pieces[1::2], pieces[2::2])
    for index, (nameType, value) in enumerate(typedValues):
        if nameType not in knownTypes:
            raise UnknownDNTypeError(nameType)
        typeOID, componentClass, builtFromValue = knownTypes[nameType]
        ava = rfc2459.AttributeTypeAndValue()
        ava['type'] = typeOID
        if builtFromValue:
            nameComponent = componentClass(value)
        else:
            nameComponent = componentClass()
            # The value may contain escapes such as '\0' (a backslash
            # followed by the digit zero) that must be decoded into
            # the corresponding byte ('\x00').
            nameComponent[encoding] = value.decode(encoding='string_escape')
        ava['value'] = nameComponent
        rdn = rfc2459.RelativeDistinguishedName()
        rdn.setComponentByPosition(0, ava)
        rdns.setComponentByPosition(index, rdn)
    name = rfc2459.Name().subtype(implicitTag=tag) if tag else rfc2459.Name()
    name.setComponentByPosition(0, rdns)
    return name
305
306
307
def stringToAlgorithmIdentifiers(string):
    """Maps a signature algorithm name to a tuple of
    (pyasn1 AlgorithmIdentifier, pykey hash algorithm constant).
    Raises UnknownAlgorithmTypeError for an unrecognized name."""
    # Name -> (hash constant, algorithm OID, whether to add parameters).
    # Null parameters are added for the RSA algorithms only.
    knownAlgorithms = {
        'sha1WithRSAEncryption':
            (pykey.HASH_SHA1, rfc2459.sha1WithRSAEncryption, True),
        'sha256WithRSAEncryption':
            (pykey.HASH_SHA256,
             univ.ObjectIdentifier('1.2.840.113549.1.1.11'), True),
        'md5WithRSAEncryption':
            (pykey.HASH_MD5, rfc2459.md5WithRSAEncryption, True),
        'ecdsaWithSHA256':
            (pykey.HASH_SHA256,
             univ.ObjectIdentifier('1.2.840.10045.4.3.2'), False),
        'ecdsaWithSHA384':
            (pykey.HASH_SHA384,
             univ.ObjectIdentifier('1.2.840.10045.4.3.3'), False),
        'ecdsaWithSHA512':
            (pykey.HASH_SHA512,
             univ.ObjectIdentifier('1.2.840.10045.4.3.4'), False),
    }
    if string not in knownAlgorithms:
        raise UnknownAlgorithmTypeError(string)
    hashAlgorithm, algorithmOID, withNullParameters = knownAlgorithms[string]
    identifier = rfc2459.AlgorithmIdentifier()
    identifier['algorithm'] = algorithmOID
    if withNullParameters:
        # Directly setting parameters to univ.Null doesn't currently
        # work, so wrap the DER encoding of Null in an Any instead.
        identifier['parameters'] = univ.Any(encoder.encode(univ.Null()))
    return (identifier, hashAlgorithm)
345
346
347
def datetimeToTime(dt):
    """Wraps a datetime in an rfc2459.Time whose 'generalTime' choice
    is a GeneralizedTime formatted as YYYYMMDDHHMMSSZ."""
    formatted = dt.strftime('%Y%m%d%H%M%SZ')
    result = rfc2459.Time()
    result['generalTime'] = useful.GeneralizedTime(formatted)
    return result
353
354
355
def serialBytesToString(serialBytes):
    """Converts a list of integers in the interval [0, 255] into the
    corresponding serial number string: the ASN.1 INTEGER tag, a
    one-byte length, then the given bytes. Raises InvalidSerialNumber
    if more than 127 bytes are given."""
    byteCount = len(serialBytes)
    if byteCount > 127:
        raise InvalidSerialNumber("{} bytes is too long".format(byteCount))
    # The ASN.1 INTEGER tag and the length byte go in front.
    header = [getASN1Tag(univ.Integer), byteCount]
    return ''.join(map(chr, header + serialBytes))
364
365
366
class Certificate(object):
    """Utility class for reading a certificate specification and
    generating a signed x509 certificate.

    The specification is parsed in the constructor. Call toDER() or
    toPEM() to obtain the encoded, signed certificate."""

    def __init__(self, paramStream):
        """Sets the defaults, then applies every line returned by
        paramStream.readlines() as a 'param:value' specification."""
        self.versionValue = 2  # a value of 2 is X509v3
        self.signature = 'sha256WithRSAEncryption'
        self.issuer = 'Default Issuer'
        actualNow = datetime.datetime.utcnow()
        # 'now' is truncated to the start of the current year so that
        # the default validity period is the same for every run made
        # within that year.
        self.now = datetime.datetime.strptime(str(actualNow.year), '%Y')
        aYearAndAWhile = datetime.timedelta(days=400)
        self.notBefore = self.now - aYearAndAWhile
        self.notAfter = self.now + aYearAndAWhile
        self.subject = 'Default Subject'
        self.extensions = None
        # The serial number can be automatically generated from the
        # certificate specification. We need this value to depend in
        # part on what extensions are present. self.extensions are
        # pyasn1 objects. Depending on the string representation of
        # these objects can cause the resulting serial number to change
        # unexpectedly, so instead we depend on the original string
        # representation of the extensions as specified.
        self.extensionLines = None
        self.savedEmbeddedSCTListData = None
        self.subjectKey = pykey.keyFromSpecification('default')
        self.issuerKey = pykey.keyFromSpecification('default')
        self.serialNumber = None
        self.decodeParams(paramStream)
        # If a serial number wasn't specified, generate one based on
        # the certificate contents.
        if not self.serialNumber:
            self.serialNumber = self.generateSerialNumber()
        # This has to be last because the SCT signature depends on the
        # contents of the certificate.
        if self.savedEmbeddedSCTListData:
            self.addEmbeddedSCTListData()

    def generateSerialNumber(self):
        """Generates a serial number for this certificate based on its
        contents. Intended to be reproducible for compatibility with
        the build system on OS X (see the comment above main, later in
        this file)."""
        hasher = hashlib.sha256()
        hasher.update(str(self.versionValue))
        hasher.update(self.signature)
        hasher.update(self.issuer)
        hasher.update(str(self.notBefore))
        hasher.update(str(self.notAfter))
        hasher.update(self.subject)
        if self.extensionLines:
            for extensionLine in self.extensionLines:
                hasher.update(extensionLine)
        if self.savedEmbeddedSCTListData:
            # savedEmbeddedSCTListData is
            # (embeddedSCTListSpecification, critical), where |critical|
            # may be None
            hasher.update(self.savedEmbeddedSCTListData[0])
            if self.savedEmbeddedSCTListData[1]:
                hasher.update(self.savedEmbeddedSCTListData[1])
        serialBytes = [ord(c) for c in hasher.digest()[:20]]
        # Ensure that the most significant bit isn't set (which would
        # indicate a negative number, which isn't valid for serial
        # numbers).
        serialBytes[0] &= 0x7f
        # Also ensure that the least significant bit on the most
        # significant byte is set (to prevent a leading zero byte,
        # which also wouldn't be valid).
        serialBytes[0] |= 0x01
        return serialBytesToString(serialBytes)

    def decodeParams(self, paramStream):
        """Applies each line of paramStream, with surrounding
        whitespace stripped, via decodeParam()."""
        for line in paramStream.readlines():
            self.decodeParam(line.strip())

    def decodeParam(self, line):
        """Applies a single 'param:value' specification line. Raises
        UnknownParameterTypeError for an unknown param and
        InvalidSerialNumber for a serialNumber outside [1, 127]."""
        # Only the first ':' separates the parameter name from its
        # value; any further ':' characters belong to the value.
        param, _, value = line.partition(':')
        if param == 'version':
            self.setVersion(value)
        elif param == 'subject':
            self.subject = value
        elif param == 'issuer':
            self.issuer = value
        elif param == 'validity':
            self.decodeValidity(value)
        elif param == 'extension':
            self.decodeExtension(value)
        elif param == 'issuerKey':
            self.setupKey('issuer', value)
        elif param == 'subjectKey':
            self.setupKey('subject', value)
        elif param == 'signature':
            self.signature = value
        elif param == 'serialNumber':
            serialNumber = int(value)
            # Ensure only serial numbers that conform to the rules listed in
            # generateSerialNumber() are permitted.
            if serialNumber < 1 or serialNumber > 127:
                raise InvalidSerialNumber(value)
            self.serialNumber = serialBytesToString([serialNumber])
        else:
            raise UnknownParameterTypeError(param)

    def setVersion(self, version):
        """Stores the zero-based encoding of the given version, which
        must be an integer in [1, 4]. Raises UnknownVersionError
        otherwise."""
        intVersion = int(version)
        if 1 <= intVersion <= 4:
            self.versionValue = intVersion - 1
        else:
            raise UnknownVersionError(version)

    def decodeValidity(self, duration):
        """Sets notBefore and notAfter from either
        'YYYYMMDD-YYYYMMDD' or a number of days centered on
        self.now."""
        match = re.search('([0-9]{8})-([0-9]{8})', duration)
        if match:
            self.notBefore = datetime.datetime.strptime(match.group(1), '%Y%m%d')
            self.notAfter = datetime.datetime.strptime(match.group(2), '%Y%m%d')
        else:
            # Explicit floor division: half the duration is a whole
            # number of days, as it always has been under Python 2.
            delta = datetime.timedelta(days=(int(duration) // 2))
            self.notBefore = self.now - delta
            self.notAfter = self.now + delta

    def decodeExtension(self, extension):
        """Parses '<name>[critical]:<data>' and adds the named
        extension. Raises UnknownExtensionTypeError if the text does
        not match that form or the name is unknown."""
        match = re.search(r'([a-zA-Z]+)(\[critical\])?:(.*)', extension)
        if not match:
            raise UnknownExtensionTypeError(extension)
        extensionType = match.group(1)
        critical = match.group(2)
        value = match.group(3)
        if extensionType == 'basicConstraints':
            self.addBasicConstraints(value, critical)
        elif extensionType == 'keyUsage':
            self.addKeyUsage(value, critical)
        elif extensionType == 'extKeyUsage':
            self.addExtKeyUsage(value, critical)
        elif extensionType == 'subjectAlternativeName':
            self.addSubjectAlternativeName(value, critical)
        elif extensionType == 'authorityInformationAccess':
            self.addAuthorityInformationAccess(value, critical)
        elif extensionType == 'certificatePolicies':
            self.addCertificatePolicies(value, critical)
        elif extensionType == 'nameConstraints':
            self.addNameConstraints(value, critical)
        elif extensionType == 'nsCertType':
            self.addNSCertType(value, critical)
        elif extensionType == 'TLSFeature':
            self.addTLSFeature(value, critical)
        elif extensionType == 'embeddedSCTList':
            # Deferred: the constructor adds this last, because the SCT
            # signature depends on the rest of the certificate.
            self.savedEmbeddedSCTListData = (value, critical)
        elif extensionType == 'delegationUsage':
            self.addDelegationUsage(critical)
        else:
            raise UnknownExtensionTypeError(extensionType)

        if extensionType != 'embeddedSCTList':
            if not self.extensionLines:
                self.extensionLines = []
            self.extensionLines.append(extension)

    def setupKey(self, subjectOrIssuer, value):
        """Replaces the subject or issuer key with the key described
        by the given pykey specification. Raises
        UnknownKeyTargetError for any other target."""
        if subjectOrIssuer == 'subject':
            self.subjectKey = pykey.keyFromSpecification(value)
        elif subjectOrIssuer == 'issuer':
            self.issuerKey = pykey.keyFromSpecification(value)
        else:
            raise UnknownKeyTargetError(subjectOrIssuer)

    def addExtension(self, extensionType, extensionValue, critical):
        """Appends an Extension with the given OID whose value is the
        DER encoding of extensionValue wrapped in an OCTET STRING."""
        if not self.extensions:
            self.extensions = []
        encapsulated = univ.OctetString(encoder.encode(extensionValue))
        extension = rfc2459.Extension()
        extension['extnID'] = extensionType
        # critical is either the string '[critical]' or None.
        # We only care whether or not it is truthy.
        if critical:
            extension['critical'] = True
        extension['extnValue'] = encapsulated
        self.extensions.append(extension)

    def addBasicConstraints(self, basicConstraints, critical):
        """Adds a basicConstraints extension from
        '[cA],[pathLenConstraint]'."""
        parts = basicConstraints.split(',')
        cA = parts[0]
        # Both components are optional, so tolerate a specification
        # with no comma at all instead of failing with IndexError.
        pathLenConstraint = parts[1] if len(parts) > 1 else ''
        basicConstraintsExtension = rfc2459.BasicConstraints()
        basicConstraintsExtension['cA'] = cA == 'cA'
        if pathLenConstraint:
            pathLenConstraintValue = \
                univ.Integer(int(pathLenConstraint)).subtype(
                    subtypeSpec=constraint.ValueRangeConstraint(0, float('inf')))
            basicConstraintsExtension['pathLenConstraint'] = pathLenConstraintValue
        self.addExtension(rfc2459.id_ce_basicConstraints, basicConstraintsExtension, critical)

    def addKeyUsage(self, keyUsage, critical):
        """Adds a keyUsage extension built from the given string."""
        keyUsageExtension = rfc2459.KeyUsage(keyUsage)
        self.addExtension(rfc2459.id_ce_keyUsage, keyUsageExtension, critical)

    def keyPurposeToOID(self, keyPurpose):
        """Returns the OID for a named key purpose. Raises
        UnknownKeyPurposeTypeError for an unknown name."""
        if keyPurpose == 'serverAuth':
            return rfc2459.id_kp_serverAuth
        if keyPurpose == 'clientAuth':
            return rfc2459.id_kp_clientAuth
        if keyPurpose == 'codeSigning':
            return rfc2459.id_kp_codeSigning
        if keyPurpose == 'emailProtection':
            return rfc2459.id_kp_emailProtection
        if keyPurpose == 'nsSGC':
            return univ.ObjectIdentifier('2.16.840.1.113730.4.1')
        if keyPurpose == 'OCSPSigning':
            return univ.ObjectIdentifier('1.3.6.1.5.5.7.3.9')
        if keyPurpose == 'timeStamping':
            return rfc2459.id_kp_timeStamping
        raise UnknownKeyPurposeTypeError(keyPurpose)

    def addExtKeyUsage(self, extKeyUsage, critical):
        """Adds an extKeyUsage extension from a comma-separated list
        of key purpose names."""
        extKeyUsageExtension = rfc2459.ExtKeyUsageSyntax()
        for count, keyPurpose in enumerate(extKeyUsage.split(',')):
            extKeyUsageExtension.setComponentByPosition(count, self.keyPurposeToOID(keyPurpose))
        self.addExtension(rfc2459.id_ce_extKeyUsage, extKeyUsageExtension, critical)

    def addSubjectAlternativeName(self, names, critical):
        """Adds a subjectAlternativeName extension from a
        comma-separated list. A name containing '/' is a
        directoryName, one containing '@' an rfc822Name, one starting
        with 'ip4:' an IPv4 iPAddress, and anything else a dNSName."""
        IPV4_PREFIX = 'ip4:'

        subjectAlternativeName = rfc2459.SubjectAltName()
        for count, name in enumerate(names.split(',')):
            generalName = rfc2459.GeneralName()
            if '/' in name:
                directoryName = stringToDN(name,
                                           tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4))
                generalName['directoryName'] = directoryName
            elif '@' in name:
                generalName['rfc822Name'] = name
            elif name.startswith(IPV4_PREFIX):
                generalName['iPAddress'] = socket.inet_pton(socket.AF_INET, name[len(IPV4_PREFIX):])
            else:
                # The string may have things like '\0' (i.e. a slash
                # followed by the number zero) that have to be decoded into
                # the resulting '\x00' (i.e. a byte with value zero).
                generalName['dNSName'] = name.decode(encoding='string_escape')
            subjectAlternativeName.setComponentByPosition(count, generalName)
        self.addExtension(rfc2459.id_ce_subjectAltName, subjectAlternativeName, critical)

    def addAuthorityInformationAccess(self, ocspURI, critical):
        """Adds an authorityInformationAccess extension holding one
        access description for the given OCSP URI."""
        sequence = univ.Sequence()
        accessDescription = stringToAccessDescription(ocspURI)
        sequence.setComponentByPosition(0, accessDescription)
        self.addExtension(rfc2459.id_pe_authorityInfoAccess, sequence, critical)

    def addCertificatePolicies(self, policyOIDs, critical):
        """Adds a certificatePolicies extension from a comma-separated
        list of policy OIDs. 'any' stands for 2.5.29.32.0."""
        policies = rfc2459.CertificatePolicies()
        for pos, policyOID in enumerate(policyOIDs.split(',')):
            if policyOID == 'any':
                policyOID = '2.5.29.32.0'
            policy = rfc2459.PolicyInformation()
            policyIdentifier = rfc2459.CertPolicyId(policyOID)
            policy['policyIdentifier'] = policyIdentifier
            policies.setComponentByPosition(pos, policy)
        self.addExtension(rfc2459.id_ce_certificatePolicies, policies, critical)

    def addNameConstraints(self, constraints, critical):
        """Adds a nameConstraints extension from
        '{permitted,excluded}:<name>,...'. Raises
        UnknownNameConstraintsSpecificationError if neither prefix is
        present."""
        nameConstraints = rfc2459.NameConstraints()
        if constraints.startswith('permitted:'):
            (subtreesType, subtreesTag) = ('permittedSubtrees', 0)
        elif constraints.startswith('excluded:'):
            (subtreesType, subtreesTag) = ('excludedSubtrees', 1)
        else:
            raise UnknownNameConstraintsSpecificationError(constraints)
        generalSubtrees = rfc2459.GeneralSubtrees().subtype(
            implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, subtreesTag))
        subtrees = constraints[(constraints.find(':') + 1):]
        for pos, name in enumerate(subtrees.split(',')):
            generalName = rfc2459.GeneralName()
            if '/' in name:
                directoryName = stringToDN(name,
                                           tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4))
                generalName['directoryName'] = directoryName
            else:
                generalName['dNSName'] = name
            generalSubtree = rfc2459.GeneralSubtree()
            generalSubtree['base'] = generalName
            generalSubtrees.setComponentByPosition(pos, generalSubtree)
        nameConstraints[subtreesType] = generalSubtrees
        self.addExtension(rfc2459.id_ce_nameConstraints, nameConstraints, critical)

    def addNSCertType(self, certType, critical):
        """Adds an nsCertType extension. Only 'sslServer' is
        supported; anything else raises UnknownNSCertTypeError."""
        if certType != 'sslServer':
            raise UnknownNSCertTypeError(certType)
        self.addExtension(univ.ObjectIdentifier('2.16.840.1.113730.1.1'), univ.BitString("'01'B"),
                          critical)

    def addDelegationUsage(self, critical):
        """Adds a delegationUsage extension with a NULL value. Raises
        UnknownDelegatedCredentialError if it is marked critical."""
        if critical:
            raise UnknownDelegatedCredentialError(critical)
        self.addExtension(univ.ObjectIdentifier('1.3.6.1.4.1.44363.44'), univ.Null(),
                          critical)

    def addTLSFeature(self, features, critical):
        """Adds a TLSFeature extension from a comma-separated list of
        numeric values or known names. Raises UnknownTLSFeature for
        anything else."""
        namedFeatures = {'OCSPMustStaple': 5}
        featureList = [f.strip() for f in features.split(',')]
        sequence = univ.Sequence()
        for feature in featureList:
            featureValue = 0
            try:
                featureValue = int(feature)
            except ValueError:
                try:
                    featureValue = namedFeatures[feature]
                except KeyError:
                    # Neither a number nor a known feature name.
                    raise UnknownTLSFeature(feature)
            sequence.setComponentByPosition(len(sequence),
                                            univ.Integer(featureValue))
        self.addExtension(univ.ObjectIdentifier('1.3.6.1.5.5.7.1.24'), sequence,
                          critical)

    def addEmbeddedSCTListData(self):
        """Adds the embedded SCT list extension from the saved
        '<key specification>:<YYYYMMDD>,...' specification. Raises
        InvalidSCTSpecification for an entry that does not match."""
        (scts, critical) = self.savedEmbeddedSCTListData
        encodedSCTs = []
        for sctSpec in scts.split(','):
            match = re.search(r'(\w+):(\d{8})', sctSpec)
            if not match:
                raise InvalidSCTSpecification(sctSpec)
            keySpec = match.group(1)
            key = pykey.keyFromSpecification(keySpec)
            time = datetime.datetime.strptime(match.group(2), '%Y%m%d')
            tbsCertificate = self.getTBSCertificate()
            tbsDER = encoder.encode(tbsCertificate)
            sct = pyct.SCT(key, time, tbsDER, self.issuerKey)
            signed = sct.signAndEncode()
            # Each SCT is preceded by its length as a big-endian
            # 16-bit integer.
            lengthPrefix = pack('!H', len(signed))
            encodedSCTs.append(lengthPrefix + signed)
        encodedSCTBytes = "".join(encodedSCTs)
        # The whole list gets the same kind of length prefix.
        lengthPrefix = pack('!H', len(encodedSCTBytes))
        extensionBytes = lengthPrefix + encodedSCTBytes
        self.addExtension(univ.ObjectIdentifier('1.3.6.1.4.1.11129.2.4.2'),
                          univ.OctetString(extensionBytes), critical)

    def getVersion(self):
        """Returns the version as an rfc2459.Version explicitly tagged
        with context tag 0."""
        return rfc2459.Version(self.versionValue).subtype(
            explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))

    def getSerialNumber(self):
        """Returns the serial number decoded from its stored DER
        form."""
        return decoder.decode(self.serialNumber)[0]

    def getIssuer(self):
        """Returns the issuer as a pyasn1 Name."""
        return stringToDN(self.issuer)

    def getValidity(self):
        """Returns an rfc2459.Validity with notBefore and notAfter."""
        validity = rfc2459.Validity()
        validity['notBefore'] = self.getNotBefore()
        validity['notAfter'] = self.getNotAfter()
        return validity

    def getNotBefore(self):
        """Returns notBefore as an rfc2459.Time."""
        return datetimeToTime(self.notBefore)

    def getNotAfter(self):
        """Returns notAfter as an rfc2459.Time."""
        return datetimeToTime(self.notAfter)

    def getSubject(self):
        """Returns the subject as a pyasn1 Name."""
        return stringToDN(self.subject)

    def getTBSCertificate(self):
        """Builds the rfc2459.TBSCertificate from the current state.
        Extensions are included only if at least one was added."""
        (signatureOID, _) = stringToAlgorithmIdentifiers(self.signature)
        tbsCertificate = rfc2459.TBSCertificate()
        tbsCertificate['version'] = self.getVersion()
        tbsCertificate['serialNumber'] = self.getSerialNumber()
        tbsCertificate['signature'] = signatureOID
        tbsCertificate['issuer'] = self.getIssuer()
        tbsCertificate['validity'] = self.getValidity()
        tbsCertificate['subject'] = self.getSubject()
        tbsCertificate['subjectPublicKeyInfo'] = self.subjectKey.asSubjectPublicKeyInfo()
        if self.extensions:
            extensions = rfc2459.Extensions().subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3))
            for count, extension in enumerate(self.extensions):
                extensions.setComponentByPosition(count, extension)
            tbsCertificate['extensions'] = extensions
        return tbsCertificate

    def toDER(self):
        """Returns the DER encoding of the certificate. The signature
        is made with the issuer key over the DER-encoded
        TBSCertificate."""
        (signatureOID, hashAlgorithm) = stringToAlgorithmIdentifiers(self.signature)
        certificate = rfc2459.Certificate()
        tbsCertificate = self.getTBSCertificate()
        certificate['tbsCertificate'] = tbsCertificate
        certificate['signatureAlgorithm'] = signatureOID
        tbsDER = encoder.encode(tbsCertificate)
        certificate['signatureValue'] = self.issuerKey.sign(tbsDER, hashAlgorithm)
        return encoder.encode(certificate)

    def toPEM(self):
        """Returns the certificate as PEM: the base64 of the DER
        encoding in lines of at most 64 characters between the BEGIN
        and END CERTIFICATE markers, with no trailing newline."""
        der = self.toDER()
        b64 = base64.b64encode(der)
        lines = ['-----BEGIN CERTIFICATE-----']
        lines.extend(b64[start:start + 64] for start in range(0, len(b64), 64))
        lines.append('-----END CERTIFICATE-----')
        return '\n'.join(lines)
761
762
763
# The build harness will call this function with an output
764
# file-like object and a path to a file containing a
765
# specification. This will read the specification and output
766
# the certificate as PEM.
767
# This utility tries as hard as possible to ensure that two
768
# runs with the same input will have the same output. This is
769
# particularly important when building on OS X, where we
770
# generate everything twice for unified builds. During the
771
# unification step, if any pair of input files differ, the build
772
# system throws an error.
773
# The one concrete failure mode is if one run happens before
774
# midnight on New Year's Eve and the next run happens after
775
# midnight.
776
def main(output, inputPath):
    """Reads the specification at inputPath and writes the resulting
    certificate to output as PEM followed by a newline."""
    with open(inputPath) as configStream:
        pem = Certificate(configStream).toPEM()
        output.write(pem + '\n')
779
780
781
# When run as a standalone program, this will read a specification from
782
# stdin and output the certificate as PEM to stdout.
783
if __name__ == '__main__':
    # Standalone use: specification on stdin, PEM certificate on stdout.
    certificate = Certificate(sys.stdin)
    print(certificate.toPEM())