SChunk

The basic compressed data container (also known as a super-chunk). This class provides parameters and methods for creating compressed data and decompressing it, and for managing that data in more sophisticated ways: for example, appending new data, updating existing data, or deleting data.

class blosc2.SChunk(chunksize: int | None = None, data: object = None, **kwargs: dict | CParams | Storage | DParams)
Attributes:
blocksize

The block size (in bytes).

cbytes

Amount of compressed data bytes (data size + chunk headers size).

chunkshape

Number of elements per chunk.

chunksize

Number of bytes in each chunk.

contiguous

Whether the SChunk is stored contiguously or sparsely.

cparams

blosc2.CParams instance with the compression parameters.

cratio

Compression ratio.

dparams

blosc2.DParams instance with the decompression parameters.

meta

Access to the fixed-length metadata of the SChunk.

nbytes

Amount of uncompressed data bytes.

nchunks

The number of chunks.

typesize

Type size of the SChunk.

urlpath

Path where the SChunk is stored.

vlmeta

Access to the variable-length metadata of the SChunk.

Methods

append_data(data)

Append a data buffer to the SChunk.

decompress_chunk(nchunk[, dst])

Decompress the chunk given by its index nchunk.

delete_chunk(nchunk)

Delete the specified chunk from the SChunk.

fill_special(nitems, special_value[, value])

Fill the SChunk with a special value.

filler(inputs_tuple, schunk_dtype[, nelem])

Decorator to set a filler function.

get_chunk(nchunk)

Return the compressed chunk that is in the SChunk.

get_slice([start, stop, out])

Get a slice from start to stop.

insert_chunk(nchunk, chunk)

Insert an already compressed chunk into the SChunk.

insert_data(nchunk, data, copy)

Insert the data in the specified position in the SChunk.

iterchunks(dtype)

Iterate over the chunks of the SChunk.

iterchunks_info()

Iterate over the chunks of the SChunk, providing info on index and special values.

postfilter(input_dtype[, output_dtype])

Decorator to set a function as a postfilter.

prefilter(input_dtype[, output_dtype])

Decorator to set a function as a prefilter.

remove_postfilter(func_name[, _new_ctx])

Remove the postfilter from the SChunk instance.

remove_prefilter(func_name[, _new_ctx])

Remove the prefilter from the SChunk instance.

to_cframe()

Get a bytes object containing the serialized SChunk instance.

update_chunk(nchunk, chunk)

Update an existing chunk in the SChunk.

update_data(nchunk, data, copy)

Update the chunk in the specified position with the given data.

Special Methods:

__init__([chunksize, data])

Create a new super-chunk, or open an existing one.

__len__()

Return the number of items in the SChunk.

__getitem__(item)

Get a slice from the SChunk.

__setitem__(key, value)

Set slice to value.

Constructor

__init__(chunksize: int | None = None, data: object = None, **kwargs: dict | CParams | Storage | DParams) -> None

Create a new super-chunk, or open an existing one.

Parameters:
  • chunksize (int, optional) – The size, in bytes, of the chunks in the super-chunk. If not provided, it is set automatically to a reasonable value.

  • data (bytes-like object, optional) – The data to be split into different chunks of size chunksize. If None, the SChunk instance will be empty initially.

  • kwargs (dict, optional) –

    Storage parameters. The default values are in blosc2.Storage. Supported keyword arguments:

    storage: blosc2.Storage or dict

    All the storage parameters that you want to use as a blosc2.Storage or dict instance.

    cparams: blosc2.CParams or dict

    All the compression parameters that you want to use as a blosc2.CParams or dict instance.

    dparams: blosc2.DParams or dict

    All the decompression parameters that you want to use as a blosc2.DParams or dict instance.

    others: Any

    If storage is not passed, all the parameters of a blosc2.Storage can be passed as keyword arguments.

Examples

>>> import blosc2
>>> import numpy as np
>>> import os.path
>>> import shutil
>>> import tempfile
>>> cparams = blosc2.CParams()
>>> dparams = blosc2.DParams()
>>> storage = blosc2.Storage(contiguous=True)
>>> schunk = blosc2.SChunk(cparams=cparams, dparams=dparams, storage=storage)

In the following, we will write and read a super-chunk to and from disk via memory-mapped files.

>>> a = np.arange(3, dtype=np.int64)
>>> chunksize = a.size * a.itemsize
>>> n_chunks = 2
>>> tmpdirname = tempfile.mkdtemp()
>>> urlpath = os.path.join(tmpdirname, 'schunk.b2frame')

Optional: we intend to write 2 chunks of 24 bytes each, and we expect the compressed size to be smaller than the original size. Therefore, we generously set the initial size of the mapping to 48 bytes, which effectively avoids remappings.

>>> initial_mapping_size = chunksize * n_chunks
>>> schunk_mmap = blosc2.SChunk(
...     chunksize=chunksize,
...     mmap_mode="w+",
...     initial_mapping_size=initial_mapping_size,
...     urlpath=urlpath,
... )
>>> schunk_mmap.append_data(a)
1
>>> schunk_mmap.append_data(a * 2)
2

Optional: explicitly close the file and free the mapping.

>>> del schunk_mmap

Reading the data back again via memory-mapped files:

>>> schunk_mmap = blosc2.open(urlpath, mmap_mode="r")
>>> np.frombuffer(schunk_mmap.decompress_chunk(0), dtype=np.int64).tolist()
[0, 1, 2]
>>> np.frombuffer(schunk_mmap.decompress_chunk(1), dtype=np.int64).tolist()
[0, 2, 4]
>>> shutil.rmtree(tmpdirname)

Utility Methods

__len__() -> int

Return the number of items in the SChunk.

__getitem__(item: int | slice) -> str | bytes

Get a slice from the SChunk.

Parameters:

item (int or slice) – The index or slice for the data. Note that the step parameter is not honored.

Returns:

out – The decompressed slice as a Python str or bytes object.

Return type:

str or bytes

Raises:
  • ValueError – If the size to get is negative. If item.start is greater than or equal to the number of items in the SChunk.

  • RuntimeError – If a problem is detected.

  • IndexError – If step is not 1.

See also

get_slice()

Examples

>>> import blosc2
>>> import numpy as np
>>> nchunks = 4
>>> chunk_size = 200 * 1000 * 4
>>> data = np.arange(nchunks * chunk_size // 4, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(chunksize=chunk_size, data=data, cparams=cparams)
>>> # Use __getitem__ to retrieve the same slice of data from the SChunk
>>> res = schunk[150:155]
>>> f"Slice data: {np.frombuffer(res, dtype=np.int32)}"
Slice data: [150 151 152 153 154]
__setitem__(key: int | slice, value: object) -> None

Set slice to value.

Parameters:
  • key (int or slice) – The index of the slice to update. Note that the step parameter is not honored.

  • value (bytes-like object) – An object supporting the Buffer Protocol used to fill the slice.

Returns:

out

Return type:

None

Raises:
  • ValueError – If the object cannot be modified. If the size to get is negative. If there is not enough space in value to update the slice. If start is greater than the number of items in the SChunk.

  • RuntimeError – If a problem is detected.

  • IndexError – If step is not 1.

Notes

This method can also be used to append new data if key.stop is greater than the number of items in the SChunk.

Examples

>>> import blosc2
>>> import numpy as np
>>> nchunks = 4
>>> chunk_size = 200 * 1000 * 4
>>> data = np.arange(nchunks * chunk_size // 4, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> # Create a new array of values to update the slice (values from 1000 to 1999 multiplied by 2)
>>> start_ = 1000
>>> stop = 2000
>>> new_values = np.arange(start_, stop, dtype=np.int32) * 2
>>> schunk[start_:stop] = new_values
>>> # Retrieve the updated slice using the slicing syntax
>>> retrieved_slice = np.frombuffer(schunk[start_:stop], dtype=np.int32)
>>> f"First 10 values of the updated slice: {retrieved_slice[:10]}"
First 10 values of the updated slice: [2000 2002 2004 2006 2008 2010 2012 2014 2016 2018]
>>> f"Last 10 values of the updated slice: {retrieved_slice[-10:]}"
Last 10 values of the updated slice: [3980 3982 3984 3986 3988 3990 3992 3994 3996 3998]
append_data(data: object) -> int

Append a data buffer to the SChunk.

The data buffer must be of size chunksize specified in SChunk.__init__.

Parameters:

data (bytes-like object) – The data to be compressed and added as a chunk.

Returns:

out – The number of chunks in the SChunk.

Return type:

int

Raises:

RuntimeError – If the data could not be appended.

Examples

>>> import blosc2
>>> import numpy as np
>>> schunk = blosc2.SChunk(chunksize=200*1000*4)
>>> data = np.arange(200 * 1000, dtype='int32')
>>> schunk.append_data(data)
1
decompress_chunk(nchunk: int, dst: object = None) -> str | bytes

Decompress the chunk given by its index nchunk.

Parameters:
  • nchunk (int) – The index of the chunk that will be decompressed.

  • dst (NumPy object or bytearray) – The destination NumPy object or bytearray to fill, the length of which must be greater than 0. The user must ensure that it has enough capacity to host the decompressed chunk. Default is None, meaning that a new bytes object is created, filled and returned.

Returns:

out – The decompressed chunk as a Python str or bytes object if dst is None. Otherwise, it returns None because the result will already be in dst.

Return type:

str or bytes

Raises:

RuntimeError – If a problem is detected.

Examples

>>> import blosc2
>>> cparams = blosc2.CParams(typesize=1)
>>> schunk = blosc2.SChunk(cparams=cparams)
>>> buffer = b"wermqeoir23"
>>> schunk.append_data(buffer)
1
>>> schunk.decompress_chunk(0)
b'wermqeoir23'
>>> # Construct a mutable bytearray object
>>> bytes_obj = bytearray(len(buffer))
>>> schunk.decompress_chunk(0, dst=bytes_obj)
>>> bytes_obj == buffer
True
delete_chunk(nchunk: int) -> int

Delete the specified chunk from the SChunk.

Parameters:

nchunk (int) – The index of the chunk that will be removed.

Returns:

out – The number of chunks in the SChunk.

Return type:

int

Raises:

RuntimeError – If a problem is detected.

Examples

>>> import blosc2
>>> import numpy as np
>>> # Create an SChunk with 3 chunks
>>> nchunks = 3
>>> data = np.arange(200 * 1000 * nchunks, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(chunksize=200 * 1000 * 4, data=data, cparams=cparams)
>>> # Check the number of chunks before deletion
>>> schunk.nchunks
3
>>> # Delete the second chunk (index 1)
>>> schunk.delete_chunk(1)
>>> # Check the number of chunks after deletion
>>> schunk.nchunks
2
fill_special(nitems: int, special_value: SpecialValue, value: bytes | int | float | bool | None = None) -> int

Fill the SChunk with a special value. The SChunk must be empty.

Parameters:
  • nitems (int) – The number of items to fill with the special value.

  • special_value (SpecialValue) – The special value to be used for filling the SChunk.

  • value (bytes, int, float, bool (optional)) – The value to fill the SChunk. This parameter is only supported if special_value is blosc2.SpecialValue.VALUE.

Returns:

out – The number of chunks in the SChunk.

Return type:

int

Raises:

RuntimeError – If the SChunk could not be filled with the special value.

Examples

>>> import blosc2
>>> import numpy as np
>>> import time
>>> nitems = 100_000_000
>>> dtype = np.dtype(np.float64)
>>> # Measure the time to create SChunk from a NumPy array
>>> t0 = time.time()
>>> data = np.full(nitems, np.pi, dtype)
>>> cparams = blosc2.CParams(typesize=dtype.itemsize)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> t = (time.time() - t0) * 1000.
>>> f"Time creating a schunk with a numpy array: {t:10.3f} ms"
Time creating a schunk with a numpy array:    710.273 ms
>>> # Measure the time to create SChunk using fill_special
>>> t0 = time.time()
>>> cparams = blosc2.CParams(typesize=dtype.itemsize)
>>> schunk = blosc2.SChunk(cparams=cparams)
>>> schunk.fill_special(nitems, blosc2.SpecialValue.VALUE, np.pi)
>>> t = (time.time() - t0) * 1000.
>>> f"Time passing directly the value to `fill_special`: {t:10.3f} ms"
Time passing directly the value to `fill_special`:      2.109 ms
filler(inputs_tuple: tuple[tuple], schunk_dtype: dtype, nelem: int | None = None) -> None

Decorator to set a filler function.

This function will fill self according to nelem. It will receive three parameters: a tuple with the inputs as ndarrays from which to read, the ndarray to fill self and the offset inside the SChunk instance where the corresponding block begins (see example below).

Parameters:
  • inputs_tuple (tuple of tuples) – Tuple containing a tuple for each argument that the function will receive, along with their corresponding np.dtype. Supported operand types are SChunk, ndarray and Python scalars.

  • schunk_dtype (np.dtype) – The data type to use to fill self.

  • nelem (int) – Number of elements to append to self. If None (default) it will be the number of elements from the operands.

Returns:

out

Return type:

None

Notes

  • Compression nthreads must be 1 when using this.

  • This does not need to be removed from the created SChunk instance.

See also

prefilter()

Examples

# Set the compression and decompression parameters
schunk_dtype = np.dtype(np.float64)
cparams = blosc2.CParams(typesize=schunk_dtype.itemsize, nthreads=1)
# Create empty SChunk
schunk = blosc2.SChunk(chunksize=20_000 * schunk_dtype.itemsize, cparams=cparams)

# Create operands
op_dtype = np.dtype(np.int32)
data = np.full(20_000 * 3, 12, dtype=op_dtype)
schunk_op = blosc2.SChunk(chunksize=20_000 * op_dtype.itemsize, data=data)

# Create filler
@schunk.filler(((schunk_op, op_dtype), (np.e, np.float32)), schunk_dtype)
def filler(inputs_tuple, output, offset):
    output[:] = inputs_tuple[0] - inputs_tuple[1]
get_chunk(nchunk: int) -> bytes

Return the compressed chunk that is in the SChunk.

Parameters:

nchunk (int) – The index of the chunk that will be returned.

Returns:

out – The compressed chunk.

Return type:

bytes object

Raises:

RuntimeError – If a problem is detected.

Examples

>>> import blosc2
>>> import numpy as np
>>> # Create an SChunk with 3 chunks
>>> nchunks = 3
>>> data = np.arange(200 * 1000 * nchunks, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> # Retrieve the first chunk (index 0)
>>> chunk = schunk.get_chunk(0)
>>> # Check the type and length of the compressed chunk
>>> type(chunk)
<class 'bytes'>
>>> len(chunk)
10552
get_slice(start: int = 0, stop: int | None = None, out: object = None) -> str | bytes | None

Get a slice from start to stop.

Parameters:
  • start (int) – The starting index of the slice. Default is 0.

  • stop (int) – The ending index of the slice (exclusive). Default is until the SChunk ends.

  • out (bytes-like object or bytearray) –

    The target object (supporting the Buffer Protocol) to fill. Verify that the buffer has enough space for the decompressed data. If None is provided, a new bytes object will be created, filled, and returned.

Returns:

out – The decompressed slice as a Python str or bytes object if out is None. Otherwise, it returns None since the result will already be in out.

Return type:

str or bytes or None

Raises:
  • ValueError – If the size to get is negative. If there is not enough space in out. If start is greater than or equal to the number of items in the SChunk.

  • RuntimeError – If a problem is detected.

See also

__getitem__()

Examples

>>> import blosc2
>>> import numpy as np
>>> nchunks = 4
>>> chunk_size = 200 * 1000 * 4
>>> data = np.arange(nchunks * chunk_size // 4, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> # Define the slice parameters
>>> start_index = 200 * 1000
>>> stop_index = 2 * 200 * 1000
>>> # Prepare an output buffer
>>> slice_size = stop_index - start_index
>>> out_buffer = bytearray(slice_size * 4)  # Ensure the buffer is large enough
>>> result = schunk.get_slice(start=start_index, stop=stop_index, out=out_buffer)
>>> # Convert bytearray to NumPy array for easier inspection
>>> slice_array = np.frombuffer(out_buffer, dtype=np.int32)
>>> f"Slice data: {slice_array[:10]} ..."  # Print the first 10 elements
Slice data: [200000 200001 200002 200003 200004 200005 200006 200007 200008 200009] ...
insert_chunk(nchunk: int, chunk: bytes) -> int

Insert an already compressed chunk into the SChunk.

Parameters:
  • nchunk (int) – The index at which the chunk will be inserted.

  • chunk (bytes object) – The compressed chunk.

Returns:

out – The number of chunks in the SChunk.

Return type:

int

Raises:

RuntimeError – If a problem is detected.

Examples

>>> import blosc2
>>> import numpy as np
>>> # Create an SChunk with 2 chunks
>>> data = np.arange(400 * 1000, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(chunksize=200*1000*4, data=data, cparams=cparams)
>>> # Get a compressed chunk from the SChunk
>>> chunk = schunk.get_chunk(0)
>>> # Insert a chunk in the second position (index 1)
>>> schunk.insert_chunk(1, chunk)
>>> # Verify the total number of chunks after insertion
>>> schunk.nchunks
3
insert_data(nchunk: int, data: object, copy: bool) -> int

Insert the data in the specified position in the SChunk.

Parameters:
  • nchunk (int) – The index at which the chunk will be inserted.

  • data (bytes object) – The data that will be compressed and inserted as a chunk.

  • copy (bool) – Whether to make an internal copy of the chunk to insert it or not.

Returns:

out – The number of chunks in the SChunk.

Return type:

int

Raises:

RuntimeError – If a problem is detected.

Examples

>>> import blosc2
>>> import numpy as np
>>> # Create an SChunk with 2 chunks
>>> data = np.arange(400 * 1000, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(chunksize=200*1000*4, data=data, cparams=cparams)
>>> # Create a new array to insert into the second chunk of the SChunk
>>> new_data = np.arange(200 * 1000, dtype=np.int32)
>>> # Insert the new data at position 1, compressing it
>>> schunk.insert_data(1, new_data, copy=True)
>>> # Verify the total number of chunks after insertion
>>> schunk.nchunks
3
iterchunks(dtype: dtype) -> Iterator[ndarray]

Iterate over the chunks of the SChunk.

Parameters:

dtype (np.dtype) – The data type to use for the decompressed chunks.

Yields:

chunk (NumPy ndarray) – The decompressed chunk.

Examples

>>> import blosc2
>>> import numpy as np
>>> # Create sample data and an SChunk
>>> data = np.arange(400 * 1000, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> # Iterate over chunks using the iterchunks method
>>> for chunk in schunk.iterchunks(dtype=np.int32):
...     print(f"Chunk shape: {chunk.shape}")
...     print(f"First 5 elements of chunk: {chunk[:5]}")
Chunk shape: (400000,)
First 5 elements of chunk: [0 1 2 3 4]
iterchunks_info() -> Iterator[info]

Iterate over the chunks of the SChunk, providing info on index and special values.

Yields:

info (namedtuple) –

A namedtuple with the following fields:

nchunk: int

The index of the chunk.

cratio: float

The compression ratio of the chunk.

special: SpecialValue

The special value enum of the chunk; if 0, the chunk is not special.

repeated_value: bytes or None

The repeated value for the chunk; if not SpecialValue.VALUE, it is None.

lazychunk: bytes

A buffer with the complete lazy chunk.

Examples

>>> import blosc2
>>> import numpy as np
>>> # Create sample data and an SChunk
>>> data = np.arange(400 * 1000, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> # Iterate over chunks and print detailed information
>>> for chunk_info in schunk.iterchunks_info():
...     print(f"Chunk index: {chunk_info.nchunk}")
...     print(f"Compression ratio: {chunk_info.cratio:.2f}")
...     print(f"Special value: {chunk_info.special.name}")
...     print(f"Repeated value: {chunk_info.repeated_value[:10] if chunk_info.repeated_value else None}")
Chunk index: 0
Compression ratio: 223.56
Special value: NOT_SPECIAL
Repeated value: None
postfilter(input_dtype: dtype, output_dtype: dtype = None) -> None

Decorator to set a function as a postfilter.

The postfilter function will be executed each time after decompressing blocks of data. It will receive three parameters:

  • the input ndarray to be read from

  • the output ndarray to be filled out

  • the offset inside the SChunk instance where the corresponding block begins (see example below).

Parameters:
  • input_dtype (np.dtype) – Data type of the input that the postfilter function will receive.

  • output_dtype (np.dtype) – Data type of the output that the postfilter function will receive and fill. If None (default), it will be set to input_dtype.

Returns:

out

Return type:

None

Examples

# Create SChunk
input_dtype = np.dtype(np.int64)
cparams = blosc2.CParams(typesize=input_dtype.itemsize)
dparams = blosc2.DParams(nthreads=1)
schunk = blosc2.SChunk(
    chunksize=20_000 * input_dtype.itemsize, cparams=cparams, dparams=dparams
)

# Create postfilter and associate it to the schunk
@schunk.postfilter(input_dtype)
def postfilter(input, output, offset):
    output[:] = offset + np.arange(input.size)
prefilter(input_dtype: dtype, output_dtype: dtype = None) -> None

Decorator to set a function as a prefilter.

This function will be executed each time before compressing the data. It will receive three parameters:

  • The actual data as a ndarray from which to read,

  • The ndarray to be filled,

  • The offset inside the SChunk instance where the corresponding block begins (see example below).

Parameters:
  • input_dtype (np.dtype) – Data type of the input that will be processed by the prefilter function.

  • output_dtype (np.dtype, optional) – Data type of the output that will be filled by the prefilter function. If None (default), it will be the same as input_dtype.

Returns:

out

Return type:

None

Examples

# Set the compression and decompression parameters
input_dtype = np.dtype(np.int32)
output_dtype = np.dtype(np.float32)
cparams = blosc2.CParams(typesize=output_dtype.itemsize, nthreads=1)
# Create schunk
schunk = blosc2.SChunk(chunksize=200 * 1000 * input_dtype.itemsize, cparams=cparams)

# Set prefilter with decorator
@schunk.prefilter(input_dtype, output_dtype)
def prefilter(input, output, offset):
    output[:] = input - np.pi
remove_postfilter(func_name: str, _new_ctx: bool = True) -> None

Remove the postfilter from the SChunk instance.

Parameters:

func_name (str) – The name of the postfilter function to remove.

Returns:

out

Return type:

None

Examples

>>> import blosc2
>>> import numpy as np
>>> dtype = np.dtype(np.int32)
>>> cparams = blosc2.CParams(typesize=dtype.itemsize)
>>> dparams = blosc2.DParams(nthreads=1)
>>> data = np.arange(500, dtype=np.int32)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams, dparams=dparams)
>>> # Define the postfilter function
>>> @schunk.postfilter(dtype)
... def postfilter(input, output, offset):
...     output[:] = input + offset + np.arange(input.size)
>>> out = np.empty(data.size, dtype=dtype)
>>> schunk.get_slice(out=out)
>>> f"Data slice with postfilter applied (first 8 elements): {out[:8]}"
Data slice with postfilter applied (first 8 elements): [ 0  2  4  6  8 10 12 14]
>>> schunk.remove_postfilter('postfilter')
>>> retrieved_data = np.empty(data.size, dtype=dtype)
>>> schunk.get_slice(out=retrieved_data)
>>> f"Data slice after removing the postfilter (first 8 elements): {retrieved_data[:8]}"
Data slice after removing the postfilter (first 8 elements): [0 1 2 3 4 5 6 7]
remove_prefilter(func_name: str, _new_ctx: bool = True) -> None

Remove the prefilter from the SChunk instance.

Parameters:

func_name (str) – Name of the prefilter function.

Returns:

out

Return type:

None

Examples

>>> import blosc2
>>> import numpy as np
>>> dtype = np.dtype(np.int32)
>>> cparams = blosc2.CParams(typesize=dtype.itemsize, nthreads=1)
>>> data = np.arange(1000, dtype=np.int32)
>>> output_dtype = np.float32
>>> schunk = blosc2.SChunk(cparams=cparams)
>>> # Define the prefilter function
>>> @schunk.prefilter(dtype, output_dtype)
... def prefilter(input, output, offset):
...     output[:] = input - np.pi
>>> schunk[:1000] = data
>>> # Retrieve and convert compressed data with the prefilter to a NumPy array.
>>> compressed_array_with_filter = np.frombuffer(schunk.get_slice(), dtype=output_dtype)
>>> f"Compressed data with prefilter applied (first 8 elements): {compressed_array_with_filter[:8]}"
Compressed data with prefilter applied (first 8 elements): [-3.1415927  -2.1415927  -1.1415926  -0.14159265  0.8584073   1.8584074
 2.8584073   3.8584073 ]
>>> schunk.remove_prefilter('prefilter')
>>> schunk[:1000] = data
>>> compressed_array_without_filter = np.frombuffer(schunk.get_slice(), dtype=dtype)
>>> f"Compressed data without prefilter (first 8 elements): {compressed_array_without_filter[:8]}"
Compressed data without prefilter (first 8 elements): [0 1 2 3 4 5 6 7]
to_cframe() -> bytes

Get a bytes object containing the serialized SChunk instance.

Returns:

out – The buffer containing the serialized SChunk instance.

Return type:

bytes

Examples

>>> import blosc2
>>> import numpy as np
>>> nchunks = 4
>>> chunk_size = 200 * 1000 * 4
>>> data = np.arange(nchunks * chunk_size // 4, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> # Serialize the SChunk instance to a bytes object
>>> serialized_schunk = schunk.to_cframe()
>>> f"Serialized SChunk length: {len(serialized_schunk)} bytes"
Serialized SChunk length: 14129 bytes
>>> # Create a new SChunk from the serialized data
>>> deserialized_schunk = blosc2.schunk_from_cframe(serialized_schunk)
>>> start = 500
>>> stop = 505
>>> sl_bytes = deserialized_schunk[start:stop]
>>> sl = np.frombuffer(sl_bytes, dtype=np.int32)
>>> res = data[start:stop]
>>> f"Original slice: {res}"
Original slice: [500 501 502 503 504]
>>> f"Deserialized slice: {sl}"
Deserialized slice: [500 501 502 503 504]
update_chunk(nchunk: int, chunk: bytes) -> int

Update an existing chunk in the SChunk.

Parameters:
  • nchunk (int) – The index of the chunk to be updated.

  • chunk (bytes object) – The new compressed chunk that will replace the old chunk’s content.

Returns:

out – The number of chunks in the SChunk.

Return type:

int

Raises:

RuntimeError – If a problem is detected.

Examples

>>> import blosc2
>>> import numpy as np
>>> nchunks = 5
>>> chunk_size = 200 * 1000 * 4
>>> data = np.arange(nchunks * chunk_size // 4, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(chunksize=chunk_size, data=data, cparams=cparams)
>>> f"Initial number of chunks: {schunk.nchunks}"
Initial number of chunks: 5
>>> c_index = 1
>>> new_data = np.full(chunk_size // 4, fill_value=c_index, dtype=np.int32).tobytes()
>>> compressed_data = blosc2.compress2(new_data, typesize=4)
>>> # Update the 2nd chunk (index 1) with new data
>>> nchunks = schunk.update_chunk(c_index, compressed_data)
>>> f"Number of chunks after update: {nchunks}"
Number of chunks after update: 5
update_data(nchunk: int, data: object, copy: bool) -> int

Update the chunk in the specified position with the given data.

Parameters:
  • nchunk (int) – The index of the chunk to be updated.

  • data (bytes object) – The data that will be compressed and will replace the old chunk.

  • copy (bool) – Whether to make an internal copy of the chunk before updating it.

Returns:

out – The number of chunks in the SChunk.

Return type:

int

Raises:

RuntimeError – If a problem is detected.

Examples

>>> import blosc2
>>> import numpy as np
>>> nchunks = 4
>>> chunk_size = 200 * 1000 * 4
>>> data = np.arange(nchunks * chunk_size // 4, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(chunksize=chunk_size, data=data, cparams=cparams)
>>> f"Initial number of chunks: {schunk.nchunks}"
Initial number of chunks: 4
>>> c_index = 1 # Update the 2nd chunk (index 1)
>>> new_data = np.full(chunk_size // 4, fill_value=c_index, dtype=np.int32).tobytes()
>>> nchunks = schunk.update_data(c_index, new_data, copy=True)
>>> f"Number of chunks after update: {schunk.nchunks}"
Number of chunks after update: 4
property blocksize: int

The block size (in bytes).

property cbytes: int

Amount of compressed data bytes (data size + chunk headers size).

property chunkshape: int

Number of elements per chunk.

property chunksize: int

Number of bytes in each chunk.

property contiguous: bool

Whether the SChunk is stored contiguously or sparsely.

property cparams: CParams

blosc2.CParams instance with the compression parameters.

property cratio: float

Compression ratio.

property dparams: DParams

blosc2.DParams instance with the decompression parameters.

property meta: Meta

Access to the fixed-length metadata of the SChunk.

property nbytes: int

Amount of uncompressed data bytes.

property nchunks: int

The number of chunks.

property typesize: int

Type size of the SChunk.

property urlpath: str

Path where the SChunk is stored.

property vlmeta: vlmeta

Access to the variable-length metadata of the SChunk.

Constructors

blosc2.schunk_from_cframe(cframe: bytes | str, copy: bool = False) -> SChunk

Create a SChunk instance from a contiguous frame buffer.

Parameters:
  • cframe (bytes or str) – The bytes object containing the in-memory cframe.

  • copy (bool) – Whether to internally make a copy. If False, the user is responsible for keeping a reference to cframe. Default is False.

Returns:

out – A new SChunk containing the data passed.

Return type:

SChunk

See also

to_cframe()

Examples

>>> import numpy as np
>>> import blosc2
>>> nchunks = 4
>>> chunk_size = 200 * 1000 * 4
>>> data = np.arange(nchunks * chunk_size // 4, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> serialized_schunk = schunk.to_cframe()
>>> print(f"Serialized SChunk length: {len(serialized_schunk)} bytes")
Serialized SChunk length: 14129 bytes
>>> deserialized_schunk = blosc2.schunk_from_cframe(serialized_schunk)
>>> start = 1000
>>> stop = 1005
>>> sl_bytes = deserialized_schunk[start:stop]
>>> sl = np.frombuffer(sl_bytes, dtype=np.int32)
>>> print("Slice from deserialized SChunk:", sl)
Slice from deserialized SChunk: [1000 1001 1002 1003 1004]
>>> expected_slice = data[start:stop]
>>> print("Expected slice:", expected_slice)
Expected slice: [1000 1001 1002 1003 1004]