Prefilters: compute before the filter pipeline#

Python-Blosc2 now has support for prefilters, fillers and postfilters. Prefilters are user-defined functions that are executed before the data is compressed when filling a schunk. In this tutorial we will see how they work, so let’s start by creating our schunk!

Prefilters#

Because a prefilter is a Python function, we will not be able to use parallelism, so nthreads has to be 1 when compressing:

[1]:
import blosc2
import numpy as np

typesize = 4
cparams = {
    "nthreads": 1,
    "typesize": typesize,
}

storage = {
    "cparams": cparams,
}

chunk_len = 10_000
schunk = blosc2.SChunk(chunksize=chunk_len * typesize, **storage)
schunk
[1]:
<blosc2.schunk.SChunk at 0x7faa59ebfd00>

Now that we have the schunk, we can create its prefilter.

Setting a prefilter#

To set a prefilter, you first have to create it as a function that receives three parameters: the input block, the output block and the offset within the schunk where the block starts. Then you register it with a decorator, passing the input data type that the prefilter will receive and the output data type that it will fill and append to the schunk:

[2]:
input_dtype = np.int32
output_dtype = np.float32


@schunk.prefilter(input_dtype, output_dtype)
def prefilter(input, output, offset):
    output[:] = input - np.pi + offset

Awesome! Now each time we add data to the schunk, the prefilter will modify it before storing it. Let’s append an array and check that the actual appended data has been modified:

[3]:
buffer = np.arange(chunk_len * 100, dtype=input_dtype)
schunk[: buffer.size] = buffer

out = np.empty(10, dtype=output_dtype)
schunk.get_slice(stop=10, out=out)
print(buffer[:10])
print(out)
[0 1 2 3 4 5 6 7 8 9]
[-3.1415927  -2.1415927  -1.1415926  -0.14159265  0.8584073   1.8584074
  2.8584073   3.8584073   4.8584075   5.8584075 ]

As you can see, the data was modified according to the prefilter function.
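
We can also check this programmatically. As a quick sketch (not a cell in the original notebook): for the very first block the offset is 0, so the stored values should simply be the input minus np.pi:

# offset is 0 for the first block, so the result is just buffer[:10] - np.pi
np.testing.assert_allclose(out, buffer[:10] - np.pi, rtol=1e-6)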

Removing a prefilter#

What if we don’t want the prefilter to be executed anymore? Then you can remove it from the schunk like so:

[4]:
schunk.remove_prefilter("prefilter")
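
As a quick check (again, not a cell from the original notebook), updating some elements now should store them unchanged, since no prefilter is active anymore:

# with the prefilter removed, data is stored as-is (nthreads is still 1 here)
check = np.arange(10, dtype=output_dtype)
schunk[:10] = check

out = np.empty(10, dtype=output_dtype)
schunk.get_slice(stop=10, out=out)
print(out)  # same values as `check`, no -np.pi shift applied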

Re-enabling parallelism#

To take advantage of multi-threading again, you can raise the number of threads used when compressing:

[5]:
schunk.cparams = {"nthreads": 8}
schunk.cparams
[5]:
{'codec': <Codec.ZSTD: 5>,
 'codec_meta': 0,
 'clevel': 1,
 'use_dict': 0,
 'typesize': 4,
 'nthreads': 8,
 'blocksize': 0,
 'splitmode': <SplitMode.ALWAYS_SPLIT: 1>,
 'filters': [<Filter.NOFILTER: 0>,
  <Filter.NOFILTER: 0>,
  <Filter.NOFILTER: 0>,
  <Filter.NOFILTER: 0>,
  <Filter.NOFILTER: 0>,
  <Filter.SHUFFLE: 1>],
 'filters_meta': [0, 0, 0, 0, 0, 0]}

You can see that the only compression parameters that changed were those in the dictionary.
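
Following the same pattern, you could tweak any other single parameter (for example the compression level; a hypothetical one-liner, not part of the original notebook) without resetting the rest:

# only clevel changes; nthreads, typesize, filters, etc. are kept
schunk.cparams = {"clevel": 9}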

Fillers#

So far, we have seen how to set a function that will be executed each time we append some data. But we may instead want to fill an empty schunk with some more complex operation just once, and then be able to update the data later without the function being applied again. This is where fillers come into play.

A filler is a function that receives a tuple of inputs, an output and the offset where the block begins. First let’s create another empty schunk:

[6]:
schunk_fill = blosc2.SChunk(chunksize=chunk_len * typesize, **storage)

Next, we will create our filler function and associate it with the schunk via the decorator, passing the tuple of inputs with their data types, an output dtype and the number of elements you want the schunk to have. We will use the schunk we created previously as an input:

[7]:
nelem = schunk.nbytes // schunk.typesize


@schunk_fill.filler(((schunk, output_dtype),), np.int32, nelem)
def filler(inputs_tuple, output, offset):
    output[:] = inputs_tuple[0] + offset

Let’s see what the appended data looks like:

[8]:
out = np.empty(nelem, dtype=np.int32)
schunk_fill.get_slice(out=out)
out
[8]:
array([     -3,      -2,      -1, ..., 2979993, 2979994, 2979995],
      dtype=int32)

That looks right. What if we want to update the schunk?

[9]:
new_data = np.ones(chunk_len, dtype=np.int32)

schunk_fill[: new_data.size] = new_data
schunk_fill.get_slice(out=out)
out
[9]:
array([      1,       1,       1, ..., 2979993, 2979994, 2979995],
      dtype=int32)

As you can see, the filler function has not been applied to the new data. That makes sense because the filler, unlike a regular prefilter, is only active while the schunk is being created and filled.
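
To make the contrast explicit, here is a hypothetical sketch (not part of the original notebook): if we attached a regular prefilter to schunk_fill (whose nthreads is still 1), later updates would be transformed again:

# a regular prefilter stays active for every subsequent compression
@schunk_fill.prefilter(np.int32, np.int32)
def add_million(input, output, offset):
    output[:] = input + 1_000_000


schunk_fill[: new_data.size] = new_data
check = np.empty(3, dtype=np.int32)
schunk_fill.get_slice(stop=3, out=check)
print(check)  # the ones we wrote are now stored as 1_000_001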

Conclusions#

If you want a function to be applied each time before some data is compressed, you will want a prefilter. But if you just want it to run once to fill an empty schunk, a filler is the better fit.

Prefilters can also be applied to NDArray data through the unidimensional chunks of its underlying SChunk (NDArray.schunk).
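
For instance, a minimal sketch (assuming the same decorator API on the array’s underlying SChunk, and again keeping nthreads at 1) could look like this:

import blosc2
import numpy as np

# the prefilter operates on the flattened, unidimensional blocks of the array
arr = blosc2.empty((100, 100), dtype=np.float32, cparams={"nthreads": 1})


@arr.schunk.prefilter(np.float32, np.float32)
def negate(input, output, offset):
    output[:] = -input


# writing into the NDArray goes through its SChunk, so the prefilter applies
arr[:] = np.ones((100, 100), dtype=np.float32)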

See the next tutorial for a similar walkthrough using postfilters.