Module mydata_did.v1_0.models.exchange_records.data_agreement_record

Expand source code
import json
import typing
from typing import Any

from marshmallow import fields, validate

from aries_cloudagent.messaging.models.base_record import BaseExchangeRecord, BaseExchangeSchema, match_post_filter
from aries_cloudagent.messaging.valid import UUIDFour
from aries_cloudagent.config.injection_context import InjectionContext
from aries_cloudagent.storage.base import BaseStorage, StorageDuplicateError, StorageNotFoundError

from ...utils.util import bool_to_str

class DataAgreementV1Record(BaseExchangeRecord):
    """
    DataAgreementV1Record model class for serialisation/deserialisation of data agreement records to and from wallet.
    Data agreement schema will be version 1.0.
    """
    class Meta:
        # Data Agreement Record schema (version 1.0)
        schema_class = "DataAgreementV1RecordSchema"

    # Wallet record type
    RECORD_TYPE = "data_agreement_record"

    # Wallet record identifier field
    RECORD_ID_NAME = "data_agreement_record_id"

    # Webhook topic name for this record type
    WEBHOOK_TOPIC = None

    # Wallet record tags used for filtering
    # Note: These are not tags for the ledger, but rather tags for the wallet
    #      to group records.
    TAG_NAMES = {
        "~method_of_use", 
        "~data_agreement_id", 
        "~publish_flag", 
        "~delete_flag",
        "~schema_id",
        "~cred_def_id",
        "~is_existing_schema"
    }

    # State of the data agreement.
    # Only one possible value at this stage of the DA - preparation
    STATE_PREPARATION = "PREPARATION"

    METHOD_OF_USE_DATA_SOURCE = "data-source"
    METHOD_OF_USE_DATA_USING_SERVICE = "data-using-service"

    def __init__(
        self,
        *,
        data_agreement_record_id: str = None,
        data_agreement_id: str = None,
        state: str = None,
        method_of_use: str = None,
        data_agreement: dict = None,
        publish_flag: str = "false",
        delete_flag: str = "false",
        schema_id: str = None,
        cred_def_id: str = None,
        data_agreement_proof_presentation_request: dict = None,
        is_existing_schema: str = "false",
        **kwargs
    ):
        """
        Initialise a new DataAgreementRecordV1 instance.

        Args:
            data_agreement_id: The unique identifier for the data agreement.
            state: The state of the data agreement.
            method_of_use: The method of use for the data agreement.
            data_agreement: The data agreement.
            kwargs: Any other parameters.
        """
        super().__init__(data_agreement_record_id, state, **kwargs)
        self.method_of_use = method_of_use
        self.state = state
        self.data_agreement = data_agreement
        self.data_agreement_id = data_agreement_id
        self.publish_flag = publish_flag
        self.delete_flag = delete_flag
        self.schema_id = schema_id
        self.cred_def_id = cred_def_id
        self.data_agreement_proof_presentation_request = data_agreement_proof_presentation_request
        self.is_existing_schema = is_existing_schema

    @property
    def data_agreement_record_id(self) -> str:
        """Accessor for data_agreement_record_id."""
        return self._id

    @property
    def record_value(self) -> dict:
        """Accessor for JSON record value generated for this transaction record."""
        return {
            prop: getattr(self, prop)
            for prop in (
                "state",
                "method_of_use",
                "data_agreement",
                "data_agreement_id",
                "publish_flag",
                "delete_flag",
                "schema_id",
                "cred_def_id",
                "data_agreement_proof_presentation_request",
                "is_existing_schema",
            )
        }

    def __eq__(self, other: Any) -> bool:
        """Comparison between records."""
        return super().__eq__(other)
    
    @property
    def _publish_flag(self) -> bool:
        """Accessor for publish_flag."""
        return self.publish_flag == "true"

    @_publish_flag.setter
    def _publish_flag(self, value: bool) -> None:
        """Setter for publish_flag."""
        self.publish_flag = "true" if value else "false"
    
    @property
    def _delete_flag(self) -> bool:
        """Accessor for delete_flag."""
        return self.delete_flag == "true"
    
    @_delete_flag.setter
    def _delete_flag(self, value: bool) -> None:
        """Setter for delete_flag."""
        self.delete_flag = "true" if value else "false"
    
    @property
    def _is_existing_schema(self) -> bool:
        """Accessor for is_existing_schema."""
        return self.is_existing_schema == "true"
    
    @_is_existing_schema.setter
    def _is_existing_schema(self, value: bool) -> None:
        """Setter for is_existing_schema."""
        self.is_existing_schema = "true" if value else "false"

    @property
    def is_published(self) -> bool:
        """Check if data agreement record is published."""
        return True if self._publish_flag and not self._delete_flag else False
    
    @property
    def is_deleted(self) -> bool:
        """Check if data agreemnent is deleted."""
        return True if self._delete_flag and not self._publish_flag else False

    @property
    def is_draft(self) -> bool:
        """Check if data agreement is a draft."""
        return True if not self._publish_flag and not self._delete_flag else False
    
    @classmethod
    async def retrieve_non_deleted_data_agreement_by_id(
        cls, 
        context: InjectionContext,
        data_agreement_id: str,
    ) -> "DataAgreementV1Record":
        """
        Retrieve a non-deleted data agreement record by its data agreement id.

        Args:
            context: The injection context to use.
            data_agreement_id: The data agreement id.
        
        Returns:
            The data agreement record.
        """

        tag_filter: dict = {
            "data_agreement_id": data_agreement_id,
            "delete_flag": bool_to_str(False)
        }
        post_filter: dict = None

        return await cls.retrieve_by_tag_filter(
            context,
            tag_filter,
            post_filter
        )
    
    @classmethod
    async def retrieve_all_non_deleted_data_agreements(
        cls,
        context: InjectionContext,
    ) -> typing.List["DataAgreementV1Record"]:
        """
        Retrieve all non-deleted data agreements.

        Args:
            context: The injection context to use.

        Returns:
            The data agreements.
        """

        tag_filter: dict = {
            "delete_flag": bool_to_str(False)
        }

        return await cls.query(
            context,
            tag_filter=tag_filter,
        )

class DataAgreementV1RecordSchema(BaseExchangeSchema):

    class Meta:
        """DataAgreementRecordV1Schema metadata."""

        # Model class
        model_class = DataAgreementV1Record

    # Data agreement record identifier
    data_agreement_record_id = fields.Str(
        required=True,
        description="Data Agreement Record identifier",
        example=UUIDFour.EXAMPLE
    )

    # Data agreement identifier
    data_agreement_id = fields.Str(
        required=True,
        description="The unique identifier for the data agreement.",
        example=UUIDFour.EXAMPLE
    )

    # State of the data agreement.
    state = fields.Str(
        required=True,
        description="The state of the data agreement.",
        example=DataAgreementV1Record.STATE_PREPARATION,
        validate=validate.OneOf(
            [
                DataAgreementV1Record.STATE_PREPARATION,
            ]
        )
    )

    # Method of use for the data agreement.
    method_of_use = fields.Str(
        required=True,
        description="The method of use for the data agreement.",
        example="data-source",
        validate=validate.OneOf(
            [
                DataAgreementV1Record.METHOD_OF_USE_DATA_SOURCE,
                DataAgreementV1Record.METHOD_OF_USE_DATA_USING_SERVICE,
            ]
        )
    )

    # Data agreement
    data_agreement = fields.Dict(
        required=True,
        description="The data agreement.",
    )

    # Production flag
    publish_flag = fields.Str(
        required=True,
        description="The production flag.",
        example="false",
        validate=validate.OneOf(
            [
                "true",
                "false",
            ]
        )
    )

    # Delete flag
    delete_flag = fields.Str(
        required=True,
        description="The delete flag.",
        example="false",
        validate=validate.OneOf(
            [
                "true",
                "false",
            ]
        )
    )

    # Schema identifier
    schema_id = fields.Str(
        required=True,
        description="The schema identifier.",
        example="WgWxqztrNooG92RXvxSTWv:2:schema_name:1.0"
    )

    # Credential definition identifier
    cred_def_id = fields.Str(
        required=True,
        description="The credential definition identifier.",
        example="WgWxqztrNooG92RXvxSTWv:3:CL:20:tag"
    )

    # Data agreement proof presentation request
    data_agreement_proof_presentation_request = fields.Dict(
        required=True,
        description="The data agreement proof presentation request.",
    )

    is_existing_schema = fields.Str(
        required=True,
        description="Is existing schema.",
        example="false",
        validate=validate.OneOf(
            [
                "true",
                "false",
            ]
        )
    )

Classes

class DataAgreementV1Record (*, data_agreement_record_id: str = None, data_agreement_id: str = None, state: str = None, method_of_use: str = None, data_agreement: dict = None, publish_flag: str = 'false', delete_flag: str = 'false', schema_id: str = None, cred_def_id: str = None, data_agreement_proof_presentation_request: dict = None, is_existing_schema: str = 'false', **kwargs)

DataAgreementV1Record model class for serialisation/deserialisation of data agreement records to and from wallet. Data agreement schema will be version 1.0.

Initialise a new DataAgreementRecordV1 instance.

Args

data_agreement_id
The unique identifier for the data agreement.
state
The state of the data agreement.
method_of_use
The method of use for the data agreement.
data_agreement
The data agreement.
kwargs
Any other parameters.
Expand source code
class DataAgreementV1Record(BaseExchangeRecord):
    """
    DataAgreementV1Record model class for serialisation/deserialisation of data agreement records to and from wallet.
    Data agreement schema will be version 1.0.
    """
    class Meta:
        # Data Agreement Record schema (version 1.0)
        schema_class = "DataAgreementV1RecordSchema"

    # Wallet record type
    RECORD_TYPE = "data_agreement_record"

    # Wallet record identifier field
    RECORD_ID_NAME = "data_agreement_record_id"

    # Webhook topic name for this record type
    WEBHOOK_TOPIC = None

    # Wallet record tags used for filtering
    # Note: These are not tags for the ledger, but rather tags for the wallet
    #      to group records.
    TAG_NAMES = {
        "~method_of_use", 
        "~data_agreement_id", 
        "~publish_flag", 
        "~delete_flag",
        "~schema_id",
        "~cred_def_id",
        "~is_existing_schema"
    }

    # State of the data agreement.
    # Only one possible value at this stage of the DA - preparation
    STATE_PREPARATION = "PREPARATION"

    METHOD_OF_USE_DATA_SOURCE = "data-source"
    METHOD_OF_USE_DATA_USING_SERVICE = "data-using-service"

    def __init__(
        self,
        *,
        data_agreement_record_id: str = None,
        data_agreement_id: str = None,
        state: str = None,
        method_of_use: str = None,
        data_agreement: dict = None,
        publish_flag: str = "false",
        delete_flag: str = "false",
        schema_id: str = None,
        cred_def_id: str = None,
        data_agreement_proof_presentation_request: dict = None,
        is_existing_schema: str = "false",
        **kwargs
    ):
        """
        Initialise a new DataAgreementRecordV1 instance.

        Args:
            data_agreement_id: The unique identifier for the data agreement.
            state: The state of the data agreement.
            method_of_use: The method of use for the data agreement.
            data_agreement: The data agreement.
            kwargs: Any other parameters.
        """
        super().__init__(data_agreement_record_id, state, **kwargs)
        self.method_of_use = method_of_use
        self.state = state
        self.data_agreement = data_agreement
        self.data_agreement_id = data_agreement_id
        self.publish_flag = publish_flag
        self.delete_flag = delete_flag
        self.schema_id = schema_id
        self.cred_def_id = cred_def_id
        self.data_agreement_proof_presentation_request = data_agreement_proof_presentation_request
        self.is_existing_schema = is_existing_schema

    @property
    def data_agreement_record_id(self) -> str:
        """Accessor for data_agreement_record_id."""
        return self._id

    @property
    def record_value(self) -> dict:
        """Accessor for JSON record value generated for this transaction record."""
        return {
            prop: getattr(self, prop)
            for prop in (
                "state",
                "method_of_use",
                "data_agreement",
                "data_agreement_id",
                "publish_flag",
                "delete_flag",
                "schema_id",
                "cred_def_id",
                "data_agreement_proof_presentation_request",
                "is_existing_schema",
            )
        }

    def __eq__(self, other: Any) -> bool:
        """Comparison between records."""
        return super().__eq__(other)
    
    @property
    def _publish_flag(self) -> bool:
        """Accessor for publish_flag."""
        return self.publish_flag == "true"

    @_publish_flag.setter
    def _publish_flag(self, value: bool) -> None:
        """Setter for publish_flag."""
        self.publish_flag = "true" if value else "false"
    
    @property
    def _delete_flag(self) -> bool:
        """Accessor for delete_flag."""
        return self.delete_flag == "true"
    
    @_delete_flag.setter
    def _delete_flag(self, value: bool) -> None:
        """Setter for delete_flag."""
        self.delete_flag = "true" if value else "false"
    
    @property
    def _is_existing_schema(self) -> bool:
        """Accessor for is_existing_schema."""
        return self.is_existing_schema == "true"
    
    @_is_existing_schema.setter
    def _is_existing_schema(self, value: bool) -> None:
        """Setter for is_existing_schema."""
        self.is_existing_schema = "true" if value else "false"

    @property
    def is_published(self) -> bool:
        """Check if data agreement record is published."""
        return True if self._publish_flag and not self._delete_flag else False
    
    @property
    def is_deleted(self) -> bool:
        """Check if data agreemnent is deleted."""
        return True if self._delete_flag and not self._publish_flag else False

    @property
    def is_draft(self) -> bool:
        """Check if data agreement is a draft."""
        return True if not self._publish_flag and not self._delete_flag else False
    
    @classmethod
    async def retrieve_non_deleted_data_agreement_by_id(
        cls, 
        context: InjectionContext,
        data_agreement_id: str,
    ) -> "DataAgreementV1Record":
        """
        Retrieve a non-deleted data agreement record by its data agreement id.

        Args:
            context: The injection context to use.
            data_agreement_id: The data agreement id.
        
        Returns:
            The data agreement record.
        """

        tag_filter: dict = {
            "data_agreement_id": data_agreement_id,
            "delete_flag": bool_to_str(False)
        }
        post_filter: dict = None

        return await cls.retrieve_by_tag_filter(
            context,
            tag_filter,
            post_filter
        )
    
    @classmethod
    async def retrieve_all_non_deleted_data_agreements(
        cls,
        context: InjectionContext,
    ) -> typing.List["DataAgreementV1Record"]:
        """
        Retrieve all non-deleted data agreements.

        Args:
            context: The injection context to use.

        Returns:
            The data agreements.
        """

        tag_filter: dict = {
            "delete_flag": bool_to_str(False)
        }

        return await cls.query(
            context,
            tag_filter=tag_filter,
        )

Ancestors

  • aries_cloudagent.messaging.models.base_record.BaseExchangeRecord
  • aries_cloudagent.messaging.models.base_record.BaseRecord
  • aries_cloudagent.messaging.models.base.BaseModel
  • abc.ABC

Class variables

var METHOD_OF_USE_DATA_SOURCE
var METHOD_OF_USE_DATA_USING_SERVICE
var Meta
var RECORD_ID_NAME
var RECORD_TYPE
var STATE_PREPARATION
var TAG_NAMES
var WEBHOOK_TOPIC

Static methods

async def retrieve_all_non_deleted_data_agreements(context: aries_cloudagent.config.injection_context.InjectionContext) ‑> List[DataAgreementV1Record]

Retrieve all non-deleted data agreements.

Args

context
The injection context to use.

Returns

The data agreements.

Expand source code
@classmethod
async def retrieve_all_non_deleted_data_agreements(
    cls,
    context: InjectionContext,
) -> typing.List["DataAgreementV1Record"]:
    """
    Retrieve all non-deleted data agreements.

    Args:
        context: The injection context to use.

    Returns:
        The data agreements.
    """

    tag_filter: dict = {
        "delete_flag": bool_to_str(False)
    }

    return await cls.query(
        context,
        tag_filter=tag_filter,
    )
async def retrieve_non_deleted_data_agreement_by_id(context: aries_cloudagent.config.injection_context.InjectionContext, data_agreement_id: str) ‑> DataAgreementV1Record

Retrieve a non-deleted data agreement record by its data agreement id.

Args

context
The injection context to use.
data_agreement_id
The data agreement id.

Returns

The data agreement record.

Expand source code
@classmethod
async def retrieve_non_deleted_data_agreement_by_id(
    cls, 
    context: InjectionContext,
    data_agreement_id: str,
) -> "DataAgreementV1Record":
    """
    Retrieve a non-deleted data agreement record by its data agreement id.

    Args:
        context: The injection context to use.
        data_agreement_id: The data agreement id.
    
    Returns:
        The data agreement record.
    """

    tag_filter: dict = {
        "data_agreement_id": data_agreement_id,
        "delete_flag": bool_to_str(False)
    }
    post_filter: dict = None

    return await cls.retrieve_by_tag_filter(
        context,
        tag_filter,
        post_filter
    )

Instance variables

var data_agreement_record_id : str

Accessor for data_agreement_record_id.

Expand source code
@property
def data_agreement_record_id(self) -> str:
    """Accessor for data_agreement_record_id."""
    return self._id
var is_deleted : bool

Check if data agreemnent is deleted.

Expand source code
@property
def is_deleted(self) -> bool:
    """Check if data agreemnent is deleted."""
    return True if self._delete_flag and not self._publish_flag else False
var is_draft : bool

Check if data agreement is a draft.

Expand source code
@property
def is_draft(self) -> bool:
    """Check if data agreement is a draft."""
    return True if not self._publish_flag and not self._delete_flag else False
var is_published : bool

Check if data agreement record is published.

Expand source code
@property
def is_published(self) -> bool:
    """Check if data agreement record is published."""
    return True if self._publish_flag and not self._delete_flag else False
var record_value : dict

Accessor for JSON record value generated for this transaction record.

Expand source code
@property
def record_value(self) -> dict:
    """Accessor for JSON record value generated for this transaction record."""
    return {
        prop: getattr(self, prop)
        for prop in (
            "state",
            "method_of_use",
            "data_agreement",
            "data_agreement_id",
            "publish_flag",
            "delete_flag",
            "schema_id",
            "cred_def_id",
            "data_agreement_proof_presentation_request",
            "is_existing_schema",
        )
    }
class DataAgreementV1RecordSchema (*args, **kwargs)

Base schema for exchange records.

Initialize BaseModelSchema.

Raises

TypeError
If model_class is not set on Meta
Expand source code
class DataAgreementV1RecordSchema(BaseExchangeSchema):

    class Meta:
        """DataAgreementRecordV1Schema metadata."""

        # Model class
        model_class = DataAgreementV1Record

    # Data agreement record identifier
    data_agreement_record_id = fields.Str(
        required=True,
        description="Data Agreement Record identifier",
        example=UUIDFour.EXAMPLE
    )

    # Data agreement identifier
    data_agreement_id = fields.Str(
        required=True,
        description="The unique identifier for the data agreement.",
        example=UUIDFour.EXAMPLE
    )

    # State of the data agreement.
    state = fields.Str(
        required=True,
        description="The state of the data agreement.",
        example=DataAgreementV1Record.STATE_PREPARATION,
        validate=validate.OneOf(
            [
                DataAgreementV1Record.STATE_PREPARATION,
            ]
        )
    )

    # Method of use for the data agreement.
    method_of_use = fields.Str(
        required=True,
        description="The method of use for the data agreement.",
        example="data-source",
        validate=validate.OneOf(
            [
                DataAgreementV1Record.METHOD_OF_USE_DATA_SOURCE,
                DataAgreementV1Record.METHOD_OF_USE_DATA_USING_SERVICE,
            ]
        )
    )

    # Data agreement
    data_agreement = fields.Dict(
        required=True,
        description="The data agreement.",
    )

    # Production flag
    publish_flag = fields.Str(
        required=True,
        description="The production flag.",
        example="false",
        validate=validate.OneOf(
            [
                "true",
                "false",
            ]
        )
    )

    # Delete flag
    delete_flag = fields.Str(
        required=True,
        description="The delete flag.",
        example="false",
        validate=validate.OneOf(
            [
                "true",
                "false",
            ]
        )
    )

    # Schema identifier
    schema_id = fields.Str(
        required=True,
        description="The schema identifier.",
        example="WgWxqztrNooG92RXvxSTWv:2:schema_name:1.0"
    )

    # Credential definition identifier
    cred_def_id = fields.Str(
        required=True,
        description="The credential definition identifier.",
        example="WgWxqztrNooG92RXvxSTWv:3:CL:20:tag"
    )

    # Data agreement proof presentation request
    data_agreement_proof_presentation_request = fields.Dict(
        required=True,
        description="The data agreement proof presentation request.",
    )

    is_existing_schema = fields.Str(
        required=True,
        description="Is existing schema.",
        example="false",
        validate=validate.OneOf(
            [
                "true",
                "false",
            ]
        )
    )

Ancestors

  • aries_cloudagent.messaging.models.base_record.BaseExchangeSchema
  • aries_cloudagent.messaging.models.base_record.BaseRecordSchema
  • aries_cloudagent.messaging.models.base.BaseModelSchema
  • marshmallow.schema.Schema
  • marshmallow.base.SchemaABC

Class variables

var Meta

DataAgreementRecordV1Schema metadata.

var opts