# Copyright (C) Ivan Kravets # See LICENSE for details. from email.utils import parsedate_tz from math import ceil from os.path import getsize, join from subprocess import check_output from time import mktime from click import progressbar from requests import get from platformio.exception import (FDSHASumMismatch, FDSizeMismatch, FDUnrecognizedStatusCode) from platformio.util import change_filemtime class FileDownloader(object): CHUNK_SIZE = 1024 def __init__(self, url, dest_dir=None): self._url = url self._fname = url.split("/")[-1] self._destination = self._fname if dest_dir: self.set_destination(join(dest_dir, self._fname)) self._progressbar = None self._request = get(url, stream=True) if self._request.status_code != 200: raise FDUnrecognizedStatusCode(self._request.status_code, url) def set_destination(self, destination): self._destination = destination def get_filepath(self): return self._destination def get_lmtime(self): return self._request.headers['last-modified'] def get_size(self): return int(self._request.headers['content-length']) def start(self): itercontent = self._request.iter_content(chunk_size=self.CHUNK_SIZE) f = open(self._destination, "wb") chunks = int(ceil(self.get_size() / float(self.CHUNK_SIZE))) with progressbar(length=chunks, label="Downloading") as pb: for _ in pb: f.write(next(itercontent)) f.close() self._request.close() self._preserve_filemtime(self.get_lmtime()) def verify(self, sha1=None): _dlsize = getsize(self._destination) if _dlsize != self.get_size(): raise FDSizeMismatch(_dlsize, self._fname, self.get_size()) if not sha1: return dlsha1 = None try: dlsha1 = check_output(["sha1sum", self._destination]) except OSError: try: dlsha1 = check_output(["shasum", "-a", "1", self._destination]) except OSError: pass if dlsha1: dlsha1 = dlsha1[1:41] if dlsha1.startswith("\\") else dlsha1[:40] if sha1 != dlsha1: raise FDSHASumMismatch(dlsha1, self._fname, sha1) def _preserve_filemtime(self, lmdate): timedata = parsedate_tz(lmdate) lmtime = mktime(timedata[:9]) change_filemtime(self._destination, lmtime) def __del__(self): self._request.close()