|
| 1 | +# Wrapper class for in-toto attestation SCAI predicate protos. |
| 2 | + |
| 3 | +import in_toto_attestation.predicates.scai.v0.scai_pb2 as scaipb |
| 4 | +from in_toto_attestation.v1.resource_descriptor import ResourceDescriptor |
| 5 | + |
| 6 | +SCAI_PREDICATE_TYPE = 'https://in-toto.io/attestation/scai/attribute-report/' |
| 7 | +SCAI_PREDICATE_VERSION = 'v0.2' |
| 8 | + |
| 9 | +class AttributeAssertion: |
| 10 | + def __init__(self, attribute, target=None, conditions=None, evidence=None) -> None: |
| 11 | + self.pb = scaipb.AttributeAssertion() |
| 12 | + self.pb.attribute = attribute |
| 13 | + if target: |
| 14 | + self.pb.target.CopyFrom(target) |
| 15 | + if conditions: |
| 16 | + self.pb.conditions.update(conditions) |
| 17 | + if evidence: |
| 18 | + self.pb.evidence.CopyFrom(evidence) |
| 19 | + |
| 20 | + @staticmethod |
| 21 | + def copy_from_pb(proto: type[scaipb.AttributeAssertion]) -> 'AttributeAssertion': |
| 22 | + assertion = AttributeAssertion('tmp-attr') |
| 23 | + assertion.pb.CopyFrom(proto) |
| 24 | + return assertion |
| 25 | + |
| 26 | + def validate(self) -> None: |
| 27 | + if len(self.pb.attribute) == 0: |
| 28 | + raise ValueError('The attribute field is required') |
| 29 | + |
| 30 | + # check any resource descriptors are valid |
| 31 | + if self.pb.target.ByteSize() > 0: |
| 32 | + rd = ResourceDescriptor.copy_from_pb(self.pb.target) |
| 33 | + rd.validate() |
| 34 | + |
| 35 | + if self.pb.evidence.ByteSize() > 0: |
| 36 | + rd = ResourceDescriptor.copy_from_pb(self.pb.evidence) |
| 37 | + rd.validate() |
| 38 | + |
| 39 | +class AttributeReport: |
| 40 | + def __init__(self, attributes: list, producer=None) -> None: |
| 41 | + self.pb = scaipb.AttributeReport() |
| 42 | + self.pb.attributes.extend(attributes) |
| 43 | + if producer: |
| 44 | + self.pb.producer.CopyFrom(producer) |
| 45 | + |
| 46 | + @staticmethod |
| 47 | + def copy_from_pb(proto: type[scaipb.AttributeReport]) -> 'AttributeReport': |
| 48 | + report = AttributeReport([None]) |
| 49 | + report.pb.CopyFrom(proto) |
| 50 | + return report |
| 51 | + |
| 52 | + def validate(self) -> None: |
| 53 | + if len(self.pb.attributes) == 0: |
| 54 | + raise ValueError('The attributes field is required') |
| 55 | + |
| 56 | + # check any attribute assertions are valid |
| 57 | + attributes = self.pb.attributes |
| 58 | + for i, attrpb in enumerate(attributes): |
| 59 | + assertion = AttributeAssertion.copy_from_pb(attrpb) |
| 60 | + |
| 61 | + try: |
| 62 | + assertion.validate() |
| 63 | + except ValueError as e: |
| 64 | + # return index in the attributes list in case of failure: |
| 65 | + # can't assume any other fields in attribute assertion are set |
| 66 | + raise ValueError('Invalid attributes field (index: {0})'.format(i)) from e |
| 67 | + |
| 68 | + if self.pb.producer.ByteSize() > 0: |
| 69 | + rd = ResourceDescriptor.copy_from_pb(self.pb.producer) |
| 70 | + rd.validate() |
0 commit comments