Module mydata_did.v1_0.routes.jsonld_routes
Expand source code
import logging
from aiohttp import web
from aiohttp_apispec import (
    docs,
    request_schema,
    match_info_schema,
)
from ..manager import ADAManager, ADAManagerError
from ..routes.maps.tag_maps import (
    TAGS_JSONLD_FUNCTIONS_LABEL,
)
from .openapi import (
    SendJSONLDDIDCommProcessedDataMessageHandlerRequestSchema,
    SendJSONLDDIDCommProcessedDataMessageHandlerMatchInfoSchema,
)
LOGGER = logging.getLogger(__name__)
PAGINATION_PAGE_SIZE = 10
@docs(
    tags=[TAGS_JSONLD_FUNCTIONS_LABEL],
    summary="Send JSON-LD processed-data didcomm message to the remote agent.",
    responses={
        204: {
            "description": "Success",
        },
    },
)
@match_info_schema(SendJSONLDDIDCommProcessedDataMessageHandlerMatchInfoSchema())
@request_schema(SendJSONLDDIDCommProcessedDataMessageHandlerRequestSchema())
async def send_json_ld_didcomm_processed_data_message_handler(request: web.BaseRequest):
    """Send JSON-LD didcomm processed data message handler."""
    # Context.
    context = request.app["request_context"]
    # Fetch path parameters.
    connection_id = request.match_info["connection_id"]
    # Fetch request body
    body = await request.json()
    # Initialise MyData DID Manager.
    mydata_did_manager: ADAManager = ADAManager(context=context)
    try:
        # Call the function.
        await mydata_did_manager.send_json_ld_processed_message(
            connection_id=connection_id,
            data=body.get("data", {}),
            signature_options=body.get("signature_options", {}),
            proof_chain=body.get("proof_chain", False),
        )
    except ADAManagerError as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err
    return web.json_response({}, status=204)
Functions
async def send_json_ld_didcomm_processed_data_message_handler(request: aiohttp.web_request.BaseRequest)- 
Send JSON-LD didcomm processed data message handler.
Expand source code
@docs( tags=[TAGS_JSONLD_FUNCTIONS_LABEL], summary="Send JSON-LD processed-data didcomm message to the remote agent.", responses={ 204: { "description": "Success", }, }, ) @match_info_schema(SendJSONLDDIDCommProcessedDataMessageHandlerMatchInfoSchema()) @request_schema(SendJSONLDDIDCommProcessedDataMessageHandlerRequestSchema()) async def send_json_ld_didcomm_processed_data_message_handler(request: web.BaseRequest): """Send JSON-LD didcomm processed data message handler.""" # Context. context = request.app["request_context"] # Fetch path parameters. connection_id = request.match_info["connection_id"] # Fetch request body body = await request.json() # Initialise MyData DID Manager. mydata_did_manager: ADAManager = ADAManager(context=context) try: # Call the function. await mydata_did_manager.send_json_ld_processed_message( connection_id=connection_id, data=body.get("data", {}), signature_options=body.get("signature_options", {}), proof_chain=body.get("proof_chain", False), ) except ADAManagerError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response({}, status=204)