#!/usr/bin/env python3 # # Copyright 2013 The Flutter Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. # # Usage: scan_deps.py --deps --output # # This script extracts the dependencies provided from the DEPS file and # finds the appropriate git commit hash per dependency for osv-scanner # to use in checking for vulnerabilities. # It is expected that the lockfile output of this script is then # uploaded using GitHub actions to be used by the osv-scanner reusable action. import argparse import json import os import re import shutil import subprocess import sys from compatibility_helper import byte_str_decode SCRIPT_DIR = os.path.dirname(sys.argv[0]) CHECKOUT_ROOT = os.path.realpath(os.path.join(SCRIPT_DIR, '..')) CHROMIUM_README_FILE = 'third_party/accessibility/README.md' CHROMIUM_README_COMMIT_LINE = 4 # The fifth line will always contain the commit hash. CHROMIUM = 'https://chromium.googlesource.com/chromium/src' DEP_CLONE_DIR = CHECKOUT_ROOT + '/clone-test' DEPS = os.path.join(CHECKOUT_ROOT, 'DEPS') UPSTREAM_PREFIX = 'upstream_' # Used in parsing the DEPS file. class VarImpl: _env_vars = { 'host_cpu': 'x64', 'host_os': 'linux', } def __init__(self, local_scope): self._local_scope = local_scope def lookup(self, var_name): """Implements the Var syntax.""" if var_name in self._local_scope.get('vars', {}): return self._local_scope['vars'][var_name] # Inject default values for env variables. if var_name in self._env_vars: return self._env_vars[var_name] raise Exception('Var is not defined: %s' % var_name) def extract_deps(deps_file): local_scope = {} var = VarImpl(local_scope) global_scope = { 'Var': var.lookup, 'deps_os': {}, } # Read the content. with open(deps_file, 'r') as file: deps_content = file.read() # Eval the content. exec(deps_content, global_scope, local_scope) if not os.path.exists(DEP_CLONE_DIR): os.mkdir(DEP_CLONE_DIR) # Clone deps with upstream into temporary dir. # Extract the deps and filter. deps = local_scope.get('deps', {}) deps_list = local_scope.get('vars') filtered_osv_deps = [] for _, dep in deps.items(): # We currently do not support packages or cipd which are represented # as dictionaries. if not isinstance(dep, str): continue dep_split = dep.rsplit('@', 1) ancestor_result = get_common_ancestor([dep_split[0], dep_split[1]], deps_list) if ancestor_result: filtered_osv_deps.append({ 'package': {'name': ancestor_result[1], 'commit': ancestor_result[0]} }) try: # Clean up cloned upstream dependency directory. shutil.rmtree(DEP_CLONE_DIR) # Use shutil.rmtree since dir could be non-empty. except OSError as clone_dir_error: print('Error cleaning up clone directory: %s : %s' % (DEP_CLONE_DIR, clone_dir_error.strerror)) osv_result = { 'packageSource': {'path': deps_file, 'type': 'lockfile'}, 'packages': filtered_osv_deps } return osv_result def parse_readme(): """ Opens the Flutter Accessibility Library README and uses the commit hash found in the README to check for viulnerabilities. The commit hash in this README will always be in the same format """ file_path = os.path.join(CHECKOUT_ROOT, CHROMIUM_README_FILE) with open(file_path) as file: # Read the content of the file opened. content = file.readlines() commit_line = content[CHROMIUM_README_COMMIT_LINE] commit = re.search(r'(?<=\[).*(?=\])', commit_line) osv_result = { 'packageSource': {'path': file_path, 'type': 'lockfile'}, 'packages': [{'package': {'name': CHROMIUM, 'commit': commit.group()}}] } return osv_result def get_common_ancestor(dep, deps_list): """ Given an input of a mirrored dep, compare to the mapping of deps to their upstream in DEPS and find a common ancestor commit SHA value. This is done by first cloning the mirrored dep, then a branch which tracks the upstream. From there, git merge-base operates using the HEAD commit SHA of the upstream branch and the pinned SHA value of the mirrored branch """ # dep[0] contains the mirror repo. # dep[1] contains the mirror's pinned SHA. # upstream is the origin repo. dep_name = dep[0].split('/')[-1].split('.')[0] if UPSTREAM_PREFIX + dep_name not in deps_list: print('did not find dep: ' + dep_name) return None try: # Get the upstream URL from the mapping in DEPS file. upstream = deps_list.get(UPSTREAM_PREFIX + dep_name) temp_dep_dir = DEP_CLONE_DIR + '/' + dep_name # Clone dependency from mirror. subprocess.check_output(['git', 'clone', '--quiet', '--', dep[0], dep_name], cwd=DEP_CLONE_DIR) # Create branch that will track the upstream dep. print('attempting to add upstream remote from: {upstream}'.format(upstream=upstream)) subprocess.check_output(['git', 'remote', 'add', 'upstream', upstream], cwd=temp_dep_dir) subprocess.check_output(['git', 'fetch', '--quiet', 'upstream'], cwd=temp_dep_dir) # Get name of the default branch for upstream (e.g. main/master/etc.). default_branch = subprocess.check_output( 'git remote show upstream ' + "| sed -n \'/HEAD branch/s/.*: //p\'", cwd=temp_dep_dir, shell=True ) default_branch = byte_str_decode(default_branch) default_branch = default_branch.strip() # Make upstream branch track the upstream dep. subprocess.check_output([ 'git', 'checkout', '--force', '-b', 'upstream', '--track', 'upstream/' + default_branch ], cwd=temp_dep_dir) # Get the most recent commit from default branch of upstream. commit = subprocess.check_output( 'git for-each-ref ' + "--format=\'%(objectname:short)\' refs/heads/upstream", cwd=temp_dep_dir, shell=True ) commit = byte_str_decode(commit) commit = commit.strip() # Perform merge-base on most recent default branch commit and pinned mirror commit. ancestor_commit = subprocess.check_output( 'git merge-base {commit} {depUrl}'.format(commit=commit, depUrl=dep[1]), cwd=temp_dep_dir, shell=True ) ancestor_commit = byte_str_decode(ancestor_commit) ancestor_commit = ancestor_commit.strip() print('Ancestor commit: ' + ancestor_commit) return ancestor_commit, upstream except subprocess.CalledProcessError as error: print( "Subprocess command '{0}' failed with exit code: {1}.".format( error.cmd, str(error.returncode) ) ) if error.output: print("Subprocess error output: '{0}'".format(error.output)) return None def parse_args(args): args = args[1:] parser = argparse.ArgumentParser(description='A script to find common ancestor commit SHAs') parser.add_argument( '--deps', '-d', type=str, help='Input DEPS file to extract.', default=os.path.join(CHECKOUT_ROOT, 'DEPS') ) parser.add_argument( '--output', '-o', type=str, help='Output osv-scanner compatible deps file.', default=os.path.join(CHECKOUT_ROOT, 'osv-lockfile.json') ) return parser.parse_args(args) def write_manifest(deps, manifest_file): output = {'results': deps} print(json.dumps(output, indent=2)) with open(manifest_file, 'w') as manifest: json.dump(output, manifest, indent=2) def main(argv): args = parse_args(argv) deps = extract_deps(args.deps) readme_deps = parse_readme() write_manifest([deps, readme_deps], args.output) return 0 if __name__ == '__main__': sys.exit(main(sys.argv))