| 
									
										
										
										
											2019-09-29 18:04:34 +08:00
										 |  |  | #!/usr/bin/env python | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # ESP32 x509 certificate bundle generation utility | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # Converts PEM and DER certificates to a custom bundle format which stores just the | 
					
						
							|  |  |  | # subject name and public key to reduce space | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # The bundle will have the format: number of certificates; crt 1 subject name length; crt 1 public key length; | 
					
						
							|  |  |  | # crt 1 subject name; crt 1 public key; crt 2... | 
					
						
							|  |  |  | # | 
					
						
							| 
									
										
										
										
											2021-10-29 10:09:53 +05:30
										 |  |  | # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD | 
					
						
							|  |  |  | # SPDX-License-Identifier: Apache-2.0 | 
					
						
							| 
									
										
										
										
											2019-09-29 18:04:34 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | from __future__ import with_statement | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import argparse | 
					
						
							|  |  |  | import csv | 
					
						
							| 
									
										
										
										
											2021-01-26 10:49:01 +08:00
										 |  |  | import os | 
					
						
							| 
									
										
										
										
											2019-09-29 18:04:34 +08:00
										 |  |  | import re | 
					
						
							| 
									
										
										
										
											2021-01-26 10:49:01 +08:00
										 |  |  | import struct | 
					
						
							|  |  |  | import sys | 
					
						
							| 
									
										
										
										
											2020-06-08 16:37:28 +08:00
										 |  |  | from io import open | 
					
						
							| 
									
										
										
										
											2019-09-29 18:04:34 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | try: | 
					
						
							|  |  |  |     from cryptography import x509 | 
					
						
							|  |  |  |     from cryptography.hazmat.backends import default_backend | 
					
						
							|  |  |  |     from cryptography.hazmat.primitives import serialization | 
					
						
							|  |  |  | except ImportError: | 
					
						
							|  |  |  |     print('The cryptography package is not installed.' | 
					
						
							|  |  |  |           'Please refer to the Get Started section of the ESP-IDF Programming Guide for ' | 
					
						
							|  |  |  |           'setting up the required packages.') | 
					
						
							|  |  |  |     raise | 
					
						
							|  |  |  | 
 | 
					
						
# File name of the generated binary bundle. It is given without a directory,
# so it is resolved relative to the current working directory. A stale file of
# this name is deleted by CertificateBundle.__init__ and main() writes the new
# bundle to it.
ca_bundle_bin_file = 'x509_crt_bundle'

# When True, status() prints nothing; main() sets this from the --quiet option.
quiet = False
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def status(msg): | 
					
						
							|  |  |  |     """ Print status message to stderr """ | 
					
						
							|  |  |  |     if not quiet: | 
					
						
							|  |  |  |         critical(msg) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def critical(msg): | 
					
						
							|  |  |  |     """ Print critical message to stderr """ | 
					
						
							|  |  |  |     sys.stderr.write('gen_crt_bundle.py: ') | 
					
						
							|  |  |  |     sys.stderr.write(msg) | 
					
						
							|  |  |  |     sys.stderr.write('\n') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class CertificateBundle: | 
					
						
							|  |  |  |     def __init__(self): | 
					
						
							|  |  |  |         self.certificates = [] | 
					
						
							|  |  |  |         self.compressed_crts = [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if os.path.isfile(ca_bundle_bin_file): | 
					
						
							|  |  |  |             os.remove(ca_bundle_bin_file) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_from_path(self, crts_path): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         found = False | 
					
						
							|  |  |  |         for file_path in os.listdir(crts_path): | 
					
						
							|  |  |  |             found |= self.add_from_file(os.path.join(crts_path, file_path)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if found is False: | 
					
						
							|  |  |  |             raise InputError('No valid x509 certificates found in %s' % crts_path) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_from_file(self, file_path): | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             if file_path.endswith('.pem'): | 
					
						
							| 
									
										
										
										
											2021-01-26 10:49:01 +08:00
										 |  |  |                 status('Parsing certificates from %s' % file_path) | 
					
						
							| 
									
										
										
										
											2020-06-08 16:37:28 +08:00
										 |  |  |                 with open(file_path, 'r', encoding='utf-8') as f: | 
					
						
							| 
									
										
										
										
											2019-09-29 18:04:34 +08:00
										 |  |  |                     crt_str = f.read() | 
					
						
							|  |  |  |                     self.add_from_pem(crt_str) | 
					
						
							|  |  |  |                     return True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             elif file_path.endswith('.der'): | 
					
						
							| 
									
										
										
										
											2021-01-26 10:49:01 +08:00
										 |  |  |                 status('Parsing certificates from %s' % file_path) | 
					
						
							| 
									
										
										
										
											2019-09-29 18:04:34 +08:00
										 |  |  |                 with open(file_path, 'rb') as f: | 
					
						
							|  |  |  |                     crt_str = f.read() | 
					
						
							|  |  |  |                     self.add_from_der(crt_str) | 
					
						
							|  |  |  |                     return True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         except ValueError: | 
					
						
							| 
									
										
										
										
											2021-01-26 10:49:01 +08:00
										 |  |  |             critical('Invalid certificate in %s' % file_path) | 
					
						
							|  |  |  |             raise InputError('Invalid certificate') | 
					
						
							| 
									
										
										
										
											2019-09-29 18:04:34 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  |         return False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_from_pem(self, crt_str): | 
					
						
							|  |  |  |         """ A single PEM file may have multiple certificates """ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         crt = '' | 
					
						
							|  |  |  |         count = 0 | 
					
						
							|  |  |  |         start = False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for strg in crt_str.splitlines(True): | 
					
						
							|  |  |  |             if strg == '-----BEGIN CERTIFICATE-----\n' and start is False: | 
					
						
							|  |  |  |                 crt = '' | 
					
						
							|  |  |  |                 start = True | 
					
						
							|  |  |  |             elif strg == '-----END CERTIFICATE-----\n' and start is True: | 
					
						
							|  |  |  |                 crt += strg + '\n' | 
					
						
							|  |  |  |                 start = False | 
					
						
							|  |  |  |                 self.certificates.append(x509.load_pem_x509_certificate(crt.encode(), default_backend())) | 
					
						
							|  |  |  |                 count += 1 | 
					
						
							|  |  |  |             if start is True: | 
					
						
							|  |  |  |                 crt += strg | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-08-10 09:01:57 +02:00
										 |  |  |         if count == 0: | 
					
						
							| 
									
										
										
										
											2021-01-26 10:49:01 +08:00
										 |  |  |             raise InputError('No certificate found') | 
					
						
							| 
									
										
										
										
											2019-09-29 18:04:34 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-26 10:49:01 +08:00
										 |  |  |         status('Successfully added %d certificates' % count) | 
					
						
							| 
									
										
										
										
											2019-09-29 18:04:34 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def add_from_der(self, crt_str): | 
					
						
							|  |  |  |         self.certificates.append(x509.load_der_x509_certificate(crt_str, default_backend())) | 
					
						
							| 
									
										
										
										
											2021-01-26 10:49:01 +08:00
										 |  |  |         status('Successfully added 1 certificate') | 
					
						
							| 
									
										
										
										
											2019-09-29 18:04:34 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def create_bundle(self): | 
					
						
							|  |  |  |         # Sort certificates in order to do binary search when looking up certificates | 
					
						
							|  |  |  |         self.certificates = sorted(self.certificates, key=lambda cert: cert.subject.public_bytes(default_backend())) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         bundle = struct.pack('>H', len(self.certificates)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for crt in self.certificates: | 
					
						
							|  |  |  |             """ Read the public key as DER format """ | 
					
						
							|  |  |  |             pub_key = crt.public_key() | 
					
						
							|  |  |  |             pub_key_der = pub_key.public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             """ Read the subject name as DER format """ | 
					
						
							|  |  |  |             sub_name_der = crt.subject.public_bytes(default_backend()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             name_len = len(sub_name_der) | 
					
						
							|  |  |  |             key_len = len(pub_key_der) | 
					
						
							|  |  |  |             len_data = struct.pack('>HH', name_len, key_len) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             bundle += len_data | 
					
						
							|  |  |  |             bundle += sub_name_der | 
					
						
							|  |  |  |             bundle += pub_key_der | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return bundle | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_with_filter(self, crts_path, filter_path): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         filter_set = set() | 
					
						
							| 
									
										
										
										
											2020-06-08 16:37:28 +08:00
										 |  |  |         with open(filter_path, 'r', encoding='utf-8') as f: | 
					
						
							| 
									
										
										
										
											2019-09-29 18:04:34 +08:00
										 |  |  |             csv_reader = csv.reader(f, delimiter=',') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Skip header | 
					
						
							|  |  |  |             next(csv_reader) | 
					
						
							|  |  |  |             for row in csv_reader: | 
					
						
							|  |  |  |                 filter_set.add(row[1]) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-26 10:49:01 +08:00
										 |  |  |         status('Parsing certificates from %s' % crts_path) | 
					
						
							| 
									
										
										
										
											2019-09-29 18:04:34 +08:00
										 |  |  |         crt_str = [] | 
					
						
							| 
									
										
										
										
											2020-06-08 16:37:28 +08:00
										 |  |  |         with open(crts_path, 'r', encoding='utf-8') as f: | 
					
						
							| 
									
										
										
										
											2019-09-29 18:04:34 +08:00
										 |  |  |             crt_str = f.read() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Split all certs into a list of (name, certificate string) tuples | 
					
						
							|  |  |  |             pem_crts = re.findall(r'(^.+?)\n(=+\n[\s\S]+?END CERTIFICATE-----\n)', crt_str, re.MULTILINE) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             filtered_crts = '' | 
					
						
							|  |  |  |             for name, crt in pem_crts: | 
					
						
							|  |  |  |                 if name in filter_set: | 
					
						
							|  |  |  |                     filtered_crts += crt | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.add_from_pem(filtered_crts) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class InputError(RuntimeError): | 
					
						
							|  |  |  |     def __init__(self, e): | 
					
						
							|  |  |  |         super(InputError, self).__init__(e) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def main(): | 
					
						
							|  |  |  |     global quiet | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     parser = argparse.ArgumentParser(description='ESP-IDF x509 certificate bundle utility') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     parser.add_argument('--quiet', '-q', help="Don't print non-critical status messages to stderr", action='store_true') | 
					
						
							|  |  |  |     parser.add_argument('--input', '-i', nargs='+', required=True, | 
					
						
							|  |  |  |                         help='Paths to the custom certificate folders or files to parse, parses all .pem or .der files') | 
					
						
							|  |  |  |     parser.add_argument('--filter', '-f', help='Path to CSV-file where the second columns contains the name of the certificates \
 | 
					
						
							|  |  |  |                         that should be included from cacrt_all.pem') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     args = parser.parse_args() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     quiet = args.quiet | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     bundle = CertificateBundle() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     for path in args.input: | 
					
						
							|  |  |  |         if os.path.isfile(path): | 
					
						
							| 
									
										
										
										
											2021-01-26 10:49:01 +08:00
										 |  |  |             if os.path.basename(path) == 'cacrt_all.pem' and args.filter: | 
					
						
							| 
									
										
										
										
											2019-09-29 18:04:34 +08:00
										 |  |  |                 bundle.add_with_filter(path, args.filter) | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 bundle.add_from_file(path) | 
					
						
							|  |  |  |         elif os.path.isdir(path): | 
					
						
							|  |  |  |             bundle.add_from_path(path) | 
					
						
							|  |  |  |         else: | 
					
						
							| 
									
										
										
										
											2021-01-26 10:49:01 +08:00
										 |  |  |             raise InputError('Invalid --input=%s, is neither file nor folder' % args.input) | 
					
						
							| 
									
										
										
										
											2019-09-29 18:04:34 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     status('Successfully added %d certificates in total' % len(bundle.certificates)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     crt_bundle = bundle.create_bundle() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     with open(ca_bundle_bin_file, 'wb') as f: | 
					
						
							|  |  |  |         f.write(crt_bundle) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == '__main__': | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         main() | 
					
						
							|  |  |  |     except InputError as e: | 
					
						
							|  |  |  |         print(e) | 
					
						
							|  |  |  |         sys.exit(2) |