diff --git a/AUTHORS.rst b/AUTHORS.rst index a3ae67c..6523cab 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -6,4 +6,5 @@ The following people have contributed to Python-ASN1. Collectively they own the * Geert Jansen * Sebastien Andrivet +* Filippo Santovito diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 36cef5d..8b761d3 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,14 @@ Changelog ========= +3.1.1 (2026-31-01) +------------------ + +* Support for using Encoder via context manager +* Support for encoding sequences via context manager +* Support for encoding sets via context manager +* Support for using Decoder via context manager + 3.1.0 (2025-05-16) ------------------ diff --git a/Dockerfile b/Dockerfile index 5e61a59..ab8baf1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM ubuntu:20.04 +FROM ubuntu:22.04 LABEL authors="Sebastien Andrivet" ARG DEBIAN_FRONTEND=noninteractive @@ -12,10 +12,10 @@ RUN apt-get update && \ python2.7 \ python3.4 \ python3.5 \ - python3.6 python3.6-distutils \ + # python3.6 python3.6-distutils \ python3.7 python3.7-distutils \ python3.8 python3.8-distutils \ - python3.9 \ + # python3.9 \ python3.10 \ python3.11 \ python3.12 \ diff --git a/docs/credits.rst b/docs/credits.rst index 2f8fcaf..bb9ae37 100644 --- a/docs/credits.rst +++ b/docs/credits.rst @@ -7,5 +7,5 @@ The following projects have provided inspiration for Python-ASN1: * `Samba`_ In particular **libads** for the stack based approach for building constructed types. -.. _pyasn1: https://pyasn1.sourceforge.net/ +.. _pyasn1: https://web.archive.org/web/20191231214902/http://snmplabs.com/pyasn1/index.html .. _Samba: https://github.com/samba-team/samba/tree/master/source3/libads diff --git a/docs/usage.rst b/docs/usage.rst index fb6c716..232ee90 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -93,6 +93,16 @@ If you want to encode data and retrieve its DER-encoded representation, use code encoder.write('1.2.3', asn1.Numbers.ObjectIdentifier) encoded_bytes = encoder.output() +or using the new context manager api: + +.. code-block:: python + + import asn1 + + with asn1.Encoder() as encoder: + encoder.write('1.2.3', asn1.Numbers.ObjectIdentifier) + encoded_bytes = encoder.output() + It is also possible to encode data directly to a file or any stream: .. code-block:: python @@ -104,6 +114,16 @@ It is also possible to encode data directly to a file or any stream: encoder.start(f) encoder.write('1.2.3', asn1.Numbers.ObjectIdentifier) +or using the new context manager api: + +.. code-block:: python + + import asn1 + + with open('output.der', 'wb') as f: + with asn1.Encoder(stream=f) as encoder: + encoder.write('1.2.3', asn1.Numbers.ObjectIdentifier) + You can encode complex data structures such as sequences and sets: .. code-block:: python @@ -139,6 +159,22 @@ If you want to precisely specify the ASN.1 type, you have to use the `Encoder.en encoder.leave() encoder.leave() +or using the new context manager api: + +.. code-block:: python + + import asn1 + + with open('output.der', 'wb') as f: + with asn1.Encoder(stream=f) as encoder: + with encoder.sequence(): + encoder.write('test1', asn1.Numbers.PrintableString) + encoder.write('test2', asn1.Numbers.PrintableString) + with encoder.sequence(): + encoder.write(1, asn1.Numbers.Integer) + encoder.write(0.125, asn1.Numbers.Real) + encoder.write(b'\x01\x02\x03', asn1.Numbers.OctetString) + This also allows to encode data progressively, without having to keep everything in memory. DER and CER @@ -196,6 +232,15 @@ If you want to decode ASN.1 from BER (DER, CER, ...) encoded bytes, use code suc decoder.start(encoded_bytes) tag, value = decoder.read() +or using the new context manager api: + +.. code-block:: python + + import asn1 + + with asn1.Decoder(stream=encoded_bytes) as decoder: + tag, value = decoder.read() + It is also possible to decode data directly from a file or any stream: .. code-block:: python diff --git a/examples/examples-cm-decoder.py b/examples/examples-cm-decoder.py new file mode 100644 index 0000000..3a25707 --- /dev/null +++ b/examples/examples-cm-decoder.py @@ -0,0 +1,133 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Python-ASN1. Python-ASN1 is free software that is +# made available under the MIT license. Consult the file "LICENSE" that +# is distributed together with this file for the exact licensing terms. +# +# Python-ASN1 is copyright (c) 2007-2016 by the Python-ASN1 authors. See +# the file "AUTHORS" for a complete overview. + +from __future__ import absolute_import, division, print_function, unicode_literals + +from builtins import open +import asn1 +import pprint + + +def example1(): + """Decoding from a file.""" + print("Example 1") + with open("example3.der", "rb") as f: + with asn1.Decoder(stream=f) as decoder: + tag, value = decoder.read() + print(tag) + print(value) + print() + + +def example2(): + """Decoding of a bit string with unused bits.""" + print("Example 2") + encoded = b"\x23\x0c\x03\x02\x00\x0b\x03\x02\x00\x0b\x03\x02\x04\x0f" + with asn1.Decoder(stream=encoded) as decoder: + tag, (val, unused) = decoder.read(asn1.ReadFlags.WithUnused) + print("Tag:", tag) + print("Value:", val) + print("Unused bits:", unused) + print() + + +def example3(): + """Decoding of sequences.""" + print("Example 3") + encoded = b"\x30\x80\x13\x05\x74\x65\x73\x74\x31\x13\x05\x74\x65\x73\x74\x32\x30\x80\x02\x01\x01\x09\x03\x80\xfd\x01\x04\x03\x01\x02\x03\x00\x00\x00\x00" + with asn1.Decoder(stream=encoded) as decoder: + tag, value = decoder.read() + print(tag) + pprint.pprint(value) + print() + + +def example4(): + """Decoding of a complex data.""" + print("Example 4") + encoded = ( + b"\x30\x82\x04\x0e\x30\x82\x03\x77\xa0\x03\x02\x01\x02\x02\x02\x15" + b"\x30\x30\x0d\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01\x01\x05\x05\x00" + b"\x30\x81\xbb\x31\x0b\x30\x09\x06\x03\x55\x04\x06\x13\x02\x2d\x2d" + b"\x31\x12\x30\x10\x06\x03\x55\x04\x08\x13\x09\x53\x6f\x6d\x65\x53" + b"\x74\x61\x74\x65\x31\x11\x30\x0f\x06\x03\x55\x04\x07\x13\x08\x53" + b"\x6f\x6d\x65\x43\x69\x74\x79\x31\x19\x30\x17\x06\x03\x55\x04\x0a" + b"\x13\x10\x53\x6f\x6d\x65\x4f\x72\x67\x61\x6e\x69\x7a\x61\x74\x69" + b"\x6f\x6e\x31\x1f\x30\x1d\x06\x03\x55\x04\x0b\x13\x16\x53\x6f\x6d" + b"\x65\x4f\x72\x67\x61\x6e\x69\x7a\x61\x74\x69\x6f\x6e\x61\x6c\x55" + b"\x6e\x69\x74\x31\x1e\x30\x1c\x06\x03\x55\x04\x03\x13\x15\x6c\x6f" + b"\x63\x61\x6c\x68\x6f\x73\x74\x2e\x6c\x6f\x63\x61\x6c\x64\x6f\x6d" + b"\x61\x69\x6e\x31\x29\x30\x27\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01" + b"\x09\x01\x16\x1a\x72\x6f\x6f\x74\x40\x6c\x6f\x63\x61\x6c\x68\x6f" + b"\x73\x74\x2e\x6c\x6f\x63\x61\x6c\x64\x6f\x6d\x61\x69\x6e\x30\x1e" + b"\x17\x0d\x30\x38\x30\x32\x30\x35\x30\x39\x32\x33\x33\x31\x5a\x17" + b"\x0d\x30\x39\x30\x32\x30\x34\x30\x39\x32\x33\x33\x31\x5a\x30\x81" + b"\xbb\x31\x0b\x30\x09\x06\x03\x55\x04\x06\x13\x02\x2d\x2d\x31\x12" + b"\x30\x10\x06\x03\x55\x04\x08\x13\x09\x53\x6f\x6d\x65\x53\x74\x61" + b"\x74\x65\x31\x11\x30\x0f\x06\x03\x55\x04\x07\x13\x08\x53\x6f\x6d" + b"\x65\x43\x69\x74\x79\x31\x19\x30\x17\x06\x03\x55\x04\x0a\x13\x10" + b"\x53\x6f\x6d\x65\x4f\x72\x67\x61\x6e\x69\x7a\x61\x74\x69\x6f\x6e" + b"\x31\x1f\x30\x1d\x06\x03\x55\x04\x0b\x13\x16\x53\x6f\x6d\x65\x4f" + b"\x72\x67\x61\x6e\x69\x7a\x61\x74\x69\x6f\x6e\x61\x6c\x55\x6e\x69" + b"\x74\x31\x1e\x30\x1c\x06\x03\x55\x04\x03\x13\x15\x6c\x6f\x63\x61" + b"\x6c\x68\x6f\x73\x74\x2e\x6c\x6f\x63\x61\x6c\x64\x6f\x6d\x61\x69" + b"\x6e\x31\x29\x30\x27\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01\x09\x01" + b"\x16\x1a\x72\x6f\x6f\x74\x40\x6c\x6f\x63\x61\x6c\x68\x6f\x73\x74" + b"\x2e\x6c\x6f\x63\x61\x6c\x64\x6f\x6d\x61\x69\x6e\x30\x81\x9f\x30" + b"\x0d\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01\x05\x00\x03\x81" + b"\x8d\x00\x30\x81\x89\x02\x81\x81\x00\xd5\x18\xcd\x40\x91\x90\x27" + b"\x5a\x77\x37\x22\xca\xba\x05\xdf\x13\x31\xe8\x74\x43\x4f\x7e\x08" + b"\xa3\xa5\x76\xcd\x7b\xdd\x37\xd0\x7f\x12\x9e\x81\x73\x87\x55\x66" + b"\x0d\xda\x68\xee\x38\xeb\x34\xe2\xf4\xeb\x95\xd5\xe0\xde\xef\x08" + b"\x57\xf9\x03\x14\x69\xa8\x6f\x7c\xa4\xfa\x64\x51\x39\x36\xd5\x09" + b"\x37\x61\x83\x13\x8c\x41\x25\xba\x60\x91\x20\x86\x5b\x60\xb5\xe2" + b"\x83\x65\x66\xad\x06\xb3\x45\x71\x83\x67\xd2\xe5\x5f\x40\x42\x4b" + b"\x37\xf8\x87\xd0\x09\x49\xb8\xad\x34\x76\xa3\x1b\xbf\xc1\x0f\xb7" + b"\xfb\x43\xbe\x62\x33\x02\x02\x10\x61\x02\x03\x01\x00\x01\xa3\x82" + b"\x01\x1d\x30\x82\x01\x19\x30\x1d\x06\x03\x55\x1d\x0e\x04\x16\x04" + b"\x14\x0a\x4b\xfa\x87\x54\x17\x7e\x30\xb4\x21\x71\x56\x51\x0f\xd2" + b"\x91\xc3\x30\x02\x36\x30\x81\xe9\x06\x03\x55\x1d\x23\x04\x81\xe1" + b"\x30\x81\xde\x80\x14\x0a\x4b\xfa\x87\x54\x17\x7e\x30\xb4\x21\x71" + b"\x56\x51\x0f\xd2\x91\xc3\x30\x02\x36\xa1\x81\xc1\xa4\x81\xbe\x30" + b"\x81\xbb\x31\x0b\x30\x09\x06\x03\x55\x04\x06\x13\x02\x2d\x2d\x31" + b"\x12\x30\x10\x06\x03\x55\x04\x08\x13\x09\x53\x6f\x6d\x65\x53\x74" + b"\x61\x74\x65\x31\x11\x30\x0f\x06\x03\x55\x04\x07\x13\x08\x53\x6f" + b"\x6d\x65\x43\x69\x74\x79\x31\x19\x30\x17\x06\x03\x55\x04\x0a\x13" + b"\x10\x53\x6f\x6d\x65\x4f\x72\x67\x61\x6e\x69\x7a\x61\x74\x69\x6f" + b"\x6e\x31\x1f\x30\x1d\x06\x03\x55\x04\x0b\x13\x16\x53\x6f\x6d\x65" + b"\x4f\x72\x67\x61\x6e\x69\x7a\x61\x74\x69\x6f\x6e\x61\x6c\x55\x6e" + b"\x69\x74\x31\x1e\x30\x1c\x06\x03\x55\x04\x03\x13\x15\x6c\x6f\x63" + b"\x61\x6c\x68\x6f\x73\x74\x2e\x6c\x6f\x63\x61\x6c\x64\x6f\x6d\x61" + b"\x69\x6e\x31\x29\x30\x27\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01\x09" + b"\x01\x16\x1a\x72\x6f\x6f\x74\x40\x6c\x6f\x63\x61\x6c\x68\x6f\x73" + b"\x74\x2e\x6c\x6f\x63\x61\x6c\x64\x6f\x6d\x61\x69\x6e\x82\x02\x15" + b"\x30\x30\x0c\x06\x03\x55\x1d\x13\x04\x05\x30\x03\x01\x01\xff\x30" + b"\x0d\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01\x01\x05\x05\x00\x03\x81" + b"\x81\x00\x4e\x12\x46\x58\xa3\x57\xc5\x9a\xab\xfa\x32\xf5\xde\x87" + b"\xfb\x77\xa8\x79\x38\x1d\x4f\xd3\x7c\x3a\x16\x60\x82\x7d\x92\xa1" + b"\x58\xd2\x53\x7b\x11\x90\xec\x6d\xb0\xb0\x58\xee\x33\xb4\x7b\x1d" + b"\xb8\x95\xd8\x98\xc3\x10\x81\x83\x08\x46\xe8\x9a\xb9\x6c\xbf\x8f" + b"\x9e\x73\xf7\x61\x89\xc4\x6a\x1b\xc1\x98\xc6\xab\xfc\x91\xb6\x59" + b"\xb8\xa5\x05\x91\x2a\xbb\xc4\x30\x16\x53\xbf\x1a\xfe\x2f\x01\x25" + b"\xae\xef\xc7\xb9\xfa\xa5\x53\xf8\xd9\xf5\x8f\xae\x91\xea\x57\x28" + b"\xfa\xdf\x34\x03\x29\xe8\x97\xee\x2e\x9e\x8a\x62\x45\xc7\xfc\x58" + b"\xb4\x5a" + ) + + with asn1.Decoder(stream=encoded) as decoder: + tag, value = decoder.read() + print(tag) + pprint.pprint(value) + print() + + +example1() +example2() +example3() +example4() diff --git a/examples/examples-cm-encoder.py b/examples/examples-cm-encoder.py new file mode 100644 index 0000000..691dd58 --- /dev/null +++ b/examples/examples-cm-encoder.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Python-ASN1. Python-ASN1 is free software that is +# made available under the MIT license. Consult the file "LICENSE" that +# is distributed together with this file for the exact licensing terms. +# +# Python-ASN1 is copyright (c) 2007-2016 by the Python-ASN1 authors. See +# the file "AUTHORS" for a complete overview. + +from __future__ import absolute_import, division, print_function, unicode_literals + +from builtins import open, str +import asn1 +import binascii +import pprint + + +def example1(): + """Encoding an object identifier.""" + print("Example 1") + with asn1.Encoder() as encoder: + encoder.write("1.2.3", asn1.Numbers.ObjectIdentifier) + print(str(binascii.hexlify(encoder.output(), " ", 1).upper(), encoding="ascii")) + print() + + +def example2(): + """Encoding an object identifier directly to a file.""" + print("Example 2") + with open("example2.der", "wb") as f: + with asn1.Encoder( + stream=f, encoding=asn1.Encoding.DER + ) as encoder: # CER is the default when using a stream + encoder.write("1.2.3", asn1.Numbers.ObjectIdentifier) + + +def example3(): + """Encoding of complex data.""" + print("Example 3") + with open("example3.der", "wb") as f: + with asn1.Encoder( + stream=f, encoding=asn1.Encoding.DER + ) as encoder: # CER is the default when using a stream + encoder.write(["test1", "test2", [1, 0.125, b"\x01\x02\x03"]]) + print() + + +def example4(): + """Encoding of sequences.""" + print("Example 4") + with asn1.Encoder() as encoder: + with encoder.sequence(): + encoder.write("test1", asn1.Numbers.PrintableString) + encoder.write("test2", asn1.Numbers.PrintableString) + with encoder.sequence(): + encoder.write(1, asn1.Numbers.Integer) + encoder.write(0.125, asn1.Numbers.Real) + encoder.write(b"\x01\x02\x03", asn1.Numbers.OctetString) + print(str(binascii.hexlify(encoder.output(), " ", 1).upper(), encoding="ascii")) + print() + + +def example5(): + """Using CER encoding with a stream (file).""" + print("Example 5") + with open("example6.cer", "wb") as f: + with asn1.Encoder(stream=f) as encoder: + encoder.write("1.2.3", asn1.Numbers.ObjectIdentifier) + + +def example6(): + """Using DER encoding with a stream (file).""" + print("Example 6") + with open("example7.der", "wb") as f: + with asn1.Encoder(stream=f, encoding=asn1.Encoding.DER) as encoder: + encoder.write("1.2.3", asn1.Numbers.ObjectIdentifier) + + +example1() +example2() +example3() +example4() +example5() +example6() diff --git a/setup.py b/setup.py index a2475f8..6079425 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ def read(*names, **kwargs): setup( name='asn1', - version='3.1.0', + version='3.1.1', license='BSD', description='Python-ASN1 is a simple ASN.1 encoder and decoder for Python 2.7+ and 3.5+.', long_description='%s\n%s' % ( diff --git a/src/asn1/core.py b/src/asn1/core.py index 980d9d4..207ba36 100644 --- a/src/asn1/core.py +++ b/src/asn1/core.py @@ -24,11 +24,16 @@ from builtins import int from builtins import range from builtins import str + +try: + from contextlib import _GeneratorContextManager # noqa F401 +except ImportError: + _GeneratorContextManager = None from contextlib import contextmanager from enum import IntEnum from functools import reduce -__version__ = "3.1.0" +__version__ = "3.1.1" from typing import Any # noqa: F401 from typing import Generator # noqa: F401 @@ -210,10 +215,10 @@ class Error(Exception): class Encoder(object): """ASN.1 encoder. Uses DER encoding.""" - def __init__(self): # type: () -> None + def __init__(self, stream=None, encoding=None): # type: (EncoderStream, Union[Encoding, None]) -> None """Constructor.""" - self._stream = None # type: EncoderStream # Output stream - self._encoding = None # type: Union[Encoding, None] # Encoding type (DER or CER) + self._stream = stream # type: EncoderStream # Output stream + self._encoding = encoding # type: Union[Encoding, None] # Encoding type (DER or CER) self._stack = None # type: List[List[bytes]] | None # Stack of encoded data def start(self, stream=None, encoding=None): @@ -303,39 +308,41 @@ def leave(self): # type: () -> None @contextmanager def construct(self, nr, cls=None): # type: (int, Union[int, None]) -> Generator[None, Any, None] """ - This method - context manager calls enter and leave methods, + This method is a context manager that calls the enter and leave methods for better code mapping. - Usage: - ``` - with encoder.construct(asn1.Numbers.Sequence): - encoder.write(1) + Usage:: + with encoder.construct(asn1.Numbers.Sequence): - encoder.write('foo') - encoder.write('bar') - encoder.write(2) - ``` - encoder.output() will result following structure: - SEQUENCE: - INTEGER: 1 + encoder.write(1) + with encoder.construct(asn1.Numbers.Sequence): + encoder.write('foo') + encoder.write('bar') + encoder.write(2) + + ``encoder.output()`` will result in the following structure:: + SEQUENCE: - STRING: foo - STRING: bar - INTEGER: 2 + INTEGER: 1 + SEQUENCE: + STRING: foo + STRING: bar + INTEGER: 2 Args: - nr (int): The desired ASN.1 type. Use ``Numbers`` enumeration. + nr (int): + The desired ASN.1 type. Use ``Numbers`` enumeration. - cls (int): This optional parameter specifies the class - of the constructed type. The default class to use is the - universal class. Use ``Classes`` enumeration. + cls (int, optional): + Specifies the class of the constructed type. + The default class is the universal class. + Use ``Classes`` enumeration. Returns: None Raises: - `Error` - + Error """ self.enter(nr, cls) yield @@ -718,13 +725,37 @@ def _emit_sequence(self, nr, cls, value): # type: (int, int, List) -> None self.write(item) self.leave() + def __enter__(self): + self.start(stream=self._stream, encoding=self._encoding) + return self + + def __exit__(self, exc_type, exc_value, traceback): + + # When using CER encoding, you do not need to call it at all. With CER, this method can only + # be called when using a BytesIO stream or no stream. + if self._encoding == Encoding.CER: + if isinstance(self._stream, io.BytesIO) or self._stream is None: + self.output() + else: + self.output() + return False + + def sequence(self, cls=None): # type: (Union[int, None]) -> _GeneratorContextManager[None, None, None] + return self.construct(Numbers.Sequence, cls) + + def set(self, cls=None): # type: (Union[int, None]) -> _GeneratorContextManager[None, None, None] + return self.construct(Numbers.Set, cls) + class Decoder(object): """ASN.1 decoder. Understands BER (and DER which is a subset).""" - def __init__(self): # type: () -> None + def __init__(self, stream=None): # type: (Union[DecoderStream, None]) -> None """Constructor.""" self._stream = None # type: Union[io.RawIOBase, io.BufferedIOBase, None] # Input stream + if stream is not None: + self._stream = self._prepare_stream(stream) + self._byte = bytes() # type: bytes # Cached byte (to be able to implement eof) self._position = 0 # type: int # Due to caching, tell does not give the right position self._tag = None # type: Union[Tag, None] # Cached Tag (to be able to implement peek) @@ -751,10 +782,7 @@ def start(self, stream): # type: (DecoderStream) -> None Raises: `Error` """ - if not isinstance(stream, bytes) and not isinstance(stream, io.RawIOBase) and not isinstance(stream, io.BufferedIOBase): - raise Error('Expecting bytes or a subclass of io.RawIOBase or BufferedIOBase. Get {} instead.'.format(type(stream))) - - self._stream = io.BytesIO(stream) if isinstance(stream, bytes) else stream # type: ignore + self._stream = self._prepare_stream(stream) self._tag = None self._byte = bytes() self._position = 0 @@ -893,6 +921,12 @@ def leave(self): # type: () -> None self._tag = None self._ends.pop() + def _prepare_stream(self, stream): # type: (DecoderStream) -> Union[io.RawIOBase, io.BufferedIOBase] + if not isinstance(stream, bytes) and not isinstance(stream, io.RawIOBase) and not isinstance(stream, io.BufferedIOBase): + raise Error('Expecting bytes or a subclass of io.RawIOBase or BufferedIOBase. Get {} instead.'.format(type(stream))) + stream = io.BytesIO(stream) if isinstance(stream, bytes) else stream + return stream + def _get_current_position(self): # type: () -> int return 0 if self._stream is None else self._position @@ -1327,3 +1361,13 @@ def _decode_constructed_definite(self, length): # type: (int) -> List[Any] raise Error('ASN1 decoding error: invalid length ({})'.format(length)) self._levels -= 1 return value + + def __enter__(self): + if self._stream is None: + raise Error('ASN1 decoding error: no stream to decode.') + + self.start(stream=self._stream) + return self + + def __exit__(self, exc_type, exc_value, traceback): + return False diff --git a/src/asn1/core.pyi b/src/asn1/core.pyi index 613c6d0..6a07d74 100644 --- a/src/asn1/core.pyi +++ b/src/asn1/core.pyi @@ -68,25 +68,31 @@ class Tag(NamedTuple): typ: int cls: int -EncoderStream: io.RawIOBase | io.BufferedWriter | None +EncoderStream = io.RawIOBase | io.BufferedWriter | None DecoderStream = io.RawIOBase | io.BufferedIOBase | bytes class Error(Exception): ... class Encoder: - def __init__(self) -> None: ... + def __init__(self, stream: EncoderStream = None, encoding: Encoding | None = None) -> None: ... def start(self, stream: EncoderStream = None, encoding: Encoding | None = None) -> None: ... def enter(self, nr: int, cls: int | None = None) -> None: ... def leave(self) -> None: ... def construct(self, nr: int, cls: int | None = None) -> Generator[None, Any, None]: ... def write(self, value: Any, nr: int | None = None, typ: int | None = None, cls: int | None = None) -> None: ... def output(self) -> bytes: ... + def __enter__(self) -> "Encoder": ... + def __exit__(self, exc_type, exc, tb) -> bool | None: ... + def sequence(self, cls: int | None =None): ... + def set(self, cls: int | None =None): ... class Decoder: - def __init__(self) -> None: ... + def __init__(self, stream: DecoderStream | None = None) -> None: ... def start(self, stream: DecoderStream) -> None: ... def peek(self) -> Tag | None: ... def read(self, flags: ReadFlags = ...) -> tuple[Tag | None, Any]: ... def eof(self) -> bool: ... def enter(self) -> None: ... def leave(self) -> None: ... + def __enter__(self) -> "Decoder": ... + def __exit__(self, exc_type, exc, tb) -> bool | None: ... diff --git a/tests/test_examples.py b/tests/test_examples.py index 1cf8821..75b36e3 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -25,6 +25,13 @@ def test_example1(): assert encoder.output() == b'\x06\x02\x2a\x03' +def test_example1_cm(): + """Encoding an object identifier.""" + with asn1.Encoder() as encoder: + encoder.write('1.2.3', asn1.Numbers.ObjectIdentifier) + assert encoder.output() == b'\x06\x02\x2a\x03' + + def test_example2(tmp_path): """Encoding an object identifier directly to a file.""" with open(str(tmp_path / 'example2.der'), 'wb') as f: @@ -38,6 +45,17 @@ def test_example2(tmp_path): assert data == b'\x06\x02\x2a\x03' +def test_example2_cm(tmp_path): + """Encoding an object identifier directly to a file.""" + with open(str(tmp_path / 'example2.der'), 'wb') as f: + with asn1.Encoder(stream=f, encoding=asn1.Encoding.DER) as encoder: + encoder.write('1.2.3', asn1.Numbers.ObjectIdentifier) + + with open(str(tmp_path / 'example2.der'), 'rb') as f: + data = f.read() + assert data == b'\x06\x02\x2a\x03' + + def test_example3(tmp_path): """Encoding of complex data.""" with open(str(tmp_path / 'example3.der'), 'wb') as f: @@ -55,6 +73,21 @@ def test_example3(tmp_path): assert data == b'\x30\x1d\x13\x05test1\x13\x05test20\x0D\x02\x01\x01\t\x03\x80\xfd\x01\x04\x03\x01\x02\x03' +def test_example3_cm(tmp_path): + """Encoding of complex data.""" + with open(str(tmp_path / 'example3.der'), 'wb') as f: + with asn1.Encoder(stream=f, encoding=asn1.Encoding.DER) as encoder: + encoder.write(['test1', 'test2', [ + 1, + 0.125, + b'\x01\x02\x03' + ]]) + + with open(str(tmp_path / 'example3.der'), 'rb') as f: + data = f.read() + assert data == b'\x30\x1d\x13\x05test1\x13\x05test20\x0D\x02\x01\x01\t\x03\x80\xfd\x01\x04\x03\x01\x02\x03' + + def test_example4(tmp_path): """Decoding from a file.""" with open(str(tmp_path / 'example4.der'), 'wb') as f: @@ -75,6 +108,23 @@ def test_example4(tmp_path): assert value == ['test1', 'test2', [1, 0.125, b'\x01\x02\x03']] +def test_example4_cm(tmp_path): + """Decoding from a file.""" + with open(str(tmp_path / 'example4.der'), 'wb') as f: + with asn1.Encoder(f, asn1.Encoding.DER) as encoder: + encoder.write(['test1', 'test2', [ + 1, + 0.125, + b'\x01\x02\x03' + ]]) + + with open(str(tmp_path / 'example4.der'), 'rb') as f: + with asn1.Decoder(stream=f) as decoder: + tag, value = decoder.read() + assert tag == (asn1.Numbers.Sequence, asn1.Types.Constructed, asn1.Classes.Universal) + assert value == ['test1', 'test2', [1, 0.125, b'\x01\x02\x03']] + + def test_example5(): """Decoding of a bit string with unused bits.""" encoded = b'\x23\x0C\x03\x02\x00\x0B\x03\x02\x00\x0B\x03\x02\x04\x0F' @@ -86,6 +136,16 @@ def test_example5(): assert unused == 4 +def test_example5_cm(): + """Decoding of a bit string with unused bits.""" + encoded = b'\x23\x0C\x03\x02\x00\x0B\x03\x02\x00\x0B\x03\x02\x04\x0F' + with asn1.Decoder(stream=encoded) as decoder: + tag, (val, unused) = decoder.read(asn1.ReadFlags.WithUnused) + assert tag == (asn1.Numbers.BitString, asn1.Types.Constructed, asn1.Classes.Universal) + assert val == b'\x00\xb0\xb0' + assert unused == 4 + + def test_example6(): """Encoding of sequences.""" encoder = asn1.Encoder() @@ -102,6 +162,19 @@ def test_example6(): assert encoder.output() == b'\x30\x1d\x13\x05test1\x13\x05test20\r\x02\x01\x01\t\x03\x80\xfd\x01\x04\x03\x01\x02\x03' +def test_example6_cm(): + """Encoding of sequences.""" + with asn1.Encoder() as encoder: + with encoder.sequence(): + encoder.write('test1', asn1.Numbers.PrintableString) + encoder.write('test2', asn1.Numbers.PrintableString) + with encoder.sequence(): + encoder.write(1, asn1.Numbers.Integer) + encoder.write(0.125, asn1.Numbers.Real) + encoder.write(b'\x01\x02\x03', asn1.Numbers.OctetString) + assert encoder.output() == b'\x30\x1d\x13\x05test1\x13\x05test20\r\x02\x01\x01\t\x03\x80\xfd\x01\x04\x03\x01\x02\x03' + + def test_example7(): """Decoding of sequences.""" encoded = b'\x30\x80\x13\x05\x74\x65\x73\x74\x31\x13\x05\x74\x65\x73\x74\x32\x30\x80\x02\x01\x01\x09\x03\x80\xFD\x01\x04\x03\x01\x02\x03\x00\x00\x00\x00' @@ -112,6 +185,15 @@ def test_example7(): assert value == ['test1', 'test2', [1, 0.125, b'\x01\x02\x03']] +def test_example7_cm(): + """Decoding of sequences.""" + encoded = b'\x30\x80\x13\x05\x74\x65\x73\x74\x31\x13\x05\x74\x65\x73\x74\x32\x30\x80\x02\x01\x01\x09\x03\x80\xFD\x01\x04\x03\x01\x02\x03\x00\x00\x00\x00' + with asn1.Decoder(stream=encoded) as decoder: + tag, value = decoder.read() + assert tag == (asn1.Numbers.Sequence, asn1.Types.Constructed, asn1.Classes.Universal) + assert value == ['test1', 'test2', [1, 0.125, b'\x01\x02\x03']] + + def test_example8(): """Decoding of a complex data.""" encoded = (b'\x30\x82\x04\x0e\x30\x82\x03\x77\xa0\x03\x02\x01\x02\x02\x02\x15' @@ -222,6 +304,116 @@ def test_example8(): ] +def test_example8_cm(): + """Decoding of a complex data.""" + encoded = (b'\x30\x82\x04\x0e\x30\x82\x03\x77\xa0\x03\x02\x01\x02\x02\x02\x15' + b'\x30\x30\x0d\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01\x01\x05\x05\x00' + b'\x30\x81\xbb\x31\x0b\x30\x09\x06\x03\x55\x04\x06\x13\x02\x2d\x2d' + b'\x31\x12\x30\x10\x06\x03\x55\x04\x08\x13\x09\x53\x6f\x6d\x65\x53' + b'\x74\x61\x74\x65\x31\x11\x30\x0f\x06\x03\x55\x04\x07\x13\x08\x53' + b'\x6f\x6d\x65\x43\x69\x74\x79\x31\x19\x30\x17\x06\x03\x55\x04\x0a' + b'\x13\x10\x53\x6f\x6d\x65\x4f\x72\x67\x61\x6e\x69\x7a\x61\x74\x69' + b'\x6f\x6e\x31\x1f\x30\x1d\x06\x03\x55\x04\x0b\x13\x16\x53\x6f\x6d' + b'\x65\x4f\x72\x67\x61\x6e\x69\x7a\x61\x74\x69\x6f\x6e\x61\x6c\x55' + b'\x6e\x69\x74\x31\x1e\x30\x1c\x06\x03\x55\x04\x03\x13\x15\x6c\x6f' + b'\x63\x61\x6c\x68\x6f\x73\x74\x2e\x6c\x6f\x63\x61\x6c\x64\x6f\x6d' + b'\x61\x69\x6e\x31\x29\x30\x27\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01' + b'\x09\x01\x16\x1a\x72\x6f\x6f\x74\x40\x6c\x6f\x63\x61\x6c\x68\x6f' + b'\x73\x74\x2e\x6c\x6f\x63\x61\x6c\x64\x6f\x6d\x61\x69\x6e\x30\x1e' + b'\x17\x0d\x30\x38\x30\x32\x30\x35\x30\x39\x32\x33\x33\x31\x5a\x17' + b'\x0d\x30\x39\x30\x32\x30\x34\x30\x39\x32\x33\x33\x31\x5a\x30\x81' + b'\xbb\x31\x0b\x30\x09\x06\x03\x55\x04\x06\x13\x02\x2d\x2d\x31\x12' + b'\x30\x10\x06\x03\x55\x04\x08\x13\x09\x53\x6f\x6d\x65\x53\x74\x61' + b'\x74\x65\x31\x11\x30\x0f\x06\x03\x55\x04\x07\x13\x08\x53\x6f\x6d' + b'\x65\x43\x69\x74\x79\x31\x19\x30\x17\x06\x03\x55\x04\x0a\x13\x10' + b'\x53\x6f\x6d\x65\x4f\x72\x67\x61\x6e\x69\x7a\x61\x74\x69\x6f\x6e' + b'\x31\x1f\x30\x1d\x06\x03\x55\x04\x0b\x13\x16\x53\x6f\x6d\x65\x4f' + b'\x72\x67\x61\x6e\x69\x7a\x61\x74\x69\x6f\x6e\x61\x6c\x55\x6e\x69' + b'\x74\x31\x1e\x30\x1c\x06\x03\x55\x04\x03\x13\x15\x6c\x6f\x63\x61' + b'\x6c\x68\x6f\x73\x74\x2e\x6c\x6f\x63\x61\x6c\x64\x6f\x6d\x61\x69' + b'\x6e\x31\x29\x30\x27\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01\x09\x01' + b'\x16\x1a\x72\x6f\x6f\x74\x40\x6c\x6f\x63\x61\x6c\x68\x6f\x73\x74' + b'\x2e\x6c\x6f\x63\x61\x6c\x64\x6f\x6d\x61\x69\x6e\x30\x81\x9f\x30' + b'\x0d\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01\x05\x00\x03\x81' + b'\x8d\x00\x30\x81\x89\x02\x81\x81\x00\xd5\x18\xcd\x40\x91\x90\x27' + b'\x5a\x77\x37\x22\xca\xba\x05\xdf\x13\x31\xe8\x74\x43\x4f\x7e\x08' + b'\xa3\xa5\x76\xcd\x7b\xdd\x37\xd0\x7f\x12\x9e\x81\x73\x87\x55\x66' + b'\x0d\xda\x68\xee\x38\xeb\x34\xe2\xf4\xeb\x95\xd5\xe0\xde\xef\x08' + b'\x57\xf9\x03\x14\x69\xa8\x6f\x7c\xa4\xfa\x64\x51\x39\x36\xd5\x09' + b'\x37\x61\x83\x13\x8c\x41\x25\xba\x60\x91\x20\x86\x5b\x60\xb5\xe2' + b'\x83\x65\x66\xad\x06\xb3\x45\x71\x83\x67\xd2\xe5\x5f\x40\x42\x4b' + b'\x37\xf8\x87\xd0\x09\x49\xb8\xad\x34\x76\xa3\x1b\xbf\xc1\x0f\xb7' + b'\xfb\x43\xbe\x62\x33\x02\x02\x10\x61\x02\x03\x01\x00\x01\xa3\x82' + b'\x01\x1d\x30\x82\x01\x19\x30\x1d\x06\x03\x55\x1d\x0e\x04\x16\x04' + b'\x14\x0a\x4b\xfa\x87\x54\x17\x7e\x30\xb4\x21\x71\x56\x51\x0f\xd2' + b'\x91\xc3\x30\x02\x36\x30\x81\xe9\x06\x03\x55\x1d\x23\x04\x81\xe1' + b'\x30\x81\xde\x80\x14\x0a\x4b\xfa\x87\x54\x17\x7e\x30\xb4\x21\x71' + b'\x56\x51\x0f\xd2\x91\xc3\x30\x02\x36\xa1\x81\xc1\xa4\x81\xbe\x30' + b'\x81\xbb\x31\x0b\x30\x09\x06\x03\x55\x04\x06\x13\x02\x2d\x2d\x31' + b'\x12\x30\x10\x06\x03\x55\x04\x08\x13\x09\x53\x6f\x6d\x65\x53\x74' + b'\x61\x74\x65\x31\x11\x30\x0f\x06\x03\x55\x04\x07\x13\x08\x53\x6f' + b'\x6d\x65\x43\x69\x74\x79\x31\x19\x30\x17\x06\x03\x55\x04\x0a\x13' + b'\x10\x53\x6f\x6d\x65\x4f\x72\x67\x61\x6e\x69\x7a\x61\x74\x69\x6f' + b'\x6e\x31\x1f\x30\x1d\x06\x03\x55\x04\x0b\x13\x16\x53\x6f\x6d\x65' + b'\x4f\x72\x67\x61\x6e\x69\x7a\x61\x74\x69\x6f\x6e\x61\x6c\x55\x6e' + b'\x69\x74\x31\x1e\x30\x1c\x06\x03\x55\x04\x03\x13\x15\x6c\x6f\x63' + b'\x61\x6c\x68\x6f\x73\x74\x2e\x6c\x6f\x63\x61\x6c\x64\x6f\x6d\x61' + b'\x69\x6e\x31\x29\x30\x27\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01\x09' + b'\x01\x16\x1a\x72\x6f\x6f\x74\x40\x6c\x6f\x63\x61\x6c\x68\x6f\x73' + b'\x74\x2e\x6c\x6f\x63\x61\x6c\x64\x6f\x6d\x61\x69\x6e\x82\x02\x15' + b'\x30\x30\x0c\x06\x03\x55\x1d\x13\x04\x05\x30\x03\x01\x01\xff\x30' + b'\x0d\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01\x01\x05\x05\x00\x03\x81' + b'\x81\x00\x4e\x12\x46\x58\xa3\x57\xc5\x9a\xab\xfa\x32\xf5\xde\x87' + b'\xfb\x77\xa8\x79\x38\x1d\x4f\xd3\x7c\x3a\x16\x60\x82\x7d\x92\xa1' + b'\x58\xd2\x53\x7b\x11\x90\xec\x6d\xb0\xb0\x58\xee\x33\xb4\x7b\x1d' + b'\xb8\x95\xd8\x98\xc3\x10\x81\x83\x08\x46\xe8\x9a\xb9\x6c\xbf\x8f' + b'\x9e\x73\xf7\x61\x89\xc4\x6a\x1b\xc1\x98\xc6\xab\xfc\x91\xb6\x59' + b'\xb8\xa5\x05\x91\x2a\xbb\xc4\x30\x16\x53\xbf\x1a\xfe\x2f\x01\x25' + b'\xae\xef\xc7\xb9\xfa\xa5\x53\xf8\xd9\xf5\x8f\xae\x91\xea\x57\x28' + b'\xfa\xdf\x34\x03\x29\xe8\x97\xee\x2e\x9e\x8a\x62\x45\xc7\xfc\x58' + b'\xb4\x5a') + + with asn1.Decoder(stream=encoded) as decoder: + tag, value = decoder.read() + + assert tag == (asn1.Numbers.Sequence, asn1.Types.Constructed, asn1.Classes.Universal) + assert value == [ + [ + [2], + 5424, + ['1.2.840.113549.1.1.5', None], + [ + [['2.5.4.6', '--']], + [['2.5.4.8', 'SomeState']], + [['2.5.4.7', 'SomeCity']], + [['2.5.4.10', 'SomeOrganization']], + [['2.5.4.11', 'SomeOrganizationalUnit']], + [['2.5.4.3', 'localhost.localdomain']], + [['1.2.840.113549.1.9.1', 'root@localhost.localdomain']] + ], + ['080205092331Z', '090204092331Z'], + [ + [['2.5.4.6', '--']], + [['2.5.4.8', 'SomeState']], + [['2.5.4.7', 'SomeCity']], + [['2.5.4.10', 'SomeOrganization']], + [['2.5.4.11', 'SomeOrganizationalUnit']], + [['2.5.4.3', 'localhost.localdomain']], + [['1.2.840.113549.1.9.1', 'root@localhost.localdomain']]], + [['1.2.840.113549.1.1.1', None], b'0\x81\x89\x02\x81\x81\x00\xd5\x18\xcd@\x91\x90\'Zw7"\xca\xba\x05\xdf\x131\xe8tCO~\x08\xa3\xa5v\xcd{\xdd7\xd0\x7f\x12\x9e\x81s\x87Uf\r\xdah\xee8\xeb4\xe2\xf4\xeb\x95\xd5\xe0\xde\xef\x08W\xf9\x03\x14i\xa8o|\xa4\xfadQ96\xd5\t7a\x83\x13\x8cA%\xba`\x91 \x86[`\xb5\xe2\x83ef\xad\x06\xb3Eq\x83g\xd2\xe5_@BK7\xf8\x87\xd0\tI\xb8\xad4v\xa3\x1b\xbf\xc1\x0f\xb7\xfbC\xbeb3\x02\x02\x10a\x02\x03\x01\x00\x01'], + [ + [ + ['2.5.29.14', b'\x04\x14\nK\xfa\x87T\x17~0\xb4!qVQ\x0f\xd2\x91\xc30\x026'], + ['2.5.29.35', b"0\x81\xde\x80\x14\nK\xfa\x87T\x17~0\xb4!qVQ\x0f\xd2\x91\xc30\x026\xa1\x81\xc1\xa4\x81\xbe0\x81\xbb1\x0b0\t\x06\x03U\x04\x06\x13\x02--1\x120\x10\x06\x03U\x04\x08\x13\tSomeState1\x110\x0f\x06\x03U\x04\x07\x13\x08SomeCity1\x190\x17\x06\x03U\x04\n\x13\x10SomeOrganization1\x1f0\x1d\x06\x03U\x04\x0b\x13\x16SomeOrganizationalUnit1\x1e0\x1c\x06\x03U\x04\x03\x13\x15localhost.localdomain1)0'\x06\t*\x86H\x86\xf7\r\x01\t\x01\x16\x1aroot@localhost.localdomain\x82\x02\x150"], + ['2.5.29.19', b'0\x03\x01\x01\xff'] + ] + ] + ], + ['1.2.840.113549.1.1.5', None], + b'N\x12FX\xa3W\xc5\x9a\xab\xfa2\xf5\xde\x87\xfbw\xa8y8\x1dO\xd3|:\x16`\x82}\x92\xa1X\xd2S{\x11\x90\xecm\xb0\xb0X\xee3\xb4{\x1d\xb8\x95\xd8\x98\xc3\x10\x81\x83\x08F\xe8\x9a\xb9l\xbf\x8f\x9es\xf7a\x89\xc4j\x1b\xc1\x98\xc6\xab\xfc\x91\xb6Y\xb8\xa5\x05\x91*\xbb\xc40\x16S\xbf\x1a\xfe/\x01%\xae\xef\xc7\xb9\xfa\xa5S\xf8\xd9\xf5\x8f\xae\x91\xeaW(\xfa\xdf4\x03)\xe8\x97\xee.\x9e\x8abE\xc7\xfcX\xb4Z' + ] + + def test_example9(tmp_path): """Using CER encoding with a stream (file).""" with open(str(tmp_path / 'exmple9.cer'), 'wb') as f: @@ -229,9 +421,23 @@ def test_example9(tmp_path): encoder.start(f) +def test_example9_cm(tmp_path): + """Using CER encoding with a stream (file).""" + with open(str(tmp_path / 'exmple9.cer'), 'wb') as f: + with asn1.Encoder(stream=f) as _: + pass + + def test_example10(tmp_path): """Using DER encoding with a stream (file).""" with open(str(tmp_path / 'exmple10.der'), 'wb') as f: encoder = asn1.Encoder() encoder.start(f, asn1.Encoding.DER) encoder.output() + + +def test_example10_cm(tmp_path): + """Using DER encoding with a stream (file).""" + with open(str(tmp_path / 'exmple10.der'), 'wb') as f: + with asn1.Encoder(stream=f, encoding=asn1.Encoding.DER) as _: + pass