Module mydata_did.v1_0.routes.routes

Expand source code
import os
import typing
import logging
import jwt

from aiohttp import web
from aiohttp import frozenlist
from ..manager import ADAManager, ADAManagerError

from .maps.tag_maps import (
    TAGS_MYDATA_DID_OPERATIONS,
    TAGS_DATA_AGREEMENT_CORE_FUNCTIONS,
    TAGS_DATA_AGREEMENT_AUDITOR_FUNCTIONS,
    TAGS_JSONLD_FUNCTIONS,
    TAGS_DATA_CONTROLLER_FUNCTIONS
)

from .maps.route_maps import ROUTES_ADA


# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)

# Page size constant. Not referenced by the code visible in this module;
# presumably consumed by paginated route handlers elsewhere (TODO confirm).
PAGINATION_PAGE_SIZE = 10


@web.middleware
async def authentication_middleware(
    request: web.BaseRequest, handler: typing.Coroutine
):
    """
    Authentication middleware.

    Authenticate the request if the request headers contain an
    Authorization header with value of ApiKey <api_key>.

    When the ``intermediary.igrantio_org_id`` setting is not set, the
    request is forwarded to the handler without any authentication.

    Args:
        request: Incoming request; its application must expose the
            ``request_context`` key.
        handler: Next handler in the middleware chain.

    Returns:
        Whatever the downstream handler returns.

    Raises:
        web.HTTPUnauthorized: If the Authorization header is missing, does
            not carry an ``ApiKey <api_key>`` value, or the API key fails
            JWT validation.
    """

    # Context.
    context = request.app["request_context"]

    # Fetch iGrant.io config from the context settings.
    config_org_id = context.settings.get("intermediary.igrantio_org_id")
    config_api_key_secret = context.settings.get("intermediary.igrantio_org_api_key_secret")

    if not config_org_id:
        # Intermediary config not available.
        return await handler(request)

    # Fetch authorization header.
    authorization_header = request.headers.get("Authorization")

    # Fetch api key from authorization header. A header that lacks the
    # "ApiKey " marker (e.g. "Bearer ...") yields a single-element split, so
    # guard the index instead of letting an IndexError surface as a 500.
    header_api_key = None
    if authorization_header:
        header_parts = authorization_header.split("ApiKey ")
        if len(header_parts) > 1:
            header_api_key = header_parts[1]

    if not header_api_key:
        raise web.HTTPUnauthorized(reason="Missing Authorization header.")

    # Authenticate the request.
    try:
        jwt.decode(header_api_key, config_api_key_secret, algorithms=["HS256"])
    except jwt.exceptions.InvalidTokenError:
        # Retry expecting the "dataverifier" audience before rejecting.
        # NOTE(review): presumably covers tokens carrying an "aud" claim,
        # which the audience-less decode above rejects - confirm.
        try:
            jwt.decode(
                header_api_key, config_api_key_secret, algorithms=["HS256"], audience="dataverifier"
            )
        except jwt.exceptions.InvalidTokenError as err:
            raise web.HTTPUnauthorized(reason="Invalid API Key.") from err

    # Override settings api key.
    context.settings["intermediary.igrantio_org_api_key"] = header_api_key

    # Call the handler.
    return await handler(request)


async def register(app: web.Application):
    """Install the authentication middleware and register the ADA routes."""

    # Rebuild the middleware list with the authentication middleware last.
    combined_middlewares = [*app.middlewares, authentication_middleware]
    app._middlewares = frozenlist.FrozenList(combined_middlewares)

    app.add_routes(ROUTES_ADA)


def post_process_routes(app: web.Application):
    """Amend swagger API."""

    swagger_spec = app._state["swagger_dict"]

    # Add top-level tags description, creating the tag list if it is absent.
    tag_list = swagger_spec.setdefault("tags", [])
    tag_list.extend(
        (
            TAGS_MYDATA_DID_OPERATIONS,
            TAGS_DATA_AGREEMENT_CORE_FUNCTIONS,
            TAGS_DATA_AGREEMENT_AUDITOR_FUNCTIONS,
            TAGS_JSONLD_FUNCTIONS,
            TAGS_DATA_CONTROLLER_FUNCTIONS,
        )
    )

Functions

async def authentication_middleware(request: aiohttp.web_request.BaseRequest, handler: Coroutine[+T_co, -T_contra, +V_co])

Authentication middleware.

Authenticate the request if the request headers contain an Authorization header with value of `ApiKey <api_key>`.

Expand source code
@web.middleware
async def authentication_middleware(
    request: web.BaseRequest, handler: typing.Coroutine
):
    """
    Authentication middleware.

    Authenticate the request if the request headers contain an
    Authorization header with value of ApiKey <api_key>.

    When the ``intermediary.igrantio_org_id`` setting is not set, the
    request is forwarded to the handler without any authentication.

    Args:
        request: Incoming request; its application must expose the
            ``request_context`` key.
        handler: Next handler in the middleware chain.

    Returns:
        Whatever the downstream handler returns.

    Raises:
        web.HTTPUnauthorized: If the Authorization header is missing, does
            not carry an ``ApiKey <api_key>`` value, or the API key fails
            JWT validation.
    """

    # Context.
    context = request.app["request_context"]

    # Fetch iGrant.io config from the context settings.
    config_org_id = context.settings.get("intermediary.igrantio_org_id")
    config_api_key_secret = context.settings.get("intermediary.igrantio_org_api_key_secret")

    if not config_org_id:
        # Intermediary config not available.
        return await handler(request)

    # Fetch authorization header.
    authorization_header = request.headers.get("Authorization")

    # Fetch api key from authorization header. A header that lacks the
    # "ApiKey " marker (e.g. "Bearer ...") yields a single-element split, so
    # guard the index instead of letting an IndexError surface as a 500.
    header_api_key = None
    if authorization_header:
        header_parts = authorization_header.split("ApiKey ")
        if len(header_parts) > 1:
            header_api_key = header_parts[1]

    if not header_api_key:
        raise web.HTTPUnauthorized(reason="Missing Authorization header.")

    # Authenticate the request.
    try:
        jwt.decode(header_api_key, config_api_key_secret, algorithms=["HS256"])
    except jwt.exceptions.InvalidTokenError:
        # Retry expecting the "dataverifier" audience before rejecting.
        # NOTE(review): presumably covers tokens carrying an "aud" claim,
        # which the audience-less decode above rejects - confirm.
        try:
            jwt.decode(
                header_api_key, config_api_key_secret, algorithms=["HS256"], audience="dataverifier"
            )
        except jwt.exceptions.InvalidTokenError as err:
            raise web.HTTPUnauthorized(reason="Invalid API Key.") from err

    # Override settings api key.
    context.settings["intermediary.igrantio_org_api_key"] = header_api_key

    # Call the handler.
    return await handler(request)
def post_process_routes(app: aiohttp.web_app.Application)

Amend swagger API.

Expand source code
def post_process_routes(app: web.Application):
    """Amend swagger API."""

    swagger_spec = app._state["swagger_dict"]

    # Add top-level tags description, creating the tag list if it is absent.
    tag_list = swagger_spec.setdefault("tags", [])
    tag_list.extend(
        (
            TAGS_MYDATA_DID_OPERATIONS,
            TAGS_DATA_AGREEMENT_CORE_FUNCTIONS,
            TAGS_DATA_AGREEMENT_AUDITOR_FUNCTIONS,
            TAGS_JSONLD_FUNCTIONS,
            TAGS_DATA_CONTROLLER_FUNCTIONS,
        )
    )
async def register(app: aiohttp.web_app.Application)
Expand source code
async def register(app: web.Application):
    """Install the authentication middleware and register the ADA routes."""

    # Rebuild the middleware list with the authentication middleware last.
    combined_middlewares = [*app.middlewares, authentication_middleware]
    app._middlewares = frozenlist.FrozenList(combined_middlewares)

    app.add_routes(ROUTES_ADA)