Module mydata_did.v1_0.manager
Expand source code
import base64
import logging
import json
import time
import typing
import aiohttp
from aries_cloudagent.config.injection_context import InjectionContext
from aries_cloudagent.connections.models.connection_record import ConnectionRecord
from aries_cloudagent.connections.models.connection_target import ConnectionTarget
from aries_cloudagent.messaging.responder import BaseResponder
from aries_cloudagent.core.dispatcher import DispatcherResponder
from aries_cloudagent.transport.inbound.receipt import MessageReceipt
from aries_cloudagent.core.error import BaseError
from aries_cloudagent.storage.base import BaseStorage, StorageRecord
from aries_cloudagent.storage.indy import IndyStorage
from aries_cloudagent.storage.error import (
StorageNotFoundError,
StorageDuplicateError,
StorageError
)
from aries_cloudagent.wallet.indy import IndyWallet
from aries_cloudagent.wallet.base import BaseWallet
from aries_cloudagent.transport.pack_format import PackWireFormat
from aries_cloudagent.transport.wire_format import BaseWireFormat
from aries_cloudagent.messaging.decorators.transport_decorator import TransportDecorator
from aries_cloudagent.protocols.connections.v1_0.manager import (
ConnectionManager,
)
from .messages.read_did import ReadDIDMessage, ReadDIDMessageBody
from .messages.read_did_response import ReadDIDResponseMessage, ReadDIDResponseMessageSchema
from .messages.problem_report import (
MyDataDIDProblemReportMessage,
MyDataDIDProblemReportMessageReason
)
from .messages.json_ld_processed import JSONLDProcessedMessage
from .messages.json_ld_processed_response import JSONLDProcessedResponseMessage
from .messages.json_ld_problem_report import JSONLDProblemReport, JSONLDProblemReportReason
from .models.diddoc_model import (
MyDataDIDResponseBody,
MyDataDIDDoc,
)
from .models.json_ld_processed_response_model import JSONLDProcessedResponseBody
from .models.json_ld_processed_model import JSONLDProcessedBody
from .utils.did.mydata_did import DIDMyData
from .utils.wallet.key_type import KeyType
from .utils.jsonld.create_verify_data import create_verify_data
class ADAManagerError(BaseError):
"""ADA manager error"""
class ADAManager:
# Record for storing data agreement instance metadata (client)
RECORD_TYPE_DATA_AGREEMENT_INSTANCE_METADATA = "data_agreement_instance_metadata"
# Record for keeping track of DIDs that are registered in the DID registry (MyData DID registry)
RECORD_TYPE_MYDATA_DID_REGISTRY_DID_INFO = "mydata_did_registry_did_info"
# Record for keeping metadata about data agreement QR codes (client)
RECORD_TYPE_DATA_AGREEMENT_QR_CODE_METADATA = "data_agreement_qr_code_metadata"
# Temporary record for keeping personal data of unpublished (or draft) data agreements
RECORD_TYPE_TEMPORARY_DATA_AGREEMENT_PERSONAL_DATA = "temporary_data_agreement_personal_data"
# Record for data controller details
RECORD_TYPE_DATA_CONTROLLER_DETAILS = "data_controller_details"
# Record for existing connection details.
RECORD_TYPE_EXISTING_CONNECTION = "existing_connection"
DATA_AGREEMENT_RECORD_TYPE = "dataagreement_record"
def __init__(self, context: InjectionContext) -> None:
self._context = context
self._logger = logging.getLogger(__name__)
@property
def context(self) -> InjectionContext:
return self._context
async def process_read_did_message(self,
read_did_message: ReadDIDMessage,
receipt: MessageReceipt):
"""
Process read-did DIDComm message
"""
# Storage instance from context
storage: IndyStorage = await self.context.inject(BaseStorage)
# Responder instance from context
responder: DispatcherResponder = await self.context.inject(BaseResponder, required=False)
# From and To DIDs of the recieved message
create_did_message_from_did: DIDMyData = DIDMyData.from_public_key_b58(
receipt.sender_verkey, key_type=KeyType.ED25519)
create_did_message_to_did: DIDMyData = DIDMyData.from_public_key_b58(
receipt.recipient_verkey, key_type=KeyType.ED25519)
# From and To DIDs for the response messages
response_message_from_did = create_did_message_to_did
response_message_to_did = create_did_message_from_did
mydata_did_registry_did_info_record = None
try:
# Fetch DID from wallet
mydata_did_registry_did_info_record = await storage.search_records(
type_filter=ADAManager.RECORD_TYPE_MYDATA_DID_REGISTRY_DID_INFO,
tag_query={"did": read_did_message.body.did}
).fetch_single()
except (StorageNotFoundError, StorageDuplicateError):
# Send problem-report message.
mydata_did_problem_report = MyDataDIDProblemReportMessage(
problem_code=MyDataDIDProblemReportMessageReason.DID_NOT_FOUND.value,
explain="DID not found.",
from_did=response_message_from_did.did,
to_did=response_message_to_did.did,
created_time=round(time.time() * 1000)
)
# Assign thread id
mydata_did_problem_report.assign_thread_id(
thid=read_did_message._id)
if responder:
await responder.send_reply(mydata_did_problem_report)
return
# Send read-did-response message
read_did_response_message = ReadDIDResponseMessage(
from_did=response_message_from_did.did,
to_did=response_message_to_did.did,
created_time=round(time.time() * 1000),
body=MyDataDIDResponseBody(
did_doc=MyDataDIDDoc.from_json(
mydata_did_registry_did_info_record.value),
version=mydata_did_registry_did_info_record.tags.get(
"version"),
status=mydata_did_registry_did_info_record.tags.get("status")
)
)
# Assign thread id
read_did_response_message.assign_thread_id(
thid=read_did_message._id)
if responder:
await responder.send_reply(read_did_response_message)
async def process_read_did_response_message(
self,
read_did_response_message: ReadDIDResponseMessage,
receipt: MessageReceipt
):
"""
Process read-did-response DIDComm message
"""
pass
async def send_read_did_message(self, did: str):
"""
Send read-did DIDComm message
"""
pass
async def store_data_agreement_instance_metadata(
self,
*,
data_agreement_id: str = None,
data_agreement_template_id: str = None,
method_of_use: str = None,
data_exchange_record_id: str = None
) -> None:
"""Store data agreement instance metadata"""
# Fetch storage from context
storage: IndyStorage = await self.context.inject(BaseStorage)
data_instance_metadata_record = StorageRecord(
self.RECORD_TYPE_DATA_AGREEMENT_INSTANCE_METADATA,
data_agreement_id,
{
"data_agreement_id": data_agreement_id,
"data_agreement_template_id": data_agreement_template_id,
"method_of_use": method_of_use,
"data_exchange_record_id": data_exchange_record_id
}
)
await storage.add_record(data_instance_metadata_record)
async def delete_data_agreement_instance_metadata(self, *, tag_query: dict = None) -> None:
"""Delete data agreement instance metadata"""
# Fetch storage from context
storage: IndyStorage = await self.context.inject(BaseStorage)
storage_records = await storage.search_records(
type_filter=self.RECORD_TYPE_DATA_AGREEMENT_INSTANCE_METADATA,
tag_query=tag_query
).fetch_all()
for storage_record in storage_records:
await storage.delete_record(storage_record)
async def query_data_agreement_instance_metadata(self, *, tag_query: dict = None) -> typing.List[StorageRecord]:
"""Query data agreement instance metadata"""
# Fetch storage from context
storage: IndyStorage = await self.context.inject(BaseStorage)
storage_records = await storage.search_records(
type_filter=self.RECORD_TYPE_DATA_AGREEMENT_INSTANCE_METADATA,
tag_query=tag_query
).fetch_all()
return storage_records
async def resolve_remote_mydata_did(self, *, mydata_did: str) -> MyDataDIDResponseBody:
"""Resolve remote MyData DID"""
# Initialize DID MyData
mydata_did = DIDMyData.from_did(mydata_did)
# Fetch wallet from context
wallet: IndyWallet = await self.context.inject(BaseWallet)
# Get pack format from context
pack_format: PackWireFormat = await self.context.inject(BaseWireFormat)
# Fetch connection record marked as MyData DID registry
connection_record, err = await self.fetch_mydata_did_registry_connection_record()
if err:
raise ADAManagerError(
"Failed to fetch MyData DID registry connection record")
# Construct read-did message
# from_did
pairwise_local_did_record = await wallet.get_local_did(connection_record.my_did)
from_did = DIDMyData.from_public_key_b58(
pairwise_local_did_record.verkey, key_type=KeyType.ED25519)
# to_did
to_did = DIDMyData.from_public_key_b58(
connection_record.their_did, key_type=KeyType.ED25519)
# Create read-did message
read_did_message = ReadDIDMessage(
from_did=from_did.did,
to_did=to_did.did,
created_time=round(time.time() * 1000),
body=ReadDIDMessageBody(
did=mydata_did.did
)
)
# Add transport decorator
read_did_message._decorators["transport"] = TransportDecorator(
return_route="all"
)
# Initialise connection manager
connection_manager = ConnectionManager(self.context)
# Fetch connection targets
connection_targets = await connection_manager.fetch_connection_targets(connection_record)
if len(connection_targets) == 0:
raise ADAManagerError("No connection targets found")
connection_target: ConnectionTarget = connection_targets[0]
# Pack message
packed_message = await pack_format.pack(
context=self.context,
message_json=read_did_message.serialize(as_string=True),
recipient_keys=connection_target.recipient_keys,
routing_keys=None,
sender_key=connection_target.sender_key,
)
headers = {
"Content-Type": "application/ssi-agent-wire"
}
async with aiohttp.ClientSession(headers=headers) as session:
async with session.post(connection_target.endpoint, data=packed_message) as response:
if response.status != 200:
raise ADAManagerError(
f"HTTP request failed with status code {response.status}")
message_body = await response.read()
unpacked = await wallet.unpack_message(message_body)
(
message_json,
sender_verkey,
recipient_verkey,
) = unpacked
message_json = json.loads(message_json)
if "problem-report" in message_json["@type"]:
raise ADAManagerError(
f"Problem report received with problem-code:{message_json['problem-code']} and reason: {message_json['explain']}")
if "read-did-response" in message_json["@type"]:
read_did_response_message: ReadDIDResponseMessage = \
ReadDIDResponseMessageSchema().load(message_json)
if read_did_response_message.body.status == "revoked":
raise ADAManagerError(
f"MyData DID {mydata_did.did} is revoked"
)
return read_did_response_message.body
async def process_json_ld_processed_message(
self,
json_ld_processed_message: JSONLDProcessedMessage,
receipt: MessageReceipt
) -> None:
# Responder instance
responder: DispatcherResponder = await self.context.inject(BaseResponder, required=False)
# From and To MyData DIDs
to_did: DIDMyData = DIDMyData.from_public_key_b58(
receipt.sender_verkey, key_type=KeyType.ED25519)
from_did: DIDMyData = DIDMyData.from_public_key_b58(
receipt.recipient_verkey, key_type=KeyType.ED25519)
try:
# Base64 decode data_base64
data_base64_decoded = base64.b64decode(
json_ld_processed_message.body.data_base64)
# JSON load data_base64
data_json = json.loads(data_base64_decoded)
# Base64 decode signature_options_base64
signature_options_base64_decoded = base64.b64decode(
json_ld_processed_message.body.signature_options_base64)
# JSON load signature_options_base64
signature_options_json = json.loads(
signature_options_base64_decoded)
# verify_data function
framed, combine_hash = create_verify_data(
data_json, signature_options_json, json_ld_processed_message.body.proof_chain)
# Base64 encode framed
framed_base64_encoded = base64.b64encode(
json.dumps(framed).encode("utf-8")).decode("utf-8")
# Base64 encode combine_hash
combine_hash_base64_encoded = base64.b64encode(
combine_hash.encode("utf-8")).decode("utf-8")
# Construct JSONLD Processed Response Message
json_ld_processed_response_message = JSONLDProcessedResponseMessage(
from_did=from_did.did,
to_did=to_did.did,
created_time=round(time.time() * 1000),
body=JSONLDProcessedResponseBody(
framed_base64=framed_base64_encoded,
combined_hash_base64=combine_hash_base64_encoded
)
)
if responder:
await responder.send_reply(
json_ld_processed_response_message,
connection_id=self.context.connection_record.connection_id
)
except Exception as err:
# Send problem report
json_ld_problem_report_message = JSONLDProblemReport(
problem_code=JSONLDProblemReportReason.INVALID_INPUT.value,
explain=str(err),
from_did=from_did.did,
to_did=to_did.did,
created_time=round(time.time() * 1000)
)
if responder:
await responder.send_reply(
json_ld_problem_report_message,
connection_id=self.context.connection_record.connection_id
)
async def send_json_ld_processed_message(
self,
*,
connection_id: str,
data: dict,
signature_options: dict,
proof_chain: bool
) -> None:
"""Send JSON-LD Processed Message to remote agent."""
# Responder instance
responder: DispatcherResponder = await self.context.inject(BaseResponder, required=False)
try:
# Retrieve connection record by id
connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
self.context,
connection_id
)
except StorageError as err:
raise ADAManagerError(
f"Failed to retrieve connection record: {err}"
)
# From and to mydata dids
from_did: DIDMyData = DIDMyData.from_public_key_b58(
connection_record.my_did, key_type=KeyType.ED25519
)
to_did: DIDMyData = DIDMyData.from_public_key_b58(
connection_record.their_did, key_type=KeyType.ED25519
)
# Base64 encode data
data_base64 = base64.b64encode(json.dumps(
data).encode("utf-8")).decode("utf-8")
# Base64 encode signature_options
signature_options_base64 = base64.b64encode(json.dumps(
signature_options).encode("utf-8")).decode("utf-8")
# Construct JSONLD Processed Message
json_ld_processed_message = JSONLDProcessedMessage(
from_did=from_did.did,
to_did=to_did.did,
created_time=round(time.time() * 1000),
body=JSONLDProcessedBody(
data_base64=data_base64,
signature_options_base64=signature_options_base64,
proof_chain=proof_chain
)
)
# Send JSONLD Processed Message
if responder:
await responder.send_reply(
json_ld_processed_message,
connection_id=connection_record.connection_id
)
Classes
class ADAManager (context: aries_cloudagent.config.injection_context.InjectionContext)
-
Expand source code
class ADAManager: # Record for storing data agreement instance metadata (client) RECORD_TYPE_DATA_AGREEMENT_INSTANCE_METADATA = "data_agreement_instance_metadata" # Record for keeping track of DIDs that are registered in the DID registry (MyData DID registry) RECORD_TYPE_MYDATA_DID_REGISTRY_DID_INFO = "mydata_did_registry_did_info" # Record for keeping metadata about data agreement QR codes (client) RECORD_TYPE_DATA_AGREEMENT_QR_CODE_METADATA = "data_agreement_qr_code_metadata" # Temporary record for keeping personal data of unpublished (or draft) data agreements RECORD_TYPE_TEMPORARY_DATA_AGREEMENT_PERSONAL_DATA = "temporary_data_agreement_personal_data" # Record for data controller details RECORD_TYPE_DATA_CONTROLLER_DETAILS = "data_controller_details" # Record for existing connection details. RECORD_TYPE_EXISTING_CONNECTION = "existing_connection" DATA_AGREEMENT_RECORD_TYPE = "dataagreement_record" def __init__(self, context: InjectionContext) -> None: self._context = context self._logger = logging.getLogger(__name__) @property def context(self) -> InjectionContext: return self._context async def process_read_did_message(self, read_did_message: ReadDIDMessage, receipt: MessageReceipt): """ Process read-did DIDComm message """ # Storage instance from context storage: IndyStorage = await self.context.inject(BaseStorage) # Responder instance from context responder: DispatcherResponder = await self.context.inject(BaseResponder, required=False) # From and To DIDs of the recieved message create_did_message_from_did: DIDMyData = DIDMyData.from_public_key_b58( receipt.sender_verkey, key_type=KeyType.ED25519) create_did_message_to_did: DIDMyData = DIDMyData.from_public_key_b58( receipt.recipient_verkey, key_type=KeyType.ED25519) # From and To DIDs for the response messages response_message_from_did = create_did_message_to_did response_message_to_did = create_did_message_from_did mydata_did_registry_did_info_record = None try: # Fetch DID from wallet mydata_did_registry_did_info_record = await storage.search_records( type_filter=ADAManager.RECORD_TYPE_MYDATA_DID_REGISTRY_DID_INFO, tag_query={"did": read_did_message.body.did} ).fetch_single() except (StorageNotFoundError, StorageDuplicateError): # Send problem-report message. mydata_did_problem_report = MyDataDIDProblemReportMessage( problem_code=MyDataDIDProblemReportMessageReason.DID_NOT_FOUND.value, explain="DID not found.", from_did=response_message_from_did.did, to_did=response_message_to_did.did, created_time=round(time.time() * 1000) ) # Assign thread id mydata_did_problem_report.assign_thread_id( thid=read_did_message._id) if responder: await responder.send_reply(mydata_did_problem_report) return # Send read-did-response message read_did_response_message = ReadDIDResponseMessage( from_did=response_message_from_did.did, to_did=response_message_to_did.did, created_time=round(time.time() * 1000), body=MyDataDIDResponseBody( did_doc=MyDataDIDDoc.from_json( mydata_did_registry_did_info_record.value), version=mydata_did_registry_did_info_record.tags.get( "version"), status=mydata_did_registry_did_info_record.tags.get("status") ) ) # Assign thread id read_did_response_message.assign_thread_id( thid=read_did_message._id) if responder: await responder.send_reply(read_did_response_message) async def process_read_did_response_message( self, read_did_response_message: ReadDIDResponseMessage, receipt: MessageReceipt ): """ Process read-did-response DIDComm message """ pass async def send_read_did_message(self, did: str): """ Send read-did DIDComm message """ pass async def store_data_agreement_instance_metadata( self, *, data_agreement_id: str = None, data_agreement_template_id: str = None, method_of_use: str = None, data_exchange_record_id: str = None ) -> None: """Store data agreement instance metadata""" # Fetch storage from context storage: IndyStorage = await self.context.inject(BaseStorage) data_instance_metadata_record = StorageRecord( self.RECORD_TYPE_DATA_AGREEMENT_INSTANCE_METADATA, data_agreement_id, { "data_agreement_id": data_agreement_id, "data_agreement_template_id": data_agreement_template_id, "method_of_use": method_of_use, "data_exchange_record_id": data_exchange_record_id } ) await storage.add_record(data_instance_metadata_record) async def delete_data_agreement_instance_metadata(self, *, tag_query: dict = None) -> None: """Delete data agreement instance metadata""" # Fetch storage from context storage: IndyStorage = await self.context.inject(BaseStorage) storage_records = await storage.search_records( type_filter=self.RECORD_TYPE_DATA_AGREEMENT_INSTANCE_METADATA, tag_query=tag_query ).fetch_all() for storage_record in storage_records: await storage.delete_record(storage_record) async def query_data_agreement_instance_metadata(self, *, tag_query: dict = None) -> typing.List[StorageRecord]: """Query data agreement instance metadata""" # Fetch storage from context storage: IndyStorage = await self.context.inject(BaseStorage) storage_records = await storage.search_records( type_filter=self.RECORD_TYPE_DATA_AGREEMENT_INSTANCE_METADATA, tag_query=tag_query ).fetch_all() return storage_records async def resolve_remote_mydata_did(self, *, mydata_did: str) -> MyDataDIDResponseBody: """Resolve remote MyData DID""" # Initialize DID MyData mydata_did = DIDMyData.from_did(mydata_did) # Fetch wallet from context wallet: IndyWallet = await self.context.inject(BaseWallet) # Get pack format from context pack_format: PackWireFormat = await self.context.inject(BaseWireFormat) # Fetch connection record marked as MyData DID registry connection_record, err = await self.fetch_mydata_did_registry_connection_record() if err: raise ADAManagerError( "Failed to fetch MyData DID registry connection record") # Construct read-did message # from_did pairwise_local_did_record = await wallet.get_local_did(connection_record.my_did) from_did = DIDMyData.from_public_key_b58( pairwise_local_did_record.verkey, key_type=KeyType.ED25519) # to_did to_did = DIDMyData.from_public_key_b58( connection_record.their_did, key_type=KeyType.ED25519) # Create read-did message read_did_message = ReadDIDMessage( from_did=from_did.did, to_did=to_did.did, created_time=round(time.time() * 1000), body=ReadDIDMessageBody( did=mydata_did.did ) ) # Add transport decorator read_did_message._decorators["transport"] = TransportDecorator( return_route="all" ) # Initialise connection manager connection_manager = ConnectionManager(self.context) # Fetch connection targets connection_targets = await connection_manager.fetch_connection_targets(connection_record) if len(connection_targets) == 0: raise ADAManagerError("No connection targets found") connection_target: ConnectionTarget = connection_targets[0] # Pack message packed_message = await pack_format.pack( context=self.context, message_json=read_did_message.serialize(as_string=True), recipient_keys=connection_target.recipient_keys, routing_keys=None, sender_key=connection_target.sender_key, ) headers = { "Content-Type": "application/ssi-agent-wire" } async with aiohttp.ClientSession(headers=headers) as session: async with session.post(connection_target.endpoint, data=packed_message) as response: if response.status != 200: raise ADAManagerError( f"HTTP request failed with status code {response.status}") message_body = await response.read() unpacked = await wallet.unpack_message(message_body) ( message_json, sender_verkey, recipient_verkey, ) = unpacked message_json = json.loads(message_json) if "problem-report" in message_json["@type"]: raise ADAManagerError( f"Problem report received with problem-code:{message_json['problem-code']} and reason: {message_json['explain']}") if "read-did-response" in message_json["@type"]: read_did_response_message: ReadDIDResponseMessage = \ ReadDIDResponseMessageSchema().load(message_json) if read_did_response_message.body.status == "revoked": raise ADAManagerError( f"MyData DID {mydata_did.did} is revoked" ) return read_did_response_message.body async def process_json_ld_processed_message( self, json_ld_processed_message: JSONLDProcessedMessage, receipt: MessageReceipt ) -> None: # Responder instance responder: DispatcherResponder = await self.context.inject(BaseResponder, required=False) # From and To MyData DIDs to_did: DIDMyData = DIDMyData.from_public_key_b58( receipt.sender_verkey, key_type=KeyType.ED25519) from_did: DIDMyData = DIDMyData.from_public_key_b58( receipt.recipient_verkey, key_type=KeyType.ED25519) try: # Base64 decode data_base64 data_base64_decoded = base64.b64decode( json_ld_processed_message.body.data_base64) # JSON load data_base64 data_json = json.loads(data_base64_decoded) # Base64 decode signature_options_base64 signature_options_base64_decoded = base64.b64decode( json_ld_processed_message.body.signature_options_base64) # JSON load signature_options_base64 signature_options_json = json.loads( signature_options_base64_decoded) # verify_data function framed, combine_hash = create_verify_data( data_json, signature_options_json, json_ld_processed_message.body.proof_chain) # Base64 encode framed framed_base64_encoded = base64.b64encode( json.dumps(framed).encode("utf-8")).decode("utf-8") # Base64 encode combine_hash combine_hash_base64_encoded = base64.b64encode( combine_hash.encode("utf-8")).decode("utf-8") # Construct JSONLD Processed Response Message json_ld_processed_response_message = JSONLDProcessedResponseMessage( from_did=from_did.did, to_did=to_did.did, created_time=round(time.time() * 1000), body=JSONLDProcessedResponseBody( framed_base64=framed_base64_encoded, combined_hash_base64=combine_hash_base64_encoded ) ) if responder: await responder.send_reply( json_ld_processed_response_message, connection_id=self.context.connection_record.connection_id ) except Exception as err: # Send problem report json_ld_problem_report_message = JSONLDProblemReport( problem_code=JSONLDProblemReportReason.INVALID_INPUT.value, explain=str(err), from_did=from_did.did, to_did=to_did.did, created_time=round(time.time() * 1000) ) if responder: await responder.send_reply( json_ld_problem_report_message, connection_id=self.context.connection_record.connection_id ) async def send_json_ld_processed_message( self, *, connection_id: str, data: dict, signature_options: dict, proof_chain: bool ) -> None: """Send JSON-LD Processed Message to remote agent.""" # Responder instance responder: DispatcherResponder = await self.context.inject(BaseResponder, required=False) try: # Retrieve connection record by id connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_id( self.context, connection_id ) except StorageError as err: raise ADAManagerError( f"Failed to retrieve connection record: {err}" ) # From and to mydata dids from_did: DIDMyData = DIDMyData.from_public_key_b58( connection_record.my_did, key_type=KeyType.ED25519 ) to_did: DIDMyData = DIDMyData.from_public_key_b58( connection_record.their_did, key_type=KeyType.ED25519 ) # Base64 encode data data_base64 = base64.b64encode(json.dumps( data).encode("utf-8")).decode("utf-8") # Base64 encode signature_options signature_options_base64 = base64.b64encode(json.dumps( signature_options).encode("utf-8")).decode("utf-8") # Construct JSONLD Processed Message json_ld_processed_message = JSONLDProcessedMessage( from_did=from_did.did, to_did=to_did.did, created_time=round(time.time() * 1000), body=JSONLDProcessedBody( data_base64=data_base64, signature_options_base64=signature_options_base64, proof_chain=proof_chain ) ) # Send JSONLD Processed Message if responder: await responder.send_reply( json_ld_processed_message, connection_id=connection_record.connection_id )
Class variables
var DATA_AGREEMENT_RECORD_TYPE
var RECORD_TYPE_DATA_AGREEMENT_INSTANCE_METADATA
var RECORD_TYPE_DATA_AGREEMENT_QR_CODE_METADATA
var RECORD_TYPE_DATA_CONTROLLER_DETAILS
var RECORD_TYPE_EXISTING_CONNECTION
var RECORD_TYPE_MYDATA_DID_REGISTRY_DID_INFO
var RECORD_TYPE_TEMPORARY_DATA_AGREEMENT_PERSONAL_DATA
Instance variables
var context : aries_cloudagent.config.injection_context.InjectionContext
-
Expand source code
@property def context(self) -> InjectionContext: return self._context
Methods
async def delete_data_agreement_instance_metadata(self, *, tag_query: dict = None) ‑> None
-
Delete data agreement instance metadata
Expand source code
async def delete_data_agreement_instance_metadata(self, *, tag_query: dict = None) -> None: """Delete data agreement instance metadata""" # Fetch storage from context storage: IndyStorage = await self.context.inject(BaseStorage) storage_records = await storage.search_records( type_filter=self.RECORD_TYPE_DATA_AGREEMENT_INSTANCE_METADATA, tag_query=tag_query ).fetch_all() for storage_record in storage_records: await storage.delete_record(storage_record)
async def process_json_ld_processed_message(self, json_ld_processed_message: JSONLDProcessedMessage, receipt: aries_cloudagent.transport.inbound.receipt.MessageReceipt) ‑> None
-
Expand source code
async def process_json_ld_processed_message( self, json_ld_processed_message: JSONLDProcessedMessage, receipt: MessageReceipt ) -> None: # Responder instance responder: DispatcherResponder = await self.context.inject(BaseResponder, required=False) # From and To MyData DIDs to_did: DIDMyData = DIDMyData.from_public_key_b58( receipt.sender_verkey, key_type=KeyType.ED25519) from_did: DIDMyData = DIDMyData.from_public_key_b58( receipt.recipient_verkey, key_type=KeyType.ED25519) try: # Base64 decode data_base64 data_base64_decoded = base64.b64decode( json_ld_processed_message.body.data_base64) # JSON load data_base64 data_json = json.loads(data_base64_decoded) # Base64 decode signature_options_base64 signature_options_base64_decoded = base64.b64decode( json_ld_processed_message.body.signature_options_base64) # JSON load signature_options_base64 signature_options_json = json.loads( signature_options_base64_decoded) # verify_data function framed, combine_hash = create_verify_data( data_json, signature_options_json, json_ld_processed_message.body.proof_chain) # Base64 encode framed framed_base64_encoded = base64.b64encode( json.dumps(framed).encode("utf-8")).decode("utf-8") # Base64 encode combine_hash combine_hash_base64_encoded = base64.b64encode( combine_hash.encode("utf-8")).decode("utf-8") # Construct JSONLD Processed Response Message json_ld_processed_response_message = JSONLDProcessedResponseMessage( from_did=from_did.did, to_did=to_did.did, created_time=round(time.time() * 1000), body=JSONLDProcessedResponseBody( framed_base64=framed_base64_encoded, combined_hash_base64=combine_hash_base64_encoded ) ) if responder: await responder.send_reply( json_ld_processed_response_message, connection_id=self.context.connection_record.connection_id ) except Exception as err: # Send problem report json_ld_problem_report_message = JSONLDProblemReport( problem_code=JSONLDProblemReportReason.INVALID_INPUT.value, explain=str(err), from_did=from_did.did, to_did=to_did.did, created_time=round(time.time() * 1000) ) if responder: await responder.send_reply( json_ld_problem_report_message, connection_id=self.context.connection_record.connection_id )
async def process_read_did_message(self, read_did_message: ReadDIDMessage, receipt: aries_cloudagent.transport.inbound.receipt.MessageReceipt)
-
Process read-did DIDComm message
Expand source code
async def process_read_did_message(self, read_did_message: ReadDIDMessage, receipt: MessageReceipt): """ Process read-did DIDComm message """ # Storage instance from context storage: IndyStorage = await self.context.inject(BaseStorage) # Responder instance from context responder: DispatcherResponder = await self.context.inject(BaseResponder, required=False) # From and To DIDs of the recieved message create_did_message_from_did: DIDMyData = DIDMyData.from_public_key_b58( receipt.sender_verkey, key_type=KeyType.ED25519) create_did_message_to_did: DIDMyData = DIDMyData.from_public_key_b58( receipt.recipient_verkey, key_type=KeyType.ED25519) # From and To DIDs for the response messages response_message_from_did = create_did_message_to_did response_message_to_did = create_did_message_from_did mydata_did_registry_did_info_record = None try: # Fetch DID from wallet mydata_did_registry_did_info_record = await storage.search_records( type_filter=ADAManager.RECORD_TYPE_MYDATA_DID_REGISTRY_DID_INFO, tag_query={"did": read_did_message.body.did} ).fetch_single() except (StorageNotFoundError, StorageDuplicateError): # Send problem-report message. mydata_did_problem_report = MyDataDIDProblemReportMessage( problem_code=MyDataDIDProblemReportMessageReason.DID_NOT_FOUND.value, explain="DID not found.", from_did=response_message_from_did.did, to_did=response_message_to_did.did, created_time=round(time.time() * 1000) ) # Assign thread id mydata_did_problem_report.assign_thread_id( thid=read_did_message._id) if responder: await responder.send_reply(mydata_did_problem_report) return # Send read-did-response message read_did_response_message = ReadDIDResponseMessage( from_did=response_message_from_did.did, to_did=response_message_to_did.did, created_time=round(time.time() * 1000), body=MyDataDIDResponseBody( did_doc=MyDataDIDDoc.from_json( mydata_did_registry_did_info_record.value), version=mydata_did_registry_did_info_record.tags.get( "version"), status=mydata_did_registry_did_info_record.tags.get("status") ) ) # Assign thread id read_did_response_message.assign_thread_id( thid=read_did_message._id) if responder: await responder.send_reply(read_did_response_message)
async def process_read_did_response_message(self, read_did_response_message: ReadDIDResponseMessage, receipt: aries_cloudagent.transport.inbound.receipt.MessageReceipt)
-
Process read-did-response DIDComm message
Expand source code
async def process_read_did_response_message( self, read_did_response_message: ReadDIDResponseMessage, receipt: MessageReceipt ): """ Process read-did-response DIDComm message """ pass
async def query_data_agreement_instance_metadata(self, *, tag_query: dict = None) ‑> List[aries_cloudagent.storage.record.StorageRecord]
-
Query data agreement instance metadata
Expand source code
async def query_data_agreement_instance_metadata(self, *, tag_query: dict = None) -> typing.List[StorageRecord]: """Query data agreement instance metadata""" # Fetch storage from context storage: IndyStorage = await self.context.inject(BaseStorage) storage_records = await storage.search_records( type_filter=self.RECORD_TYPE_DATA_AGREEMENT_INSTANCE_METADATA, tag_query=tag_query ).fetch_all() return storage_records
async def resolve_remote_mydata_did(self, *, mydata_did: str) ‑> MyDataDIDResponseBody
-
Resolve remote MyData DID
Expand source code
async def resolve_remote_mydata_did(self, *, mydata_did: str) -> MyDataDIDResponseBody: """Resolve remote MyData DID""" # Initialize DID MyData mydata_did = DIDMyData.from_did(mydata_did) # Fetch wallet from context wallet: IndyWallet = await self.context.inject(BaseWallet) # Get pack format from context pack_format: PackWireFormat = await self.context.inject(BaseWireFormat) # Fetch connection record marked as MyData DID registry connection_record, err = await self.fetch_mydata_did_registry_connection_record() if err: raise ADAManagerError( "Failed to fetch MyData DID registry connection record") # Construct read-did message # from_did pairwise_local_did_record = await wallet.get_local_did(connection_record.my_did) from_did = DIDMyData.from_public_key_b58( pairwise_local_did_record.verkey, key_type=KeyType.ED25519) # to_did to_did = DIDMyData.from_public_key_b58( connection_record.their_did, key_type=KeyType.ED25519) # Create read-did message read_did_message = ReadDIDMessage( from_did=from_did.did, to_did=to_did.did, created_time=round(time.time() * 1000), body=ReadDIDMessageBody( did=mydata_did.did ) ) # Add transport decorator read_did_message._decorators["transport"] = TransportDecorator( return_route="all" ) # Initialise connection manager connection_manager = ConnectionManager(self.context) # Fetch connection targets connection_targets = await connection_manager.fetch_connection_targets(connection_record) if len(connection_targets) == 0: raise ADAManagerError("No connection targets found") connection_target: ConnectionTarget = connection_targets[0] # Pack message packed_message = await pack_format.pack( context=self.context, message_json=read_did_message.serialize(as_string=True), recipient_keys=connection_target.recipient_keys, routing_keys=None, sender_key=connection_target.sender_key, ) headers = { "Content-Type": "application/ssi-agent-wire" } async with aiohttp.ClientSession(headers=headers) as session: async with session.post(connection_target.endpoint, data=packed_message) as response: if response.status != 200: raise ADAManagerError( f"HTTP request failed with status code {response.status}") message_body = await response.read() unpacked = await wallet.unpack_message(message_body) ( message_json, sender_verkey, recipient_verkey, ) = unpacked message_json = json.loads(message_json) if "problem-report" in message_json["@type"]: raise ADAManagerError( f"Problem report received with problem-code:{message_json['problem-code']} and reason: {message_json['explain']}") if "read-did-response" in message_json["@type"]: read_did_response_message: ReadDIDResponseMessage = \ ReadDIDResponseMessageSchema().load(message_json) if read_did_response_message.body.status == "revoked": raise ADAManagerError( f"MyData DID {mydata_did.did} is revoked" ) return read_did_response_message.body
async def send_json_ld_processed_message(self, *, connection_id: str, data: dict, signature_options: dict, proof_chain: bool) ‑> None
-
Send JSON-LD Processed Message to remote agent.
Expand source code
async def send_json_ld_processed_message( self, *, connection_id: str, data: dict, signature_options: dict, proof_chain: bool ) -> None: """Send JSON-LD Processed Message to remote agent.""" # Responder instance responder: DispatcherResponder = await self.context.inject(BaseResponder, required=False) try: # Retrieve connection record by id connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_id( self.context, connection_id ) except StorageError as err: raise ADAManagerError( f"Failed to retrieve connection record: {err}" ) # From and to mydata dids from_did: DIDMyData = DIDMyData.from_public_key_b58( connection_record.my_did, key_type=KeyType.ED25519 ) to_did: DIDMyData = DIDMyData.from_public_key_b58( connection_record.their_did, key_type=KeyType.ED25519 ) # Base64 encode data data_base64 = base64.b64encode(json.dumps( data).encode("utf-8")).decode("utf-8") # Base64 encode signature_options signature_options_base64 = base64.b64encode(json.dumps( signature_options).encode("utf-8")).decode("utf-8") # Construct JSONLD Processed Message json_ld_processed_message = JSONLDProcessedMessage( from_did=from_did.did, to_did=to_did.did, created_time=round(time.time() * 1000), body=JSONLDProcessedBody( data_base64=data_base64, signature_options_base64=signature_options_base64, proof_chain=proof_chain ) ) # Send JSONLD Processed Message if responder: await responder.send_reply( json_ld_processed_message, connection_id=connection_record.connection_id )
async def send_read_did_message(self, did: str)
-
Send read-did DIDComm message
Expand source code
async def send_read_did_message(self, did: str): """ Send read-did DIDComm message """ pass
async def store_data_agreement_instance_metadata(self, *, data_agreement_id: str = None, data_agreement_template_id: str = None, method_of_use: str = None, data_exchange_record_id: str = None) ‑> None
-
Store data agreement instance metadata
Expand source code
async def store_data_agreement_instance_metadata( self, *, data_agreement_id: str = None, data_agreement_template_id: str = None, method_of_use: str = None, data_exchange_record_id: str = None ) -> None: """Store data agreement instance metadata""" # Fetch storage from context storage: IndyStorage = await self.context.inject(BaseStorage) data_instance_metadata_record = StorageRecord( self.RECORD_TYPE_DATA_AGREEMENT_INSTANCE_METADATA, data_agreement_id, { "data_agreement_id": data_agreement_id, "data_agreement_template_id": data_agreement_template_id, "method_of_use": method_of_use, "data_exchange_record_id": data_exchange_record_id } ) await storage.add_record(data_instance_metadata_record)
class ADAManagerError (*args, error_code: str = None, **kwargs)
-
ADA manager error
Initialize a BaseError instance.
Expand source code
class ADAManagerError(BaseError): """ADA manager error"""
Ancestors
- aries_cloudagent.core.error.BaseError
- builtins.Exception
- builtins.BaseException