Skip to content

Commit edd00b2

Browse files
authored
Add support and unit test for PyOD models (#34709)
* Add support and unit test for PyOD models Additionally, it includes: - A minor fix for error messages in the `specifiable` module. - Support for scoring offline detectors on a subset of features. * Fix lints * More lints. * Add pyod dependencyto ml_test extra * Revise based on reviews. Fix lints. * Fix failed tests due to the side effect of lazy init on model handlers.
1 parent 8509a87 commit edd00b2

File tree

5 files changed

+288
-2
lines changed

5 files changed

+288
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import pickle
19+
from typing import Any
20+
from typing import Dict
21+
from typing import Iterable
22+
from typing import Optional
23+
from typing import Sequence
24+
25+
import numpy as np
26+
27+
import apache_beam as beam
28+
from apache_beam.io.filesystems import FileSystems
29+
from apache_beam.ml.anomaly.detectors.offline import OfflineDetector
30+
from apache_beam.ml.anomaly.specifiable import specifiable
31+
from apache_beam.ml.anomaly.thresholds import FixedThreshold
32+
from apache_beam.ml.inference.base import KeyedModelHandler
33+
from apache_beam.ml.inference.base import ModelHandler
34+
from apache_beam.ml.inference.base import PredictionResult
35+
from apache_beam.ml.inference.base import _PostProcessingModelHandler
36+
from apache_beam.ml.inference.utils import _convert_to_result
37+
from pyod.models.base import BaseDetector as PyODBaseDetector
38+
39+
# Turn the used ModelHandler into specifiable, but without lazy init.
40+
KeyedModelHandler = specifiable( # type: ignore[misc]
41+
KeyedModelHandler,
42+
on_demand_init=False,
43+
just_in_time_init=False)
44+
_PostProcessingModelHandler = specifiable( # type: ignore[misc]
45+
_PostProcessingModelHandler,
46+
on_demand_init=False,
47+
just_in_time_init=False)
48+
49+
50+
@specifiable
51+
class PyODModelHandler(ModelHandler[beam.Row,
52+
PredictionResult,
53+
PyODBaseDetector]):
54+
"""Implementation of the ModelHandler interface for PyOD [#]_ Models.
55+
56+
The ModelHandler processes input data as `beam.Row` objects.
57+
58+
**NOTE:** This API and its implementation are currently under active
59+
development and may not be backward compatible.
60+
61+
Args:
62+
model_uri: The URI specifying the location of the pickled PyOD model.
63+
64+
.. [#] https://github.com/yzhao062/pyod
65+
"""
66+
def __init__(self, model_uri: str):
67+
self._model_uri = model_uri
68+
69+
def load_model(self) -> PyODBaseDetector:
70+
file = FileSystems.open(self._model_uri, 'rb')
71+
return pickle.load(file)
72+
73+
def run_inference(
74+
self,
75+
batch: Sequence[beam.Row],
76+
model: PyODBaseDetector,
77+
inference_args: Optional[Dict[str, Any]] = None
78+
) -> Iterable[PredictionResult]:
79+
np_batch = []
80+
for row in batch:
81+
np_batch.append(np.fromiter(row, dtype=np.float64))
82+
83+
# stack a batch of samples into a 2-D array for better performance
84+
vectorized_batch = np.stack(np_batch, axis=0)
85+
predictions = model.decision_function(vectorized_batch)
86+
87+
return _convert_to_result(batch, predictions, model_id=self._model_uri)
88+
89+
90+
class PyODFactory():
91+
@staticmethod
92+
def create_detector(model_uri: str, **kwargs) -> OfflineDetector:
93+
"""A utility function to create OfflineDetector for a PyOD model.
94+
95+
**NOTE:** This API and its implementation are currently under active
96+
development and may not be backward compatible.
97+
98+
Args:
99+
model_uri: The URI specifying the location of the pickled PyOD model.
100+
**kwargs: Additional keyword arguments.
101+
"""
102+
model_handler = KeyedModelHandler(
103+
PyODModelHandler(model_uri=model_uri)).with_postprocess_fn(
104+
OfflineDetector.score_prediction_adapter)
105+
m = model_handler.load_model()
106+
assert (isinstance(m, PyODBaseDetector))
107+
threshold = float(m.threshold_)
108+
detector = OfflineDetector(
109+
model_handler, threshold_criterion=FixedThreshold(threshold), **kwargs) # type: ignore[arg-type]
110+
return detector
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import logging
19+
import os.path
20+
import pickle
21+
import shutil
22+
import tempfile
23+
import unittest
24+
25+
import numpy as np
26+
from parameterized import parameterized
27+
28+
import apache_beam as beam
29+
from apache_beam.ml.anomaly.base import AnomalyPrediction
30+
from apache_beam.ml.anomaly.base import AnomalyResult
31+
from apache_beam.ml.anomaly.transforms import AnomalyDetection
32+
from apache_beam.ml.anomaly.transforms_test import _keyed_result_is_equal_to
33+
from apache_beam.options.pipeline_options import PipelineOptions
34+
from apache_beam.testing.util import assert_that
35+
from apache_beam.testing.util import equal_to
36+
37+
# Protect against environments where onnx and pytorch library is not available.
38+
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
39+
try:
40+
from apache_beam.ml.anomaly.detectors.pyod_adapter import PyODFactory
41+
from pyod.models.iforest import IForest
42+
except ImportError:
43+
raise unittest.SkipTest('PyOD dependencies are not installed')
44+
45+
46+
class PyODIForestTest(unittest.TestCase):
47+
def setUp(self) -> None:
48+
self.tmp_dir = tempfile.mkdtemp()
49+
50+
seed = 1234
51+
model = IForest(random_state=seed)
52+
model.fit(self.get_train_data())
53+
self.pickled_model_uri = os.path.join(self.tmp_dir, 'iforest_pickled')
54+
55+
with open(self.pickled_model_uri, 'wb') as fp:
56+
pickle.dump(model, fp)
57+
58+
def tearDown(self) -> None:
59+
shutil.rmtree(self.tmp_dir)
60+
61+
def get_train_data(self):
62+
return [
63+
np.array([1, 5], dtype="float32"),
64+
np.array([2, 6], dtype="float32"),
65+
np.array([3, 4], dtype="float32"),
66+
np.array([2, 6], dtype="float32"),
67+
np.array([10, 10], dtype="float32"), # need an outlier in training data
68+
np.array([3, 4], dtype="float32"),
69+
np.array([2, 6], dtype="float32"),
70+
np.array([2, 6], dtype="float32"),
71+
np.array([2, 5], dtype="float32"),
72+
]
73+
74+
def get_test_data(self):
75+
return [
76+
np.array([2, 6], dtype="float32"),
77+
np.array([100, 100], dtype="float32"),
78+
]
79+
80+
def get_test_data_with_target(self):
81+
return [
82+
np.array([2, 6, 0], dtype="float32"),
83+
np.array([100, 100, 1], dtype="float32"),
84+
]
85+
86+
@parameterized.expand([True, False])
87+
def test_scoring_with_matched_features(self, with_target):
88+
if with_target:
89+
rows = [beam.Row(a=2, b=6, target=0), beam.Row(a=100, b=100, target=1)]
90+
field_names = ["a", "b", "target"]
91+
# The selected features should match the features used for training
92+
detector = PyODFactory.create_detector(
93+
self.pickled_model_uri, features=["a", "b"])
94+
input_data = self.get_test_data_with_target()
95+
else:
96+
rows = [beam.Row(a=2, b=6), beam.Row(a=100, b=100)]
97+
field_names = ["a", "b"]
98+
detector = PyODFactory.create_detector(self.pickled_model_uri)
99+
input_data = self.get_test_data()
100+
101+
expected_out = [(
102+
0,
103+
AnomalyResult(
104+
example=rows[0],
105+
predictions=[
106+
AnomalyPrediction(
107+
model_id='OfflineDetector',
108+
score=-0.20316164744828075,
109+
label=0,
110+
threshold=8.326672684688674e-17,
111+
info='',
112+
source_predictions=None)
113+
])),
114+
(
115+
0,
116+
AnomalyResult(
117+
example=rows[1],
118+
predictions=[
119+
AnomalyPrediction(
120+
model_id='OfflineDetector',
121+
score=0.179516865091218,
122+
label=1,
123+
threshold=8.326672684688674e-17,
124+
info='',
125+
source_predictions=None)
126+
]))]
127+
128+
options = PipelineOptions([])
129+
with beam.Pipeline(options=options) as p:
130+
out = (
131+
p | beam.Create(input_data)
132+
| beam.Map(lambda x: beam.Row(**dict(zip(field_names, map(int, x)))))
133+
| beam.WithKeys(0)
134+
| AnomalyDetection(detector=detector))
135+
assert_that(out, equal_to(expected_out, _keyed_result_is_equal_to))
136+
137+
def test_scoring_with_unmatched_features(self):
138+
# The model is trained with two features: a, b, but the input features of
139+
# scoring has one more feature (target).
140+
# In this case, we should either get rid of the extra feature(s) from
141+
# the scoring input or set `features` when creating the offline detector
142+
# (see the `test_scoring_with_matched_features`)
143+
detector = PyODFactory.create_detector(self.pickled_model_uri)
144+
options = PipelineOptions([])
145+
p = beam.Pipeline(options=options)
146+
_ = (
147+
p | beam.Create(self.get_test_data_with_target())
148+
| beam.Map(
149+
lambda x: beam.Row(**dict(zip(["a", "b", "target"], map(int, x)))))
150+
| beam.WithKeys(0)
151+
| AnomalyDetection(detector=detector))
152+
153+
# This should raise a ValueError with message
154+
# "X has 3 features, but IsolationForest is expecting 2 features as input."
155+
self.assertRaises(ValueError, p.run)
156+
157+
158+
if __name__ == '__main__':
159+
logging.getLogger().setLevel(logging.WARNING)
160+
unittest.main()

sdks/python/apache_beam/ml/anomaly/specifiable.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def _specifiable_from_spec_helper(v, _run_init):
116116
# TODO: support spec treatment for more types
117117
if not isinstance(v, BUILTIN_TYPES_IN_SPEC):
118118
logging.warning(
119-
"Type %s is not a recognized supported type for the"
119+
"Type %s is not a recognized supported type for the "
120120
"specification. It will be included without conversion.",
121121
str(type(v)))
122122
return v
@@ -142,7 +142,7 @@ def _specifiable_to_spec_helper(v):
142142
# TODO: support spec treatment for more types
143143
if not isinstance(v, BUILTIN_TYPES_IN_SPEC):
144144
logging.warning(
145-
"Type %s is not a recognized supported type for the"
145+
"Type %s is not a recognized supported type for the "
146146
"specification. It will be included without conversion.",
147147
str(type(v)))
148148
return v

sdks/python/apache_beam/ml/anomaly/transforms.py

+14
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,16 @@ def _restore_and_convert(
454454
])
455455
return orig_key, (temp_key, result)
456456

457+
def _select_features(self, elem: Tuple[Any,
458+
beam.Row]) -> Tuple[Any, beam.Row]:
459+
assert self._offline_detector._features is not None
460+
k, v = elem
461+
row_dict = v._asdict()
462+
return (
463+
k,
464+
beam.Row(**{k: row_dict[k]
465+
for k in self._offline_detector._features}))
466+
457467
def expand(
458468
self,
459469
input: beam.PCollection[KeyedInputT]) -> beam.PCollection[KeyedOutputT]:
@@ -468,6 +478,10 @@ def expand(
468478
rekeyed_model_input = input | "Rekey" >> beam.Map(
469479
lambda x: ((x[0], x[1][0], x[1][1]), x[1][1]))
470480

481+
if self._offline_detector._features is not None:
482+
rekeyed_model_input = rekeyed_model_input | "Select Features" >> beam.Map(
483+
self._select_features)
484+
471485
# ((orig_key, temp_key, beam.Row), AnomalyPrediction)
472486
rekeyed_model_output = (
473487
rekeyed_model_input

sdks/python/setup.py

+2
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,7 @@ def get_portability_package_data():
490490
'sentence-transformers',
491491
'skl2onnx',
492492
'pillow',
493+
'pyod',
493494
'tensorflow',
494495
'tensorflow-hub',
495496
'tensorflow-transform',
@@ -509,6 +510,7 @@ def get_portability_package_data():
509510
'sentence-transformers',
510511
'skl2onnx',
511512
'pillow',
513+
'pyod',
512514
'tensorflow',
513515
'tensorflow-hub',
514516
'tf2onnx',

0 commit comments

Comments
 (0)