Module mydata_did.v1_0.routes.data_controller_functions_routes
Expand source code
import logging
from aiohttp import web
from aiohttp_apispec import (
docs,
request_schema
)
from dexa_sdk.managers.ada_manager import V2ADAManager
from ..routes.maps.tag_maps import (
TAGS_DATA_CONTROLLER_FUNCTIONS_LABEL,
)
from .openapi.schemas import (
UpdateControllerDetailsRequestSchema
)
# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)
# Page-size constant; not referenced by the handlers visible in this module.
# NOTE(review): presumably the default page size for paginated listings
# elsewhere - confirm before relying on it.
PAGINATION_PAGE_SIZE = 10
@docs(
    tags=[TAGS_DATA_CONTROLLER_FUNCTIONS_LABEL],
    summary="Send data controller details message to remote agent hosted by Data Controller",
    responses={200: {"description": "Success"}},
)
async def send_data_controller_details_message_handler(request: web.BaseRequest):
    """Handle a request to send the data controller details message.

    Reads ``connection_id`` from the route's match info, builds a
    ``V2ADAManager`` from the application's request context and asks it to
    send the data controller details message for that connection.

    Args:
        request: Incoming aiohttp request; its route must provide a
            ``connection_id`` path parameter.

    Returns:
        An empty JSON object with HTTP status 200.
    """
    # Gather the inputs first, in the same order the route has always used.
    app_context = request.app["request_context"]
    target_connection_id = request.match_info["connection_id"]

    # The manager performs the actual message dispatch.
    manager = V2ADAManager(context=app_context)
    await manager.send_data_controller_details_message(target_connection_id)

    return web.json_response({}, status=200)
@docs(
    tags=[TAGS_DATA_CONTROLLER_FUNCTIONS_LABEL],
    summary="Update data controller details",
)
@request_schema(UpdateControllerDetailsRequestSchema())
async def update_data_controller_details(request: web.BaseRequest):
    """Handle a request to update the data controller details.

    Parses the JSON request body, looks up each supported detail field in
    it with ``.get`` and forwards the values as keyword arguments to
    ``V2ADAManager.update_controller_details``.

    Args:
        request: Incoming aiohttp request carrying a JSON body.

    Returns:
        A JSON response containing the serialized record returned by the
        manager.
    """
    app_context = request.app["request_context"]
    payload = await request.json()

    manager = V2ADAManager(context=app_context)

    # Keyword names accepted by the manager; each one is read from the
    # request body under the same key.
    detail_fields = (
        "organisation_name",
        "cover_image_url",
        "logo_image_url",
        "location",
        "organisation_type",
        "description",
        "policy_url",
        "eula_url",
    )
    updated_record = await manager.update_controller_details(
        **{field: payload.get(field) for field in detail_fields}
    )

    return web.json_response(updated_record.serialize())
Functions
async def send_data_controller_details_message_handler(request: aiohttp.web_request.BaseRequest)
-
Send data controller details message to remote agent hosted by Data Controller.
Expand source code
@docs(
    tags=[TAGS_DATA_CONTROLLER_FUNCTIONS_LABEL],
    summary="Send data controller details message to remote agent hosted by Data Controller",
    responses={200: {"description": "Success"}},
)
async def send_data_controller_details_message_handler(request: web.BaseRequest):
    """Handle a request to send the data controller details message.

    Reads ``connection_id`` from the route's match info, builds a
    ``V2ADAManager`` from the application's request context and asks it to
    send the data controller details message for that connection.

    Args:
        request: Incoming aiohttp request; its route must provide a
            ``connection_id`` path parameter.

    Returns:
        An empty JSON object with HTTP status 200.
    """
    # Gather the inputs first, in the same order the route has always used.
    app_context = request.app["request_context"]
    target_connection_id = request.match_info["connection_id"]

    # The manager performs the actual message dispatch.
    manager = V2ADAManager(context=app_context)
    await manager.send_data_controller_details_message(target_connection_id)

    return web.json_response({}, status=200)
async def update_data_controller_details(request: aiohttp.web_request.BaseRequest)
-
Update data controller details
Expand source code
@docs(
    tags=[TAGS_DATA_CONTROLLER_FUNCTIONS_LABEL],
    summary="Update data controller details",
)
@request_schema(UpdateControllerDetailsRequestSchema())
async def update_data_controller_details(request: web.BaseRequest):
    """Handle a request to update the data controller details.

    Parses the JSON request body, looks up each supported detail field in
    it with ``.get`` and forwards the values as keyword arguments to
    ``V2ADAManager.update_controller_details``.

    Args:
        request: Incoming aiohttp request carrying a JSON body.

    Returns:
        A JSON response containing the serialized record returned by the
        manager.
    """
    app_context = request.app["request_context"]
    payload = await request.json()

    manager = V2ADAManager(context=app_context)

    # Keyword names accepted by the manager; each one is read from the
    # request body under the same key.
    detail_fields = (
        "organisation_name",
        "cover_image_url",
        "logo_image_url",
        "location",
        "organisation_type",
        "description",
        "policy_url",
        "eula_url",
    )
    updated_record = await manager.update_controller_details(
        **{field: payload.get(field) for field in detail_fields}
    )

    return web.json_response(updated_record.serialize())