Module mydata_did.v1_0.utils.jsonld.data_agreement

Sign and verify functions for json-ld based data agreements.

Expand source code
"""Sign and verify functions for json-ld based data agreements."""

import json
import typing

from aries_cloudagent.wallet.util import (
    b58_to_bytes,
    b64_to_bytes,
    b64_to_str,
    bytes_to_b58,
    bytes_to_b64,
    str_to_b64,
)

from aries_cloudagent.wallet.base import BaseWallet

from .create_verify_data import create_verify_data


# Multibase prefix character; embedded right after "did:mydata:" by did_mydata().
MULTIBASE_B58_BTC = "z"
# Byte prepended to the raw public key before it is base58-encoded in did_mydata().
# NOTE(review): the multicodec table lists ed25519-pub as 0xed, whose
# unsigned-varint encoding is the two bytes 0xed 0x01 - confirm that the
# single-byte form used here is intentional before relying on interoperability.
MULTICODEC_ED25519_PUB = b"\xed"


def did_mydata(verkey: str) -> str:
    """Qualify a verkey into a did:mydata identifier if it is not one already.

    A value that already starts with ``did:mydata:`` followed by the
    multibase prefix is returned unchanged. Otherwise the verkey is
    base58-decoded, prefixed with ``MULTICODEC_ED25519_PUB``, base58-encoded
    again and appended to the DID prefix.
    """
    did_prefix = f"did:mydata:{MULTIBASE_B58_BTC}"

    if not verkey.startswith(did_prefix):
        tagged_key = MULTICODEC_ED25519_PUB + b58_to_bytes(verkey)
        return did_prefix + bytes_to_b58(tagged_key)

    # Already qualified: hand it back untouched.
    return verkey


def b64encode(str):
    """Encode a string as URL-safe base64 without padding.

    Thin wrapper over ``str_to_b64`` with ``urlsafe=True`` and ``pad=False``.
    The parameter name shadows the builtin; it is kept so keyword callers
    continue to work.
    """
    encoded = str_to_b64(str, urlsafe=True, pad=False)
    return encoded


def b64decode(bytes):
    """Decode a URL-safe base64 value into a string.

    Thin wrapper over ``b64_to_str`` with ``urlsafe=True``. The parameter
    name shadows the builtin; it is kept so keyword callers continue to work.
    """
    decoded = b64_to_str(bytes, urlsafe=True)
    return decoded


def create_jws(encoded_header, verify_data):
    """Compose the JWS signing input.

    Returns the UTF-8 bytes of ``encoded_header`` followed by a single ``.``,
    with the raw ``verify_data`` bytes appended as-is.
    """
    header_segment = encoded_header + "."
    return header_segment.encode("utf-8") + verify_data


async def jws_sign(verify_data, verkey, wallet):
    """Sign verify data and return a detached JWS string.

    The header ``{"alg": "EdDSA", "b64": False, "crit": ["b64"]}`` is JSON
    serialized and base64url-encoded; the wallet signs
    ``<encoded header>.<verify_data>`` with ``verkey``. The result is
    ``<encoded header>..<encoded signature>`` - the payload segment between
    the two dots is left empty.
    """
    # Key order is significant: the serialized header is part of what is signed.
    header_json = json.dumps({"alg": "EdDSA", "b64": False, "crit": ["b64"]})
    header_b64 = b64encode(header_json)

    signing_input = create_jws(header_b64, verify_data)
    raw_signature = await wallet.sign_message(signing_input, verkey)
    signature_b64 = bytes_to_b64(raw_signature, urlsafe=True, pad=False)

    return "..".join([header_b64, signature_b64])


def verify_jws_header(header):
    """Check header requirements.

    The header is accepted only when it is a mapping with exactly three
    members: ``alg`` equal to ``"EdDSA"``, ``b64`` being ``False`` and
    ``crit`` being the one-element list ``["b64"]``.

    Raises:
        Exception: if any of the requirements above is not met, including
            when a required member is missing.
    """
    # Every requirement must hold. The previous form
    # `not (<field checks>) and len(header) == 3` only rejected bad fields
    # when the header had exactly three members, so a wrong `alg` combined
    # with any extra member slipped through.
    crit = header.get("crit") if isinstance(header, dict) else None
    is_valid = (
        isinstance(header, dict)
        and len(header) == 3
        and header.get("alg") == "EdDSA"
        and header.get("b64") is False
        and isinstance(crit, list)
        and len(crit) == 1
        and crit[0] == "b64"
    )

    if not is_valid:
        raise Exception("Invalid JWS header parameters for Ed25519Signature2018.")


async def jws_verify(verify_data, signature, public_key, wallet):
    """Verify a detached JWS against the given verify data.

    ``signature`` is split at the first ``..`` into the encoded header and
    the encoded signature. The decoded header is validated with
    ``verify_jws_header`` (which raises on an invalid header), then the
    wallet checks the signature over ``<encoded header>.<verify_data>``
    using ``public_key``. Returns the wallet's verification result.
    """
    header_b64, _sep, signature_b64 = signature.partition("..")

    # Reject unexpected header parameters before touching the signature.
    verify_jws_header(json.loads(b64decode(header_b64)))

    raw_signature = b64_to_bytes(signature_b64, urlsafe=True)
    signing_input = create_jws(header_b64, verify_data)

    return await wallet.verify_message(signing_input, raw_signature, public_key)


async def sign_data_agreement(data_agreement, signature_options, verkey, wallet):
    """Sign data agreement.

    Computes the verify data for ``data_agreement`` with
    ``create_verify_data``, signs it with ``jws_sign`` and returns a new
    document carrying the resulting proof:

    * no existing ``proof``/``proofChain``: the result gets a single ``proof``;
    * an existing ``proof`` but no ``proofChain``: the ``proof`` member is
      replaced by a two-element ``proofChain`` ``[old proof, new proof]``;
    * an existing ``proofChain``: the new proof is appended to it.

    The new proof is ``signature_options`` plus a ``proofValue`` holding the
    detached JWS. This function does not modify ``data_agreement`` or its
    ``proofChain`` list itself; the returned document is a new dict with a
    new list (other nested values are shared, not deep-copied).
    NOTE(review): whether ``create_verify_data`` mutates its arguments is not
    visible here.
    """
    # A pre-existing "proof" means the document is being signed again, so the
    # verify data must be computed in proof-chain mode.
    proof_chain = "proof" in data_agreement

    _framed, verify_data_hex_string = create_verify_data(
        data_agreement, signature_options, proof_chain
    )

    jws = await jws_sign(bytes.fromhex(verify_data_hex_string), verkey, wallet)
    new_proof = {**signature_options, "proofValue": jws}

    if "proofChain" in data_agreement:
        # Proof chain with more than 2 elements: extend a copy of the chain
        # instead of appending to the caller's list.
        extended_chain = [*data_agreement["proofChain"], new_proof]
        return {**data_agreement, "proofChain": extended_chain}

    if proof_chain:
        # Proof chain with 2 elements: drop "proof" from the result only,
        # leaving the caller's document intact.
        without_proof = {
            key: value for key, value in data_agreement.items() if key != "proof"
        }
        old_proof = data_agreement.get("proof")
        return {**without_proof, "proofChain": [old_proof, new_proof]}

    # Single proof.
    return {**data_agreement, "proof": new_proof}


async def verify_data_agreement(doc, verkey, wallet, drop_proof_chain:bool = True):
    """Verify the proof on a data agreement document.

    Without a ``proofChain`` member, the document's own ``proof`` is verified.
    With a ``proofChain``:

    * ``drop_proof_chain=True``: the first chain entry becomes the document's
      ``proof``, the ``proofChain`` member is removed, and the second chain
      entry is the proof that gets verified;
    * ``drop_proof_chain=False``: the last chain entry is verified against the
      document with that entry removed from ``proofChain``.

    Note that ``doc`` is modified in place in both proof-chain cases; pass a
    copy if the original must be preserved. Returns the result of
    ``jws_verify`` for ``verkey``.
    """
    has_chain = "proofChain" in doc
    proof_to_check = None

    if has_chain:
        if drop_proof_chain:
            # For proof chain with 2 elements
            first_proof = doc["proofChain"][0]
            proof_to_check = doc["proofChain"][1]

            del doc["proofChain"]
            doc["proof"] = first_proof
        else:
            # For proof chain with more than 2 elements
            proof_to_check = doc["proofChain"][-1]
            doc["proofChain"] = doc["proofChain"][:-1]

    options = proof_to_check if has_chain else doc["proof"]
    framed, verify_data_hex_string = create_verify_data(doc, options, has_chain)

    payload = bytes.fromhex(verify_data_hex_string)

    if has_chain:
        jws = proof_to_check["proofValue"]
    else:
        jws = framed["proof"]["proofValue"]

    return await jws_verify(payload, jws, verkey, wallet)

async def verify_data_agreement_with_proof_chain(doc: dict = None, verkeys: typing.List[str] = None, wallet: BaseWallet = None) -> bool:
    """Verify both signatures of a data agreement carrying a proof chain.

    Two documents are derived from ``doc`` and checked with
    ``verify_data_agreement``:

    * the genesis document: ``doc`` without ``proofChain``, with the first
      chain entry as ``proof`` and ``event`` replaced by its first element,
      verified with ``verkeys[0]``;
    * the current document: ``doc`` with its ``proofChain``, verified with
      ``verkeys[1]``.

    ``doc`` itself is left unmodified. Returns a truthy value only when both
    verifications succeed.
    """
    # Read the chain without popping it, so the caller's document keeps its
    # "proofChain" member after verification.
    proof_chain = doc.get("proofChain")
    unsigned_doc = {key: value for key, value in doc.items() if key != "proofChain"}

    genesis_doc = {**unsigned_doc, "proof": proof_chain[0]}
    genesis_doc["event"] = genesis_doc["event"][0]

    current_doc = {**unsigned_doc, "proofChain": proof_chain}

    # Both dicts are fresh and local to this call, so verify_data_agreement
    # may modify them freely.
    genesis_valid = await verify_data_agreement(genesis_doc, verkeys[0], wallet)

    current_valid = await verify_data_agreement(current_doc, verkeys[1], wallet)

    return genesis_valid and current_valid

Functions

def b64decode(bytes)

Url Safe B64 Decode.

Expand source code
def b64decode(bytes):
    """Decode a URL-safe base64 value into a string.

    Thin wrapper over ``b64_to_str`` with ``urlsafe=True``. The parameter
    name shadows the builtin; it is kept so keyword callers continue to work.
    """
    decoded = b64_to_str(bytes, urlsafe=True)
    return decoded
def b64encode(str)

Url Safe B64 Encode.

Expand source code
def b64encode(str):
    """Encode a string as URL-safe base64 without padding.

    Thin wrapper over ``str_to_b64`` with ``urlsafe=True`` and ``pad=False``.
    The parameter name shadows the builtin; it is kept so keyword callers
    continue to work.
    """
    encoded = str_to_b64(str, urlsafe=True, pad=False)
    return encoded
def create_jws(encoded_header, verify_data)

Compose JWS.

Expand source code
def create_jws(encoded_header, verify_data):
    """Compose the JWS signing input.

    Returns the UTF-8 bytes of ``encoded_header`` followed by a single ``.``,
    with the raw ``verify_data`` bytes appended as-is.
    """
    header_segment = encoded_header + "."
    return header_segment.encode("utf-8") + verify_data
def did_mydata(verkey: str) ‑> str

Qualify verkey into DID MyData if need be.

Expand source code
def did_mydata(verkey: str) -> str:
    """Qualify a verkey into a did:mydata identifier if it is not one already.

    A value that already starts with ``did:mydata:`` followed by the
    multibase prefix is returned unchanged. Otherwise the verkey is
    base58-decoded, prefixed with ``MULTICODEC_ED25519_PUB``, base58-encoded
    again and appended to the DID prefix.
    """
    did_prefix = f"did:mydata:{MULTIBASE_B58_BTC}"

    if not verkey.startswith(did_prefix):
        tagged_key = MULTICODEC_ED25519_PUB + b58_to_bytes(verkey)
        return did_prefix + bytes_to_b58(tagged_key)

    # Already qualified: hand it back untouched.
    return verkey
async def jws_sign(verify_data, verkey, wallet)

Sign JWS.

Expand source code
async def jws_sign(verify_data, verkey, wallet):
    """Sign verify data and return a detached JWS string.

    The header ``{"alg": "EdDSA", "b64": False, "crit": ["b64"]}`` is JSON
    serialized and base64url-encoded; the wallet signs
    ``<encoded header>.<verify_data>`` with ``verkey``. The result is
    ``<encoded header>..<encoded signature>`` - the payload segment between
    the two dots is left empty.
    """
    # Key order is significant: the serialized header is part of what is signed.
    header_json = json.dumps({"alg": "EdDSA", "b64": False, "crit": ["b64"]})
    header_b64 = b64encode(header_json)

    signing_input = create_jws(header_b64, verify_data)
    raw_signature = await wallet.sign_message(signing_input, verkey)
    signature_b64 = bytes_to_b64(raw_signature, urlsafe=True, pad=False)

    return "..".join([header_b64, signature_b64])
async def jws_verify(verify_data, signature, public_key, wallet)

Detached JWS verify handling.

Expand source code
async def jws_verify(verify_data, signature, public_key, wallet):
    """Verify a detached JWS against the given verify data.

    ``signature`` is split at the first ``..`` into the encoded header and
    the encoded signature. The decoded header is validated with
    ``verify_jws_header`` (which raises on an invalid header), then the
    wallet checks the signature over ``<encoded header>.<verify_data>``
    using ``public_key``. Returns the wallet's verification result.
    """
    header_b64, _sep, signature_b64 = signature.partition("..")

    # Reject unexpected header parameters before touching the signature.
    verify_jws_header(json.loads(b64decode(header_b64)))

    raw_signature = b64_to_bytes(signature_b64, urlsafe=True)
    signing_input = create_jws(header_b64, verify_data)

    return await wallet.verify_message(signing_input, raw_signature, public_key)
async def sign_data_agreement(data_agreement, signature_options, verkey, wallet)

Sign data agreement.

Expand source code
async def sign_data_agreement(data_agreement, signature_options, verkey, wallet):
    """Sign data agreement.

    Computes the verify data for ``data_agreement`` with
    ``create_verify_data``, signs it with ``jws_sign`` and returns a new
    document carrying the resulting proof:

    * no existing ``proof``/``proofChain``: the result gets a single ``proof``;
    * an existing ``proof`` but no ``proofChain``: the ``proof`` member is
      replaced by a two-element ``proofChain`` ``[old proof, new proof]``;
    * an existing ``proofChain``: the new proof is appended to it.

    The new proof is ``signature_options`` plus a ``proofValue`` holding the
    detached JWS. This function does not modify ``data_agreement`` or its
    ``proofChain`` list itself; the returned document is a new dict with a
    new list (other nested values are shared, not deep-copied).
    NOTE(review): whether ``create_verify_data`` mutates its arguments is not
    visible here.
    """
    # A pre-existing "proof" means the document is being signed again, so the
    # verify data must be computed in proof-chain mode.
    proof_chain = "proof" in data_agreement

    _framed, verify_data_hex_string = create_verify_data(
        data_agreement, signature_options, proof_chain
    )

    jws = await jws_sign(bytes.fromhex(verify_data_hex_string), verkey, wallet)
    new_proof = {**signature_options, "proofValue": jws}

    if "proofChain" in data_agreement:
        # Proof chain with more than 2 elements: extend a copy of the chain
        # instead of appending to the caller's list.
        extended_chain = [*data_agreement["proofChain"], new_proof]
        return {**data_agreement, "proofChain": extended_chain}

    if proof_chain:
        # Proof chain with 2 elements: drop "proof" from the result only,
        # leaving the caller's document intact.
        without_proof = {
            key: value for key, value in data_agreement.items() if key != "proof"
        }
        old_proof = data_agreement.get("proof")
        return {**without_proof, "proofChain": [old_proof, new_proof]}

    # Single proof.
    return {**data_agreement, "proof": new_proof}
async def verify_data_agreement(doc, verkey, wallet, drop_proof_chain: bool = True)

Verify data agreement.

Expand source code
async def verify_data_agreement(doc, verkey, wallet, drop_proof_chain:bool = True):
    """Verify the proof on a data agreement document.

    Without a ``proofChain`` member, the document's own ``proof`` is verified.
    With a ``proofChain``:

    * ``drop_proof_chain=True``: the first chain entry becomes the document's
      ``proof``, the ``proofChain`` member is removed, and the second chain
      entry is the proof that gets verified;
    * ``drop_proof_chain=False``: the last chain entry is verified against the
      document with that entry removed from ``proofChain``.

    Note that ``doc`` is modified in place in both proof-chain cases; pass a
    copy if the original must be preserved. Returns the result of
    ``jws_verify`` for ``verkey``.
    """
    has_chain = "proofChain" in doc
    proof_to_check = None

    if has_chain:
        if drop_proof_chain:
            # For proof chain with 2 elements
            first_proof = doc["proofChain"][0]
            proof_to_check = doc["proofChain"][1]

            del doc["proofChain"]
            doc["proof"] = first_proof
        else:
            # For proof chain with more than 2 elements
            proof_to_check = doc["proofChain"][-1]
            doc["proofChain"] = doc["proofChain"][:-1]

    options = proof_to_check if has_chain else doc["proof"]
    framed, verify_data_hex_string = create_verify_data(doc, options, has_chain)

    payload = bytes.fromhex(verify_data_hex_string)

    if has_chain:
        jws = proof_to_check["proofValue"]
    else:
        jws = framed["proof"]["proofValue"]

    return await jws_verify(payload, jws, verkey, wallet)
async def verify_data_agreement_with_proof_chain(doc: dict = None, verkeys: List[str] = None, wallet: aries_cloudagent.wallet.base.BaseWallet = None) ‑> bool
Expand source code
async def verify_data_agreement_with_proof_chain(doc: dict = None, verkeys: typing.List[str] = None, wallet: BaseWallet = None) -> bool:
    """Verify both signatures of a data agreement carrying a proof chain.

    Two documents are derived from ``doc`` and checked with
    ``verify_data_agreement``:

    * the genesis document: ``doc`` without ``proofChain``, with the first
      chain entry as ``proof`` and ``event`` replaced by its first element,
      verified with ``verkeys[0]``;
    * the current document: ``doc`` with its ``proofChain``, verified with
      ``verkeys[1]``.

    ``doc`` itself is left unmodified. Returns a truthy value only when both
    verifications succeed.
    """
    # Read the chain without popping it, so the caller's document keeps its
    # "proofChain" member after verification.
    proof_chain = doc.get("proofChain")
    unsigned_doc = {key: value for key, value in doc.items() if key != "proofChain"}

    genesis_doc = {**unsigned_doc, "proof": proof_chain[0]}
    genesis_doc["event"] = genesis_doc["event"][0]

    current_doc = {**unsigned_doc, "proofChain": proof_chain}

    # Both dicts are fresh and local to this call, so verify_data_agreement
    # may modify them freely.
    genesis_valid = await verify_data_agreement(genesis_doc, verkeys[0], wallet)

    current_valid = await verify_data_agreement(current_doc, verkeys[1], wallet)

    return genesis_valid and current_valid
def verify_jws_header(header)

Check header requirements.

Expand source code
def verify_jws_header(header):
    """Check header requirements.

    The header is accepted only when it is a mapping with exactly three
    members: ``alg`` equal to ``"EdDSA"``, ``b64`` being ``False`` and
    ``crit`` being the one-element list ``["b64"]``.

    Raises:
        Exception: if any of the requirements above is not met, including
            when a required member is missing.
    """
    # Every requirement must hold. The previous form
    # `not (<field checks>) and len(header) == 3` only rejected bad fields
    # when the header had exactly three members, so a wrong `alg` combined
    # with any extra member slipped through.
    crit = header.get("crit") if isinstance(header, dict) else None
    is_valid = (
        isinstance(header, dict)
        and len(header) == 3
        and header.get("alg") == "EdDSA"
        and header.get("b64") is False
        and isinstance(crit, list)
        and len(crit) == 1
        and crit[0] == "b64"
    )

    if not is_valid:
        raise Exception("Invalid JWS header parameters for Ed25519Signature2018.")