Skip to content

Validate CLI inputs and paths #3596 #3609

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion src/formattedcode/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,62 @@ def convert(self, value, param, ctx):
if value in known_opts:
self.fail(
'Illegal file name conflicting with an option name: '
f'{ os.fsdecode(value)}. '
f'{os.fsdecode(value)!r}. '
'Use the special "-" file name to print results on screen/stdout.',
param,
ctx,
)

try:
validate_output_file_path(location=value)
except Exception as e:
self.fail(str(e), param, ctx)

return click.File.convert(self, value, param, ctx)


class InvalidScanCodeOutputFileError(Exception):
    """
    Raised when a path cannot be used as a scan output file.
    """


def validate_output_file_path(location):
    """
    Check that ``location`` can be used as an output file path.

    The special "-" location is accepted without any check. For any other
    location, raise an InvalidScanCodeOutputFileError when the path is a
    directory, a fifo/socket/block/char device, an existing path that is not a
    writable regular file, or a missing path whose parent is not an existing
    writable directory.
    """
    if location == "-":
        return

    from pathlib import Path
    from commoncode.filetype import is_writable

    target = Path(location)

    if target.is_dir():
        raise InvalidScanCodeOutputFileError(
            f'output file is a directory, not a file: {os.fsdecode(location)!r}',
        )

    # Predicates are evaluated lazily and in order, stopping at the first match.
    special_checks = (
        target.is_fifo,
        target.is_socket,
        target.is_block_device,
        target.is_char_device,
    )
    if any(check() for check in special_checks):
        raise InvalidScanCodeOutputFileError(
            f'output file cannot be a special/char/device/fifo/pipe file: {os.fsdecode(location)!r}',
        )

    if not target.exists():
        # The file will have to be created: its parent must be an existing,
        # writable directory.
        folder = target.parent
        if not (folder.exists() and folder.is_dir()):
            raise InvalidScanCodeOutputFileError(
                f'output file parent is not a directory or does not exists: {os.fsdecode(location)!r}',
            )

        if not is_writable(str(folder)):
            raise InvalidScanCodeOutputFileError(
                f'output file parent is not a writable directory: {os.fsdecode(location)!r}',
            )
        return

    # The path exists: it must be a regular file that can be overwritten.
    if not target.is_file():
        raise InvalidScanCodeOutputFileError(
            f'output file exists and is not a file: {os.fsdecode(location)!r}',
        )
    if not is_writable(location):
        raise InvalidScanCodeOutputFileError(
            f'output file exists and is not writable: {os.fsdecode(location)!r}',
        )
101 changes: 69 additions & 32 deletions src/licensedcode/plugin_license_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,24 @@
# See https://aboutcode.org for more information about nexB OSS projects.
#

import os
import logging

from collections import defaultdict
from os.path import exists
from os.path import isdir

import attr
import os
import logging
import click
import saneyaml

from plugincode.post_scan import PostScanPlugin
from plugincode.post_scan import post_scan_impl
from commoncode.cliutils import PluggableCommandLineOption
from commoncode.cliutils import POST_SCAN_GROUP
from commoncode.filetype import is_file
from commoncode.filetype import is_readable
from licensedcode.detection import get_license_keys_from_detections

from plugincode.post_scan import PostScanPlugin
from plugincode.post_scan import post_scan_impl

TRACE = os.environ.get('SCANCODE_DEBUG_LICENSE_POLICY', False)

Expand All @@ -42,6 +46,21 @@ def logger_debug(*args):
return logger.debug(' '.join(isinstance(a, str) and a or repr(a) for a in args))


def validate_policy_path(ctx, param, value):
    """
    Option callback validating the ``value`` policy file path.

    Return ``value`` unchanged when it is empty. Otherwise raise a
    click.BadParameter if the path is not a readable regular file, and return
    the policy loaded from that file with load_license_policy().
    """
    if not value:
        return value

    if not is_file(location=value, follow_symlinks=True):
        raise click.BadParameter(f"policy file is not a regular file: {value!r}")

    if not is_readable(location=value):
        raise click.BadParameter(f"policy file is not readable: {value!r}")

    return load_license_policy(value)


@post_scan_impl
class LicensePolicy(PostScanPlugin):
"""
Expand All @@ -57,10 +76,12 @@ class LicensePolicy(PostScanPlugin):
options = [
PluggableCommandLineOption(('--license-policy',),
multiple=False,
callback=validate_policy_path,
metavar='FILE',
help='Load a License Policy file and apply it to the scan at the '
'Resource level.',
help_group=POST_SCAN_GROUP)
help_group=POST_SCAN_GROUP,
)
]

def is_enabled(self, license_policy, **kwargs):
Expand All @@ -74,12 +95,19 @@ def process_codebase(self, codebase, license_policy, **kwargs):
if not self.is_enabled(license_policy):
return

if has_policy_duplicates(license_policy):
codebase.errors.append('ERROR: License Policy file contains duplicate entries.\n')
# license_policy has been validated through a callback and contains data
# loaded from YAML
policies = license_policy.get('license_policies', [])
if not policies:
codebase.errors.append(f'ERROR: License Policy file is empty')
return

# get a list of unique license policies from the license_policy file
policies = load_license_policy(license_policy).get('license_policies', [])
dupes = get_duplicate_policies(policies)
if dupes:
dupes = '\n'.join(repr(d) for d in dupes.items())
codebase.errors.append(f'ERROR: License Policy file contains duplicate entries:\n{dupes}')
return

# apply policy to Resources if they contain an offending license
for resource in codebase.walk(topdown=True):
Expand All @@ -106,37 +134,46 @@ def process_codebase(self, codebase, license_policy, **kwargs):
codebase.save_resource(resource)


def get_duplicate_policies(policies):
    """
    Return a mapping of {license key: list of policy mappings} for each license
    key that is used by more than one policy in the ``policies`` list.
    Return an empty mapping if there are no duplicates or no policies.

    Policies without a 'license_key' are grouped together under the None key.
    """
    # Always return a mapping, so callers can use .items() on any result.
    if not policies:
        return {}

    policies_by_license = defaultdict(list)
    for policy in policies:
        policies_by_license[policy.get('license_key')].append(policy)

    return {
        license_key: grouped
        for license_key, grouped in policies_by_license.items()
        if len(grouped) > 1
    }


def load_license_policy(license_policy_location):
    """
    Return the license policy mapping loaded from the YAML file at
    ``license_policy_location``. Return an empty mapping if no location is
    provided.

    Raise a click.BadParameter if the location does not exist, is a directory,
    cannot be read or parsed as YAML, is empty, or has no 'license_policies'
    entry.
    """
    if not license_policy_location:
        return {}

    if not exists(license_policy_location):
        raise click.BadParameter(f"policy file does not exists: {license_policy_location!r} ")

    if isdir(license_policy_location):
        raise click.BadParameter(f"policy file is a directory: {license_policy_location!r} ")

    try:
        with open(license_policy_location, 'r') as conf:
            conf_content = conf.read()
        policy = saneyaml.load(conf_content)
        if not policy:
            raise click.BadParameter(f"policy file is empty: {license_policy_location!r}")
        if "license_policies" not in policy:
            raise click.BadParameter(f"policy file is missing a 'license_policies' attribute: {license_policy_location!r} ")
    except click.BadParameter:
        # Validation errors raised above are already user-facing: let them through.
        raise
    except Exception as e:
        # Any other failure (I/O, decoding, YAML parsing) is reported as a bad parameter.
        raise click.BadParameter(f"policy file is not a well formed or readable YAML file: {license_policy_location!r} {e!r}") from e

    return policy

41 changes: 37 additions & 4 deletions src/scancode/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# Import early because of the side effects
import scancode_config

import json
import logging
import os
import platform
Expand Down Expand Up @@ -42,6 +43,9 @@ class WindowsError(Exception):
from commoncode.cliutils import path_progress_message
from commoncode.cliutils import progressmanager
from commoncode.cliutils import PluggableCommandLineOption
from commoncode.filetype import is_dir
from commoncode.filetype import is_file
from commoncode.filetype import is_readable
from commoncode.fileutils import as_posixpath
from commoncode.timeutils import time2tstamp
from commoncode.resource import Codebase
Expand All @@ -68,7 +72,6 @@ class WindowsError(Exception):
from scancode.interrupt import interruptible
from scancode.pool import ScanCodeTimeoutError


# Tracing flags
TRACE = False
TRACE_DEEP = False
Expand Down Expand Up @@ -173,6 +176,32 @@ def validate_depth(ctx, param, value):
return value


def validate_input_path(ctx, param, value):
    """
    Validate a ``value`` list of input path strings and return it unchanged.

    Each input must be a readable regular file or directory. When the
    ``--from-json`` option is set, each input must also be a regular file with
    a ".json" extension whose content starts with a JSON object.

    Raise a click.BadParameter for the first invalid input.
    """
    options = ctx.params
    # click keys ``ctx.params`` by parameter name ("from_json"), not by the flag
    # string. The flag string is still checked for backward compatibility.
    # NOTE(review): click fills ``ctx.params`` in processing order, so this may
    # still be unset if --from-json follows the inputs on the command line
    # -- confirm.
    from_json = options.get("from_json") or options.get("--from-json", False)

    for inp in value:
        regular_file = is_file(location=inp, follow_symlinks=True)
        if not (regular_file or is_dir(location=inp, follow_symlinks=True)):
            raise click.BadParameter(f"input: {inp!r} is not a regular file or a directory")

        if not is_readable(location=inp):
            raise click.BadParameter(f"input: {inp!r} is not readable")

        if not from_json:
            continue

        # extra JSON validation
        if not regular_file:
            raise click.BadParameter(f"JSON input: {inp!r} is not a file")

        if not inp.lower().endswith(".json"):
            raise click.BadParameter(f"JSON input: {inp!r} is not a JSON file with a .json extension")

        try:
            # Only peek at the beginning of the file: full parsing happens later.
            with open(inp, encoding="utf-8") as js:
                start = js.read(100).strip()
        except (OSError, UnicodeDecodeError) as e:
            # Report unreadable or undecodable content as a usage error
            # rather than a raw traceback.
            raise click.BadParameter(f"JSON input: {inp!r} is not a well formed JSON file") from e

        if not start.startswith("{"):
            raise click.BadParameter(f"JSON input: {inp!r} is not a well formed JSON file")

    return value


@click.command(name='scancode',
epilog=epilog_text,
cls=ScancodeCommand,
Expand All @@ -182,6 +211,7 @@ def validate_depth(ctx, param, value):

@click.argument('input',
metavar='<OUTPUT FORMAT OPTION(s)> <input>...', nargs=-1,
callback=validate_input_path,
type=click.Path(exists=True, readable=True, path_type=str))

@click.option('--strip-root',
Expand Down Expand Up @@ -850,9 +880,12 @@ def echo_func(*_args, **_kwargs):
max_in_memory=max_in_memory,
max_depth=max_depth,
)
except:
msg = 'ERROR: failed to collect codebase at: %(input)r' % locals()
raise ScancodeError(msg + '\n' + traceback.format_exc())
except Exception as e:
if from_json and isinstance(e, (json.decoder.JSONDecodeError, UnicodeDecodeError)):
raise click.BadParameter(f"Input JSON scan file(s) is not valid JSON: {input!r} : {e!r}")
else:
msg = f'Failed to process codebase at: {input!r}'
raise ScancodeError(msg + '\n' + traceback.format_exc())

# update headers
cle = codebase.get_or_create_current_header()
Expand Down
Loading