Module mydata_did.v1_0.handlers.data_agreement_termination_terminate_handler
Expand source code
from aries_cloudagent.messaging.base_handler import BaseHandler, BaseResponder, RequestContext, HandlerException
from aries_cloudagent.storage.record import StorageRecord
from aries_cloudagent.wallet.base import BaseWallet
from aries_cloudagent.wallet.indy import IndyWallet
from ..messages.data_agreement_terminate import DataAgreementTerminationTerminateMessage
from ..messages.data_agreement_terminate_ack import DataAgreementTerminationAck
from ..manager import ADAManager
from ..models.data_agreement_termination_terminate_model import DataAgreementTerminationTerminateBody
from ..models.exchange_records.data_agreement_record import DataAgreementV1Record
from ..models.data_agreement_instance_model import DataAgreementInstance, DataAgreementInstanceSchema
from ..utils.did.mydata_did import DIDMyData
from ..utils.jsonld.data_agreement import verify_data_agreement
from ..messages.problem_report import (
DataAgreementTerminationProblemReport,
DataAgreementTerminationProblemReportReason
)
from ...patched_protocols.issue_credential.v1_0.models.credential_exchange import (
V10CredentialExchange
)
from ...patched_protocols.present_proof.v1_0.models.presentation_exchange import (
V10PresentationExchange
)
import json
import datetime
class DataAgreementTerminationTerminateMessageHandler(BaseHandler):
"""Handler for data-agreement-termination/1.0/terminate message."""
async def handle(self, context: RequestContext, responder: BaseResponder):
"""Message handler logic for data-agreement-termination/1.0/terminate message."""
# Assert that the message is of the correct type
assert isinstance(
context.message, DataAgreementTerminationTerminateMessage)
self._logger.info(
"Received data-agreement-termination/1.0/terminate message: \n%s",
json.dumps(context.message.serialize(), indent=4)
)
# Check if connection is ready
if not context.connection_ready:
self._logger.info(
"Connection not active, skipping data-agreement-termination/1.0/terminate handler: %s",
context.message_receipt.sender_did,
)
return
data_agreement_termination_terminate_message = context.message
data_agreement_termination_terminate_message_body: DataAgreementTerminationTerminateBody = data_agreement_termination_terminate_message.body
# Wallet instance from request context
wallet: IndyWallet = await context.inject(BaseWallet)
# Initialize ADA manager
ada_manager = ADAManager(context)
# Fetch the data agreement instance metadata
data_agreement_instance_metadata_records = await ada_manager.query_data_agreement_instance_metadata(
tag_query={
'data_agreement_id': data_agreement_termination_terminate_message_body.data_agreement_id,
}
)
# Check if there is a data agreement instance metadata record
if not data_agreement_instance_metadata_records:
self._logger.info(
"Data agreement not found; Failed to handle terminate message for data agreement: %s",
data_agreement_termination_terminate_message_body.data_agreement_id,
)
return
if len(data_agreement_instance_metadata_records) > 1:
self._logger.info(
"Duplicate data agreement records found; Failed to handle terminate message for data agreement: %s",
data_agreement_termination_terminate_message_body.data_agreement_id,
)
return
data_agreement_instance_metadata_record: StorageRecord = data_agreement_instance_metadata_records[
0]
# Identify the method of use
if data_agreement_instance_metadata_record.tags.get("method_of_use") == DataAgreementV1Record.METHOD_OF_USE_DATA_SOURCE:
# Fetch exchante record (credential exchange if method of use is "data-source")
tag_filter = {}
post_filter = {
"data_agreement_id": data_agreement_termination_terminate_message_body.data_agreement_id
}
records = await V10CredentialExchange.query(context, tag_filter, post_filter)
if not records:
self._logger.info(
"Credential exchange record not found; Failed to handle terminate message for data agreement: %s",
data_agreement_termination_terminate_message_body.data_agreement_id,
)
return
if len(records) > 1:
self._logger.info(
"Duplicate credential exchange records found; Failed to handle terminate message for data agreement: %s",
data_agreement_termination_terminate_message_body.data_agreement_id,
)
return
cred_ex_record: V10CredentialExchange = records[0]
# Check if data agreement is in "accept" status
if cred_ex_record.data_agreement_status != V10CredentialExchange.DATA_AGREEMENT_ACCEPT:
self._logger.info(
"Credential exchange record not in offer sent state; Failed to handle terminate message for data agreement: %s",
data_agreement_termination_terminate_message_body.data_agreement_id,
)
return
# Reconstruct the data agreement
# Deserialise data agreement
data_agreement_instance: DataAgreementInstance = DataAgreementInstanceSchema().load(
cred_ex_record.data_agreement
)
# Check if terminate message is signed by data agreement principle did
if data_agreement_instance.principle_did != data_agreement_termination_terminate_message_body.proof.verification_method:
self._logger.info(
"Data agreement principle DID does not match sender DID; Failed to handle terminate message for data agreement: %s",
data_agreement_termination_terminate_message_body.data_agreement_id,
)
# Send problem report.
problem_report = DataAgreementTerminationProblemReport(
from_did=data_agreement_termination_terminate_message.to_did,
to_did=data_agreement_termination_terminate_message.from_did,
created_time=str(
int(datetime.datetime.utcnow().timestamp())),
problem_code=DataAgreementTerminationProblemReportReason.PRINCIPLE_DID_INVALID.value,
explain=f"Data agreement principle DID does not match sender DID; Failed to process terminate message for data agreement: {data_agreement_termination_terminate_message.body.data_agreement_id}",
data_agreement_id=data_agreement_termination_terminate_message_body.data_agreement_id
)
problem_report.assign_thread_id(
thid=data_agreement_termination_terminate_message._id
)
# Update credential exchange record with data agreement metadata
cred_ex_record.data_agreement_problem_report = problem_report.serialize()
cred_ex_record.data_agreement_status = V10PresentationExchange.DATA_AGREEMENT_PROBLEM_REPORT
await cred_ex_record.save(context)
await responder.send_reply(problem_report)
return
# Update data agreement event with terminate event
data_agreement_instance.event.append(
data_agreement_termination_terminate_message_body.event
)
# Update data agreement proof chain with terminate proof
data_agreement_instance.proof_chain.append(
data_agreement_termination_terminate_message_body.proof
)
# Verify signatures on data agreement
verkeys = []
for event in data_agreement_instance.event:
temp_verkey = DIDMyData.from_did(event.did).public_key_b58
verkeys.append(temp_verkey)
valid = await verify_data_agreement(
data_agreement_instance.serialize(),
verkeys[-1],
wallet,
drop_proof_chain=False
)
if not valid:
self._logger.error(
"Data agreement terminate verification failed"
)
# Send problem report
problem_report = DataAgreementTerminationProblemReport(
from_did=data_agreement_termination_terminate_message.to_did,
to_did=data_agreement_termination_terminate_message.from_did,
created_time=str(
int(datetime.datetime.utcnow().timestamp())),
problem_code=DataAgreementTerminationProblemReportReason.SIGNATURE_VERIFICATION_FAILED.value,
explain=f"Data agreement terminate verification failed; Failed to process terminate message for data agreement: {data_agreement_termination_terminate_message.body.data_agreement_id}",
data_agreement_id=data_agreement_termination_terminate_message_body.data_agreement_id
)
problem_report.assign_thread_id(
thid=data_agreement_termination_terminate_message._id
)
# Update credential exchange record with data agreement metadata
cred_ex_record.data_agreement_problem_report = problem_report.serialize()
cred_ex_record.data_agreement_status = V10PresentationExchange.DATA_AGREEMENT_PROBLEM_REPORT
await cred_ex_record.save(context)
await responder.send_reply(problem_report)
raise HandlerException(
"Data agreement terminate signature verification failed"
)
# Update credential exchange record with data agreement metadata
cred_ex_record.data_agreement = data_agreement_instance.serialize()
cred_ex_record.data_agreement_status = V10CredentialExchange.DATA_AGREEMENT_TERMINATE
await cred_ex_record.save(context)
# Construct terminate ack message
data_agreement_terminate_ack = DataAgreementTerminationAck(
status="TERMINATE OK"
)
data_agreement_terminate_ack.assign_thread_id(
thid=data_agreement_termination_terminate_message._id
)
await responder.send_reply(data_agreement_terminate_ack)
if data_agreement_instance_metadata_record.tags.get("method_of_use") == DataAgreementV1Record.METHOD_OF_USE_DATA_USING_SERVICE:
# Fetch exchange record (presentation exchange if method of use is "data-using-service")
tag_filter = {}
post_filter = {
"data_agreement_id": data_agreement_termination_terminate_message_body.data_agreement_id
}
records = await V10PresentationExchange.query(context, tag_filter, post_filter)
if not records:
self._logger.info(
"Presentation exchange record not found; Failed to handle terminate message for data agreement: %s",
data_agreement_termination_terminate_message_body.data_agreement_id,
)
return
if len(records) > 1:
self._logger.info(
"Duplicate presentation exchange records found; Failed to handle terminate message for data agreement: %s",
data_agreement_termination_terminate_message_body.data_agreement_id,
)
return
pres_ex_record: V10PresentationExchange = records[0]
# Check if data agreement is in "accept" status
if pres_ex_record.data_agreement_status != V10PresentationExchange.DATA_AGREEMENT_ACCEPT:
self._logger.info(
"Presentation exchange record not in offer sent state; Failed to handle terminate message for data agreement: %s",
data_agreement_termination_terminate_message_body.data_agreement_id,
)
return
# Reconstruct the data agreement
# Deserialise data agreement
data_agreement_instance: DataAgreementInstance = DataAgreementInstanceSchema().load(
pres_ex_record.data_agreement
)
# Check if terminate message is signed by data agreement principle did
if data_agreement_instance.principle_did != data_agreement_termination_terminate_message_body.proof.verification_method:
self._logger.info(
"Data agreement principle DID does not match sender DID; Failed to handle terminate message for data agreement: %s",
data_agreement_termination_terminate_message_body.data_agreement_id,
)
# Send problem report.
problem_report = DataAgreementTerminationProblemReport(
from_did=data_agreement_termination_terminate_message.to_did,
to_did=data_agreement_termination_terminate_message.from_did,
created_time=str(
int(datetime.datetime.utcnow().timestamp())),
problem_code=DataAgreementTerminationProblemReportReason.PRINCIPLE_DID_INVALID.value,
explain=f"Data agreement principle DID does not match sender DID; Failed to process terminate message for data agreement: {data_agreement_termination_terminate_message.body.data_agreement_id}",
data_agreement_id=data_agreement_termination_terminate_message_body.data_agreement_id
)
problem_report.assign_thread_id(
thid=data_agreement_termination_terminate_message._id
)
# Update presentation exchange record with data agreement metadata
pres_ex_record.data_agreement_problem_report = problem_report.serialize()
pres_ex_record.data_agreement_status = V10PresentationExchange.DATA_AGREEMENT_PROBLEM_REPORT
await pres_ex_record.save(context)
await responder.send_reply(problem_report)
return
# Update data agreement event with terminate event
data_agreement_instance.event.append(
data_agreement_termination_terminate_message_body.event
)
# Update data agreement proof chain with terminate proof
data_agreement_instance.proof_chain.append(
data_agreement_termination_terminate_message_body.proof
)
# Verify signatures on data agreement
verkeys = []
for event in data_agreement_instance.event:
temp_verkey = DIDMyData.from_did(event.did).public_key_b58
verkeys.append(temp_verkey)
valid = await verify_data_agreement(
data_agreement_instance.serialize(),
verkeys[-1],
wallet,
drop_proof_chain=False
)
if not valid:
self._logger.error(
"Data agreement terminate verification failed"
)
# Send problem report
problem_report = DataAgreementTerminationProblemReport(
from_did=data_agreement_termination_terminate_message.to_did,
to_did=data_agreement_termination_terminate_message.from_did,
created_time=str(
int(datetime.datetime.utcnow().timestamp())),
problem_code=DataAgreementTerminationProblemReportReason.SIGNATURE_VERIFICATION_FAILED.value,
explain=f"Data agreement terminate verification failed; Failed to process terminate message for data agreement: {data_agreement_termination_terminate_message.body.data_agreement_id}",
data_agreement_id=data_agreement_termination_terminate_message_body.data_agreement_id
)
problem_report.assign_thread_id(
thid=data_agreement_termination_terminate_message._id
)
# Update presentation exchange record with data agreement metadata
pres_ex_record.data_agreement_problem_report = problem_report.serialize()
pres_ex_record.data_agreement_status = V10PresentationExchange.DATA_AGREEMENT_PROBLEM_REPORT
await pres_ex_record.save(context)
await responder.send_reply(problem_report)
raise HandlerException(
"Data agreement terminate signature verification failed"
)
# Update presentation exchange record with data agreement metadata
pres_ex_record.data_agreement = data_agreement_instance.serialize()
pres_ex_record.data_agreement_status = V10PresentationExchange.DATA_AGREEMENT_TERMINATE
await pres_ex_record.save(context)
# Construct terminate ack message
data_agreement_terminate_ack = DataAgreementTerminationAck(
status="TERMINATE OK"
)
data_agreement_terminate_ack.assign_thread_id(
thid=data_agreement_termination_terminate_message._id
)
await responder.send_reply(data_agreement_terminate_ack)
Classes
class DataAgreementTerminationTerminateMessageHandler
-
Handler for data-agreement-termination/1.0/terminate message.
Initialize a BaseHandler instance.
Expand source code
class DataAgreementTerminationTerminateMessageHandler(BaseHandler): """Handler for data-agreement-termination/1.0/terminate message.""" async def handle(self, context: RequestContext, responder: BaseResponder): """Message handler logic for data-agreement-termination/1.0/terminate message.""" # Assert that the message is of the correct type assert isinstance( context.message, DataAgreementTerminationTerminateMessage) self._logger.info( "Received data-agreement-termination/1.0/terminate message: \n%s", json.dumps(context.message.serialize(), indent=4) ) # Check if connection is ready if not context.connection_ready: self._logger.info( "Connection not active, skipping data-agreement-termination/1.0/terminate handler: %s", context.message_receipt.sender_did, ) return data_agreement_termination_terminate_message = context.message data_agreement_termination_terminate_message_body: DataAgreementTerminationTerminateBody = data_agreement_termination_terminate_message.body # Wallet instance from request context wallet: IndyWallet = await context.inject(BaseWallet) # Initialize ADA manager ada_manager = ADAManager(context) # Fetch the data agreement instance metadata data_agreement_instance_metadata_records = await ada_manager.query_data_agreement_instance_metadata( tag_query={ 'data_agreement_id': data_agreement_termination_terminate_message_body.data_agreement_id, } ) # Check if there is a data agreement instance metadata record if not data_agreement_instance_metadata_records: self._logger.info( "Data agreement not found; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return if len(data_agreement_instance_metadata_records) > 1: self._logger.info( "Duplicate data agreement records found; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return data_agreement_instance_metadata_record: StorageRecord = data_agreement_instance_metadata_records[ 0] # Identify the method of use if data_agreement_instance_metadata_record.tags.get("method_of_use") == DataAgreementV1Record.METHOD_OF_USE_DATA_SOURCE: # Fetch exchante record (credential exchange if method of use is "data-source") tag_filter = {} post_filter = { "data_agreement_id": data_agreement_termination_terminate_message_body.data_agreement_id } records = await V10CredentialExchange.query(context, tag_filter, post_filter) if not records: self._logger.info( "Credential exchange record not found; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return if len(records) > 1: self._logger.info( "Duplicate credential exchange records found; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return cred_ex_record: V10CredentialExchange = records[0] # Check if data agreement is in "accept" status if cred_ex_record.data_agreement_status != V10CredentialExchange.DATA_AGREEMENT_ACCEPT: self._logger.info( "Credential exchange record not in offer sent state; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return # Reconstruct the data agreement # Deserialise data agreement data_agreement_instance: DataAgreementInstance = DataAgreementInstanceSchema().load( cred_ex_record.data_agreement ) # Check if terminate message is signed by data agreement principle did if data_agreement_instance.principle_did != data_agreement_termination_terminate_message_body.proof.verification_method: self._logger.info( "Data agreement principle DID does not match sender DID; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) # Send problem report. problem_report = DataAgreementTerminationProblemReport( from_did=data_agreement_termination_terminate_message.to_did, to_did=data_agreement_termination_terminate_message.from_did, created_time=str( int(datetime.datetime.utcnow().timestamp())), problem_code=DataAgreementTerminationProblemReportReason.PRINCIPLE_DID_INVALID.value, explain=f"Data agreement principle DID does not match sender DID; Failed to process terminate message for data agreement: {data_agreement_termination_terminate_message.body.data_agreement_id}", data_agreement_id=data_agreement_termination_terminate_message_body.data_agreement_id ) problem_report.assign_thread_id( thid=data_agreement_termination_terminate_message._id ) # Update credential exchange record with data agreement metadata cred_ex_record.data_agreement_problem_report = problem_report.serialize() cred_ex_record.data_agreement_status = V10PresentationExchange.DATA_AGREEMENT_PROBLEM_REPORT await cred_ex_record.save(context) await responder.send_reply(problem_report) return # Update data agreement event with terminate event data_agreement_instance.event.append( data_agreement_termination_terminate_message_body.event ) # Update data agreement proof chain with terminate proof data_agreement_instance.proof_chain.append( data_agreement_termination_terminate_message_body.proof ) # Verify signatures on data agreement verkeys = [] for event in data_agreement_instance.event: temp_verkey = DIDMyData.from_did(event.did).public_key_b58 verkeys.append(temp_verkey) valid = await verify_data_agreement( data_agreement_instance.serialize(), verkeys[-1], wallet, drop_proof_chain=False ) if not valid: self._logger.error( "Data agreement terminate verification failed" ) # Send problem report problem_report = DataAgreementTerminationProblemReport( from_did=data_agreement_termination_terminate_message.to_did, to_did=data_agreement_termination_terminate_message.from_did, created_time=str( int(datetime.datetime.utcnow().timestamp())), problem_code=DataAgreementTerminationProblemReportReason.SIGNATURE_VERIFICATION_FAILED.value, explain=f"Data agreement terminate verification failed; Failed to process terminate message for data agreement: {data_agreement_termination_terminate_message.body.data_agreement_id}", data_agreement_id=data_agreement_termination_terminate_message_body.data_agreement_id ) problem_report.assign_thread_id( thid=data_agreement_termination_terminate_message._id ) # Update credential exchange record with data agreement metadata cred_ex_record.data_agreement_problem_report = problem_report.serialize() cred_ex_record.data_agreement_status = V10PresentationExchange.DATA_AGREEMENT_PROBLEM_REPORT await cred_ex_record.save(context) await responder.send_reply(problem_report) raise HandlerException( "Data agreement terminate signature verification failed" ) # Update credential exchange record with data agreement metadata cred_ex_record.data_agreement = data_agreement_instance.serialize() cred_ex_record.data_agreement_status = V10CredentialExchange.DATA_AGREEMENT_TERMINATE await cred_ex_record.save(context) # Construct terminate ack message data_agreement_terminate_ack = DataAgreementTerminationAck( status="TERMINATE OK" ) data_agreement_terminate_ack.assign_thread_id( thid=data_agreement_termination_terminate_message._id ) await responder.send_reply(data_agreement_terminate_ack) if data_agreement_instance_metadata_record.tags.get("method_of_use") == DataAgreementV1Record.METHOD_OF_USE_DATA_USING_SERVICE: # Fetch exchange record (presentation exchange if method of use is "data-using-service") tag_filter = {} post_filter = { "data_agreement_id": data_agreement_termination_terminate_message_body.data_agreement_id } records = await V10PresentationExchange.query(context, tag_filter, post_filter) if not records: self._logger.info( "Presentation exchange record not found; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return if len(records) > 1: self._logger.info( "Duplicate presentation exchange records found; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return pres_ex_record: V10PresentationExchange = records[0] # Check if data agreement is in "accept" status if pres_ex_record.data_agreement_status != V10PresentationExchange.DATA_AGREEMENT_ACCEPT: self._logger.info( "Presentation exchange record not in offer sent state; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return # Reconstruct the data agreement # Deserialise data agreement data_agreement_instance: DataAgreementInstance = DataAgreementInstanceSchema().load( pres_ex_record.data_agreement ) # Check if terminate message is signed by data agreement principle did if data_agreement_instance.principle_did != data_agreement_termination_terminate_message_body.proof.verification_method: self._logger.info( "Data agreement principle DID does not match sender DID; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) # Send problem report. problem_report = DataAgreementTerminationProblemReport( from_did=data_agreement_termination_terminate_message.to_did, to_did=data_agreement_termination_terminate_message.from_did, created_time=str( int(datetime.datetime.utcnow().timestamp())), problem_code=DataAgreementTerminationProblemReportReason.PRINCIPLE_DID_INVALID.value, explain=f"Data agreement principle DID does not match sender DID; Failed to process terminate message for data agreement: {data_agreement_termination_terminate_message.body.data_agreement_id}", data_agreement_id=data_agreement_termination_terminate_message_body.data_agreement_id ) problem_report.assign_thread_id( thid=data_agreement_termination_terminate_message._id ) # Update presentation exchange record with data agreement metadata pres_ex_record.data_agreement_problem_report = problem_report.serialize() pres_ex_record.data_agreement_status = V10PresentationExchange.DATA_AGREEMENT_PROBLEM_REPORT await pres_ex_record.save(context) await responder.send_reply(problem_report) return # Update data agreement event with terminate event data_agreement_instance.event.append( data_agreement_termination_terminate_message_body.event ) # Update data agreement proof chain with terminate proof data_agreement_instance.proof_chain.append( data_agreement_termination_terminate_message_body.proof ) # Verify signatures on data agreement verkeys = [] for event in data_agreement_instance.event: temp_verkey = DIDMyData.from_did(event.did).public_key_b58 verkeys.append(temp_verkey) valid = await verify_data_agreement( data_agreement_instance.serialize(), verkeys[-1], wallet, drop_proof_chain=False ) if not valid: self._logger.error( "Data agreement terminate verification failed" ) # Send problem report problem_report = DataAgreementTerminationProblemReport( from_did=data_agreement_termination_terminate_message.to_did, to_did=data_agreement_termination_terminate_message.from_did, created_time=str( int(datetime.datetime.utcnow().timestamp())), problem_code=DataAgreementTerminationProblemReportReason.SIGNATURE_VERIFICATION_FAILED.value, explain=f"Data agreement terminate verification failed; Failed to process terminate message for data agreement: {data_agreement_termination_terminate_message.body.data_agreement_id}", data_agreement_id=data_agreement_termination_terminate_message_body.data_agreement_id ) problem_report.assign_thread_id( thid=data_agreement_termination_terminate_message._id ) # Update presentation exchange record with data agreement metadata pres_ex_record.data_agreement_problem_report = problem_report.serialize() pres_ex_record.data_agreement_status = V10PresentationExchange.DATA_AGREEMENT_PROBLEM_REPORT await pres_ex_record.save(context) await responder.send_reply(problem_report) raise HandlerException( "Data agreement terminate signature verification failed" ) # Update presentation exchange record with data agreement metadata pres_ex_record.data_agreement = data_agreement_instance.serialize() pres_ex_record.data_agreement_status = V10PresentationExchange.DATA_AGREEMENT_TERMINATE await pres_ex_record.save(context) # Construct terminate ack message data_agreement_terminate_ack = DataAgreementTerminationAck( status="TERMINATE OK" ) data_agreement_terminate_ack.assign_thread_id( thid=data_agreement_termination_terminate_message._id ) await responder.send_reply(data_agreement_terminate_ack)
Ancestors
- aries_cloudagent.messaging.base_handler.BaseHandler
- abc.ABC
Methods
async def handle(self, context: aries_cloudagent.messaging.request_context.RequestContext, responder: aries_cloudagent.messaging.responder.BaseResponder)
-
Message handler logic for data-agreement-termination/1.0/terminate message.
Expand source code
async def handle(self, context: RequestContext, responder: BaseResponder): """Message handler logic for data-agreement-termination/1.0/terminate message.""" # Assert that the message is of the correct type assert isinstance( context.message, DataAgreementTerminationTerminateMessage) self._logger.info( "Received data-agreement-termination/1.0/terminate message: \n%s", json.dumps(context.message.serialize(), indent=4) ) # Check if connection is ready if not context.connection_ready: self._logger.info( "Connection not active, skipping data-agreement-termination/1.0/terminate handler: %s", context.message_receipt.sender_did, ) return data_agreement_termination_terminate_message = context.message data_agreement_termination_terminate_message_body: DataAgreementTerminationTerminateBody = data_agreement_termination_terminate_message.body # Wallet instance from request context wallet: IndyWallet = await context.inject(BaseWallet) # Initialize ADA manager ada_manager = ADAManager(context) # Fetch the data agreement instance metadata data_agreement_instance_metadata_records = await ada_manager.query_data_agreement_instance_metadata( tag_query={ 'data_agreement_id': data_agreement_termination_terminate_message_body.data_agreement_id, } ) # Check if there is a data agreement instance metadata record if not data_agreement_instance_metadata_records: self._logger.info( "Data agreement not found; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return if len(data_agreement_instance_metadata_records) > 1: self._logger.info( "Duplicate data agreement records found; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return data_agreement_instance_metadata_record: StorageRecord = data_agreement_instance_metadata_records[ 0] # Identify the method of use if data_agreement_instance_metadata_record.tags.get("method_of_use") == DataAgreementV1Record.METHOD_OF_USE_DATA_SOURCE: # Fetch exchante record (credential exchange if method of use is "data-source") tag_filter = {} post_filter = { "data_agreement_id": data_agreement_termination_terminate_message_body.data_agreement_id } records = await V10CredentialExchange.query(context, tag_filter, post_filter) if not records: self._logger.info( "Credential exchange record not found; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return if len(records) > 1: self._logger.info( "Duplicate credential exchange records found; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return cred_ex_record: V10CredentialExchange = records[0] # Check if data agreement is in "accept" status if cred_ex_record.data_agreement_status != V10CredentialExchange.DATA_AGREEMENT_ACCEPT: self._logger.info( "Credential exchange record not in offer sent state; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return # Reconstruct the data agreement # Deserialise data agreement data_agreement_instance: DataAgreementInstance = DataAgreementInstanceSchema().load( cred_ex_record.data_agreement ) # Check if terminate message is signed by data agreement principle did if data_agreement_instance.principle_did != data_agreement_termination_terminate_message_body.proof.verification_method: self._logger.info( "Data agreement principle DID does not match sender DID; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) # Send problem report. problem_report = DataAgreementTerminationProblemReport( from_did=data_agreement_termination_terminate_message.to_did, to_did=data_agreement_termination_terminate_message.from_did, created_time=str( int(datetime.datetime.utcnow().timestamp())), problem_code=DataAgreementTerminationProblemReportReason.PRINCIPLE_DID_INVALID.value, explain=f"Data agreement principle DID does not match sender DID; Failed to process terminate message for data agreement: {data_agreement_termination_terminate_message.body.data_agreement_id}", data_agreement_id=data_agreement_termination_terminate_message_body.data_agreement_id ) problem_report.assign_thread_id( thid=data_agreement_termination_terminate_message._id ) # Update credential exchange record with data agreement metadata cred_ex_record.data_agreement_problem_report = problem_report.serialize() cred_ex_record.data_agreement_status = V10PresentationExchange.DATA_AGREEMENT_PROBLEM_REPORT await cred_ex_record.save(context) await responder.send_reply(problem_report) return # Update data agreement event with terminate event data_agreement_instance.event.append( data_agreement_termination_terminate_message_body.event ) # Update data agreement proof chain with terminate proof data_agreement_instance.proof_chain.append( data_agreement_termination_terminate_message_body.proof ) # Verify signatures on data agreement verkeys = [] for event in data_agreement_instance.event: temp_verkey = DIDMyData.from_did(event.did).public_key_b58 verkeys.append(temp_verkey) valid = await verify_data_agreement( data_agreement_instance.serialize(), verkeys[-1], wallet, drop_proof_chain=False ) if not valid: self._logger.error( "Data agreement terminate verification failed" ) # Send problem report problem_report = DataAgreementTerminationProblemReport( from_did=data_agreement_termination_terminate_message.to_did, to_did=data_agreement_termination_terminate_message.from_did, created_time=str( int(datetime.datetime.utcnow().timestamp())), problem_code=DataAgreementTerminationProblemReportReason.SIGNATURE_VERIFICATION_FAILED.value, explain=f"Data agreement terminate verification failed; Failed to process terminate message for data agreement: {data_agreement_termination_terminate_message.body.data_agreement_id}", data_agreement_id=data_agreement_termination_terminate_message_body.data_agreement_id ) problem_report.assign_thread_id( thid=data_agreement_termination_terminate_message._id ) # Update credential exchange record with data agreement metadata cred_ex_record.data_agreement_problem_report = problem_report.serialize() cred_ex_record.data_agreement_status = V10PresentationExchange.DATA_AGREEMENT_PROBLEM_REPORT await cred_ex_record.save(context) await responder.send_reply(problem_report) raise HandlerException( "Data agreement terminate signature verification failed" ) # Update credential exchange record with data agreement metadata cred_ex_record.data_agreement = data_agreement_instance.serialize() cred_ex_record.data_agreement_status = V10CredentialExchange.DATA_AGREEMENT_TERMINATE await cred_ex_record.save(context) # Construct terminate ack message data_agreement_terminate_ack = DataAgreementTerminationAck( status="TERMINATE OK" ) data_agreement_terminate_ack.assign_thread_id( thid=data_agreement_termination_terminate_message._id ) await responder.send_reply(data_agreement_terminate_ack) if data_agreement_instance_metadata_record.tags.get("method_of_use") == DataAgreementV1Record.METHOD_OF_USE_DATA_USING_SERVICE: # Fetch exchange record (presentation exchange if method of use is "data-using-service") tag_filter = {} post_filter = { "data_agreement_id": data_agreement_termination_terminate_message_body.data_agreement_id } records = await V10PresentationExchange.query(context, tag_filter, post_filter) if not records: self._logger.info( "Presentation exchange record not found; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return if len(records) > 1: self._logger.info( "Duplicate presentation exchange records found; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return pres_ex_record: V10PresentationExchange = records[0] # Check if data agreement is in "accept" status if pres_ex_record.data_agreement_status != V10PresentationExchange.DATA_AGREEMENT_ACCEPT: self._logger.info( "Presentation exchange record not in offer sent state; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) return # Reconstruct the data agreement # Deserialise data agreement data_agreement_instance: DataAgreementInstance = DataAgreementInstanceSchema().load( pres_ex_record.data_agreement ) # Check if terminate message is signed by data agreement principle did if data_agreement_instance.principle_did != data_agreement_termination_terminate_message_body.proof.verification_method: self._logger.info( "Data agreement principle DID does not match sender DID; Failed to handle terminate message for data agreement: %s", data_agreement_termination_terminate_message_body.data_agreement_id, ) # Send problem report. problem_report = DataAgreementTerminationProblemReport( from_did=data_agreement_termination_terminate_message.to_did, to_did=data_agreement_termination_terminate_message.from_did, created_time=str( int(datetime.datetime.utcnow().timestamp())), problem_code=DataAgreementTerminationProblemReportReason.PRINCIPLE_DID_INVALID.value, explain=f"Data agreement principle DID does not match sender DID; Failed to process terminate message for data agreement: {data_agreement_termination_terminate_message.body.data_agreement_id}", data_agreement_id=data_agreement_termination_terminate_message_body.data_agreement_id ) problem_report.assign_thread_id( thid=data_agreement_termination_terminate_message._id ) # Update presentation exchange record with data agreement metadata pres_ex_record.data_agreement_problem_report = problem_report.serialize() pres_ex_record.data_agreement_status = V10PresentationExchange.DATA_AGREEMENT_PROBLEM_REPORT await pres_ex_record.save(context) await responder.send_reply(problem_report) return # Update data agreement event with terminate event data_agreement_instance.event.append( data_agreement_termination_terminate_message_body.event ) # Update data agreement proof chain with terminate proof data_agreement_instance.proof_chain.append( data_agreement_termination_terminate_message_body.proof ) # Verify signatures on data agreement verkeys = [] for event in data_agreement_instance.event: temp_verkey = DIDMyData.from_did(event.did).public_key_b58 verkeys.append(temp_verkey) valid = await verify_data_agreement( data_agreement_instance.serialize(), verkeys[-1], wallet, drop_proof_chain=False ) if not valid: self._logger.error( "Data agreement terminate verification failed" ) # Send problem report problem_report = DataAgreementTerminationProblemReport( from_did=data_agreement_termination_terminate_message.to_did, to_did=data_agreement_termination_terminate_message.from_did, created_time=str( int(datetime.datetime.utcnow().timestamp())), problem_code=DataAgreementTerminationProblemReportReason.SIGNATURE_VERIFICATION_FAILED.value, explain=f"Data agreement terminate verification failed; Failed to process terminate message for data agreement: {data_agreement_termination_terminate_message.body.data_agreement_id}", data_agreement_id=data_agreement_termination_terminate_message_body.data_agreement_id ) problem_report.assign_thread_id( thid=data_agreement_termination_terminate_message._id ) # Update presentation exchange record with data agreement metadata pres_ex_record.data_agreement_problem_report = problem_report.serialize() pres_ex_record.data_agreement_status = V10PresentationExchange.DATA_AGREEMENT_PROBLEM_REPORT await pres_ex_record.save(context) await responder.send_reply(problem_report) raise HandlerException( "Data agreement terminate signature verification failed" ) # Update presentation exchange record with data agreement metadata pres_ex_record.data_agreement = data_agreement_instance.serialize() pres_ex_record.data_agreement_status = V10PresentationExchange.DATA_AGREEMENT_TERMINATE await pres_ex_record.save(context) # Construct terminate ack message data_agreement_terminate_ack = DataAgreementTerminationAck( status="TERMINATE OK" ) data_agreement_terminate_ack.assign_thread_id( thid=data_agreement_termination_terminate_message._id ) await responder.send_reply(data_agreement_terminate_ack)