7
7
# See https://aboutcode.org for more information about nexB OSS projects.
8
8
#
9
9
10
+ import os
11
+ import logging
12
+
13
+ from collections import defaultdict
10
14
from os .path import exists
11
15
from os .path import isdir
12
16
13
17
import attr
14
- import os
15
- import logging
18
+ import click
16
19
import saneyaml
17
20
18
- from plugincode .post_scan import PostScanPlugin
19
- from plugincode .post_scan import post_scan_impl
20
21
from commoncode .cliutils import PluggableCommandLineOption
21
22
from commoncode .cliutils import POST_SCAN_GROUP
23
+ from commoncode .filetype import is_file
24
+ from commoncode .filetype import is_readable
22
25
from licensedcode .detection import get_license_keys_from_detections
23
-
26
+ from plugincode .post_scan import PostScanPlugin
27
+ from plugincode .post_scan import post_scan_impl
24
28
25
29
TRACE = os .environ .get ('SCANCODE_DEBUG_LICENSE_POLICY' , False )
26
30
@@ -42,6 +46,21 @@ def logger_debug(*args):
42
46
return logger .debug (' ' .join (isinstance (a , str ) and a or repr (a ) for a in args ))
43
47
44
48
49
+ def validate_policy_path (ctx , param , value ):
50
+ """
51
+ Validate the ``value`` of the policy file path
52
+ """
53
+ policy = value
54
+ if policy :
55
+ if not is_file (location = value , follow_symlinks = True ):
56
+ raise click .BadParameter (f"policy file is not a regular file: { value !r} " )
57
+
58
+ if not is_readable (location = value ):
59
+ raise click .BadParameter (f"policy file is not readable: { value !r} " )
60
+ policy = load_license_policy (value )
61
+ return policy
62
+
63
+
45
64
@post_scan_impl
46
65
class LicensePolicy (PostScanPlugin ):
47
66
"""
@@ -57,10 +76,12 @@ class LicensePolicy(PostScanPlugin):
57
76
options = [
58
77
PluggableCommandLineOption (('--license-policy' ,),
59
78
multiple = False ,
79
+ callback = validate_policy_path ,
60
80
metavar = 'FILE' ,
61
81
help = 'Load a License Policy file and apply it to the scan at the '
62
82
'Resource level.' ,
63
- help_group = POST_SCAN_GROUP )
83
+ help_group = POST_SCAN_GROUP ,
84
+ )
64
85
]
65
86
66
87
def is_enabled (self , license_policy , ** kwargs ):
@@ -74,12 +95,19 @@ def process_codebase(self, codebase, license_policy, **kwargs):
74
95
if not self .is_enabled (license_policy ):
75
96
return
76
97
77
- if has_policy_duplicates (license_policy ):
78
- codebase .errors .append ('ERROR: License Policy file contains duplicate entries.\n ' )
98
+ # license_policy has been validated through a callback and contains data
99
+ # loaded from YAML
100
+ policies = license_policy .get ('license_policies' , [])
101
+ if not policies :
102
+ codebase .errors .append (f'ERROR: License Policy file is empty' )
79
103
return
80
104
81
105
# get a list of unique license policies from the license_policy file
82
- policies = load_license_policy (license_policy ).get ('license_policies' , [])
106
+ dupes = get_duplicate_policies (policies )
107
+ if dupes :
108
+ dupes = '\n ' .join (repr (d ) for d in dupes .items ())
109
+ codebase .errors .append (f'ERROR: License Policy file contains duplicate entries:\n { dupes } ' )
110
+ return
83
111
84
112
# apply policy to Resources if they contain an offending license
85
113
for resource in codebase .walk (topdown = True ):
@@ -106,37 +134,46 @@ def process_codebase(self, codebase, license_policy, **kwargs):
106
134
codebase .save_resource (resource )
107
135
108
136
109
- def has_policy_duplicates ( license_policy_location ):
137
+ def get_duplicate_policies ( policies ):
110
138
"""
111
- Returns True if the policy file contains duplicate entries for a specific license
112
- key. Returns False otherwise .
139
+ Return a list of duplicated policy mappings based on the license key.
140
+ Return an empty list if there are no duplicates .
113
141
"""
114
- policies = load_license_policy (license_policy_location ).get ('license_policies' , [])
115
-
116
- unique_policies = {}
117
-
118
- if policies == []:
119
- return False
142
+ if not policies :
143
+ return []
120
144
145
+ policies_by_license = defaultdict (list )
121
146
for policy in policies :
122
147
license_key = policy .get ('license_key' )
123
-
124
- if license_key in unique_policies .keys ():
125
- return True
126
- else :
127
- unique_policies [license_key ] = policy
128
-
129
- return False
148
+ policies_by_license [license_key ].append (policy )
149
+ return {key : pols for key , pols in policies_by_license .items () if len (pols ) > 1 }
130
150
131
151
132
152
def load_license_policy (license_policy_location ):
133
153
"""
134
- Return a license_policy dictionary loaded from a license policy file.
154
+ Return a license policy mapping loaded from a license policy file.
135
155
"""
136
- if not license_policy_location or not exists (license_policy_location ):
137
- return {}
138
- elif isdir (license_policy_location ):
156
+ if not license_policy_location :
139
157
return {}
140
- with open (license_policy_location , 'r' ) as conf :
141
- conf_content = conf .read ()
142
- return saneyaml .load (conf_content )
158
+
159
+ if not exists (license_policy_location ):
160
+ raise click .BadParameter (f"policy file does not exists: { license_policy_location !r} " )
161
+
162
+ if isdir (license_policy_location ):
163
+ raise click .BadParameter (f"policy file is a directory: { license_policy_location !r} " )
164
+
165
+ try :
166
+ with open (license_policy_location , 'r' ) as conf :
167
+ conf_content = conf .read ()
168
+ policy = saneyaml .load (conf_content )
169
+ if not policy :
170
+ raise click .BadParameter (f"policy file is empty: { license_policy_location !r} " )
171
+ if "license_policies" not in policy :
172
+ raise click .BadParameter (f"policy file is missing a 'license_policies' attribute: { license_policy_location !r} " )
173
+ except Exception as e :
174
+ if isinstance (e , click .BadParameter ):
175
+ raise e
176
+ else :
177
+ raise click .BadParameter (f"policy file is not a well formed or readable YAML file: { license_policy_location !r} { e !r} " ) from e
178
+ return policy
179
+
0 commit comments