Skip to content

Support int typed aggregations #696

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jul 17, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def to_metric(self, desc, tag_values, agg_data):
return metric

elif isinstance(agg_data,
aggregation_data_module.SumAggregationDataFloat):
aggregation_data_module.SumAggregationData):
metric = UnknownMetricFamily(name=metric_name,
documentation=metric_description,
labels=label_keys)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def test_collector_to_metric_count(self):
collector.register_view(view)
desc = collector.registered_views[list(REGISTERED_VIEW)[0]]
metric = collector.to_metric(
desc=desc, tag_values=[None], agg_data=agg.aggregation_data)
desc=desc, tag_values=[None], agg_data=agg.new_aggregation_data(VIDEO_SIZE_MEASURE))

self.assertEqual(desc['name'], metric.name)
self.assertEqual(desc['documentation'], metric.documentation)
Expand All @@ -158,7 +158,7 @@ def test_collector_to_metric_sum(self):
collector.register_view(view)
desc = collector.registered_views[list(REGISTERED_VIEW)[0]]
metric = collector.to_metric(
desc=desc, tag_values=[None], agg_data=agg.aggregation_data)
desc=desc, tag_values=[None], agg_data=agg.new_aggregation_data(VIDEO_SIZE_MEASURE))

self.assertEqual(desc['name'], metric.name)
self.assertEqual(desc['documentation'], metric.documentation)
Expand All @@ -176,7 +176,7 @@ def test_collector_to_metric_last_value(self):
collector.register_view(view)
desc = collector.registered_views[list(REGISTERED_VIEW)[0]]
metric = collector.to_metric(
desc=desc, tag_values=[None], agg_data=agg.aggregation_data)
desc=desc, tag_values=[None], agg_data=agg.new_aggregation_data(VIDEO_SIZE_MEASURE))

self.assertEqual(desc['name'], metric.name)
self.assertEqual(desc['documentation'], metric.documentation)
Expand All @@ -189,7 +189,7 @@ def test_collector_to_metric_histogram(self):
collector = prometheus.Collector(options=options)
collector.register_view(VIDEO_SIZE_VIEW)
desc = collector.registered_views[list(REGISTERED_VIEW)[0]]
distribution = copy.deepcopy(VIDEO_SIZE_DISTRIBUTION.aggregation_data)
distribution = VIDEO_SIZE_DISTRIBUTION.new_aggregation_data(VIDEO_SIZE_MEASURE)
distribution.add_sample(280.0 * MiB, None, None)
metric = collector.to_metric(
desc=desc,
Expand Down Expand Up @@ -243,7 +243,7 @@ def test_collector_collect(self):
metric = collector.to_metric(
desc=desc,
tag_values=[tag_value_module.TagValue("value")],
agg_data=agg.aggregation_data)
agg_data=agg.new_aggregation_data(VIDEO_SIZE_MEASURE))

self.assertEqual(desc['name'], metric.name)
self.assertEqual(desc['documentation'], metric.documentation)
Expand All @@ -262,7 +262,7 @@ def test_collector_collect_with_none_label_value(self):
collector.register_view(view)
desc = collector.registered_views['test3_new_view']
metric = collector.to_metric(
desc=desc, tag_values=[None], agg_data=agg.aggregation_data)
desc=desc, tag_values=[None], agg_data=agg.new_aggregation_data(VIDEO_SIZE_MEASURE))

self.assertEqual(1, len(metric.samples))
sample = metric.samples[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1275,8 +1275,7 @@ def test_create_timeseries_disjoint_tags(self, monitoring_resoure_mock):

def test_create_timeseries_from_distribution(self):
"""Check for explicit 0-bound bucket for SD export."""
agg = aggregation_module.DistributionAggregation(
aggregation_type=aggregation_module.Type.DISTRIBUTION)
agg = aggregation_module.DistributionAggregation()

view = view_module.View(
name="example.org/test_view",
Expand Down Expand Up @@ -1328,8 +1327,7 @@ def test_create_timeseries_multiple_tags(self):
create_time_series_list should return a time series for each set of
values in the tag value aggregation map.
"""
agg = aggregation_module.CountAggregation(
aggregation_type=aggregation_module.Type.COUNT)
agg = aggregation_module.CountAggregation()

view = view_module.View(
name="example.org/test_view",
Expand Down Expand Up @@ -1375,8 +1373,6 @@ def test_create_timeseries_invalid_aggregation(self):
v_data = mock.Mock(spec=view_data_module.ViewData)
v_data.view.name = "example.org/base_view"
v_data.view.columns = [tag_key_module.TagKey('base_key')]
v_data.view.aggregation.aggregation_type = \
aggregation_module.Type.NONE
v_data.start_time = TEST_TIME_STR
v_data.end_time = TEST_TIME_STR

Expand Down
127 changes: 58 additions & 69 deletions opencensus/stats/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,14 @@

import logging

from opencensus.stats import bucket_boundaries
from opencensus.stats import aggregation_data
from opencensus.stats import measure as measure_module
from opencensus.metrics.export.metric_descriptor import MetricDescriptorType


logger = logging.getLogger(__name__)


class Type(object):
""" The type of aggregation function used on a View.

Attributes:
NONE (int): The aggregation type of the view is 'unknown'.
SUM (int): The aggregation type of the view is 'sum'.
COUNT (int): The aggregation type of the view is 'count'.
DISTRIBUTION (int): The aggregation type of the view is 'distribution'.
LASTVALUE (int): The aggregation type of the view is 'lastvalue'.
"""
NONE = 0
SUM = 1
COUNT = 2
DISTRIBUTION = 3
LASTVALUE = 4


class BaseAggregation(object):
"""Aggregation describes how the data collected is aggregated by type of
aggregation and buckets
Expand All @@ -51,8 +35,7 @@ class BaseAggregation(object):
:param aggregation_type: represents the type of this aggregation

"""
def __init__(self, buckets=None, aggregation_type=Type.NONE):
self._aggregation_type = aggregation_type
def __init__(self, buckets=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While you're in here: I don't see any reason for buckets to be on BaseAggregation, it looks like an odd choice from the first implementation (#149) that never got cleaned up. Since aggregation_type is gone now I recommend scrapping this whole class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, just removed the class.

self._buckets = buckets or []

@property
Copy link
Contributor

@lzchen lzchen Jun 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You still have an aggregation_type property. As well remove the param comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up removing BaseAggregation altogether, per @c24t's comments.

Expand All @@ -78,16 +61,25 @@ class SumAggregation(BaseAggregation):
:param aggregation_type: represents the type of this aggregation
Copy link
Contributor

@lzchen lzchen Jun 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, I would include a get_metric_type(measure) method in BaseAggregation with no implementation to let developers know that aggregations should implement this (there might be new kinds of aggregations)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

Copy link
Contributor

@lzchen lzchen Jun 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Fix "Sum Aggregation DESCRIBES" in comments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


"""
def __init__(self, sum=None, aggregation_type=Type.SUM):
super(SumAggregation, self).__init__(aggregation_type=aggregation_type)
self._sum = aggregation_data.SumAggregationDataFloat(
sum_data=float(sum or 0))
self.aggregation_data = self._sum

@property
def sum(self):
"""The sum of the current aggregation"""
return self._sum
def __init__(self, sum=None):
super(SumAggregation, self).__init__()
self._initial_sum = sum or 0

def new_aggregation_data(self, measure):
value_type = MetricDescriptorType.to_type_class(
self.get_metric_type(measure))
# TODO: do we need to type cast `_initial_sum`?
return aggregation_data.SumAggregationData(
value_type=value_type, sum_data=self._initial_sum)

@staticmethod
def get_metric_type(measure):
if isinstance(measure, measure_module.MeasureInt):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have strong opinion here, it seems a more "Pythonic" style is:

        if isinstance(measure, measure_module.MeasureInt):
            return MetricDescriptorType.CUMULATIVE_INT64
        if isinstance(measure, measure_module.MeasureFloat):
            return MetricDescriptorType.CUMULATIVE_DOUBLE
        raise ValueError

Up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, surprised the linter didn't yell about this (or maybe it was a warning I didn't see 😁)

return MetricDescriptorType.CUMULATIVE_INT64
elif isinstance(measure, measure_module.MeasureFloat):
return MetricDescriptorType.CUMULATIVE_DOUBLE
else:
raise ValueError


class CountAggregation(BaseAggregation):
Expand All @@ -101,16 +93,16 @@ class CountAggregation(BaseAggregation):
:param aggregation_type: represents the type of this aggregation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix param comments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


"""
def __init__(self, count=0, aggregation_type=Type.COUNT):
super(CountAggregation, self).__init__(
aggregation_type=aggregation_type)
self._count = aggregation_data.CountAggregationData(count)
self.aggregation_data = self._count
def __init__(self, count=0):
super(CountAggregation, self).__init__()
self._initial_count = count

@property
def count(self):
"""The count of the current aggregation"""
return self._count
def new_aggregation_data(self, measure=None):
return aggregation_data.CountAggregationData(self._initial_count)

@staticmethod
def get_metric_type(measure):
return MetricDescriptorType.CUMULATIVE_INT64


class DistributionAggregation(BaseAggregation):
Expand All @@ -129,10 +121,7 @@ class DistributionAggregation(BaseAggregation):

"""

def __init__(self,
boundaries=None,
distribution=None,
aggregation_type=Type.DISTRIBUTION):
def __init__(self, boundaries=None):
if boundaries:
if not all(boundaries[ii] < boundaries[ii + 1]
for ii in range(len(boundaries) - 1)):
Expand All @@ -147,22 +136,16 @@ def __init__(self,
ii)
boundaries = boundaries[ii:]

super(DistributionAggregation, self).__init__(
buckets=boundaries, aggregation_type=aggregation_type)
self._boundaries = bucket_boundaries.BucketBoundaries(boundaries)
self._distribution = distribution or {}
self.aggregation_data = aggregation_data.DistributionAggregationData(
0, 0, 0, None, boundaries)
super(DistributionAggregation, self).__init__(buckets=boundaries)
self._boundaries = boundaries

@property
def boundaries(self):
"""The boundaries of the current aggregation"""
return self._boundaries
def new_aggregation_data(self, measure=None):
return aggregation_data.DistributionAggregationData(
0, 0, 0, None, self._boundaries)

@property
def distribution(self):
"""The distribution of the current aggregation"""
return self._distribution
@staticmethod
def get_metric_type(measure):
return MetricDescriptorType.CUMULATIVE_DISTRIBUTION


class LastValueAggregation(BaseAggregation):
Expand All @@ -176,15 +159,21 @@ class LastValueAggregation(BaseAggregation):
:param aggregation_type: represents the type of this aggregation

"""
def __init__(self, value=0, aggregation_type=Type.LASTVALUE):
super(LastValueAggregation, self).__init__(
aggregation_type=aggregation_type)
self.aggregation_data = aggregation_data.LastValueAggregationData(
value=value)
self._value = value

@property
def value(self):
"""The current recorded value
"""
return self._value
def __init__(self, value=0):
super(LastValueAggregation, self).__init__()
self._initial_value = value

def new_aggregation_data(self, measure):
value_type = MetricDescriptorType.to_type_class(
self.get_metric_type(measure))
return aggregation_data.LastValueAggregationData(
value=self._initial_value, value_type=value_type)

@staticmethod
def get_metric_type(measure):
if isinstance(measure, measure_module.MeasureInt):
return MetricDescriptorType.GAUGE_INT64
elif isinstance(measure, measure_module.MeasureFloat):
return MetricDescriptorType.GAUGE_DOUBLE
else:
raise ValueError
41 changes: 30 additions & 11 deletions opencensus/stats/aggregation_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,21 @@ def to_point(self, timestamp):
raise NotImplementedError # pragma: NO COVER


Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The BaseAggregationData class seems a little odd to me as well, each implementation supplies aggregation_data upon construction but then doesn't update it, and stores there data in a separate instance variable that they update and return.

Is there a reason for this class to exist?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that I can tell, besides to show that aggregations should implement to_point, but in that case there should be an unimplemented add_sample in the class too.

In any case it looks like you can lose the aggregation_data attr and property.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just removed BaseAggregationData entirely

class SumAggregationDataFloat(BaseAggregationData):
class SumAggregationData(BaseAggregationData):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the decision to make value_type an instance attr instead of keeping separate aggregation classes for each value type here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, I had this private class with value_type and then bound classes like:

SumAggregationDataFloat = functools.partial(SumAggregationData, value_type=value.ValueDouble)
SumAggregationDataInt = functools.partial(SumAggregationData, value_type=value.ValueInt)

But then in SumAggregation.new_aggregation_data, we needed an additional set of conditionals to check the type of the measure or metric and decide which to use. Instead doing it this way, we just need the one set of conditionals in .get_metric_type which we need anyway, and can reuse MetricDescriptor.to_type_class. It seemed a little cleaner, but really it isn't that different.

We could also duplicate the classes entirely, instead of the functools.partial, but that seemed unnecessary.

I guess the other option is to force the user to decide and provide Int or Float, but since we can determine it that seems easier and less error prone.

"""Sum Aggregation Data is the aggregated data for the Sum aggregation

:type sum_data: float
:type value_type: class that is either
:class:`opencensus.metrics.export.value.ValueDouble` or
:class:`opencensus.metrics.export.value.ValueLong`
:param value_type: the type of value to be used when creating a point
:type sum_data: meh
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

meh?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

loll 🤸‍♂

:param sum_data: represents the aggregated sum

"""

def __init__(self, sum_data):
super(SumAggregationDataFloat, self).__init__(sum_data)
def __init__(self, value_type, sum_data):
super(SumAggregationData, self).__init__(sum_data)
self._value_type = value_type
self._sum_data = sum_data

def __repr__(self):
Expand All @@ -82,17 +87,22 @@ def sum_data(self):
"""The current sum data"""
return self._sum_data

@property
def value_type(self):
"""The value type to use when creating the point"""
return self._value_type

def to_point(self, timestamp):
"""Get a Point conversion of this aggregation.

:type timestamp: :class: `datetime.datetime`
:param timestamp: The time to report the point as having been recorded.

:rtype: :class: `opencensus.metrics.export.point.Point`
:return: a :class: `opencensus.metrics.export.value.ValueDouble`-valued
Point with value equal to `sum_data`.
:return: a Point with value equal to `sum_data` and of type
`_value_type`.
"""
return point.Point(value.ValueDouble(self.sum_data), timestamp)
return point.Point(self._value_type(self.sum_data), timestamp)


class CountAggregationData(BaseAggregationData):
Expand Down Expand Up @@ -328,13 +338,18 @@ class LastValueAggregationData(BaseAggregationData):
"""
LastValue Aggregation Data is the value of aggregated data

:type value_type: class that is either
:class:`opencensus.metrics.export.value.ValueDouble` or
:class:`opencensus.metrics.export.value.ValueLong`
:param value_type: the type of value to be used when creating a point
:type value: long
:param value: represents the current value

"""

def __init__(self, value):
def __init__(self, value_type, value):
super(LastValueAggregationData, self).__init__(value)
self._value_type = value_type
self._value = value

def __repr__(self):
Expand All @@ -355,17 +370,21 @@ def value(self):
"""The current value recorded"""
return self._value

@property
def value_type(self):
"""The value type to use when creating the point"""
return self._value_type

def to_point(self, timestamp):
"""Get a Point conversion of this aggregation.

:type timestamp: :class: `datetime.datetime`
:param timestamp: The time to report the point as having been recorded.

:rtype: :class: `opencensus.metrics.export.point.Point`
:return: a :class: `opencensus.metrics.export.value.ValueDouble`-valued
Point.
:return: a Point with value of type `_value_type`.
"""
return point.Point(value.ValueDouble(self.value), timestamp)
return point.Point(self._value_type(self.value), timestamp)


class Exemplar(object):
Expand Down
Loading