west/src/west/manifest.py

1841 lines
71 KiB
Python

# Copyright (c) 2018, 2019, 2020 Nordic Semiconductor ASA
# Copyright 2018, 2019 Foundries.io Ltd
#
# SPDX-License-Identifier: Apache-2.0
'''
Parser and abstract data types for west manifests.
'''
import configparser
import enum
import errno
import logging
import os
from pathlib import PurePosixPath, Path
import shlex
import subprocess
import sys
from typing import Any, Callable, Dict, Iterable, List, NoReturn, \
NamedTuple, Optional, Tuple, TYPE_CHECKING, Union
from packaging.version import parse as parse_version
import pykwalify.core
import yaml
from west import util
from west.util import PathType, escapes_directory
import west.configuration as cfg
#
# Public constants
#
#: Index in a Manifest.projects attribute where the `ManifestProject`
#: instance for the workspace is stored.
MANIFEST_PROJECT_INDEX = 0
#: A git revision which points to the most recent `Project` update.
MANIFEST_REV_BRANCH = 'manifest-rev'
#: A fully qualified reference to `MANIFEST_REV_BRANCH`.
QUAL_MANIFEST_REV_BRANCH = 'refs/heads/' + MANIFEST_REV_BRANCH
#: Git ref space used by west for internal purposes.
QUAL_REFS_WEST = 'refs/west/'
#: The latest manifest schema version supported by this west program.
#:
#: This value changes when a new version of west includes new manifest
#: file features not supported by earlier versions of west.
SCHEMA_VERSION = '0.7'
# MAINTAINERS:
#
# If you want to update the schema version, you need to make sure that
# it has the exact same value as west.version.__version__ when the
# next release is cut.
#
# Internal helpers
#
# Type aliases
# The value of a west-commands as passed around during manifest
# resolution. It can become a list due to resolving imports, even
# though it's just a str in each individual file right now.
WestCommandsType = Union[str, List[str]]
# Type for the importer callback passed to the manifest constructor.
# (ImportedContentType is just an alias for what it gives back.)
ImportedContentType = Optional[Union[str, List[str]]]
ImporterType = Callable[['Project', str], ImportedContentType]
# Type for an import map filter function, which takes a Project and
# returns a bool. The various whitelists and blacklists are used to
# create these filter functions. A None value is treated as a function
# which always returns True.
ImapFilterFnType = Optional[Callable[['Project'], bool]]
# The parsed contents of a manifest YAML file as returned by _load(),
# after sanitychecking with validate().
ManifestDataType = Union[str, Dict]
# Logging
_logger = logging.getLogger(__name__)
# Manifest locating, parsing, loading, etc.
class _defaults(NamedTuple):
remote: Optional[str]
revision: str
_DEFAULT_REV = 'master'
_WEST_YML = 'west.yml'
_SCHEMA_PATH = os.path.join(os.path.dirname(__file__), "manifest-schema.yml")
_SCHEMA_VER = parse_version(SCHEMA_VERSION)
_EARLIEST_VER_STR = '0.6.99' # we introduced the version feature after 0.6
_EARLIEST_VER = parse_version(_EARLIEST_VER_STR)
def _is_yml(path: PathType) -> bool:
return Path(path).suffix in ['.yml', '.yaml']
def _load(data: str) -> Any:
try:
return yaml.safe_load(data)
except yaml.scanner.ScannerError as e:
raise MalformedManifest(data) from e
def _west_commands_list(west_commands: Optional[WestCommandsType]) -> \
List[str]:
# Convert the raw data from a manifest file to a list of
# west_commands locations. (If it's already a list, make a
# defensive copy.)
if west_commands is None:
return []
elif isinstance(west_commands, str):
return [west_commands]
else:
return list(west_commands)
def _west_commands_maybe_delist(west_commands: List[str]) -> WestCommandsType:
# Convert a west_commands list to a string if there's
# just one element, otherwise return the list itself.
if len(west_commands) == 1:
return west_commands[0]
else:
return west_commands
def _west_commands_merge(wc1: List[str], wc2: List[str]) -> List[str]:
# Merge two west_commands lists, filtering out duplicates.
if wc1 and wc2:
return wc1 + [wc for wc in wc2 if wc not in wc1]
else:
return wc1 or wc2
def _mpath(cp: Optional[configparser.ConfigParser] = None,
topdir: Optional[PathType] = None) -> Tuple[str, str]:
# Return the value of the manifest.path configuration option
# in *cp*, a ConfigParser. If not given, create a new one and
# load configuration options with the given *topdir* as west
# workspace root.
#
# TODO: write a cfg.get(section, key)
# wrapper, with friends for update and delete, to avoid
# requiring this boilerplate.
if cp is None:
cp = cfg._configparser()
cfg.read_config(configfile=cfg.ConfigFile.LOCAL, config=cp, topdir=topdir)
try:
path = cp.get('manifest', 'path')
filename = cp.get('manifest', 'file', fallback=_WEST_YML)
return (path, filename)
except (configparser.NoOptionError, configparser.NoSectionError) as e:
raise MalformedConfig('no "manifest.path" config option is set') from e
# Manifest import handling
def _default_importer(project: 'Project', file: str) -> NoReturn:
raise ManifestImportFailed(project, file)
def _manifest_content_at(project: 'Project', path: PathType,
rev: str = QUAL_MANIFEST_REV_BRANCH) \
-> ImportedContentType:
# Get a list of manifest data from project at path
#
# The data are loaded from Git at ref QUAL_MANIFEST_REV_BRANCH,
# *NOT* the file system.
#
# If path is a tree at that ref, the contents of the YAML files
# inside path are returned, as strings. If it's a file at that
# ref, it's a string with its contents.
#
# Though this module and the "west update" implementation share
# this code, it's an implementation detail, not API.
path = os.fspath(path)
_logger.debug(f'{project.name}: looking up path {path} type at {rev}')
# Returns 'blob', 'tree', etc. for path at revision, if it exists.
out = project.git(['ls-tree', rev, path], capture_stdout=True,
capture_stderr=True).stdout
if not out:
# It's a bit inaccurate to raise FileNotFoundError for
# something that isn't actually file, but this is internal
# API, and git is a content addressable file system, so close
# enough!
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), path)
ptype = out.decode('utf-8').split()[1]
if ptype == 'blob':
# Importing a file: just return its content.
return project.read_at(path, rev=rev).decode('utf-8')
elif ptype == 'tree':
# Importing a tree: return the content of the YAML files inside it.
ret = []
# Use a PurePosixPath because that's the form git seems to
# store internally, even on Windows.
pathobj = PurePosixPath(path)
for f in filter(_is_yml, project.listdir_at(path, rev=rev)):
ret.append(project.read_at(pathobj / f, rev=rev).decode('utf-8'))
return ret
else:
raise MalformedManifest(f"can't decipher project {project.name} "
f'path {path} revision {rev} '
f'(git type: {ptype})')
class _import_map(NamedTuple):
file: str
name_whitelist: List[str]
path_whitelist: List[str]
name_blacklist: List[str]
path_blacklist: List[str]
path_prefix: str
def _is_imap_list(value: Any) -> bool:
# Return True if the value is a valid import map 'blacklist' or
# 'whitelist'. Empty strings and lists are OK, and list nothing.
return (isinstance(value, str) or
(isinstance(value, list) and
all(isinstance(item, str) for item in value)))
def _imap_filter(imap: _import_map) -> ImapFilterFnType:
# Returns either None (if no filter is necessary) or a
# filter function for the given import map.
if any([imap.name_whitelist, imap.path_whitelist,
imap.name_blacklist, imap.path_blacklist]):
return lambda project: _is_imap_ok(imap, project)
else:
return None
def _ensure_list(item: Union[str, List[str]]) -> List[str]:
# Converts item to a list containing it if item is a string, or
# returns item.
if isinstance(item, str):
return [item]
return item
def _is_imap_ok(imap: _import_map, project: 'Project') -> bool:
# Return True if a project passes an import map's filters,
# and False otherwise.
nwl, pwl, nbl, pbl = [_ensure_list(lst) for lst in
(imap.name_whitelist, imap.path_whitelist,
imap.name_blacklist, imap.path_blacklist)]
name = project.name
path = Path(project.path)
blacklisted = (name in nbl) or any(path.match(p) for p in pbl)
whitelisted = (name in nwl) or any(path.match(p) for p in pwl)
no_whitelists = not (nwl or pwl)
if blacklisted:
return whitelisted
else:
return whitelisted or no_whitelists
class _import_ctx(NamedTuple):
projects: Dict[str, 'Project']
filter_fn: ImapFilterFnType
path_prefix: Path
def _new_ctx(ctx: _import_ctx, imap: _import_map) -> _import_ctx:
# Combine the map data from "some-map" in a manifest's
# "import: some-map" into an existing import context type,
# returning the new context.
return _import_ctx(ctx.projects,
_and_filters(ctx.filter_fn, _imap_filter(imap)),
ctx.path_prefix / imap.path_prefix)
def _filter_ok(filter_fn: ImapFilterFnType,
project: 'Project') -> bool:
# filter_fn(project) if filter_fn is not None; True otherwise.
return (filter_fn is None) or filter_fn(project)
def _and_filters(filter_fn1: ImapFilterFnType,
filter_fn2: ImapFilterFnType) -> ImapFilterFnType:
# Return a filter_fn which is the logical AND of the two
# arguments.
if filter_fn1 and filter_fn2:
# These type annotated versions silence mypy warnings.
fn1: Callable[['Project'], bool] = filter_fn1
fn2: Callable[['Project'], bool] = filter_fn2
return lambda project: (fn1(project) and fn2(project))
else:
return filter_fn1 or filter_fn2
#
# Public functions
#
def manifest_path() -> str:
'''Absolute path of the manifest file in the current workspace.
Exceptions raised:
- `west.util.WestNotFound` if called from outside of a west
workspace
- `MalformedConfig` if the configuration file has no
``manifest.path`` key
- ``FileNotFoundError`` if no manifest file exists as determined by
``manifest.path`` and ``manifest.file``
'''
(mpath, mname) = _mpath()
ret = os.path.join(util.west_topdir(), mpath, mname)
# It's kind of annoying to manually instantiate a FileNotFoundError.
# This seems to be the best way.
if not os.path.isfile(ret):
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), ret)
return ret
def validate(data: Any) -> None:
'''Validate manifest data
Raises an exception if the manifest data is not valid for loading
by this version of west. (Actually attempting to load the data may
still fail if the it contains imports which cannot be resolved.)
:param data: YAML manifest data as a string or object
'''
if isinstance(data, str):
as_str = data
data = _load(data)
if not isinstance(data, dict):
raise MalformedManifest(f'{as_str} is not a YAML dictionary')
elif not isinstance(data, dict):
raise TypeError(f'{data} has type {type(data)}, '
'expected valid manifest data')
if 'manifest' not in data:
raise MalformedManifest('manifest data contains no "manifest" key')
data = data['manifest']
# Make sure this version of west can load this manifest data.
# This has to happen before the schema check -- later schemas
# may incompatibly extend this one.
if 'version' in data:
# As a convenience for the user, convert floats to strings.
# This avoids forcing them to write:
#
# version: "1.0"
#
# by explicitly allowing:
#
# version: 1.0
min_version_str = str(data['version'])
min_version = parse_version(min_version_str)
if min_version > _SCHEMA_VER:
raise ManifestVersionError(min_version_str)
elif min_version < _EARLIEST_VER:
raise MalformedManifest(
f'invalid version {min_version_str}; '
f'lowest schema version is {_EARLIEST_VER_STR}')
try:
pykwalify.core.Core(source_data=data,
schema_files=[_SCHEMA_PATH]).validate()
except pykwalify.errors.SchemaError as se:
raise MalformedManifest(se.msg) from se
#
# Exception types
#
class MalformedManifest(Exception):
'''Manifest parsing failed due to invalid data.
'''
class MalformedConfig(Exception):
'''The west configuration was malformed in a way that made a
manifest operation fail.
'''
class ManifestImportFailed(Exception):
'''An operation required to resolve a manifest failed.
Attributes:
- ``project``: the Project instance with the missing manifest data
- ``filename``: the missing file, as a str
'''
def __init__(self, project: 'Project', filename: PathType):
super().__init__(project, filename)
self.project = project
self.filename = os.fspath(filename)
def __str__(self):
return (f'ManifestImportFailed: project {self.project} '
f'file {self.filename}')
class ManifestVersionError(Exception):
'''The manifest required a version of west more recent than the
current version.
'''
def __init__(self, version: str, file: Optional[PathType] = None):
super().__init__(version, file)
self.version = version
'''The minimum version of west that was required.'''
self.file = os.fspath(file) if file else None
'''The file that required this version of west, if any.'''
class _ManifestImportDepth(ManifestImportFailed):
# A hack to signal to main.py what happened.
pass
#
# The main Manifest class and its public helper types, like Project
# and ImportFlag.
#
class ImportFlag(enum.IntFlag):
'''Bit flags for handling imports when resolving a manifest.
The DEFAULT (0) value allows reading the file system to resolve
"self: import:", and running git to resolve a "projects:" import.
Other flags:
- IGNORE: ignore projects added via "import:" in "self:" and "projects:"
- FORCE_PROJECTS: always invoke importer callback for "projects:" imports
- IGNORE_PROJECTS: ignore projects added via "import:" in "projects:" only;
including any projects added via "import:" in "self:"
Note that any "path-prefix:" values set in an "import:" still take
effect for the project itself even when IGNORE or IGNORE_PROJECTS are
given. For example, in this manifest:
manifest:
projects:
- name: foo
import:
path-prefix: bar
Project 'foo' has path 'bar/foo' regardless of whether IGNORE or
IGNORE_PROJECTS is given. This ensures the Project has the same path
attribute as it normally would if imported projects weren't being
ignored.
'''
DEFAULT = 0
IGNORE = 1
FORCE_PROJECTS = 2
IGNORE_PROJECTS = 4
def _flags_ok(flags: ImportFlag) -> bool:
# Sanity-check the combination of flags.
F_I = ImportFlag.IGNORE
F_FP = ImportFlag.FORCE_PROJECTS
F_IP = ImportFlag.IGNORE_PROJECTS
if (flags & F_I) or (flags & F_IP):
return not flags & F_FP
elif flags & (F_FP | F_IP):
return bool((flags & F_FP) ^ (flags & F_IP))
else:
return True
class Project:
'''Represents a project defined in a west manifest.
Attributes:
- ``name``: project's unique name
- ``url``: project fetch URL
- ``revision``: revision to fetch from ``url`` when the
project is updated
- ``path``: relative path to the project within the workspace
(i.e. from ``topdir`` if that is set)
- ``abspath``: absolute path to the project in the native path name
format (or ``None`` if ``topdir`` is)
- ``posixpath``: like ``abspath``, but with slashes (``/``) as
path separators
- ``clone_depth``: clone depth to fetch when first cloning the
project, or ``None`` (the revision should not be a SHA
if this is used)
- ``west_commands``: list of YAML files where extension commands in
the project are declared
- ``topdir``: the top level directory of the west workspace
the project is part of, or ``None``
- ``remote_name``: the name of the remote which should be set up
when the project is being cloned (default: 'origin')
'''
def __eq__(self, other):
return NotImplemented
def __repr__(self):
return (f'Project("{self.name}", "{self.url}", '
f'revision="{self.revision}", path={repr(self.path)}, '
f'clone_depth={self.clone_depth}, '
f'west_commands={self.west_commands}, '
f'topdir={repr(self.topdir)})')
def __str__(self):
path_repr = repr(self.abspath or self.path)
return f'<Project {self.name} ({path_repr}) at {self.revision}>'
def __init__(self, name: str, url: str,
revision: Optional[str] = None,
path: Optional[PathType] = None,
clone_depth: Optional[int] = None,
west_commands: Optional[WestCommandsType] = None,
topdir: Optional[PathType] = None,
remote_name: Optional[str] = None):
'''Project constructor.
If *topdir* is ``None``, then absolute path attributes
(``abspath`` and ``posixpath``) will also be ``None``.
:param name: project's ``name:`` attribute in the manifest
:param url: fetch URL
:param revision: fetch revision
:param path: path (relative to topdir), or None for *name*
:param clone_depth: depth to use for initial clone
:param west_commands: path to a west commands specification YAML
file in the project, relative to its base directory,
or list of these
:param topdir: the west workspace's top level directory
:param remote_name: the name of the remote which should be
set up if the project is being cloned (default: 'origin')
'''
self.name = name
self.url = url
self.revision = revision or _DEFAULT_REV
self.clone_depth = clone_depth
self.path = os.fspath(path or name)
self.west_commands = _west_commands_list(west_commands)
self.topdir = os.fspath(topdir) if topdir else None
self.remote_name = remote_name or 'origin'
@property
def path(self) -> str:
return self._path
@path.setter
def path(self, path: PathType) -> None:
self._path: str = os.fspath(path)
# Invalidate the absolute path attributes. They'll get
# computed again next time they're accessed.
self._abspath: Optional[str] = None
self._posixpath: Optional[str] = None
@property
def abspath(self) -> Optional[str]:
if self._abspath is None and self.topdir:
self._abspath = os.path.abspath(Path(self.topdir) /
self.path)
return self._abspath
@property
def posixpath(self) -> Optional[str]:
if self._posixpath is None and self.abspath is not None:
self._posixpath = Path(self.abspath).as_posix()
return self._posixpath
@property
def name_and_path(self) -> str:
return f'{self.name} ({self.path})'
def as_dict(self) -> Dict:
'''Return a representation of this object as a dict, as it
would be parsed from an equivalent YAML manifest.
'''
ret: Dict = {}
ret['name'] = self.name
ret['url'] = self.url
ret['revision'] = self.revision
if self.path != self.name:
ret['path'] = self.path
if self.clone_depth:
ret['clone-depth'] = self.clone_depth
if self.west_commands:
ret['west-commands'] = \
_west_commands_maybe_delist(self.west_commands)
return ret
#
# Git helpers
#
def git(self, cmd: Union[str, List[str]],
extra_args: Iterable[str] = (),
capture_stdout: bool = False,
capture_stderr: bool = False,
check: bool = True,
cwd: Optional[PathType] = None) -> subprocess.CompletedProcess:
'''Run a git command in the project repository.
:param cmd: git command as a string (or list of strings)
:param extra_args: sequence of additional arguments to pass to
the git command (useful mostly if *cmd* is a string).
:param capture_stdout: if True, git's standard output is
captured in the ``CompletedProcess`` instead of being
printed.
:param capture_stderr: Like *capture_stdout*, but for standard
error. Use with caution: this may prevent error messages
from being shown to the user.
:param check: if given, ``subprocess.CalledProcessError`` is
raised if git finishes with a non-zero return code
:param cwd: directory to run git in (default: ``self.abspath``)
'''
if isinstance(cmd, str):
cmd_list = shlex.split(cmd)
else:
cmd_list = list(cmd)
extra_args = list(extra_args)
if cwd is None:
if self.abspath is not None:
cwd = self.abspath
else:
raise ValueError('no abspath; cwd must be given')
elif sys.version_info < (3, 6, 1) and not isinstance(cwd, str):
# Popen didn't accept a PathLike cwd on Windows until
# python v3.7; this was backported onto cpython v3.6.1,
# though. West currently supports "python 3.6", though, so
# in the unlikely event someone is running 3.6.0 on
# Windows, do the right thing.
cwd = os.fspath(cwd)
args = ['git'] + cmd_list + extra_args
cmd_str = util.quote_sh_list(args)
_logger.debug(f"running '{cmd_str}' in {cwd}")
popen = subprocess.Popen(
args, cwd=cwd,
stdout=subprocess.PIPE if capture_stdout else None,
stderr=subprocess.PIPE if capture_stderr else None)
stdout, stderr = popen.communicate()
# We use logger style % formatting here to avoid the
# potentially expensive overhead of formatting long
# stdout/stderr strings if the current log level isn't DEBUG,
# which is the usual case.
_logger.debug('"%s" exit code: %d stdout: %r stderr: %r',
cmd_str, popen.returncode, stdout, stderr)
if check and popen.returncode:
raise subprocess.CalledProcessError(popen.returncode, cmd_list,
output=stdout, stderr=stderr)
else:
return subprocess.CompletedProcess(popen.args, popen.returncode,
stdout, stderr)
def sha(self, rev: str, cwd: Optional[PathType] = None) -> str:
'''Get the SHA for a project revision.
:param rev: git revision (HEAD, v2.0.0, etc.) as a string
:param cwd: directory to run command in (default:
self.abspath)
'''
# Though we capture stderr, it will be available as the stderr
# attribute in the CalledProcessError raised by git() in
# Python 3.5 and above if this call fails.
cp = self.git(f'rev-parse {rev}', capture_stdout=True, cwd=cwd,
capture_stderr=True)
# Assumption: SHAs are hex values and thus safe to decode in ASCII.
# It'll be fun when we find out that was wrong and how...
return cp.stdout.decode('ascii').strip()
def is_ancestor_of(self, rev1: str, rev2: str,
cwd: Optional[PathType] = None) -> bool:
'''Check if 'rev1' is an ancestor of 'rev2' in this project.
Returns True if rev1 is an ancestor commit of rev2 in the
given project; rev1 and rev2 can be anything that resolves to
a commit. (If rev1 and rev2 refer to the same commit, the
return value is True, i.e. a commit is considered an ancestor
of itself.) Returns False otherwise.
:param rev1: commit that could be the ancestor of *rev2*
:param rev2: commit that could be a descendant or *rev1*
:param cwd: directory to run command in (default:
``self.abspath``)
'''
rc = self.git(f'merge-base --is-ancestor {rev1} {rev2}',
check=False, cwd=cwd).returncode
if rc == 0:
return True
elif rc == 1:
return False
else:
raise RuntimeError(f'unexpected git merge-base result {rc}')
def is_up_to_date_with(self, rev: str,
cwd: Optional[PathType] = None) -> bool:
'''Check if the project is up to date with *rev*, returning
``True`` if so.
This is equivalent to ``is_ancestor_of(rev, 'HEAD',
cwd=cwd)``.
:param rev: base revision to check if project is up to date
with.
:param cwd: directory to run command in (default:
``self.abspath``)
'''
return self.is_ancestor_of(rev, 'HEAD', cwd=cwd)
def is_up_to_date(self, cwd: Optional[PathType] = None) -> bool:
'''Check if the project HEAD is up to date with the manifest.
This is equivalent to ``is_up_to_date_with(self.revision,
cwd=cwd)``.
:param cwd: directory to run command in (default:
``self.abspath``)
'''
return self.is_up_to_date_with(self.revision, cwd=cwd)
def is_cloned(self, cwd: Optional[PathType] = None) -> bool:
'''Returns ``True`` if ``self.abspath`` looks like a git
repository's top-level directory, and ``False`` otherwise.
:param cwd: directory to run command in (default:
``self.abspath``)
'''
if not self.abspath or not os.path.isdir(self.abspath):
return False
# --is-inside-work-tree doesn't require that the directory is
# the top-level directory of a Git repository. Use --show-cdup
# instead, which prints an empty string (i.e., just a newline,
# which we strip) for the top-level directory.
_logger.debug(f'{self.name}: checking if cloned')
res = self.git('rev-parse --show-cdup', check=False, cwd=cwd,
capture_stderr=True, capture_stdout=True)
return not (res.returncode or res.stdout.strip())
def read_at(self, path: PathType, rev: Optional[str] = None,
cwd: Optional[PathType] = None) -> bytes:
'''Read file contents in the project at a specific revision.
:param path: relative path to file in this project
:param rev: revision to read *path* from (default: ``self.revision``)
:param cwd: directory to run command in (default: ``self.abspath``)
'''
if rev is None:
rev = self.revision
cp = self.git(['show', f'{rev}:{os.fspath(path)}'],
capture_stdout=True, capture_stderr=True, cwd=cwd)
return cp.stdout
def listdir_at(self, path: PathType, rev: Optional[str] = None,
cwd: Optional[PathType] = None,
encoding: Optional[str] = None) -> List[str]:
'''List of directory contents in the project at a specific revision.
The return value is the directory contents as a list of files and
subdirectories.
:param path: relative path to file in this project
:param rev: revision to read *path* from (default: ``self.revision``)
:param cwd: directory to run command in (default: ``self.abspath``)
:param encoding: directory contents encoding (default: 'utf-8')
'''
if rev is None:
rev = self.revision
if encoding is None:
encoding = 'utf-8'
# git-ls-tree -z means we get NUL-separated output with no quoting
# of the file names. Using 'git-show' or 'git-cat-file -p'
# wouldn't work for files with special characters in their names.
out = self.git(['ls-tree', '-z', f'{rev}:{os.fspath(path)}'], cwd=cwd,
capture_stdout=True, capture_stderr=True).stdout
# A tab character separates the SHA from the file name in each
# NUL-separated entry.
return [f.decode(encoding).split('\t', 1)[1]
for f in out.split(b'\x00') if f]
# FIXME: this whole class should just go away. See #327.
class ManifestProject(Project):
'''Represents the manifest repository as a `Project`.
Meaningful attributes:
- ``name``: the string ``"manifest"``
- ``topdir``: the top level directory of the west workspace
the manifest project controls, or ``None``
- ``path``: relative path to the manifest repository within the
workspace, or ``None`` (i.e. from ``topdir`` if that is set)
- ``abspath``: absolute path to the manifest repository in the
native path name format (or ``None`` if ``topdir`` is)
- ``posixpath``: like ``abspath``, but with slashes (``/``) as
path separators
- ``west_commands``:``west_commands:`` key in the manifest's
``self:`` map. This may be a list of such if the self
section imports multiple additional files with west commands.
Other readable attributes included for Project compatibility:
- ``url``: the empty string; the west manifest is not
version-controlled by west itself, even though 'west init'
can fetch a manifest repository from a Git remote
- ``revision``: ``"HEAD"``
- ``clone_depth``: ``None``, because there's no URL
'''
def __repr__(self):
return (f'ManifestProject({self.name}, path={repr(self.path)}, '
f'west_commands={self.west_commands}, '
f'topdir={repr(self.topdir)})')
def __init__(self, path: Optional[PathType] = None,
west_commands: Optional[WestCommandsType] = None,
topdir: Optional[PathType] = None):
'''
:param path: Relative path to the manifest repository in the
west workspace, if known.
:param west_commands: path to a west commands specification YAML
file in the project, relative to its base directory,
or list of these
:param topdir: Root of the west workspace the manifest
project is inside. If not given, all absolute path
attributes (abspath and posixpath) will be None.
'''
self.name: str = 'manifest'
# Pretending that this is a Project, even though it's not (#327)
self.url: str = ''
self.revision: str = 'HEAD'
self.clone_depth: Optional[int] = None
# The following type: ignore is necessary since every Project
# actually has a non-None _path attribute, so the parent class
# defines its type as 'str', where here we need it to be
# an Optional[str].
self._path = os.fspath(path) if path else None # type: ignore
# Path related attributes
self.topdir: Optional[str] = os.fspath(topdir) if topdir else None
self._abspath: Optional[str] = None
self._posixpath: Optional[str] = None
# Extension commands.
self.west_commands = _west_commands_list(west_commands)
@property
def abspath(self) -> Optional[str]:
if self._abspath is None and self.topdir and self.path:
self._abspath = os.path.abspath(os.path.join(self.topdir,
self.path))
return self._abspath
def as_dict(self) -> Dict:
'''Return a representation of this object as a dict, as it would be
parsed from an equivalent YAML manifest.'''
ret: Dict = {}
if self.path:
ret['path'] = self.path
if self.west_commands:
ret['west-commands'] = \
_west_commands_maybe_delist(self.west_commands)
return ret
class Manifest:
'''The parsed contents of a west manifest file.
'''
@staticmethod
def from_file(source_file: Optional[PathType] = None,
**kwargs) -> 'Manifest':
'''Manifest object factory given a source YAML file.
The default behavior is to find the current west workspace's
manifest file and resolve it.
Results depend on the keyword arguments given in *kwargs*:
- If both *source_file* and *topdir* are given, the
returned Manifest object is based on the data in
*source_file*, rooted at *topdir*. The configuration
files are not read in this case. This allows parsing a
manifest file "as if" its project hierarchy were rooted
at another location in the system.
- If neither *source_file* nor *topdir* is given, the file
system is searched for *topdir*. That workspace's
``manifest.path`` configuration option is used to find
*source_file*, ``topdir/<manifest.path>/<manifest.file>``.
- If only *source_file* is given, *topdir* is found
starting there. The directory containing *source_file*
doesn't have to be ``manifest.path`` in this case.
- If only *topdir* is given, that workspace's
``manifest.path`` is used to find *source_file*.
Exceptions raised:
- `west.util.WestNotFound` if no *topdir* can be found
- `MalformedManifest` if *source_file* contains invalid
data
- `ManifestVersionError` if this version of west is too
old to parse the manifest.
- `MalformedConfig` if ``manifest.path`` is needed and
can't be read
- ``ValueError`` if *topdir* is given but is not a west
workspace root
:param source_file: source file to load
:param kwargs: Manifest.__init__ keyword arguments
'''
topdir = kwargs.get('topdir')
if topdir is None:
if source_file is None:
# neither source_file nor topdir: search the filesystem
# for the workspace and use its manifest.path.
topdir = util.west_topdir()
(mpath, mname) = _mpath(topdir=topdir)
kwargs.update({
'topdir': topdir,
'source_file': os.path.join(topdir, mpath, mname),
'manifest_path': mpath
})
else:
# Just source_file: find topdir starting there.
# We need source_file in kwargs as that's what gets used below.
kwargs.update({
'source_file': source_file,
'topdir':
util.west_topdir(start=os.path.dirname(source_file))
})
elif source_file is None:
# Just topdir.
# Verify topdir is a real west workspace root.
msg = f'topdir {topdir} is not a west workspace root'
try:
real_topdir = util.west_topdir(start=topdir, fall_back=False)
except util.WestNotFound:
raise ValueError(msg)
if Path(topdir) != Path(real_topdir):
raise ValueError(f'{msg}; but {real_topdir} is')
# Read manifest.path from topdir/.west/config, and use it
# to locate source_file.
(mpath, mname) = _mpath(topdir=topdir)
source_file = os.path.join(topdir, mpath, mname)
kwargs.update({
'source_file': source_file,
'manifest_path': mpath,
})
else:
# Both source_file and topdir.
kwargs['source_file'] = source_file
return Manifest(**kwargs)
@staticmethod
def from_data(source_data: ManifestDataType, **kwargs) -> 'Manifest':
'''Manifest object factory given parsed YAML data.
This factory does not read any configuration files.
Letting the return value be ``m``. Results then depend on
keyword arguments in *kwargs*:
- Unless *topdir* is given, all absolute paths in ``m``,
like ``m.projects[1].abspath``, are ``None``.
- Relative paths, like ``m.projects[1].path``, are taken
from *source_data*.
- If ``source_data['manifest']['self']['path']`` is not
set, then ``m.projects[MANIFEST_PROJECT_INDEX].abspath``
will be set to *manifest_path* if given.
Returns the same exceptions as the Manifest constructor.
:param source_data: parsed YAML data as a Python object, or a
string with unparsed YAML data
:param kwargs: Manifest.__init__ keyword arguments
'''
kwargs.update({'source_data': source_data})
return Manifest(**kwargs)
def __init__(self, source_file: Optional[PathType] = None,
source_data: Optional[ManifestDataType] = None,
manifest_path: Optional[PathType] = None,
topdir: Optional[PathType] = None,
importer: Optional[ImporterType] = None,
import_flags: ImportFlag = ImportFlag.DEFAULT,
**kwargs: Dict[str, Any]):
'''
Using `from_file` or `from_data` is usually easier than direct
instantiation.
Instance attributes:
- ``projects``: sequence of `Project`
- ``topdir``: west workspace top level directory, or
None
- ``path``: path to the manifest file itself, or None
- ``has_imports``: bool, True if the manifest contains
an "import:" attribute in "self:" or "projects:"; False
otherwise
Exactly one of *source_file* and *source_data* must be given.
If *source_file* is given:
- If *topdir* is too, ``projects`` is rooted there.
- Otherwise, *topdir* is found starting at *source_file*.
If *source_data* is given:
- If *topdir* is too, ``projects`` is rooted there.
- Otherwise, there is no root: ``projects[i].abspath`` and
other absolute path attributes are ``None``.
- If ``source_data['manifest']['self']['path']`` is unset,
*manifest_path* is used as a fallback.
The *importer* kwarg, if given, is a callable. It is called
when *source_file* requires importing manifest data that
aren't found locally. It will be called as:
``importer(project, file)``
where ``project`` is a `Project` and ``file`` is the missing
file. The file's contents at refs/heads/manifest-rev should
usually be returned, potentially after fetching the project's
revision from its remote URL and updating that ref.
The return value should be a string containing manifest data,
or a list of strings if ``file`` is a directory containing
YAML files. A return value of None will cause the import to be
ignored.
Exceptions raised:
- `MalformedManifest`: if the manifest data is invalid
- `ManifestImportFailed`: if the manifest could not be
resolved due to import errors
- `ManifestVersionError`: if this version of west is too
old to parse the manifest
- `WestNotFound`: if *topdir* was needed and not found
- ``ValueError``: for other invalid arguments
:param source_file: YAML file containing manifest data
:param source_data: parsed YAML data as a Python object, or a
string containing unparsed YAML data
:param manifest_path: fallback `ManifestProject` ``path``
attribute
:param topdir: used as the west workspace top level
directory
:param importer: callback to resolve missing manifest import
data
:param import_flags: bit mask, controls import resolution
'''
if source_file and source_data:
raise ValueError('both source_file and source_data were given')
if not _flags_ok(import_flags):
raise ValueError(f'bad import_flags {import_flags:x}')
self.path: Optional[str] = None
'''Path to the file containing the manifest, or None if
created from data rather than the file system.
'''
if source_file:
source_file = Path(source_file)
source_data = source_file.read_text()
self.path = os.path.abspath(source_file)
if not source_data:
self._malformed('manifest contains no data')
if isinstance(source_data, str):
source_data = _load(source_data)
# Validate the manifest. Wrap a couple of the exceptions with
# extra context about the problematic file in case of errors,
# to help debugging.
try:
validate(source_data)
except ManifestVersionError as mv:
raise ManifestVersionError(mv.version, file=source_file) from mv
except MalformedManifest as mm:
self._malformed(mm.args[0], parent=mm)
except TypeError as te:
self._malformed(te.args[0], parent=te)
# The above validate() and exception handling block's job is
# to ensure this, but pacify the type checker in a way that
# crashes if something goes wrong with that.
assert isinstance(source_data, dict)
self._projects: List[Project] = []
'''Sequence of `Project` objects representing manifest
projects.
Index 0 (`MANIFEST_PROJECT_INDEX`) contains a
`ManifestProject` representing the manifest repository. The
rest of the sequence contains projects in manifest file order
(or resolution order if the manifest contains imports).
'''
self.topdir: Optional[str] = None
'''The west workspace's top level directory, or None.'''
if topdir:
self.topdir = os.fspath(topdir)
self.has_imports: bool = False
# Set up the public attributes documented above, as well as
# any internal attributes needed to implement the public API.
self._importer: ImporterType = importer or _default_importer
self._import_flags = import_flags
ctx = kwargs.get('import-context')
if ctx is not None:
assert isinstance(ctx, _import_ctx)
if manifest_path:
mpath: Optional[Path] = Path(manifest_path)
else:
mpath = None
self._load(source_data['manifest'],
mpath,
ctx or _import_ctx({}, None, Path('.')))
def get_projects(self,
# any str name is also a PathType
project_ids: Iterable[PathType],
allow_paths: bool = True,
only_cloned: bool = False) -> List[Project]:
'''Get a list of `Project` objects in the manifest from
*project_ids*.
If *project_ids* is empty, a copy of ``self.projects``
attribute is returned as a list. Otherwise, the returned list
has projects in the same order as *project_ids*.
``ValueError`` is raised if:
- *project_ids* contains unknown project IDs
- (with *only_cloned*) an uncloned project was found
The ``ValueError`` *args* attribute is a 2-tuple with a list
of unknown *project_ids* at index 0, and a list of uncloned
`Project` objects at index 1.
:param project_ids: a sequence of projects, identified by name
or (absolute or relative) path. Names are matched first; path
checking can be disabled with *allow_paths*.
:param allow_paths: if false, *project_ids* is assumed to contain
names only, not paths
:param only_cloned: raise an exception for uncloned projects
'''
projects = list(self.projects)
unknown: List[PathType] = [] # project_ids with no Projects
uncloned: List[Project] = [] # if only_cloned, the uncloned Projects
ret: List[Project] = [] # result list of resolved Projects
# If no project_ids are specified, use all projects.
if not project_ids:
if only_cloned:
uncloned = [p for p in projects if not p.is_cloned()]
if uncloned:
raise ValueError(unknown, uncloned)
return projects
# Otherwise, resolve each of the project_ids to a project,
# returning the result or raising ValueError.
mp = self.projects[MANIFEST_PROJECT_INDEX]
if mp.path is not None:
mpath: Optional[Path] = Path(mp.path).resolve()
else:
mpath = None
for pid in project_ids:
if isinstance(pid, str):
if pid == 'manifest':
project: Optional[Project] = mp
else:
project = self._projects_by_name.get(pid)
else:
project = None
if project is None and allow_paths:
rpath = Path(pid).resolve()
if mpath is not None and rpath == mpath:
project = mp
else:
project = self._projects_by_rpath.get(rpath)
if project is None:
unknown.append(pid)
else:
ret.append(project)
if only_cloned and not project.is_cloned():
uncloned.append(project)
if unknown or (only_cloned and uncloned):
raise ValueError(unknown, uncloned)
return ret
def _as_dict_helper(
self, pdict: Optional[Callable[[Project], Dict]] = None) \
-> Dict:
# pdict: returns a Project's dict representation.
# By default, it's Project.as_dict.
if pdict is None:
pdict = Project.as_dict
projects = list(self.projects)
del projects[MANIFEST_PROJECT_INDEX]
project_dicts = [pdict(p) for p in projects]
# This relies on insertion-ordered dictionaries for
# predictability, which is a CPython 3.6 implementation detail
# and Python 3.7+ guarantee.
r: Dict[str, Any] = {}
r['manifest'] = {}
r['manifest']['projects'] = project_dicts
r['manifest']['self'] = self.projects[MANIFEST_PROJECT_INDEX].as_dict()
return r
def as_dict(self) -> Dict:
'''Returns a dict representing self, fully resolved.
The value is "resolved" in that the result is as if all
projects had been defined in a single manifest without any
import attributes.
'''
return self._as_dict_helper()
def as_frozen_dict(self) -> Dict:
'''Returns a dict representing self, but frozen.
The value is "frozen" in that all project revisions are the
full SHAs pointed to by `QUAL_MANIFEST_REV_BRANCH` references.
Raises ``RuntimeError`` if a project SHA can't be resolved.
'''
def pdict(p):
if not p.is_cloned():
raise RuntimeError(f'cannot freeze; project {p.name} '
'is uncloned')
try:
sha = p.sha(QUAL_MANIFEST_REV_BRANCH)
except subprocess.CalledProcessError as e:
raise RuntimeError(f'cannot freeze; project {p.name} '
f'ref {QUAL_MANIFEST_REV_BRANCH} '
'cannot be resolved to a SHA') from e
d = p.as_dict()
d['revision'] = sha
return d
return self._as_dict_helper(pdict=pdict)
def as_yaml(self, **kwargs) -> str:
'''Returns a YAML representation for self, fully resolved.
The value is "resolved" in that the result is as if all
projects had been defined in a single manifest without any
import attributes.
:param kwargs: passed to yaml.safe_dump()
'''
return yaml.safe_dump(self.as_dict(), **kwargs)
def as_frozen_yaml(self, **kwargs) -> str:
'''Returns a YAML representation for self, but frozen.
The value is "frozen" in that all project revisions are the
full SHAs pointed to by `QUAL_MANIFEST_REV_BRANCH` references.
Raises ``RuntimeError`` if a project SHA can't be resolved.
:param kwargs: passed to yaml.safe_dump()
'''
return yaml.safe_dump(self.as_frozen_dict(), **kwargs)
@property
def projects(self) -> List[Project]:
return self._projects
def _malformed(self, complaint: str,
parent: Optional[Exception] = None) -> NoReturn:
context = (f'file: {self.path} ' if self.path else 'data')
args = [f'Malformed manifest {context}',
f'Schema file: {_SCHEMA_PATH}']
if complaint:
args.append('Hint: ' + complaint)
exc = MalformedManifest(*args)
if parent:
raise exc from parent
else:
raise exc
def _load(self, manifest: Dict[str, Any],
path_hint: Optional[Path], # not PathType!
ctx: _import_ctx) -> None:
# Initialize this instance.
#
# - manifest: manifest data, parsed and validated
# - path_hint: hint about where the manifest repo lives
# - ctx: recursive import context
top_level = not bool(ctx.projects)
if self.path:
loading_what = self.path
else:
loading_what = 'data (no file)'
_logger.debug(f'loading {loading_what}')
# We want to make an ordered map from project names to
# corresponding Project instances. Insertion order into this
# map should reflect the final project order including
# manifest import resolution, which is:
#
# 1. Imported projects from "manifest: self: import:"
# 2. "manifest: projects:"
# 3. Imported projects from "manifest: projects: ... import:"
# Create the ManifestProject, and import projects from "self:".
mp = self._load_self(manifest, path_hint, ctx)
# Add this manifest's projects to the map, then project imports.
url_bases = {r['name']: r['url-base'] for r in
manifest.get('remotes', [])}
defaults = self._load_defaults(manifest.get('defaults', {}), url_bases)
self._load_projects(manifest, url_bases, defaults, ctx)
# The manifest is resolved. Make sure paths are unique.
self._check_paths_are_unique(mp, ctx.projects, top_level)
# Save the results.
self._projects = list(ctx.projects.values())
self._projects.insert(MANIFEST_PROJECT_INDEX, mp)
self._projects_by_name: Dict[str, Project] = {'manifest': mp}
self._projects_by_name.update(ctx.projects)
self._projects_by_rpath: Dict[Path, Project] = {} # resolved paths
if self.topdir:
for i, p in enumerate(self.projects):
if i == MANIFEST_PROJECT_INDEX and not p.abspath:
# When from_data() is called without a path hint, mp
# can have a topdir but no path, and thus no abspath.
continue
if TYPE_CHECKING:
# The typing module can't tell that self.topdir
# being truthy guarantees p.abspath is a str, not None.
assert p.abspath
self._projects_by_rpath[Path(p.abspath).resolve()] = p
_logger.debug(f'loaded {loading_what}')
def _load_self(self, manifest: Dict[str, Any],
path_hint: Optional[Path],
ctx: _import_ctx) -> ManifestProject:
# Handle the "self:" section in the manifest data.
slf = manifest.get('self', {})
path = slf.get('path', path_hint)
mp = ManifestProject(path=path, topdir=self.topdir,
west_commands=slf.get('west-commands'))
imp = slf.get('import')
if imp is not None:
if self._import_flags & ImportFlag.IGNORE:
_logger.debug('ignored self import')
else:
_logger.debug(f'resolving self import {imp}')
self._import_from_self(mp, imp, ctx)
_logger.debug('resolved self import')
return mp
def _assert_imports_ok(self) -> None:
# Sanity check that we aren't calling code that does importing
# if the flags tell us not to.
#
# Could be deleted if this feature stabilizes and we never hit
# this assertion.
assert not self._import_flags & ImportFlag.IGNORE
def _import_from_self(self, mp: ManifestProject, imp: Any,
ctx: _import_ctx) -> None:
# Recursive helper to import projects from the manifest repository.
#
# The 'imp' argument is the loaded value of "foo" in "self:
# import: foo".
#
# All data is read from the file system. Requests to read
# files which don't exist or aren't ordinary files/directories
# raise MalformedManifest.
#
# This is unlike importing from projects -- for projects, data
# are read from Git (treating it as a content-addressable file
# system) with a fallback on self._importer.
self._assert_imports_ok()
self.has_imports = True
imptype = type(imp)
if imptype == bool:
self._malformed(f'got "self: import: {imp}" of boolean')
elif imptype == str:
self._import_path_from_self(mp, imp, ctx)
elif imptype == list:
for subimp in imp:
self._import_from_self(mp, subimp, ctx)
elif imptype == dict:
imap = self._load_imap(imp, f'manifest file {mp.abspath}')
# imap may introduce additional constraints on the
# existing ctx, such as a stricter filter_fn or a longer
# path_prefix.
#
# Compose them using _new_ctx() to pass along the updated
# context to the recursive import.
self._import_path_from_self(mp, imap.file,
_new_ctx(ctx, imap))
else:
self._malformed(f'{mp.abspath}: "self: import: {imp}" '
f'has invalid type {imptype}')
def _import_path_from_self(self, mp: ManifestProject, imp: Any,
ctx: _import_ctx) -> None:
if mp.abspath:
# Fast path, when we're working inside a fully initialized
# topdir.
repo_root = Path(mp.abspath)
else:
# Fallback path, which is needed by at least west init. If
# this happens too often, something may be wrong with how
# we've implemented this. We'd like to avoid too many git
# commands, as subprocesses are slow on windows.
assert self.path is not None # to ensure and satisfy type checker
start = Path(self.path).parent
_logger.debug(
f'searching for manifest repository root from {start}')
repo_root = Path(mp.git('rev-parse --show-toplevel',
capture_stdout=True,
cwd=start).
stdout[:-1]. # chop off newline
decode('utf-8')) # hopefully this is safe
p = repo_root / imp
if p.is_file():
_logger.debug(f'found submanifest file: {p}')
self._import_pathobj_from_self(mp, p, ctx)
elif p.is_dir():
_logger.debug(f'found submanifest directory: {p}')
for yml in filter(_is_yml, sorted(p.iterdir())):
self._import_pathobj_from_self(mp, p / yml, ctx)
else:
# This also happens for special files like character
# devices, but it doesn't seem worth handling that error
# separately. Who would call mknod in their manifest repo?
self._malformed(f'{mp.abspath}: "self: import: {imp}": '
f'file {p} not found')
def _import_pathobj_from_self(self, mp: ManifestProject, pathobj: Path,
ctx: _import_ctx) -> None:
# Import a Path object, which is a manifest file in the
# manifest repository whose ManifestProject is mp.
# Destructively add the imported content into our 'projects'
# map, passing along our context. The intermediate manifest is
# thrown away; we're basically just using __init__ as a
# function here.
#
# The only thing we need to do with it is check if the
# submanifest has west commands, add them to mp's if so.
try:
kwargs: Dict[str, Any] = {'import-context': ctx}
submp = Manifest(source_file=pathobj,
manifest_path=mp.path,
topdir=self.topdir,
importer=self._importer,
import_flags=self._import_flags,
**kwargs).projects[MANIFEST_PROJECT_INDEX]
except RecursionError as e:
raise _ManifestImportDepth(mp, pathobj) from e
# submp.west_commands comes first because we
# logically treat imports from self as if they are
# defined before the contents in the higher level
# manifest.
mp.west_commands = _west_commands_merge(submp.west_commands,
mp.west_commands)
def _load_defaults(self, md: Dict, url_bases: Dict[str, str]) -> _defaults:
# md = manifest defaults (dictionary with values parsed from
# the manifest)
mdrem: Optional[str] = md.get('remote')
if mdrem:
# The default remote name, if provided, must refer to a
# well-defined remote.
if mdrem not in url_bases:
self._malformed(f'default remote {mdrem} is not defined')
return _defaults(mdrem, md.get('revision', _DEFAULT_REV))
def _load_projects(self, manifest: Dict[str, Any],
url_bases: Dict[str, str],
defaults: _defaults,
ctx: _import_ctx) -> None:
# Load projects and add them to the list, returning
# information about which ones have imports that need to be
# processed next.
have_imports = []
names = set()
for pd in manifest['projects']:
project = self._load_project(pd, url_bases, defaults, ctx)
name = project.name
if not _filter_ok(ctx.filter_fn, project):
_logger.debug(f'project {name} in file {self.path} ' +
'ignored due to filters')
continue
if name in names:
# Project names must be unique within a manifest.
self._malformed(f'project name {name} used twice in ' +
(self.path or 'the same manifest'))
names.add(name)
# Add the project to the map if it's new.
added = self._add_project(project, ctx.projects)
if added:
# Track project imports unless we are ignoring those.
imp = pd.get('import')
if imp:
if self._import_flags & (ImportFlag.IGNORE |
ImportFlag.IGNORE_PROJECTS):
_logger.debug(
f'project {project}: ignored import ({imp})')
else:
have_imports.append((project, imp))
# Handle imports from new projects in our "projects:" section.
for project, imp in have_imports:
self._import_from_project(project, imp, ctx)
def _load_project(self, pd: Dict, url_bases: Dict[str, str],
defaults: _defaults, ctx: _import_ctx) -> Project:
# pd = project data (dictionary with values parsed from the
# manifest)
name = pd['name']
# The name "manifest" cannot be used as a project name; it
# is reserved to refer to the manifest repository itself
# (e.g. from "west list"). Note that this has not always
# been enforced, but it is part of the documentation.
if name == 'manifest':
self._malformed('no project can be named "manifest"')
# Figure out the project's fetch URL:
#
# - url is tested first (and can't be used with remote or repo-path)
# - remote is tested next (and must be defined if present)
# - default remote is tested last, if there is one
url = pd.get('url')
remote = pd.get('remote')
repo_path = pd.get('repo-path')
if remote and url:
self._malformed(f'project {name} has both "remote: {remote}" '
f'and "url: {url}"')
if defaults.remote and not (remote or url):
remote = defaults.remote
if url:
if repo_path:
self._malformed(f'project {name} has "repo_path: {repo_path}" '
f'and "url: {url}"')
elif remote:
if remote not in url_bases:
self._malformed(f'project {name} remote {remote} '
'is not defined')
url = url_bases[remote] + '/' + (repo_path or name)
else:
self._malformed(
f'project {name} '
'has no remote or url and no default remote is set')
# The project's path needs to respect any import: path-prefix,
# regardless of self._import_flags. The 'ignore' type flags
# just mean ignore the imported data. The path-prefix in this
# manifest affects the project no matter what.
imp = pd.get('import', None)
if isinstance(imp, dict):
pfx = self._load_imap(imp, f'project {name}').path_prefix
else:
pfx = ''
# Historically, path attributes came directly from the manifest data
# itself and were passed along to the Project constructor unmodified.
# When we added path-prefix support, we needed to introduce pathlib
# wrappers around the pd['path'] value as is done here.
#
# Since west is a git wrapper and git prefers to work with
# POSIX paths in general, we've decided for now to force paths
# to POSIX style in all circumstances. If this breaks
# anything, we can always revisit, maybe adding a 'nativepath'
# attribute or something like that.
path = (ctx.path_prefix / pfx / pd.get('path', name)).as_posix()
ret = Project(name, url, pd.get('revision', defaults.revision),
path, clone_depth=pd.get('clone-depth'),
west_commands=pd.get('west-commands'),
topdir=self.topdir, remote_name=remote)
if self.topdir:
assert isinstance(ret.abspath, str)
apath = Path(ret.abspath)
topdir = Path(self.topdir)
if escapes_directory(apath, topdir) or apath == topdir:
self._malformed(f'project {name} absolute path {apath} '
'is not a subdirectory of topdir ' +
self.topdir)
return ret
def _import_from_project(self, project: Project, imp: Any,
ctx: _import_ctx):
# Recursively resolve a manifest import from 'project'.
#
# - project: Project instance to import from
# - imp: the parsed value of project's import key (string, list, etc.)
# - ctx: recursive import context
self._assert_imports_ok()
self.has_imports = True
imptype = type(imp)
if imptype == bool:
# We should not have been called unless the import was truthy.
assert imp
self._import_path_from_project(project, _WEST_YML, ctx)
elif imptype == str:
self._import_path_from_project(project, imp, ctx)
elif imptype == list:
for subimp in imp:
self._import_from_project(project, subimp, ctx)
elif imptype == dict:
imap = self._load_imap(imp, f'project {project.name}')
# Similar comments about composing ctx and imap apply here as
# they do in _import_from_self().
self._import_path_from_project(project, imap.file,
_new_ctx(ctx, imap))
else:
self._malformed(f'{project.name_and_path}: invalid import {imp} '
f'type: {imptype}')
def _import_path_from_project(self, project: Project, path: str,
ctx: _import_ctx) -> None:
# Import data from git at the given path at revision manifest-rev.
# Fall back on self._importer if that fails.
_logger.debug(f'resolving import {path} for {project}')
imported = self._import_content_from_project(project, path)
if imported is None:
# This can happen if self._importer returns None.
# It means there's nothing to do.
return
for data in imported:
if isinstance(data, str):
data = _load(data)
validate(data)
try:
# Force a fallback onto manifest_path=project.path.
# The subpath to the manifest file itself will not be
# available, so that's the best we can do.
#
# Perhaps there's a cleaner way to convince mypy that
# the validate() postcondition is that we've got a
# real manifest and this is safe, but maybe just
# fixing this hack would be best. For now, silence the
# type checker on this line.
del data['manifest']['self']['path'] # type: ignore
except KeyError:
pass
# Destructively add the imported content into our 'projects'
# map, passing along our context.
try:
kwargs: Dict[str, Any] = {'import-context': ctx}
submp = Manifest(source_data=data,
manifest_path=project.path,
topdir=self.topdir,
importer=self._importer,
import_flags=self._import_flags,
**kwargs).projects[MANIFEST_PROJECT_INDEX]
except RecursionError as e:
raise _ManifestImportDepth(project, path) from e
# If the submanifest has west commands, merge them
# into project's.
project.west_commands = _west_commands_merge(
project.west_commands, submp.west_commands)
_logger.debug(f'done resolving import {path} for {project}')
def _import_content_from_project(self, project: Project,
path: str) -> ImportedContentType:
if not (self._import_flags & ImportFlag.FORCE_PROJECTS) and \
project.is_cloned():
try:
content = _manifest_content_at(project, path)
except MalformedManifest as mm:
self._malformed(mm.args[0])
except FileNotFoundError:
# We may need to fetch a new manifest-rev, e.g. if
# revision is a branch that didn't used to have a
# manifest, but now does.
content = self._importer(project, path)
except subprocess.CalledProcessError:
# We may need a new manifest-rev, e.g. if revision is
# a SHA we don't have yet.
content = self._importer(project, path)
else:
# We need to clone this project, or we were specifically
# asked to use the importer.
content = self._importer(project, path)
if isinstance(content, str):
content = [content]
return content
def _load_imap(self, imp: Dict, src: str) -> _import_map:
# Convert a parsed self or project import value from YAML into
# an _import_map namedtuple.
# Work on a copy in case the caller needs the full value.
copy = dict(imp)
ret = _import_map(copy.pop('file', _WEST_YML),
copy.pop('name-whitelist', []),
copy.pop('path-whitelist', []),
copy.pop('name-blacklist', []),
copy.pop('path-blacklist', []),
copy.pop('path-prefix', ''))
# Check that the value is OK.
if copy:
# We popped out all of the valid keys already.
self._malformed(f'{src}: invalid import contents: {copy}')
elif not _is_imap_list(ret.name_whitelist):
self._malformed(f'{src}: bad import name-whitelist '
f'{ret.name_whitelist}')
elif not _is_imap_list(ret.path_whitelist):
self._malformed(f'{src}: bad import path-whitelist '
f'{ret.path_whitelist}')
elif not _is_imap_list(ret.name_blacklist):
self._malformed(f'{src}: bad import name-blacklist '
f'{ret.name_blacklist}')
elif not _is_imap_list(ret.path_blacklist):
self._malformed(f'{src}: bad import path-blacklist '
f'{ret.path_blacklist}')
elif not isinstance(ret.path_prefix, str):
self._malformed(f'{src}: bad import path-prefix '
f'{ret.path_prefix}; expected str, not '
f'{type(ret.path_prefix)}')
return ret
def _add_project(self, project: Project,
projects: Dict[str, Project]) -> bool:
# Add the project to our map if we don't already know about it.
# Return the result.
if project.name not in projects:
projects[project.name] = project
_logger.debug(f'added project {project.name} '
f'path {project.path} '
f'revision {project.revision}' +
(f' from {self.path}' if self.path else ''))
return True
else:
return False
def _check_paths_are_unique(self, mp: ManifestProject,
projects: Dict[str, Project],
top_level: bool) -> None:
# TODO: top_level can probably go away when #327 is done.
ppaths: Dict[Path, Project] = {}
if mp.path:
mppath: Optional[Path] = Path(mp.path)
else:
mppath = None
for name, project in projects.items():
pp = Path(project.path)
if top_level and pp == mppath:
self._malformed(f'project {name} path "{project.path}" '
'is taken by the manifest repository')
other = ppaths.get(pp)
if other:
self._malformed(f'project {name} path "{project.path}" '
f'is taken by project {other.name}')
ppaths[pp] = project