# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import print_function
+
import copy
import errno
import getopt
import zipfile
import blockimgdiff
-from rangelib import *
+
+from hashlib import sha1 as sha1
try:
- from hashlib import sha1 as sha1
-except ImportError:
- from sha import sha as sha1
+ raw_input
+except NameError:
+ raw_input = input
-# missing in Python 2.4 and before
-if not hasattr(os, "SEEK_SET"):
- os.SEEK_SET = 0
-class Options(object): pass
-OPTIONS = Options()
+def iteritems(obj):
+ if hasattr(obj, 'iteritems'):
+ return obj.iteritems()
+ return obj.items()
-DEFAULT_SEARCH_PATH_BY_PLATFORM = {
- "linux2": "out/host/linux-x86",
- "darwin": "out/host/darwin-x86",
+
+class Options(object):
+ def __init__(self):
+ platform_search_path = {
+ "linux2": "out/host/linux-x86",
+ "darwin": "out/host/darwin-x86",
}
-OPTIONS.search_path = DEFAULT_SEARCH_PATH_BY_PLATFORM.get(sys.platform, None)
-OPTIONS.signapk_path = "framework/signapk.jar" # Relative to search_path
-OPTIONS.extra_signapk_args = []
-OPTIONS.java_path = "java" # Use the one on the path by default.
-OPTIONS.java_args = "-Xmx2048m" # JVM Args
-OPTIONS.public_key_suffix = ".x509.pem"
-OPTIONS.private_key_suffix = ".pk8"
-OPTIONS.verbose = False
-OPTIONS.tempfiles = []
-OPTIONS.device_specific = None
-OPTIONS.extras = {}
-OPTIONS.info_dict = None
+ self.search_path = platform_search_path.get(sys.platform, None)
+ self.signapk_path = "framework/signapk.jar" # Relative to search_path
+ self.signapk_shared_library_path = "lib64" # Relative to search_path
+ self.extra_signapk_args = []
+ self.java_path = "java" # Use the one on the path by default.
+ self.java_args = "-Xmx2048m" # JVM Args
+ self.public_key_suffix = ".x509.pem"
+ self.private_key_suffix = ".pk8"
+ # use otatools built boot_signer by default
+ self.boot_signer_path = "boot_signer"
+ self.boot_signer_args = []
+ self.verity_signer_path = None
+ self.verity_signer_args = []
+ self.verbose = False
+ self.tempfiles = []
+ self.device_specific = None
+ self.extras = {}
+ self.info_dict = None
+ self.source_info_dict = None
+ self.target_info_dict = None
+ self.worker_threads = None
+ # Stash size cannot exceed cache_size * threshold.
+ self.cache_size = None
+ self.stash_threshold = 0.8
+
+
+OPTIONS = Options()
# Values for "certificate" in apkcerts that mean special things.
SPECIAL_CERT_STRINGS = ("PRESIGNED", "EXTERNAL")
-
-class ExternalError(RuntimeError): pass
+class ErrorCode(object):
+ """Define error_codes for failures that happen during the actual
+ update package installation.
+
+ Error codes 0-999 are reserved for failures before the package
+ installation (i.e. low battery, package verification failure).
+ Detailed code in 'bootable/recovery/error_code.h' """
+
+ SYSTEM_VERIFICATION_FAILURE = 1000
+ SYSTEM_UPDATE_FAILURE = 1001
+ SYSTEM_UNEXPECTED_CONTENTS = 1002
+ SYSTEM_NONZERO_CONTENTS = 1003
+ SYSTEM_RECOVER_FAILURE = 1004
+ VENDOR_VERIFICATION_FAILURE = 2000
+ VENDOR_UPDATE_FAILURE = 2001
+ VENDOR_UNEXPECTED_CONTENTS = 2002
+ VENDOR_NONZERO_CONTENTS = 2003
+ VENDOR_RECOVER_FAILURE = 2004
+ OEM_PROP_MISMATCH = 3000
+ FINGERPRINT_MISMATCH = 3001
+ THUMBPRINT_MISMATCH = 3002
+ OLDER_BUILD = 3003
+ DEVICE_MISMATCH = 3004
+ BAD_PATCH_FILE = 3005
+ INSUFFICIENT_CACHE_SPACE = 3006
+ TUNE_PARTITION_FAILURE = 3007
+ APPLY_PATCH_FAILURE = 3008
+
+class ExternalError(RuntimeError):
+ pass
def Run(args, **kwargs):
"""Create and return a subprocess.Popen object, printing the command
line on the terminal if -v was specified."""
if OPTIONS.verbose:
- print " running: ", " ".join(args)
+ print(" running: ", " ".join(args))
return subprocess.Popen(args, **kwargs)
pass
-def LoadInfoDict(input):
+def LoadInfoDict(input_file, input_dir=None):
"""Read and parse the META/misc_info.txt key/value pairs from the
input target files and return a dict."""
def read_helper(fn):
- if isinstance(input, zipfile.ZipFile):
- return input.read(fn)
+ if isinstance(input_file, zipfile.ZipFile):
+ return input_file.read(fn)
else:
- path = os.path.join(input, *fn.split("/"))
+ path = os.path.join(input_file, *fn.split("/"))
try:
with open(path) as f:
return f.read()
- except IOError, e:
+ except IOError as e:
if e.errno == errno.ENOENT:
raise KeyError(fn)
d = {}
if "mkyaffs2_extra_flags" not in d:
try:
- d["mkyaffs2_extra_flags"] = read_helper("META/mkyaffs2-extra-flags.txt").strip()
+ d["mkyaffs2_extra_flags"] = read_helper(
+ "META/mkyaffs2-extra-flags.txt").strip()
except KeyError:
# ok if flags don't exist
pass
if "recovery_api_version" not in d:
try:
- d["recovery_api_version"] = read_helper("META/recovery-api-version.txt").strip()
+ d["recovery_api_version"] = read_helper(
+ "META/recovery-api-version.txt").strip()
except KeyError:
raise ValueError("can't find recovery API version in input target-files")
if "fstab_version" not in d:
d["fstab_version"] = "1"
+ # A few properties are stored as links to the files in the out/ directory.
+ # It works fine with the build system. However, they are no longer available
+ # when (re)generating from target_files zip. If input_dir is not None, we
+ # are doing repacking. Redirect those properties to the actual files in the
+ # unzipped directory.
+ if input_dir is not None:
+ # We carry a copy of file_contexts.bin under META/. If not available,
+ # search BOOT/RAMDISK/. Note that sometimes we may need a different file
+ # to build images than the one running on device, such as when enabling
+ # system_root_image. In that case, we must have the one for image
+ # generation copied to META/.
+ fc_basename = os.path.basename(d.get("selinux_fc", "file_contexts"))
+ fc_config = os.path.join(input_dir, "META", fc_basename)
+ if d.get("system_root_image") == "true":
+ assert os.path.exists(fc_config)
+ if not os.path.exists(fc_config):
+ fc_config = os.path.join(input_dir, "BOOT", "RAMDISK", fc_basename)
+ if not os.path.exists(fc_config):
+ fc_config = None
+
+ if fc_config:
+ d["selinux_fc"] = fc_config
+
+ # Similarly we need to redirect "ramdisk_dir" and "ramdisk_fs_config".
+ if d.get("system_root_image") == "true":
+ d["ramdisk_dir"] = os.path.join(input_dir, "ROOT")
+ d["ramdisk_fs_config"] = os.path.join(
+ input_dir, "META", "root_filesystem_config.txt")
+
+ # Redirect {system,vendor}_base_fs_file.
+ if "system_base_fs_file" in d:
+ basename = os.path.basename(d["system_base_fs_file"])
+ system_base_fs_file = os.path.join(input_dir, "META", basename)
+ if os.path.exists(system_base_fs_file):
+ d["system_base_fs_file"] = system_base_fs_file
+ else:
+ print("Warning: failed to find system base fs file: %s" % (
+ system_base_fs_file,))
+ del d["system_base_fs_file"]
+
+ if "vendor_base_fs_file" in d:
+ basename = os.path.basename(d["vendor_base_fs_file"])
+ vendor_base_fs_file = os.path.join(input_dir, "META", basename)
+ if os.path.exists(vendor_base_fs_file):
+ d["vendor_base_fs_file"] = vendor_base_fs_file
+ else:
+ print("Warning: failed to find vendor base fs file: %s" % (
+ vendor_base_fs_file,))
+ del d["vendor_base_fs_file"]
+
+
+ if "device_type" not in d:
+ d["device_type"] = "MMC"
try:
data = read_helper("META/imagesizes.txt")
for line in data.split("\n"):
- if not line: continue
+ if not line:
+ continue
name, value = line.split(" ", 1)
- if not value: continue
+ if not value:
+ continue
if name == "blocksize":
d[name] = value
else:
makeint("boot_size")
makeint("fstab_version")
- d["fstab"] = LoadRecoveryFSTab(read_helper, d["fstab_version"])
+ if d.get("no_recovery", False) == "true":
+ d["fstab"] = None
+ else:
+ d["fstab"] = LoadRecoveryFSTab(read_helper, d["fstab_version"],
+ d["device_type"], d.get("system_root_image", False))
d["build.prop"] = LoadBuildProp(read_helper)
return d
try:
data = read_helper("SYSTEM/build.prop")
except KeyError:
- print "Warning: could not find SYSTEM/build.prop in %s" % zip
+ print("Warning: could not find SYSTEM/build.prop in %s" % zip)
data = ""
return LoadDictionaryFromLines(data.split("\n"))
d = {}
for line in lines:
line = line.strip()
- if not line or line.startswith("#"): continue
+ if not line or line.startswith("#"):
+ continue
if "=" in line:
name, value = line.split("=", 1)
d[name] = value
return d
-def LoadRecoveryFSTab(read_helper, fstab_version):
+def LoadRecoveryFSTab(read_helper, fstab_version, type, system_root_image=False):
class Partition(object):
- pass
+ def __init__(self, mount_point, fs_type, device, length, device2, context):
+ self.mount_point = mount_point
+ self.fs_type = fs_type
+ self.device = device
+ self.length = length
+ self.device2 = device2
+ self.context = context
try:
data = read_helper("RECOVERY/RAMDISK/etc/recovery.fstab")
except KeyError:
- print "Warning: could not find RECOVERY/RAMDISK/etc/recovery.fstab"
+ print("Warning: could not find RECOVERY/RAMDISK/etc/recovery.fstab")
data = ""
if fstab_version == 1:
d = {}
for line in data.split("\n"):
line = line.strip()
- if not line or line.startswith("#"): continue
+ if not line or line.startswith("#"):
+ continue
pieces = line.split()
- if not (3 <= len(pieces) <= 4):
+ if not 3 <= len(pieces) <= 4:
raise ValueError("malformed recovery.fstab line: \"%s\"" % (line,))
-
- p = Partition()
- p.mount_point = pieces[0]
- p.fs_type = pieces[1]
- p.device = pieces[2]
- p.length = 0
options = None
if len(pieces) >= 4:
if pieces[3].startswith("/"):
- p.device2 = pieces[3]
+ device2 = pieces[3]
if len(pieces) >= 5:
options = pieces[4]
else:
- p.device2 = None
+ device2 = None
options = pieces[3]
else:
- p.device2 = None
+ device2 = None
+ mount_point = pieces[0]
+ length = 0
if options:
options = options.split(",")
for i in options:
if i.startswith("length="):
- p.length = int(i[7:])
+ length = int(i[7:])
else:
- print "%s: unknown option \"%s\"" % (p.mount_point, i)
+ print("%s: unknown option \"%s\"" % (mount_point, i))
- d[p.mount_point] = p
+ if not d.get(mount_point):
+ d[mount_point] = Partition(mount_point=mount_point, fs_type=pieces[1],
+ device=pieces[2], length=length,
+ device2=device2)
elif fstab_version == 2:
d = {}
for line in data.split("\n"):
line = line.strip()
- if not line or line.startswith("#"): continue
+ if not line or line.startswith("#"):
+ continue
+ # <src> <mnt_point> <type> <mnt_flags and options> <fs_mgr_flags>
pieces = line.split()
if len(pieces) != 5:
raise ValueError("malformed recovery.fstab line: \"%s\"" % (line,))
# Ignore entries that are managed by vold
options = pieces[4]
- if "voldmanaged=" in options: continue
+ if "voldmanaged=" in options:
+ continue
# It's a good line, parse it
- p = Partition()
- p.device = pieces[0]
- p.mount_point = pieces[1]
- p.fs_type = pieces[2]
- p.device2 = None
- p.length = 0
-
+ length = 0
options = options.split(",")
for i in options:
if i.startswith("length="):
- p.length = int(i[7:])
+ length = int(i[7:])
else:
# Ignore all unknown options in the unified fstab
continue
- d[p.mount_point] = p
+ mount_flags = pieces[3]
+ # Honor the SELinux context if present.
+ context = None
+ for i in mount_flags.split(","):
+ if i.startswith("context="):
+ context = i
+
+ mount_point = pieces[1]
+ if not d.get(mount_point):
+ d[mount_point] = Partition(mount_point=mount_point, fs_type=pieces[2],
+ device=pieces[0], length=length,
+ device2=None, context=context)
else:
raise ValueError("Unknown fstab_version: \"%d\"" % (fstab_version,))
+ # / is used for the system mount point when the root directory is included in
+ # system. Other areas assume system is always at "/system" so point /system
+ # at /.
+ if system_root_image:
+ assert not d.has_key("/system") and d.has_key("/")
+ d["/system"] = d["/"]
return d
def DumpInfoDict(d):
for k, v in sorted(d.items()):
- print "%-25s = (%s) %s" % (k, type(v).__name__, v)
-
-def BuildBootableImage(sourcedir, fs_config_file, info_dict=None):
- """Take a kernel, cmdline, and ramdisk directory from the input (in
- 'sourcedir'), and turn them into a boot image. Return the image
- data, or None if sourcedir does not appear to contains files for
- building the requested image."""
+ print("%-25s = (%s) %s" % (k, type(v).__name__, v))
- if (not os.access(os.path.join(sourcedir, "RAMDISK"), os.F_OK) or
- not os.access(os.path.join(sourcedir, "kernel"), os.F_OK)):
- return None
- if info_dict is None:
- info_dict = OPTIONS.info_dict
+def _BuildBootableImage(sourcedir, fs_config_file, info_dict=None,
+ has_ramdisk=False):
+ """Build a bootable image from the specified sourcedir.
- ramdisk_img = tempfile.NamedTemporaryFile()
- img = tempfile.NamedTemporaryFile()
+ Take a kernel, cmdline, and optionally a ramdisk directory from the input (in
+ 'sourcedir'), and turn them into a boot image. Return the image data, or
+ None if sourcedir does not appear to contains files for building the
+ requested image."""
- if os.access(fs_config_file, os.F_OK):
- cmd = ["mkbootfs", "-f", fs_config_file, os.path.join(sourcedir, "RAMDISK")]
- else:
- cmd = ["mkbootfs", os.path.join(sourcedir, "RAMDISK")]
- p1 = Run(cmd, stdout=subprocess.PIPE)
- p2 = Run(["minigzip"],
- stdin=p1.stdout, stdout=ramdisk_img.file.fileno())
+ def make_ramdisk():
+ ramdisk_img = tempfile.NamedTemporaryFile()
- p2.wait()
- p1.wait()
- assert p1.returncode == 0, "mkbootfs of %s ramdisk failed" % (targetname,)
- assert p2.returncode == 0, "minigzip of %s ramdisk failed" % (targetname,)
+ if os.access(fs_config_file, os.F_OK):
+ cmd = ["mkbootfs", "-f", fs_config_file,
+ os.path.join(sourcedir, "RAMDISK")]
+ else:
+ cmd = ["mkbootfs", os.path.join(sourcedir, "RAMDISK")]
+ p1 = Run(cmd, stdout=subprocess.PIPE)
+ p2 = Run(["minigzip"], stdin=p1.stdout, stdout=ramdisk_img.file.fileno())
- # use MKBOOTIMG from environ, or "mkbootimg" if empty or not set
- mkbootimg = os.getenv('MKBOOTIMG') or "mkbootimg"
+ p2.wait()
+ p1.wait()
+ assert p1.returncode == 0, "mkbootfs of %s ramdisk failed" % (sourcedir,)
+ assert p2.returncode == 0, "minigzip of %s ramdisk failed" % (sourcedir,)
- cmd = [mkbootimg, "--kernel", os.path.join(sourcedir, "kernel")]
+ return ramdisk_img
- fn = os.path.join(sourcedir, "second")
- if os.access(fn, os.F_OK):
- cmd.append("--second")
- cmd.append(fn)
+ if not os.access(os.path.join(sourcedir, "kernel"), os.F_OK):
+ return None
- fn = os.path.join(sourcedir, "cmdline")
- if os.access(fn, os.F_OK):
- cmd.append("--cmdline")
- cmd.append(open(fn).read().rstrip("\n"))
+ if has_ramdisk and not os.access(os.path.join(sourcedir, "RAMDISK"), os.F_OK):
+ return None
- fn = os.path.join(sourcedir, "base")
- if os.access(fn, os.F_OK):
- cmd.append("--base")
- cmd.append(open(fn).read().rstrip("\n"))
+ if info_dict is None:
+ info_dict = OPTIONS.info_dict
- fn = os.path.join(sourcedir, "pagesize")
- if os.access(fn, os.F_OK):
- cmd.append("--pagesize")
- cmd.append(open(fn).read().rstrip("\n"))
+ img = tempfile.NamedTemporaryFile()
+ bootimg_key = os.getenv("PRODUCT_PRIVATE_KEY", None)
+ verity_key = os.getenv("PRODUCT_VERITY_KEY", None)
+ custom_boot_signer = os.getenv("PRODUCT_BOOT_SIGNER", None)
- args = info_dict.get("mkbootimg_args", None)
- if args and args.strip():
- cmd.extend(shlex.split(args))
+ if has_ramdisk:
+ ramdisk_img = make_ramdisk()
- cmd.extend(["--ramdisk", ramdisk_img.name,
- "--output", img.name])
+ """check if uboot is requested"""
+ fn = os.path.join(sourcedir, "ubootargs")
+ if os.access(fn, os.F_OK):
+ cmd = ["mkimage"]
+ for argument in open(fn).read().rstrip("\n").split(" "):
+ cmd.append(argument)
+ cmd.append("-d")
+ cmd.append(os.path.join(sourcedir, "kernel") + ":" + ramdisk_img.name)
+ cmd.append(img.name)
+ else:
+ # use MKBOOTIMG from environ, or "mkbootimg" if empty or not set
+ mkbootimg = os.getenv('MKBOOTIMG') or "mkbootimg"
+
+ cmd = [mkbootimg, "--kernel", os.path.join(sourcedir, "kernel")]
+
+ fn = os.path.join(sourcedir, "second")
+ if os.access(fn, os.F_OK):
+ cmd.append("--second")
+ cmd.append(fn)
+
+ fn = os.path.join(sourcedir, "cmdline")
+ if os.access(fn, os.F_OK):
+ cmd.append("--cmdline")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "base")
+ if os.access(fn, os.F_OK):
+ cmd.append("--base")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "tagsaddr")
+ if os.access(fn, os.F_OK):
+ cmd.append("--tags-addr")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "tags_offset")
+ if os.access(fn, os.F_OK):
+ cmd.append("--tags_offset")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "ramdisk_offset")
+ if os.access(fn, os.F_OK):
+ cmd.append("--ramdisk_offset")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "dt")
+ if os.access(fn, os.F_OK):
+ cmd.append("--dt")
+ cmd.append(fn)
+
+ fn = os.path.join(sourcedir, "pagesize")
+ if os.access(fn, os.F_OK):
+ kernel_pagesize = open(fn).read().rstrip("\n")
+ cmd.append("--pagesize")
+ cmd.append(kernel_pagesize)
+
+ args = info_dict.get("mkbootimg_args", None)
+ if args and args.strip():
+ cmd.extend(shlex.split(args))
+
+ args = info_dict.get("mkbootimg_version_args", None)
+ if args and args.strip():
+ cmd.extend(shlex.split(args))
+
+ if has_ramdisk:
+ cmd.extend(["--ramdisk", ramdisk_img.name])
+
+ img_unsigned = None
+ if info_dict.get("vboot", None):
+ img_unsigned = tempfile.NamedTemporaryFile()
+ cmd.extend(["--output", img_unsigned.name])
+ else:
+ cmd.extend(["--output", img.name])
p = Run(cmd, stdout=subprocess.PIPE)
p.communicate()
assert p.returncode == 0, "mkbootimg of %s image failed" % (
os.path.basename(sourcedir),)
- if info_dict.get("verity_key", None):
+ if custom_boot_signer and bootimg_key and os.path.exists(bootimg_key):
+ print("Signing bootable image with custom boot signer...")
+ img_secure = tempfile.NamedTemporaryFile()
+ p = Run([custom_boot_signer, img.name, img_secure.name], stdout=subprocess.PIPE)
+ p.communicate()
+ assert p.returncode == 0, "signing of bootable image failed"
+ shutil.copyfile(img_secure.name, img.name)
+ img_secure.close()
+ elif bootimg_key and os.path.exists(bootimg_key) and kernel_pagesize > 0:
+ print("Signing bootable image...")
+ bootimg_key_passwords = {}
+ bootimg_key_passwords.update(PasswordManager().GetPasswords(bootimg_key.split()))
+ bootimg_key_password = bootimg_key_passwords[bootimg_key]
+ if bootimg_key_password is not None:
+ bootimg_key_password += "\n"
+ img_sha256 = tempfile.NamedTemporaryFile()
+ img_sig = tempfile.NamedTemporaryFile()
+ img_sig_padded = tempfile.NamedTemporaryFile()
+ img_secure = tempfile.NamedTemporaryFile()
+ p = Run(["openssl", "dgst", "-sha256", "-binary", "-out", img_sha256.name, img.name],
+ stdout=subprocess.PIPE)
+ p.communicate()
+ assert p.returncode == 0, "signing of bootable image failed"
+ p = Run(["openssl", "rsautl", "-sign", "-in", img_sha256.name, "-inkey", bootimg_key, "-out",
+ img_sig.name, "-passin", "stdin"], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ p.communicate(bootimg_key_password)
+ assert p.returncode == 0, "signing of bootable image failed"
+ p = Run(["dd", "if=/dev/zero", "of=%s" % img_sig_padded.name, "bs=%s" % kernel_pagesize,
+ "count=1"], stdout=subprocess.PIPE)
+ p.communicate()
+ assert p.returncode == 0, "signing of bootable image failed"
+ p = Run(["dd", "if=%s" % img_sig.name, "of=%s" % img_sig_padded.name, "conv=notrunc"],
+ stdout=subprocess.PIPE)
+ p.communicate()
+ assert p.returncode == 0, "signing of bootable image failed"
+ p = Run(["cat", img.name, img_sig_padded.name], stdout=img_secure.file.fileno())
+ p.communicate()
+ assert p.returncode == 0, "signing of bootable image failed"
+ shutil.copyfile(img_secure.name, img.name)
+ img_sha256.close()
+ img_sig.close()
+ img_sig_padded.close()
+ img_secure.close()
+
+ if (info_dict.get("boot_signer", None) == "true" and
+ info_dict.get("verity_key", None)):
+ path = "/" + os.path.basename(sourcedir).lower()
+ cmd = [OPTIONS.boot_signer_path]
+ cmd.extend(OPTIONS.boot_signer_args)
+ cmd.extend([path, img.name,
+ info_dict["verity_key"] + ".pk8",
+ info_dict["verity_key"] + ".x509.pem", img.name])
+ verity_key_password = None
+
+ if verity_key and os.path.exists(verity_key+".pk8") and kernel_pagesize > 0:
+ verity_key_passwords = {}
+ verity_key_passwords.update(PasswordManager().GetPasswords(verity_key.split()))
+ verity_key_password = verity_key_passwords[verity_key]
+
+ if verity_key_password is not None:
+ verity_key_password += "\n"
+ p = Run(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ p.communicate(verity_key_password)
+ else:
+ p = Run(cmd)
+ p.communicate()
+
+ assert p.returncode == 0, "boot_signer of %s image failed" % path
+
+ # Sign the image if vboot is non-empty.
+ elif info_dict.get("vboot", None):
path = "/" + os.path.basename(sourcedir).lower()
- cmd = ["boot_signer", path, img.name, info_dict["verity_key"], img.name]
+ img_keyblock = tempfile.NamedTemporaryFile()
+ cmd = [info_dict["vboot_signer_cmd"], info_dict["futility"],
+ img_unsigned.name, info_dict["vboot_key"] + ".vbpubk",
+ info_dict["vboot_key"] + ".vbprivk",
+ info_dict["vboot_subkey"] + ".vbprivk",
+ img_keyblock.name,
+ img.name]
p = Run(cmd, stdout=subprocess.PIPE)
p.communicate()
- assert p.returncode == 0, "boot_signer of %s image failed" % path
+ assert p.returncode == 0, "vboot_signer of %s image failed" % path
+
+ # Clean up the temp files.
+ img_unsigned.close()
+ img_keyblock.close()
img.seek(os.SEEK_SET, 0)
data = img.read()
- ramdisk_img.close()
+ if has_ramdisk:
+ ramdisk_img.close()
img.close()
return data
def GetBootableImage(name, prebuilt_name, unpack_dir, tree_subdir,
info_dict=None):
- """Return a File object (with name 'name') with the desired bootable
- image. Look for it in 'unpack_dir'/BOOTABLE_IMAGES under the name
- 'prebuilt_name', otherwise look for it under 'unpack_dir'/IMAGES,
- otherwise construct it from the source files in
- 'unpack_dir'/'tree_subdir'."""
+ """Return a File object with the desired bootable image.
+
+ Look for it in 'unpack_dir'/BOOTABLE_IMAGES under the name 'prebuilt_name',
+ otherwise look for it under 'unpack_dir'/IMAGES, otherwise construct it from
+ the source files in 'unpack_dir'/'tree_subdir'."""
prebuilt_path = os.path.join(unpack_dir, "BOOTABLE_IMAGES", prebuilt_name)
if os.path.exists(prebuilt_path):
- print "using prebuilt %s from BOOTABLE_IMAGES..." % (prebuilt_name,)
+ print("using prebuilt %s from BOOTABLE_IMAGES..." % prebuilt_name)
return File.FromLocalFile(name, prebuilt_path)
prebuilt_path = os.path.join(unpack_dir, "IMAGES", prebuilt_name)
if os.path.exists(prebuilt_path):
- print "using prebuilt %s from IMAGES..." % (prebuilt_name,)
+ print("using prebuilt %s from IMAGES..." % prebuilt_name)
return File.FromLocalFile(name, prebuilt_path)
- print "building image from target_files %s..." % (tree_subdir,)
+ print("building image from target_files %s..." % tree_subdir)
+
+ if info_dict is None:
+ info_dict = OPTIONS.info_dict
+
+ # With system_root_image == "true", we don't pack ramdisk into the boot image.
+ # Unless "recovery_as_boot" is specified, in which case we carry the ramdisk
+ # for recovery.
+ has_ramdisk = (info_dict.get("system_root_image") != "true" or
+ prebuilt_name != "boot.img" or
+ info_dict.get("recovery_as_boot") == "true")
+
fs_config = "META/" + tree_subdir.lower() + "_filesystem_config.txt"
- data = BuildBootableImage(os.path.join(unpack_dir, tree_subdir),
- os.path.join(unpack_dir, fs_config),
- info_dict)
+ data = _BuildBootableImage(os.path.join(unpack_dir, tree_subdir),
+ os.path.join(unpack_dir, fs_config),
+ info_dict, has_ramdisk)
if data:
return File(name, data)
return None
OPTIONS.tempfiles.append(tmp)
def unzip_to_dir(filename, dirname):
+ subprocess.call(["rm", "-rf", dirname + filename, "targetfiles-*"])
cmd = ["unzip", "-o", "-q", filename, "-d", dirname]
if pattern is not None:
cmd.append(pattern)
stdin=devnull.fileno(),
stdout=devnull.fileno(),
stderr=subprocess.PIPE)
- stdout, stderr = p.communicate()
+ _, stderr = p.communicate()
if p.returncode == 0:
# Encrypted key with empty string as password.
key_passwords[k] = ''
- elif stderr.startswith('Error decrypting key'):
+ elif stderr.startswith(b'Error decrypting key'):
# Definitely encrypted key.
# It would have said "Error reading key" if it didn't parse correctly.
need_passwords.append(k)
return key_passwords
-def SignFile(input_name, output_name, key, password, align=None,
- whole_file=False):
+def GetMinSdkVersion(apk_name):
+ """Get the minSdkVersion delared in the APK. This can be both a decimal number
+ (API Level) or a codename.
+ """
+
+ p = Run(["aapt", "dump", "badging", apk_name], stdout=subprocess.PIPE)
+ output, err = p.communicate()
+ if err:
+ raise ExternalError("Failed to obtain minSdkVersion: aapt return code %s"
+ % (p.returncode,))
+
+ for line in output.split("\n"):
+ # Looking for lines such as sdkVersion:'23' or sdkVersion:'M'
+ m = re.match(r'sdkVersion:\'([^\']*)\'', line)
+ if m:
+ return m.group(1)
+ raise ExternalError("No minSdkVersion returned by aapt")
+
+
+def GetMinSdkVersionInt(apk_name, codename_to_api_level_map):
+ """Get the minSdkVersion declared in the APK as a number (API Level). If
+ minSdkVersion is set to a codename, it is translated to a number using the
+ provided map.
+ """
+
+ version = GetMinSdkVersion(apk_name)
+ try:
+ return int(version)
+ except ValueError:
+ # Not a decimal number. Codename?
+ if version in codename_to_api_level_map:
+ return codename_to_api_level_map[version]
+ else:
+ raise ExternalError("Unknown minSdkVersion: '%s'. Known codenames: %s"
+ % (version, codename_to_api_level_map))
+
+
+def SignFile(input_name, output_name, key, password, min_api_level=None,
+ codename_to_api_level_map=dict(),
+ whole_file=False):
"""Sign the input_name zip/jar/apk, producing output_name. Use the
given key and password (the latter may be None if the key does not
have a password.
- If align is an integer > 1, zipalign is run to align stored files in
- the output zip on 'align'-byte boundaries.
-
If whole_file is true, use the "-w" option to SignApk to embed a
signature that covers the whole file in the archive comment of the
zip file.
- """
- if align == 0 or align == 1:
- align = None
+ min_api_level is the API Level (int) of the oldest platform this file may end
+ up on. If not specified for an APK, the API Level is obtained by interpreting
+ the minSdkVersion attribute of the APK's AndroidManifest.xml.
- if align:
- temp = tempfile.NamedTemporaryFile()
- sign_name = temp.name
- else:
- sign_name = output_name
+ codename_to_api_level_map is needed to translate the codename which may be
+ encountered as the APK's minSdkVersion.
+ """
+
+ java_library_path = os.path.join(
+ OPTIONS.search_path, OPTIONS.signapk_shared_library_path)
- cmd = [OPTIONS.java_path, OPTIONS.java_args, "-jar",
+ cmd = [OPTIONS.java_path, OPTIONS.java_args,
+ "-Djava.library.path=" + java_library_path,
+ "-jar",
os.path.join(OPTIONS.search_path, OPTIONS.signapk_path)]
cmd.extend(OPTIONS.extra_signapk_args)
if whole_file:
cmd.append("-w")
+
+ min_sdk_version = min_api_level
+ if min_sdk_version is None:
+ if not whole_file:
+ min_sdk_version = GetMinSdkVersionInt(
+ input_name, codename_to_api_level_map)
+ if min_sdk_version is not None:
+ cmd.extend(["--min-sdk-version", str(min_sdk_version)])
+
cmd.extend([key + OPTIONS.public_key_suffix,
key + OPTIONS.private_key_suffix,
- input_name, sign_name])
+ input_name, output_name])
p = Run(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
if password is not None:
if p.returncode != 0:
raise ExternalError("signapk.jar failed: return code %s" % (p.returncode,))
- if align:
- p = Run(["zipalign", "-f", str(align), sign_name, output_name])
- p.communicate()
- if p.returncode != 0:
- raise ExternalError("zipalign failed: return code %s" % (p.returncode,))
- temp.close()
-
def CheckSize(data, target, info_dict):
"""Check the data string passed against the max size limit, if
any, for the given target. Raise exception if the data is too big.
Print a warning if the data is nearing the maximum size."""
- if target.endswith(".img"): target = target[:-4]
+ if target.endswith(".img"):
+ target = target[:-4]
mount_point = "/" + target
fs_type = None
limit = None
if info_dict["fstab"]:
- if mount_point == "/userdata": mount_point = "/data"
+ if mount_point == "/userdata_extra":
+ mount_point = "/data"
+ if mount_point == "/userdata":
+ mount_point = "/data"
p = info_dict["fstab"][mount_point]
fs_type = p.fs_type
device = p.device
if "/" in device:
device = device[device.rfind("/")+1:]
limit = info_dict.get(device + "_size", None)
- if not fs_type or not limit: return
+ if not fs_type or not limit:
+ return
if fs_type == "yaffs2":
# image size should be increased by 1/64th to account for the
if pct >= 99.0:
raise ExternalError(msg)
elif pct >= 95.0:
- print
- print " WARNING: ", msg
- print
+ print()
+ print(" WARNING: ", msg)
+ print()
elif OPTIONS.verbose:
- print " ", msg
+ print(" ", msg)
def ReadApkCerts(tf_zip):
certmap = {}
for line in tf_zip.read("META/apkcerts.txt").split("\n"):
line = line.strip()
- if not line: continue
+ if not line:
+ continue
m = re.match(r'^name="(.*)"\s+certificate="(.*)"\s+'
r'private_key="(.*)"$', line)
if m:
"""
def Usage(docstring):
- print docstring.rstrip("\n")
- print COMMON_DOCSTRING
+ print(docstring.rstrip("\n"))
+ print(COMMON_DOCSTRING)
def ParseOptions(argv,
try:
opts, args = getopt.getopt(
argv, "hvp:s:x:" + extra_opts,
- ["help", "verbose", "path=", "signapk_path=", "extra_signapk_args=",
+ ["help", "verbose", "path=", "signapk_path=",
+ "signapk_shared_library_path=", "extra_signapk_args=",
"java_path=", "java_args=", "public_key_suffix=",
- "private_key_suffix=", "device_specific=", "extra="] +
+ "private_key_suffix=", "boot_signer_path=", "boot_signer_args=",
+ "verity_signer_path=", "verity_signer_args=", "device_specific=",
+ "extra="] +
list(extra_long_opts))
- except getopt.GetoptError, err:
+ except getopt.GetoptError as err:
Usage(docstring)
- print "**", str(err), "**"
+ print("**", str(err), "**")
sys.exit(2)
- path_specified = False
-
for o, a in opts:
if o in ("-h", "--help"):
Usage(docstring)
OPTIONS.search_path = a
elif o in ("--signapk_path",):
OPTIONS.signapk_path = a
+ elif o in ("--signapk_shared_library_path",):
+ OPTIONS.signapk_shared_library_path = a
elif o in ("--extra_signapk_args",):
OPTIONS.extra_signapk_args = shlex.split(a)
elif o in ("--java_path",):
OPTIONS.public_key_suffix = a
elif o in ("--private_key_suffix",):
OPTIONS.private_key_suffix = a
+ elif o in ("--boot_signer_path",):
+ OPTIONS.boot_signer_path = a
+ elif o in ("--boot_signer_args",):
+ OPTIONS.boot_signer_args = shlex.split(a)
+ elif o in ("--verity_signer_path",):
+ OPTIONS.verity_signer_path = a
+ elif o in ("--verity_signer_args",):
+ OPTIONS.verity_signer_args = shlex.split(a)
elif o in ("-s", "--device_specific"):
OPTIONS.device_specific = a
elif o in ("-x", "--extra"):
if i not in current or not current[i]:
missing.append(i)
# Are all the passwords already in the file?
- if not missing: return current
+ if not missing:
+ return current
for i in missing:
current[i] = ""
if not first:
- print "key file %s still missing some passwords." % (self.pwfile,)
+ print("key file %s still missing some passwords." % self.pwfile)
answer = raw_input("try to edit again? [y]> ").strip()
if answer and answer[0] not in 'yY':
raise RuntimeError("key passwords unavailable")
current = self.UpdateAndReadFile(current)
- def PromptResult(self, current):
+ def PromptResult(self, current): # pylint: disable=no-self-use
"""Prompt the user to enter a value (password) for each key in
'current' whose value is fales. Returns a new dict with all the
values.
"""
result = {}
- for k, v in sorted(current.iteritems()):
+ for k, v in sorted(iteritems(current)):
if v:
result[k] = v
else:
while True:
- result[k] = getpass.getpass("Enter password for %s key> "
- % (k,)).strip()
- if result[k]: break
+ result[k] = getpass.getpass(
+ "Enter password for %s key> " % k).strip()
+ if result[k]:
+ break
return result
def UpdateAndReadFile(self, current):
return self.PromptResult(current)
f = open(self.pwfile, "w")
- os.chmod(self.pwfile, 0600)
+ os.chmod(self.pwfile, 0o600)
f.write("# Enter key passwords between the [[[ ]]] brackets.\n")
f.write("# (Additional spaces are harmless.)\n\n")
first_line = None
- sorted = [(not v, k, v) for (k, v) in current.iteritems()]
- sorted.sort()
- for i, (_, k, v) in enumerate(sorted):
+ sorted_list = sorted((not v, k, v) for (k, v) in current.items())
+ for i, (_, k, v) in enumerate(sorted_list):
f.write("[[[ %s ]]] %s\n" % (v, k))
if not v and first_line is None:
# position cursor on first line with no password.
def ReadFile(self):
result = {}
- if self.pwfile is None: return result
+ if self.pwfile is None:
+ return result
try:
f = open(self.pwfile, "r")
for line in f:
line = line.strip()
- if not line or line[0] == '#': continue
+ if not line or line[0] == '#':
+ continue
m = re.match(r"^\[\[\[\s*(.*?)\s*\]\]\]\s*(\S+)$", line)
if not m:
- print "failed to parse password file: ", line
+ print("failed to parse password file: ", line)
else:
result[m.group(2)] = m.group(1)
f.close()
- except IOError, e:
+ except IOError as e:
if e.errno != errno.ENOENT:
- print "error reading password file: ", str(e)
+ print("error reading password file: ", str(e))
return result
-def ZipWriteStr(zip, filename, data, perms=0644, compression=None):
- # use a fixed timestamp so the output is repeatable.
- zinfo = zipfile.ZipInfo(filename=filename,
- date_time=(2009, 1, 1, 0, 0, 0))
- if compression is None:
- zinfo.compress_type = zip.compression
+def ZipWrite(zip_file, filename, arcname=None, perms=0o644,
+ compress_type=None):
+ import datetime
+
+ # http://b/18015246
+ # Python 2.7's zipfile implementation wrongly thinks that zip64 is required
+ # for files larger than 2GiB. We can work around this by adjusting their
+ # limit. Note that `zipfile.writestr()` will not work for strings larger than
+ # 2GiB. The Python interpreter sometimes rejects strings that large (though
+ # it isn't clear to me exactly what circumstances cause this).
+ # `zipfile.write()` must be used directly to work around this.
+ #
+ # This mess can be avoided if we port to python3.
+ saved_zip64_limit = zipfile.ZIP64_LIMIT
+ zipfile.ZIP64_LIMIT = (1 << 32) - 1
+
+ if compress_type is None:
+ compress_type = zip_file.compression
+ if arcname is None:
+ arcname = filename
+
+ saved_stat = os.stat(filename)
+
+ try:
+ # `zipfile.write()` doesn't allow us to pass ZipInfo, so just modify the
+ # file to be zipped and reset it when we're done.
+ os.chmod(filename, perms)
+
+ # Use a fixed timestamp so the output is repeatable.
+ epoch = datetime.datetime.fromtimestamp(0)
+ timestamp = (datetime.datetime(2009, 1, 1) - epoch).total_seconds()
+ os.utime(filename, (timestamp, timestamp))
+
+ zip_file.write(filename, arcname=arcname, compress_type=compress_type)
+ finally:
+ os.chmod(filename, saved_stat.st_mode)
+ os.utime(filename, (saved_stat.st_atime, saved_stat.st_mtime))
+ zipfile.ZIP64_LIMIT = saved_zip64_limit
+
+
+def ZipWriteStr(zip_file, zinfo_or_arcname, data, perms=None,
+ compress_type=None):
+ """Wrap zipfile.writestr() function to work around the zip64 limit.
+
+ Even with the ZIP64_LIMIT workaround, it won't allow writing a string
+ longer than 2GiB. It gives 'OverflowError: size does not fit in an int'
+ when calling crc32(bytes).
+
+ But it still works fine to write a shorter string into a large zip file.
+ We should use ZipWrite() whenever possible, and only use ZipWriteStr()
+ when we know the string won't be too long.
+ """
+
+ saved_zip64_limit = zipfile.ZIP64_LIMIT
+ zipfile.ZIP64_LIMIT = (1 << 32) - 1
+
+ if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
+ zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname)
+ zinfo.compress_type = zip_file.compression
+ if perms is None:
+ perms = 0o100644
else:
- zinfo.compress_type = compression
- zinfo.external_attr = perms << 16
- zip.writestr(zinfo, data)
+ zinfo = zinfo_or_arcname
+
+ # If compress_type is given, it overrides the value in zinfo.
+ if compress_type is not None:
+ zinfo.compress_type = compress_type
+
+ # If perms is given, it has a priority.
+ if perms is not None:
+ # If perms doesn't set the file type, mark it as a regular file.
+ if perms & 0o770000 == 0:
+ perms |= 0o100000
+ zinfo.external_attr = perms << 16
+
+ # Use a fixed timestamp so the output is repeatable.
+ zinfo.date_time = (2009, 1, 1, 0, 0, 0)
+
+ zip_file.writestr(zinfo, data)
+ zipfile.ZIP64_LIMIT = saved_zip64_limit
+
+
+def ZipClose(zip_file):
+ # http://b/18015246
+ # zipfile also refers to ZIP64_LIMIT during close() when it writes out the
+ # central directory.
+ saved_zip64_limit = zipfile.ZIP64_LIMIT
+ zipfile.ZIP64_LIMIT = (1 << 32) - 1
+
+ zip_file.close()
+
+ zipfile.ZIP64_LIMIT = saved_zip64_limit
class DeviceSpecificParams(object):
"""Keyword arguments to the constructor become attributes of this
object, which is passed to all functions in the device-specific
module."""
- for k, v in kwargs.iteritems():
+ for k, v in iteritems(kwargs):
setattr(self, k, v)
self.extras = OPTIONS.extras
if self.module is None:
path = OPTIONS.device_specific
- if not path: return
+ if not path:
+ return
try:
if os.path.isdir(path):
info = imp.find_module("releasetools", [path])
if x == ".py":
f = b
info = imp.find_module(f, [d])
- print "loaded device-specific extensions from", path
+ print("loaded device-specific extensions from", path)
self.module = imp.load_module("device_specific", *info)
except ImportError:
- print "unable to load device-specific module; assuming none"
+ print("unable to load device-specific module; assuming none")
def _DoCall(self, function_name, *args, **kwargs):
"""Call the named function in the device-specific module, passing
used to install the image for the device's baseband processor."""
return self._DoCall("FullOTA_InstallEnd")
+ def FullOTA_PostValidate(self):
+ """Called after installing and validating /system; typically this is
+ used to resize the system partition after a block based installation."""
+ return self._DoCall("FullOTA_PostValidate")
+
def IncrementalOTA_Assertions(self):
"""Called after emitting the block of assertions at the top of an
incremental OTA package. Implementations can add whatever
processor."""
return self._DoCall("IncrementalOTA_InstallEnd")
+ def VerifyOTA_Assertions(self):
+ return self._DoCall("VerifyOTA_Assertions")
+
class File(object):
def __init__(self, name, data):
self.name = name
return t
def AddToZip(self, z, compression=None):
- ZipWriteStr(z, self.name, self.data, compression=compression)
+ ZipWriteStr(z, self.name, self.data, compress_type=compression)
DIFF_PROGRAM_BY_EXT = {
".gz" : "imgdiff",
err = []
def run():
_, e = p.communicate()
- if e: err.append(e)
+ if e:
+ err.append(e)
th = threading.Thread(target=run)
th.start()
th.join(timeout=300) # 5 mins
if th.is_alive():
- print "WARNING: diff command timed out"
+ print("WARNING: diff command timed out")
p.terminate()
th.join(5)
if th.is_alive():
th.join()
if err or p.returncode != 0:
- print "WARNING: failure running %s:\n%s\n" % (
- diff_program, "".join(err))
+ print("WARNING: failure running %s:\n%s\n" % (
+ cmd, "".join(err)))
self.patch = None
return None, None, None
diff = ptemp.read()
def ComputeDifferences(diffs):
"""Call ComputePatch on all the Difference objects in 'diffs'."""
- print len(diffs), "diffs to compute"
+ print(len(diffs), "diffs to compute")
# Do the largest files first, to try and reduce the long-pole effect.
by_size = [(i.tf.size, i) for i in diffs]
else:
name = "%s (%s)" % (tf.name, sf.name)
if patch is None:
- print "patching failed! %s" % (name,)
+ print("patching failed! %s" % name)
else:
- print "%8.2f sec %8d / %8d bytes (%6.2f%%) %s" % (
- dur, len(patch), tf.size, 100.0 * len(patch) / tf.size, name)
+ print("%8.2f sec %8d / %8d bytes (%6.2f%%) %s" % (
+ dur, len(patch), tf.size, 100.0 * len(patch) / tf.size, name))
lock.release()
- except Exception, e:
- print e
+ except Exception as e:
+ print(e)
raise
# start worker threads; wait for them all to finish.
threads.pop().join()
-class BlockDifference:
- def __init__(self, partition, tgt, src=None, check_first_block=False):
+class BlockDifference(object):
+ def __init__(self, partition, tgt, src=None, check_first_block=False,
+ version=None, disable_imgdiff=False):
self.tgt = tgt
self.src = src
self.partition = partition
self.check_first_block = check_first_block
-
- b = blockimgdiff.BlockImageDiff(tgt, src, threads=OPTIONS.worker_threads)
+ self.disable_imgdiff = disable_imgdiff
+
+ if version is None:
+ version = 1
+ if OPTIONS.info_dict:
+ version = max(
+ int(i) for i in
+ OPTIONS.info_dict.get("blockimgdiff_versions", "1").split(","))
+ self.version = version
+
+ b = blockimgdiff.BlockImageDiff(tgt, src, threads=OPTIONS.worker_threads,
+ version=self.version,
+ disable_imgdiff=self.disable_imgdiff)
tmpdir = tempfile.mkdtemp()
OPTIONS.tempfiles.append(tmpdir)
self.path = os.path.join(tmpdir, partition)
b.Compute(self.path)
+ self._required_cache = b.max_stashed_size
+ self.touched_src_ranges = b.touched_src_ranges
+ self.touched_src_sha1 = b.touched_src_sha1
+
+ if src is None:
+ _, self.device = GetTypeAndDevice("/" + partition, OPTIONS.info_dict)
+ else:
+ _, self.device = GetTypeAndDevice("/" + partition,
+ OPTIONS.source_info_dict)
- _, self.device = GetTypeAndDevice("/" + partition, OPTIONS.info_dict)
+ @property
+ def required_cache(self):
+ return self._required_cache
def WriteScript(self, script, output_zip, progress=None):
if not self.src:
# write the output unconditionally
- if progress: script.ShowProgress(progress, 0)
- self._WriteUpdate(script, output_zip)
+ script.Print("Patching %s image unconditionally..." % (self.partition,))
+ else:
+ script.Print("Patching %s image after verification." % (self.partition,))
+
+ if progress:
+ script.ShowProgress(progress, 0)
+ self._WriteUpdate(script, output_zip)
+ if OPTIONS.verify:
+ self._WritePostInstallVerifyScript(script)
+
+ def WriteStrictVerifyScript(self, script):
+ """Verify all the blocks in the care_map, including clobbered blocks.
+
+ This differs from the WriteVerifyScript() function: a) it prints different
+ error messages; b) it doesn't allow half-way updated images to pass the
+ verification."""
+
+ partition = self.partition
+ script.Print("Verifying %s..." % (partition,))
+ ranges = self.tgt.care_map
+ ranges_str = ranges.to_string_raw()
+ script.AppendExtra('range_sha1("%s", "%s") == "%s" && '
+ 'ui_print(" Verified.") || '
+ 'ui_print("\\"%s\\" has unexpected contents.");' % (
+ self.device, ranges_str,
+ self.tgt.TotalSha1(include_clobbered_blocks=True),
+ self.device))
+ script.AppendExtra("")
+
+ def WriteVerifyScript(self, script, touched_blocks_only=False):
+ partition = self.partition
+ # full OTA
+ if not self.src:
+ script.Print("Image %s will be patched unconditionally." % (partition,))
+
+ # incremental OTA
else:
- if self.check_first_block:
- self._CheckFirstBlock(script)
-
- script.AppendExtra('if range_sha1("%s", "%s") == "%s" then' %
- (self.device, self.src.care_map.to_string_raw(),
- self.src.TotalSha1()))
- script.Print("Patching %s image..." % (self.partition,))
- if progress: script.ShowProgress(progress, 0)
- self._WriteUpdate(script, output_zip)
- script.AppendExtra(('else\n'
- ' (range_sha1("%s", "%s") == "%s") ||\n'
- ' abort("%s partition has unexpected contents");\n'
- 'endif;') %
- (self.device, self.tgt.care_map.to_string_raw(),
- self.tgt.TotalSha1(), self.partition))
+ if touched_blocks_only and self.version >= 3:
+ ranges = self.touched_src_ranges
+ expected_sha1 = self.touched_src_sha1
+ else:
+ ranges = self.src.care_map.subtract(self.src.clobbered_blocks)
+ expected_sha1 = self.src.TotalSha1()
+
+ # No blocks to be checked, skipping.
+ if not ranges:
+ return
+
+ ranges_str = ranges.to_string_raw()
+ if self.version >= 4:
+ script.AppendExtra(('if (range_sha1("%s", "%s") == "%s" || '
+ 'block_image_verify("%s", '
+ 'package_extract_file("%s.transfer.list"), '
+ '"%s.new.dat", "%s.patch.dat")) then') % (
+ self.device, ranges_str, expected_sha1,
+ self.device, partition, partition, partition))
+ elif self.version == 3:
+ script.AppendExtra(('if (range_sha1("%s", "%s") == "%s" || '
+ 'block_image_verify("%s", '
+ 'package_extract_file("%s.transfer.list"), '
+ '"%s.new.dat", "%s.patch.dat")) then') % (
+ self.device, ranges_str, expected_sha1,
+ self.device, partition, partition, partition))
+ else:
+ script.AppendExtra('if range_sha1("%s", "%s") == "%s" then' % (
+ self.device, ranges_str, self.src.TotalSha1()))
+ script.Print('Verified %s image...' % (partition,))
+ script.AppendExtra('else')
+
+ if self.version >= 4:
+
+ # Bug: 21124327
+ # When generating incrementals for the system and vendor partitions in
+ # version 4 or newer, explicitly check the first block (which contains
+ # the superblock) of the partition to see if it's what we expect. If
+ # this check fails, give an explicit log message about the partition
+ # having been remounted R/W (the most likely explanation).
+ if self.check_first_block:
+ script.AppendExtra('check_first_block("%s");' % (self.device,))
+
+ # If version >= 4, try block recovery before abort update
+ if partition == "system":
+ code = ErrorCode.SYSTEM_RECOVER_FAILURE
+ else:
+ code = ErrorCode.VENDOR_RECOVER_FAILURE
+ script.AppendExtra((
+ 'ifelse (block_image_recover("{device}", "{ranges}") && '
+ 'block_image_verify("{device}", '
+ 'package_extract_file("{partition}.transfer.list"), '
+ '"{partition}.new.dat", "{partition}.patch.dat"), '
+ 'ui_print("{partition} recovered successfully."), '
+ 'abort("E{code}: {partition} partition fails to recover"));\n'
+ 'endif;').format(device=self.device, ranges=ranges_str,
+ partition=partition, code=code))
+
+ # Abort the OTA update. Note that the incremental OTA cannot be applied
+ # even if it may match the checksum of the target partition.
+ # a) If version < 3, operations like move and erase will make changes
+ # unconditionally and damage the partition.
+ # b) If version >= 3, it won't even reach here.
+ else:
+ if partition == "system":
+ code = ErrorCode.SYSTEM_VERIFICATION_FAILURE
+ else:
+ code = ErrorCode.VENDOR_VERIFICATION_FAILURE
+ script.AppendExtra((
+ 'abort("E%d: %s partition has unexpected contents");\n'
+ 'endif;') % (code, partition))
- def _WriteUpdate(self, script, output_zip):
+ def _WritePostInstallVerifyScript(self, script):
partition = self.partition
- with open(self.path + ".transfer.list", "rb") as f:
- ZipWriteStr(output_zip, partition + ".transfer.list", f.read())
- with open(self.path + ".new.dat", "rb") as f:
- ZipWriteStr(output_zip, partition + ".new.dat", f.read())
- with open(self.path + ".patch.dat", "rb") as f:
- ZipWriteStr(output_zip, partition + ".patch.dat", f.read(),
- compression=zipfile.ZIP_STORED)
-
- call = (('block_image_update("%s", '
- 'package_extract_file("%s.transfer.list"), '
- '"%s.new.dat", "%s.patch.dat");\n') %
- (self.device, partition, partition, partition))
- script.AppendExtra(script._WordWrap(call))
-
- def _CheckFirstBlock(self, script):
- r = RangeSet((0, 1))
- h = sha1()
- for data in self.src.ReadRangeSet(r):
- h.update(data)
- h = h.hexdigest()
-
- script.AppendExtra(('(range_sha1("%s", "%s") == "%s") || '
- 'abort("%s has been remounted R/W; '
- 'reflash device to reenable OTA updates");')
- % (self.device, r.to_string_raw(), h, self.device))
+ script.Print('Verifying the updated %s image...' % (partition,))
+ # Unlike pre-install verification, clobbered_blocks should not be ignored.
+ ranges = self.tgt.care_map
+ ranges_str = ranges.to_string_raw()
+ script.AppendExtra('if range_sha1("%s", "%s") == "%s" then' % (
+ self.device, ranges_str,
+ self.tgt.TotalSha1(include_clobbered_blocks=True)))
+
+ # Bug: 20881595
+ # Verify that extended blocks are really zeroed out.
+ if self.tgt.extended:
+ ranges_str = self.tgt.extended.to_string_raw()
+ script.AppendExtra('if range_sha1("%s", "%s") == "%s" then' % (
+ self.device, ranges_str,
+ self._HashZeroBlocks(self.tgt.extended.size())))
+ script.Print('Verified the updated %s image.' % (partition,))
+ if partition == "system":
+ code = ErrorCode.SYSTEM_NONZERO_CONTENTS
+ else:
+ code = ErrorCode.VENDOR_NONZERO_CONTENTS
+ script.AppendExtra(
+ 'else\n'
+ ' abort("E%d: %s partition has unexpected non-zero contents after '
+ 'OTA update");\n'
+ 'endif;' % (code, partition))
+ else:
+ script.Print('Verified the updated %s image.' % (partition,))
+ if partition == "system":
+ code = ErrorCode.SYSTEM_UNEXPECTED_CONTENTS
+ else:
+ code = ErrorCode.VENDOR_UNEXPECTED_CONTENTS
-DataImage = blockimgdiff.DataImage
+ script.AppendExtra(
+ 'else\n'
+ ' abort("E%d: %s partition has unexpected contents after OTA '
+ 'update");\n'
+ 'endif;' % (code, partition))
+
+ def _WriteUpdate(self, script, output_zip):
+ ZipWrite(output_zip,
+ '{}.transfer.list'.format(self.path),
+ '{}.transfer.list'.format(self.partition))
+ ZipWrite(output_zip,
+ '{}.new.dat'.format(self.path),
+ '{}.new.dat'.format(self.partition))
+ ZipWrite(output_zip,
+ '{}.patch.dat'.format(self.path),
+ '{}.patch.dat'.format(self.partition),
+ compress_type=zipfile.ZIP_STORED)
+
+ if self.partition == "system":
+ code = ErrorCode.SYSTEM_UPDATE_FAILURE
+ else:
+ code = ErrorCode.VENDOR_UPDATE_FAILURE
+
+ call = ('block_image_update("{device}", '
+ 'package_extract_file("{partition}.transfer.list"), '
+ '"{partition}.new.dat", "{partition}.patch.dat") ||\n'
+ ' abort("E{code}: Failed to update {partition} image.");'.format(
+ device=self.device, partition=self.partition, code=code))
+ script.AppendExtra(script.WordWrap(call))
+
+ def _HashBlocks(self, source, ranges): # pylint: disable=no-self-use
+ data = source.ReadRangeSet(ranges)
+ ctx = sha1()
+
+ for p in data:
+ ctx.update(p)
+ return ctx.hexdigest()
+
+ def _HashZeroBlocks(self, num_blocks): # pylint: disable=no-self-use
+ """Return the hash value for all zero blocks."""
+ zero_block = '\x00' * 4096
+ ctx = sha1()
+ for _ in range(num_blocks):
+ ctx.update(zero_block)
+
+ return ctx.hexdigest()
+
+
+DataImage = blockimgdiff.DataImage
# map recovery.fstab's fs_types to mount/format "partition types"
-PARTITION_TYPES = { "yaffs2": "MTD", "mtd": "MTD",
- "ext4": "EMMC", "emmc": "EMMC",
- "f2fs": "EMMC" }
+PARTITION_TYPES = {
+ "yaffs2": "MTD",
+ "mtd": "MTD",
+ "ext4": "EMMC",
+ "emmc": "EMMC",
+ "f2fs": "EMMC",
+ "squashfs": "EMMC",
+ "ext2": "EMMC",
+ "ext3": "EMMC",
+ "vfat": "EMMC",
+ "osip": "OSIP"
+}
def GetTypeAndDevice(mount_point, info):
fstab = info["fstab"]
if fstab:
- return PARTITION_TYPES[fstab[mount_point].fs_type], fstab[mount_point].device
+ return (PARTITION_TYPES[fstab[mount_point].fs_type],
+ fstab[mount_point].device)
else:
- return None
+ raise KeyError
def ParseCertificate(data):
"""Parse a PEM-format certificate."""
+ from codecs import decode
cert = []
save = False
for line in data.split("\n"):
if "--END CERTIFICATE--" in line:
break
if save:
- cert.append(line)
+ l = line.encode() if hasattr(line, 'encode') else line
+ cert.append(l)
if "--BEGIN CERTIFICATE--" in line:
save = True
- cert = "".join(cert).decode('base64')
+ cert = decode(b"".join(cert), 'base64')
return cert
def MakeRecoveryPatch(input_dir, output_sink, recovery_img, boot_img,
if info_dict is None:
info_dict = OPTIONS.info_dict
- diff_program = ["imgdiff"]
- path = os.path.join(input_dir, "SYSTEM", "etc", "recovery-resource.dat")
- if os.path.exists(path):
- diff_program.append("-b")
- diff_program.append(path)
- bonus_args = "-b /system/etc/recovery-resource.dat"
+ full_recovery_image = info_dict.get("full_recovery_image", None) == "true"
+ system_root_image = info_dict.get("system_root_image", None) == "true"
+
+ if full_recovery_image:
+ output_sink("etc/recovery.img", recovery_img.data)
+
else:
- bonus_args = ""
+ diff_program = ["imgdiff"]
+ path = os.path.join(input_dir, "SYSTEM", "etc", "recovery-resource.dat")
+ if os.path.exists(path):
+ diff_program.append("-b")
+ diff_program.append(path)
+ bonus_args = "-b /system/etc/recovery-resource.dat"
+ else:
+ bonus_args = ""
- d = Difference(recovery_img, boot_img, diff_program=diff_program)
- _, _, patch = d.ComputePatch()
- output_sink("recovery-from-boot.p", patch)
+ d = Difference(recovery_img, boot_img, diff_program=diff_program)
+ _, _, patch = d.ComputePatch()
+ output_sink("recovery-from-boot.p", patch)
- td_pair = GetTypeAndDevice("/boot", info_dict)
- if not td_pair:
- return
- boot_type, boot_device = td_pair
- td_pair = GetTypeAndDevice("/recovery", info_dict)
- if not td_pair:
+ try:
+ # The following GetTypeAndDevice()s need to use the path in the target
+ # info_dict instead of source_info_dict.
+ boot_type, boot_device = GetTypeAndDevice("/boot", info_dict)
+ recovery_type, recovery_device = GetTypeAndDevice("/recovery", info_dict)
+ except KeyError:
return
- recovery_type, recovery_device = td_pair
- sh = """#!/system/bin/sh
+ if full_recovery_image:
+ sh = """#!/system/bin/sh
+if ! applypatch -c %(type)s:%(device)s:%(size)d:%(sha1)s; then
+ applypatch /system/etc/recovery.img %(type)s:%(device)s %(sha1)s %(size)d && log -t recovery "Installing new recovery image: succeeded" || log -t recovery "Installing new recovery image: failed"
+else
+ log -t recovery "Recovery image already installed"
+fi
+""" % {'type': recovery_type,
+ 'device': recovery_device,
+ 'sha1': recovery_img.sha1,
+ 'size': recovery_img.size}
+ else:
+ sh = """#!/system/bin/sh
+if [ -f /system/etc/recovery-transform.sh ]; then
+ exec sh /system/etc/recovery-transform.sh %(recovery_size)d %(recovery_sha1)s %(boot_size)d %(boot_sha1)s
+fi
if ! applypatch -c %(recovery_type)s:%(recovery_device)s:%(recovery_size)d:%(recovery_sha1)s; then
applypatch %(bonus_args)s %(boot_type)s:%(boot_device)s:%(boot_size)d:%(boot_sha1)s %(recovery_type)s:%(recovery_device)s %(recovery_sha1)s %(recovery_size)d %(boot_sha1)s:/system/recovery-from-boot.p && log -t recovery "Installing new recovery image: succeeded" || log -t recovery "Installing new recovery image: failed"
else
log -t recovery "Recovery image already installed"
fi
-""" % { 'boot_size': boot_img.size,
- 'boot_sha1': boot_img.sha1,
- 'recovery_size': recovery_img.size,
- 'recovery_sha1': recovery_img.sha1,
- 'boot_type': boot_type,
- 'boot_device': boot_device,
- 'recovery_type': recovery_type,
- 'recovery_device': recovery_device,
- 'bonus_args': bonus_args,
- }
+""" % {'boot_size': boot_img.size,
+ 'boot_sha1': boot_img.sha1,
+ 'recovery_size': recovery_img.size,
+ 'recovery_sha1': recovery_img.sha1,
+ 'boot_type': boot_type,
+ 'boot_device': boot_device,
+ 'recovery_type': recovery_type,
+ 'recovery_device': recovery_device,
+ 'bonus_args': bonus_args}
# The install script location moved from /system/etc to /system/bin
- # in the L release. Parse the init.rc file to find out where the
+ # in the L release. Parse init.*.rc files to find out where the
# target-files expects it to be, and put it there.
sh_location = "etc/install-recovery.sh"
- try:
- with open(os.path.join(input_dir, "BOOT", "RAMDISK", "init.rc")) as f:
+ found = False
+ if system_root_image:
+ init_rc_dir = os.path.join(input_dir, "ROOT")
+ else:
+ init_rc_dir = os.path.join(input_dir, "BOOT", "RAMDISK")
+ init_rc_files = os.listdir(init_rc_dir)
+ for init_rc_file in init_rc_files:
+ if (not init_rc_file.startswith('init.') or
+ not init_rc_file.endswith('.rc')):
+ continue
+
+ with open(os.path.join(init_rc_dir, init_rc_file)) as f:
for line in f:
- m = re.match("^service flash_recovery /system/(\S+)\s*$", line)
+ m = re.match(r"^service flash_recovery /system/(\S+)\s*$", line)
if m:
sh_location = m.group(1)
- print "putting script in", sh_location
+ found = True
break
- except (OSError, IOError), e:
- print "failed to read init.rc: %s" % (e,)
+
+ if found:
+ break
+
+ print("putting script in", sh_location)
output_sink(sh_location, sh)