# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import print_function
+
import copy
import errno
import getopt
from hashlib import sha1 as sha1
+try:
+ raw_input
+except NameError:
+ raw_input = input
+
+
+def iteritems(obj):
+ if hasattr(obj, 'iteritems'):
+ return obj.iteritems()
+ return obj.items()
+
class Options(object):
def __init__(self):
"""Create and return a subprocess.Popen object, printing the command
line on the terminal if -v was specified."""
if OPTIONS.verbose:
- print " running: ", " ".join(args)
+ print(" running: ", " ".join(args))
return subprocess.Popen(args, **kwargs)
if os.path.exists(system_base_fs_file):
d["system_base_fs_file"] = system_base_fs_file
else:
- print "Warning: failed to find system base fs file: %s" % (
- system_base_fs_file,)
+ print("Warning: failed to find system base fs file: %s" % (
+ system_base_fs_file,))
del d["system_base_fs_file"]
if "vendor_base_fs_file" in d:
if os.path.exists(vendor_base_fs_file):
d["vendor_base_fs_file"] = vendor_base_fs_file
else:
- print "Warning: failed to find vendor base fs file: %s" % (
- vendor_base_fs_file,)
+ print("Warning: failed to find vendor base fs file: %s" % (
+ vendor_base_fs_file,))
del d["vendor_base_fs_file"]
+
+ if "device_type" not in d:
+ d["device_type"] = "MMC"
try:
data = read_helper("META/imagesizes.txt")
for line in data.split("\n"):
d["fstab"] = None
else:
d["fstab"] = LoadRecoveryFSTab(read_helper, d["fstab_version"],
- d.get("system_root_image", False))
+ d["device_type"], d.get("system_root_image", False))
d["build.prop"] = LoadBuildProp(read_helper)
return d
try:
data = read_helper("SYSTEM/build.prop")
except KeyError:
- print "Warning: could not find SYSTEM/build.prop in %s" % zip
+ print("Warning: could not find SYSTEM/build.prop in %s" % zip)
data = ""
return LoadDictionaryFromLines(data.split("\n"))
d[name] = value
return d
-def LoadRecoveryFSTab(read_helper, fstab_version, system_root_image=False):
+def LoadRecoveryFSTab(read_helper, fstab_version, type, system_root_image=False):
class Partition(object):
def __init__(self, mount_point, fs_type, device, length, device2, context):
self.mount_point = mount_point
try:
data = read_helper("RECOVERY/RAMDISK/etc/recovery.fstab")
except KeyError:
- print "Warning: could not find RECOVERY/RAMDISK/etc/recovery.fstab"
+ print("Warning: could not find RECOVERY/RAMDISK/etc/recovery.fstab")
data = ""
if fstab_version == 1:
if i.startswith("length="):
length = int(i[7:])
else:
- print "%s: unknown option \"%s\"" % (mount_point, i)
+ print("%s: unknown option \"%s\"" % (mount_point, i))
- d[mount_point] = Partition(mount_point=mount_point, fs_type=pieces[1],
- device=pieces[2], length=length,
- device2=device2)
+ if not d.get(mount_point):
+ d[mount_point] = Partition(mount_point=mount_point, fs_type=pieces[1],
+ device=pieces[2], length=length,
+ device2=device2)
elif fstab_version == 2:
d = {}
context = i
mount_point = pieces[1]
- d[mount_point] = Partition(mount_point=mount_point, fs_type=pieces[2],
- device=pieces[0], length=length,
- device2=None, context=context)
+ if not d.get(mount_point):
+ d[mount_point] = Partition(mount_point=mount_point, fs_type=pieces[2],
+ device=pieces[0], length=length,
+ device2=None, context=context)
else:
raise ValueError("Unknown fstab_version: \"%d\"" % (fstab_version,))
def DumpInfoDict(d):
for k, v in sorted(d.items()):
- print "%-25s = (%s) %s" % (k, type(v).__name__, v)
+ print("%-25s = (%s) %s" % (k, type(v).__name__, v))
def _BuildBootableImage(sourcedir, fs_config_file, info_dict=None,
info_dict = OPTIONS.info_dict
img = tempfile.NamedTemporaryFile()
+ bootimg_key = os.getenv("PRODUCT_PRIVATE_KEY", None)
+ verity_key = os.getenv("PRODUCT_VERITY_KEY", None)
+ custom_boot_signer = os.getenv("PRODUCT_BOOT_SIGNER", None)
if has_ramdisk:
ramdisk_img = make_ramdisk()
- # use MKBOOTIMG from environ, or "mkbootimg" if empty or not set
- mkbootimg = os.getenv('MKBOOTIMG') or "mkbootimg"
-
- cmd = [mkbootimg, "--kernel", os.path.join(sourcedir, "kernel")]
-
- fn = os.path.join(sourcedir, "second")
- if os.access(fn, os.F_OK):
- cmd.append("--second")
- cmd.append(fn)
-
- fn = os.path.join(sourcedir, "cmdline")
+ """check if uboot is requested"""
+ fn = os.path.join(sourcedir, "ubootargs")
if os.access(fn, os.F_OK):
- cmd.append("--cmdline")
- cmd.append(open(fn).read().rstrip("\n"))
-
- fn = os.path.join(sourcedir, "base")
- if os.access(fn, os.F_OK):
- cmd.append("--base")
- cmd.append(open(fn).read().rstrip("\n"))
-
- fn = os.path.join(sourcedir, "pagesize")
- if os.access(fn, os.F_OK):
- cmd.append("--pagesize")
- cmd.append(open(fn).read().rstrip("\n"))
-
- args = info_dict.get("mkbootimg_args", None)
- if args and args.strip():
- cmd.extend(shlex.split(args))
-
- args = info_dict.get("mkbootimg_version_args", None)
- if args and args.strip():
- cmd.extend(shlex.split(args))
-
- if has_ramdisk:
- cmd.extend(["--ramdisk", ramdisk_img.name])
-
- img_unsigned = None
- if info_dict.get("vboot", None):
- img_unsigned = tempfile.NamedTemporaryFile()
- cmd.extend(["--output", img_unsigned.name])
+ cmd = ["mkimage"]
+ for argument in open(fn).read().rstrip("\n").split(" "):
+ cmd.append(argument)
+ cmd.append("-d")
+ cmd.append(os.path.join(sourcedir, "kernel") + ":" + ramdisk_img.name)
+ cmd.append(img.name)
else:
- cmd.extend(["--output", img.name])
+ # use MKBOOTIMG from environ, or "mkbootimg" if empty or not set
+ mkbootimg = os.getenv('MKBOOTIMG') or "mkbootimg"
+
+ cmd = [mkbootimg, "--kernel", os.path.join(sourcedir, "kernel")]
+
+ fn = os.path.join(sourcedir, "second")
+ if os.access(fn, os.F_OK):
+ cmd.append("--second")
+ cmd.append(fn)
+
+ fn = os.path.join(sourcedir, "cmdline")
+ if os.access(fn, os.F_OK):
+ cmd.append("--cmdline")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "base")
+ if os.access(fn, os.F_OK):
+ cmd.append("--base")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "tagsaddr")
+ if os.access(fn, os.F_OK):
+ cmd.append("--tags-addr")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "tags_offset")
+ if os.access(fn, os.F_OK):
+ cmd.append("--tags_offset")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "ramdisk_offset")
+ if os.access(fn, os.F_OK):
+ cmd.append("--ramdisk_offset")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "dt")
+ if os.access(fn, os.F_OK):
+ cmd.append("--dt")
+ cmd.append(fn)
+
+ fn = os.path.join(sourcedir, "pagesize")
+ if os.access(fn, os.F_OK):
+ kernel_pagesize = open(fn).read().rstrip("\n")
+ cmd.append("--pagesize")
+ cmd.append(kernel_pagesize)
+
+ args = info_dict.get("mkbootimg_args", None)
+ if args and args.strip():
+ cmd.extend(shlex.split(args))
+
+ args = info_dict.get("mkbootimg_version_args", None)
+ if args and args.strip():
+ cmd.extend(shlex.split(args))
+
+ if has_ramdisk:
+ cmd.extend(["--ramdisk", ramdisk_img.name])
+
+ img_unsigned = None
+ if info_dict.get("vboot", None):
+ img_unsigned = tempfile.NamedTemporaryFile()
+ cmd.extend(["--output", img_unsigned.name])
+ else:
+ cmd.extend(["--output", img.name])
p = Run(cmd, stdout=subprocess.PIPE)
p.communicate()
assert p.returncode == 0, "mkbootimg of %s image failed" % (
os.path.basename(sourcedir),)
+ if custom_boot_signer and bootimg_key and os.path.exists(bootimg_key):
+ print("Signing bootable image with custom boot signer...")
+ img_secure = tempfile.NamedTemporaryFile()
+ p = Run([custom_boot_signer, img.name, img_secure.name], stdout=subprocess.PIPE)
+ p.communicate()
+ assert p.returncode == 0, "signing of bootable image failed"
+ shutil.copyfile(img_secure.name, img.name)
+ img_secure.close()
+ elif bootimg_key and os.path.exists(bootimg_key) and kernel_pagesize > 0:
+ print("Signing bootable image...")
+ bootimg_key_passwords = {}
+ bootimg_key_passwords.update(PasswordManager().GetPasswords(bootimg_key.split()))
+ bootimg_key_password = bootimg_key_passwords[bootimg_key]
+ if bootimg_key_password is not None:
+ bootimg_key_password += "\n"
+ img_sha256 = tempfile.NamedTemporaryFile()
+ img_sig = tempfile.NamedTemporaryFile()
+ img_sig_padded = tempfile.NamedTemporaryFile()
+ img_secure = tempfile.NamedTemporaryFile()
+ p = Run(["openssl", "dgst", "-sha256", "-binary", "-out", img_sha256.name, img.name],
+ stdout=subprocess.PIPE)
+ p.communicate()
+ assert p.returncode == 0, "signing of bootable image failed"
+ p = Run(["openssl", "rsautl", "-sign", "-in", img_sha256.name, "-inkey", bootimg_key, "-out",
+ img_sig.name, "-passin", "stdin"], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ p.communicate(bootimg_key_password)
+ assert p.returncode == 0, "signing of bootable image failed"
+ p = Run(["dd", "if=/dev/zero", "of=%s" % img_sig_padded.name, "bs=%s" % kernel_pagesize,
+ "count=1"], stdout=subprocess.PIPE)
+ p.communicate()
+ assert p.returncode == 0, "signing of bootable image failed"
+ p = Run(["dd", "if=%s" % img_sig.name, "of=%s" % img_sig_padded.name, "conv=notrunc"],
+ stdout=subprocess.PIPE)
+ p.communicate()
+ assert p.returncode == 0, "signing of bootable image failed"
+ p = Run(["cat", img.name, img_sig_padded.name], stdout=img_secure.file.fileno())
+ p.communicate()
+ assert p.returncode == 0, "signing of bootable image failed"
+ shutil.copyfile(img_secure.name, img.name)
+ img_sha256.close()
+ img_sig.close()
+ img_sig_padded.close()
+ img_secure.close()
+
if (info_dict.get("boot_signer", None) == "true" and
info_dict.get("verity_key", None)):
path = "/" + os.path.basename(sourcedir).lower()
cmd.extend([path, img.name,
info_dict["verity_key"] + ".pk8",
info_dict["verity_key"] + ".x509.pem", img.name])
- p = Run(cmd, stdout=subprocess.PIPE)
- p.communicate()
+ verity_key_password = None
+
+ if verity_key and os.path.exists(verity_key+".pk8") and kernel_pagesize > 0:
+ verity_key_passwords = {}
+ verity_key_passwords.update(PasswordManager().GetPasswords(verity_key.split()))
+ verity_key_password = verity_key_passwords[verity_key]
+
+ if verity_key_password is not None:
+ verity_key_password += "\n"
+ p = Run(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ p.communicate(verity_key_password)
+ else:
+ p = Run(cmd)
+ p.communicate()
+
assert p.returncode == 0, "boot_signer of %s image failed" % path
# Sign the image if vboot is non-empty.
prebuilt_path = os.path.join(unpack_dir, "BOOTABLE_IMAGES", prebuilt_name)
if os.path.exists(prebuilt_path):
- print "using prebuilt %s from BOOTABLE_IMAGES..." % (prebuilt_name,)
+ print("using prebuilt %s from BOOTABLE_IMAGES..." % prebuilt_name)
return File.FromLocalFile(name, prebuilt_path)
prebuilt_path = os.path.join(unpack_dir, "IMAGES", prebuilt_name)
if os.path.exists(prebuilt_path):
- print "using prebuilt %s from IMAGES..." % (prebuilt_name,)
+ print("using prebuilt %s from IMAGES..." % prebuilt_name)
return File.FromLocalFile(name, prebuilt_path)
- print "building image from target_files %s..." % (tree_subdir,)
+ print("building image from target_files %s..." % tree_subdir)
if info_dict is None:
info_dict = OPTIONS.info_dict
OPTIONS.tempfiles.append(tmp)
def unzip_to_dir(filename, dirname):
+ subprocess.call(["rm", "-rf", dirname + filename, "targetfiles-*"])
cmd = ["unzip", "-o", "-q", filename, "-d", dirname]
if pattern is not None:
cmd.append(pattern)
if p.returncode == 0:
# Encrypted key with empty string as password.
key_passwords[k] = ''
- elif stderr.startswith('Error decrypting key'):
+ elif stderr.startswith(b'Error decrypting key'):
# Definitely encrypted key.
# It would have said "Error reading key" if it didn't parse correctly.
need_passwords.append(k)
fs_type = None
limit = None
if info_dict["fstab"]:
+ if mount_point == "/userdata_extra":
+ mount_point = "/data"
if mount_point == "/userdata":
mount_point = "/data"
p = info_dict["fstab"][mount_point]
if pct >= 99.0:
raise ExternalError(msg)
elif pct >= 95.0:
- print
- print " WARNING: ", msg
- print
+ print()
+ print(" WARNING: ", msg)
+ print()
elif OPTIONS.verbose:
- print " ", msg
+ print(" ", msg)
def ReadApkCerts(tf_zip):
"""
def Usage(docstring):
- print docstring.rstrip("\n")
- print COMMON_DOCSTRING
+ print(docstring.rstrip("\n"))
+ print(COMMON_DOCSTRING)
def ParseOptions(argv,
list(extra_long_opts))
except getopt.GetoptError as err:
Usage(docstring)
- print "**", str(err), "**"
+ print("**", str(err), "**")
sys.exit(2)
for o, a in opts:
current[i] = ""
if not first:
- print "key file %s still missing some passwords." % (self.pwfile,)
+ print("key file %s still missing some passwords." % self.pwfile)
answer = raw_input("try to edit again? [y]> ").strip()
if answer and answer[0] not in 'yY':
raise RuntimeError("key passwords unavailable")
values.
"""
result = {}
- for k, v in sorted(current.iteritems()):
+ for k, v in sorted(iteritems(current)):
if v:
result[k] = v
else:
f.write("# (Additional spaces are harmless.)\n\n")
first_line = None
- sorted_list = sorted([(not v, k, v) for (k, v) in current.iteritems()])
+ sorted_list = sorted((not v, k, v) for (k, v) in current.items())
for i, (_, k, v) in enumerate(sorted_list):
f.write("[[[ %s ]]] %s\n" % (v, k))
if not v and first_line is None:
continue
m = re.match(r"^\[\[\[\s*(.*?)\s*\]\]\]\s*(\S+)$", line)
if not m:
- print "failed to parse password file: ", line
+ print("failed to parse password file: ", line)
else:
result[m.group(2)] = m.group(1)
f.close()
except IOError as e:
if e.errno != errno.ENOENT:
- print "error reading password file: ", str(e)
+ print("error reading password file: ", str(e))
return result
"""Keyword arguments to the constructor become attributes of this
object, which is passed to all functions in the device-specific
module."""
- for k, v in kwargs.iteritems():
+ for k, v in iteritems(kwargs):
setattr(self, k, v)
self.extras = OPTIONS.extras
if x == ".py":
f = b
info = imp.find_module(f, [d])
- print "loaded device-specific extensions from", path
+ print("loaded device-specific extensions from", path)
self.module = imp.load_module("device_specific", *info)
except ImportError:
- print "unable to load device-specific module; assuming none"
+ print("unable to load device-specific module; assuming none")
def _DoCall(self, function_name, *args, **kwargs):
"""Call the named function in the device-specific module, passing
used to install the image for the device's baseband processor."""
return self._DoCall("FullOTA_InstallEnd")
+ def FullOTA_PostValidate(self):
+ """Called after installing and validating /system; typically this is
+ used to resize the system partition after a block based installation."""
+ return self._DoCall("FullOTA_PostValidate")
+
def IncrementalOTA_Assertions(self):
"""Called after emitting the block of assertions at the top of an
incremental OTA package. Implementations can add whatever
th.start()
th.join(timeout=300) # 5 mins
if th.is_alive():
- print "WARNING: diff command timed out"
+ print("WARNING: diff command timed out")
p.terminate()
th.join(5)
if th.is_alive():
th.join()
if err or p.returncode != 0:
- print "WARNING: failure running %s:\n%s\n" % (
- diff_program, "".join(err))
+ print("WARNING: failure running %s:\n%s\n" % (
+ cmd, "".join(err)))
self.patch = None
return None, None, None
diff = ptemp.read()
def ComputeDifferences(diffs):
"""Call ComputePatch on all the Difference objects in 'diffs'."""
- print len(diffs), "diffs to compute"
+ print(len(diffs), "diffs to compute")
# Do the largest files first, to try and reduce the long-pole effect.
by_size = [(i.tf.size, i) for i in diffs]
else:
name = "%s (%s)" % (tf.name, sf.name)
if patch is None:
- print "patching failed! %s" % (name,)
+ print("patching failed! %s" % name)
else:
- print "%8.2f sec %8d / %8d bytes (%6.2f%%) %s" % (
- dur, len(patch), tf.size, 100.0 * len(patch) / tf.size, name)
+ print("%8.2f sec %8d / %8d bytes (%6.2f%%) %s" % (
+ dur, len(patch), tf.size, 100.0 * len(patch) / tf.size, name))
lock.release()
except Exception as e:
- print e
+ print(e)
raise
# start worker threads; wait for them all to finish.
"ext4": "EMMC",
"emmc": "EMMC",
"f2fs": "EMMC",
- "squashfs": "EMMC"
+ "squashfs": "EMMC",
+ "ext2": "EMMC",
+ "ext3": "EMMC",
+ "vfat": "EMMC",
+ "osip": "OSIP"
}
def GetTypeAndDevice(mount_point, info):
def ParseCertificate(data):
"""Parse a PEM-format certificate."""
+ from codecs import decode
cert = []
save = False
for line in data.split("\n"):
if "--END CERTIFICATE--" in line:
break
if save:
- cert.append(line)
+ l = line.encode() if hasattr(line, 'encode') else line
+ cert.append(l)
if "--BEGIN CERTIFICATE--" in line:
save = True
- cert = "".join(cert).decode('base64')
+ cert = decode(b"".join(cert), 'base64')
return cert
def MakeRecoveryPatch(input_dir, output_sink, recovery_img, boot_img,
'size': recovery_img.size}
else:
sh = """#!/system/bin/sh
+if [ -f /system/etc/recovery-transform.sh ]; then
+ exec sh /system/etc/recovery-transform.sh %(recovery_size)d %(recovery_sha1)s %(boot_size)d %(boot_sha1)s
+fi
if ! applypatch -c %(recovery_type)s:%(recovery_device)s:%(recovery_size)d:%(recovery_sha1)s; then
applypatch %(bonus_args)s %(boot_type)s:%(boot_device)s:%(boot_size)d:%(boot_sha1)s %(recovery_type)s:%(recovery_device)s %(recovery_sha1)s %(recovery_size)d %(boot_sha1)s:/system/recovery-from-boot.p && log -t recovery "Installing new recovery image: succeeded" || log -t recovery "Installing new recovery image: failed"
else
if found:
break
- print "putting script in", sh_location
+ print("putting script in", sh_location)
output_sink(sh_location, sh)