+
+
+class PasswordManager(object):
+ def __init__(self):
+ self.editor = os.getenv("EDITOR", None)
+ self.pwfile = os.getenv("ANDROID_PW_FILE", None)
+
+ def GetPasswords(self, items):
+ """Get passwords corresponding to each string in 'items',
+ returning a dict. (The dict may have keys in addition to the
+ values in 'items'.)
+
+ Uses the passwords in $ANDROID_PW_FILE if available, letting the
+ user edit that file to add more needed passwords. If no editor is
+ available, or $ANDROID_PW_FILE isn't define, prompts the user
+ interactively in the ordinary way.
+ """
+
+ current = self.ReadFile()
+
+ first = True
+ while True:
+ missing = []
+ for i in items:
+ if i not in current or not current[i]:
+ missing.append(i)
+ # Are all the passwords already in the file?
+ if not missing: return current
+
+ for i in missing:
+ current[i] = ""
+
+ if not first:
+ print "key file %s still missing some passwords." % (self.pwfile,)
+ answer = raw_input("try to edit again? [y]> ").strip()
+ if answer and answer[0] not in 'yY':
+ raise RuntimeError("key passwords unavailable")
+ first = False
+
+ current = self.UpdateAndReadFile(current)
+
+ def PromptResult(self, current):
+ """Prompt the user to enter a value (password) for each key in
+ 'current' whose value is fales. Returns a new dict with all the
+ values.
+ """
+ result = {}
+ for k, v in sorted(current.iteritems()):
+ if v:
+ result[k] = v
+ else:
+ while True:
+ result[k] = getpass.getpass("Enter password for %s key> "
+ % (k,)).strip()
+ if result[k]: break
+ return result
+
+ def UpdateAndReadFile(self, current):
+ if not self.editor or not self.pwfile:
+ return self.PromptResult(current)
+
+ f = open(self.pwfile, "w")
+ os.chmod(self.pwfile, 0600)
+ f.write("# Enter key passwords between the [[[ ]]] brackets.\n")
+ f.write("# (Additional spaces are harmless.)\n\n")
+
+ first_line = None
+ sorted = [(not v, k, v) for (k, v) in current.iteritems()]
+ sorted.sort()
+ for i, (_, k, v) in enumerate(sorted):
+ f.write("[[[ %s ]]] %s\n" % (v, k))
+ if not v and first_line is None:
+ # position cursor on first line with no password.
+ first_line = i + 4
+ f.close()
+
+ p = Run([self.editor, "+%d" % (first_line,), self.pwfile])
+ _, _ = p.communicate()
+
+ return self.ReadFile()
+
+ def ReadFile(self):
+ result = {}
+ if self.pwfile is None: return result
+ try:
+ f = open(self.pwfile, "r")
+ for line in f:
+ line = line.strip()
+ if not line or line[0] == '#': continue
+ m = re.match(r"^\[\[\[\s*(.*?)\s*\]\]\]\s*(\S+)$", line)
+ if not m:
+ print "failed to parse password file: ", line
+ else:
+ result[m.group(2)] = m.group(1)
+ f.close()
+ except IOError, e:
+ if e.errno != errno.ENOENT:
+ print "error reading password file: ", str(e)
+ return result
+
+
+def ZipWriteStr(zip, filename, data, perms=0644):
+ # use a fixed timestamp so the output is repeatable.
+ zinfo = zipfile.ZipInfo(filename=filename,
+ date_time=(2009, 1, 1, 0, 0, 0))
+ zinfo.compress_type = zip.compression
+ zinfo.external_attr = perms << 16
+ zip.writestr(zinfo, data)
+
+
+class DeviceSpecificParams(object):
+ module = None
+ def __init__(self, **kwargs):
+ """Keyword arguments to the constructor become attributes of this
+ object, which is passed to all functions in the device-specific
+ module."""
+ for k, v in kwargs.iteritems():
+ setattr(self, k, v)
+
+ if self.module is None:
+ path = OPTIONS.device_specific
+ if not path: return
+ try:
+ if os.path.isdir(path):
+ info = imp.find_module("releasetools", [path])
+ else:
+ d, f = os.path.split(path)
+ b, x = os.path.splitext(f)
+ if x == ".py":
+ f = b
+ info = imp.find_module(f, [d])
+ self.module = imp.load_module("device_specific", *info)
+ except ImportError:
+ print "unable to load device-specific module; assuming none"
+
+ def _DoCall(self, function_name, *args, **kwargs):
+ """Call the named function in the device-specific module, passing
+ the given args and kwargs. The first argument to the call will be
+ the DeviceSpecific object itself. If there is no module, or the
+ module does not define the function, return the value of the
+ 'default' kwarg (which itself defaults to None)."""
+ if self.module is None or not hasattr(self.module, function_name):
+ return kwargs.get("default", None)
+ return getattr(self.module, function_name)(*((self,) + args), **kwargs)
+
+ def FullOTA_Assertions(self):
+ """Called after emitting the block of assertions at the top of a
+ full OTA package. Implementations can add whatever additional
+ assertions they like."""
+ return self._DoCall("FullOTA_Assertions")
+
+ def FullOTA_InstallEnd(self):
+ """Called at the end of full OTA installation; typically this is
+ used to install the image for the device's baseband processor."""
+ return self._DoCall("FullOTA_InstallEnd")
+
+ def IncrementalOTA_Assertions(self):
+ """Called after emitting the block of assertions at the top of an
+ incremental OTA package. Implementations can add whatever
+ additional assertions they like."""
+ return self._DoCall("IncrementalOTA_Assertions")
+
+ def IncrementalOTA_VerifyEnd(self):
+ """Called at the end of the verification phase of incremental OTA
+ installation; additional checks can be placed here to abort the
+ script before any changes are made."""
+ return self._DoCall("IncrementalOTA_VerifyEnd")
+
+ def IncrementalOTA_InstallEnd(self):
+ """Called at the end of incremental OTA installation; typically
+ this is used to install the image for the device's baseband
+ processor."""
+ return self._DoCall("IncrementalOTA_InstallEnd")