mirror of
				https://github.com/espressif/esp-idf.git
				synced 2025-11-04 00:51:42 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			228 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			228 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python
 | 
						|
#
 | 
						|
# ESP32 x509 certificate bundle generation utility
 | 
						|
#
 | 
						|
# Converts PEM and DER certificates to a custom bundle format which stores just the
 | 
						|
# subject name and public key to reduce space
 | 
						|
#
 | 
						|
# The bundle will have the format: number of certificates; crt 1 subject name length; crt 1 public key length;
 | 
						|
# crt 1 subject name; crt 1 public key; crt 2...
 | 
						|
#
 | 
						|
# Copyright 2018-2019 Espressif Systems (Shanghai) PTE LTD
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
 | 
						|
from __future__ import with_statement
 | 
						|
 | 
						|
import argparse
 | 
						|
import csv
 | 
						|
import os
 | 
						|
import re
 | 
						|
import struct
 | 
						|
import sys
 | 
						|
from io import open
 | 
						|
 | 
						|
try:
    from cryptography import x509
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
except ImportError:
    # Print setup guidance, then re-raise so the original traceback is kept.
    # The adjacent literals are concatenated, so each one must end with a space.
    print('The cryptography package is not installed. '
          'Please refer to the Get Started section of the ESP-IDF Programming Guide for '
          'setting up the required packages.')
    raise
 | 
						|
 | 
						|
# Name of the binary bundle file; main() opens it for writing with this relative path
ca_bundle_bin_file = 'x509_crt_bundle'

# Set from the --quiet command line flag in main(); when True, status() prints nothing
quiet = False
 | 
						|
 | 
						|
 | 
						|
def status(msg):
    """ Forward a non-critical message to stderr unless quiet mode is enabled """
    if quiet:
        return
    critical(msg)
 | 
						|
 | 
						|
 | 
						|
def critical(msg):
    """ Write msg to stderr, prefixed with the script name and followed by a newline """
    for chunk in ('gen_crt_bundle.py: ', msg, '\n'):
        sys.stderr.write(chunk)
 | 
						|
 | 
						|
 | 
						|
class CertificateBundle:
    """ Collects x509 certificates and serializes them into the compact bundle format.

    Only the DER encoded subject name and public key of every certificate end
    up in the bundle, see create_bundle() for the exact layout.
    """

    def __init__(self):
        # Parsed x509 certificate objects as returned by the cryptography loaders
        self.certificates = []
        self.compressed_crts = []

        # Delete a bundle file left over in the working directory
        if os.path.isfile(ca_bundle_bin_file):
            os.remove(ca_bundle_bin_file)

    def add_from_path(self, crts_path):
        """ Add the certificates of every .pem and .der file directly inside crts_path.

        Raises InputError if not a single file in the folder was accepted.
        """
        found = False
        # Every entry must be visited, so no short-circuiting here
        for file_path in os.listdir(crts_path):
            found |= self.add_from_file(os.path.join(crts_path, file_path))

        if not found:
            raise InputError('No valid x509 certificates found in %s' % crts_path)

    def add_from_file(self, file_path):
        """ Add the certificate(s) stored in a single .pem or .der file.

        Returns True if the file was parsed and False if its name ends in
        neither .pem nor .der (such files are skipped).
        Raises InputError if parsing fails with a ValueError, or (from
        add_from_pem) if a .pem file contains no certificate at all.
        """
        try:
            if file_path.endswith('.pem'):
                status('Parsing certificates from %s' % file_path)
                with open(file_path, 'r', encoding='utf-8') as f:
                    crt_str = f.read()
                self.add_from_pem(crt_str)
                return True

            elif file_path.endswith('.der'):
                status('Parsing certificates from %s' % file_path)
                with open(file_path, 'rb') as f:
                    crt_str = f.read()
                self.add_from_der(crt_str)
                return True

        except ValueError:
            critical('Invalid certificate in %s' % file_path)
            raise InputError('Invalid certificate')

        return False

    def add_from_pem(self, crt_str):
        """ Add all certificates found in the PEM text crt_str.

        A single PEM file may have multiple certificates. Text outside of the
        BEGIN/END CERTIFICATE markers is ignored.
        Raises InputError if no certificate is found.
        """
        begin_marker = '-----BEGIN CERTIFICATE-----'
        end_marker = '-----END CERTIFICATE-----'

        crt_lines = []
        count = 0
        started = False

        for line in crt_str.splitlines():
            # Compare without surrounding whitespace so that trailing blanks or a
            # missing newline after the last END marker do not hide a certificate
            marker = line.strip()
            if marker == begin_marker and not started:
                crt_lines = [begin_marker]
                started = True
            elif marker == end_marker and started:
                crt_lines.append(end_marker)
                started = False
                pem = '\n'.join(crt_lines) + '\n'
                self.certificates.append(x509.load_pem_x509_certificate(pem.encode(), default_backend()))
                count += 1
            elif started:
                crt_lines.append(line)

        if count == 0:
            raise InputError('No certificate found')

        status('Successfully added %d certificates' % count)

    def add_from_der(self, crt_str):
        """ Add the single DER encoded certificate given as bytes in crt_str """
        self.certificates.append(x509.load_der_x509_certificate(crt_str, default_backend()))
        status('Successfully added 1 certificate')

    def create_bundle(self):
        """ Serialize the collected certificates and return the bundle as bytes.

        Layout (all integers are big-endian unsigned 16 bit): number of
        certificates, then per certificate: subject name length, public key
        length, subject name (DER), public key (DER SubjectPublicKeyInfo).
        self.certificates is left sorted by subject name as a side effect.
        """
        # Sort certificates in order to do binary search when looking up certificates
        self.certificates = sorted(self.certificates, key=lambda cert: cert.subject.public_bytes(default_backend()))

        # Collect the pieces and join once instead of growing a bytes object
        chunks = [struct.pack('>H', len(self.certificates))]

        for crt in self.certificates:
            # Read the public key as DER format
            pub_key = crt.public_key()
            pub_key_der = pub_key.public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo)

            # Read the subject name as DER format
            sub_name_der = crt.subject.public_bytes(default_backend())

            chunks.append(struct.pack('>HH', len(sub_name_der), len(pub_key_der)))
            chunks.append(sub_name_der)
            chunks.append(pub_key_der)

        return b''.join(chunks)

    def add_with_filter(self, crts_path, filter_path):
        """ Add only the certificates from crts_path that are named in the filter CSV.

        The second column of every CSV row after the header is taken as a
        certificate name. In crts_path each certificate is expected to be
        preceded by a line with its name and a line of '=' characters.
        Raises InputError (from add_from_pem) if no certificate is selected.
        """
        filter_set = set()
        with open(filter_path, 'r', encoding='utf-8') as f:
            csv_reader = csv.reader(f, delimiter=',')

            # Skip header; the default keeps an empty file from raising StopIteration
            next(csv_reader, None)
            for row in csv_reader:
                # Blank or short rows carry no name column
                if len(row) > 1:
                    filter_set.add(row[1])

        status('Parsing certificates from %s' % crts_path)
        with open(crts_path, 'r', encoding='utf-8') as f:
            crt_str = f.read()

        # Split all certs into a list of (name, certificate string) tuples
        pem_crts = re.findall(r'(^.+?)\n(=+\n[\s\S]+?END CERTIFICATE-----\n)', crt_str, re.MULTILINE)

        filtered_crts = ''.join(crt for name, crt in pem_crts if name in filter_set)

        self.add_from_pem(filtered_crts)
 | 
						|
 | 
						|
 | 
						|
class InputError(RuntimeError):
    """ Error raised by this script for unusable input; e is the message """

    def __init__(self, e):
        RuntimeError.__init__(self, e)
 | 
						|
 | 
						|
 | 
						|
def main():
    """ Parse the command line, collect the requested certificates and write the bundle.

    Every --input entry is either a folder (all .pem/.der files inside are
    parsed) or a file. A file named cacrt_all.pem is filtered through the
    --filter CSV when that option is given. The result is written to the
    file named by ca_bundle_bin_file.
    Raises InputError if an --input entry is neither a file nor a folder, or
    if one of the certificate loaders rejects its input.
    """
    global quiet

    parser = argparse.ArgumentParser(description='ESP-IDF x509 certificate bundle utility')

    parser.add_argument('--quiet', '-q', help="Don't print non-critical status messages to stderr", action='store_true')
    parser.add_argument('--input', '-i', nargs='+', required=True,
                        help='Paths to the custom certificate folders or files to parse, parses all .pem or .der files')
    parser.add_argument('--filter', '-f', help='Path to CSV-file where the second columns contains the name of the certificates \
                        that should be included from cacrt_all.pem')

    args = parser.parse_args()

    quiet = args.quiet

    bundle = CertificateBundle()

    for path in args.input:
        if os.path.isfile(path):
            if os.path.basename(path) == 'cacrt_all.pem' and args.filter:
                bundle.add_with_filter(path, args.filter)
            else:
                bundle.add_from_file(path)
        elif os.path.isdir(path):
            bundle.add_from_path(path)
        else:
            # Name the offending entry rather than the whole --input list
            raise InputError('Invalid --input=%s, is neither file nor folder' % path)

    status('Successfully added %d certificates in total' % len(bundle.certificates))

    crt_bundle = bundle.create_bundle()

    with open(ca_bundle_bin_file, 'wb') as f:
        f.write(crt_bundle)
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
    # Input problems are reported as a plain message and exit status 2
    try:
        main()
    except InputError as err:
        print(err)
        raise SystemExit(2)
 |