User-defined codecs and filters

While Python-Blosc2 offers many standard codecs and filters, one may wish to use custom compression algorithms. This is possible in Python-Blosc2 via Python functions that act as user-defined codecs and filters. These work like normal codecs and filters respectively, following the order depicted below:

[Figure: the compression pipeline — prefilter, filter pipeline (up to six filters), codec — applied in reverse on decompression]

So when compressing, the first step will be to apply the prefilter (if any), then the filter pipeline with a maximum of six filters and, last but not least, the codec. For decompressing, the order will be the other way around: first the codec, then the filter pipeline and finally the postfilter (if any).

In this tutorial we will see how to create and use custom codecs and filters (see the next tutorial for post-/prefilters).

User-defined codecs

Predefined codecs in Blosc2 use low-level C functions and so are amenable to parallelisation. Because a user-defined codec is Python code, it cannot run in parallel, so nthreads has to be 1 for both compressing and decompressing. We set nthreads=1 in the CParams and DParams objects that we will use to create the SChunk instance. When using user-defined codecs, we may also specify codec_meta in the CParams instance as an integer between 0 and 255 (see compcode_meta here). This meta will be passed to the codec’s encoder and decoder functions, where it can be interpreted as one desires. We may also pass filters_meta in the CParams object, which will be passed to the user-defined filters’ forward and backward functions. Later on, we will update the CParams object with our user-defined codec and filters, and update the meta at the same time.

[1]:
import sys

import numpy as np

import blosc2

dtype = np.dtype(np.int32)
cparams = blosc2.CParams(nthreads=1, typesize=dtype.itemsize)
dparams = blosc2.DParams(nthreads=1)

chunk_len = 1000
schunk = blosc2.SChunk(chunksize=chunk_len * dtype.itemsize, cparams=cparams, dparams=dparams)

Creating a codec

To create a codec we need two functions: one for compressing (aka encoder) and another for decompressing (aka decoder). To illustrate the procedure, we will create a codec for repeated values. First we write the encoder function:

[2]:
def encoder(input, output, meta, schunk):
    nd_input = input.view(dtype)
    # Check if all the values are the same
    if np.max(nd_input) == np.min(nd_input):
        # output = [value, nrep]
        output[0 : schunk.typesize] = input[0 : schunk.typesize]
        byteorder = "little" if meta == 0 else "big"
        n = nd_input.size.to_bytes(4, byteorder)
        output[schunk.typesize : schunk.typesize + 4] = [n[i] for i in range(4)]
        return schunk.typesize + 4
    else:
        # memcpy
        return 0

In order to be compatible with the Blosc2 internal compression machinery, which operates blockwise, the encoder function requires 4 arguments: the input data block; the output buffer into which the data is compressed; the codec meta (which here we decide to use to indicate the endianness of the bytes); and the SChunk instance which hosts the compressed block. The encoder must return the size of the compressed buffer in bytes. If it cannot compress the data, it must return 0; Blosc2 will then know to simply copy the block without compressing it. The image below depicts what our encoder does:

[Figure: encoder output layout — the repeated value followed by a 4-byte repetition count]

Now let’s go for the decoder, which also expects to receive the same 4 arguments, and operates blockwise.

[3]:
def decoder(input, output, meta, schunk):
    byteorder = "little" if meta == 0 else "big"
    if byteorder == "little":
        nd_input = input.view("<i4")
    else:
        nd_input = input.view(">i4")
    nd_output = output.view("i4")
    # Expand the stored value (nd_input[0]) nd_input[1] times
    nd_output[0 : nd_input[1]] = [nd_input[0]] * nd_input[1]
    return nd_input[1] * schunk.typesize

The decoder function must return the size of the decompressed buffer in bytes. It receives the output filled by the encoder as its input param, and recreates the data following this scheme:

[Figure: decoder scheme — expanding the stored value and count back into the repeated array]

Note that if a block was memcopied (stored uncompressed) by Blosc2, the decoder will be skipped when requesting data from the SChunk.
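Before wiring these functions into Blosc2, we can sanity-check the pair in plain NumPy. The FakeSChunk class below is a hypothetical stand-in (only typesize is needed) and not part of the Blosc2 API; the encoder and decoder are copied from above so the snippet is self-contained:

```python
import sys

import numpy as np

dtype = np.dtype(np.int32)


class FakeSChunk:  # hypothetical stand-in, not the real Blosc2 API
    typesize = dtype.itemsize


def encoder(input, output, meta, schunk):
    nd_input = input.view(dtype)
    if np.max(nd_input) == np.min(nd_input):
        output[0 : schunk.typesize] = input[0 : schunk.typesize]
        byteorder = "little" if meta == 0 else "big"
        n = nd_input.size.to_bytes(4, byteorder)
        output[schunk.typesize : schunk.typesize + 4] = [n[i] for i in range(4)]
        return schunk.typesize + 4
    return 0  # not all values repeated: memcpy


def decoder(input, output, meta, schunk):
    byteorder = "little" if meta == 0 else "big"
    nd_input = input.view("<i4" if byteorder == "little" else ">i4")
    nd_output = output.view("i4")
    nd_output[0 : nd_input[1]] = [nd_input[0]] * nd_input[1]
    return nd_input[1] * schunk.typesize


schunk = FakeSChunk()
meta = 0 if sys.byteorder == "little" else 1

data = np.full(100, 42, dtype=dtype)  # 400 bytes of a repeated value
comp = np.zeros(data.nbytes, dtype=np.uint8)
csize = encoder(data.view(np.uint8), comp, meta, schunk)
print(csize)  # 8: the 4-byte value plus a 4-byte count

decomp = np.zeros(data.nbytes, dtype=np.uint8)
dsize = decoder(comp[:csize], decomp, meta, schunk)
assert dsize == data.nbytes
assert np.array_equal(decomp.view(dtype), data)
```

The 400-byte block shrinks to an 8-byte token, and the decoder reproduces the original buffer exactly.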

Registering and Using a codec

Once the codec’s procedures are defined, we can register it in the local Blosc2 codec registry. For that, we must choose an identifier between 160 and 255.

[4]:
codec_name = "our_codec"
codec_id = 160
blosc2.register_codec(codec_name, codec_id, encoder, decoder)

The codec can now be specified in the compression params of an SChunk instance using its id. We also pass the codec_meta that we want our codec to use in the encoder and decoder. Since we designed the codec to receive the original data with no changes, we specify that no filters are to be used:

[5]:
codec_meta = 0 if sys.byteorder == "little" else 1
for k, v in {
    "codec": codec_id,
    "codec_meta": codec_meta,
    "filters": [blosc2.Filter.NOFILTER],
    "filters_meta": [0],
}.items():
    setattr(cparams, k, v)
schunk.cparams = cparams
schunk.cparams
[5]:
CParams(codec=160, codec_meta=0, clevel=1, use_dict=False, typesize=4, nthreads=1, blocksize=0, splitmode=<SplitMode.AUTO_SPLIT: 3>, filters=[<Filter.NOFILTER: 0>, <Filter.NOFILTER: 0>, <Filter.NOFILTER: 0>, <Filter.NOFILTER: 0>, <Filter.NOFILTER: 0>, <Filter.NOFILTER: 0>], filters_meta=[0, 0, 0, 0, 0, 0], tuner=<Tuner.STUNE: 0>)

Note that it is important to update the whole cparams attribute at once rather than its individual attributes (e.g. cparams.codec): the individual attributes do not have setters defined (whereas SChunk does have a cparams setter), so something like schunk.cparams.codec = 160 will not correctly update the internal C machinery. Now we can check that our codec works by appending and recovering some data composed of three chunks, each made of a different repeated value. Since compression goes blockwise, many blocks will consist of a single repeated value and will be compressed by the codec.

[6]:
fill_value = 1234
a = np.full(chunk_len, fill_value, dtype=dtype)
b = np.full(chunk_len, fill_value + 1, dtype=dtype)
c = np.full(chunk_len, fill_value + 2, dtype=dtype)
data = np.concatenate((a, b, c))
schunk[0 : data.size] = data
print("schunk cratio: ", round(schunk.cratio, 2))

out = np.empty(data.shape, dtype=dtype)
schunk.get_slice(out=out)

np.array_equal(data, out)
schunk cratio:  83.33
[6]:
True

Awesome, it works! However, if the array is not composed of blocks of repeated values, our codec will not compress anything. In the next section, we will create and use a filter and make a small modification to our codec so that we can also compress data made of equally spaced values.

User-defined filters

Writing and registering filters is not too different from writing and registering codecs. Filters do not directly compress data; rather, they manipulate it to make it easier to compress.

Creating a filter

As for user-defined codecs, to create a user-defined filter we will first need to create two functions: one for the compression process (aka forward) and another one for the decompression process (aka backward).

Let’s first write the forward function. Its signature is exactly the same as the encoder/decoder signature, although here the meta is taken from the filters_meta attribute of the CParams instance associated with the schunk (and does not necessarily have to be used). Neither the forward nor the backward function has to return anything.

[7]:
def forward(input, output, meta, schunk):
    nd_input = input.view(dtype)
    nd_output = output.view(dtype)

    # Keep the start value, then store consecutive differences
    start = nd_input[0]
    nd_output[0] = start
    nd_output[1:] = nd_input[1:] - nd_input[:-1]

As you can see, our forward function keeps the start value and then computes the difference between each element and the previous one, as the following image shows. As a consequence, after passing through the filter, equally spaced data becomes an array with many repeated values. Later on, we will write a new codec able to compress/decompress this filtered data.

[Figure: forward filter — keeps the start value and stores consecutive differences]

The backward function applies the inverse transform to the forward function, so it will reconstruct the original data.

[8]:
def backward(input, output, meta, schunk):
    nd_input = input.view(dtype)
    nd_output = output.view(dtype)

    # A cumulative sum undoes the differencing done by forward
    nd_output[0] = nd_input[0]
    for i in range(1, nd_output.size):
        nd_output[i] = nd_output[i - 1] + nd_input[i]

Hence when called on the output of the forward function, it will reconstruct the original data as follows:

[Figure: backward filter — cumulative sums reconstruct the original data]
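Before registering the filter, we can verify that the forward/backward pair round-trips on equally spaced data. The snippet copies both functions from above so it runs standalone; the schunk argument is unused here, so we simply pass None:

```python
import numpy as np

dtype = np.dtype(np.int32)


def forward(input, output, meta, schunk):
    nd_input = input.view(dtype)
    nd_output = output.view(dtype)
    # Keep the start value, then store consecutive differences
    nd_output[0] = nd_input[0]
    nd_output[1:] = nd_input[1:] - nd_input[:-1]


def backward(input, output, meta, schunk):
    nd_input = input.view(dtype)
    nd_output = output.view(dtype)
    # A cumulative sum undoes the differencing
    nd_output[0] = nd_input[0]
    for i in range(1, nd_output.size):
        nd_output[i] = nd_output[i - 1] + nd_input[i]


data = np.arange(10, 50, 4, dtype=dtype)  # equally spaced: start=10, step=4
filtered = np.zeros_like(data)
forward(data.view(np.uint8), filtered.view(np.uint8), 0, None)
print(filtered)  # [10  4  4  4  4  4  4  4  4  4]

restored = np.zeros_like(data)
backward(filtered.view(np.uint8), restored.view(np.uint8), 0, None)
assert np.array_equal(restored, data)
```

The filtered array consists of the start value followed by a run of identical steps, which is exactly the shape of data our next codec will target.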

Registering and Using a filter

Once we have the two required functions, we can register our filter. In the same way we did for the codecs, we have to choose an identifier between 160 and 255:

[9]:
filter_id = 160
blosc2.register_filter(filter_id, forward, backward)

The filter can now be introduced into the SChunk’s filter pipeline by updating the cparams attribute of the SChunk instance with the list of filters to apply, identified by their ids (in this case just the filter we created), along with their corresponding filters_meta (unimportant here, as our filter does not use it). First, though, we also need to update the codec so that we can take advantage of the filter.

Writing a new codec for the filtered data

Next, we are going to create another codec to compress data passed by the filter. This will get the start value and the step when compressing, and will rebuild the data from those values when decompressing:

[10]:
def encoder2(input, output, meta, schunk):
    nd_input = input.view(dtype)
    if np.min(nd_input[1:]) == np.max(nd_input[1:]):
        output[0 : schunk.typesize] = input[0 : schunk.typesize]  # start
        step = int(nd_input[1])
        n = step.to_bytes(4, sys.byteorder)
        output[schunk.typesize : schunk.typesize + 4] = [n[i] for i in range(4)]
        return schunk.typesize + 4
    else:
        # Not compressible, tell Blosc2 to do a memcpy
        return 0


def decoder2(input, output, meta, schunk):
    nd_input = input.view(dtype)
    nd_output = output.view(dtype)
    # Rebuild the filtered data: the start value followed by the repeated step
    nd_output[0] = nd_input[0]
    nd_output[1:] = nd_input[1]

    return nd_output.size * schunk.typesize

Their corresponding schemes are as follows:

[Figure: encoder2 scheme — stores the start value and the 4-byte step]

[Figure: decoder2 scheme — rebuilds the filtered data from start and step]
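We can check the new pair directly on a buffer shaped like the forward filter’s output (a start value followed by a repeated step). As before, FakeSChunk is a hypothetical stand-in for the real SChunk, and the codec functions are copied so the snippet is self-contained:

```python
import sys

import numpy as np

dtype = np.dtype(np.int32)


class FakeSChunk:  # hypothetical stand-in, not the real Blosc2 API
    typesize = dtype.itemsize


def encoder2(input, output, meta, schunk):
    nd_input = input.view(dtype)
    if np.min(nd_input[1:]) == np.max(nd_input[1:]):
        output[0 : schunk.typesize] = input[0 : schunk.typesize]  # start
        step = int(nd_input[1])
        n = step.to_bytes(4, sys.byteorder)
        output[schunk.typesize : schunk.typesize + 4] = [n[i] for i in range(4)]
        return schunk.typesize + 4
    return 0  # not compressible: memcpy


def decoder2(input, output, meta, schunk):
    nd_input = input.view(dtype)
    nd_output = output.view(dtype)
    nd_output[0] = nd_input[0]
    nd_output[1:] = nd_input[1]
    return nd_output.size * schunk.typesize


schunk = FakeSChunk()
filtered = np.array([100] + [4] * 9, dtype=dtype)  # what forward would produce
comp = np.zeros(filtered.nbytes, dtype=np.uint8)
csize = encoder2(filtered.view(np.uint8), comp, 0, schunk)
print(csize)  # 8: the start value plus a 4-byte step

decomp = np.zeros(filtered.nbytes, dtype=np.uint8)
dsize = decoder2(comp[:csize], decomp, 0, schunk)
assert dsize == filtered.nbytes
assert np.array_equal(decomp.view(dtype), filtered)
```

Again the whole block collapses to 8 bytes, and decoder2 rebuilds the filtered buffer, which the backward filter will then turn back into the original data.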

As the previous codec id is already in use, we will register the new codec with another identifier:

[11]:
blosc2.register_codec(codec_name="our_codec2", id=184, encoder=encoder2, decoder=decoder2)

Now we update the schunk’s cparams to use the new codec as well as the filter we just registered. We will also set the codec_meta to 0, although it isn’t used by our new codec.

[12]:
cparams.filters = [filter_id]
cparams.filters_meta = [0]
cparams.codec = 184
cparams.codec_meta = 0
schunk.cparams = cparams
schunk.cparams
[12]:
CParams(codec=184, codec_meta=0, clevel=1, use_dict=False, typesize=4, nthreads=1, blocksize=0, splitmode=<SplitMode.AUTO_SPLIT: 3>, filters=[160, <Filter.NOFILTER: 0>, <Filter.NOFILTER: 0>, <Filter.NOFILTER: 0>, <Filter.NOFILTER: 0>, <Filter.NOFILTER: 0>], filters_meta=[0, 0, 0, 0, 0, 0], tuner=<Tuner.STUNE: 0>)

We will check that it actually works by updating the data:

[13]:
nchunks = 3
new_data = np.arange(chunk_len, chunk_len * (nchunks + 1), dtype=dtype)

schunk[0 : new_data.size] = new_data
print("schunk compression ratio: ", round(schunk.cratio, 2))

out = np.empty(new_data.shape, dtype=dtype)
schunk.get_slice(out=out)
np.array_equal(new_data, out)
schunk compression ratio:  83.33
[13]:
True

As can be seen, we obtain the same compression ratio as before, since each of the 3 chunks is again stored using an 8-byte payload.

Conclusion and NDArray arrays

So now, whenever you need it, you can register a codec or filter and use it on your data! Note that one can also define and apply codecs and filters to blosc2.NDArray objects, since they are backed by the SChunk class, like so:

[14]:
array = blosc2.zeros((30, 30))
array.schunk.cparams = blosc2.CParams(
    **{"codec": 184, "filters": [filter_id], "filters_meta": [0], "nthreads": 1}
)
array.schunk.cparams
[14]:
CParams(codec=184, codec_meta=0, clevel=1, use_dict=False, typesize=8, nthreads=1, blocksize=0, splitmode=<SplitMode.AUTO_SPLIT: 3>, filters=[160, <Filter.NOFILTER: 0>, <Filter.NOFILTER: 0>, <Filter.NOFILTER: 0>, <Filter.NOFILTER: 0>, <Filter.NOFILTER: 0>], filters_meta=[0, 0, 0, 0, 0, 0], tuner=<Tuner.STUNE: 0>)