diff options
Diffstat (limited to 'roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python')
12 files changed, 1530 insertions, 0 deletions
diff --git a/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/Makefile b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/Makefile new file mode 100644 index 000000000..d5d88f87f --- /dev/null +++ b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/Makefile @@ -0,0 +1,57 @@ +# Copyright 2016 The Brotli Authors. All rights reserved. +# +# Distributed under MIT license. +# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT + + +# Default (Build) +.PHONY: all +# Build +.PHONY: build +# Develop +.PHONY: develop +# Install +.PHONY: install +# Test +.PHONY: test +# Clean +.PHONY: clean +# Format +.PHONY: fix + + +PYTHON ?= python +YAPF ?= yapf + +EXT_SUFFIX=$(shell $(PYTHON) -c 'import sysconfig; print(sysconfig.get_config_var("EXT_SUFFIX"))') +EXT_SOURCES=$(shell find . -name '*.cc') +EXTENSIONS=$(EXT_SOURCES:%.cc=%$(EXT_SUFFIX)) + + +all: build + +build: $(EXTENSIONS) + +$(EXTENSIONS): $(EXT_SOURCES) + @cd .. && $(PYTHON) setup.py build_ext --inplace + +develop: + @cd .. && $(PYTHON) setup.py develop + +install: + @cd .. && $(PYTHON) setup.py install + +test: build + @echo 'running tests' + @$(PYTHON) -m unittest discover -v -p '*_test.py' + +clean: + @cd .. && $(PYTHON) setup.py clean + @find .. -name '*.pyc' | xargs rm -v + @find .. -name '*.so' | xargs rm -v + @find .. -type d -name '__pycache__' | xargs rm -v -r + @find .. -type d -name '*.egg-info' | xargs rm -v -r + +fix: + @echo 'formatting code' + -@$(YAPF) --in-place --recursive --verify . diff --git a/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/README.md b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/README.md new file mode 100644 index 000000000..6a9068aa4 --- /dev/null +++ b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/README.md @@ -0,0 +1,54 @@ +This directory contains the code for the Python `brotli` module, +`bro.py` tool, and roundtrip tests. + +Only Python 2.7+ is supported. + +We provide a `Makefile` to simplify common development commands. + +### Installation + +If you just want to install the latest release of the Python `brotli` +module, we recommend installing from [PyPI][]: + + $ pip install brotli + +Alternatively, you may install directly from source by running the +following command from this directory: + + $ make install + +### Development + +You may run the following commands from this directory: + + $ make # Build the module in-place + + $ make test # Test the module + + $ make clean # Remove all temporary files and build output + +If you wish to make the module available while still being +able to edit the source files, you can use the `setuptools` +"[development mode][]": + + $ make develop # Install the module in "development mode" + +### Code Style + +Brotli's code follows the [Google Python Style Guide][]. To +automatically format your code, first install [YAPF][]: + + $ pip install yapf + +Then, to format all files in the project, you can run: + + $ make fix # Automatically format code + +See the [YAPF usage][] documentation for more information. + + +[PyPI]: https://pypi.org/project/Brotli/ +[development mode]: https://setuptools.readthedocs.io/en/latest/setuptools.html#development-mode +[Google Python Style Guide]: https://google.github.io/styleguide/pyguide.html +[YAPF]: https://github.com/google/yapf +[YAPF usage]: https://github.com/google/yapf#usage diff --git a/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/_brotli.cc b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/_brotli.cc new file mode 100644 index 000000000..d4075bd45 --- /dev/null +++ b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/_brotli.cc @@ -0,0 +1,753 @@ +#define PY_SSIZE_T_CLEAN 1 +#include <Python.h> +#include <bytesobject.h> +#include <structmember.h> +#include <vector> +#include "../common/version.h" +#include <brotli/decode.h> +#include <brotli/encode.h> + +#if PY_MAJOR_VERSION >= 3 +#define PyInt_Check PyLong_Check +#define PyInt_AsLong PyLong_AsLong +#endif + +static PyObject *BrotliError; + +static int as_bounded_int(PyObject *o, int* result, int lower_bound, int upper_bound) { + long value = PyInt_AsLong(o); + if ((value < (long) lower_bound) || (value > (long) upper_bound)) { + return 0; + } + *result = (int) value; + return 1; +} + +static int mode_convertor(PyObject *o, BrotliEncoderMode *mode) { + if (!PyInt_Check(o)) { + PyErr_SetString(BrotliError, "Invalid mode"); + return 0; + } + + int mode_value = -1; + if (!as_bounded_int(o, &mode_value, 0, 255)) { + PyErr_SetString(BrotliError, "Invalid mode"); + return 0; + } + *mode = (BrotliEncoderMode) mode_value; + if (*mode != BROTLI_MODE_GENERIC && + *mode != BROTLI_MODE_TEXT && + *mode != BROTLI_MODE_FONT) { + PyErr_SetString(BrotliError, "Invalid mode"); + return 0; + } + + return 1; +} + +static int quality_convertor(PyObject *o, int *quality) { + if (!PyInt_Check(o)) { + PyErr_SetString(BrotliError, "Invalid quality"); + return 0; + } + + if (!as_bounded_int(o, quality, 0, 11)) { + PyErr_SetString(BrotliError, "Invalid quality. Range is 0 to 11."); + return 0; + } + + return 1; +} + +static int lgwin_convertor(PyObject *o, int *lgwin) { + if (!PyInt_Check(o)) { + PyErr_SetString(BrotliError, "Invalid lgwin"); + return 0; + } + + if (!as_bounded_int(o, lgwin, 10, 24)) { + PyErr_SetString(BrotliError, "Invalid lgwin. Range is 10 to 24."); + return 0; + } + + return 1; +} + +static int lgblock_convertor(PyObject *o, int *lgblock) { + if (!PyInt_Check(o)) { + PyErr_SetString(BrotliError, "Invalid lgblock"); + return 0; + } + + if (!as_bounded_int(o, lgblock, 0, 24) || (*lgblock != 0 && *lgblock < 16)) { + PyErr_SetString(BrotliError, "Invalid lgblock. Can be 0 or in range 16 to 24."); + return 0; + } + + return 1; +} + +static BROTLI_BOOL compress_stream(BrotliEncoderState* enc, BrotliEncoderOperation op, + std::vector<uint8_t>* output, + uint8_t* input, size_t input_length) { + BROTLI_BOOL ok = BROTLI_TRUE; + Py_BEGIN_ALLOW_THREADS + + size_t available_in = input_length; + const uint8_t* next_in = input; + size_t available_out = 0; + uint8_t* next_out = NULL; + + while (ok) { + ok = BrotliEncoderCompressStream(enc, op, + &available_in, &next_in, + &available_out, &next_out, NULL); + if (!ok) + break; + + size_t buffer_length = 0; // Request all available output. + const uint8_t* buffer = BrotliEncoderTakeOutput(enc, &buffer_length); + if (buffer_length) { + (*output).insert((*output).end(), buffer, buffer + buffer_length); + } + + if (available_in || BrotliEncoderHasMoreOutput(enc)) { + continue; + } + + break; + } + + Py_END_ALLOW_THREADS + return ok; +} + +PyDoc_STRVAR(brotli_Compressor_doc, +"An object to compress a byte string.\n" +"\n" +"Signature:\n" +" Compressor(mode=MODE_GENERIC, quality=11, lgwin=22, lgblock=0)\n" +"\n" +"Args:\n" +" mode (int, optional): The compression mode can be MODE_GENERIC (default),\n" +" MODE_TEXT (for UTF-8 format text input) or MODE_FONT (for WOFF 2.0). \n" +" quality (int, optional): Controls the compression-speed vs compression-\n" +" density tradeoff. The higher the quality, the slower the compression.\n" +" Range is 0 to 11. Defaults to 11.\n" +" lgwin (int, optional): Base 2 logarithm of the sliding window size. Range\n" +" is 10 to 24. Defaults to 22.\n" +" lgblock (int, optional): Base 2 logarithm of the maximum input block size.\n" +" Range is 16 to 24. If set to 0, the value will be set based on the\n" +" quality. Defaults to 0.\n" +"\n" +"Raises:\n" +" brotli.error: If arguments are invalid.\n"); + +typedef struct { + PyObject_HEAD + BrotliEncoderState* enc; +} brotli_Compressor; + +static void brotli_Compressor_dealloc(brotli_Compressor* self) { + BrotliEncoderDestroyInstance(self->enc); + #if PY_MAJOR_VERSION >= 3 + Py_TYPE(self)->tp_free((PyObject*)self); + #else + self->ob_type->tp_free((PyObject*)self); + #endif +} + +static PyObject* brotli_Compressor_new(PyTypeObject *type, PyObject *args, PyObject *keywds) { + brotli_Compressor *self; + self = (brotli_Compressor *)type->tp_alloc(type, 0); + + if (self != NULL) { + self->enc = BrotliEncoderCreateInstance(0, 0, 0); + } + + return (PyObject *)self; +} + +static int brotli_Compressor_init(brotli_Compressor *self, PyObject *args, PyObject *keywds) { + BrotliEncoderMode mode = (BrotliEncoderMode) -1; + int quality = -1; + int lgwin = -1; + int lgblock = -1; + int ok; + + static const char *kwlist[] = {"mode", "quality", "lgwin", "lgblock", NULL}; + + ok = PyArg_ParseTupleAndKeywords(args, keywds, "|O&O&O&O&:Compressor", + const_cast<char **>(kwlist), + &mode_convertor, &mode, + &quality_convertor, &quality, + &lgwin_convertor, &lgwin, + &lgblock_convertor, &lgblock); + if (!ok) + return -1; + if (!self->enc) + return -1; + + if ((int) mode != -1) + BrotliEncoderSetParameter(self->enc, BROTLI_PARAM_MODE, (uint32_t)mode); + if (quality != -1) + BrotliEncoderSetParameter(self->enc, BROTLI_PARAM_QUALITY, (uint32_t)quality); + if (lgwin != -1) + BrotliEncoderSetParameter(self->enc, BROTLI_PARAM_LGWIN, (uint32_t)lgwin); + if (lgblock != -1) + BrotliEncoderSetParameter(self->enc, BROTLI_PARAM_LGBLOCK, (uint32_t)lgblock); + + return 0; +} + +PyDoc_STRVAR(brotli_Compressor_process_doc, +"Process \"string\" for compression, returning a string that contains \n" +"compressed output data. This data should be concatenated to the output \n" +"produced by any preceding calls to the \"process()\" or flush()\" methods. \n" +"Some or all of the input may be kept in internal buffers for later \n" +"processing, and the compressed output data may be empty until enough input \n" +"has been accumulated.\n" +"\n" +"Signature:\n" +" compress(string)\n" +"\n" +"Args:\n" +" string (bytes): The input data\n" +"\n" +"Returns:\n" +" The compressed output data (bytes)\n" +"\n" +"Raises:\n" +" brotli.error: If compression fails\n"); + +static PyObject* brotli_Compressor_process(brotli_Compressor *self, PyObject *args) { + PyObject* ret = NULL; + std::vector<uint8_t> output; + Py_buffer input; + BROTLI_BOOL ok = BROTLI_TRUE; + +#if PY_MAJOR_VERSION >= 3 + ok = (BROTLI_BOOL)PyArg_ParseTuple(args, "y*:process", &input); +#else + ok = (BROTLI_BOOL)PyArg_ParseTuple(args, "s*:process", &input); +#endif + + if (!ok) + return NULL; + + if (!self->enc) { + ok = BROTLI_FALSE; + goto end; + } + + ok = compress_stream(self->enc, BROTLI_OPERATION_PROCESS, + &output, static_cast<uint8_t*>(input.buf), input.len); + +end: + PyBuffer_Release(&input); + if (ok) { + ret = PyBytes_FromStringAndSize((char*)(output.size() ? &output[0] : NULL), output.size()); + } else { + PyErr_SetString(BrotliError, "BrotliEncoderCompressStream failed while processing the stream"); + } + + return ret; +} + +PyDoc_STRVAR(brotli_Compressor_flush_doc, +"Process all pending input, returning a string containing the remaining\n" +"compressed data. This data should be concatenated to the output produced by\n" +"any preceding calls to the \"process()\" or \"flush()\" methods.\n" +"\n" +"Signature:\n" +" flush()\n" +"\n" +"Returns:\n" +" The compressed output data (bytes)\n" +"\n" +"Raises:\n" +" brotli.error: If compression fails\n"); + +static PyObject* brotli_Compressor_flush(brotli_Compressor *self) { + PyObject *ret = NULL; + std::vector<uint8_t> output; + BROTLI_BOOL ok = BROTLI_TRUE; + + if (!self->enc) { + ok = BROTLI_FALSE; + goto end; + } + + ok = compress_stream(self->enc, BROTLI_OPERATION_FLUSH, + &output, NULL, 0); + +end: + if (ok) { + ret = PyBytes_FromStringAndSize((char*)(output.size() ? &output[0] : NULL), output.size()); + } else { + PyErr_SetString(BrotliError, "BrotliEncoderCompressStream failed while flushing the stream"); + } + + return ret; +} + +PyDoc_STRVAR(brotli_Compressor_finish_doc, +"Process all pending input and complete all compression, returning a string\n" +"containing the remaining compressed data. This data should be concatenated\n" +"to the output produced by any preceding calls to the \"process()\" or\n" +"\"flush()\" methods.\n" +"After calling \"finish()\", the \"process()\" and \"flush()\" methods\n" +"cannot be called again, and a new \"Compressor\" object should be created.\n" +"\n" +"Signature:\n" +" finish(string)\n" +"\n" +"Returns:\n" +" The compressed output data (bytes)\n" +"\n" +"Raises:\n" +" brotli.error: If compression fails\n"); + +static PyObject* brotli_Compressor_finish(brotli_Compressor *self) { + PyObject *ret = NULL; + std::vector<uint8_t> output; + BROTLI_BOOL ok = BROTLI_TRUE; + + if (!self->enc) { + ok = BROTLI_FALSE; + goto end; + } + + ok = compress_stream(self->enc, BROTLI_OPERATION_FINISH, + &output, NULL, 0); + + if (ok) { + ok = BrotliEncoderIsFinished(self->enc); + } + +end: + if (ok) { + ret = PyBytes_FromStringAndSize((char*)(output.empty() ? NULL : &output[0]), output.size()); + } else { + PyErr_SetString(BrotliError, "BrotliEncoderCompressStream failed while finishing the stream"); + } + + return ret; +} + +static PyMemberDef brotli_Compressor_members[] = { + {NULL} /* Sentinel */ +}; + +static PyMethodDef brotli_Compressor_methods[] = { + {"process", (PyCFunction)brotli_Compressor_process, METH_VARARGS, brotli_Compressor_process_doc}, + {"flush", (PyCFunction)brotli_Compressor_flush, METH_NOARGS, brotli_Compressor_flush_doc}, + {"finish", (PyCFunction)brotli_Compressor_finish, METH_NOARGS, brotli_Compressor_finish_doc}, + {NULL} /* Sentinel */ +}; + +static PyTypeObject brotli_CompressorType = { + #if PY_MAJOR_VERSION >= 3 + PyVarObject_HEAD_INIT(NULL, 0) + #else + PyObject_HEAD_INIT(NULL) + 0, /* ob_size*/ + #endif + "brotli.Compressor", /* tp_name */ + sizeof(brotli_Compressor), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor)brotli_Compressor_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + brotli_Compressor_doc, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + brotli_Compressor_methods, /* tp_methods */ + brotli_Compressor_members, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)brotli_Compressor_init, /* tp_init */ + 0, /* tp_alloc */ + brotli_Compressor_new, /* tp_new */ +}; + +static BROTLI_BOOL decompress_stream(BrotliDecoderState* dec, + std::vector<uint8_t>* output, + uint8_t* input, size_t input_length) { + BROTLI_BOOL ok = BROTLI_TRUE; + Py_BEGIN_ALLOW_THREADS + + size_t available_in = input_length; + const uint8_t* next_in = input; + size_t available_out = 0; + uint8_t* next_out = NULL; + + BrotliDecoderResult result = BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT; + while (result == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT) { + result = BrotliDecoderDecompressStream(dec, + &available_in, &next_in, + &available_out, &next_out, NULL); + size_t buffer_length = 0; // Request all available output. + const uint8_t* buffer = BrotliDecoderTakeOutput(dec, &buffer_length); + if (buffer_length) { + (*output).insert((*output).end(), buffer, buffer + buffer_length); + } + } + ok = result != BROTLI_DECODER_RESULT_ERROR && !available_in; + + Py_END_ALLOW_THREADS + return ok; +} + +PyDoc_STRVAR(brotli_Decompressor_doc, +"An object to decompress a byte string.\n" +"\n" +"Signature:\n" +" Decompressor()\n" +"\n" +"Raises:\n" +" brotli.error: If arguments are invalid.\n"); + +typedef struct { + PyObject_HEAD + BrotliDecoderState* dec; +} brotli_Decompressor; + +static void brotli_Decompressor_dealloc(brotli_Decompressor* self) { + BrotliDecoderDestroyInstance(self->dec); + #if PY_MAJOR_VERSION >= 3 + Py_TYPE(self)->tp_free((PyObject*)self); + #else + self->ob_type->tp_free((PyObject*)self); + #endif +} + +static PyObject* brotli_Decompressor_new(PyTypeObject *type, PyObject *args, PyObject *keywds) { + brotli_Decompressor *self; + self = (brotli_Decompressor *)type->tp_alloc(type, 0); + + if (self != NULL) { + self->dec = BrotliDecoderCreateInstance(0, 0, 0); + } + + return (PyObject *)self; +} + +static int brotli_Decompressor_init(brotli_Decompressor *self, PyObject *args, PyObject *keywds) { + int ok; + + static const char *kwlist[] = {NULL}; + + ok = PyArg_ParseTupleAndKeywords(args, keywds, "|:Decompressor", + const_cast<char **>(kwlist)); + if (!ok) + return -1; + if (!self->dec) + return -1; + + return 0; +} + +PyDoc_STRVAR(brotli_Decompressor_process_doc, +"Process \"string\" for decompression, returning a string that contains \n" +"decompressed output data. This data should be concatenated to the output \n" +"produced by any preceding calls to the \"process()\" method. \n" +"Some or all of the input may be kept in internal buffers for later \n" +"processing, and the decompressed output data may be empty until enough input \n" +"has been accumulated.\n" +"\n" +"Signature:\n" +" decompress(string)\n" +"\n" +"Args:\n" +" string (bytes): The input data\n" +"\n" +"Returns:\n" +" The decompressed output data (bytes)\n" +"\n" +"Raises:\n" +" brotli.error: If decompression fails\n"); + +static PyObject* brotli_Decompressor_process(brotli_Decompressor *self, PyObject *args) { + PyObject* ret = NULL; + std::vector<uint8_t> output; + Py_buffer input; + BROTLI_BOOL ok = BROTLI_TRUE; + +#if PY_MAJOR_VERSION >= 3 + ok = (BROTLI_BOOL)PyArg_ParseTuple(args, "y*:process", &input); +#else + ok = (BROTLI_BOOL)PyArg_ParseTuple(args, "s*:process", &input); +#endif + + if (!ok) + return NULL; + + if (!self->dec) { + ok = BROTLI_FALSE; + goto end; + } + + ok = decompress_stream(self->dec, &output, static_cast<uint8_t*>(input.buf), input.len); + +end: + PyBuffer_Release(&input); + if (ok) { + ret = PyBytes_FromStringAndSize((char*)(output.empty() ? NULL : &output[0]), output.size()); + } else { + PyErr_SetString(BrotliError, "BrotliDecoderDecompressStream failed while processing the stream"); + } + + return ret; +} + +PyDoc_STRVAR(brotli_Decompressor_is_finished_doc, +"Checks if decoder instance reached the final state.\n" +"\n" +"Signature:\n" +" is_finished()\n" +"\n" +"Returns:\n" +" True if the decoder is in a state where it reached the end of the input\n" +" and produced all of the output\n" +" False otherwise\n" +"\n" +"Raises:\n" +" brotli.error: If decompression fails\n"); + +static PyObject* brotli_Decompressor_is_finished(brotli_Decompressor *self) { + PyObject *ret = NULL; + std::vector<uint8_t> output; + BROTLI_BOOL ok = BROTLI_TRUE; + + if (!self->dec) { + ok = BROTLI_FALSE; + PyErr_SetString(BrotliError, "BrotliDecoderState is NULL while checking is_finished"); + goto end; + } + + if (BrotliDecoderIsFinished(self->dec)) { + Py_RETURN_TRUE; + } else { + Py_RETURN_FALSE; + } + +end: + if (ok) { + ret = PyBytes_FromStringAndSize((char*)(output.empty() ? NULL : &output[0]), output.size()); + } else { + PyErr_SetString(BrotliError, "BrotliDecoderDecompressStream failed while finishing the stream"); + } + + return ret; +} + +static PyMemberDef brotli_Decompressor_members[] = { + {NULL} /* Sentinel */ +}; + +static PyMethodDef brotli_Decompressor_methods[] = { + {"process", (PyCFunction)brotli_Decompressor_process, METH_VARARGS, brotli_Decompressor_process_doc}, + {"is_finished", (PyCFunction)brotli_Decompressor_is_finished, METH_NOARGS, brotli_Decompressor_is_finished_doc}, + {NULL} /* Sentinel */ +}; + +static PyTypeObject brotli_DecompressorType = { + #if PY_MAJOR_VERSION >= 3 + PyVarObject_HEAD_INIT(NULL, 0) + #else + PyObject_HEAD_INIT(NULL) + 0, /* ob_size*/ + #endif + "brotli.Decompressor", /* tp_name */ + sizeof(brotli_Decompressor), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor)brotli_Decompressor_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + brotli_Decompressor_doc, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + brotli_Decompressor_methods, /* tp_methods */ + brotli_Decompressor_members, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)brotli_Decompressor_init, /* tp_init */ + 0, /* tp_alloc */ + brotli_Decompressor_new, /* tp_new */ +}; + +PyDoc_STRVAR(brotli_decompress__doc__, +"Decompress a compressed byte string.\n" +"\n" +"Signature:\n" +" decompress(string)\n" +"\n" +"Args:\n" +" string (bytes): The compressed input data.\n" +"\n" +"Returns:\n" +" The decompressed byte string.\n" +"\n" +"Raises:\n" +" brotli.error: If decompressor fails.\n"); + +static PyObject* brotli_decompress(PyObject *self, PyObject *args, PyObject *keywds) { + PyObject *ret = NULL; + Py_buffer input; + const uint8_t* next_in; + size_t available_in; + int ok; + + static const char *kwlist[] = {"string", NULL}; + +#if PY_MAJOR_VERSION >= 3 + ok = PyArg_ParseTupleAndKeywords(args, keywds, "y*|:decompress", + const_cast<char **>(kwlist), &input); +#else + ok = PyArg_ParseTupleAndKeywords(args, keywds, "s*|:decompress", + const_cast<char **>(kwlist), &input); +#endif + + if (!ok) + return NULL; + + std::vector<uint8_t> output; + + /* >>> Pure C block; release python GIL. */ + Py_BEGIN_ALLOW_THREADS + + BrotliDecoderState* state = BrotliDecoderCreateInstance(0, 0, 0); + + BrotliDecoderResult result = BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT; + next_in = static_cast<uint8_t*>(input.buf); + available_in = input.len; + while (result == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT) { + size_t available_out = 0; + result = BrotliDecoderDecompressStream(state, &available_in, &next_in, + &available_out, 0, 0); + const uint8_t* next_out = BrotliDecoderTakeOutput(state, &available_out); + if (available_out != 0) + output.insert(output.end(), next_out, next_out + available_out); + } + ok = result == BROTLI_DECODER_RESULT_SUCCESS && !available_in; + BrotliDecoderDestroyInstance(state); + + Py_END_ALLOW_THREADS + /* <<< Pure C block end. Python GIL reacquired. */ + + PyBuffer_Release(&input); + if (ok) { + ret = PyBytes_FromStringAndSize((char*)(output.size() ? &output[0] : NULL), output.size()); + } else { + PyErr_SetString(BrotliError, "BrotliDecompress failed"); + } + + return ret; +} + +static PyMethodDef brotli_methods[] = { + {"decompress", (PyCFunction)brotli_decompress, METH_VARARGS | METH_KEYWORDS, brotli_decompress__doc__}, + {NULL, NULL, 0, NULL} +}; + +PyDoc_STRVAR(brotli_doc, "Implementation module for the Brotli library."); + +#if PY_MAJOR_VERSION >= 3 +#define INIT_BROTLI PyInit__brotli +#define CREATE_BROTLI PyModule_Create(&brotli_module) +#define RETURN_BROTLI return m +#define RETURN_NULL return NULL + +static struct PyModuleDef brotli_module = { + PyModuleDef_HEAD_INIT, + "_brotli", /* m_name */ + brotli_doc, /* m_doc */ + 0, /* m_size */ + brotli_methods, /* m_methods */ + NULL, /* m_reload */ + NULL, /* m_traverse */ + NULL, /* m_clear */ + NULL /* m_free */ +}; +#else +#define INIT_BROTLI init_brotli +#define CREATE_BROTLI Py_InitModule3("_brotli", brotli_methods, brotli_doc) +#define RETURN_BROTLI return +#define RETURN_NULL return +#endif + +PyMODINIT_FUNC INIT_BROTLI(void) { + PyObject *m = CREATE_BROTLI; + + BrotliError = PyErr_NewException((char*) "brotli.error", NULL, NULL); + if (BrotliError != NULL) { + Py_INCREF(BrotliError); + PyModule_AddObject(m, "error", BrotliError); + } + + if (PyType_Ready(&brotli_CompressorType) < 0) { + RETURN_NULL; + } + Py_INCREF(&brotli_CompressorType); + PyModule_AddObject(m, "Compressor", (PyObject *)&brotli_CompressorType); + + if (PyType_Ready(&brotli_DecompressorType) < 0) { + RETURN_NULL; + } + Py_INCREF(&brotli_DecompressorType); + PyModule_AddObject(m, "Decompressor", (PyObject *)&brotli_DecompressorType); + + PyModule_AddIntConstant(m, "MODE_GENERIC", (int) BROTLI_MODE_GENERIC); + PyModule_AddIntConstant(m, "MODE_TEXT", (int) BROTLI_MODE_TEXT); + PyModule_AddIntConstant(m, "MODE_FONT", (int) BROTLI_MODE_FONT); + + char version[16]; + snprintf(version, sizeof(version), "%d.%d.%d", + BROTLI_VERSION >> 24, (BROTLI_VERSION >> 12) & 0xFFF, BROTLI_VERSION & 0xFFF); + PyModule_AddStringConstant(m, "__version__", version); + + RETURN_BROTLI; +} diff --git a/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/bro.py b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/bro.py new file mode 100755 index 000000000..7a094b4c1 --- /dev/null +++ b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/bro.py @@ -0,0 +1,160 @@ +#! /usr/bin/env python +"""Compression/decompression utility using the Brotli algorithm.""" + +from __future__ import print_function +import argparse +import sys +import os +import platform + +import brotli + +# default values of encoder parameters +DEFAULT_PARAMS = { + 'mode': brotli.MODE_GENERIC, + 'quality': 11, + 'lgwin': 22, + 'lgblock': 0, +} + + +def get_binary_stdio(stream): + """ Return the specified standard input, output or errors stream as a + 'raw' buffer object suitable for reading/writing binary data from/to it. + """ + assert stream in ['stdin', 'stdout', 'stderr'], 'invalid stream name' + stdio = getattr(sys, stream) + if sys.version_info[0] < 3: + if sys.platform == 'win32': + # set I/O stream binary flag on python2.x (Windows) + runtime = platform.python_implementation() + if runtime == 'PyPy': + # the msvcrt trick doesn't work in pypy, so I use fdopen + mode = 'rb' if stream == 'stdin' else 'wb' + stdio = os.fdopen(stdio.fileno(), mode, 0) + else: + # this works with CPython -- untested on other implementations + import msvcrt + msvcrt.setmode(stdio.fileno(), os.O_BINARY) + return stdio + else: + # get 'buffer' attribute to read/write binary data on python3.x + if hasattr(stdio, 'buffer'): + return stdio.buffer + else: + orig_stdio = getattr(sys, '__%s__' % stream) + return orig_stdio.buffer + + +def main(args=None): + + parser = argparse.ArgumentParser( + prog=os.path.basename(__file__), description=__doc__) + parser.add_argument( + '--version', action='version', version=brotli.__version__) + parser.add_argument( + '-i', + '--input', + metavar='FILE', + type=str, + dest='infile', + help='Input file', + default=None) + parser.add_argument( + '-o', + '--output', + metavar='FILE', + type=str, + dest='outfile', + help='Output file', + default=None) + parser.add_argument( + '-f', + '--force', + action='store_true', + help='Overwrite existing output file', + default=False) + parser.add_argument( + '-d', + '--decompress', + action='store_true', + help='Decompress input file', + default=False) + params = parser.add_argument_group('optional encoder parameters') + params.add_argument( + '-m', + '--mode', + metavar='MODE', + type=int, + choices=[0, 1, 2], + help='The compression mode can be 0 for generic input, ' + '1 for UTF-8 encoded text, or 2 for WOFF 2.0 font data. ' + 'Defaults to 0.') + params.add_argument( + '-q', + '--quality', + metavar='QUALITY', + type=int, + choices=list(range(0, 12)), + help='Controls the compression-speed vs compression-density ' + 'tradeoff. The higher the quality, the slower the ' + 'compression. Range is 0 to 11. Defaults to 11.') + params.add_argument( + '--lgwin', + metavar='LGWIN', + type=int, + choices=list(range(10, 25)), + help='Base 2 logarithm of the sliding window size. Range is ' + '10 to 24. Defaults to 22.') + params.add_argument( + '--lgblock', + metavar='LGBLOCK', + type=int, + choices=[0] + list(range(16, 25)), + help='Base 2 logarithm of the maximum input block size. ' + 'Range is 16 to 24. If set to 0, the value will be set based ' + 'on the quality. Defaults to 0.') + # set default values using global DEFAULT_PARAMS dictionary + parser.set_defaults(**DEFAULT_PARAMS) + + options = parser.parse_args(args=args) + + if options.infile: + if not os.path.isfile(options.infile): + parser.error('file "%s" not found' % options.infile) + with open(options.infile, 'rb') as infile: + data = infile.read() + else: + if sys.stdin.isatty(): + # interactive console, just quit + parser.error('no input') + infile = get_binary_stdio('stdin') + data = infile.read() + + if options.outfile: + if os.path.isfile(options.outfile) and not options.force: + parser.error('output file exists') + outfile = open(options.outfile, 'wb') + else: + outfile = get_binary_stdio('stdout') + + try: + if options.decompress: + data = brotli.decompress(data) + else: + data = brotli.compress( + data, + mode=options.mode, + quality=options.quality, + lgwin=options.lgwin, + lgblock=options.lgblock) + except brotli.error as e: + parser.exit(1, + 'bro: error: %s: %s' % (e, options.infile or 'sys.stdin')) + + outfile.write(data) + outfile.close() + + +if __name__ == '__main__': + main() diff --git a/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/brotli.py b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/brotli.py new file mode 100644 index 000000000..d66966ba6 --- /dev/null +++ b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/brotli.py @@ -0,0 +1,56 @@ +# Copyright 2016 The Brotli Authors. All rights reserved. +# +# Distributed under MIT license. +# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT + +"""Functions to compress and decompress data using the Brotli library.""" + +import _brotli + + +# The library version. +__version__ = _brotli.__version__ + +# The compression mode. +MODE_GENERIC = _brotli.MODE_GENERIC +MODE_TEXT = _brotli.MODE_TEXT +MODE_FONT = _brotli.MODE_FONT + +# The Compressor object. +Compressor = _brotli.Compressor + +# The Decompressor object. +Decompressor = _brotli.Decompressor + +# Compress a byte string. +def compress(string, mode=MODE_GENERIC, quality=11, lgwin=22, lgblock=0): + """Compress a byte string. + + Args: + string (bytes): The input data. + mode (int, optional): The compression mode can be MODE_GENERIC (default), + MODE_TEXT (for UTF-8 format text input) or MODE_FONT (for WOFF 2.0). + quality (int, optional): Controls the compression-speed vs compression- + density tradeoff. The higher the quality, the slower the compression. + Range is 0 to 11. Defaults to 11. + lgwin (int, optional): Base 2 logarithm of the sliding window size. Range + is 10 to 24. Defaults to 22. + lgblock (int, optional): Base 2 logarithm of the maximum input block size. + Range is 16 to 24. If set to 0, the value will be set based on the + quality. Defaults to 0. + + Returns: + The compressed byte string. + + Raises: + brotli.error: If arguments are invalid, or compressor fails. + """ + compressor = Compressor(mode=mode, quality=quality, lgwin=lgwin, + lgblock=lgblock) + return compressor.process(string) + compressor.finish() + +# Decompress a compressed byte string. +decompress = _brotli.decompress + +# Raised if compression or decompression fails. +error = _brotli.error diff --git a/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/__init__.py b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/__init__.py diff --git a/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/_test_utils.py b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/_test_utils.py new file mode 100644 index 000000000..104e6548e --- /dev/null +++ b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/_test_utils.py @@ -0,0 +1,112 @@ +from __future__ import print_function +import filecmp +import glob +import itertools +import os +import sys +import sysconfig +import tempfile +import unittest + + +project_dir = os.path.abspath(os.path.join(__file__, '..', '..', '..')) +src_dir = os.path.join(project_dir, 'python') +test_dir = os.path.join(project_dir, 'tests') + +python_exe = sys.executable or 'python' +bro_path = os.path.join(src_dir, 'bro.py') +BRO_ARGS = [python_exe, bro_path] + +# Get the platform/version-specific build folder. +# By default, the distutils build base is in the same location as setup.py. +platform_lib_name = 'lib.{platform}-{version[0]}.{version[1]}'.format( + platform=sysconfig.get_platform(), version=sys.version_info) +build_dir = os.path.join(project_dir, 'bin', platform_lib_name) + +# Prepend the build folder to sys.path and the PYTHONPATH environment variable. +if build_dir not in sys.path: + sys.path.insert(0, build_dir) +TEST_ENV = os.environ.copy() +if 'PYTHONPATH' not in TEST_ENV: + TEST_ENV['PYTHONPATH'] = build_dir +else: + TEST_ENV['PYTHONPATH'] = build_dir + os.pathsep + TEST_ENV['PYTHONPATH'] + +TESTDATA_DIR = os.path.join(test_dir, 'testdata') + +TESTDATA_FILES = [ + 'empty', # Empty file + '10x10y', # Small text + 'alice29.txt', # Large text + 'random_org_10k.bin', # Small data + 'mapsdatazrh', # Large data +] + +TESTDATA_PATHS = [os.path.join(TESTDATA_DIR, f) for f in TESTDATA_FILES] + +TESTDATA_PATHS_FOR_DECOMPRESSION = glob.glob( + os.path.join(TESTDATA_DIR, '*.compressed')) + +TEMP_DIR = tempfile.mkdtemp() + + +def get_temp_compressed_name(filename): + return os.path.join(TEMP_DIR, os.path.basename(filename + '.bro')) + + +def get_temp_uncompressed_name(filename): + return os.path.join(TEMP_DIR, os.path.basename(filename + '.unbro')) + + +def bind_method_args(method, *args, **kwargs): + return lambda self: method(self, *args, **kwargs) + + +def generate_test_methods(test_case_class, + for_decompression=False, + variants=None): + # Add test methods for each test data file. This makes identifying problems + # with specific compression scenarios easier. + if for_decompression: + paths = TESTDATA_PATHS_FOR_DECOMPRESSION + else: + paths = TESTDATA_PATHS + opts = [] + if variants: + opts_list = [] + for k, v in variants.items(): + opts_list.append([r for r in itertools.product([k], v)]) + for o in itertools.product(*opts_list): + opts_name = '_'.join([str(i) for i in itertools.chain(*o)]) + opts_dict = dict(o) + opts.append([opts_name, opts_dict]) + else: + opts.append(['', {}]) + for method in [m for m in dir(test_case_class) if m.startswith('_test')]: + for testdata in paths: + for (opts_name, opts_dict) in opts: + f = os.path.splitext(os.path.basename(testdata))[0] + name = 'test_{method}_{options}_{file}'.format( + method=method, options=opts_name, file=f) + func = bind_method_args( + getattr(test_case_class, method), testdata, **opts_dict) + setattr(test_case_class, name, func) + + +class TestCase(unittest.TestCase): + + def tearDown(self): + for f in TESTDATA_PATHS: + try: + os.unlink(get_temp_compressed_name(f)) + except OSError: + pass + try: + os.unlink(get_temp_uncompressed_name(f)) + except OSError: + pass + + def assertFilesMatch(self, first, second): + self.assertTrue( + filecmp.cmp(first, second, shallow=False), + 'File {} differs from {}'.format(first, second)) diff --git a/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/bro_test.py b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/bro_test.py new file mode 100644 index 000000000..b55129def --- /dev/null +++ b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/bro_test.py @@ -0,0 +1,102 @@ +# Copyright 2016 The Brotli Authors. All rights reserved. +# +# Distributed under MIT license. +# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT + +import subprocess +import unittest + +from . import _test_utils +import brotli + +BRO_ARGS = _test_utils.BRO_ARGS +TEST_ENV = _test_utils.TEST_ENV + + +def _get_original_name(test_data): + return test_data.split('.compressed')[0] + + +class TestBroDecompress(_test_utils.TestCase): + + def _check_decompression(self, test_data): + # Verify decompression matches the original. + temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) + original = _get_original_name(test_data) + self.assertFilesMatch(temp_uncompressed, original) + + def _decompress_file(self, test_data): + temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) + args = BRO_ARGS + ['-f', '-d', '-i', test_data, '-o', temp_uncompressed] + subprocess.check_call(args, env=TEST_ENV) + + def _decompress_pipe(self, test_data): + temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) + args = BRO_ARGS + ['-d'] + with open(temp_uncompressed, 'wb') as out_file: + with open(test_data, 'rb') as in_file: + subprocess.check_call( + args, stdin=in_file, stdout=out_file, env=TEST_ENV) + + def _test_decompress_file(self, test_data): + self._decompress_file(test_data) + self._check_decompression(test_data) + + def _test_decompress_pipe(self, test_data): + self._decompress_pipe(test_data) + self._check_decompression(test_data) + + +_test_utils.generate_test_methods(TestBroDecompress, for_decompression=True) + + +class TestBroCompress(_test_utils.TestCase): + + VARIANTS = {'quality': (1, 6, 9, 11), 'lgwin': (10, 15, 20, 24)} + + def _check_decompression(self, test_data, **kwargs): + # Write decompression to temp file and verify it matches the original. + temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) + temp_compressed = _test_utils.get_temp_compressed_name(test_data) + original = test_data + args = BRO_ARGS + ['-f', '-d'] + args.extend(['-i', temp_compressed, '-o', temp_uncompressed]) + subprocess.check_call(args, env=TEST_ENV) + self.assertFilesMatch(temp_uncompressed, original) + + def _compress_file(self, test_data, **kwargs): + temp_compressed = _test_utils.get_temp_compressed_name(test_data) + args = BRO_ARGS + ['-f'] + if 'quality' in kwargs: + args.extend(['-q', str(kwargs['quality'])]) + if 'lgwin' in kwargs: + args.extend(['--lgwin', str(kwargs['lgwin'])]) + args.extend(['-i', test_data, '-o', temp_compressed]) + subprocess.check_call(args, env=TEST_ENV) + + def _compress_pipe(self, test_data, **kwargs): + temp_compressed = _test_utils.get_temp_compressed_name(test_data) + args = BRO_ARGS + if 'quality' in kwargs: + args.extend(['-q', str(kwargs['quality'])]) + if 'lgwin' in kwargs: + args.extend(['--lgwin', str(kwargs['lgwin'])]) + with open(temp_compressed, 'wb') as out_file: + with open(test_data, 'rb') as in_file: + subprocess.check_call( + args, stdin=in_file, stdout=out_file, env=TEST_ENV) + + def _test_compress_file(self, test_data, **kwargs): + self._compress_file(test_data, **kwargs) + self._check_decompression(test_data) + + def _test_compress_pipe(self, test_data, **kwargs): + self._compress_pipe(test_data, **kwargs) + self._check_decompression(test_data) + + +_test_utils.generate_test_methods( + TestBroCompress, variants=TestBroCompress.VARIANTS) + +if __name__ == '__main__': + unittest.main() diff --git a/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/compress_test.py b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/compress_test.py new file mode 100644 index 000000000..46ff68f50 --- /dev/null +++ b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/compress_test.py @@ -0,0 +1,41 @@ +# Copyright 2016 The Brotli Authors. All rights reserved. +# +# Distributed under MIT license. +# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT + +import unittest + +from . import _test_utils +import brotli + + +class TestCompress(_test_utils.TestCase): + + VARIANTS = {'quality': (1, 6, 9, 11), 'lgwin': (10, 15, 20, 24)} + + def _check_decompression(self, test_data, **kwargs): + kwargs = {} + # Write decompression to temp file and verify it matches the original. + temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) + temp_compressed = _test_utils.get_temp_compressed_name(test_data) + original = test_data + with open(temp_uncompressed, 'wb') as out_file: + with open(temp_compressed, 'rb') as in_file: + out_file.write(brotli.decompress(in_file.read(), **kwargs)) + self.assertFilesMatch(temp_uncompressed, original) + + def _compress(self, test_data, **kwargs): + temp_compressed = _test_utils.get_temp_compressed_name(test_data) + with open(temp_compressed, 'wb') as out_file: + with open(test_data, 'rb') as in_file: + out_file.write(brotli.compress(in_file.read(), **kwargs)) + + def _test_compress(self, test_data, **kwargs): + self._compress(test_data, **kwargs) + self._check_decompression(test_data, **kwargs) + + +_test_utils.generate_test_methods(TestCompress, variants=TestCompress.VARIANTS) + +if __name__ == '__main__': + unittest.main() diff --git a/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/compressor_test.py b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/compressor_test.py new file mode 100644 index 000000000..2d4791996 --- /dev/null +++ b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/compressor_test.py @@ -0,0 +1,94 @@ +# Copyright 2016 The Brotli Authors. All rights reserved. +# +# Distributed under MIT license. +# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT + +import functools +import unittest + +from . import _test_utils +import brotli + + +# Do not inherit from TestCase here to ensure that test methods +# are not run automatically and instead are run as part of a specific +# configuration below. +class _TestCompressor(object): + + CHUNK_SIZE = 2048 + + def tearDown(self): + self.compressor = None + + def _check_decompression(self, test_data): + # Write decompression to temp file and verify it matches the original. + temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) + temp_compressed = _test_utils.get_temp_compressed_name(test_data) + original = test_data + with open(temp_uncompressed, 'wb') as out_file: + with open(temp_compressed, 'rb') as in_file: + out_file.write(brotli.decompress(in_file.read())) + self.assertFilesMatch(temp_uncompressed, original) + + def _test_single_process(self, test_data): + # Write single-shot compression to temp file. + temp_compressed = _test_utils.get_temp_compressed_name(test_data) + with open(temp_compressed, 'wb') as out_file: + with open(test_data, 'rb') as in_file: + out_file.write(self.compressor.process(in_file.read())) + out_file.write(self.compressor.finish()) + self._check_decompression(test_data) + + def _test_multiple_process(self, test_data): + # Write chunked compression to temp file. + temp_compressed = _test_utils.get_temp_compressed_name(test_data) + with open(temp_compressed, 'wb') as out_file: + with open(test_data, 'rb') as in_file: + read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE) + for data in iter(read_chunk, b''): + out_file.write(self.compressor.process(data)) + out_file.write(self.compressor.finish()) + self._check_decompression(test_data) + + def _test_multiple_process_and_flush(self, test_data): + # Write chunked and flushed compression to temp file. + temp_compressed = _test_utils.get_temp_compressed_name(test_data) + with open(temp_compressed, 'wb') as out_file: + with open(test_data, 'rb') as in_file: + read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE) + for data in iter(read_chunk, b''): + out_file.write(self.compressor.process(data)) + out_file.write(self.compressor.flush()) + out_file.write(self.compressor.finish()) + self._check_decompression(test_data) + + +_test_utils.generate_test_methods(_TestCompressor) + + +class TestCompressorQuality1(_TestCompressor, _test_utils.TestCase): + + def setUp(self): + self.compressor = brotli.Compressor(quality=1) + + +class TestCompressorQuality6(_TestCompressor, _test_utils.TestCase): + + def setUp(self): + self.compressor = brotli.Compressor(quality=6) + + +class TestCompressorQuality9(_TestCompressor, _test_utils.TestCase): + + def setUp(self): + self.compressor = brotli.Compressor(quality=9) + + +class TestCompressorQuality11(_TestCompressor, _test_utils.TestCase): + + def setUp(self): + self.compressor = brotli.Compressor(quality=11) + + +if __name__ == '__main__': + unittest.main() diff --git a/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/decompress_test.py b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/decompress_test.py new file mode 100644 index 000000000..814e56332 --- /dev/null +++ b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/decompress_test.py @@ -0,0 +1,42 @@ +# Copyright 2016 The Brotli Authors. All rights reserved. +# +# Distributed under MIT license. +# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT + +import unittest + +from . import _test_utils +import brotli + + +def _get_original_name(test_data): + return test_data.split('.compressed')[0] + + +class TestDecompress(_test_utils.TestCase): + + def _check_decompression(self, test_data): + # Verify decompression matches the original. + temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) + original = _get_original_name(test_data) + self.assertFilesMatch(temp_uncompressed, original) + + def _decompress(self, test_data): + temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) + with open(temp_uncompressed, 'wb') as out_file: + with open(test_data, 'rb') as in_file: + out_file.write(brotli.decompress(in_file.read())) + + def _test_decompress(self, test_data): + self._decompress(test_data) + self._check_decompression(test_data) + + def test_garbage_appended(self): + with self.assertRaises(brotli.error): + brotli.decompress(brotli.compress(b'a') + b'a') + + +_test_utils.generate_test_methods(TestDecompress, for_decompression=True) + +if __name__ == '__main__': + unittest.main() diff --git a/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/decompressor_test.py b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/decompressor_test.py new file mode 100644 index 000000000..05918ada8 --- /dev/null +++ b/roms/edk2/BaseTools/Source/C/BrotliCompress/brotli/python/tests/decompressor_test.py @@ -0,0 +1,59 @@ +# Copyright 2016 The Brotli Authors. All rights reserved. +# +# Distributed under MIT license. +# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT + +import functools +import unittest + +from . import _test_utils +import brotli + + +def _get_original_name(test_data): + return test_data.split('.compressed')[0] + + +class TestDecompressor(_test_utils.TestCase): + + CHUNK_SIZE = 1 + + def setUp(self): + self.decompressor = brotli.Decompressor() + + def tearDown(self): + self.decompressor = None + + def _check_decompression(self, test_data): + # Verify decompression matches the original. + temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) + original = _get_original_name(test_data) + self.assertFilesMatch(temp_uncompressed, original) + + def _decompress(self, test_data): + temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) + with open(temp_uncompressed, 'wb') as out_file: + with open(test_data, 'rb') as in_file: + read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE) + for data in iter(read_chunk, b''): + out_file.write(self.decompressor.process(data)) + self.assertTrue(self.decompressor.is_finished()) + + def _test_decompress(self, test_data): + self._decompress(test_data) + self._check_decompression(test_data) + + def test_garbage_appended(self): + with self.assertRaises(brotli.error): + self.decompressor.process(brotli.compress(b'a') + b'a') + + def test_already_finished(self): + self.decompressor.process(brotli.compress(b'a')) + with self.assertRaises(brotli.error): + self.decompressor.process(b'a') + + +_test_utils.generate_test_methods(TestDecompressor, for_decompression=True) + +if __name__ == '__main__': + unittest.main() |