Fixed security issue when extracting items from TAR archive // Resolve #2995

This commit is contained in:
Ivan Kravets
2019-10-24 14:55:45 +03:00
parent 334d50c367
commit 798b12ce7b
2 changed files with 42 additions and 11 deletions

View File

@ -26,6 +26,7 @@ PlatformIO Core 4.0
* Fixed an issue when configuration file options partly ignored when using custom ``--project-conf`` (`issue #3034 <https://github.com/platformio/platformio-core/issues/3034>`_)
* Fixed an issue when installing a package using custom Git tag and submodules were not updated correctly (`issue #3060 <https://github.com/platformio/platformio-core/issues/3060>`_)
* Fixed an issue with linking process when ``$LDSCRIPT`` contains a space in path
* Fixed security issue when extracting items from TAR archive (`issue #2995 <https://github.com/platformio/platformio-core/issues/2995>`_)
4.0.3 (2019-08-30)
~~~~~~~~~~~~~~~~~~

View File

@ -12,8 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from os import chmod
from os.path import exists, join
import os
from tarfile import open as tarfile_open
from time import mktime
from zipfile import ZipFile
@ -33,6 +32,9 @@ class ArchiveBase(object):
def get_item_filename(self, item):
raise NotImplementedError()
def is_link(self, item):
raise NotImplementedError()
def extract_item(self, item, dest_dir):
self._afo.extract(item, dest_dir)
self.after_extract(item, dest_dir)
@ -55,9 +57,36 @@ class TARArchive(ArchiveBase):
return item.name
@staticmethod
def islink(item):
def is_link(item):
return item.islnk() or item.issym()
@staticmethod
def resolve_path(path):
return os.path.realpath(os.path.abspath(path))
def is_bad_path(self, path, base):
return not self.resolve_path(os.path.join(base, path)).startswith(base)
def is_bad_link(self, tarinfo, base):
return self.is_bad_path(
tarinfo.linkname,
base=self.resolve_path(os.path.join(base, os.path.dirname(tarinfo.name))),
)
def extract_item(self, item, dest_dir):
bad_conds = [
self.is_link(item) and self.is_bad_link(item, dest_dir),
not self.is_link(item) and self.is_bad_path(item.name, dest_dir),
]
if not any(bad_conds):
super(TARArchive, self).extract_item(item, dest_dir)
else:
click.secho(
"Blocked insecure item `%s` from archive" % item.name,
fg="red",
err=True,
)
class ZIPArchive(ArchiveBase):
def __init__(self, archpath):
@ -67,24 +96,25 @@ class ZIPArchive(ArchiveBase):
def preserve_permissions(item, dest_dir):
attrs = item.external_attr >> 16
if attrs:
chmod(join(dest_dir, item.filename), attrs)
os.chmod(os.path.join(dest_dir, item.filename), attrs)
@staticmethod
def preserve_mtime(item, dest_dir):
util.change_filemtime(
join(dest_dir, item.filename),
os.path.join(dest_dir, item.filename),
mktime(tuple(item.date_time) + tuple([0, 0, 0])),
)
@staticmethod
def is_link(_):
return False
def get_items(self):
return self._afo.infolist()
def get_item_filename(self, item):
return item.filename
def islink(self, item):
raise NotImplementedError()
def after_extract(self, item, dest_dir):
self.preserve_permissions(item, dest_dir)
self.preserve_mtime(item, dest_dir)
@ -96,7 +126,7 @@ class FileUnpacker(object):
self._unpacker = None
def __enter__(self):
if self.archpath.lower().endswith((".gz", ".bz2")):
if self.archpath.lower().endswith((".gz", ".bz2", ".tar")):
self._unpacker = TARArchive(self.archpath)
elif self.archpath.lower().endswith(".zip"):
self._unpacker = ZIPArchive(self.archpath)
@ -126,9 +156,9 @@ class FileUnpacker(object):
# check on disk
for item in self._unpacker.get_items():
filename = self._unpacker.get_item_filename(item)
item_path = join(dest_dir, filename)
item_path = os.path.join(dest_dir, filename)
try:
if not self._unpacker.islink(item) and not exists(item_path):
if not self._unpacker.is_link(item) and not os.path.exists(item_path):
raise exception.ExtractArchiveItemError(filename, dest_dir)
except NotImplementedError:
pass