diff --git a/ChangeLog.rst b/ChangeLog.rst index 7046f3a..8d83147 100644 --- a/ChangeLog.rst +++ b/ChangeLog.rst @@ -2,6 +2,7 @@ wolfCrypt-py Release next (TBD, 2026) * Drop support for end-of-life Python versions (<= 3.9) * Add extra nonce parameter to Random generator +* The RsaPublic key parameter is now mandatory as it is always needed by an internal function call. wolfCrypt-py Release 5.8.4 (Jan 7, 2026) diff --git a/pyproject.toml b/pyproject.toml index 24606f3..186ec55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ classifiers = [ dynamic = ["version"] dependencies = [ "cffi>=1.0.0,<2", + "typing-extensions", ] [project.urls] @@ -99,7 +100,7 @@ target-version = "py310" # Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. # Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or # McCabe complexity (`C901`) by default. -select = ["E4", "E7", "E9", "F", "B", "UP"] +select = ["E4", "E7", "E9", "F", "B", "UP", "ANN"] ignore = ["UP031", "UP025", "UP032"] # Allow fix for all enabled rules (when `--fix`) is provided. @@ -109,6 +110,10 @@ unfixable = [] # Allow unused variables when underscore-prefixed. dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" +[tool.ruff.lint.per-file-ignores] +"scripts/*.py" = ["ANN"] +"tests/*.py" = ["ANN"] + [tool.ruff.format] # Like Black, use double quotes for strings. quote-style = "double" @@ -135,3 +140,13 @@ docstring-code-format = false # This only has an effect when the `docstring-code-format` setting is # enabled. docstring-code-line-length = "dynamic" + +[tool.ty.environment] +python-version = "3.10" +root = ["."] + +[tool.ty.src] +exclude = ["./lib"] + +[tool.ty.rules] +all = "warn" diff --git a/setup.py b/setup.py index 9682610..3f62796 100755 --- a/setup.py +++ b/setup.py @@ -63,5 +63,5 @@ install_requires=["cffi>=1.0.0,<2"], cffi_modules=["./scripts/build_ffi.py:ffibuilder"], - package_data={"wolfcrypt": ["*.dll", "**/*.pyi"]} + package_data={"wolfcrypt": ["*.dll", "**/*.pyi", "py.typed"]}, ) diff --git a/tests/test_aesgcmstream.py b/tests/test_aesgcmstream.py index c22858b..e6fe07c 100644 --- a/tests/test_aesgcmstream.py +++ b/tests/test_aesgcmstream.py @@ -19,6 +19,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA # pylint: disable=redefined-outer-name +# ty: ignore[possibly-missing-import] from wolfcrypt._ffi import lib as _lib @@ -35,6 +36,7 @@ def test_encrypt(): gcm = AesGcmStream(key, iv) buf = gcm.encrypt("hello world") authTag = gcm.final() + assert authTag is not None assert b2h(authTag) == bytes('ac8fcee96dc6ef8e5236da19b6197d2e', 'utf-8') assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8") gcmdec = AesGcmStream(key, iv) @@ -48,6 +50,7 @@ def test_encrypt_short_tag(): gcm = AesGcmStream(key, iv, 12) buf = gcm.encrypt("hello world") authTag = gcm.final() + assert authTag is not None assert b2h(authTag) == bytes('ac8fcee96dc6ef8e5236da19', 'utf-8') assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8") gcmdec = AesGcmStream(key, iv, 12) @@ -62,6 +65,7 @@ def test_multipart(): buf = gcm.encrypt("hello") buf += gcm.encrypt(" world") authTag = gcm.final() + assert authTag is not None assert b2h(authTag) == bytes('ac8fcee96dc6ef8e5236da19b6197d2e', 'utf-8') assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8") gcmdec = AesGcmStream(key, iv) @@ -78,6 +82,7 @@ def test_encrypt_aad(): gcm.set_aad(aad) buf = gcm.encrypt("hello world") authTag = gcm.final() + assert authTag is not None print(b2h(authTag)) assert b2h(authTag) == bytes('8f85338aa0b13f48f8b17482dbb8acca', 'utf-8') assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8") @@ -96,6 +101,7 @@ def test_multipart_aad(): buf = gcm.encrypt("hello") buf += gcm.encrypt(" world") authTag = gcm.final() + assert authTag is not None assert b2h(authTag) == bytes('8f85338aa0b13f48f8b17482dbb8acca', 'utf-8') assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8") gcmdec = AesGcmStream(key, iv) @@ -114,6 +120,7 @@ def test_encrypt_aad_bad(): gcm.set_aad(aad) buf = gcm.encrypt("hello world") authTag = gcm.final() + assert authTag is not None print(b2h(authTag)) assert b2h(authTag) == bytes('8f85338aa0b13f48f8b17482dbb8acca', 'utf-8') assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8") @@ -142,6 +149,7 @@ def test_invalid_tag_bytes(): gcm = AesGcmStream(key, iv, tag_bytes=good) gcm.encrypt("hello world") tag = gcm.final() + assert tag is not None assert len(tag) == good def test_decrypt_rejects_wrong_tag_length(): @@ -150,6 +158,7 @@ def test_decrypt_rejects_wrong_tag_length(): gcm = AesGcmStream(key, iv, tag_bytes=16) buf = gcm.encrypt("hello world") authTag = gcm.final() + assert authTag is not None assert len(authTag) == 16 # Truncated tag: would silently lower the verification window to diff --git a/tests/test_asn.py b/tests/test_asn.py index a26b853..6ef4f6e 100644 --- a/tests/test_asn.py +++ b/tests/test_asn.py @@ -19,6 +19,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA # pylint: disable=redefined-outer-name +# ty: ignore[possibly-missing-import] from collections import namedtuple import pytest @@ -90,8 +91,8 @@ def signature_vectors(): "51a4edaf9f1199f93e448482f27c43a53e0bc65b04e9848128e3" "60314e864190e6bb9812bfbf4b40994f2c1d4ca7aad9"), hash_cls=Sha256, - pub_key=RsaPublic.from_pem(pub_key_pem), - priv_key=RsaPrivate.from_pem(priv_key_pem) + pub_key=RsaPublic.from_pem(pub_key_pem), # ty: ignore[possibly-missing-attribute] + priv_key=RsaPrivate.from_pem(priv_key_pem) # ty: ignore[possibly-missing-attribute] )) return vectors diff --git a/tests/test_chacha20poly1305.py b/tests/test_chacha20poly1305.py index 07616ea..fe691ce 100644 --- a/tests/test_chacha20poly1305.py +++ b/tests/test_chacha20poly1305.py @@ -28,7 +28,7 @@ from wolfcrypt.utils import t2b from wolfcrypt.exceptions import WolfCryptError from binascii import unhexlify as h2b - from wolfcrypt.ciphers import ChaCha20Poly1305 + from wolfcrypt.ciphers import ChaCha20Poly1305 # ty: ignore[possibly-missing-import] def test_encrypt_decrypt(): key = h2b("808182838485868788898a8b8c8d8e8f909192939495969798999a9b9c9d9e9f") diff --git a/tests/test_ciphers.py b/tests/test_ciphers.py index 9e4b3f2..e04dd9c 100644 --- a/tests/test_ciphers.py +++ b/tests/test_ciphers.py @@ -19,6 +19,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA # pylint: disable=redefined-outer-name +# ty: ignore[possibly-missing-import] import os import random @@ -60,10 +61,8 @@ @pytest.fixture def vectors(): - TestVector = namedtuple("TestVector", """key iv plaintext ciphertext - ciphertext_ctr raw_key - pkcs8_key pem""") - TestVector.__new__.__defaults__ = (None,) * len(TestVector._fields) + fields = ("key", "iv", "plaintext", "ciphertext", "ciphertext_ctr", "raw_key", "pkcs8_key", "pem") + TestVector = namedtuple("TestVector", fields, defaults=(None,) * len(fields)) # test vector dictionary vectorArray = {} @@ -75,19 +74,19 @@ def vectors(): plaintext=t2b("now is the time "), ciphertext=h2b("959492575f4281532ccc9d4677a233cb"), ciphertext_ctr = h2b('287528ddf484b1055debbe751eb52b8a') - ) + ) # ty: ignore[missing-argument] if _lib.CHACHA_ENABLED: vectorArray[ChaCha]=TestVector( key="0123456789abcdef01234567890abcdef", iv="1234567890abcdef", - ) + ) # ty: ignore[missing-argument] if _lib.DES3_ENABLED: vectorArray[Des3]=TestVector( key=h2b("0123456789abcdeffedeba987654321089abcdef01234567"), iv=h2b("1234567890abcdef"), plaintext=t2b("Now is the time for all "), ciphertext=h2b("43a0297ed184f80e8964843212d508981894157487127db0") - ) + ) # ty: ignore[missing-argument] if _lib.RSA_ENABLED: vectorArray[RsaPublic]=TestVector( key=h2b( @@ -98,7 +97,7 @@ def vectors(): "0E22E96BA426BA4CE8C1FD4A6F2B1FEF8AAEF69062E5641EEB2B3C67C8DC" "2700F6916865A90203010001"), pem=os.path.join(certs_dir, "server-keyPub.pem") - ) + ) # ty: ignore[missing-argument] vectorArray[RsaPrivate]=TestVector( key=h2b( "3082025C02010002818100BC730EA849F374A2A9EF18A5DA559921F9C8EC" @@ -164,7 +163,7 @@ def vectors(): "1666d37c742b15b4a2febf086b1a5d3f" "9012b105863129dbd9e2"), pem=os.path.join(certs_dir, "server-key.pem") - ) + ) # ty: ignore[missing-argument] if _lib.ECC_ENABLED: vectorArray[EccPublic]=TestVector( @@ -178,7 +177,7 @@ def vectors(): "55bff40f44509a3dce9bb7f0c54df5707bd4ec248e1980ec5a4ca22403622c9b" "daefa2351243847616c6569506cc01a9bdf6751a42f7bda9b236225fc75d7fb4" ) - ) + ) # ty: ignore[missing-argument] vectorArray[EccPrivate]=TestVector( key=h2b( "30770201010420F8CF926BBD1E28F1A8ABA1234F3274188850AD7EC7EC92" @@ -192,7 +191,7 @@ def vectors(): "daefa2351243847616c6569506cc01a9bdf6751a42f7bda9b236225fc75d7fb4" "f8cf926bbd1e28f1a8aba1234f3274188850ad7ec7ec92f88f974daf568965c7" ) - ) + ) # ty: ignore[missing-argument] if _lib.ED25519_ENABLED: vectorArray[Ed25519Private]=TestVector( @@ -200,33 +199,33 @@ def vectors(): "47CD22B276161AA18BA1E0D13DBE84FE4840E4395D784F555A92E8CF739B" "F86B" ) - ) + ) # ty: ignore[missing-argument] vectorArray[Ed25519Public]=TestVector( key=h2b( "8498C65F4841145F9C51E8BFF4504B5527E0D5753964B7CB3C707A2B9747" "FC96" ) - ) + ) # ty: ignore[missing-argument] if _lib.ED448_ENABLED: vectorArray[Ed448Private]=TestVector( key=h2b("c2b29804e9a893c9e275cac1f8a3033f3d4b78b79eb427ed359fdeb8" "82d657c129c7930936b181971b795167ad18cabeeb52b59b94f115ad" "59" ) - ) + ) # ty: ignore[missing-argument] vectorArray[Ed448Public]=TestVector( key=h2b("89fb2b5a5ab67dd317794cc5f1700cace295b043f3ad73a66299e10a" "d3fc0a28289ddd1c641598a354113867a42e82ad844b4d858d92e4e7" "80" ) - ) + ) # ty: ignore[missing-argument] return vectorArray algo_params = [] if _lib.AES_ENABLED: - algo_params.append(Aes) + algo_params.append(Aes) # ty: ignore[possibly-unresolved-reference] if _lib.DES3_ENABLED: - algo_params.append(Des3) + algo_params.append(Des3) # ty: ignore[possibly-unresolved-reference] @pytest.fixture(params=algo_params) def cipher_cls(request): @@ -320,7 +319,7 @@ def chacha_obj(vectors): return r @pytest.fixture - def test_chacha_enc_dec(chacha_obj): + def test_chacha_enc_dec(chacha_obj, vectors): plaintext = t2b("Everyone gets Friday off.") cyt = chacha_obj.encrypt(plaintext) chacha_obj.set_iv(vectors[ChaCha].iv) @@ -372,25 +371,25 @@ def rsa_public_pss(vectors): def rsa_private_pem(vectors): with open(vectors[RsaPrivate].pem, "rb") as f: pem = f.read() - return RsaPrivate.from_pem(pem) + return RsaPrivate.from_pem(pem) # ty: ignore[possibly-missing-attribute] @pytest.fixture def rsa_public_pem(vectors): with open(vectors[RsaPublic].pem, "rb") as f: pem = f.read() - return RsaPublic.from_pem(pem) + return RsaPublic.from_pem(pem) # ty: ignore[possibly-missing-attribute] @pytest.fixture def rsa_private_pem_rng(vectors, rng): with open(vectors[RsaPrivate].pem, "rb") as f: pem = f.read() - return RsaPrivate.from_pem(pem, rng=rng) + return RsaPrivate.from_pem(pem, rng=rng) # ty: ignore[possibly-missing-attribute] @pytest.fixture def rsa_public_pem_rng(vectors, rng): with open(vectors[RsaPublic].pem, "rb") as f: pem = f.read() - return RsaPublic.from_pem(pem, rng=rng) + return RsaPublic.from_pem(pem, rng=rng) # ty: ignore[possibly-missing-attribute] def test_new_rsa_raises(vectors): with pytest.raises(WolfCryptError): @@ -401,7 +400,7 @@ def test_new_rsa_raises(vectors): if _lib.KEYGEN_ENABLED: with pytest.raises(WolfCryptError): # invalid key size - RsaPrivate.make_key(16384) + RsaPrivate.make_key(16384) # ty: ignore[possibly-missing-attribute] def test_rsa_encrypt_decrypt(rsa_private, rsa_public): diff --git a/tests/test_delete_descriptor_binding.py b/tests/test_delete_descriptor_binding.py index 963e844..862fa1f 100644 --- a/tests/test_delete_descriptor_binding.py +++ b/tests/test_delete_descriptor_binding.py @@ -55,39 +55,39 @@ def _static_attrs(): yield Random, "_delete" if _lib.SHA_ENABLED: - from wolfcrypt.hashes import Sha + from wolfcrypt.hashes import Sha # ty: ignore[possibly-missing-import] yield Sha, "_delete" yield Sha, "_copy" if _lib.SHA256_ENABLED: - from wolfcrypt.hashes import Sha256 + from wolfcrypt.hashes import Sha256 # ty: ignore[possibly-missing-import] yield Sha256, "_delete" yield Sha256, "_copy" if _lib.SHA384_ENABLED: - from wolfcrypt.hashes import Sha384 + from wolfcrypt.hashes import Sha384 # ty: ignore[possibly-missing-import] yield Sha384, "_delete" yield Sha384, "_copy" if _lib.SHA512_ENABLED: - from wolfcrypt.hashes import Sha512 + from wolfcrypt.hashes import Sha512 # ty: ignore[possibly-missing-import] yield Sha512, "_delete" yield Sha512, "_copy" if _lib.HMAC_ENABLED: - from wolfcrypt.hashes import _Hmac + from wolfcrypt.hashes import _Hmac # ty: ignore[possibly-missing-import] yield _Hmac, "_delete" if _lib.AESGCM_STREAM_ENABLED: - from wolfcrypt.ciphers import AesGcmStream + from wolfcrypt.ciphers import AesGcmStream # ty: ignore[possibly-missing-import] yield AesGcmStream, "_delete" if _lib.RSA_ENABLED: - from wolfcrypt.ciphers import _Rsa + from wolfcrypt.ciphers import _Rsa # ty: ignore[possibly-missing-import] yield _Rsa, "_delete" if _lib.ECC_ENABLED: - from wolfcrypt.ciphers import _Ecc + from wolfcrypt.ciphers import _Ecc # ty: ignore[possibly-missing-import] yield _Ecc, "_delete" if _lib.ED25519_ENABLED: - from wolfcrypt.ciphers import _Ed25519 + from wolfcrypt.ciphers import _Ed25519 # ty: ignore[possibly-missing-import] yield _Ed25519, "_delete" if _lib.ED448_ENABLED: - from wolfcrypt.ciphers import _Ed448 + from wolfcrypt.ciphers import _Ed448 # ty: ignore[possibly-missing-import] yield _Ed448, "_delete" @@ -173,7 +173,6 @@ def recorder(*args, **kwargs): r = Random() native = r.native_object r.__del__() - r.native_object = None # prevent real cleanup on the way out assert received, "recorder was never called" args, kwargs = received[-1] assert kwargs == {} diff --git a/tests/test_hashes.py b/tests/test_hashes.py index 3fcf78d..75d6420 100644 --- a/tests/test_hashes.py +++ b/tests/test_hashes.py @@ -19,6 +19,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA # pylint: disable=redefined-outer-name +# ty: ignore[possibly-missing-import] from collections import namedtuple import pytest @@ -116,26 +117,26 @@ def vectors(): hash_params = [] if _lib.SHA_ENABLED: - hash_params.append(Sha) + hash_params.append(Sha) # ty: ignore[possibly-unresolved-reference] if _lib.SHA256_ENABLED: - hash_params.append(Sha256) + hash_params.append(Sha256) # ty: ignore[possibly-unresolved-reference] if _lib.SHA384_ENABLED: - hash_params.append(Sha384) + hash_params.append(Sha384) # ty: ignore[possibly-unresolved-reference] if _lib.SHA512_ENABLED: - hash_params.append(Sha512) + hash_params.append(Sha512) # ty: ignore[possibly-unresolved-reference] if _lib.SHA3_ENABLED: - hash_params.append(Sha3) + hash_params.append(Sha3) # ty: ignore[possibly-unresolved-reference] hmac_params = [] if _lib.HMAC_ENABLED: if _lib.SHA_ENABLED: - hmac_params.append(HmacSha) + hmac_params.append(HmacSha) # ty: ignore[possibly-unresolved-reference] if _lib.SHA256_ENABLED: - hmac_params.append(HmacSha256) + hmac_params.append(HmacSha256) # ty: ignore[possibly-unresolved-reference] if _lib.SHA384_ENABLED: - hmac_params.append(HmacSha384) + hmac_params.append(HmacSha384) # ty: ignore[possibly-unresolved-reference] if _lib.SHA512_ENABLED: - hmac_params.append(HmacSha512) + hmac_params.append(HmacSha512) # ty: ignore[possibly-unresolved-reference] @pytest.fixture(params=(hash_params + hmac_params)) def hash_cls(request): diff --git a/tests/test_hkdf.py b/tests/test_hkdf.py index 7c215c9..26ecb12 100644 --- a/tests/test_hkdf.py +++ b/tests/test_hkdf.py @@ -19,6 +19,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA # pylint: disable=redefined-outer-name +# ty: ignore[possibly-missing-import] import pytest diff --git a/tests/test_mldsa.py b/tests/test_mldsa.py index 023b734..7c195de 100644 --- a/tests/test_mldsa.py +++ b/tests/test_mldsa.py @@ -19,6 +19,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA # pylint: disable=redefined-outer-name +# ty: ignore[possibly-missing-import] from wolfcrypt._ffi import lib as _lib @@ -183,7 +184,7 @@ def test_sign_with_seed(mldsa_type, rng): # test that the seed type is checked (should be bytes-like, not string) with pytest.raises(TypeError): - _ = mldsa_priv.sign_with_seed(message, "") + _ = mldsa_priv.sign_with_seed(message, "") # ty: ignore[invalid-argument-type] def test_sign_with_seed_and_context(mldsa_type, rng): signature_seed = rng.bytes(ML_DSA_SIGNATURE_SEED_LENGTH) diff --git a/tests/test_mlkem.py b/tests/test_mlkem.py index c84af8d..515b2f6 100644 --- a/tests/test_mlkem.py +++ b/tests/test_mlkem.py @@ -19,6 +19,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA # pylint: disable=redefined-outer-name +# ty: ignore[possibly-missing-import] from wolfcrypt._ffi import lib as _lib diff --git a/tests/test_pwdbased.py b/tests/test_pwdbased.py index 0d2d1ca..18a07ab 100644 --- a/tests/test_pwdbased.py +++ b/tests/test_pwdbased.py @@ -19,6 +19,8 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA # pylint: disable=redefined-outer-name +# ty: ignore[possibly-missing-import] + from collections import namedtuple import pytest from wolfcrypt._ffi import lib as _lib diff --git a/wolfcrypt/__init__.py b/wolfcrypt/__init__.py index 99a27e8..c2514df 100644 --- a/wolfcrypt/__init__.py +++ b/wolfcrypt/__init__.py @@ -17,6 +17,9 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA +from typing import cast + +from _cffi_backend import Lib from wolfcrypt._version import __version__, __wolfssl_version__ @@ -50,7 +53,7 @@ if hasattr(_lib, 'WC_RNG_SEED_CB_ENABLED'): if _lib.WC_RNG_SEED_CB_ENABLED: - ret = _lib.wc_SetSeed_Cb(_ffi.addressof(_lib, "wc_GenerateSeed")) + ret = _lib.wc_SetSeed_Cb(_ffi.addressof(cast(Lib, _lib), "wc_GenerateSeed")) if ret < 0: raise WolfCryptApiError("wc_SetSeed_Cb failed", ret) if _lib.FIPS_ENABLED and _lib.FIPS_VERSION >= 5: diff --git a/wolfcrypt/_ffi/__init__.pyi b/wolfcrypt/_ffi/__init__.pyi index f3a1ba1..6617e32 100644 --- a/wolfcrypt/_ffi/__init__.pyi +++ b/wolfcrypt/_ffi/__init__.pyi @@ -1,3 +1,23 @@ +# __init__.pyi +# +# Copyright (C) 2026 wolfSSL Inc. +# +# This file is part of wolfSSL. (formerly known as CyaSSL) +# +# wolfSSL is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# wolfSSL is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA + import _cffi_backend import wolfcrypt._ffi.lib as lib diff --git a/wolfcrypt/_ffi/lib.pyi b/wolfcrypt/_ffi/lib.pyi index 047e5cf..b104955 100644 --- a/wolfcrypt/_ffi/lib.pyi +++ b/wolfcrypt/_ffi/lib.pyi @@ -1,3 +1,22 @@ +# lib.pyi +# +# Copyright (C) 2026 wolfSSL Inc. +# +# This file is part of wolfSSL. (formerly known as CyaSSL) +# +# wolfSSL is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# wolfSSL is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA from _cffi_backend import FFI from typing import TypeAlias @@ -15,6 +34,7 @@ DES3_ENABLED: int ECC_ENABLED: int ED25519_ENABLED: int ED448_ENABLED: int +ERROR_STRINGS_ENABLED: int FIPS_ENABLED: int HMAC_ENABLED: int KEYGEN_ENABLED: int @@ -24,6 +44,7 @@ ML_KEM_ENABLED: int MPAPI_ENABLED: int PWDBASED_ENABLED: int RSA_ENABLED: int +RSA_BLINDING_ENABLED: int RSA_PSS_ENABLED: int SHA_ENABLED: int SHA3_ENABLED: int @@ -32,6 +53,224 @@ SHA384_ENABLED: int SHA512_ENABLED: int WC_RNG_SEED_CB_ENABLED: int +# Error codes +WC_FAILURE: int +MAX_CODE_E: int +WC_FIRST_E: int +WC_SPAN1_FIRST_E: int +MP_MEM: int +MP_VAL: int +MP_WOULDBLOCK: int +MP_NOT_INF: int +OPEN_RAN_E: int +READ_RAN_E: int +WINCRYPT_E: int +CRYPTGEN_E: int +RAN_BLOCK_E: int +BAD_MUTEX_E: int +WC_TIMEOUT_E: int +WC_PENDING_E: int +WC_NO_PENDING_E: int +MP_INIT_E: int +MP_READ_E: int +MP_EXPTMOD_E: int +MP_TO_E: int +MP_SUB_E: int +MP_ADD_E: int +MP_MUL_E: int +MP_MULMOD_E: int +MP_MOD_E: int +MP_INVMOD_E: int +MP_CMP_E: int +MP_ZERO_E: int +AES_EAX_AUTH_E: int +KEY_EXHAUSTED_E: int +MEMORY_E: int +VAR_STATE_CHANGE_E: int +FIPS_DEGRADED_E: int +FIPS_CODE_SZ_E: int +FIPS_DATA_SZ_E: int +RSA_WRONG_TYPE_E: int +RSA_BUFFER_E: int +BUFFER_E: int +ALGO_ID_E: int +PUBLIC_KEY_E: int +DATE_E: int +SUBJECT_E: int +ISSUER_E: int +CA_TRUE_E: int +EXTENSIONS_E: int +ASN_PARSE_E: int +ASN_VERSION_E: int +ASN_GETINT_E: int +ASN_RSA_KEY_E: int +ASN_OBJECT_ID_E: int +ASN_TAG_NULL_E: int +ASN_EXPECT_0_E: int +ASN_BITSTR_E: int +ASN_UNKNOWN_OID_E: int +ASN_DATE_SZ_E: int +ASN_BEFORE_DATE_E: int +ASN_AFTER_DATE_E: int +ASN_SIG_OID_E: int +ASN_TIME_E: int +ASN_INPUT_E: int +ASN_SIG_CONFIRM_E: int +ASN_SIG_HASH_E: int +ASN_SIG_KEY_E: int +ASN_DH_KEY_E: int +KDF_SRTP_KAT_FIPS_E: int +ASN_CRIT_EXT_E: int +ASN_ALT_NAME_E: int +ASN_NO_PEM_HEADER: int +ED25519_KAT_FIPS_E: int +ED448_KAT_FIPS_E: int +PBKDF2_KAT_FIPS_E: int +WC_KEY_MISMATCH_E: int +ECC_BAD_ARG_E: int +ASN_ECC_KEY_E: int +ECC_CURVE_OID_E: int +BAD_FUNC_ARG: int +NOT_COMPILED_IN: int +UNICODE_SIZE_E: int +NO_PASSWORD: int +ALT_NAME_E: int +BAD_OCSP_RESPONDER: int +CRL_CERT_DATE_ERR: int +AES_GCM_AUTH_E: int +AES_CCM_AUTH_E: int +ASYNC_INIT_E: int +COMPRESS_INIT_E: int +COMPRESS_E: int +DECOMPRESS_INIT_E: int +DECOMPRESS_E: int +BAD_ALIGN_E: int +ASN_NO_SIGNER_E: int +ASN_CRL_CONFIRM_E: int +ASN_CRL_NO_SIGNER_E: int +ASN_OCSP_CONFIRM_E: int +BAD_STATE_E: int +BAD_PADDING_E: int +REQ_ATTRIBUTE_E: int +PKCS7_OID_E: int +PKCS7_RECIP_E: int +FIPS_NOT_ALLOWED_E: int +ASN_NAME_INVALID_E: int +RNG_FAILURE_E: int +HMAC_MIN_KEYLEN_E: int +RSA_PAD_E: int +LENGTH_ONLY_E: int +IN_CORE_FIPS_E: int +AES_KAT_FIPS_E: int +DES3_KAT_FIPS_E: int +HMAC_KAT_FIPS_E: int +RSA_KAT_FIPS_E: int +DRBG_KAT_FIPS_E: int +DRBG_CONT_FIPS_E: int +AESGCM_KAT_FIPS_E: int +THREAD_STORE_KEY_E: int +THREAD_STORE_SET_E: int +MAC_CMP_FAILED_E: int +IS_POINT_E: int +ECC_INF_E: int +ECC_PRIV_KEY_E: int +ECC_OUT_OF_RANGE_E: int +SRP_CALL_ORDER_E: int +SRP_VERIFY_E: int +SRP_BAD_KEY_E: int +ASN_NO_SKID: int +ASN_NO_AKID: int +ASN_NO_KEYUSAGE: int +SKID_E: int +AKID_E: int +KEYUSAGE_E: int +CERTPOLICIES_E: int +WC_INIT_E: int +SIG_VERIFY_E: int +BAD_COND_E: int +SIG_TYPE_E: int +HASH_TYPE_E: int +FIPS_INVALID_VER_E: int +WC_KEY_SIZE_E: int +ASN_COUNTRY_SIZE_E: int +MISSING_RNG_E: int +ASN_PATHLEN_SIZE_E: int +ASN_PATHLEN_INV_E: int +BAD_KEYWRAP_ALG_E: int +BAD_KEYWRAP_IV_E: int +WC_CLEANUP_E: int +ECC_CDH_KAT_FIPS_E: int +DH_CHECK_PUB_E: int +BAD_PATH_ERROR: int +ASYNC_OP_E: int +ECC_PRIVATEONLY_E: int +EXTKEYUSAGE_E: int +WC_HW_E: int +WC_HW_WAIT_E: int +PSS_SALTLEN_E: int +PRIME_GEN_E: int +BER_INDEF_E: int +RSA_OUT_OF_RANGE_E: int +RSAPSS_PAT_FIPS_E: int +ECDSA_PAT_FIPS_E: int +DH_KAT_FIPS_E: int +AESCCM_KAT_FIPS_E: int +SHA3_KAT_FIPS_E: int +ECDHE_KAT_FIPS_E: int +AES_GCM_OVERFLOW_E: int +AES_CCM_OVERFLOW_E: int +RSA_KEY_PAIR_E: int +DH_CHECK_PRIV_E: int +WC_AFALG_SOCK_E: int +WC_DEVCRYPTO_E: int +ZLIB_INIT_ERROR: int +ZLIB_COMPRESS_ERROR: int +ZLIB_DECOMPRESS_ERROR: int +PKCS7_NO_SIGNER_E: int +WC_PKCS7_WANT_READ_E: int +CRYPTOCB_UNAVAILABLE: int +PKCS7_SIGNEEDS_CHECK: int +PSS_SALTLEN_RECOVER_E: int +CHACHA_POLY_OVERFLOW: int +ASN_SELF_SIGNED_E: int +SAKKE_VERIFY_FAIL_E: int +MISSING_IV: int +MISSING_KEY: int +BAD_LENGTH_E: int +ECDSA_KAT_FIPS_E: int +RSA_PAT_FIPS_E: int +KDF_TLS12_KAT_FIPS_E: int +KDF_TLS13_KAT_FIPS_E: int +KDF_SSH_KAT_FIPS_E: int +DHE_PCT_E: int +ECC_PCT_E: int +FIPS_PRIVATE_KEY_LOCKED_E: int +PROTOCOLCB_UNAVAILABLE: int +AES_SIV_AUTH_E: int +NO_VALID_DEVID: int +IO_FAILED_E: int +SYSLIB_FAILED_E: int +USE_HW_PSK: int +ENTROPY_RT_E: int +ENTROPY_APT_E: int +ASN_DEPTH_E: int +ASN_LEN_E: int +SM4_GCM_AUTH_E: int +SM4_CCM_AUTH_E: int +WC_SPAN1_LAST_E: int +WC_SPAN1_MIN_CODE_E: int +WC_SPAN2_FIRST_E: int +DEADLOCK_AVERTED_E: int +ASCON_AUTH_E: int +WC_ACCEL_INHIBIT_E: int +BAD_INDEX_E: int +INTERRUPTED_E: int +WC_SPAN2_LAST_E: int +WC_LAST_E: int +WC_SPAN2_MIN_CODE_E: int +MIN_CODE_E: int +# end of the error codes + FIPS_VERSION: int WC_MGF1NONE: int @@ -68,9 +307,261 @@ WC_ML_DSA_87: int WC_KEYTYPE_ALL: int +PRIVATEKEY_TYPE: int +PUBLICKEY_TYPE: int +CERT_TYPE: int +MAX_DER_DIGEST_SZ: int +SHAh: int +SHA256h: int +SHA384h: int +SHA512h: int + +DILITHIUM_SEED_SZ: int +ECC_TIMING_RESISTANCE_ENABLED: int +WC_RSA_OAEP_PAD: int + RNG: TypeAlias = FFI.CData +def wc_SetSeed_Cb(cb: FFI.CData) -> int: ... +def wolfCrypt_SetPrivateKeyReadEnable_fips(enable: int, key_type: int) -> int: ... +def wc_GetErrorString(error: int) -> FFI.CData: ... + def wc_InitRngNonce_ex(rng: RNG, nonce: bytes, nonce_size: int, heap: FFI.CData, device_id: int) -> int: ... def wc_RNG_GenerateByte(rng: RNG, buffer: FFI.CData) -> int: ... def wc_RNG_GenerateBlock(rng: RNG, buffer: FFI.CData, len: int) -> int: ... def wc_FreeRng(rng: RNG) -> None: ... + +DerBufferPtr: TypeAlias = FFI.CData +EncryptedInfo: TypeAlias = FFI.CData +IntPtr: TypeAlias = FFI.CData +BytePtr: TypeAlias = FFI.CData + +def wc_PemToDer(buff: bytes, buf_size: int, type: int, der: DerBufferPtr, heap: FFI.CData, info: EncryptedInfo, + key_format: IntPtr) -> int: ... +def wc_DerToPemEx(der: bytes, der_size: int, output: FFI.CData, output_size: int, cipher_info: FFI.CData, + type: int) -> int: ... +def wc_FreeDer(der: DerBufferPtr) -> None: ... +def wc_EncodeSignature(out: BytePtr, digest: bytes, digest_size: int, hash_oid: int) -> int: ... + +def wc_PBKDF2(out: BytePtr, passwd: bytes, pass_len: int, salt: bytes, salt_len: int, iterations: int, keylen: int, + hash_type: int) -> int: ... + +def wc_InitSha(obj: FFI.CData) -> int: ... +def wc_ShaCopy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_ShaUpdate(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_ShaFinal(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_ShaFree(obj: FFI.CData) -> None: ... + +def wc_InitSha256(obj: FFI.CData) -> int: ... +def wc_Sha256Copy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_Sha256Update(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_Sha256Final(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_Sha256Free(obj: FFI.CData) -> None: ... + +def wc_InitSha384(obj: FFI.CData) -> int: ... +def wc_Sha384Copy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_Sha384Update(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_Sha384Final(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_Sha384Free(obj: FFI.CData) -> None: ... + +def wc_InitSha512(obj: FFI.CData) -> int: ... +def wc_Sha512Copy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_Sha512Update(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_Sha512Final(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_Sha512Free(obj: FFI.CData) -> None: ... + +def wc_InitSha3_224(obj: FFI.CData, heap: FFI.CData, dev_id: int) -> int: ... +def wc_Sha3_224_Copy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_Sha3_224_Update(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_Sha3_224_Final(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_Sha3_224_Free(obj: FFI.CData) -> None: ... + +def wc_InitSha3_256(obj: FFI.CData, heap: FFI.CData, dev_id: int) -> int: ... +def wc_Sha3_256_Copy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_Sha3_256_Update(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_Sha3_256_Final(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_Sha3_256_Free(obj: FFI.CData) -> None: ... + +def wc_InitSha3_384(obj: FFI.CData, heap: FFI.CData, dev_id: int) -> int: ... +def wc_Sha3_384_Copy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_Sha3_384_Update(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_Sha3_384_Final(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_Sha3_384_Free(obj: FFI.CData) -> None: ... + +def wc_InitSha3_512(obj: FFI.CData, heap: FFI.CData, dev_id: int) -> int: ... +def wc_Sha3_512_Copy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_Sha3_512_Update(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_Sha3_512_Final(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_Sha3_512_Free(obj: FFI.CData) -> None: ... + +def wc_HmacInit(hmac: FFI.CData, heap: FFI.CData, dev_id: int) -> int: ... +def wc_HmacSetKey(hmac: FFI.CData, type: int, key: bytes, length: int) -> int: ... +def wc_HmacUpdate(hmac: FFI.CData, data: bytes, size: int) -> int: ... +def wc_HmacFinal(hmac: FFI.CData, hash: FFI.CData) -> int: ... +def wc_HmacFree(hmac: FFI.CData) -> None: ... + +def wc_HKDF(type: int, in_key: bytes, in_key_size: int, salt: bytes, salt_size: int, info: bytes, info_size: int, out: FFI.CData, out_size: int) -> int: ... +def wc_HKDF_Extract(type: int, salt: bytes, salt_size: int, in_key: bytes, in_key_size: int, out: FFI.CData) -> int: ... +def wc_HKDF_Expand(type: int, in_key: bytes, in_key_size: int, info: bytes, info_size: int, out: FFI.CData, out_size: int) -> int: ... + +CAes: TypeAlias = FFI.CData +CAesSivAssoc: TypeAlias = FFI.CData + +def wc_AesInit(aes: CAes, heap: FFI.CData, dev_id: int) -> int: ... +def wc_AesFree(aes: CAes) -> int: ... +def wc_AesSetKey(aes: CAes, key: bytes, len: int, iv: bytes, dir: int) -> int: ... +def wc_AesCbcEncrypt(aes: CAes, ciphertext_out: BytePtr, plaintext_in: bytes, plaintext_len: int) -> int: ... +def wc_AesCbcDecrypt(aes: CAes, plaintext_out: BytePtr, ciphertext_in: bytes, ciphertext_len: int) -> int: ... +def wc_AesCtrEncrypt(aes: CAes, ciphertext_out: BytePtr, plaintext_in: bytes, plaintext_len: int) -> int: ... +def wc_AesSivEncrypt_ex(key: bytes, key_len: int, assoc: CAesSivAssoc, num_assoc: int, nonce: bytes, nonce_len: int, plaintext: bytes, plaintext_len: int, siv: BytePtr, ciphertext: BytePtr) -> int: ... +def wc_AesSivDecrypt_ex(key: bytes, key_len: int, assoc: CAesSivAssoc, num_assoc: int, nonce: bytes, nonce_len: int, ciphertext: bytes, ciphertext_len: int, siv: bytes, plaintext: BytePtr) -> int: ... +def wc_AesGcmInit(aes: CAes, key: bytes, key_len: int, iv: bytes, iv_len: int) -> int: ... +def wc_AesGcmEncryptUpdate(aes: CAes, ciphertext: BytePtr, plaintext: bytes, plaintext_len: int, auth: bytes, auth_len: int) -> int: ... +def wc_AesGcmEncryptFinal(aes: CAes, auth_tag: BytePtr, auth_tag_len: int) -> int: ... +def wc_AesGcmDecryptUpdate(aes: CAes, plaintext: BytePtr, ciphertext: bytes, ciphertext_len: int, auth: bytes, auth_len: int) -> int: ... +def wc_AesGcmDecryptFinal(aes: CAes, auth: bytes, auth_len: int) -> int: ... + +ChaCha: TypeAlias = FFI.CData + +def wc_Chacha_SetKey(ctx: ChaCha, key: bytes, key_len: int) -> int: ... +def wc_Chacha_SetIV(ctx: ChaCha, iv: bytes, counter: int) -> int: ... +def wc_Chacha_Process(ctx: ChaCha, cipher: BytePtr, plain: bytes, msg_len: int) -> int: ... +def wc_ChaCha20Poly1305_Encrypt(key: bytes, iv: bytes, aad: bytes, aad_len: int, plaintext: bytes, plaintext_len: int, cipher_out: BytePtr, auth_tag: BytePtr) -> int: ... +def wc_ChaCha20Poly1305_Decrypt(key: bytes, iv: bytes, aad: bytes, aad_len: int, ciphertext: bytes, ciphertext_len: int, auth_tag: bytes, plaintext_out: BytePtr) -> int: ... + +DesKey: TypeAlias = FFI.CData + +def wc_Des3_SetKey(des: DesKey, key: bytes, iv: bytes, dir: int) -> int: ... +def wc_Des3_CbcEncrypt(des: DesKey, out: BytePtr, message: bytes, message_len: int) -> int: ... +def wc_Des3_CbcDecrypt(des: DesKey, out: BytePtr, encrypted: bytes, encrypted_len: int) -> int: ... + +RsaKey: TypeAlias = FFI.CData + +def wc_InitRsaKey(key: RsaKey, heap: FFI.CData) -> int: ... +def wc_RsaSetRNG(key: RsaKey, rng: RNG) -> int: ... +def wc_FreeRsaKey(key: RsaKey) -> int: ... +def wc_RsaPublicKeyDecode(input: bytes, in_out_idx: IntPtr, key: RsaKey, input_len:int ) -> int: ... +def wc_RsaPublicEncrypt(plaintext: bytes, plaintext_len: int, out: BytePtr, out_len: int, key: RsaKey, rng: RNG) -> int: ... +def wc_RsaPublicEncrypt_ex(plaintext: bytes, plaintext_len: int, out: BytePtr, out_len: int, key: RsaKey, rng: RNG, pad_type: int, hash_type: int, mgf: int, label: bytes | BytePtr, label_len: int) -> int: ... +def wc_RsaSSL_Verify(signature: bytes, signature_len: int, out: BytePtr, out_len: int, key: RsaKey) -> int: ... +def wc_RsaPSS_Verify(signature: bytes, signature_len: int, out: BytePtr, out_len: int, hash_type: int, mgf: int, key: RsaKey) -> int: ... +def wc_RsaPSS_CheckPadding(digest: bytes, digest_len: int, sig: bytes | BytePtr, sig_len: int, hash_type: int) -> int: ... +def wc_MakeRsaKey(key: RsaKey, size: int, e: int, rng: RNG) -> int: ... +def wc_RsaPrivateKeyDecode(input: bytes, in_out_idx: IntPtr, key: RsaKey, in_len: int) -> int: ... +def wc_RsaEncryptSize(key: RsaKey) -> int: ... +def wc_RsaKeyToDer(key: RsaKey, output: BytePtr, in_len: int) -> int: ... +def wc_RsaPrivateDecrypt(ciphertext: bytes, ciphertext_len: int, out: BytePtr, out_len: int, key: RsaKey) -> int: ... +def wc_RsaPrivateDecrypt_ex(ciphertext: bytes, ciphertext_len: int, out: BytePtr, out_len: int, key: RsaKey, pad_type: int, hash_type: int, mgf: int, label: bytes | BytePtr, label_len: int) -> int: ... +def wc_RsaSSL_Sign(plaintext: bytes, plaintext_len: int, out: BytePtr, out_len: int, key: RsaKey, rng: RNG) -> int: ... +def wc_RsaPSS_Sign(digest: bytes, digest_len: int, out: BytePtr, out_len: int, hash_type: int, mgf: int, key: RsaKey, rng: RNG) -> int: ... + +def wc_GetPkcs8TraditionalOffset(input: BytePtr, in_out_idx: IntPtr, size: int) -> int: ... +def wc_RsaKeyToPublicDer(key: RsaKey, output: BytePtr, in_len: int) -> int: ... + + +MpInt: TypeAlias = FFI.CData + +def mp_init(a: MpInt) -> int: ... +def mp_clear(a: MpInt) -> None: ... +def mp_read_unsigned_bin(a: MpInt, uns: bytes, uns_len: int) -> int: ... +def mp_to_unsigned_bin_len(a: MpInt, out: BytePtr, out_len: int) -> int: ... + +EccKey: TypeAlias = FFI.CData + +def wc_ecc_init(key: EccKey) -> int: ... +def wc_ecc_free(key: EccKey) -> int: ... +def wc_ecc_size(key: EccKey) -> int: ... +def wc_ecc_sig_size(key: EccKey) -> int: ... +def wc_ecc_get_curve_size_from_id(curve_id: int) -> int: ... +def wc_ecc_import_unsigned(key: EccKey, qx: bytes, qy: bytes, d: bytes | FFI.CData, curve_id: int) -> int: ... +def wc_ecc_export_public_raw(key: EccKey, qx: BytePtr, qx_len: IntPtr, qy: BytePtr, qy_len: IntPtr) -> int: ... +def wc_ecc_import_x963(x963: bytes, x963_len: int, key: EccKey) -> int: ... +def wc_ecc_export_x963(key: EccKey, out: BytePtr, out_len: IntPtr) -> int: ... +def wc_ecc_verify_hash(sig: bytes, sig_len: int, hash: bytes, hash_len: int, res: IntPtr, key: EccKey) -> int: ... +def wc_ecc_verify_hash_ex(r: MpInt, s: MpInt, hash: bytes, hash_len: int, res: IntPtr, key: EccKey) -> int: ... +def wc_ecc_make_key(rng: RNG, key_size: int, key: EccKey) -> int: ... +def wc_ecc_set_rng(key: EccKey, rng: RNG) -> int: ... +def wc_ecc_sign_hash(hash: bytes, hash_len: int, out: BytePtr, out_len: IntPtr, rng: RNG, key: EccKey) -> int: ... +def wc_ecc_sign_hash_ex(hash: bytes, hash_len: int, rng: RNG, key: EccKey, r: MpInt, s: MpInt) -> int: ... +def wc_ecc_shared_secret(private_key: EccKey, public_key: EccKey, out: BytePtr, out_len: IntPtr) -> int: ... +def wc_ecc_export_private_raw(key: EccKey, qx: BytePtr, qx_len: IntPtr, qy: BytePtr, qy_len: IntPtr, d: BytePtr, d_len: IntPtr) -> int: ... + +def wc_EccPublicKeyDecode(pub_der: bytes, in_out_idx: IntPtr, key: EccKey, pub_der_len: int) -> int: ... +def wc_EccPrivateKeyDecode(priv_der: bytes, in_out_idx: IntPtr, key: EccKey, priv_der_len: int) -> int: ... +def wc_EccPublicKeyToDer(key: EccKey, pub_der: BytePtr, pub_der_len: int, with_alg_curve: int) -> int: ... +def wc_EccKeyToDer(key: EccKey, priv_key_der: BytePtr, priv_key_len: int) -> int: ... + +Ed25519Key: TypeAlias = FFI.CData + +def wc_ed25519_make_public(key: Ed25519Key, pub_key: BytePtr, pub_key_len: int) -> int: ... +def wc_ed25519_make_key(rng: RNG, keysize: int, key: Ed25519Key) -> int: ... +def wc_ed25519_sign_msg(msg: bytes, msg_len: int, out: BytePtr, out_len: IntPtr, key: Ed25519Key) -> int: ... +def wc_ed25519_verify_msg(sig: bytes, sig_len: int, msg: bytes, msg_len: int, res: IntPtr, key: Ed25519Key) -> int: ... +def wc_ed25519_init(key: Ed25519Key) -> int: ... +def wc_ed25519_free(key: Ed25519Key) -> None: ... +def wc_ed25519_import_public(pub: bytes | BytePtr, pub_len: int, key: Ed25519Key) -> int: ... +def wc_ed25519_import_private_only(priv: bytes | BytePtr, priv_len: int, key: Ed25519Key) -> int: ... +def wc_ed25519_import_private_key(priv: bytes, priv_len: int, pub: bytes, pub_len: int, key: Ed25519Key) -> int: ... +def wc_ed25519_export_public(key: Ed25519Key, pub: BytePtr, pub_len: IntPtr) -> int: ... +def wc_ed25519_export_private_only(key: Ed25519Key, priv: BytePtr, priv_len: IntPtr) -> int: ... +def wc_ed25519_size(key: Ed25519Key) -> int: ... +def wc_ed25519_priv_size(key: Ed25519Key) -> int: ... +def wc_ed25519_pub_size(key: Ed25519Key) -> int: ... +def wc_ed25519_sig_size(key: Ed25519Key) -> int: ... + +Ed448Key: TypeAlias = FFI.CData + +def wc_ed448_make_public(key: Ed448Key, pub_key: BytePtr, pub_key_len: int) -> int: ... +def wc_ed448_make_key(rng: RNG, keysize: int, key: Ed448Key) -> int: ... +def wc_ed448_sign_msg(msg: bytes, msg_len: int, out: BytePtr, out_len: IntPtr, key: Ed448Key, context: bytes | FFI.CData, context_len: int) -> int: ... +def wc_ed448_verify_msg(sig: bytes, sig_len: int, msg: bytes, msg_len: int, res: IntPtr, key: Ed448Key, context: bytes | FFI.CData, context_len: int) -> int: ... +def wc_ed448_init(key: Ed448Key) -> int: ... +def wc_ed448_free(key: Ed448Key) -> None: ... +def wc_ed448_import_public(pub: bytes | BytePtr, pub_len: int, key: Ed448Key) -> int: ... +def wc_ed448_import_private_only(priv: bytes | BytePtr, priv_len: int, key: Ed448Key) -> int: ... +def wc_ed448_import_private_key(priv: bytes, priv_len: int, pub: bytes, pub_len: int, key: Ed448Key) -> int: ... +def wc_ed448_export_public(key: Ed448Key, pub: BytePtr, pub_len: IntPtr) -> int: ... +def wc_ed448_export_private_only(key: Ed448Key, priv: BytePtr, priv_len: IntPtr) -> int: ... +def wc_ed448_size(key: Ed448Key) -> int: ... +def wc_ed448_priv_size(key: Ed448Key) -> int: ... +def wc_ed448_pub_size(key: Ed448Key) -> int: ... +def wc_ed448_sig_size(key: Ed448Key) -> int: ... + +MlKemKey: TypeAlias = FFI.CData + +def wc_KyberKey_Init(type: int, key: MlKemKey, heap: FFI.CData, dev_id: int) -> int: ... +def wc_KyberKey_Free(key: MlKemKey) -> int: ... +def wc_KyberKey_MakeKey(key: MlKemKey, rng: RNG) -> int: ... +def wc_KyberKey_MakeKeyWithRandom(key: MlKemKey, rand: bytes, len: int) -> int: ... +def wc_KyberKey_CipherTextSize(key: MlKemKey, len: IntPtr) -> int: ... +def wc_KyberKey_SharedSecretSize(key: MlKemKey, len: IntPtr) -> int: ... +def wc_KyberKey_Encapsulate(key: MlKemKey, ct: BytePtr, ss: BytePtr, rng: RNG) -> int: ... +def wc_KyberKey_EncapsulateWithRandom(key: MlKemKey, ct: BytePtr, ss: BytePtr, rand: bytes, len: int) -> int: ... +def wc_KyberKey_Decapsulate(key: MlKemKey, ss: BytePtr, ct: bytes, len: int) -> int: ... +def wc_KyberKey_DecodePrivateKey(key: MlKemKey, private_key: bytes, private_key_len: int) -> int: ... +def wc_KyberKey_DecodePublicKey(key: MlKemKey, public_key: bytes, public_key_len: int) -> int: ... +def wc_KyberKey_PublicKeySize(key: MlKemKey, len: IntPtr) -> int: ... +def wc_KyberKey_PrivateKeySize(key: MlKemKey, len: IntPtr) -> int: ... +def wc_KyberKey_EncodePrivateKey(key: MlKemKey, out: BytePtr, len: int) -> int: ... +def wc_KyberKey_EncodePublicKey(key: MlKemKey, out: BytePtr, len: int) -> int: ... + +DilithiumKey: TypeAlias = FFI.CData + +def wc_dilithium_init_ex(key: DilithiumKey, heap: FFI.CData, dev_id: int) -> int: ... +def wc_dilithium_set_level(key: DilithiumKey, level: int) -> int: ... +def wc_dilithium_free(key: DilithiumKey) -> None: ... +def wc_dilithium_import_public(public: bytes, public_len: int, key: DilithiumKey) -> int: ... +def wc_dilithium_export_public(key: DilithiumKey, public: BytePtr, public_len: IntPtr) -> int: ... +def wc_dilithium_verify_ctx_msg(sig: bytes, sig_len: int, ctx: bytes, ctx_len: int, msg: bytes, msg_len: int, res: IntPtr, key: DilithiumKey) -> int: ... +def wc_dilithium_verify_msg(sig: bytes, sig_len: int, msg: bytes, msg_len: int, res: IntPtr, key: DilithiumKey) -> int: ... +def wc_dilithium_make_key(key: DilithiumKey, rng: RNG) -> int: ... +def wc_dilithium_make_key_from_seed(key: DilithiumKey, seed: bytes | list[int] | tuple[int]) -> int: ... +def wc_dilithium_export_private(key: DilithiumKey, out: BytePtr, out_len: IntPtr) -> int: ... +def wc_dilithium_import_private(priv: bytes, priv_size: int, key: DilithiumKey) -> int: ... +def wc_dilithium_sign_ctx_msg(ctx: bytes, ctx_len: int, msg: bytes, msg_len: int, sig: BytePtr, sig_len: IntPtr, key: DilithiumKey, rng: RNG) -> int: ... +def wc_dilithium_sign_msg(msg: bytes, msg_len: int, sig: BytePtr, sig_len: IntPtr, key: DilithiumKey, rng: RNG) -> int: ... +def wc_dilithium_sign_ctx_msg_with_seed(ctx: bytes, ctx_len: int, msg: bytes, msg_len: int, sig: BytePtr, sig_len: IntPtr, key: DilithiumKey, seed: bytes | list[int] | tuple[int]) -> int: ... +def wc_dilithium_sign_msg_with_seed(msg: bytes, msg_len: int, sig: BytePtr, sig_len: IntPtr, key: DilithiumKey, seed: bytes | list[int] | tuple[int]) -> int: ... +def wc_MlDsaKey_GetPrivLen(key: DilithiumKey, len: IntPtr) -> int: ... +def wc_MlDsaKey_GetPubLen(key: DilithiumKey, len: IntPtr) -> int: ... +def wc_MlDsaKey_GetSigLen(key: DilithiumKey, len: IntPtr) -> int: ... diff --git a/wolfcrypt/asn.py b/wolfcrypt/asn.py index dc702d1..988716f 100644 --- a/wolfcrypt/asn.py +++ b/wolfcrypt/asn.py @@ -20,23 +20,27 @@ # pylint: disable=no-member,no-name-in-module +from __future__ import annotations + import hmac as _hmac from wolfcrypt._ffi import ffi as _ffi from wolfcrypt._ffi import lib as _lib from wolfcrypt.exceptions import WolfCryptError, WolfCryptApiError +from wolfcrypt.hashes import _Hash +from .types import SupportsRsaSign, SupportsRsaVerify if _lib.SHA_ENABLED: - from wolfcrypt.hashes import Sha + from wolfcrypt.hashes import Sha # ty: ignore[possibly-missing-import] if _lib.SHA256_ENABLED: - from wolfcrypt.hashes import Sha256 + from wolfcrypt.hashes import Sha256 # ty: ignore[possibly-missing-import] if _lib.SHA384_ENABLED: - from wolfcrypt.hashes import Sha384 + from wolfcrypt.hashes import Sha384 # ty: ignore[possibly-missing-import] if _lib.SHA512_ENABLED: - from wolfcrypt.hashes import Sha512 + from wolfcrypt.hashes import Sha512 # ty: ignore[possibly-missing-import] if _lib.ASN_ENABLED: - def pem_to_der(pem, pem_type): + def pem_to_der(pem: bytes, pem_type: int) -> bytes: der = _ffi.new("DerBuffer**") ret = _lib.wc_PemToDer(pem, len(pem), pem_type, der, _ffi.NULL, _ffi.NULL, _ffi.NULL) @@ -49,7 +53,7 @@ def pem_to_der(pem, pem_type): _lib.wc_FreeDer(der) return result - def der_to_pem(der, pem_type): + def der_to_pem(der: bytes, pem_type: int) -> bytes: pem_length = _lib.wc_DerToPemEx(der, len(der), _ffi.NULL, 0, _ffi.NULL, pem_type) if pem_length <= 0: @@ -63,7 +67,7 @@ def der_to_pem(der, pem_type): return _ffi.buffer(pem, pem_length)[:] - def hash_oid_from_class(hash_cls): + def hash_oid_from_class(hash_cls: type[_Hash]) -> int: if _lib.SHA_ENABLED and hash_cls == Sha: return _lib.SHAh elif _lib.SHA256_ENABLED and hash_cls == Sha256: @@ -75,7 +79,7 @@ def hash_oid_from_class(hash_cls): else: raise WolfCryptError(f"Unknown hash class {hash_cls.__name__}") - def make_signature(data, hash_cls, key=None): + def make_signature(data: bytes, hash_cls: type[_Hash], key: SupportsRsaSign | None = None) -> bytes: hash_obj = hash_cls() hash_obj.update(data) digest = hash_obj.digest() @@ -93,7 +97,7 @@ def make_signature(data, hash_cls, key=None): else: return plaintext_sig - def check_signature(signature, data, hash_cls, pub_key): + def check_signature(signature: bytes, data: bytes, hash_cls: type[_Hash], pub_key: SupportsRsaVerify) -> bool: computed_signature = make_signature(data, hash_cls) decrypted_signature = pub_key.verify(signature) return _hmac.compare_digest(computed_signature, decrypted_signature) diff --git a/wolfcrypt/ciphers.py b/wolfcrypt/ciphers.py index 562aabd..fd1cdb5 100644 --- a/wolfcrypt/ciphers.py +++ b/wolfcrypt/ciphers.py @@ -20,16 +20,24 @@ # pylint: disable=no-member,no-name-in-module +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections.abc import Sequence from enum import IntEnum +from typing_extensions import override from wolfcrypt._ffi import ffi as _ffi from wolfcrypt._ffi import lib as _lib -from wolfcrypt.utils import t2b -from wolfcrypt.random import Random -from wolfcrypt.asn import pem_to_der +from wolfcrypt.exceptions import WolfCryptError, WolfCryptApiError from wolfcrypt.hashes import hash_type_to_cls +from wolfcrypt.random import Random +from wolfcrypt.utils import BytesOrStr, t2b +from .types import SupportsRsaSign, SupportsRsaVerify + +if _lib.ASN_ENABLED: + from wolfcrypt.asn import pem_to_der # ty: ignore[possibly-missing-import] -from wolfcrypt.exceptions import WolfCryptError, WolfCryptApiError # key direction flags _ENCRYPTION = 0 @@ -110,13 +118,12 @@ HASH_TYPE_BLAKE2S = _lib.WC_HASH_TYPE_BLAKE2S - -class _Cipher: +class _Cipher(ABC): """ A **PEP 272: Block Encryption Algorithms** compliant **Symmetric Key Cipher**. """ - def __init__(self, key, mode, IV=None): + def __init__(self, key: BytesOrStr, mode: int, IV: BytesOrStr | None = None) -> None: if mode not in _FEEDBACK_MODES: raise ValueError("this mode is not supported") @@ -152,10 +159,35 @@ def __init__(self, key, mode, IV=None): if IV: self._IV = IV else: # pragma: no cover - self._IV = _ffi.new(f"byte[{self.block_size}]") + self._IV = bytes(self.block_size) + + @property + @abstractmethod + def _native_type(self) -> str: ... + + @property + @abstractmethod + def block_size(self) -> int: ... + + @property + @abstractmethod + def key_size(self) -> int: ... + + @property + @abstractmethod + def _key_sizes(self) -> list[int]: ... + + @abstractmethod + def _set_key(self, direction: int) -> int: ... + + @abstractmethod + def _encrypt(self, destination: _ffi.CData, source: bytes) -> int: ... + + @abstractmethod + def _decrypt(self, destination: _ffi.CData, source: bytes) -> int: ... @classmethod - def new(cls, key, mode, IV=None, **kwargs): # pylint: disable=W0613 + def new(cls, key: BytesOrStr, mode: int, IV: BytesOrStr | None = None, **kwargs: int) -> _Cipher: # pylint: disable=W0613 """ Returns a ciphering object, using the secret key contained in the string **key**, and using the feedback mode **mode**, which @@ -168,7 +200,7 @@ def new(cls, key, mode, IV=None, **kwargs): # pylint: disable=W0613 """ return cls(key, mode, IV) - def encrypt(self, string): + def encrypt(self, string: BytesOrStr) -> bytes: """ Encrypts a non-empty string, using the key-dependent data in the object, and with the appropriate feedback mode. @@ -200,7 +232,7 @@ def encrypt(self, string): return _ffi.buffer(result)[:] - def decrypt(self, string): + def decrypt(self, string: BytesOrStr) -> bytes: """ Decrypts **string**, using the key-dependent data in the object and with the appropriate feedback mode. @@ -244,17 +276,22 @@ class Aes(_Cipher): _key_sizes = [16, 24, 32] _native_type = "Aes *" - def _set_key(self, direction): + @override + def _set_key(self, direction: int) -> int: if direction == _ENCRYPTION: + assert self._enc is not None return _lib.wc_AesSetKey( self._enc, self._key, len(self._key), self._IV, _ENCRYPTION) + assert self._dec is not None if self.mode == MODE_CTR: return _lib.wc_AesSetKey( self._dec, self._key, len(self._key), self._IV, _ENCRYPTION) return _lib.wc_AesSetKey( self._dec, self._key, len(self._key), self._IV, _DECRYPTION) - def _encrypt(self, destination, source): + @override + def _encrypt(self, destination: _ffi.CData, source: bytes) -> int: + assert self._enc is not None if self.mode == MODE_CBC: return _lib.wc_AesCbcEncrypt(self._enc, destination, source, len(source)) @@ -264,7 +301,9 @@ def _encrypt(self, destination, source): else: raise ValueError("Invalid mode associated to cipher") - def _decrypt(self, destination, source): + @override + def _decrypt(self, destination: _ffi.CData, source: bytes) -> int: + assert self._dec is not None if self.mode == MODE_CBC: return _lib.wc_AesCbcDecrypt(self._dec, destination, source, len(source)) @@ -283,12 +322,12 @@ class AesSiv: _key_sizes = [32, 48, 64] block_size = 16 - def __init__(self, key): + def __init__(self, key: BytesOrStr) -> None: self._key = t2b(key) if len(self._key) not in AesSiv._key_sizes: raise ValueError(f"key must be {AesSiv._key_sizes} in length, not {len(self._key)}") - def encrypt(self, associated_data, nonce, plaintext): + def encrypt(self, associated_data: BytesOrStr | Sequence[bytes] | Sequence[bytearray] | Sequence[str] | Sequence[memoryview], nonce: BytesOrStr, plaintext: BytesOrStr) -> tuple[bytes, bytes]: """ Encrypt plaintext data using the nonce provided. The associated data is not encrypted but is included in the authentication tag. @@ -302,20 +341,20 @@ def encrypt(self, associated_data, nonce, plaintext): # Prepare the associated data blocks. Make sure to hold on to the # returned references until the C function has been called in order # to prevent garbage collection of them until the function is done. - associated_data, _refs = ( + prep_associated_data, _refs = ( AesSiv._prepare_associated_data(associated_data)) nonce = t2b(nonce) plaintext = t2b(plaintext) siv = _ffi.new(f"byte[{AesSiv.block_size}]") ciphertext = _ffi.new(f"byte[{len(plaintext)}]") ret = _lib.wc_AesSivEncrypt_ex(self._key, len(self._key), - associated_data, len(associated_data), nonce, len(nonce), + prep_associated_data, len(prep_associated_data), nonce, len(nonce), plaintext, len(plaintext), siv, ciphertext) if ret < 0: # pragma: no cover raise WolfCryptApiError("AES-SIV encryption error", ret) return _ffi.buffer(siv)[:], _ffi.buffer(ciphertext)[:] - def decrypt(self, associated_data, nonce, siv, ciphertext): + def decrypt(self, associated_data: BytesOrStr | Sequence[bytes] | Sequence[bytearray] | Sequence[str] | Sequence[memoryview], nonce: BytesOrStr, siv: BytesOrStr, ciphertext: BytesOrStr) -> bytes: """ Decrypt the ciphertext using the nonce and SIV provided. The integrity of the associated data is checked. @@ -329,7 +368,7 @@ def decrypt(self, associated_data, nonce, siv, ciphertext): # Prepare the associated data blocks. Make sure to hold on to the # returned references until the C function has been called in order # to prevent garbage collection of them until the function is done. - associated_data, _refs = ( + prep_associated_data, _refs = ( AesSiv._prepare_associated_data(associated_data)) nonce = t2b(nonce) siv = t2b(siv) @@ -338,14 +377,14 @@ def decrypt(self, associated_data, nonce, siv, ciphertext): ciphertext = t2b(ciphertext) plaintext = _ffi.new(f"byte[{len(ciphertext)}]") ret = _lib.wc_AesSivDecrypt_ex(self._key, len(self._key), - associated_data, len(associated_data), nonce, len(nonce), + prep_associated_data, len(prep_associated_data), nonce, len(nonce), ciphertext, len(ciphertext), siv, plaintext) if ret < 0: raise WolfCryptApiError("AES-SIV decryption error", ret) return _ffi.buffer(plaintext)[:] @staticmethod - def _prepare_associated_data(associated_data): + def _prepare_associated_data(associated_data: BytesOrStr | Sequence[bytes] | Sequence[bytearray] | Sequence[str] | Sequence[memoryview]) -> tuple[_ffi.CData, bytes | list[bytes]]: """ Prepare associated data for sending to C library. @@ -362,10 +401,10 @@ def _prepare_associated_data(associated_data): if isinstance(associated_data, (str, bytes, bytearray, memoryview)): # A single block is provided. # Make sure we have bytes. - associated_data = t2b(associated_data) + associated_data_bytes = t2b(associated_data) result = _ffi.new("AesSivAssoc[1]") - result[0].assoc = _ffi.from_buffer(associated_data) - result[0].assocSz = len(associated_data) + result[0].assoc = _ffi.from_buffer(associated_data_bytes) + result[0].assocSz = len(associated_data_bytes) else: # It is assumed that a list is provided. num_blocks = len(associated_data) @@ -373,14 +412,14 @@ def _prepare_associated_data(associated_data): raise WolfCryptError("AES-SIV does not support more than 126 blocks " f"of associated data, got: {num_blocks}") # Make sure we have bytes. - associated_data = [t2b(block) for block in associated_data] + associated_data_bytes = [t2b(block) for block in associated_data] result = _ffi.new("AesSivAssoc[]", num_blocks) - for index, block in enumerate(associated_data): + for index, block in enumerate(associated_data_bytes): result[index].assoc = _ffi.from_buffer(block) result[index].assocSz = len(block) # Return the converted associated data blocks so the caller can # hold on to them until the function has been called. - return result, associated_data + return result, associated_data_bytes if _lib.AESGCM_STREAM_ENABLED: @@ -394,7 +433,7 @@ class AesGcmStream: # making sure _lib.wc_AesFree outlives Aes instances _delete = staticmethod(_lib.wc_AesFree) - def __init__(self, key, IV, tag_bytes=16): + def __init__(self, key: BytesOrStr, IV: BytesOrStr, tag_bytes: int = 16) -> None: """ tag_bytes is the number of bytes to use for the authentication tag during encryption """ @@ -420,12 +459,12 @@ def __init__(self, key, IV, tag_bytes=16): if ret < 0: raise WolfCryptApiError("Init error", ret) - def __del__(self): + def __del__(self) -> None: if getattr(self, '_init_done', False): self._delete(self._native_object) self._init_done = False - def set_aad(self, data): + def set_aad(self, data: BytesOrStr) -> None: """ Set the additional authentication data for the stream """ @@ -433,10 +472,10 @@ def set_aad(self, data): raise WolfCryptError("AAD can only be set before encrypt() or decrypt() is called") self._aad = t2b(data) - def get_aad(self): + def get_aad(self) -> bytes: return self._aad - def encrypt(self, data): + def encrypt(self, data: BytesOrStr) -> bytes: """ Add more data to the encryption stream """ @@ -453,7 +492,7 @@ def encrypt(self, data): raise WolfCryptApiError("Encryption error", ret) return bytes(buf) - def decrypt(self, data): + def decrypt(self, data: BytesOrStr) -> bytes: """ Add more data to the decryption stream """ @@ -470,7 +509,7 @@ def decrypt(self, data): raise WolfCryptApiError("Decryption error", ret) return bytes(buf) - def final(self, authTag=None): + def final(self, authTag: BytesOrStr | None = None) -> bytes | None: """ When encrypting, finalize the stream and return an authentication tag for the stream. When decrypting, verify the authentication tag for the stream. @@ -479,11 +518,11 @@ def final(self, authTag=None): if self._mode is None: raise WolfCryptError("Final called with no encryption or decryption") elif self._mode == _ENCRYPTION: - authTag = _ffi.new(f"byte[{self._tag_bytes}]") - ret = _lib.wc_AesGcmEncryptFinal(self._native_object, authTag, self._tag_bytes) + authTag_out = _ffi.new(f"byte[{self._tag_bytes}]") + ret = _lib.wc_AesGcmEncryptFinal(self._native_object, authTag_out, self._tag_bytes) if ret < 0: raise WolfCryptApiError("Encryption error", ret) - return _ffi.buffer(authTag)[:] + return _ffi.buffer(authTag_out)[:] else: if authTag is None: raise WolfCryptError("authTag parameter required") @@ -509,7 +548,7 @@ class ChaCha(_Cipher): _IV_nonce = b"" _IV_counter = 0 - def __init__(self, key="", size=32): # pylint: disable=unused-argument + def __init__(self, key: BytesOrStr = "", size: int = 32) -> None: # pylint: disable=unused-argument # size is kept for backwards compatibility; key length is now # derived from the actual key and validated against _key_sizes. self._native_object = _ffi.new(self._native_type) @@ -528,7 +567,8 @@ def __init__(self, key="", size=32): # pylint: disable=unused-argument # collide with _ENCRYPTION (0) or _DECRYPTION (1). _REKEY_BOTH = -1 - def _set_key(self, direction): + @override + def _set_key(self, direction: int) -> int: if self._key is None: return -1 # _REKEY_BOTH re-keys whichever contexts are already allocated, @@ -539,12 +579,14 @@ def _set_key(self, direction): do_enc = self._enc and direction in (self._REKEY_BOTH, _ENCRYPTION) do_dec = self._dec and direction in (self._REKEY_BOTH, _DECRYPTION) if do_enc: + assert self._enc is not None ret = _lib.wc_Chacha_SetKey(self._enc, self._key, len(self._key)) if ret == 0: ret = _lib.wc_Chacha_SetIV(self._enc, self._IV_nonce, self._IV_counter) if ret != 0: return ret if do_dec: + assert self._dec is not None ret = _lib.wc_Chacha_SetKey(self._dec, self._key, len(self._key)) if ret == 0: ret = _lib.wc_Chacha_SetIV(self._dec, self._IV_nonce, self._IV_counter) @@ -552,17 +594,20 @@ def _set_key(self, direction): return ret return 0 - def _encrypt(self, destination, source): + @override + def _encrypt(self, destination: _ffi.CData, source: bytes) -> int: + assert self._enc is not None return _lib.wc_Chacha_Process(self._enc, destination, source, len(source)) - - def _decrypt(self, destination, source): + @override + def _decrypt(self, destination: _ffi.CData, source: bytes) -> int: + assert self._dec is not None return _lib.wc_Chacha_Process(self._dec, destination, source, len(source)) _NONCE_SIZE = 12 - def set_iv(self, nonce, counter = 0): + def set_iv(self, nonce: BytesOrStr, counter: int = 0) -> None: self._IV_nonce = t2b(nonce) if len(self._IV_nonce) != self._NONCE_SIZE: raise ValueError(f"nonce must be {self._NONCE_SIZE} bytes, got {len(self._IV_nonce)}") @@ -581,12 +626,12 @@ class ChaCha20Poly1305: _key_sizes = [32] _tag_bytes = 16 - def __init__(self, key): + def __init__(self, key: BytesOrStr) -> None: self._key = t2b(key) if len(self._key) not in self._key_sizes: raise ValueError(f"key must be {self._key_sizes} in length, not {len(self._key)}") - def encrypt(self, aad, iv, plaintext): + def encrypt(self, aad: BytesOrStr, iv: BytesOrStr, plaintext: BytesOrStr) -> tuple[bytes, bytes]: """ Encrypt plaintext data using the IV/nonce provided. The associated data (aad) is not encrypted but is included in the @@ -602,11 +647,11 @@ def encrypt(self, aad, iv, plaintext): ciphertext = _ffi.new(f"byte[{len(plaintext)}]") authTag = _ffi.new(f"byte[{self._tag_bytes}]") ret = _lib.wc_ChaCha20Poly1305_Encrypt( - _ffi.from_buffer(self._key), - _ffi.from_buffer(iv), - _ffi.from_buffer(aad), + self._key, + iv, + aad, len(aad), - _ffi.from_buffer(plaintext), + plaintext, len(plaintext), ciphertext, authTag @@ -615,7 +660,7 @@ def encrypt(self, aad, iv, plaintext): raise WolfCryptApiError("Encryption error", ret) return bytes(ciphertext), bytes(authTag) - def decrypt(self, aad, iv, authTag, ciphertext): + def decrypt(self, aad: BytesOrStr, iv: BytesOrStr, authTag: BytesOrStr, ciphertext: BytesOrStr) -> bytes: """ Decrypt the ciphertext using the IV/nonce and authentication tag provided. The integrity of the associated data (aad) is checked. @@ -632,13 +677,13 @@ def decrypt(self, aad, iv, authTag, ciphertext): ciphertext = t2b(ciphertext) plaintext = _ffi.new(f"byte[{len(ciphertext)}]") ret = _lib.wc_ChaCha20Poly1305_Decrypt( - _ffi.from_buffer(self._key), - _ffi.from_buffer(iv), - _ffi.from_buffer(aad), + self._key, + iv, + aad, len(aad), - _ffi.from_buffer(ciphertext), + ciphertext, len(ciphertext), - _ffi.from_buffer(authTag), + authTag, plaintext ) if ret < 0: @@ -655,9 +700,10 @@ class Des3(_Cipher): """ block_size = 8 key_size = 24 + _key_sizes = [24] _native_type = "Des3 *" - def __init__(self, key, mode, IV=None): + def __init__(self, key: BytesOrStr, mode: int, IV: BytesOrStr | None = None) -> None: # Intentionally stricter than _Cipher.__init__, which accepts both # CBC and CTR. wolfCrypt has no 3DES-CTR implementation, so reject # MODE_CTR here with a clearer error before delegating. @@ -665,30 +711,33 @@ def __init__(self, key, mode, IV=None): raise ValueError("Des3 only supports MODE_CBC") super().__init__(key, mode, IV) - def _set_key(self, direction): + @override + def _set_key(self, direction: int) -> int: if direction == _ENCRYPTION: - return _lib.wc_Des3_SetKey(self._enc, self._key, - self._IV, _ENCRYPTION) + assert self._enc is not None + return _lib.wc_Des3_SetKey(self._enc, self._key, self._IV, _ENCRYPTION) - return _lib.wc_Des3_SetKey(self._dec, self._key, - self._IV, _DECRYPTION) + assert self._dec is not None + return _lib.wc_Des3_SetKey(self._dec, self._key, self._IV, _DECRYPTION) - def _encrypt(self, destination, source): - return _lib.wc_Des3_CbcEncrypt(self._enc, destination, - source, len(source)) + @override + def _encrypt(self, destination: _ffi.CData, source: bytes) -> int: + assert self._enc is not None + return _lib.wc_Des3_CbcEncrypt(self._enc, destination, source, len(source)) - def _decrypt(self, destination, source): - return _lib.wc_Des3_CbcDecrypt(self._dec, destination, - source, len(source)) + @override + def _decrypt(self, destination: _ffi.CData, source: bytes) -> int: + assert self._dec is not None + return _lib.wc_Des3_CbcDecrypt(self._dec, destination, source, len(source)) if _lib.RSA_ENABLED: class _Rsa: # pylint: disable=too-few-public-methods RSA_MIN_PAD_SIZE = 11 - _mgf = None + _mgf: int | None = None _hash_type = None - def __init__(self, rng=None): + def __init__(self, rng: Random | None = None) -> None: if rng is None: rng = Random() @@ -707,14 +756,14 @@ def __init__(self, rng=None): # making sure _lib.wc_FreeRsaKey outlives RsaKey instances _delete = staticmethod(_lib.wc_FreeRsaKey) - def __del__(self): + def __del__(self) -> None: if self.native_object: self._delete(self.native_object) - def set_mgf(self, mgf): + def set_mgf(self, mgf: int) -> None: self._mgf = mgf - def _get_mgf(self): + def _get_mgf(self) -> None: if self._hash_type == _lib.WC_HASH_TYPE_SHA: self._mgf = _lib.WC_MGF1SHA1 elif self._hash_type == _lib.WC_HASH_TYPE_SHA224: @@ -730,19 +779,17 @@ def _get_mgf(self): - class RsaPublic(_Rsa): - def __init__(self, key=None, hash_type=None, rng=None): + class RsaPublic(_Rsa, SupportsRsaVerify): + def __init__(self, key: BytesOrStr, hash_type: int | None = None, rng: Random | None = None) -> None: super().__init__(rng) - if key is not None: - key = t2b(key) + key = t2b(key) self._hash_type = hash_type idx = _ffi.new("word32*") idx[0] = 0 - ret = _lib.wc_RsaPublicKeyDecode(key, idx, - self.native_object, len(key)) + ret = _lib.wc_RsaPublicKeyDecode(key, idx, self.native_object, len(key)) if ret < 0: raise WolfCryptApiError("Invalid key error", ret) @@ -753,11 +800,11 @@ def __init__(self, key=None, hash_type=None, rng=None): if _lib.ASN_ENABLED: @classmethod - def from_pem(cls, file, hash_type=None, rng=None): + def from_pem(cls, file: bytes, hash_type: int | None = None, rng: Random | None = None) -> RsaPublic: der = pem_to_der(file, _lib.PUBLICKEY_TYPE) return cls(key=der, hash_type=hash_type, rng=rng) - def encrypt(self, plaintext): + def encrypt(self, plaintext: BytesOrStr) -> bytes: """ Encrypts **plaintext**, using the public key data in the object. The plaintext's length must not be greater than: @@ -780,7 +827,7 @@ def encrypt(self, plaintext): return _ffi.buffer(ciphertext)[:] - def encrypt_oaep(self, plaintext, label=""): + def encrypt_oaep(self, plaintext: BytesOrStr, label: BytesOrStr = "") -> bytes: if not self._hash_type: raise WolfCryptError("Hash type not set. Cannot use OAEP padding without a hash type.") plaintext = t2b(plaintext) @@ -788,6 +835,7 @@ def encrypt_oaep(self, plaintext, label=""): ciphertext = _ffi.new(f"byte[{self.output_size}]") if self._mgf is None: self._get_mgf() + assert self._mgf is not None ret = _lib.wc_RsaPublicEncrypt_ex(plaintext, len(plaintext), ciphertext, self.output_size, self.native_object, @@ -800,7 +848,8 @@ def encrypt_oaep(self, plaintext, label=""): return _ffi.buffer(ciphertext)[:] - def verify(self, signature): + @override + def verify(self, signature: BytesOrStr) -> bytes: """ Verifies **signature**, using the public key data in the object. The signature's length must be equal to: @@ -822,7 +871,7 @@ def verify(self, signature): return _ffi.buffer(plaintext, ret)[:] if _lib.RSA_PSS_ENABLED: - def verify_pss(self, plaintext, signature): + def verify_pss(self, plaintext: BytesOrStr, signature: BytesOrStr) -> bool: """ Verifies **signature**, using the public key data in the object. The signature's length must be equal to: @@ -842,6 +891,7 @@ def verify_pss(self, plaintext, signature): signature = t2b(signature) if self._mgf is None: self._get_mgf() + assert self._mgf is not None verify = _ffi.new(f"byte[{self.output_size}]") ret = _lib.wc_RsaPSS_Verify(signature, len(signature), @@ -862,16 +912,16 @@ def verify_pss(self, plaintext, signature): return ret == 0 - class RsaPrivate(RsaPublic): + class RsaPrivate(RsaPublic, SupportsRsaSign): if _lib.KEYGEN_ENABLED: @classmethod - def make_key(cls, size, rng=None, hash_type=None): + def make_key(cls, size: int, rng: Random | None = None, hash_type: int | None = None) -> RsaPrivate: """ Generates a new key pair of desired length **size**. """ if rng is None: rng = Random() - rsa = cls(hash_type=hash_type) + rsa = cls(hash_type=hash_type, rng=rng) ret = _lib.wc_MakeRsaKey(rsa.native_object, size, 65537, rng.native_object) @@ -883,12 +933,9 @@ def make_key(cls, size, rng=None, hash_type=None): if rsa.output_size < 0: # pragma: no cover raise WolfCryptApiError("Invalid key size error", rsa.output_size) - # Retain RNG reference defensively. - rsa._rng = rng - return rsa - def __init__(self, key=None, hash_type=None, rng=None): # pylint: disable=super-init-not-called + def __init__(self, key: BytesOrStr | None = None, hash_type: int | None = None, rng: Random | None = None) -> None: # pylint: disable=super-init-not-called _Rsa.__init__(self, rng) # pylint: disable=non-parent-init-called self._hash_type = hash_type @@ -921,13 +968,14 @@ def __init__(self, key=None, hash_type=None, rng=None): # pylint: disable=super raise WolfCryptApiError("Invalid key size error", self.output_size) if _lib.ASN_ENABLED: + @override @classmethod - def from_pem(cls, file, hash_type=None, rng=None): + def from_pem(cls, file: bytes, hash_type: int | None = None, rng: Random | None = None) -> RsaPrivate: der = pem_to_der(file, _lib.PRIVATEKEY_TYPE) return cls(key=der, hash_type=hash_type, rng=rng) if _lib.KEYGEN_ENABLED: - def encode_key(self): + def encode_key(self) -> tuple[bytes, bytes]: """ Encodes the RSA private and public keys in an ASN sequence. @@ -949,7 +997,7 @@ def encode_key(self): return _ffi.buffer(priv, privlen)[:], _ffi.buffer(pub, publen)[:] - def decrypt(self, ciphertext): + def decrypt(self, ciphertext: BytesOrStr) -> bytes: """ Decrypts **ciphertext**, using the private key data in the object. The ciphertext's length must be equal to: @@ -970,7 +1018,7 @@ def decrypt(self, ciphertext): return _ffi.buffer(plaintext, ret)[:] - def decrypt_oaep(self, ciphertext, label=""): + def decrypt_oaep(self, ciphertext: BytesOrStr, label: BytesOrStr = "") -> bytes: """ Decrypts **ciphertext**, using the private key data in the object. The ciphertext's length must be equal to: @@ -986,6 +1034,7 @@ def decrypt_oaep(self, ciphertext, label=""): plaintext = _ffi.new(f"byte[{self.output_size}]") if self._mgf is None: self._get_mgf() + assert self._mgf is not None ret = _lib.wc_RsaPrivateDecrypt_ex(ciphertext, len(ciphertext), plaintext, self.output_size, self.native_object, @@ -997,7 +1046,8 @@ def decrypt_oaep(self, ciphertext, label=""): return _ffi.buffer(plaintext, ret)[:] - def sign(self, plaintext): + @override + def sign(self, plaintext: BytesOrStr) -> bytes: """ Signs **plaintext**, using the private key data in the object. The plaintext's length must not be greater than: @@ -1020,7 +1070,7 @@ def sign(self, plaintext): return _ffi.buffer(signature, self.output_size)[:] if _lib.RSA_PSS_ENABLED: - def sign_pss(self, plaintext): + def sign_pss(self, plaintext: BytesOrStr) -> bytes: """ Signs **plaintext**, using the private key data in the object. The plaintext's length must not be greater than: @@ -1042,6 +1092,7 @@ def sign_pss(self, plaintext): signature = _ffi.new(f"byte[{self.output_size}]") if self._mgf is None: self._get_mgf() + assert self._mgf is not None ret = _lib.wc_RsaPSS_Sign(digest, len(digest), signature, self.output_size, @@ -1057,7 +1108,7 @@ def sign_pss(self, plaintext): if _lib.ECC_ENABLED: class _Ecc: # pylint: disable=too-few-public-methods - def __init__(self): + def __init__(self) -> None: self.native_object = _ffi.new("ecc_key *") ret = _lib.wc_ecc_init(self.native_object) if ret < 0: # pragma: no cover @@ -1066,27 +1117,27 @@ def __init__(self): # making sure _lib.wc_ecc_free outlives ecc_key instances _delete = staticmethod(_lib.wc_ecc_free) - def __del__(self): + def __del__(self) -> None: if self.native_object: self._delete(self.native_object) @property - def size(self): + def size(self) -> int: return _lib.wc_ecc_size(self.native_object) @property - def max_signature_size(self): + def max_signature_size(self) -> int: return _lib.wc_ecc_sig_size(self.native_object) class EccPublic(_Ecc): - def __init__(self, key=None): + def __init__(self, key: BytesOrStr | None = None) -> None: _Ecc.__init__(self) if key: self.decode_key(key) - def decode_key(self, key): + def decode_key(self, key: BytesOrStr) -> None: """ Decodes an ECC public key from an ASN sequence. """ @@ -1104,7 +1155,7 @@ def decode_key(self, key): if self.max_signature_size <= 0: # pragma: no cover raise WolfCryptError(f"Key decode error ({self.max_signature_size})") - def decode_key_raw(self, qx, qy, curve_id=ECC_SECP256R1): + def decode_key_raw(self, qx: BytesOrStr, qy: BytesOrStr, curve_id: int = ECC_SECP256R1) -> None: """ Decodes an ECC public key from its raw elements: (Qx,Qy) """ @@ -1122,7 +1173,7 @@ def decode_key_raw(self, qx, qy, curve_id=ECC_SECP256R1): if ret != 0: raise WolfCryptApiError("Key decode error", ret) - def encode_key(self, with_curve=True): + def encode_key(self, with_curve: bool = True) -> bytes: """ Encodes the ECC public key in an ASN sequence. @@ -1137,7 +1188,7 @@ def encode_key(self, with_curve=True): return _ffi.buffer(key, ret)[:] - def encode_key_raw(self): + def encode_key_raw(self) -> tuple[bytes, bytes]: """ Encodes the ECC public key in its two raw elements @@ -1158,7 +1209,7 @@ def encode_key_raw(self): return _ffi.buffer(Qx, qx_size[0])[:], _ffi.buffer(Qy, qy_size[0])[:] - def import_x963(self, x963): + def import_x963(self, x963: bytes) -> None: """ Imports an ECC public key in ANSI X9.63 format. """ @@ -1166,7 +1217,7 @@ def import_x963(self, x963): if ret != 0: raise WolfCryptApiError("x963 import error", ret) - def export_x963(self): + def export_x963(self) -> bytes: """ Exports the public key data of the object in ANSI X9.63 format. @@ -1182,7 +1233,7 @@ def export_x963(self): return _ffi.buffer(x963, x963_size[0])[:] - def verify(self, signature, data): + def verify(self, signature: bytes, data: BytesOrStr) -> bool: """ Verifies **signature**, using the public key data in the object. @@ -1201,7 +1252,7 @@ def verify(self, signature, data): return status[0] == 1 if _lib.MPAPI_ENABLED: - def verify_raw(self, R, S, data): + def verify_raw(self, R: bytes, S: bytes, data: BytesOrStr) -> bool: """ Verifies signature from its raw elements **R** and **S**, using the public key data in the object. @@ -1245,18 +1296,19 @@ def verify_raw(self, R, S, data): class EccPrivate(EccPublic): - def __init__(self, key=None, rng=None): + def __init__(self, key: BytesOrStr | None = None, rng: Random | None = None) -> None: super().__init__(key) self._rng = rng @classmethod - def make_key(cls, size, rng=None): + def make_key(cls, size: int, rng: Random | None = None) -> EccPrivate: """ Generates a new key pair of desired length **size**. """ if rng is None: rng = Random() ecc = cls(rng=rng) + assert ecc._rng is not None ret = _lib.wc_ecc_make_key(ecc._rng.native_object, size, ecc.native_object) @@ -1271,7 +1323,8 @@ def make_key(cls, size, rng=None): return ecc - def decode_key(self, key): + @override + def decode_key(self, key: BytesOrStr) -> None: """ Decodes an ECC private key from an ASN sequence. """ @@ -1289,7 +1342,8 @@ def decode_key(self, key): if self.max_signature_size <= 0: # pragma: no cover raise WolfCryptError(f"Key decode error ({self.max_signature_size})") - def decode_key_raw(self, qx, qy, d, curve_id=ECC_SECP256R1): + @override + def decode_key_raw(self, qx: BytesOrStr, qy: BytesOrStr, d: BytesOrStr, curve_id: int = ECC_SECP256R1) -> None: """ Decodes an ECC private key from its raw elements: public (Qx,Qy) and private(d) @@ -1310,7 +1364,8 @@ def decode_key_raw(self, qx, qy, d, curve_id=ECC_SECP256R1): if ret != 0: raise WolfCryptApiError("Key decode error", ret) - def encode_key(self): + @override + def encode_key(self) -> bytes: """ Encodes the ECC private key in an ASN sequence. @@ -1324,7 +1379,8 @@ def encode_key(self): return _ffi.buffer(key, ret)[:] - def encode_key_raw(self): + @override + def encode_key_raw(self) -> tuple[bytes, bytes, bytes]: """ Encodes the ECC private key in its three raw elements @@ -1348,7 +1404,7 @@ def encode_key_raw(self): return _ffi.buffer(Qx, qx_size[0])[:], _ffi.buffer(Qy, qy_size[0])[:], _ffi.buffer(d, d_size[0])[:] - def shared_secret(self, peer): + def shared_secret(self, peer: EccPublic) -> bytes: """ Generates a new secret key using the private key data in the object and the peer's public key. @@ -1368,7 +1424,7 @@ def shared_secret(self, peer): return _ffi.buffer(shared_secret, secret_size[0])[:] - def sign(self, plaintext, rng=None): + def sign(self, plaintext: BytesOrStr, rng: Random | None = None) -> bytes: """ Signs **plaintext**, using the private key data in the object. @@ -1393,7 +1449,7 @@ def sign(self, plaintext, rng=None): return _ffi.buffer(signature, signature_size[0])[:] if _lib.MPAPI_ENABLED: - def sign_raw(self, plaintext, rng=None): + def sign_raw(self, plaintext: BytesOrStr, rng: Random | None = None) -> tuple[bytes, bytes]: """ Signs **plaintext**, using the private key data in the object. @@ -1441,7 +1497,7 @@ def sign_raw(self, plaintext, rng=None): if _lib.ED25519_ENABLED: class _Ed25519: # pylint: disable=too-few-public-methods - def __init__(self): + def __init__(self) -> None: self.native_object = _ffi.new("ed25519_key *") ret = _lib.wc_ed25519_init(self.native_object) if ret < 0: # pragma: no cover @@ -1450,27 +1506,27 @@ def __init__(self): # making sure _lib.wc_ed25519_free outlives ed25519_key instances _delete = staticmethod(_lib.wc_ed25519_free) - def __del__(self): + def __del__(self) -> None: if self.native_object: self._delete(self.native_object) @property - def size(self): + def size(self) -> int: return _lib.wc_ed25519_size(self.native_object) @property - def max_signature_size(self): + def max_signature_size(self) -> int: return _lib.wc_ed25519_sig_size(self.native_object) class Ed25519Public(_Ed25519): - def __init__(self, key=None): + def __init__(self, key: BytesOrStr | None = None) -> None: _Ed25519.__init__(self) if key: self.decode_key(key) - def decode_key(self, key): + def decode_key(self, key: BytesOrStr) -> None: """ Decodes an ED25519 public key """ @@ -1489,7 +1545,7 @@ def decode_key(self, key): if self.max_signature_size <= 0: # pragma: no cover raise WolfCryptError(f"Key decode error ({self.max_signature_size})") - def encode_key(self): + def encode_key(self) -> bytes: """ Encodes the ED25519 public key @@ -1506,7 +1562,7 @@ def encode_key(self): return _ffi.buffer(key, size[0])[:] - def verify(self, signature, data): + def verify(self, signature: bytes, data: BytesOrStr) -> bool: """ Verifies **signature**, using the public key data in the object. @@ -1527,16 +1583,18 @@ def verify(self, signature, data): class Ed25519Private(Ed25519Public): - def __init__(self, key=None, pub=None): + def __init__(self, key: BytesOrStr | None = None, pub: bytes | None = None) -> None: _Ed25519.__init__(self) + self._rng = None + if key and not pub: self.decode_key(key) if key and pub: self.decode_key(key,pub) @classmethod - def make_key(cls, size, rng=None): + def make_key(cls, size: int, rng: Random | None = None) -> Ed25519Private: """ Generates a new key pair of desired length **size**. """ @@ -1555,7 +1613,8 @@ def make_key(cls, size, rng=None): return ed25519 - def decode_key(self, key, pub = None): + @override + def decode_key(self, key: BytesOrStr, pub: bytes | None = None) -> None: """ Decodes an ED25519 private + pub key """ @@ -1591,7 +1650,8 @@ def decode_key(self, key, pub = None): if self.max_signature_size <= 0: # pragma: no cover raise WolfCryptError(f"Key decode error ({self.max_signature_size})") - def encode_key(self): + @override + def encode_key(self) -> tuple[bytes, bytes]: """ Encodes the ED25519 private key. @@ -1616,7 +1676,7 @@ def encode_key(self): return _ffi.buffer(key, priv_size[0])[:], _ffi.buffer(pubkey, pub_size[0])[:] - def sign(self, plaintext): + def sign(self, plaintext: BytesOrStr) -> bytes: """ Signs **plaintext**, using the private key data in the object. @@ -1639,7 +1699,7 @@ def sign(self, plaintext): if _lib.ED448_ENABLED: class _Ed448: # pylint: disable=too-few-public-methods - def __init__(self): + def __init__(self) -> None: self.native_object = _ffi.new("ed448_key *") ret = _lib.wc_ed448_init(self.native_object) if ret < 0: # pragma: no cover @@ -1648,27 +1708,27 @@ def __init__(self): # making sure _lib.wc_ed448_free outlives ed448_key instances _delete = staticmethod(_lib.wc_ed448_free) - def __del__(self): + def __del__(self) -> None: if self.native_object: self._delete(self.native_object) @property - def size(self): + def size(self) -> int: return _lib.wc_ed448_size(self.native_object) @property - def max_signature_size(self): + def max_signature_size(self) -> int: return _lib.wc_ed448_sig_size(self.native_object) class Ed448Public(_Ed448): - def __init__(self, key=None): + def __init__(self, key: BytesOrStr | None = None) -> None: _Ed448.__init__(self) if key: self.decode_key(key) - def decode_key(self, key): + def decode_key(self, key: BytesOrStr) -> None: """ Decodes an ED448 public key """ @@ -1687,7 +1747,7 @@ def decode_key(self, key): if self.max_signature_size <= 0: # pragma: no cover raise WolfCryptError(f"Key decode error ({self.max_signature_size})") - def encode_key(self): + def encode_key(self) -> bytes: """ Encodes the ED448 public key @@ -1704,7 +1764,7 @@ def encode_key(self): return _ffi.buffer(key, size[0])[:] - def verify(self, signature, data, ctx=None): + def verify(self, signature: bytes, data: BytesOrStr, ctx: BytesOrStr | None = None) -> bool: """ Verifies **signature**, using the public key data in the object. @@ -1733,16 +1793,17 @@ def verify(self, signature, data, ctx=None): class Ed448Private(Ed448Public): - def __init__(self, key=None, pub=None): + def __init__(self, key: BytesOrStr | None = None, pub: bytes | None = None) -> None: _Ed448.__init__(self) + self._rng = None if key and not pub: self.decode_key(key) if key and pub: - self.decode_key(key,pub) + self.decode_key(key, pub) @classmethod - def make_key(cls, size, rng=None): + def make_key(cls, size: int, rng: Random | None = None) -> Ed448Private: """ Generates a new key pair of desired length **size**. """ @@ -1761,7 +1822,8 @@ def make_key(cls, size, rng=None): return ed448 - def decode_key(self, key, pub = None): + @override + def decode_key(self, key: BytesOrStr, pub: bytes | None = None) -> None: """ Decodes an ED448 private + pub key """ @@ -1797,7 +1859,8 @@ def decode_key(self, key, pub = None): if self.max_signature_size <= 0: # pragma: no cover raise WolfCryptError(f"Key decode error ({self.max_signature_size})") - def encode_key(self): + @override + def encode_key(self) -> tuple[bytes, bytes]: """ Encodes the ED448 private key. @@ -1822,7 +1885,7 @@ def encode_key(self): return _ffi.buffer(key, priv_size[0])[:], _ffi.buffer(pubkey, pub_size[0])[:] - def sign(self, plaintext, ctx=None): + def sign(self, plaintext: BytesOrStr, ctx : BytesOrStr | None = None) -> bytes: """ Signs **plaintext**, using the private key data in the object. @@ -1873,7 +1936,7 @@ class MlKemType(IntEnum): class _MlKemBase: INVALID_DEVID = _lib.INVALID_DEVID - def __init__(self, mlkem_type): + def __init__(self, mlkem_type: MlKemType) -> None: self.init_done = False self.native_object = _ffi.new("KyberKey *") ret = _lib.wc_KyberKey_Init( @@ -1884,13 +1947,14 @@ def __init__(self, mlkem_type): raise WolfCryptApiError("wc_KyberKey_Init() error", ret) self.init_done = True + self._rng = None - def __del__(self): + def __del__(self) -> None: if self.init_done: _lib.wc_KyberKey_Free(self.native_object) @property - def ct_size(self): + def ct_size(self) -> int: """ :return: cipher text size in bytes :rtype: int @@ -1904,7 +1968,7 @@ def ct_size(self): return len[0] @property - def ss_size(self): + def ss_size(self) -> int: """ :return: shared secret size in bytes :rtype: int @@ -1918,7 +1982,7 @@ def ss_size(self): return len[0] @property - def _pub_key_size(self): + def _pub_key_size(self) -> int: len = _ffi.new("word32 *") ret = _lib.wc_KyberKey_PublicKeySize(self.native_object, len) @@ -1927,7 +1991,7 @@ def _pub_key_size(self): return len[0] - def _encode_pub_key(self): + def _encode_pub_key(self) -> bytes: pub_key_size = self._pub_key_size pub_key = _ffi.new(f"unsigned char[{pub_key_size}]") ret = _lib.wc_KyberKey_EncodePublicKey( @@ -1941,21 +2005,21 @@ def _encode_pub_key(self): class MlKemPublic(_MlKemBase): @property - def key_size(self): + def key_size(self) -> int: """ :return: public key size in bytes :rtype: int """ return self._pub_key_size - def encode_key(self): + def encode_key(self) -> bytes: """ :return: exported public key :rtype: bytes """ return self._encode_pub_key() - def decode_key(self, pub_key): + def decode_key(self, pub_key: BytesOrStr) -> None: """ :param pub_key: public key to be imported :type pub_key: bytes or str @@ -1963,14 +2027,14 @@ def decode_key(self, pub_key): pub_key_bytestype = t2b(pub_key) ret = _lib.wc_KyberKey_DecodePublicKey( self.native_object, - _ffi.from_buffer(pub_key_bytestype), + pub_key_bytestype, len(pub_key_bytestype), ) if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_KyberKey_DecodePublicKey() error", ret) - def encapsulate(self, rng=None): + def encapsulate(self, rng: Random | None = None) -> tuple[bytes, bytes]: """ :param rng: random number generator for an encupsulation :type rng: Random @@ -1992,7 +2056,7 @@ def encapsulate(self, rng=None): return _ffi.buffer(ss, ss_size)[:], _ffi.buffer(ct, ct_size)[:] - def encapsulate_with_random(self, rand): + def encapsulate_with_random(self, rand: bytes) -> tuple[bytes, bytes]: """ :param rand: random number for an encapsulation :type rand: bytes @@ -2004,7 +2068,7 @@ def encapsulate_with_random(self, rand): ct = _ffi.new(f"unsigned char[{ct_size}]") ss = _ffi.new(f"unsigned char[{ss_size}]") ret = _lib.wc_KyberKey_EncapsulateWithRandom( - self.native_object, ct, ss, _ffi.from_buffer(rand), len(rand) + self.native_object, ct, ss, rand, len(rand) ) if ret < 0: # pragma: no cover @@ -2014,7 +2078,7 @@ def encapsulate_with_random(self, rand): class MlKemPrivate(_MlKemBase): @classmethod - def make_key(cls, mlkem_type, rng=None): + def make_key(cls, mlkem_type: MlKemType, rng: Random | None = None) -> MlKemPrivate: """ :param mlkem_type: ML-KEM type :type mlkem_type: MlKemType @@ -2037,7 +2101,7 @@ def make_key(cls, mlkem_type, rng=None): return mlkem_priv @classmethod - def make_key_with_random(cls, mlkem_type, rand): + def make_key_with_random(cls, mlkem_type: MlKemType, rand: bytes) -> MlKemPrivate: """ :param mlkem_type: ML-KEM type :type mlkem_type: MlKemType @@ -2047,9 +2111,7 @@ def make_key_with_random(cls, mlkem_type, rand): :rtype: MlKemPrivate """ mlkem_priv = cls(mlkem_type) - ret = _lib.wc_KyberKey_MakeKeyWithRandom( - mlkem_priv.native_object, _ffi.from_buffer(rand), len(rand) - ) + ret = _lib.wc_KyberKey_MakeKeyWithRandom(mlkem_priv.native_object, rand, len(rand)) if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_KyberKey_MakeKeyWithRandom() error", ret) @@ -2057,7 +2119,7 @@ def make_key_with_random(cls, mlkem_type, rand): return mlkem_priv @property - def pub_key_size(self): + def pub_key_size(self) -> int: """ :return: public key size in bytes :rtype: int @@ -2065,7 +2127,7 @@ def pub_key_size(self): return self._pub_key_size @property - def priv_key_size(self): + def priv_key_size(self) -> int: """ :return: private key size in bytes :rtype: int @@ -2078,14 +2140,14 @@ def priv_key_size(self): return len[0] - def encode_pub_key(self): + def encode_pub_key(self) -> bytes: """ :return: exported public key :rtype: bytes """ return self._encode_pub_key() - def encode_priv_key(self): + def encode_priv_key(self) -> bytes: """ :return: exported private key :rtype: bytes @@ -2101,7 +2163,7 @@ def encode_priv_key(self): return _ffi.buffer(priv_key, priv_key_size)[:] - def decode_key(self, priv_key: tuple[bytes, str]): + def decode_key(self, priv_key: BytesOrStr) -> None: """ :param priv_key: private key to be imported :type priv_key: bytes or str @@ -2109,14 +2171,14 @@ def decode_key(self, priv_key: tuple[bytes, str]): priv_key_bytestype = t2b(priv_key) ret = _lib.wc_KyberKey_DecodePrivateKey( self.native_object, - _ffi.from_buffer(priv_key_bytestype), + priv_key_bytestype, len(priv_key_bytestype), ) if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_KyberKey_DecodePrivateKey() error", ret) - def decapsulate(self, ct): + def decapsulate(self, ct: BytesOrStr) -> bytes: """ :param ct: cipher text :type ct: bytes or str @@ -2129,7 +2191,7 @@ def decapsulate(self, ct): ret = _lib.wc_KyberKey_Decapsulate( self.native_object, ss, - _ffi.from_buffer(ct_bytestype), + ct_bytestype, len(ct_bytestype), ) @@ -2164,7 +2226,7 @@ class _MlDsaBase: INVALID_DEVID = _lib.INVALID_DEVID ML_DSA_KEYGEN_SEED_LENGTH = _lib.DILITHIUM_SEED_SZ - def __init__(self, mldsa_type): + def __init__(self, mldsa_type: MlDsaType) -> None: self._init_done = False self.native_object = _ffi.new("dilithium_key *") ret = _lib.wc_dilithium_init_ex( @@ -2174,6 +2236,7 @@ def __init__(self, mldsa_type): if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_dilithium_init_ex() error", ret) + self._rng = None self._init_done = True ret = _lib.wc_dilithium_set_level(self.native_object, mldsa_type) @@ -2181,12 +2244,12 @@ def __init__(self, mldsa_type): if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_dilithium_set_level() error", ret) - def __del__(self): + def __del__(self) -> None: if self._init_done: _lib.wc_dilithium_free(self.native_object) @property - def _pub_key_size(self): + def _pub_key_size(self) -> int: size = _ffi.new("int *") ret = _lib.wc_MlDsaKey_GetPubLen(self.native_object, size) @@ -2196,7 +2259,7 @@ def _pub_key_size(self): return size[0] @property - def sig_size(self): + def sig_size(self) -> int: """ :return: signature size in bytes :rtype: int @@ -2209,10 +2272,10 @@ def sig_size(self): return size[0] - def _decode_pub_key(self, pub_key): + def _decode_pub_key(self, pub_key: BytesOrStr) -> None: pub_key_bytestype = t2b(pub_key) ret = _lib.wc_dilithium_import_public( - _ffi.from_buffer(pub_key_bytestype), + pub_key_bytestype, len(pub_key_bytestype), self.native_object, ) @@ -2220,7 +2283,7 @@ def _decode_pub_key(self, pub_key): if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_dilithium_import_public() error", ret) - def _encode_pub_key(self): + def _encode_pub_key(self) -> bytes: in_size = self._pub_key_size pub_key = _ffi.new(f"byte[{in_size}]") out_size = _ffi.new("word32 *") @@ -2235,7 +2298,7 @@ def _encode_pub_key(self): return _ffi.buffer(pub_key, out_size[0])[:] - def verify(self, signature, message, ctx=None): + def verify(self, signature: BytesOrStr, message: BytesOrStr, ctx: BytesOrStr | None = None) -> bool: """ :param signature: signature to be verified :type signature: bytes or str @@ -2253,11 +2316,11 @@ def verify(self, signature, message, ctx=None): if ctx is not None: ctx_bytestype = t2b(ctx) ret = _lib.wc_dilithium_verify_ctx_msg( - _ffi.from_buffer(sig_bytestype), + sig_bytestype, len(sig_bytestype), - _ffi.from_buffer(ctx_bytestype), + ctx_bytestype, len(ctx_bytestype), - _ffi.from_buffer(msg_bytestype), + msg_bytestype, len(msg_bytestype), res, self.native_object, @@ -2266,9 +2329,9 @@ def verify(self, signature, message, ctx=None): raise WolfCryptApiError("wc_dilithium_verify_ctx_msg() error", ret) else: ret = _lib.wc_dilithium_verify_msg( - _ffi.from_buffer(sig_bytestype), + sig_bytestype, len(sig_bytestype), - _ffi.from_buffer(msg_bytestype), + msg_bytestype, len(msg_bytestype), res, self.native_object, @@ -2281,7 +2344,7 @@ def verify(self, signature, message, ctx=None): class MlDsaPrivate(_MlDsaBase): @classmethod - def make_key(cls, mldsa_type, rng=None): + def make_key(cls, mldsa_type: MlDsaType, rng: Random | None = None) -> MlDsaPrivate: """ :param mldsa_type: ML-DSA type :type mldsa_type: MlDsaType @@ -2306,29 +2369,22 @@ def make_key(cls, mldsa_type, rng=None): return mldsa_priv @classmethod - def make_key_from_seed(cls, mldsa_type, seed): + def make_key_from_seed(cls, mldsa_type: MlDsaType, seed: bytes | list[int] | tuple[int]) -> MlDsaPrivate: """ Deterministically generate the key from a seed. :param mldsa_type: ML-DSA type :type mldsa_type: MlDsaType :param seed: the (32 byte) seed from which to deterministically create the key - :type seed: bytes + :type seed: bytes or list/tuple of int """ mldsa_priv = cls(mldsa_type) - try: - seed_view = memoryview(seed) - except TypeError as exception: - raise TypeError( - "seed must support the buffer protocol, such as `bytes` or `bytearray`" - ) from exception - if len(seed_view) != cls.ML_DSA_KEYGEN_SEED_LENGTH: + if len(seed) != cls.ML_DSA_KEYGEN_SEED_LENGTH: raise ValueError( f"Seed for generating ML-DSA key must be {cls.ML_DSA_KEYGEN_SEED_LENGTH} bytes" ) - ret = _lib.wc_dilithium_make_key_from_seed(mldsa_priv.native_object, - _ffi.from_buffer(seed_view)) + ret = _lib.wc_dilithium_make_key_from_seed(mldsa_priv.native_object, seed) if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_dilithium_make_key_from_seed() error", ret) @@ -2336,7 +2392,7 @@ def make_key_from_seed(cls, mldsa_type, seed): return mldsa_priv @property - def pub_key_size(self): + def pub_key_size(self) -> int: """ :return: public key size in bytes :rtype: int @@ -2344,7 +2400,7 @@ def pub_key_size(self): return self._pub_key_size @property - def priv_key_size(self): + def priv_key_size(self) -> int: """ :return: private key size in bytes :rtype: int @@ -2357,14 +2413,14 @@ def priv_key_size(self): return size[0] - self.pub_key_size - def encode_pub_key(self): + def encode_pub_key(self) -> bytes: """ :return: exported public key :rtype: bytes """ return self._encode_pub_key() - def encode_priv_key(self): + def encode_priv_key(self) -> bytes: """ :return: exported private key :rtype: bytes @@ -2385,7 +2441,7 @@ def encode_priv_key(self): return _ffi.buffer(priv_key, out_size[0])[:] - def decode_key(self, priv_key, pub_key=None): + def decode_key(self, priv_key: BytesOrStr, pub_key: BytesOrStr | None = None) -> None: """ :param priv_key: private key to be imported :type priv_key: bytes or str @@ -2394,7 +2450,7 @@ def decode_key(self, priv_key, pub_key=None): """ priv_key_bytestype = t2b(priv_key) ret = _lib.wc_dilithium_import_private( - _ffi.from_buffer(priv_key_bytestype), + priv_key_bytestype, len(priv_key_bytestype), self.native_object, ) @@ -2405,7 +2461,7 @@ def decode_key(self, priv_key, pub_key=None): if pub_key is not None: self._decode_pub_key(pub_key) - def sign(self, message, rng=None, ctx=None): + def sign(self, message: BytesOrStr, rng: Random | None = None, ctx: BytesOrStr | None = None) -> bytes: """ :param message: message to be signed :type message: bytes or str @@ -2429,9 +2485,9 @@ def sign(self, message, rng=None, ctx=None): if len(ctx_bytestype) > 255: raise ValueError(f"context length {len(ctx_bytestype)} too large: must be 255 bytes or less") ret = _lib.wc_dilithium_sign_ctx_msg( - _ffi.from_buffer(ctx_bytestype), + ctx_bytestype, len(ctx_bytestype), # length must be < 256 bytes - _ffi.from_buffer(msg_bytestype), + msg_bytestype, len(msg_bytestype), signature, out_size, @@ -2442,7 +2498,7 @@ def sign(self, message, rng=None, ctx=None): raise WolfCryptApiError("wc_dilithium_sign_ctx_msg() error", ret) else: ret = _lib.wc_dilithium_sign_msg( - _ffi.from_buffer(msg_bytestype), + msg_bytestype, len(msg_bytestype), signature, out_size, @@ -2457,12 +2513,12 @@ def sign(self, message, rng=None, ctx=None): return _ffi.buffer(signature, out_size[0])[:] - def sign_with_seed(self, message, seed, ctx=None): + def sign_with_seed(self, message: BytesOrStr, seed: bytes | list[int] | tuple[int], ctx: BytesOrStr | None = None) -> bytes: """ :param message: message to be signed :type message: bytes or str :param seed: 32-byte seed for deterministic signature generation. - :type seed: bytes + :type seed: bytes or list/tuple of int (value in the range 0-255) :param ctx: context (optional, maximum 255 bytes) :type ctx: None for no context, str or bytes otherwise :return: signature @@ -2474,16 +2530,12 @@ def sign_with_seed(self, message, seed, ctx=None): out_size = _ffi.new("word32 *") out_size[0] = in_size - try: - seed_view = memoryview(seed) - except TypeError as exception: - raise TypeError( - "seed must support the buffer protocol, such as `bytes` or `bytearray`" - ) from exception - if len(seed_view) != ML_DSA_SIGNATURE_SEED_LENGTH: + if not isinstance(seed, (list, tuple, bytes)): + raise TypeError("seed must be bytes or list/tuple") + + if len(seed) != ML_DSA_SIGNATURE_SEED_LENGTH: raise ValueError( - f"Seed for generating a signature must be {ML_DSA_SIGNATURE_SEED_LENGTH}" - "bytes." + f"Seed for generating a signature must be {ML_DSA_SIGNATURE_SEED_LENGTH} bytes." ) if ctx is not None: @@ -2493,25 +2545,25 @@ def sign_with_seed(self, message, seed, ctx=None): f"context length {len(ctx_bytestype)} too large: must be 255 or less" ) ret = _lib.wc_dilithium_sign_ctx_msg_with_seed( - _ffi.from_buffer(ctx_bytestype), + ctx_bytestype, len(ctx_bytestype), # length must be < 256 bytes - _ffi.from_buffer(msg_bytestype), + msg_bytestype, len(msg_bytestype), signature, out_size, self.native_object, - _ffi.from_buffer(seed_view), + seed, ) if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_dilithium_sign_ctx_msg_with_seed() error", ret) else: ret = _lib.wc_dilithium_sign_msg_with_seed( - _ffi.from_buffer(msg_bytestype), + msg_bytestype, len(msg_bytestype), signature, out_size, self.native_object, - _ffi.from_buffer(seed_view), + seed, ) if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_dilithium_sign_msg_with_seed() error", ret) @@ -2524,21 +2576,21 @@ def sign_with_seed(self, message, seed, ctx=None): class MlDsaPublic(_MlDsaBase): @property - def key_size(self): + def key_size(self) -> int: """ :return: public key size in bytes :rtype: int """ return self._pub_key_size - def decode_key(self, pub_key): + def decode_key(self, pub_key: BytesOrStr) -> None: """ :param pub_key: public key to be imported :type pub_key: bytes or str """ - return self._decode_pub_key(pub_key) + self._decode_pub_key(pub_key) - def encode_key(self): + def encode_key(self) -> bytes: """ :return: exported public key :rtype: bytes diff --git a/wolfcrypt/exceptions.py b/wolfcrypt/exceptions.py index 5b41bd7..c691a18 100644 --- a/wolfcrypt/exceptions.py +++ b/wolfcrypt/exceptions.py @@ -18,6 +18,9 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA +from __future__ import annotations + +from typing import cast from wolfcrypt._ffi import ffi as _ffi from wolfcrypt._ffi import lib as _lib @@ -56,6 +59,6 @@ def error_string(err_code: int) -> str: :return: error string """ if _lib.ERROR_STRINGS_ENABLED: - return _ffi.string(_lib.wc_GetErrorString(err_code)).decode() + return cast(bytes, _ffi.string(_lib.wc_GetErrorString(err_code))).decode() else: return "" diff --git a/wolfcrypt/hashes.py b/wolfcrypt/hashes.py index eac0f22..1029c08 100644 --- a/wolfcrypt/hashes.py +++ b/wolfcrypt/hashes.py @@ -20,19 +20,24 @@ # pylint: disable=no-member,no-name-in-module, no-self-use +from __future__ import annotations + +from abc import ABC, abstractmethod + +from _cffi_backend import FFI +from typing_extensions import override from wolfcrypt._ffi import ffi as _ffi from wolfcrypt._ffi import lib as _lib -from wolfcrypt.utils import t2b, b2h - from wolfcrypt.exceptions import WolfCryptApiError +from wolfcrypt.utils import t2b, b2h, BytesOrStr -class _Hash: +class _Hash(ABC): """ A **PEP 247: Cryptographic Hash Functions** compliant **Hash Function Interface**. """ - def __init__(self, string=None): + def __init__(self, string: BytesOrStr | None = None) -> None: self._native_object = _ffi.new(self._native_type) self._shallow_copy = False ret = self._init() @@ -42,17 +47,32 @@ def __init__(self, string=None): if string: self.update(string) + @abstractmethod + def _init(self) -> int: ... + + @abstractmethod + def _update(self, data: bytes) -> int: ... + + @abstractmethod + def _final(self, obj: FFI.CData, ret: FFI.CData) -> int: ... + + @property + @abstractmethod + def _native_size(self) -> int: ... + + @property + @abstractmethod + def _native_type(self) -> str: ... + + @property + @abstractmethod + def digest_size(self) -> int: ... + @classmethod - def new(cls, string=None): - """ - Creates a new hashing object and returns it. The optional - **string** parameter, if supplied, will be immediately - hashed into the object's starting state, as if - obj.update(string) was called. - """ - return cls(string) + @abstractmethod + def new(cls, string: BytesOrStr | None) -> _Hash: ... - def copy(self): + def copy(self) -> _Hash: """ Returns a separate copy of this hashing object. An update to this copy won't affect the original object. @@ -86,7 +106,7 @@ def copy(self): return copy - def update(self, string): + def update(self, string: BytesOrStr) -> None: """ Hashes **string** into the current state of the hashing object. update() can be called any number of times during @@ -98,7 +118,7 @@ def update(self, string): if ret < 0: # pragma: no cover raise WolfCryptApiError("Hash update error", ret) - def digest(self): + def digest(self) -> bytes: """ Returns the hash value of this hashing object as a string containing 8-bit data. The object is not altered in any @@ -137,7 +157,7 @@ def digest(self): return _ffi.buffer(result, self.digest_size)[:] - def hexdigest(self): + def hexdigest(self) -> bytes: """ Returns the hash value of this hashing object as a string containing hexadecimal digits. Lowercase letters are used @@ -147,8 +167,21 @@ def hexdigest(self): return b2h(self.digest()) +class _Sha(_Hash): + @override + @classmethod + def new(cls, string: BytesOrStr | None = None) -> _Hash: + """ + Creates a new hashing object and returns it. The optional + **string** parameter, if supplied, will be immediately + hashed into the object's starting state, as if + obj.update(string) was called. + """ + return cls(string) + + if _lib.SHA_ENABLED: - class Sha(_Hash): + class Sha(_Sha): """ **SHA-1** is a cryptographic hash function standardized by **NIST**. @@ -160,22 +193,25 @@ class Sha(_Hash): _delete = staticmethod(_lib.wc_ShaFree) _copy = staticmethod(_lib.wc_ShaCopy) - def __del__(self): + def __del__(self) -> None: if hasattr(self, '_native_object') and not getattr(self, '_shallow_copy', False): self._delete(self._native_object) - def _init(self): + @override + def _init(self) -> int: return _lib.wc_InitSha(self._native_object) - def _update(self, data): + @override + def _update(self, data: bytes) -> int: return _lib.wc_ShaUpdate(self._native_object, data, len(data)) - def _final(self, obj, ret): + @override + def _final(self, obj: FFI.CData, ret: FFI.CData) -> int: return _lib.wc_ShaFinal(obj, ret) if _lib.SHA256_ENABLED: - class Sha256(_Hash): + class Sha256(_Sha): """ **SHA-256** is a cryptographic hash function from the **SHA-2 family** and is standardized by **NIST**. @@ -188,22 +224,25 @@ class Sha256(_Hash): _delete = staticmethod(_lib.wc_Sha256Free) _copy = staticmethod(_lib.wc_Sha256Copy) - def __del__(self): + def __del__(self) -> None: if hasattr(self, '_native_object') and not getattr(self, '_shallow_copy', False): self._delete(self._native_object) - def _init(self): + @override + def _init(self) -> int: return _lib.wc_InitSha256(self._native_object) - def _update(self, data): + @override + def _update(self, data: bytes) -> int: return _lib.wc_Sha256Update(self._native_object, data, len(data)) - def _final(self, obj, ret): + @override + def _final(self, obj: FFI.CData, ret: FFI.CData) -> int: return _lib.wc_Sha256Final(obj, ret) if _lib.SHA384_ENABLED: - class Sha384(_Hash): + class Sha384(_Sha): """ **SHA-384** is a cryptographic hash function from the **SHA-2 family** and is standardized by **NIST**. @@ -216,22 +255,25 @@ class Sha384(_Hash): _delete = staticmethod(_lib.wc_Sha384Free) _copy = staticmethod(_lib.wc_Sha384Copy) - def __del__(self): + def __del__(self) -> None: if hasattr(self, '_native_object') and not getattr(self, '_shallow_copy', False): self._delete(self._native_object) - def _init(self): + @override + def _init(self) -> int: return _lib.wc_InitSha384(self._native_object) - def _update(self, data): + @override + def _update(self, data: bytes) -> int: return _lib.wc_Sha384Update(self._native_object, data, len(data)) - def _final(self, obj, ret): + @override + def _final(self, obj: FFI.CData, ret: FFI.CData) -> int: return _lib.wc_Sha384Final(obj, ret) if _lib.SHA512_ENABLED: - class Sha512(_Hash): + class Sha512(_Sha): """ **SHA-512** is a cryptographic hash function from the **SHA-2 family** and is standardized by **NIST**. @@ -244,21 +286,24 @@ class Sha512(_Hash): _delete = staticmethod(_lib.wc_Sha512Free) _copy = staticmethod(_lib.wc_Sha512Copy) - def __del__(self): + def __del__(self) -> None: if hasattr(self, '_native_object') and not getattr(self, '_shallow_copy', False): self._delete(self._native_object) - def _init(self): + @override + def _init(self) -> int: return _lib.wc_InitSha512(self._native_object) - def _update(self, data): + @override + def _update(self, data: bytes) -> int: return _lib.wc_Sha512Update(self._native_object, data, len(data)) - def _final(self, obj, ret): + @override + def _final(self, obj: FFI.CData, ret: FFI.CData) -> int: return _lib.wc_Sha512Final(obj, ret) if _lib.SHA3_ENABLED: - class Sha3(_Hash): + class Sha3(_Sha): """ **SHA3 ** is a cryptographic hash function family standardized by **NIST**. @@ -267,6 +312,7 @@ class Sha3(_Hash): Using SHA3-384 by default, unless a different digest size is passed through __init__. """ + digest_size = None _native_type = "wc_Sha3 *" _native_size = _ffi.sizeof("wc_Sha3") SHA3_224_DIGEST_SIZE = 28 @@ -288,16 +334,19 @@ class Sha3(_Hash): 64: _lib.wc_Sha3_512_Copy, } - def __del__(self): + def __del__(self) -> None: # Unlike the SHA-1/2 classes, Sha3's _delete is set per-instance # from a size->function dict and is None for invalid sizes, so # we need the extra truthiness check. - if (hasattr(self, '_native_object') - and not getattr(self, '_shallow_copy', False) - and getattr(self, '_delete', None)): + if ( + hasattr(self, '_native_object') + and not getattr(self, '_shallow_copy', False) + and getattr(self, '_delete', None) + and self._delete is not None + ): self._delete(self._native_object) - def __init__(self, string=None, size=SHA3_384_DIGEST_SIZE): # pylint: disable=W0231 + def __init__(self, string: BytesOrStr | None = None, size: int = SHA3_384_DIGEST_SIZE) -> None: # pylint: disable=W0231 self._native_object = _ffi.new(self._native_type) self._shallow_copy = False self.digest_size = size @@ -309,11 +358,13 @@ def __init__(self, string=None, size=SHA3_384_DIGEST_SIZE): # pylint: disable=W if string: self.update(string) + @override @classmethod - def new(cls, string=None, size=SHA3_384_DIGEST_SIZE): + def new(cls, string: BytesOrStr | None = None, size: int = SHA3_384_DIGEST_SIZE) -> Sha3: return cls(string, size) - def copy(self): + @override + def copy(self) -> Sha3: # Bypass __init__ to avoid calling _init() on a state that _copy # immediately overwrites (which would leak internal resources in # async/HW-accelerated builds). Mark as shallow up front so @@ -337,12 +388,8 @@ def copy(self): # Keep _shallow_copy = True: memmove shares state with self. return c - def _init(self): - if (self.digest_size != Sha3.SHA3_224_DIGEST_SIZE and - self.digest_size != Sha3.SHA3_256_DIGEST_SIZE and - self.digest_size != Sha3.SHA3_384_DIGEST_SIZE and - self.digest_size != Sha3.SHA3_512_DIGEST_SIZE): - return -1 + @override + def _init(self) -> int: if self.digest_size == Sha3.SHA3_224_DIGEST_SIZE: return _lib.wc_InitSha3_224(self._native_object, _ffi.NULL, 0) if self.digest_size == Sha3.SHA3_256_DIGEST_SIZE: @@ -351,7 +398,10 @@ def _init(self): return _lib.wc_InitSha3_384(self._native_object, _ffi.NULL, 0) if self.digest_size == Sha3.SHA3_512_DIGEST_SIZE: return _lib.wc_InitSha3_512(self._native_object, _ffi.NULL, 0) - def _update(self, data): + return -1 + + @override + def _update(self, data: bytes) -> int: if self.digest_size == Sha3.SHA3_224_DIGEST_SIZE: return _lib.wc_Sha3_224_Update(self._native_object, data, len(data)) if self.digest_size == Sha3.SHA3_256_DIGEST_SIZE: @@ -360,7 +410,10 @@ def _update(self, data): return _lib.wc_Sha3_384_Update(self._native_object, data, len(data)) if self.digest_size == Sha3.SHA3_512_DIGEST_SIZE: return _lib.wc_Sha3_512_Update(self._native_object, data, len(data)) - def _final(self, obj, ret): + return -1 + + @override + def _final(self, obj: FFI.CData, ret: FFI.CData) -> int: if self.digest_size == Sha3.SHA3_224_DIGEST_SIZE: return _lib.wc_Sha3_224_Final(obj, ret) if self.digest_size == Sha3.SHA3_256_DIGEST_SIZE: @@ -369,6 +422,7 @@ def _final(self, obj, ret): return _lib.wc_Sha3_384_Final(obj, ret) if self.digest_size == Sha3.SHA3_512_DIGEST_SIZE: return _lib.wc_Sha3_512_Final(obj, ret) + return -1 # Hmac types @@ -405,24 +459,29 @@ class _Hmac(_Hash): _native_size = _ffi.sizeof("Hmac") _delete = staticmethod(_lib.wc_HmacFree) - def __del__(self): + def __del__(self) -> None: if hasattr(self, '_native_object') and not getattr(self, '_shallow_copy', False): self._delete(self._native_object) - def __init__(self, key, string=None): # pylint: disable=W0231 + def __init__(self, key: BytesOrStr, string: BytesOrStr | None = None) -> None: # pylint: disable=W0231 key = t2b(key) self._native_object = _ffi.new(self._native_type) self._shallow_copy = False - ret = self._init(self._type, key) + ret = self._hmac_init(self._type, key) if ret < 0: # pragma: no cover raise WolfCryptApiError("Hmac init error", ret) if string: self.update(string) + @override + def _init(self) -> int: + return -1 + + @override @classmethod - def new(cls, key, string=None): # pylint: disable=W0221 + def new(cls, key: BytesOrStr, string: BytesOrStr | None = None) -> _Hash: # pylint: disable=W0221 # ty: ignore[invalid-method-override] """ Creates a new hashing object and returns it. **key** is a required parameter containing a string giving the key @@ -432,7 +491,12 @@ def new(cls, key, string=None): # pylint: disable=W0221 """ return cls(key, string) - def _init(self, hmac, key): + + @property + @abstractmethod + def _type(self) -> int: ... + + def _hmac_init(self, hmac: int, key: bytes) -> int: ret = _lib.wc_HmacInit(self._native_object, _ffi.NULL, -2) if ret < 0: raise WolfCryptApiError("wc_HmacInit error", ret) @@ -446,10 +510,12 @@ def _init(self, hmac, key): raise WolfCryptApiError("wc_HmacSetKey error", ret) return ret - def _update(self, data): + @override + def _update(self, data: bytes) -> int: return _lib.wc_HmacUpdate(self._native_object, data, len(data)) - def _final(self, obj, ret): + @override + def _final(self, obj: FFI.CData, ret: FFI.CData) -> int: return _lib.wc_HmacFinal(obj, ret) @@ -462,7 +528,7 @@ class HmacSha(_Hmac): It produces a [ **512-bit | 64 bytes** ] message digest. """ _type = _TYPE_SHA - digest_size = Sha.digest_size + digest_size = Sha.digest_size # ty: ignore[possibly-unresolved-reference] if _lib.SHA256_ENABLED: @@ -474,7 +540,7 @@ class HmacSha256(_Hmac): It produces a [ **512-bit | 64 bytes** ] message digest. """ _type = _TYPE_SHA256 - digest_size = Sha256.digest_size + digest_size = Sha256.digest_size # ty: ignore[possibly-unresolved-reference] if _lib.SHA384_ENABLED: @@ -486,7 +552,7 @@ class HmacSha384(_Hmac): It produces a [ **512-bit | 64 bytes** ] message digest. """ _type = _TYPE_SHA384 - digest_size = Sha384.digest_size + digest_size = Sha384.digest_size # ty: ignore[possibly-unresolved-reference] if _lib.SHA512_ENABLED: @@ -498,9 +564,9 @@ class HmacSha512(_Hmac): It produces a [ **512-bit | 64 bytes** ] message digest. """ _type = _TYPE_SHA512 - digest_size = Sha512.digest_size + digest_size = Sha512.digest_size # ty: ignore[possibly-unresolved-reference] -def hash_type_to_cls(hash_type): +def hash_type_to_cls(hash_type: int) -> type[_Hash] | None: if _lib.SHA_ENABLED and hash_type == _lib.WC_HASH_TYPE_SHA: hash_cls = Sha elif _lib.SHA256_ENABLED and hash_type == _lib.WC_HASH_TYPE_SHA256: diff --git a/wolfcrypt/hkdf.py b/wolfcrypt/hkdf.py index c06ab87..d11b619 100644 --- a/wolfcrypt/hkdf.py +++ b/wolfcrypt/hkdf.py @@ -28,8 +28,9 @@ if _lib.HKDF_ENABLED: + from wolfcrypt.hashes import _Hmac # ty: ignore[possibly-missing-import] - def HKDF(hash_cls, in_key, salt=None, info=None, out_len=None): + def HKDF(hash_cls: type[_Hmac], in_key: bytes | str, salt: bytes | str | None = None, info: bytes | str | None = None, out_len: int | None = None) -> bytes: """ Perform HKDF Extract-and-Expand in one call (wraps wc_HKDF). @@ -55,6 +56,7 @@ def HKDF(hash_cls, in_key, salt=None, info=None, out_len=None): if out_len is None: out_len = hash_cls.digest_size + assert out_len is not None out = _ffi.new(f"byte[{out_len}]") ret = _lib.wc_HKDF( @@ -73,7 +75,7 @@ def HKDF(hash_cls, in_key, salt=None, info=None, out_len=None): return _ffi.buffer(out, out_len)[:] - def HKDF_Extract(hash_cls, salt, in_key): + def HKDF_Extract(hash_cls: type[_Hmac], salt: bytes | str | None, in_key: bytes | str) -> bytes: """ HKDF-Extract: PRK = HMAC-Hash(salt, IKM) Wraps wc_HKDF_Extract. @@ -100,7 +102,7 @@ def HKDF_Extract(hash_cls, salt, in_key): return _ffi.buffer(out, out_len)[:] - def HKDF_Expand(hash_cls, prk, info, out_len): + def HKDF_Expand(hash_cls: type[_Hmac], prk: bytes | str, info: bytes | str | None, out_len: int) -> bytes: """ HKDF-Expand: OKM = HKDF-Expand(PRK, info, L) Wraps wc_HKDF_Expand. diff --git a/wolfcrypt/pwdbased.py b/wolfcrypt/pwdbased.py index 56c83b0..ecdbd41 100644 --- a/wolfcrypt/pwdbased.py +++ b/wolfcrypt/pwdbased.py @@ -20,13 +20,15 @@ # pylint: disable=no-member,no-name-in-module +from __future__ import annotations + from wolfcrypt._ffi import ffi as _ffi from wolfcrypt._ffi import lib as _lib from wolfcrypt.exceptions import WolfCryptApiError if _lib.PWDBASED_ENABLED: - def PBKDF2(password, salt, iterations, key_length, hash_type): + def PBKDF2(password: bytes | str, salt: bytes | str, iterations: int, key_length: int, hash_type: int) -> bytes: if isinstance(salt, str): salt = str.encode(salt) diff --git a/wolfcrypt/py.typed b/wolfcrypt/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/wolfcrypt/random.py b/wolfcrypt/random.py index 1c4da84..f97e66b 100644 --- a/wolfcrypt/random.py +++ b/wolfcrypt/random.py @@ -25,7 +25,7 @@ from wolfcrypt._ffi import ffi as _ffi from wolfcrypt._ffi import lib as _lib -from wolfcrypt.exceptions import WolfCryptApiError +from wolfcrypt.exceptions import WolfCryptApiError, WolfCryptError class Random: @@ -34,31 +34,37 @@ class Random: """ def __init__(self, nonce: __builtins__.bytes = b"", device_id: int = -2) -> None: - self.native_object: _lib.RNG | None = _ffi.new("WC_RNG *") + self._native_object: _lib.RNG | None = None + self._native_object = _ffi.new("WC_RNG *") - ret = _lib.wc_InitRngNonce_ex(self.native_object, nonce, len(nonce), _ffi.NULL, device_id) + ret = _lib.wc_InitRngNonce_ex(self._native_object, nonce, len(nonce), _ffi.NULL, device_id) if ret < 0: # pragma: no cover - self.native_object = None + self._native_object = None raise WolfCryptApiError("RNG init error", ret) # making sure _lib.wc_FreeRng outlives WC_RNG instances _delete = staticmethod(_lib.wc_FreeRng) def __del__(self) -> None: - if self.native_object: + if self._native_object is not None: try: - self._delete(self.native_object) + self._delete(self._native_object) except AttributeError: # Can occur during interpreter shutdown pass + @property + def native_object(self) -> _lib.RNG: + if self._native_object is None: + raise WolfCryptError("RNG not initialized") + return self._native_object + def byte(self) -> __builtins__.bytes: """ Generate and return a random byte. """ result = _ffi.new("byte[1]") - assert self.native_object is not None ret = _lib.wc_RNG_GenerateByte(self.native_object, result) if ret < 0: # pragma: no cover raise WolfCryptApiError("RNG generate byte error", ret) @@ -71,7 +77,6 @@ def bytes(self, length: int) -> __builtins__.bytes: """ result = _ffi.new(f"byte[{length}]") - assert self.native_object is not None ret = _lib.wc_RNG_GenerateBlock(self.native_object, result, length) if ret < 0: # pragma: no cover raise WolfCryptApiError("RNG generate block error", ret) diff --git a/wolfcrypt/types.py b/wolfcrypt/types.py new file mode 100644 index 0000000..116147c --- /dev/null +++ b/wolfcrypt/types.py @@ -0,0 +1,35 @@ +# types.py +# +# Copyright (C) 2026 wolfSSL Inc. +# +# This file is part of wolfSSL. (formerly known as CyaSSL) +# +# wolfSSL is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# wolfSSL is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA + +from abc import abstractmethod +from typing import Protocol + +from .utils import BytesOrStr + + +class SupportsRsaSign(Protocol): + @abstractmethod + def sign(self, plaintext: BytesOrStr) -> bytes: + raise NotImplementedError + +class SupportsRsaVerify(Protocol): + @abstractmethod + def verify(self, signature: BytesOrStr) -> bytes: + raise NotImplementedError diff --git a/wolfcrypt/utils.py b/wolfcrypt/utils.py index 5de6363..6793488 100644 --- a/wolfcrypt/utils.py +++ b/wolfcrypt/utils.py @@ -22,10 +22,13 @@ from __future__ import annotations +from typing import TypeAlias + from binascii import hexlify as b2h, unhexlify as h2b # noqa: F401 +BytesOrStr: TypeAlias = bytes | bytearray | memoryview | str -def t2b(string: bytes | bytearray | memoryview | str) -> bytes: +def t2b(string: BytesOrStr) -> bytes: """ Converts text to bytes.