Module mydata_did.v1_0.routes.connection_routes

Expand source code
import json
import uuid
import logging
from aiohttp import web
from aiohttp_apispec import (
    docs,
    request_schema,
    querystring_schema,
    response_schema,
    match_info_schema,
)
from aries_cloudagent.messaging.models.base import BaseModelError
from aries_cloudagent.connections.models.connection_record import (
    ConnectionRecord,
)
from aries_cloudagent.protocols.connections.v1_0.manager import (
    ConnectionManager,
    ConnectionManagerError,
)
from dexa_sdk.managers.ada_manager import V2ADAManager
from dexa_sdk.utils import clean_and_get_field_from_dict
from ..utils.util import str_to_bool
from ..manager import ADAManagerError
from .openapi import (
    V2CreateInvitationQueryStringSchema,
    V2InvitationResultSchema,
    GenerateFirebaseDynamicLinkForConnectionInvitationMatchInfoSchema,
    GenerateFirebaseDynamicLinkForConnectionInvitationResponseSchema,
    SendExistingConnectionsMessageHandlerMatchInfoSchema,
    SendExistingConnectionsMessageHandlerRequestSchema,
    GetExistingConnectionMatchInfoSchema,
    GetExistingConnectionResponseSchema,
    ConnectionsListQueryStringSchemaV2,
    ConnectionListSchema,
)

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Page size constant for paginated listings.
# NOTE(review): presumably the default when no `page_size` is supplied — confirm usage.
PAGINATION_PAGE_SIZE = 10


@docs(
    tags=["connection"],
    summary="Well-known endpoint for connection",
)
async def wellknown_connection_handler(request: web.BaseRequest):
    """
    Handler for well-known connection for the agent.

    Creates a new auto-accepted, non-public, single-use connection
    invitation with an empty alias and returns it together with its
    service endpoint.

    Args:
        request: aiohttp request object

    Returns:
        JSON response with `ServiceEndpoint`, `RoutingKey` and `Invitation` keys

    Raises:
        web.HTTPBadRequest: if the connection manager or the model layer
            reports an error while creating the invitation

    """
    context = request.app["request_context"]

    connection_mgr = ConnectionManager(context)
    try:
        # Only the invitation is needed here; the connection record is unused.
        _, invitation = await connection_mgr.create_invitation(
            auto_accept=True, public=False, multi_use=False, alias=""
        )

        # Serialize once and reuse the result for every field below.
        serialized = invitation.serialize()
        service_endpoint = serialized["serviceEndpoint"]

        result = {
            "ServiceEndpoint": service_endpoint,
            "RoutingKey": "",
            "Invitation": {
                "label": invitation.label,
                "serviceEndpoint": service_endpoint,
                "routingKeys": [],
                "recipientKeys": serialized["recipientKeys"],
                # A fresh identifier is generated for every response.
                "@id": str(uuid.uuid4()),
                "@type": "did:sov:BzCbsNYhMrjHiqZDTUASHg;spec/connections/1.0/invitation",
            },
        }
    except (ConnectionManagerError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response(result)


@docs(
    tags=["connection"],
    summary="Create a new connection invitation (Overridden API)",
)
@querystring_schema(V2CreateInvitationQueryStringSchema())
@response_schema(V2InvitationResultSchema(), 200)
async def v2_connections_create_invitation(request: web.BaseRequest):
    """
    Request handler for creating a new connection invitation.

    Args:
        request: aiohttp request object

    Returns:
        The connection invitation details

    Raises:
        web.HTTPBadRequest: if `auto_accept`, `public` or `multi_use` is not
            valid JSON, or if the manager fails to create the invitation
        web.HTTPForbidden: if a public invitation is requested but the
            `public_invites` setting is not enabled

    """

    context = request.app["request_context"]
    alias = request.query.get("alias")

    # The flags arrive as JSON text (e.g. "true", "false", "null"). A malformed
    # value must be reported as a client error, not surface as a server error.
    # json.JSONDecodeError is a subclass of ValueError.
    try:
        auto_accept = json.loads(request.query.get("auto_accept", "null"))
        public = json.loads(request.query.get("public", "false"))
        multi_use = json.loads(request.query.get("multi_use", "false"))
    except ValueError as err:
        raise web.HTTPBadRequest(
            reason="auto_accept, public and multi_use must be valid JSON values"
        ) from err

    if public and not context.settings.get("public_invites"):
        raise web.HTTPForbidden(
            reason="Configuration does not include public invitations"
        )
    base_url = context.settings.get("invite_base_url")

    # Initialise MyData DID Manager.
    mgr = V2ADAManager(context=context)
    try:
        (connection, invitation) = await mgr.create_invitation(
            auto_accept=auto_accept, public=public, multi_use=multi_use, alias=alias
        )

        result = {
            # `connection` may be falsy, in which case it is passed through as-is.
            "connection_id": connection and connection.connection_id,
            "invitation": invitation.serialize(),
            "invitation_url": invitation.to_url(base_url),
        }
    except (ConnectionManagerError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    # The alias is only reported when the connection record carries one.
    if connection and connection.alias:
        result["alias"] = connection.alias

    return web.json_response(result)


@docs(
    tags=["connection"],
    summary="Generate firebase dynamic link for connection invitation",
)
@match_info_schema(GenerateFirebaseDynamicLinkForConnectionInvitationMatchInfoSchema())
@response_schema(
    GenerateFirebaseDynamicLinkForConnectionInvitationResponseSchema(), 200
)
async def generate_firebase_dynamic_link_for_connection_invitation_handler(
    request: web.BaseRequest,
):
    """
    Produce a firebase dynamic link for a connection invitation.

    Args:
        request: aiohttp request object; `conn_id` is taken from the path

    Returns:
        JSON response carrying the result of the manager's
        `create_connection_qr_code` call

    """
    request_context = request.app["request_context"]
    connection_id = request.match_info["conn_id"]

    ada_manager = V2ADAManager(context=request_context)
    try:
        link_payload = await ada_manager.create_connection_qr_code(connection_id)
    except (ConnectionManagerError, BaseModelError) as err:
        # Manager and model failures are reported as client errors.
        raise web.HTTPBadRequest(reason=err.roll_up) from err
    else:
        return web.json_response(link_payload)


@docs(
    tags=["connection"],
    summary="Send existing connections message to remote agent.",
    responses={
        200: {
            "description": "Success",
        }
    },
)
@match_info_schema(SendExistingConnectionsMessageHandlerMatchInfoSchema())
@request_schema(SendExistingConnectionsMessageHandlerRequestSchema())
async def send_existing_connections_message_handler(request: web.BaseRequest):
    """
    Send existing connections message to remote agent.

    Args:
        request: aiohttp request object; `conn_id` is taken from the path
            and `theirdid` from the JSON body

    Returns:
        An empty JSON object with status 200

    """
    request_context = request.app["request_context"]
    connection_id = request.match_info["conn_id"]

    # `theirdid` is optional in the body; a missing key yields None.
    payload = await request.json()
    their_did = payload.get("theirdid")

    ada_manager = V2ADAManager(context=request_context)
    try:
        await ada_manager.send_existing_connections_message(their_did, connection_id)
    except ADAManagerError as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response({}, status=200)


@docs(
    tags=["connection"],
    summary="Fetch existing connection details if any for a current connection.",
)
@match_info_schema(GetExistingConnectionMatchInfoSchema())
@response_schema(GetExistingConnectionResponseSchema(), 200)
async def get_existing_connections_handler(request: web.BaseRequest):
    """
    Fetch existing connection details if any for a current connection.

    Args:
        request: aiohttp request object; `conn_id` is taken from the path

    Returns:
        JSON response with the serialized record, or an empty object when
        the manager returns nothing

    """
    request_context = request.app["request_context"]
    connection_id = request.match_info["conn_id"]

    ada_manager = V2ADAManager(context=request_context)
    existing = await ada_manager.get_existing_connection_record_for_new_connection_id(
        connection_id
    )

    if existing:
        payload = existing.serialize()
    else:
        payload = {}

    return web.json_response(payload)


def connection_sort_key(conn):
    """
    Get the sorting key for a particular connection.

    The key is the `created_at` value prefixed with "2" for inactive
    connections, "1" for connections in the invitation state and "0"
    for every other state.
    """
    state = conn["state"]
    rank = (
        "2"
        if state == ConnectionRecord.STATE_INACTIVE
        else "1"
        if state == ConnectionRecord.STATE_INVITATION
        else "0"
    )
    return rank + conn["created_at"]


@docs(
    tags=["connection"],
    summary="Query agent-to-agent connections (v2)",
)
@querystring_schema(ConnectionsListQueryStringSchemaV2())
@response_schema(ConnectionListSchema(), 200)
async def connections_list_v2(request: web.BaseRequest):
    """
    Request handler for searching connection records.

    Non-empty query parameters are split into a tag filter and a post
    filter and handed to the manager together with the pagination and
    category arguments.

    Args:
        request: aiohttp request object

    Returns:
        The connection list response

    Raises:
        web.HTTPBadRequest: if `page` or `page_size` is not an integer

    """
    context = request.app["request_context"]
    query = request.query

    # Parameters forwarded as the tag filter; blank values are ignored.
    tag_filter = {
        param_name: query[param_name]
        for param_name in (
            "invitation_id",
            "my_did",
            "their_did",
            "request_id",
            "invitation_key",
        )
        if param_name in query and query[param_name] != ""
    }

    # Parameters forwarded as the post filter; blank values are ignored.
    post_filter = {
        param_name: query[param_name]
        for param_name in (
            "alias",
            "initiator",
            "state",
            "their_role",
        )
        if param_name in query and query[param_name] != ""
    }

    # Pagination parameters. Non-numeric input is a client error.
    page = clean_and_get_field_from_dict(query, "page")
    page_size = clean_and_get_field_from_dict(query, "page_size")
    try:
        page = int(page) if page is not None else page
        page_size = int(page_size) if page_size is not None else page_size
    except (TypeError, ValueError) as err:
        raise web.HTTPBadRequest(
            reason="page and page_size must be integers"
        ) from err

    # Category
    org_flag = clean_and_get_field_from_dict(query, "org_flag")
    marketplace_flag = clean_and_get_field_from_dict(query, "marketplace_flag")

    mgr = V2ADAManager(context)

    pagination_result = await mgr.query_connections_and_categorise_results(
        tag_filter,
        post_filter,
        # Missing or zero values fall back to the first page / default size.
        page or 1,
        page_size or PAGINATION_PAGE_SIZE,
        # Falsy flags are passed through unchanged rather than converted.
        str_to_bool(org_flag) if org_flag else org_flag,
        str_to_bool(marketplace_flag) if marketplace_flag else marketplace_flag,
    )

    return web.json_response(pagination_result._asdict())

Functions

def connection_sort_key(conn)

Get the sorting key for a particular connection.

Expand source code
def connection_sort_key(conn):
    """
    Get the sorting key for a particular connection.

    The key is the `created_at` value prefixed with "2" for inactive
    connections, "1" for connections in the invitation state and "0"
    for every other state.
    """
    state = conn["state"]
    rank = (
        "2"
        if state == ConnectionRecord.STATE_INACTIVE
        else "1"
        if state == ConnectionRecord.STATE_INVITATION
        else "0"
    )
    return rank + conn["created_at"]
async def connections_list_v2(request: aiohttp.web_request.BaseRequest)

Request handler for searching connection records.

Args

request
aiohttp request object

Returns

The connection list response

Expand source code
@docs(
    tags=["connection"],
    summary="Query agent-to-agent connections (v2)",
)
@querystring_schema(ConnectionsListQueryStringSchemaV2())
@response_schema(ConnectionListSchema(), 200)
async def connections_list_v2(request: web.BaseRequest):
    """
    Request handler for searching connection records.

    Non-empty query parameters are split into a tag filter and a post
    filter and handed to the manager together with the pagination and
    category arguments.

    Args:
        request: aiohttp request object

    Returns:
        The connection list response

    Raises:
        web.HTTPBadRequest: if `page` or `page_size` is not an integer

    """
    context = request.app["request_context"]
    query = request.query

    # Parameters forwarded as the tag filter; blank values are ignored.
    tag_filter = {
        param_name: query[param_name]
        for param_name in (
            "invitation_id",
            "my_did",
            "their_did",
            "request_id",
            "invitation_key",
        )
        if param_name in query and query[param_name] != ""
    }

    # Parameters forwarded as the post filter; blank values are ignored.
    post_filter = {
        param_name: query[param_name]
        for param_name in (
            "alias",
            "initiator",
            "state",
            "their_role",
        )
        if param_name in query and query[param_name] != ""
    }

    # Pagination parameters. Non-numeric input is a client error.
    page = clean_and_get_field_from_dict(query, "page")
    page_size = clean_and_get_field_from_dict(query, "page_size")
    try:
        page = int(page) if page is not None else page
        page_size = int(page_size) if page_size is not None else page_size
    except (TypeError, ValueError) as err:
        raise web.HTTPBadRequest(
            reason="page and page_size must be integers"
        ) from err

    # Category
    org_flag = clean_and_get_field_from_dict(query, "org_flag")
    marketplace_flag = clean_and_get_field_from_dict(query, "marketplace_flag")

    mgr = V2ADAManager(context)

    pagination_result = await mgr.query_connections_and_categorise_results(
        tag_filter,
        post_filter,
        # Missing or zero values fall back to the first page / default size.
        page or 1,
        page_size or PAGINATION_PAGE_SIZE,
        # Falsy flags are passed through unchanged rather than converted.
        str_to_bool(org_flag) if org_flag else org_flag,
        str_to_bool(marketplace_flag) if marketplace_flag else marketplace_flag,
    )

    return web.json_response(pagination_result._asdict())

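async def generate_firebase_dynamic_link_for_connection_invitation_handler(request: aiohttp.web_request.BaseRequest)

Request handler for generating firebase dynamic link for connection invitation.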

Args

request
aiohttp request object

Returns

The firebase dynamic link

Expand source code
@docs(
    tags=["connection"],
    summary="Generate firebase dynamic link for connection invitation",
)
@match_info_schema(GenerateFirebaseDynamicLinkForConnectionInvitationMatchInfoSchema())
@response_schema(
    GenerateFirebaseDynamicLinkForConnectionInvitationResponseSchema(), 200
)
async def generate_firebase_dynamic_link_for_connection_invitation_handler(
    request: web.BaseRequest,
):
    """
    Produce a firebase dynamic link for a connection invitation.

    Args:
        request: aiohttp request object; `conn_id` is taken from the path

    Returns:
        JSON response carrying the result of the manager's
        `create_connection_qr_code` call

    """
    request_context = request.app["request_context"]
    connection_id = request.match_info["conn_id"]

    ada_manager = V2ADAManager(context=request_context)
    try:
        link_payload = await ada_manager.create_connection_qr_code(connection_id)
    except (ConnectionManagerError, BaseModelError) as err:
        # Manager and model failures are reported as client errors.
        raise web.HTTPBadRequest(reason=err.roll_up) from err
    else:
        return web.json_response(link_payload)
async def get_existing_connections_handler(request: aiohttp.web_request.BaseRequest)

Fetch existing connection details if any for a current connection.

Expand source code
@docs(
    tags=["connection"],
    summary="Fetch existing connection details if any for a current connection.",
)
@match_info_schema(GetExistingConnectionMatchInfoSchema())
@response_schema(GetExistingConnectionResponseSchema(), 200)
async def get_existing_connections_handler(request: web.BaseRequest):
    """
    Fetch existing connection details if any for a current connection.

    Args:
        request: aiohttp request object; `conn_id` is taken from the path

    Returns:
        JSON response with the serialized record, or an empty object when
        the manager returns nothing

    """
    request_context = request.app["request_context"]
    connection_id = request.match_info["conn_id"]

    ada_manager = V2ADAManager(context=request_context)
    existing = await ada_manager.get_existing_connection_record_for_new_connection_id(
        connection_id
    )

    if existing:
        payload = existing.serialize()
    else:
        payload = {}

    return web.json_response(payload)
async def send_existing_connections_message_handler(request: aiohttp.web_request.BaseRequest)

Send existing connections message to remote agent.

Expand source code
@docs(
    tags=["connection"],
    summary="Send existing connections message to remote agent.",
    responses={
        200: {
            "description": "Success",
        }
    },
)
@match_info_schema(SendExistingConnectionsMessageHandlerMatchInfoSchema())
@request_schema(SendExistingConnectionsMessageHandlerRequestSchema())
async def send_existing_connections_message_handler(request: web.BaseRequest):
    """
    Send existing connections message to remote agent.

    Args:
        request: aiohttp request object; `conn_id` is taken from the path
            and `theirdid` from the JSON body

    Returns:
        An empty JSON object with status 200

    """
    request_context = request.app["request_context"]
    connection_id = request.match_info["conn_id"]

    # `theirdid` is optional in the body; a missing key yields None.
    payload = await request.json()
    their_did = payload.get("theirdid")

    ada_manager = V2ADAManager(context=request_context)
    try:
        await ada_manager.send_existing_connections_message(their_did, connection_id)
    except ADAManagerError as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response({}, status=200)
async def v2_connections_create_invitation(request: aiohttp.web_request.BaseRequest)

Request handler for creating a new connection invitation.

Args

request
aiohttp request object

Returns

The connection invitation details

Expand source code
@docs(
    tags=["connection"],
    summary="Create a new connection invitation (Overridden API)",
)
@querystring_schema(V2CreateInvitationQueryStringSchema())
@response_schema(V2InvitationResultSchema(), 200)
async def v2_connections_create_invitation(request: web.BaseRequest):
    """
    Request handler for creating a new connection invitation.

    Args:
        request: aiohttp request object

    Returns:
        The connection invitation details

    Raises:
        web.HTTPBadRequest: if `auto_accept`, `public` or `multi_use` is not
            valid JSON, or if the manager fails to create the invitation
        web.HTTPForbidden: if a public invitation is requested but the
            `public_invites` setting is not enabled

    """

    context = request.app["request_context"]
    alias = request.query.get("alias")

    # The flags arrive as JSON text (e.g. "true", "false", "null"). A malformed
    # value must be reported as a client error, not surface as a server error.
    # json.JSONDecodeError is a subclass of ValueError.
    try:
        auto_accept = json.loads(request.query.get("auto_accept", "null"))
        public = json.loads(request.query.get("public", "false"))
        multi_use = json.loads(request.query.get("multi_use", "false"))
    except ValueError as err:
        raise web.HTTPBadRequest(
            reason="auto_accept, public and multi_use must be valid JSON values"
        ) from err

    if public and not context.settings.get("public_invites"):
        raise web.HTTPForbidden(
            reason="Configuration does not include public invitations"
        )
    base_url = context.settings.get("invite_base_url")

    # Initialise MyData DID Manager.
    mgr = V2ADAManager(context=context)
    try:
        (connection, invitation) = await mgr.create_invitation(
            auto_accept=auto_accept, public=public, multi_use=multi_use, alias=alias
        )

        result = {
            # `connection` may be falsy, in which case it is passed through as-is.
            "connection_id": connection and connection.connection_id,
            "invitation": invitation.serialize(),
            "invitation_url": invitation.to_url(base_url),
        }
    except (ConnectionManagerError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    # The alias is only reported when the connection record carries one.
    if connection and connection.alias:
        result["alias"] = connection.alias

    return web.json_response(result)
async def wellknown_connection_handler(request: aiohttp.web_request.BaseRequest)

Handler for well-known connection for the agent.

Expand source code
@docs(
    tags=["connection"],
    summary="Well-known endpoint for connection",
)
async def wellknown_connection_handler(request: web.BaseRequest):
    """
    Handler for well-known connection for the agent.

    Creates a new auto-accepted, non-public, single-use connection
    invitation with an empty alias and returns it together with its
    service endpoint.

    Args:
        request: aiohttp request object

    Returns:
        JSON response with `ServiceEndpoint`, `RoutingKey` and `Invitation` keys

    Raises:
        web.HTTPBadRequest: if the connection manager or the model layer
            reports an error while creating the invitation

    """
    context = request.app["request_context"]

    connection_mgr = ConnectionManager(context)
    try:
        # Only the invitation is needed here; the connection record is unused.
        _, invitation = await connection_mgr.create_invitation(
            auto_accept=True, public=False, multi_use=False, alias=""
        )

        # Serialize once and reuse the result for every field below.
        serialized = invitation.serialize()
        service_endpoint = serialized["serviceEndpoint"]

        result = {
            "ServiceEndpoint": service_endpoint,
            "RoutingKey": "",
            "Invitation": {
                "label": invitation.label,
                "serviceEndpoint": service_endpoint,
                "routingKeys": [],
                "recipientKeys": serialized["recipientKeys"],
                # A fresh identifier is generated for every response.
                "@id": str(uuid.uuid4()),
                "@type": "did:sov:BzCbsNYhMrjHiqZDTUASHg;spec/connections/1.0/invitation",
            },
        }
    except (ConnectionManagerError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response(result)