Module mydata_did.v1_0.routes.connection_routes
Expand source code
import json
import uuid
import logging
from aiohttp import web
from aiohttp_apispec import (
    docs,
    request_schema,
    querystring_schema,
    response_schema,
    match_info_schema,
)
from aries_cloudagent.messaging.models.base import BaseModelError
from aries_cloudagent.connections.models.connection_record import (
    ConnectionRecord,
)
from aries_cloudagent.protocols.connections.v1_0.manager import (
    ConnectionManager,
    ConnectionManagerError,
)
from dexa_sdk.managers.ada_manager import V2ADAManager
from dexa_sdk.utils import clean_and_get_field_from_dict
from ..utils.util import str_to_bool
from ..manager import ADAManagerError
from .openapi import (
    V2CreateInvitationQueryStringSchema,
    V2InvitationResultSchema,
    GenerateFirebaseDynamicLinkForConnectionInvitationMatchInfoSchema,
    GenerateFirebaseDynamicLinkForConnectionInvitationResponseSchema,
    SendExistingConnectionsMessageHandlerMatchInfoSchema,
    SendExistingConnectionsMessageHandlerRequestSchema,
    GetExistingConnectionMatchInfoSchema,
    GetExistingConnectionResponseSchema,
    ConnectionsListQueryStringSchemaV2,
    ConnectionListSchema,
)
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Page size constant for paginated listings — presumably the default applied
# when a request supplies no page_size; TODO confirm against its consumers.
PAGINATION_PAGE_SIZE = 10
@docs(
    tags=["connection"],
    summary="Well-known endpoint for connection",
)
async def wellknown_connection_handler(request: web.BaseRequest):
    """
    Handler for well-known connection for the agent.

    Creates a new invitation through ``ConnectionManager`` with fixed options
    (auto-accept enabled, not public, single-use, empty alias) and returns it
    in the well-known response layout.

    Args:
        request: aiohttp request object

    Returns:
        JSON response with the keys ``ServiceEndpoint``, ``RoutingKey`` and
        ``Invitation``

    Raises:
        web.HTTPBadRequest: if creating or serializing the invitation raises
            ``ConnectionManagerError`` or ``BaseModelError``
    """
    context = request.app["request_context"]
    connection_mgr = ConnectionManager(context)

    try:
        # Only the invitation is needed; the connection record is discarded.
        _, invitation = await connection_mgr.create_invitation(
            auto_accept=True, public=False, multi_use=False, alias=""
        )

        # Serialize once and reuse the dict instead of re-serializing per field.
        serialized = invitation.serialize()
        service_endpoint = serialized["serviceEndpoint"]

        result = {
            "ServiceEndpoint": service_endpoint,
            "RoutingKey": "",
            "Invitation": {
                "label": invitation.label,
                "serviceEndpoint": service_endpoint,
                "routingKeys": [],
                "recipientKeys": serialized["recipientKeys"],
                # A freshly generated UUID is used as the message id here.
                "@id": str(uuid.uuid4()),
                "@type": "did:sov:BzCbsNYhMrjHiqZDTUASHg;spec/connections/1.0/invitation",
            },
        }
    except (ConnectionManagerError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response(result)
@docs(
    tags=["connection"],
    summary="Create a new connection invitation (Overridden API)",
)
@querystring_schema(V2CreateInvitationQueryStringSchema())
@response_schema(V2InvitationResultSchema(), 200)
async def v2_connections_create_invitation(request: web.BaseRequest):
    """
    Request handler for creating a new connection invitation.

    Query parameters ``auto_accept``, ``public`` and ``multi_use`` are decoded
    as JSON literals (defaults: ``null``, ``false``, ``false``); ``alias`` is
    passed through as given.

    Args:
        request: aiohttp request object

    Returns:
        The connection invitation details: ``connection_id``, ``invitation``,
        ``invitation_url`` and, when the connection has one, ``alias``

    Raises:
        web.HTTPBadRequest: if a flag query value is not valid JSON, or if the
            manager raises ``ConnectionManagerError`` / ``BaseModelError``
        web.HTTPForbidden: if a public invitation is requested but the
            ``public_invites`` setting is not enabled
    """
    context = request.app["request_context"]
    query = request.query

    # Values such as "True" or "yes" are not JSON; report them as a client
    # error instead of letting the decode error surface as a server error.
    try:
        auto_accept = json.loads(query.get("auto_accept", "null"))
        alias = query.get("alias")
        public = json.loads(query.get("public", "false"))
        multi_use = json.loads(query.get("multi_use", "false"))
    except json.JSONDecodeError as err:
        raise web.HTTPBadRequest(
            reason="auto_accept, public and multi_use must be valid JSON literals"
        ) from err

    if public and not context.settings.get("public_invites"):
        raise web.HTTPForbidden(
            reason="Configuration does not include public invitations"
        )

    base_url = context.settings.get("invite_base_url")

    # Initialise MyData DID Manager.
    mgr = V2ADAManager(context=context)

    try:
        (connection, invitation) = await mgr.create_invitation(
            auto_accept=auto_accept, public=public, multi_use=multi_use, alias=alias
        )

        result = {
            # Falsy `connection` is passed through unchanged as the id.
            "connection_id": connection and connection.connection_id,
            "invitation": invitation.serialize(),
            "invitation_url": invitation.to_url(base_url),
        }
    except (ConnectionManagerError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    if connection and connection.alias:
        result["alias"] = connection.alias

    return web.json_response(result)
@docs(
    tags=["connection"],
    summary="Generate firebase dynamic link for connection invitation",
)
@match_info_schema(GenerateFirebaseDynamicLinkForConnectionInvitationMatchInfoSchema())
@response_schema(
    GenerateFirebaseDynamicLinkForConnectionInvitationResponseSchema(), 200
)
async def generate_firebase_dynamic_link_for_connection_invitation_handler(
    request: web.BaseRequest,
):
    """
    Request handler for generating firebase dynamic link for connection invitation.

    Args:
        request: aiohttp request object; the ``conn_id`` path parameter
            identifies the connection

    Returns:
        JSON response carrying whatever ``create_connection_qr_code`` returns

    Raises:
        web.HTTPBadRequest: if the manager raises ``ConnectionManagerError``
            or ``BaseModelError``
    """
    request_context = request.app["request_context"]
    connection_id = request.match_info["conn_id"]

    manager = V2ADAManager(context=request_context)

    try:
        payload = await manager.create_connection_qr_code(connection_id)
    except (ConnectionManagerError, BaseModelError) as exc:
        raise web.HTTPBadRequest(reason=exc.roll_up) from exc

    return web.json_response(payload)
@docs(
    tags=["connection"],
    summary="Send existing connections message to remote agent.",
    responses={
        200: {
            "description": "Success",
        }
    },
)
@match_info_schema(SendExistingConnectionsMessageHandlerMatchInfoSchema())
@request_schema(SendExistingConnectionsMessageHandlerRequestSchema())
async def send_existing_connections_message_handler(request: web.BaseRequest):
    """
    Send existing connections message to remote agent.

    Args:
        request: aiohttp request object; ``conn_id`` comes from the path and
            ``theirdid`` from the JSON body

    Returns:
        Empty JSON object with status 200

    Raises:
        web.HTTPBadRequest: if the manager raises ``ADAManagerError``
    """
    request_context = request.app["request_context"]
    connection_id = request.match_info["conn_id"]

    # The body is read as JSON; a missing "theirdid" key yields None.
    payload = await request.json()
    their_did = payload.get("theirdid")

    manager = V2ADAManager(context=request_context)

    try:
        await manager.send_existing_connections_message(their_did, connection_id)
    except ADAManagerError as exc:
        raise web.HTTPBadRequest(reason=exc.roll_up) from exc

    return web.json_response({}, status=200)
@docs(
    tags=["connection"],
    summary="Fetch existing connection details if any for a current connection.",
)
@match_info_schema(GetExistingConnectionMatchInfoSchema())
@response_schema(GetExistingConnectionResponseSchema(), 200)
async def get_existing_connections_handler(request: web.BaseRequest):
    """
    Fetch existing connection details if any for a current connection.

    Args:
        request: aiohttp request object; the ``conn_id`` path parameter
            identifies the current connection

    Returns:
        JSON response with the serialized record, or an empty object when the
        manager returns a falsy result
    """
    request_context = request.app["request_context"]
    connection_id = request.match_info["conn_id"]

    manager = V2ADAManager(context=request_context)
    record = await manager.get_existing_connection_record_for_new_connection_id(
        connection_id
    )

    # Nothing found: answer with an empty object.
    if not record:
        return web.json_response({})

    return web.json_response(record.serialize())
def connection_sort_key(conn):
    """
    Get the sorting key for a particular connection.

    The key is a one-character rank followed by the ``created_at`` value:
    ``"2"`` for inactive connections, ``"1"`` for connections in the
    invitation state and ``"0"`` for every other state.

    Args:
        conn: mapping with ``state`` and ``created_at`` entries

    Returns:
        The rank prefix concatenated with ``conn["created_at"]``
    """
    if conn["state"] == ConnectionRecord.STATE_INACTIVE:
        return "2" + conn["created_at"]
    if conn["state"] == ConnectionRecord.STATE_INVITATION:
        return "1" + conn["created_at"]
    return "0" + conn["created_at"]
@docs(
    tags=["connection"],
    summary="Query agent-to-agent connections (v2)",
)
@querystring_schema(ConnectionsListQueryStringSchemaV2())
@response_schema(ConnectionListSchema(), 200)
async def connections_list_v2(request: web.BaseRequest):
    """
    Request handler for searching connection records.

    Non-empty query parameters are split into a tag filter and a post filter,
    then handed to the manager together with the pagination and category
    options.

    Args:
        request: aiohttp request object

    Returns:
        The connection list response (the manager's pagination result
        converted with ``_asdict()``)

    Raises:
        web.HTTPBadRequest: if ``page`` or ``page_size`` cannot be converted
            to an integer
    """
    context = request.app["request_context"]
    query = request.query

    # Parameters forwarded as the tag filter; empty values are ignored.
    tag_filter = {
        name: query[name]
        for name in (
            "invitation_id",
            "my_did",
            "their_did",
            "request_id",
            "invitation_key",
        )
        if name in query and query[name] != ""
    }

    # Parameters forwarded as the post filter; empty values are ignored.
    post_filter = {
        name: query[name]
        for name in (
            "alias",
            "initiator",
            "state",
            "their_role",
        )
        if name in query and query[name] != ""
    }

    # Pagination parameters. A non-numeric value is a client error, so it is
    # reported as 400 rather than escaping as an unhandled exception.
    try:
        page = clean_and_get_field_from_dict(query, "page")
        page = int(page) if page is not None else page
        page_size = clean_and_get_field_from_dict(query, "page_size")
        page_size = int(page_size) if page_size is not None else page_size
    except (TypeError, ValueError) as err:
        raise web.HTTPBadRequest(
            reason="page and page_size must be integers"
        ) from err

    # Category
    org_flag = clean_and_get_field_from_dict(query, "org_flag")
    marketplace_flag = clean_and_get_field_from_dict(query, "marketplace_flag")

    mgr = V2ADAManager(context)

    pagination_result = await mgr.query_connections_and_categorise_results(
        tag_filter,
        post_filter,
        # Missing or zero values fall back to page 1 / 10 records per page.
        page or 1,
        page_size or 10,
        # Falsy flags (absent or empty) are passed through unconverted.
        str_to_bool(org_flag) if org_flag else org_flag,
        str_to_bool(marketplace_flag) if marketplace_flag else marketplace_flag,
    )

    return web.json_response(pagination_result._asdict())
Functions
def connection_sort_key(conn)- 
Get the sorting key for a particular connection.
Expand source code
def connection_sort_key(conn): """Get the sorting key for a particular connection.""" if conn["state"] == ConnectionRecord.STATE_INACTIVE: pfx = "2" elif conn["state"] == ConnectionRecord.STATE_INVITATION: pfx = "1" else: pfx = "0" return pfx + conn["created_at"] async def connections_list_v2(request: aiohttp.web_request.BaseRequest)- 
Request handler for searching connection records.
Args
request- aiohttp request object
 
Returns
The connection list response
Expand source code
@docs( tags=["connection"], summary="Query agent-to-agent connections (v2)", ) @querystring_schema(ConnectionsListQueryStringSchemaV2()) @response_schema(ConnectionListSchema(), 200) async def connections_list_v2(request: web.BaseRequest): """ Request handler for searching connection records. Args: request: aiohttp request object Returns: The connection list response """ context = request.app["request_context"] tag_filter = {} for param_name in ( "invitation_id", "my_did", "their_did", "request_id", "invitation_key" ): if param_name in request.query and request.query[param_name] != "": tag_filter[param_name] = request.query[param_name] post_filter = {} for param_name in ( "alias", "initiator", "state", "their_role", ): if param_name in request.query and request.query[param_name] != "": post_filter[param_name] = request.query[param_name] # Pagination parameters page = clean_and_get_field_from_dict(request.query, "page") page = int(page) if page is not None else page page_size = clean_and_get_field_from_dict(request.query, "page_size") page_size = int(page_size) if page_size is not None else page_size # Category org_flag = clean_and_get_field_from_dict(request.query, "org_flag") marketplace_flag = clean_and_get_field_from_dict(request.query, "marketplace_flag") mgr = V2ADAManager(context) pagination_result = await mgr.query_connections_and_categorise_results( tag_filter, post_filter, page if page else 1, page_size if page_size else 10, str_to_bool(org_flag) if org_flag else org_flag, str_to_bool(marketplace_flag) if marketplace_flag else marketplace_flag ) return web.json_response(pagination_result._asdict()) async def generate_firebase_dynamic_link_for_connection_invitation_handler(request: aiohttp.web_request.BaseRequest)- 
Request handler for generating firebase dynamic link for connection invitation.
Args
request- aiohttp request object
 
Returns
The firebase dynamic link
Expand source code
@docs( tags=["connection"], summary="Generate firebase dynamic link for connection invitation", ) @match_info_schema(GenerateFirebaseDynamicLinkForConnectionInvitationMatchInfoSchema()) @response_schema( GenerateFirebaseDynamicLinkForConnectionInvitationResponseSchema(), 200 ) async def generate_firebase_dynamic_link_for_connection_invitation_handler( request: web.BaseRequest, ): """ Request handler for generating firebase dynamic link for connection invitation. Args: request: aiohttp request object Returns: The firebase dynamic link """ # Request context. context = request.app["request_context"] # Path params. conn_id = request.match_info["conn_id"] # Initialise MyData DID Manager. mgr = V2ADAManager(context=context) try: # Call the function res = await mgr.create_connection_qr_code( conn_id ) except (ConnectionManagerError, BaseModelError) as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response(res) async def get_existing_connections_handler(request: aiohttp.web_request.BaseRequest)- 
Fetch existing connection details if any for a current connection.
Expand source code
@docs( tags=["connection"], summary="Fetch existing connection details if any for a current connection.", ) @match_info_schema(GetExistingConnectionMatchInfoSchema()) @response_schema(GetExistingConnectionResponseSchema(), 200) async def get_existing_connections_handler(request: web.BaseRequest): """Fetch existing connection details if any for a current connection.""" # Request context context = request.app["request_context"] # Path params. conn_id = request.match_info["conn_id"] # Initialise MyData DID Manager. mgr = V2ADAManager(context=context) # Call the function result = await mgr.get_existing_connection_record_for_new_connection_id( conn_id ) result = result.serialize() if result else {} return web.json_response(result) async def send_existing_connections_message_handler(request: aiohttp.web_request.BaseRequest)- 
Send existing connections message to remote agent.
Expand source code
@docs( tags=["connection"], summary="Send existing connections message to remote agent.", responses={ 200: { "description": "Success", } }, ) @match_info_schema(SendExistingConnectionsMessageHandlerMatchInfoSchema()) @request_schema(SendExistingConnectionsMessageHandlerRequestSchema()) async def send_existing_connections_message_handler(request: web.BaseRequest): """Send existing connections message to remote agent.""" context = request.app["request_context"] conn_id = request.match_info["conn_id"] # Fetch request body body = await request.json() theirdid = body.get("theirdid") # Initialise MyData DID Manager. mgr = V2ADAManager(context=context) try: # Call the function await mgr.send_existing_connections_message( theirdid, conn_id ) except ADAManagerError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response({}, status=200) async def v2_connections_create_invitation(request: aiohttp.web_request.BaseRequest)- 
Request handler for creating a new connection invitation.
Args
request- aiohttp request object
 
Returns
The connection invitation details
Expand source code
@docs( tags=["connection"], summary="Create a new connection invitation (Overridden API)", ) @querystring_schema(V2CreateInvitationQueryStringSchema()) @response_schema(V2InvitationResultSchema(), 200) async def v2_connections_create_invitation(request: web.BaseRequest): """ Request handler for creating a new connection invitation. Args: request: aiohttp request object Returns: The connection invitation details """ context = request.app["request_context"] auto_accept = json.loads(request.query.get("auto_accept", "null")) alias = request.query.get("alias") public = json.loads(request.query.get("public", "false")) multi_use = json.loads(request.query.get("multi_use", "false")) if public and not context.settings.get("public_invites"): raise web.HTTPForbidden( reason="Configuration does not include public invitations" ) base_url = context.settings.get("invite_base_url") # Initialise MyData DID Manager. mgr = V2ADAManager(context=context) try: (connection, invitation) = await mgr.create_invitation( auto_accept=auto_accept, public=public, multi_use=multi_use, alias=alias ) result = { "connection_id": connection and connection.connection_id, "invitation": invitation.serialize(), "invitation_url": invitation.to_url(base_url), } except (ConnectionManagerError, BaseModelError) as err: raise web.HTTPBadRequest(reason=err.roll_up) from err if connection and connection.alias: result["alias"] = connection.alias return web.json_response(result) async def wellknown_connection_handler(request: aiohttp.web_request.BaseRequest)- 
Handler for well-known connection for the agent.
Expand source code
@docs( tags=["connection"], summary="Well-known endpoint for connection", ) async def wellknown_connection_handler(request: web.BaseRequest): """Handler for well-known connection for the agent.""" context = request.app["request_context"] auto_accept = True alias = "" public = False multi_use = False connection_mgr = ConnectionManager(context) try: (connection, invitation) = await connection_mgr.create_invitation( auto_accept=auto_accept, public=public, multi_use=multi_use, alias=alias ) result = { "ServiceEndpoint": invitation.serialize()["serviceEndpoint"], "RoutingKey": "", "Invitation": { "label": invitation.label, "serviceEndpoint": invitation.serialize()["serviceEndpoint"], "routingKeys": [], "recipientKeys": invitation.serialize()["recipientKeys"], "@id": str(uuid.uuid4()), "@type": "did:sov:BzCbsNYhMrjHiqZDTUASHg;spec/connections/1.0/invitation", }, } except (ConnectionManagerError, BaseModelError) as err: raise web.HTTPBadRequest(reason=err.roll_up) from err return web.json_response(result)