Module mydata_did.v1_0.routes.data_controller_functions_routes

Expand source code
import logging
from aiohttp import web
from aiohttp_apispec import (
    docs,
    request_schema
)
from dexa_sdk.managers.ada_manager import V2ADAManager
from ..routes.maps.tag_maps import (
    TAGS_DATA_CONTROLLER_FUNCTIONS_LABEL,
)
from .openapi.schemas import (
    UpdateControllerDetailsRequestSchema
)


LOGGER = logging.getLogger(__name__)

PAGINATION_PAGE_SIZE = 10


@docs(
    tags=[TAGS_DATA_CONTROLLER_FUNCTIONS_LABEL],
    summary="Send data controller details message to remote agent hosted by Data Controller",
    responses={
        200: {
            "description": "Success",
        }
    },
)
async def send_data_controller_details_message_handler(request: web.BaseRequest):
    """Send data controller details message to remote agent hosted by Data Controller."""

    context = request.app["request_context"]
    connection_id = request.match_info["connection_id"]

    # Initialise MyData DID Manager.
    mgr = V2ADAManager(context=context)

    # Call the function
    await mgr.send_data_controller_details_message(connection_id)

    return web.json_response({}, status=200)


@docs(
    tags=[TAGS_DATA_CONTROLLER_FUNCTIONS_LABEL],
    summary="Update data controller details",
)
@request_schema(UpdateControllerDetailsRequestSchema())
async def update_data_controller_details(request: web.BaseRequest):
    """Update data controller details"""
    # Request context
    context = request.app["request_context"]

    # Request body
    controller_details = await request.json()

    # Initialise MyData DID Manager.
    mgr = V2ADAManager(context=context)

    # Call the function
    record = await mgr.update_controller_details(
        organisation_name=controller_details.get("organisation_name"),
        cover_image_url=controller_details.get("cover_image_url"),
        logo_image_url=controller_details.get("logo_image_url"),
        location=controller_details.get("location"),
        organisation_type=controller_details.get("organisation_type"),
        description=controller_details.get("description"),
        policy_url=controller_details.get("policy_url"),
        eula_url=controller_details.get("eula_url")
    )

    return web.json_response(record.serialize())

Functions

async def send_data_controller_details_message_handler(request: aiohttp.web_request.BaseRequest)

Send data controller details message to remote agent hosted by Data Controller.

Expand source code
@docs(
    tags=[TAGS_DATA_CONTROLLER_FUNCTIONS_LABEL],
    summary="Send data controller details message to remote agent hosted by Data Controller",
    responses={
        200: {
            "description": "Success",
        }
    },
)
async def send_data_controller_details_message_handler(request: web.BaseRequest):
    """Send data controller details message to remote agent hosted by Data Controller."""

    context = request.app["request_context"]
    connection_id = request.match_info["connection_id"]

    # Initialise MyData DID Manager.
    mgr = V2ADAManager(context=context)

    # Call the function
    await mgr.send_data_controller_details_message(connection_id)

    return web.json_response({}, status=200)
async def update_data_controller_details(request: aiohttp.web_request.BaseRequest)

Update data controller details

Expand source code
@docs(
    tags=[TAGS_DATA_CONTROLLER_FUNCTIONS_LABEL],
    summary="Update data controller details",
)
@request_schema(UpdateControllerDetailsRequestSchema())
async def update_data_controller_details(request: web.BaseRequest):
    """Update data controller details"""
    # Request context
    context = request.app["request_context"]

    # Request body
    controller_details = await request.json()

    # Initialise MyData DID Manager.
    mgr = V2ADAManager(context=context)

    # Call the function
    record = await mgr.update_controller_details(
        organisation_name=controller_details.get("organisation_name"),
        cover_image_url=controller_details.get("cover_image_url"),
        logo_image_url=controller_details.get("logo_image_url"),
        location=controller_details.get("location"),
        organisation_type=controller_details.get("organisation_type"),
        description=controller_details.get("description"),
        policy_url=controller_details.get("policy_url"),
        eula_url=controller_details.get("eula_url")
    )

    return web.json_response(record.serialize())