Skip to content

Commit 116f2f7

Browse files
authored
NIFI-13997 Limit Python Processor loading to requested Processor Class (#9568)
Signed-off-by: David Handermann <[email protected]>
1 parent d3d3de2 commit 116f2f7

File tree

7 files changed

+114
-3
lines changed

7 files changed

+114
-3
lines changed

.github/workflows/system-tests.yml

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ env:
6161
-pl :nifi-python-framework
6262
-pl :nifi-python-extension-api
6363
-pl :nifi-python-test-extensions
64+
-pl :nifi-py4j-integration-tests
6465
-pl nifi-system-tests/nifi-system-test-suite
6566
-pl nifi-system-tests/nifi-stateless-system-test-suite
6667

nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java

+10
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,16 @@ public void testCreateNothing() {
613613
runner.assertTransferCount("success", 0);
614614
}
615615

616+
@Test
617+
public void testMultipleProcessorsInAPackage() {
618+
// This processor is in a package with another processor, which has additional dependency requirements
619+
final TestRunner runner = createProcessor("CreateHttpRequest");
620+
waitForValid(runner);
621+
runner.run();
622+
623+
runner.assertTransferCount("success", 1);
624+
}
625+
616626
@Test
617627
public void testStateManagerSetState() {
618628
final TestRunner runner = createStateManagerTesterProcessor("setState");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
import requests
17+
from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
18+
19+
class CreateHttpRequest(FlowFileSource):
20+
class Java:
21+
implements = ['org.apache.nifi.python.processor.FlowFileSource']
22+
23+
class ProcessorDetails:
24+
description = "Test processor which creates an http request"
25+
version = '0.0.1-SNAPSHOT'
26+
tags = ['test', 'http', 'requests']
27+
dependencies = ['requests==2.32.3']
28+
29+
def __init__(self, jvm):
30+
pass
31+
32+
def create(self, context):
33+
req = requests.Request('GET', 'https://wikipedia.org')
34+
return FlowFileSourceResult('success', contents=str(req))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from google.protobuf.runtime_version import OSS_MAJOR, OSS_MINOR, OSS_PATCH
17+
from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
18+
19+
class ProtobufVersion(FlowFileSource):
20+
class Java:
21+
implements = ['org.apache.nifi.python.processor.FlowFileSource']
22+
23+
class ProcessorDetails:
24+
description = "Test processor which outputs the version of the protobuf library"
25+
version = '0.0.1-SNAPSHOT'
26+
tags = ['test', 'protobuf', 'version']
27+
dependencies = ['protobuf==5.29.1']
28+
29+
def __init__(self, jvm):
30+
pass
31+
32+
def create(self, context):
33+
version = f"{OSS_MAJOR}.{OSS_MINOR}.{OSS_PATCH}"
34+
return FlowFileSourceResult('success', contents=version)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
charset-normalizer==3.4.0

nifi-framework-bundle/nifi-framework-extensions/nifi-py4j-framework-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -319,9 +319,7 @@ def import_external_dependencies(self, processor_details, work_dir):
319319
def __load_extension_module(self, file, local_dependencies):
320320
# If there are any local dependencies (i.e., other python files in the same directory), load those modules first
321321
if local_dependencies:
322-
to_load = [dep for dep in local_dependencies]
323-
if file in to_load:
324-
to_load.remove(file)
322+
to_load = [dep for dep in local_dependencies if dep != file and not self.__is_processor_module(dep)]
325323

326324
# There is almost certainly a better way to do this. But we need to load all modules that are 'local dependencies'. I.e., all
327325
# modules in the same directory/package. But Python does not appear to give us a simple way to do this. We could have a situation in which
@@ -378,6 +376,10 @@ def __load_extension_module(self, file, local_dependencies):
378376
return None
379377

380378

379+
def __is_processor_module(self, module_file):
380+
return len(ProcessorInspection.get_processor_class_nodes(module_file)) > 0
381+
382+
381383
def __is_processor_class(self, potential_processor_class):
382384
# Go through all members of the given class and see if it has an inner class named Java
383385
for name, member in inspect.getmembers(potential_processor_class):

0 commit comments

Comments
 (0)