# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import print_function
+
import copy
import errno
import getopt
import zipfile
import blockimgdiff
-import rangelib
from hashlib import sha1 as sha1
+try:
+ raw_input
+except NameError:
+ raw_input = input
+
+
+def iteritems(obj):
+ if hasattr(obj, 'iteritems'):
+ return obj.iteritems()
+ return obj.items()
+
class Options(object):
def __init__(self):
self.search_path = platform_search_path.get(sys.platform, None)
self.signapk_path = "framework/signapk.jar" # Relative to search_path
+ self.signapk_shared_library_path = "lib64" # Relative to search_path
self.extra_signapk_args = []
self.java_path = "java" # Use the one on the path by default.
self.java_args = "-Xmx2048m" # JVM Args
self.source_info_dict = None
self.target_info_dict = None
self.worker_threads = None
+ # Stash size cannot exceed cache_size * threshold.
+ self.cache_size = None
+ self.stash_threshold = 0.8
OPTIONS = Options()
# Values for "certificate" in apkcerts that mean special things.
SPECIAL_CERT_STRINGS = ("PRESIGNED", "EXTERNAL")
+class ErrorCode(object):
+ """Define error_codes for failures that happen during the actual
+ update package installation.
+
+ Error codes 0-999 are reserved for failures before the package
+ installation (i.e. low battery, package verification failure).
+ Detailed code in 'bootable/recovery/error_code.h' """
+
+ SYSTEM_VERIFICATION_FAILURE = 1000
+ SYSTEM_UPDATE_FAILURE = 1001
+ SYSTEM_UNEXPECTED_CONTENTS = 1002
+ SYSTEM_NONZERO_CONTENTS = 1003
+ SYSTEM_RECOVER_FAILURE = 1004
+ VENDOR_VERIFICATION_FAILURE = 2000
+ VENDOR_UPDATE_FAILURE = 2001
+ VENDOR_UNEXPECTED_CONTENTS = 2002
+ VENDOR_NONZERO_CONTENTS = 2003
+ VENDOR_RECOVER_FAILURE = 2004
+ OEM_PROP_MISMATCH = 3000
+ FINGERPRINT_MISMATCH = 3001
+ THUMBPRINT_MISMATCH = 3002
+ OLDER_BUILD = 3003
+ DEVICE_MISMATCH = 3004
+ BAD_PATCH_FILE = 3005
+ INSUFFICIENT_CACHE_SPACE = 3006
+ TUNE_PARTITION_FAILURE = 3007
+ APPLY_PATCH_FAILURE = 3008
class ExternalError(RuntimeError):
pass
"""Create and return a subprocess.Popen object, printing the command
line on the terminal if -v was specified."""
if OPTIONS.verbose:
- print " running: ", " ".join(args)
+ print(" running: ", " ".join(args))
return subprocess.Popen(args, **kwargs)
pass
-def LoadInfoDict(input_file):
+def LoadInfoDict(input_file, input_dir=None):
"""Read and parse the META/misc_info.txt key/value pairs from the
input target files and return a dict."""
if "fstab_version" not in d:
d["fstab_version"] = "1"
+ # A few properties are stored as links to the files in the out/ directory.
+ # It works fine with the build system. However, they are no longer available
+ # when (re)generating from target_files zip. If input_dir is not None, we
+ # are doing repacking. Redirect those properties to the actual files in the
+ # unzipped directory.
+ if input_dir is not None:
+ # We carry a copy of file_contexts.bin under META/. If not available,
+ # search BOOT/RAMDISK/. Note that sometimes we may need a different file
+ # to build images than the one running on device, such as when enabling
+ # system_root_image. In that case, we must have the one for image
+ # generation copied to META/.
+ fc_basename = os.path.basename(d.get("selinux_fc", "file_contexts"))
+ fc_config = os.path.join(input_dir, "META", fc_basename)
+ if d.get("system_root_image") == "true":
+ assert os.path.exists(fc_config)
+ if not os.path.exists(fc_config):
+ fc_config = os.path.join(input_dir, "BOOT", "RAMDISK", fc_basename)
+ if not os.path.exists(fc_config):
+ fc_config = None
+
+ if fc_config:
+ d["selinux_fc"] = fc_config
+
+ # Similarly we need to redirect "ramdisk_dir" and "ramdisk_fs_config".
+ if d.get("system_root_image") == "true":
+ d["ramdisk_dir"] = os.path.join(input_dir, "ROOT")
+ d["ramdisk_fs_config"] = os.path.join(
+ input_dir, "META", "root_filesystem_config.txt")
+
+ # Redirect {system,vendor}_base_fs_file.
+ if "system_base_fs_file" in d:
+ basename = os.path.basename(d["system_base_fs_file"])
+ system_base_fs_file = os.path.join(input_dir, "META", basename)
+ if os.path.exists(system_base_fs_file):
+ d["system_base_fs_file"] = system_base_fs_file
+ else:
+ print("Warning: failed to find system base fs file: %s" % (
+ system_base_fs_file,))
+ del d["system_base_fs_file"]
+
+ if "vendor_base_fs_file" in d:
+ basename = os.path.basename(d["vendor_base_fs_file"])
+ vendor_base_fs_file = os.path.join(input_dir, "META", basename)
+ if os.path.exists(vendor_base_fs_file):
+ d["vendor_base_fs_file"] = vendor_base_fs_file
+ else:
+ print("Warning: failed to find vendor base fs file: %s" % (
+ vendor_base_fs_file,))
+ del d["vendor_base_fs_file"]
+
+
+ if "device_type" not in d:
+ d["device_type"] = "MMC"
try:
data = read_helper("META/imagesizes.txt")
for line in data.split("\n"):
makeint("boot_size")
makeint("fstab_version")
- d["fstab"] = LoadRecoveryFSTab(read_helper, d["fstab_version"])
+ if d.get("no_recovery", False) == "true":
+ d["fstab"] = None
+ else:
+ d["fstab"] = LoadRecoveryFSTab(read_helper, d["fstab_version"],
+ d["device_type"], d.get("system_root_image", False))
d["build.prop"] = LoadBuildProp(read_helper)
return d
try:
data = read_helper("SYSTEM/build.prop")
except KeyError:
- print "Warning: could not find SYSTEM/build.prop in %s" % zip
+ print("Warning: could not find SYSTEM/build.prop in %s" % zip)
data = ""
return LoadDictionaryFromLines(data.split("\n"))
d[name] = value
return d
-def LoadRecoveryFSTab(read_helper, fstab_version):
+def LoadRecoveryFSTab(read_helper, fstab_version, type, system_root_image=False):
class Partition(object):
def __init__(self, mount_point, fs_type, device, length, device2, context):
self.mount_point = mount_point
try:
data = read_helper("RECOVERY/RAMDISK/etc/recovery.fstab")
except KeyError:
- print "Warning: could not find RECOVERY/RAMDISK/etc/recovery.fstab"
+ print("Warning: could not find RECOVERY/RAMDISK/etc/recovery.fstab")
data = ""
if fstab_version == 1:
if i.startswith("length="):
length = int(i[7:])
else:
- print "%s: unknown option \"%s\"" % (mount_point, i)
+ print("%s: unknown option \"%s\"" % (mount_point, i))
- d[mount_point] = Partition(mount_point=mount_point, fs_type=pieces[1],
- device=pieces[2], length=length,
- device2=device2)
+ if not d.get(mount_point):
+ d[mount_point] = Partition(mount_point=mount_point, fs_type=pieces[1],
+ device=pieces[2], length=length,
+ device2=device2)
elif fstab_version == 2:
d = {}
context = i
mount_point = pieces[1]
- d[mount_point] = Partition(mount_point=mount_point, fs_type=pieces[2],
- device=pieces[0], length=length,
- device2=None, context=context)
+ if not d.get(mount_point):
+ d[mount_point] = Partition(mount_point=mount_point, fs_type=pieces[2],
+ device=pieces[0], length=length,
+ device2=None, context=context)
else:
raise ValueError("Unknown fstab_version: \"%d\"" % (fstab_version,))
+ # / is used for the system mount point when the root directory is included in
+ # system. Other areas assume system is always at "/system" so point /system
+ # at /.
+ if system_root_image:
+ assert not d.has_key("/system") and d.has_key("/")
+ d["/system"] = d["/"]
return d
def DumpInfoDict(d):
for k, v in sorted(d.items()):
- print "%-25s = (%s) %s" % (k, type(v).__name__, v)
+ print("%-25s = (%s) %s" % (k, type(v).__name__, v))
-def BuildBootableImage(sourcedir, fs_config_file, info_dict=None):
- """Take a kernel, cmdline, and ramdisk directory from the input (in
- 'sourcedir'), and turn them into a boot image. Return the image
- data, or None if sourcedir does not appear to contains files for
- building the requested image."""
+def _BuildBootableImage(sourcedir, fs_config_file, info_dict=None,
+ has_ramdisk=False):
+ """Build a bootable image from the specified sourcedir.
- if (not os.access(os.path.join(sourcedir, "RAMDISK"), os.F_OK) or
- not os.access(os.path.join(sourcedir, "kernel"), os.F_OK)):
- return None
+ Take a kernel, cmdline, and optionally a ramdisk directory from the input (in
+ 'sourcedir'), and turn them into a boot image. Return the image data, or
+ None if sourcedir does not appear to contains files for building the
+ requested image."""
- if info_dict is None:
- info_dict = OPTIONS.info_dict
+ def make_ramdisk():
+ ramdisk_img = tempfile.NamedTemporaryFile()
- ramdisk_img = tempfile.NamedTemporaryFile()
- img = tempfile.NamedTemporaryFile()
+ if os.access(fs_config_file, os.F_OK):
+ cmd = ["mkbootfs", "-f", fs_config_file,
+ os.path.join(sourcedir, "RAMDISK")]
+ else:
+ cmd = ["mkbootfs", os.path.join(sourcedir, "RAMDISK")]
+ p1 = Run(cmd, stdout=subprocess.PIPE)
+ p2 = Run(["minigzip"], stdin=p1.stdout, stdout=ramdisk_img.file.fileno())
- if os.access(fs_config_file, os.F_OK):
- cmd = ["mkbootfs", "-f", fs_config_file, os.path.join(sourcedir, "RAMDISK")]
- else:
- cmd = ["mkbootfs", os.path.join(sourcedir, "RAMDISK")]
- p1 = Run(cmd, stdout=subprocess.PIPE)
- p2 = Run(["minigzip"],
- stdin=p1.stdout, stdout=ramdisk_img.file.fileno())
+ p2.wait()
+ p1.wait()
+ assert p1.returncode == 0, "mkbootfs of %s ramdisk failed" % (sourcedir,)
+ assert p2.returncode == 0, "minigzip of %s ramdisk failed" % (sourcedir,)
- p2.wait()
- p1.wait()
- assert p1.returncode == 0, "mkbootfs of %s ramdisk failed" % (sourcedir,)
- assert p2.returncode == 0, "minigzip of %s ramdisk failed" % (sourcedir,)
+ return ramdisk_img
- # use MKBOOTIMG from environ, or "mkbootimg" if empty or not set
- mkbootimg = os.getenv('MKBOOTIMG') or "mkbootimg"
+ if not os.access(os.path.join(sourcedir, "kernel"), os.F_OK):
+ return None
- cmd = [mkbootimg, "--kernel", os.path.join(sourcedir, "kernel")]
+ if has_ramdisk and not os.access(os.path.join(sourcedir, "RAMDISK"), os.F_OK):
+ return None
- fn = os.path.join(sourcedir, "second")
- if os.access(fn, os.F_OK):
- cmd.append("--second")
- cmd.append(fn)
+ if info_dict is None:
+ info_dict = OPTIONS.info_dict
- fn = os.path.join(sourcedir, "cmdline")
- if os.access(fn, os.F_OK):
- cmd.append("--cmdline")
- cmd.append(open(fn).read().rstrip("\n"))
+ img = tempfile.NamedTemporaryFile()
+ bootimg_key = os.getenv("PRODUCT_PRIVATE_KEY", None)
+ verity_key = os.getenv("PRODUCT_VERITY_KEY", None)
+ custom_boot_signer = os.getenv("PRODUCT_BOOT_SIGNER", None)
- fn = os.path.join(sourcedir, "base")
- if os.access(fn, os.F_OK):
- cmd.append("--base")
- cmd.append(open(fn).read().rstrip("\n"))
+ if has_ramdisk:
+ ramdisk_img = make_ramdisk()
- fn = os.path.join(sourcedir, "pagesize")
+ """check if uboot is requested"""
+ fn = os.path.join(sourcedir, "ubootargs")
if os.access(fn, os.F_OK):
- cmd.append("--pagesize")
- cmd.append(open(fn).read().rstrip("\n"))
-
- args = info_dict.get("mkbootimg_args", None)
- if args and args.strip():
- cmd.extend(shlex.split(args))
-
- img_unsigned = None
- if info_dict.get("vboot", None):
- img_unsigned = tempfile.NamedTemporaryFile()
- cmd.extend(["--ramdisk", ramdisk_img.name,
- "--output", img_unsigned.name])
+ cmd = ["mkimage"]
+ for argument in open(fn).read().rstrip("\n").split(" "):
+ cmd.append(argument)
+ cmd.append("-d")
+ cmd.append(os.path.join(sourcedir, "kernel") + ":" + ramdisk_img.name)
+ cmd.append(img.name)
else:
- cmd.extend(["--ramdisk", ramdisk_img.name,
- "--output", img.name])
+ # use MKBOOTIMG from environ, or "mkbootimg" if empty or not set
+ mkbootimg = os.getenv('MKBOOTIMG') or "mkbootimg"
+
+ cmd = [mkbootimg, "--kernel", os.path.join(sourcedir, "kernel")]
+
+ fn = os.path.join(sourcedir, "second")
+ if os.access(fn, os.F_OK):
+ cmd.append("--second")
+ cmd.append(fn)
+
+ fn = os.path.join(sourcedir, "cmdline")
+ if os.access(fn, os.F_OK):
+ cmd.append("--cmdline")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "base")
+ if os.access(fn, os.F_OK):
+ cmd.append("--base")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "tagsaddr")
+ if os.access(fn, os.F_OK):
+ cmd.append("--tags-addr")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "tags_offset")
+ if os.access(fn, os.F_OK):
+ cmd.append("--tags_offset")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "ramdisk_offset")
+ if os.access(fn, os.F_OK):
+ cmd.append("--ramdisk_offset")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "dt")
+ if os.access(fn, os.F_OK):
+ cmd.append("--dt")
+ cmd.append(fn)
+
+ fn = os.path.join(sourcedir, "pagesize")
+ if os.access(fn, os.F_OK):
+ kernel_pagesize = open(fn).read().rstrip("\n")
+ cmd.append("--pagesize")
+ cmd.append(kernel_pagesize)
+
+ args = info_dict.get("mkbootimg_args", None)
+ if args and args.strip():
+ cmd.extend(shlex.split(args))
+
+ args = info_dict.get("mkbootimg_version_args", None)
+ if args and args.strip():
+ cmd.extend(shlex.split(args))
+
+ if has_ramdisk:
+ cmd.extend(["--ramdisk", ramdisk_img.name])
+
+ img_unsigned = None
+ if info_dict.get("vboot", None):
+ img_unsigned = tempfile.NamedTemporaryFile()
+ cmd.extend(["--output", img_unsigned.name])
+ else:
+ cmd.extend(["--output", img.name])
p = Run(cmd, stdout=subprocess.PIPE)
p.communicate()
assert p.returncode == 0, "mkbootimg of %s image failed" % (
os.path.basename(sourcedir),)
+ if custom_boot_signer and bootimg_key and os.path.exists(bootimg_key):
+ print("Signing bootable image with custom boot signer...")
+ img_secure = tempfile.NamedTemporaryFile()
+ p = Run([custom_boot_signer, img.name, img_secure.name], stdout=subprocess.PIPE)
+ p.communicate()
+ assert p.returncode == 0, "signing of bootable image failed"
+ shutil.copyfile(img_secure.name, img.name)
+ img_secure.close()
+ elif bootimg_key and os.path.exists(bootimg_key) and kernel_pagesize > 0:
+ print("Signing bootable image...")
+ bootimg_key_passwords = {}
+ bootimg_key_passwords.update(PasswordManager().GetPasswords(bootimg_key.split()))
+ bootimg_key_password = bootimg_key_passwords[bootimg_key]
+ if bootimg_key_password is not None:
+ bootimg_key_password += "\n"
+ img_sha256 = tempfile.NamedTemporaryFile()
+ img_sig = tempfile.NamedTemporaryFile()
+ img_sig_padded = tempfile.NamedTemporaryFile()
+ img_secure = tempfile.NamedTemporaryFile()
+ p = Run(["openssl", "dgst", "-sha256", "-binary", "-out", img_sha256.name, img.name],
+ stdout=subprocess.PIPE)
+ p.communicate()
+ assert p.returncode == 0, "signing of bootable image failed"
+ p = Run(["openssl", "rsautl", "-sign", "-in", img_sha256.name, "-inkey", bootimg_key, "-out",
+ img_sig.name, "-passin", "stdin"], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ p.communicate(bootimg_key_password)
+ assert p.returncode == 0, "signing of bootable image failed"
+ p = Run(["dd", "if=/dev/zero", "of=%s" % img_sig_padded.name, "bs=%s" % kernel_pagesize,
+ "count=1"], stdout=subprocess.PIPE)
+ p.communicate()
+ assert p.returncode == 0, "signing of bootable image failed"
+ p = Run(["dd", "if=%s" % img_sig.name, "of=%s" % img_sig_padded.name, "conv=notrunc"],
+ stdout=subprocess.PIPE)
+ p.communicate()
+ assert p.returncode == 0, "signing of bootable image failed"
+ p = Run(["cat", img.name, img_sig_padded.name], stdout=img_secure.file.fileno())
+ p.communicate()
+ assert p.returncode == 0, "signing of bootable image failed"
+ shutil.copyfile(img_secure.name, img.name)
+ img_sha256.close()
+ img_sig.close()
+ img_sig_padded.close()
+ img_secure.close()
+
if (info_dict.get("boot_signer", None) == "true" and
info_dict.get("verity_key", None)):
path = "/" + os.path.basename(sourcedir).lower()
cmd.extend([path, img.name,
info_dict["verity_key"] + ".pk8",
info_dict["verity_key"] + ".x509.pem", img.name])
- p = Run(cmd, stdout=subprocess.PIPE)
- p.communicate()
+ verity_key_password = None
+
+ if verity_key and os.path.exists(verity_key+".pk8") and kernel_pagesize > 0:
+ verity_key_passwords = {}
+ verity_key_passwords.update(PasswordManager().GetPasswords(verity_key.split()))
+ verity_key_password = verity_key_passwords[verity_key]
+
+ if verity_key_password is not None:
+ verity_key_password += "\n"
+ p = Run(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ p.communicate(verity_key_password)
+ else:
+ p = Run(cmd)
+ p.communicate()
+
assert p.returncode == 0, "boot_signer of %s image failed" % path
# Sign the image if vboot is non-empty.
img.seek(os.SEEK_SET, 0)
data = img.read()
- ramdisk_img.close()
+ if has_ramdisk:
+ ramdisk_img.close()
img.close()
return data
def GetBootableImage(name, prebuilt_name, unpack_dir, tree_subdir,
info_dict=None):
- """Return a File object (with name 'name') with the desired bootable
- image. Look for it in 'unpack_dir'/BOOTABLE_IMAGES under the name
- 'prebuilt_name', otherwise look for it under 'unpack_dir'/IMAGES,
- otherwise construct it from the source files in
- 'unpack_dir'/'tree_subdir'."""
+ """Return a File object with the desired bootable image.
+
+ Look for it in 'unpack_dir'/BOOTABLE_IMAGES under the name 'prebuilt_name',
+ otherwise look for it under 'unpack_dir'/IMAGES, otherwise construct it from
+ the source files in 'unpack_dir'/'tree_subdir'."""
prebuilt_path = os.path.join(unpack_dir, "BOOTABLE_IMAGES", prebuilt_name)
if os.path.exists(prebuilt_path):
- print "using prebuilt %s from BOOTABLE_IMAGES..." % (prebuilt_name,)
+ print("using prebuilt %s from BOOTABLE_IMAGES..." % prebuilt_name)
return File.FromLocalFile(name, prebuilt_path)
prebuilt_path = os.path.join(unpack_dir, "IMAGES", prebuilt_name)
if os.path.exists(prebuilt_path):
- print "using prebuilt %s from IMAGES..." % (prebuilt_name,)
+ print("using prebuilt %s from IMAGES..." % prebuilt_name)
return File.FromLocalFile(name, prebuilt_path)
- print "building image from target_files %s..." % (tree_subdir,)
+ print("building image from target_files %s..." % tree_subdir)
+
+ if info_dict is None:
+ info_dict = OPTIONS.info_dict
+
+ # With system_root_image == "true", we don't pack ramdisk into the boot image.
+ # Unless "recovery_as_boot" is specified, in which case we carry the ramdisk
+ # for recovery.
+ has_ramdisk = (info_dict.get("system_root_image") != "true" or
+ prebuilt_name != "boot.img" or
+ info_dict.get("recovery_as_boot") == "true")
+
fs_config = "META/" + tree_subdir.lower() + "_filesystem_config.txt"
- data = BuildBootableImage(os.path.join(unpack_dir, tree_subdir),
- os.path.join(unpack_dir, fs_config),
- info_dict)
+ data = _BuildBootableImage(os.path.join(unpack_dir, tree_subdir),
+ os.path.join(unpack_dir, fs_config),
+ info_dict, has_ramdisk)
if data:
return File(name, data)
return None
OPTIONS.tempfiles.append(tmp)
def unzip_to_dir(filename, dirname):
+ subprocess.call(["rm", "-rf", dirname + filename, "targetfiles-*"])
cmd = ["unzip", "-o", "-q", filename, "-d", dirname]
if pattern is not None:
cmd.append(pattern)
if p.returncode == 0:
# Encrypted key with empty string as password.
key_passwords[k] = ''
- elif stderr.startswith('Error decrypting key'):
+ elif stderr.startswith(b'Error decrypting key'):
# Definitely encrypted key.
# It would have said "Error reading key" if it didn't parse correctly.
need_passwords.append(k)
return key_passwords
-def SignFile(input_name, output_name, key, password, align=None,
- whole_file=False):
+def GetMinSdkVersion(apk_name):
+ """Get the minSdkVersion delared in the APK. This can be both a decimal number
+ (API Level) or a codename.
+ """
+
+ p = Run(["aapt", "dump", "badging", apk_name], stdout=subprocess.PIPE)
+ output, err = p.communicate()
+ if err:
+ raise ExternalError("Failed to obtain minSdkVersion: aapt return code %s"
+ % (p.returncode,))
+
+ for line in output.split("\n"):
+ # Looking for lines such as sdkVersion:'23' or sdkVersion:'M'
+ m = re.match(r'sdkVersion:\'([^\']*)\'', line)
+ if m:
+ return m.group(1)
+ raise ExternalError("No minSdkVersion returned by aapt")
+
+
+def GetMinSdkVersionInt(apk_name, codename_to_api_level_map):
+ """Get the minSdkVersion declared in the APK as a number (API Level). If
+ minSdkVersion is set to a codename, it is translated to a number using the
+ provided map.
+ """
+
+ version = GetMinSdkVersion(apk_name)
+ try:
+ return int(version)
+ except ValueError:
+ # Not a decimal number. Codename?
+ if version in codename_to_api_level_map:
+ return codename_to_api_level_map[version]
+ else:
+ raise ExternalError("Unknown minSdkVersion: '%s'. Known codenames: %s"
+ % (version, codename_to_api_level_map))
+
+
+def SignFile(input_name, output_name, key, password, min_api_level=None,
+ codename_to_api_level_map=dict(),
+ whole_file=False):
"""Sign the input_name zip/jar/apk, producing output_name. Use the
given key and password (the latter may be None if the key does not
have a password.
- If align is an integer > 1, zipalign is run to align stored files in
- the output zip on 'align'-byte boundaries.
-
If whole_file is true, use the "-w" option to SignApk to embed a
signature that covers the whole file in the archive comment of the
zip file.
- """
- if align == 0 or align == 1:
- align = None
+ min_api_level is the API Level (int) of the oldest platform this file may end
+ up on. If not specified for an APK, the API Level is obtained by interpreting
+ the minSdkVersion attribute of the APK's AndroidManifest.xml.
- if align:
- temp = tempfile.NamedTemporaryFile()
- sign_name = temp.name
- else:
- sign_name = output_name
+ codename_to_api_level_map is needed to translate the codename which may be
+ encountered as the APK's minSdkVersion.
+ """
- cmd = [OPTIONS.java_path, OPTIONS.java_args, "-jar",
+ java_library_path = os.path.join(
+ OPTIONS.search_path, OPTIONS.signapk_shared_library_path)
+
+ cmd = [OPTIONS.java_path, OPTIONS.java_args,
+ "-Djava.library.path=" + java_library_path,
+ "-jar",
os.path.join(OPTIONS.search_path, OPTIONS.signapk_path)]
cmd.extend(OPTIONS.extra_signapk_args)
if whole_file:
cmd.append("-w")
+
+ min_sdk_version = min_api_level
+ if min_sdk_version is None:
+ if not whole_file:
+ min_sdk_version = GetMinSdkVersionInt(
+ input_name, codename_to_api_level_map)
+ if min_sdk_version is not None:
+ cmd.extend(["--min-sdk-version", str(min_sdk_version)])
+
cmd.extend([key + OPTIONS.public_key_suffix,
key + OPTIONS.private_key_suffix,
- input_name, sign_name])
+ input_name, output_name])
p = Run(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
if password is not None:
if p.returncode != 0:
raise ExternalError("signapk.jar failed: return code %s" % (p.returncode,))
- if align:
- p = Run(["zipalign", "-f", "-p", str(align), sign_name, output_name])
- p.communicate()
- if p.returncode != 0:
- raise ExternalError("zipalign failed: return code %s" % (p.returncode,))
- temp.close()
-
def CheckSize(data, target, info_dict):
"""Check the data string passed against the max size limit, if
fs_type = None
limit = None
if info_dict["fstab"]:
+ if mount_point == "/userdata_extra":
+ mount_point = "/data"
if mount_point == "/userdata":
mount_point = "/data"
p = info_dict["fstab"][mount_point]
if pct >= 99.0:
raise ExternalError(msg)
elif pct >= 95.0:
- print
- print " WARNING: ", msg
- print
+ print()
+ print(" WARNING: ", msg)
+ print()
elif OPTIONS.verbose:
- print " ", msg
+ print(" ", msg)
def ReadApkCerts(tf_zip):
"""
def Usage(docstring):
- print docstring.rstrip("\n")
- print COMMON_DOCSTRING
+ print(docstring.rstrip("\n"))
+ print(COMMON_DOCSTRING)
def ParseOptions(argv,
try:
opts, args = getopt.getopt(
argv, "hvp:s:x:" + extra_opts,
- ["help", "verbose", "path=", "signapk_path=", "extra_signapk_args=",
+ ["help", "verbose", "path=", "signapk_path=",
+ "signapk_shared_library_path=", "extra_signapk_args=",
"java_path=", "java_args=", "public_key_suffix=",
"private_key_suffix=", "boot_signer_path=", "boot_signer_args=",
"verity_signer_path=", "verity_signer_args=", "device_specific=",
list(extra_long_opts))
except getopt.GetoptError as err:
Usage(docstring)
- print "**", str(err), "**"
+ print("**", str(err), "**")
sys.exit(2)
for o, a in opts:
OPTIONS.search_path = a
elif o in ("--signapk_path",):
OPTIONS.signapk_path = a
+ elif o in ("--signapk_shared_library_path",):
+ OPTIONS.signapk_shared_library_path = a
elif o in ("--extra_signapk_args",):
OPTIONS.extra_signapk_args = shlex.split(a)
elif o in ("--java_path",):
current[i] = ""
if not first:
- print "key file %s still missing some passwords." % (self.pwfile,)
+ print("key file %s still missing some passwords." % self.pwfile)
answer = raw_input("try to edit again? [y]> ").strip()
if answer and answer[0] not in 'yY':
raise RuntimeError("key passwords unavailable")
values.
"""
result = {}
- for k, v in sorted(current.iteritems()):
+ for k, v in sorted(iteritems(current)):
if v:
result[k] = v
else:
f.write("# (Additional spaces are harmless.)\n\n")
first_line = None
- sorted_list = sorted([(not v, k, v) for (k, v) in current.iteritems()])
+ sorted_list = sorted((not v, k, v) for (k, v) in current.items())
for i, (_, k, v) in enumerate(sorted_list):
f.write("[[[ %s ]]] %s\n" % (v, k))
if not v and first_line is None:
continue
m = re.match(r"^\[\[\[\s*(.*?)\s*\]\]\]\s*(\S+)$", line)
if not m:
- print "failed to parse password file: ", line
+ print("failed to parse password file: ", line)
else:
result[m.group(2)] = m.group(1)
f.close()
except IOError as e:
if e.errno != errno.ENOENT:
- print "error reading password file: ", str(e)
+ print("error reading password file: ", str(e))
return result
zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname)
zinfo.compress_type = zip_file.compression
if perms is None:
- perms = 0o644
+ perms = 0o100644
else:
zinfo = zinfo_or_arcname
# If perms is given, it has a priority.
if perms is not None:
+ # If perms doesn't set the file type, mark it as a regular file.
+ if perms & 0o770000 == 0:
+ perms |= 0o100000
zinfo.external_attr = perms << 16
# Use a fixed timestamp so the output is repeatable.
"""Keyword arguments to the constructor become attributes of this
object, which is passed to all functions in the device-specific
module."""
- for k, v in kwargs.iteritems():
+ for k, v in iteritems(kwargs):
setattr(self, k, v)
self.extras = OPTIONS.extras
if x == ".py":
f = b
info = imp.find_module(f, [d])
- print "loaded device-specific extensions from", path
+ print("loaded device-specific extensions from", path)
self.module = imp.load_module("device_specific", *info)
except ImportError:
- print "unable to load device-specific module; assuming none"
+ print("unable to load device-specific module; assuming none")
def _DoCall(self, function_name, *args, **kwargs):
"""Call the named function in the device-specific module, passing
used to install the image for the device's baseband processor."""
return self._DoCall("FullOTA_InstallEnd")
+ def FullOTA_PostValidate(self):
+ """Called after installing and validating /system; typically this is
+ used to resize the system partition after a block based installation."""
+ return self._DoCall("FullOTA_PostValidate")
+
def IncrementalOTA_Assertions(self):
"""Called after emitting the block of assertions at the top of an
incremental OTA package. Implementations can add whatever
processor."""
return self._DoCall("IncrementalOTA_InstallEnd")
+ def VerifyOTA_Assertions(self):
+ return self._DoCall("VerifyOTA_Assertions")
+
class File(object):
def __init__(self, name, data):
self.name = name
th.start()
th.join(timeout=300) # 5 mins
if th.is_alive():
- print "WARNING: diff command timed out"
+ print("WARNING: diff command timed out")
p.terminate()
th.join(5)
if th.is_alive():
th.join()
if err or p.returncode != 0:
- print "WARNING: failure running %s:\n%s\n" % (
- diff_program, "".join(err))
+ print("WARNING: failure running %s:\n%s\n" % (
+ cmd, "".join(err)))
self.patch = None
return None, None, None
diff = ptemp.read()
def ComputeDifferences(diffs):
"""Call ComputePatch on all the Difference objects in 'diffs'."""
- print len(diffs), "diffs to compute"
+ print(len(diffs), "diffs to compute")
# Do the largest files first, to try and reduce the long-pole effect.
by_size = [(i.tf.size, i) for i in diffs]
else:
name = "%s (%s)" % (tf.name, sf.name)
if patch is None:
- print "patching failed! %s" % (name,)
+ print("patching failed! %s" % name)
else:
- print "%8.2f sec %8d / %8d bytes (%6.2f%%) %s" % (
- dur, len(patch), tf.size, 100.0 * len(patch) / tf.size, name)
+ print("%8.2f sec %8d / %8d bytes (%6.2f%%) %s" % (
+ dur, len(patch), tf.size, 100.0 * len(patch) / tf.size, name))
lock.release()
except Exception as e:
- print e
+ print(e)
raise
# start worker threads; wait for them all to finish.
class BlockDifference(object):
def __init__(self, partition, tgt, src=None, check_first_block=False,
- version=None):
+ version=None, disable_imgdiff=False):
self.tgt = tgt
self.src = src
self.partition = partition
self.check_first_block = check_first_block
-
- # Due to http://b/20939131, check_first_block is disabled temporarily.
- assert not self.check_first_block
+ self.disable_imgdiff = disable_imgdiff
if version is None:
version = 1
self.version = version
b = blockimgdiff.BlockImageDiff(tgt, src, threads=OPTIONS.worker_threads,
- version=self.version)
+ version=self.version,
+ disable_imgdiff=self.disable_imgdiff)
tmpdir = tempfile.mkdtemp()
OPTIONS.tempfiles.append(tmpdir)
self.path = os.path.join(tmpdir, partition)
b.Compute(self.path)
+ self._required_cache = b.max_stashed_size
+ self.touched_src_ranges = b.touched_src_ranges
+ self.touched_src_sha1 = b.touched_src_sha1
if src is None:
_, self.device = GetTypeAndDevice("/" + partition, OPTIONS.info_dict)
_, self.device = GetTypeAndDevice("/" + partition,
OPTIONS.source_info_dict)
+ @property
+ def required_cache(self):
+ return self._required_cache
+
def WriteScript(self, script, output_zip, progress=None):
if not self.src:
# write the output unconditionally
if progress:
script.ShowProgress(progress, 0)
self._WriteUpdate(script, output_zip)
- self._WritePostInstallVerifyScript(script)
+ if OPTIONS.verify:
+ self._WritePostInstallVerifyScript(script)
+
+ def WriteStrictVerifyScript(self, script):
+ """Verify all the blocks in the care_map, including clobbered blocks.
+
+ This differs from the WriteVerifyScript() function: a) it prints different
+ error messages; b) it doesn't allow half-way updated images to pass the
+ verification."""
- def WriteVerifyScript(self, script):
partition = self.partition
+ script.Print("Verifying %s..." % (partition,))
+ ranges = self.tgt.care_map
+ ranges_str = ranges.to_string_raw()
+ script.AppendExtra('range_sha1("%s", "%s") == "%s" && '
+ 'ui_print(" Verified.") || '
+ 'ui_print("\\"%s\\" has unexpected contents.");' % (
+ self.device, ranges_str,
+ self.tgt.TotalSha1(include_clobbered_blocks=True),
+ self.device))
+ script.AppendExtra("")
+
+ def WriteVerifyScript(self, script, touched_blocks_only=False):
+ partition = self.partition
+
+ # full OTA
if not self.src:
script.Print("Image %s will be patched unconditionally." % (partition,))
+
+ # incremental OTA
else:
- ranges = self.src.care_map.subtract(self.src.clobbered_blocks)
+ if touched_blocks_only and self.version >= 3:
+ ranges = self.touched_src_ranges
+ expected_sha1 = self.touched_src_sha1
+ else:
+ ranges = self.src.care_map.subtract(self.src.clobbered_blocks)
+ expected_sha1 = self.src.TotalSha1()
+
+ # No blocks to be checked, skipping.
+ if not ranges:
+ return
+
ranges_str = ranges.to_string_raw()
- if self.version >= 3:
+ if self.version >= 4:
+ script.AppendExtra(('if (range_sha1("%s", "%s") == "%s" || '
+ 'block_image_verify("%s", '
+ 'package_extract_file("%s.transfer.list"), '
+ '"%s.new.dat", "%s.patch.dat")) then') % (
+ self.device, ranges_str, expected_sha1,
+ self.device, partition, partition, partition))
+ elif self.version == 3:
script.AppendExtra(('if (range_sha1("%s", "%s") == "%s" || '
'block_image_verify("%s", '
'package_extract_file("%s.transfer.list"), '
'"%s.new.dat", "%s.patch.dat")) then') % (
- self.device, ranges_str, self.src.TotalSha1(),
+ self.device, ranges_str, expected_sha1,
self.device, partition, partition, partition))
else:
script.AppendExtra('if range_sha1("%s", "%s") == "%s" then' % (
script.Print('Verified %s image...' % (partition,))
script.AppendExtra('else')
- # When generating incrementals for the system and vendor partitions,
- # explicitly check the first block (which contains the superblock) of
- # the partition to see if it's what we expect. If this check fails,
- # give an explicit log message about the partition having been
- # remounted R/W (the most likely explanation) and the need to flash to
- # get OTAs working again.
- if self.check_first_block:
- self._CheckFirstBlock(script)
+ if self.version >= 4:
+
+ # Bug: 21124327
+ # When generating incrementals for the system and vendor partitions in
+ # version 4 or newer, explicitly check the first block (which contains
+ # the superblock) of the partition to see if it's what we expect. If
+ # this check fails, give an explicit log message about the partition
+ # having been remounted R/W (the most likely explanation).
+ if self.check_first_block:
+ script.AppendExtra('check_first_block("%s");' % (self.device,))
+
+ # If version >= 4, try block recovery before abort update
+ if partition == "system":
+ code = ErrorCode.SYSTEM_RECOVER_FAILURE
+ else:
+ code = ErrorCode.VENDOR_RECOVER_FAILURE
+ script.AppendExtra((
+ 'ifelse (block_image_recover("{device}", "{ranges}") && '
+ 'block_image_verify("{device}", '
+ 'package_extract_file("{partition}.transfer.list"), '
+ '"{partition}.new.dat", "{partition}.patch.dat"), '
+ 'ui_print("{partition} recovered successfully."), '
+ 'abort("E{code}: {partition} partition fails to recover"));\n'
+ 'endif;').format(device=self.device, ranges=ranges_str,
+ partition=partition, code=code))
# Abort the OTA update. Note that the incremental OTA cannot be applied
# even if it may match the checksum of the target partition.
# a) If version < 3, operations like move and erase will make changes
# unconditionally and damage the partition.
# b) If version >= 3, it won't even reach here.
- script.AppendExtra(('abort("%s partition has unexpected contents");\n'
- 'endif;') % (partition,))
+ else:
+ if partition == "system":
+ code = ErrorCode.SYSTEM_VERIFICATION_FAILURE
+ else:
+ code = ErrorCode.VENDOR_VERIFICATION_FAILURE
+ script.AppendExtra((
+ 'abort("E%d: %s partition has unexpected contents");\n'
+ 'endif;') % (code, partition))
def _WritePostInstallVerifyScript(self, script):
partition = self.partition
self.device, ranges_str,
self._HashZeroBlocks(self.tgt.extended.size())))
script.Print('Verified the updated %s image.' % (partition,))
+ if partition == "system":
+ code = ErrorCode.SYSTEM_NONZERO_CONTENTS
+ else:
+ code = ErrorCode.VENDOR_NONZERO_CONTENTS
script.AppendExtra(
'else\n'
- ' abort("%s partition has unexpected non-zero contents after OTA '
- 'update");\n'
- 'endif;' % (partition,))
+ ' abort("E%d: %s partition has unexpected non-zero contents after '
+ 'OTA update");\n'
+ 'endif;' % (code, partition))
else:
script.Print('Verified the updated %s image.' % (partition,))
+ if partition == "system":
+ code = ErrorCode.SYSTEM_UNEXPECTED_CONTENTS
+ else:
+ code = ErrorCode.VENDOR_UNEXPECTED_CONTENTS
+
script.AppendExtra(
'else\n'
- ' abort("%s partition has unexpected contents after OTA update");\n'
- 'endif;' % (partition,))
+ ' abort("E%d: %s partition has unexpected contents after OTA '
+ 'update");\n'
+ 'endif;' % (code, partition))
def _WriteUpdate(self, script, output_zip):
ZipWrite(output_zip,
'{}.patch.dat'.format(self.partition),
compress_type=zipfile.ZIP_STORED)
+ if self.partition == "system":
+ code = ErrorCode.SYSTEM_UPDATE_FAILURE
+ else:
+ code = ErrorCode.VENDOR_UPDATE_FAILURE
+
call = ('block_image_update("{device}", '
'package_extract_file("{partition}.transfer.list"), '
- '"{partition}.new.dat", "{partition}.patch.dat");\n'.format(
- device=self.device, partition=self.partition))
+ '"{partition}.new.dat", "{partition}.patch.dat") ||\n'
+ ' abort("E{code}: Failed to update {partition} image.");'.format(
+ device=self.device, partition=self.partition, code=code))
script.AppendExtra(script.WordWrap(call))
def _HashBlocks(self, source, ranges): # pylint: disable=no-self-use
return ctx.hexdigest()
- # TODO(tbao): Due to http://b/20939131, block 0 may be changed without
- # remounting R/W. Will change the checking to a finer-grained way to
- # mask off those bits.
- def _CheckFirstBlock(self, script):
- r = rangelib.RangeSet((0, 1))
- srchash = self._HashBlocks(self.src, r)
-
- script.AppendExtra(('(range_sha1("%s", "%s") == "%s") || '
- 'abort("%s has been remounted R/W; '
- 'reflash device to reenable OTA updates");')
- % (self.device, r.to_string_raw(), srchash,
- self.device))
DataImage = blockimgdiff.DataImage
-
# map recovery.fstab's fs_types to mount/format "partition types"
PARTITION_TYPES = {
"yaffs2": "MTD",
"ext4": "EMMC",
"emmc": "EMMC",
"f2fs": "EMMC",
- "squashfs": "EMMC"
+ "squashfs": "EMMC",
+ "ext2": "EMMC",
+ "ext3": "EMMC",
+ "vfat": "EMMC",
+ "osip": "OSIP"
}
def GetTypeAndDevice(mount_point, info):
def ParseCertificate(data):
"""Parse a PEM-format certificate."""
+ from codecs import decode
cert = []
save = False
for line in data.split("\n"):
if "--END CERTIFICATE--" in line:
break
if save:
- cert.append(line)
+ l = line.encode() if hasattr(line, 'encode') else line
+ cert.append(l)
if "--BEGIN CERTIFICATE--" in line:
save = True
- cert = "".join(cert).decode('base64')
+ cert = decode(b"".join(cert), 'base64')
return cert
def MakeRecoveryPatch(input_dir, output_sink, recovery_img, boot_img,
if info_dict is None:
info_dict = OPTIONS.info_dict
- diff_program = ["imgdiff"]
- path = os.path.join(input_dir, "SYSTEM", "etc", "recovery-resource.dat")
- if os.path.exists(path):
- diff_program.append("-b")
- diff_program.append(path)
- bonus_args = "-b /system/etc/recovery-resource.dat"
+ full_recovery_image = info_dict.get("full_recovery_image", None) == "true"
+ system_root_image = info_dict.get("system_root_image", None) == "true"
+
+ if full_recovery_image:
+ output_sink("etc/recovery.img", recovery_img.data)
+
else:
- bonus_args = ""
+ diff_program = ["imgdiff"]
+ path = os.path.join(input_dir, "SYSTEM", "etc", "recovery-resource.dat")
+ if os.path.exists(path):
+ diff_program.append("-b")
+ diff_program.append(path)
+ bonus_args = "-b /system/etc/recovery-resource.dat"
+ else:
+ bonus_args = ""
- d = Difference(recovery_img, boot_img, diff_program=diff_program)
- _, _, patch = d.ComputePatch()
- output_sink("recovery-from-boot.p", patch)
+ d = Difference(recovery_img, boot_img, diff_program=diff_program)
+ _, _, patch = d.ComputePatch()
+ output_sink("recovery-from-boot.p", patch)
try:
# The following GetTypeAndDevice()s need to use the path in the target
except KeyError:
return
- sh = """#!/system/bin/sh
+ if full_recovery_image:
+ sh = """#!/system/bin/sh
+if ! applypatch -c %(type)s:%(device)s:%(size)d:%(sha1)s; then
+ applypatch /system/etc/recovery.img %(type)s:%(device)s %(sha1)s %(size)d && log -t recovery "Installing new recovery image: succeeded" || log -t recovery "Installing new recovery image: failed"
+else
+ log -t recovery "Recovery image already installed"
+fi
+""" % {'type': recovery_type,
+ 'device': recovery_device,
+ 'sha1': recovery_img.sha1,
+ 'size': recovery_img.size}
+ else:
+ sh = """#!/system/bin/sh
+if [ -f /system/etc/recovery-transform.sh ]; then
+ exec sh /system/etc/recovery-transform.sh %(recovery_size)d %(recovery_sha1)s %(boot_size)d %(boot_sha1)s
+fi
if ! applypatch -c %(recovery_type)s:%(recovery_device)s:%(recovery_size)d:%(recovery_sha1)s; then
applypatch %(bonus_args)s %(boot_type)s:%(boot_device)s:%(boot_size)d:%(boot_sha1)s %(recovery_type)s:%(recovery_device)s %(recovery_sha1)s %(recovery_size)d %(boot_sha1)s:/system/recovery-from-boot.p && log -t recovery "Installing new recovery image: succeeded" || log -t recovery "Installing new recovery image: failed"
else
'bonus_args': bonus_args}
# The install script location moved from /system/etc to /system/bin
- # in the L release. Parse the init.rc file to find out where the
+ # in the L release. Parse init.*.rc files to find out where the
# target-files expects it to be, and put it there.
sh_location = "etc/install-recovery.sh"
- try:
- with open(os.path.join(input_dir, "BOOT", "RAMDISK", "init.rc")) as f:
+ found = False
+ if system_root_image:
+ init_rc_dir = os.path.join(input_dir, "ROOT")
+ else:
+ init_rc_dir = os.path.join(input_dir, "BOOT", "RAMDISK")
+ init_rc_files = os.listdir(init_rc_dir)
+ for init_rc_file in init_rc_files:
+ if (not init_rc_file.startswith('init.') or
+ not init_rc_file.endswith('.rc')):
+ continue
+
+ with open(os.path.join(init_rc_dir, init_rc_file)) as f:
for line in f:
m = re.match(r"^service flash_recovery /system/(\S+)\s*$", line)
if m:
sh_location = m.group(1)
- print "putting script in", sh_location
+ found = True
break
- except (OSError, IOError) as e:
- print "failed to read init.rc: %s" % (e,)
+
+ if found:
+ break
+
+ print("putting script in", sh_location)
output_sink(sh_location, sh)