SChunk¶
The basic compressed data container (also known as a super-chunk). This class provides a set of useful parameters and methods that allow you not only to create and decompress data, but also to manage it in more sophisticated ways: for example, appending new data, updating existing data, or deleting chunks.
- class blosc2.SChunk(chunksize: int | None = None, data: object = None, **kwargs: dict | CParams | Storage | DParams)[source]¶
- Attributes:
blocksize
The block size (in bytes).
cbytes
Amount of compressed data bytes (data size + chunk headers size).
chunkshape
Number of elements per chunk.
chunksize
Number of bytes in each chunk.
contiguous
Whether the SChunk is stored contiguously or sparsely.
cparams
blosc2.CParams instance with the compression parameters.
cratio
Compression ratio.
dparams
blosc2.DParams instance with the decompression parameters.
meta
Access to the fixed-length metadata of the SChunk.
nbytes
Amount of uncompressed data bytes.
nchunks
The number of chunks.
typesize
Type size of the SChunk.
urlpath
Path where the SChunk is stored.
vlmeta
Access to the variable-length metadata of the SChunk.
Methods
append_data(data): Append a data buffer to the SChunk.
decompress_chunk(nchunk[, dst]): Decompress the chunk given by its index nchunk.
delete_chunk(nchunk): Delete the specified chunk from the SChunk.
fill_special(nitems, special_value[, value]): Fill the SChunk with a special value.
filler(inputs_tuple, schunk_dtype[, nelem]): Decorator to set a filler function.
get_chunk(nchunk): Return the compressed chunk that is in the SChunk.
get_slice([start, stop, out]): Get a slice from start to stop.
insert_chunk(nchunk, chunk): Insert an already compressed chunk into the SChunk.
insert_data(nchunk, data, copy): Insert the data in the specified position in the SChunk.
iterchunks(dtype): Iterate over the chunks of the SChunk.
iterchunks_info(): Iterate over the chunks of the SChunk, providing info on index and special values.
postfilter(input_dtype[, output_dtype]): Decorator to set a function as a postfilter.
prefilter(input_dtype[, output_dtype]): Decorator to set a function as a prefilter.
remove_postfilter(func_name[, _new_ctx]): Remove the postfilter from the SChunk instance.
remove_prefilter(func_name[, _new_ctx]): Remove the prefilter from the SChunk instance.
to_cframe(): Get a bytes object containing the serialized SChunk instance.
update_chunk(nchunk, chunk): Update an existing chunk in the SChunk.
update_data(nchunk, data, copy): Update the chunk in the specified position with the given data.
- Special Methods:
__init__([chunksize, data]): Create a new super-chunk, or open an existing one.
__len__(): Return the number of items in the SChunk.
__getitem__(item): Get a slice from the SChunk.
__setitem__(key, value): Set slice to value.
Constructor¶
- __init__(chunksize: int | None = None, data: object = None, **kwargs: dict | CParams | Storage | DParams) None [source]¶
Create a new super-chunk, or open an existing one.
- Parameters:
chunksize¶ (int, optional) – The size, in bytes, of the chunks in the super-chunk. If not provided, it is set automatically to a reasonable value.
data¶ (bytes-like object, optional) – The data to be split into different chunks of size chunksize. If None, the SChunk instance will be empty initially.
kwargs¶ (dict, optional) – Storage parameters. The default values are in blosc2.Storage. Supported keyword arguments:
- storage: blosc2.Storage or dict – All the storage parameters that you want to use, as a blosc2.Storage or dict instance.
- cparams: blosc2.CParams or dict – All the compression parameters that you want to use, as a blosc2.CParams or dict instance.
- dparams: blosc2.DParams or dict – All the decompression parameters that you want to use, as a blosc2.DParams or dict instance.
- others: Any – If storage is not passed, all the parameters of a blosc2.Storage can be passed as keyword arguments.
Examples
>>> import blosc2
>>> import numpy as np
>>> import os.path
>>> import shutil
>>> import tempfile
>>> cparams = blosc2.CParams()
>>> dparams = blosc2.DParams()
>>> storage = blosc2.Storage(contiguous=True)
>>> schunk = blosc2.SChunk(cparams=cparams, dparams=dparams, storage=storage)
In the following, we will write and read a super-chunk to and from disk via memory-mapped files.
>>> a = np.arange(3, dtype=np.int64)
>>> chunksize = a.size * a.itemsize
>>> n_chunks = 2
>>> tmpdirname = tempfile.mkdtemp()
>>> urlpath = os.path.join(tmpdirname, 'schunk.b2frame')
Optional: we intend to write 2 chunks of 24 bytes each, and we expect the compressed size to be smaller than the original size. Therefore, we generously set the initial size of the mapping to 48 bytes, effectively avoiding remappings.
>>> initial_mapping_size = chunksize * n_chunks
>>> schunk_mmap = blosc2.SChunk(
...     chunksize=chunksize,
...     mmap_mode="w+",
...     initial_mapping_size=initial_mapping_size,
...     urlpath=urlpath,
... )
>>> schunk_mmap.append_data(a)
1
>>> schunk_mmap.append_data(a * 2)
2
Optional: explicitly close the file and free the mapping.
>>> del schunk_mmap
Reading the data back again via memory-mapped files:
>>> schunk_mmap = blosc2.open(urlpath, mmap_mode="r")
>>> np.frombuffer(schunk_mmap.decompress_chunk(0), dtype=np.int64).tolist()
[0, 1, 2]
>>> np.frombuffer(schunk_mmap.decompress_chunk(1), dtype=np.int64).tolist()
[0, 2, 4]
>>> shutil.rmtree(tmpdirname)
Utility Methods¶
- __getitem__(item: int | slice) str | bytes [source]¶
Get a slice from the SChunk.
- Parameters:
item¶ (int or slice) – The index or slice for the data. Note that the step parameter is not honored.
- Returns:
out – The decompressed slice as a Python str or bytes object.
- Return type:
str or bytes
- Raises:
ValueError – If the size to get is negative, or if item.start is greater than or equal to the number of items in the SChunk.
RuntimeError – If a problem is detected.
IndexError – If step is not 1.
Examples
>>> import blosc2
>>> import numpy as np
>>> nchunks = 4
>>> chunk_size = 200 * 1000 * 4
>>> data = np.arange(nchunks * chunk_size // 4, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(chunksize=chunk_size, data=data, cparams=cparams)
>>> # Use __getitem__ to retrieve a slice of data from the SChunk
>>> res = schunk[150:155]
>>> f"Slice data: {np.frombuffer(res, dtype=np.int32)}"
Slice data: [150 151 152 153 154]
- __setitem__(key: int | slice, value: object) None [source]¶
Set slice to value.
- Parameters:
key¶ (int or slice) – The index of the slice to update. Note that step parameter is not honored.
value¶ (bytes-like object) – An object supporting the Buffer Protocol used to fill the slice.
- Returns:
out
- Return type:
None
Notes
This method can also be used to append new data if key.stop is greater than the number of items in the SChunk.
Examples
>>> import blosc2
>>> import numpy as np
>>> nchunks = 4
>>> chunk_size = 200 * 1000 * 4
>>> data = np.arange(nchunks * chunk_size // 4, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> # Create a new array of values to update the slice (values from 1000 to 1999 multiplied by 2)
>>> start_ = 1000
>>> stop = 2000
>>> new_values = np.arange(start_, stop, dtype=np.int32) * 2
>>> schunk[start_:stop] = new_values
>>> # Retrieve the updated slice using the slicing syntax
>>> retrieved_slice = np.frombuffer(schunk[start_:stop], dtype=np.int32)
>>> f"First 10 values of the updated slice: {retrieved_slice[:10]}"
First 10 values of the updated slice: [2000 2002 2004 2006 2008 2010 2012 2014 2016 2018]
>>> f"Last 10 values of the updated slice: {retrieved_slice[-10:]}"
Last 10 values of the updated slice: [3980 3982 3984 3986 3988 3990 3992 3994 3996 3998]
- append_data(data: object) int [source]¶
Append a data buffer to the SChunk.
The data buffer must be of size chunksize specified in SChunk.__init__.
- Parameters:
data¶ (bytes-like object) – The data to be compressed and added as a chunk.
- Returns:
out – The number of chunks in the SChunk.
- Return type:
int
- Raises:
RuntimeError – If the data could not be appended.
Examples
>>> import blosc2
>>> import numpy as np
>>> schunk = blosc2.SChunk(chunksize=200*1000*4)
>>> data = np.arange(200 * 1000, dtype='int32')
>>> schunk.append_data(data)
1
- decompress_chunk(nchunk: int, dst: object = None) str | bytes [source]¶
Decompress the chunk given by its index nchunk.
- Parameters:
nchunk¶ (int) – The index of the chunk that will be decompressed.
dst¶ (NumPy object or bytearray) – The destination NumPy object or bytearray to fill, the length of which must be greater than 0. The user must ensure that it has enough capacity to host the decompressed chunk. Default is None, meaning that a new bytes object is created, filled and returned.
- Returns:
out – The decompressed chunk as a Python str or bytes object if dst is None. Otherwise, it returns None because the result will already be in dst.
- Return type:
str or bytes
- Raises:
RuntimeError – If a problem is detected.
Examples
>>> import blosc2
>>> cparams = blosc2.CParams(typesize=1)
>>> schunk = blosc2.SChunk(cparams=cparams)
>>> buffer = b"wermqeoir23"
>>> schunk.append_data(buffer)
1
>>> schunk.decompress_chunk(0)
b'wermqeoir23'
>>> # Construct a mutable bytearray object
>>> bytes_obj = bytearray(len(buffer))
>>> schunk.decompress_chunk(0, dst=bytes_obj)
>>> bytes_obj == buffer
True
- delete_chunk(nchunk: int) int [source]¶
Delete the specified chunk from the SChunk.
- Parameters:
nchunk¶ (int) – The index of the chunk that will be removed.
- Returns:
out – The number of chunks in the SChunk.
- Return type:
int
- Raises:
RuntimeError – If a problem is detected.
Examples
>>> import blosc2
>>> import numpy as np
>>> # Create an SChunk with 3 chunks
>>> nchunks = 3
>>> data = np.arange(200 * 1000 * nchunks, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(chunksize=200 * 1000 * 4, data=data, cparams=cparams)
>>> # Check the number of chunks before deletion
>>> schunk.nchunks
3
>>> # Delete the second chunk (index 1)
>>> schunk.delete_chunk(1)
>>> # Check the number of chunks after deletion
>>> schunk.nchunks
2
- fill_special(nitems: int, special_value: SpecialValue, value: bytes | int | float | bool | None = None) int [source]¶
Fill the SChunk with a special value. The SChunk must be empty.
- Parameters:
nitems¶ (int) – The number of items to fill with the special value.
special_value¶ (SpecialValue) – The special value to be used for filling the SChunk.
value¶ (bytes, int, float, bool, optional) – The value to fill the SChunk with. This parameter is only supported if special_value is blosc2.SpecialValue.VALUE.
- Returns:
out – The number of chunks in the SChunk.
- Return type:
int
- Raises:
RuntimeError – If the SChunk could not be filled with the special value.
Examples
>>> import blosc2
>>> import numpy as np
>>> import time
>>> nitems = 100_000_000
>>> dtype = np.dtype(np.float64)
>>> # Measure the time to create an SChunk from a NumPy array
>>> t0 = time.time()
>>> data = np.full(nitems, np.pi, dtype)
>>> cparams = blosc2.CParams(typesize=dtype.itemsize)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> t = (time.time() - t0) * 1000.
>>> f"Time creating a schunk with a numpy array: {t:10.3f} ms"
Time creating a schunk with a numpy array: 710.273 ms
>>> # Measure the time to create an SChunk using fill_special
>>> t0 = time.time()
>>> cparams = blosc2.CParams(typesize=dtype.itemsize)
>>> schunk = blosc2.SChunk(cparams=cparams)
>>> schunk.fill_special(nitems, blosc2.SpecialValue.VALUE, np.pi)
>>> t = (time.time() - t0) * 1000.
>>> f"Time passing directly the value to `fill_special`: {t:10.3f} ms"
Time passing directly the value to `fill_special`: 2.109 ms
- filler(inputs_tuple: tuple[tuple], schunk_dtype: dtype, nelem: int | None = None) None [source]¶
Decorator to set a filler function.
This function will fill self according to nelem. It will receive three parameters: a tuple with the inputs as ndarrays from which to read, the ndarray to fill (self), and the offset inside the SChunk instance where the corresponding block begins (see example below).
- Parameters:
inputs_tuple¶ (tuple of tuples) – Tuple containing a tuple for each argument that the function will receive, along with their corresponding np.dtype. Supported operand types are SChunk, ndarray and Python scalars.
schunk_dtype¶ (np.dtype) – The data type to use to fill self.
nelem¶ (int) – Number of elements to append to self. If None (default), it will be the number of elements from the operands.
- Returns:
out
- Return type:
None
Notes
Compression nthreads must be 1 when using this.
This does not need to be removed from the created SChunk instance.
Examples
import blosc2
import numpy as np

# Set the compression and decompression parameters
schunk_dtype = np.dtype(np.float64)
cparams = blosc2.CParams(typesize=schunk_dtype.itemsize, nthreads=1)
# Create empty SChunk
schunk = blosc2.SChunk(chunksize=20_000 * schunk_dtype.itemsize, cparams=cparams)
# Create operands
op_dtype = np.dtype(np.int32)
data = np.full(20_000 * 3, 12, dtype=op_dtype)
schunk_op = blosc2.SChunk(chunksize=20_000 * op_dtype.itemsize, data=data)

# Create filler
@schunk.filler(((schunk_op, op_dtype), (np.e, np.float32)), schunk_dtype)
def filler(inputs_tuple, output, offset):
    output[:] = inputs_tuple[0] - inputs_tuple[1]
- get_chunk(nchunk: int) bytes [source]¶
Return the compressed chunk that is in the SChunk.
- Parameters:
nchunk¶ (int) – The index of the chunk that will be returned.
- Returns:
out – The compressed chunk.
- Return type:
bytes object
- Raises:
RuntimeError – If a problem is detected.
Examples
>>> import blosc2
>>> import numpy as np
>>> # Create an SChunk with 3 chunks
>>> nchunks = 3
>>> data = np.arange(200 * 1000 * nchunks, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> # Retrieve the first chunk (index 0)
>>> chunk = schunk.get_chunk(0)
>>> # Check the type and length of the compressed chunk
>>> type(chunk)
<class 'bytes'>
>>> len(chunk)
10552
- get_slice(start: int = 0, stop: int | None = None, out: object = None) str | bytes | None [source]¶
Get a slice from start to stop.
- Parameters:
start¶ (int) – The starting index of the slice. Default is 0.
stop¶ (int) – The ending index of the slice (exclusive). Default is until the SChunk ends.
out¶ (bytes-like object or bytearray) –
The target object (supporting the Buffer Protocol) to fill. Verify that the buffer has enough space for the decompressed data. If None is provided, a new bytes object will be created, filled, and returned.
- Returns:
out – The decompressed slice as a Python str or bytes object if out is None. Otherwise, it returns None since the result will already be in out.
- Return type:
str or bytes or None
Examples
>>> import blosc2
>>> import numpy as np
>>> nchunks = 4
>>> chunk_size = 200 * 1000 * 4
>>> data = np.arange(nchunks * chunk_size // 4, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> # Define the slice parameters
>>> start_index = 200 * 1000
>>> stop_index = 2 * 200 * 1000
>>> # Prepare an output buffer
>>> slice_size = stop_index - start_index
>>> out_buffer = bytearray(slice_size * 4)  # Ensure the buffer is large enough
>>> result = schunk.get_slice(start=start_index, stop=stop_index, out=out_buffer)
>>> # Convert bytearray to NumPy array for easier inspection
>>> slice_array = np.frombuffer(out_buffer, dtype=np.int32)
>>> f"Slice data: {slice_array[:10]} ..."  # Show the first 10 elements
Slice data: [200000 200001 200002 200003 200004 200005 200006 200007 200008 200009] ...
- insert_chunk(nchunk: int, chunk: bytes) int [source]¶
Insert an already compressed chunk into the SChunk.
- Parameters:
nchunk¶ (int) – The position in which the chunk will be inserted.
chunk¶ (bytes object) – The compressed chunk to insert.
- Returns:
out – The number of chunks in the SChunk.
- Return type:
int
- Raises:
RuntimeError – If a problem is detected.
Examples
>>> import blosc2
>>> import numpy as np
>>> # Create an SChunk with 2 chunks
>>> data = np.arange(400 * 1000, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(chunksize=200*1000*4, data=data, cparams=cparams)
>>> # Get a compressed chunk from the SChunk
>>> chunk = schunk.get_chunk(0)
>>> # Insert a chunk in the second position (index 1)
>>> schunk.insert_chunk(1, chunk)
>>> # Verify the total number of chunks after insertion
>>> schunk.nchunks
3
- insert_data(nchunk: int, data: object, copy: bool) int [source]¶
Insert the data in the specified position in the SChunk.
- Parameters:
nchunk¶ (int) – The position in which the chunk will be inserted.
data¶ (bytes-like object) – The data to be compressed and inserted as a chunk.
copy¶ (bool) – Whether to internally make a copy of the data or not.
- Returns:
out – The number of chunks in the SChunk.
- Return type:
int
- Raises:
RuntimeError – If a problem is detected.
Examples
>>> import blosc2
>>> import numpy as np
>>> # Create an SChunk with 2 chunks
>>> data = np.arange(400 * 1000, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(chunksize=200*1000*4, data=data, cparams=cparams)
>>> # Create a new array to insert into the second chunk of the SChunk
>>> new_data = np.arange(200 * 1000, dtype=np.int32)
>>> # Insert the new data at position 1, compressing it
>>> schunk.insert_data(1, new_data, copy=True)
>>> # Verify the total number of chunks after insertion
>>> schunk.nchunks
3
- iterchunks(dtype: dtype) Iterator[ndarray] [source]¶
Iterate over the chunks of the SChunk.
- Parameters:
dtype¶ (np.dtype) – The data type to use for the decompressed chunks.
- Yields:
chunk (NumPy ndarray) – The decompressed chunk.
Examples
>>> import blosc2
>>> import numpy as np
>>> # Create sample data and an SChunk
>>> data = np.arange(400 * 1000, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> # Iterate over chunks using the iterchunks method
>>> for chunk in schunk.iterchunks(dtype=np.int32):
...     print(f"Chunk shape: {chunk.shape}")
...     print(f"First 5 elements of chunk: {chunk[:5]}")
Chunk shape: (400000,)
First 5 elements of chunk: [0 1 2 3 4]
- iterchunks_info() Iterator[info] [source]¶
Iterate over the chunks of the SChunk, providing info on index and special values.
- Yields:
info (namedtuple) –
A namedtuple with the following fields:
- nchunk: int
The index of the chunk.
- cratio: float
The compression ratio of the chunk.
- special:
SpecialValue
The special value enum of the chunk; if 0, the chunk is not special.
- repeated_value: bytes or None
The repeated value for the chunk; if not SpecialValue.VALUE, it is None.
- lazychunk: bytes
A buffer with the complete lazy chunk.
Examples
>>> import blosc2
>>> import numpy as np
>>> # Create sample data and an SChunk
>>> data = np.arange(400 * 1000, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> # Iterate over chunks and print detailed information
>>> for chunk_info in schunk.iterchunks_info():
...     print(f"Chunk index: {chunk_info.nchunk}")
...     print(f"Compression ratio: {chunk_info.cratio:.2f}")
...     print(f"Special value: {chunk_info.special.name}")
...     print(f"Repeated value: {chunk_info.repeated_value[:10] if chunk_info.repeated_value else None}")
Chunk index: 0
Compression ratio: 223.56
Special value: NOT_SPECIAL
Repeated value: None
- postfilter(input_dtype: dtype, output_dtype: dtype = None) None [source]¶
Decorator to set a function as a postfilter.
The postfilter function will be executed each time after decompressing blocks of data. It will receive three parameters:
the input ndarray to be read from
the output ndarray to be filled out
the offset inside the SChunk instance where the corresponding block begins (see example below).
- Parameters:
input_dtype¶ (np.dtype) – Data type of the input that will receive the postfilter function.
output_dtype¶ (np.dtype) – Data type of the output that the postfilter function will receive and fill. If None (default), it will be set to input_dtype.
- Returns:
out
- Return type:
None
Notes
nthreads must be 1 when decompressing.
The input_dtype itemsize must be the same as the output_dtype itemsize.
Examples
import blosc2
import numpy as np

# Create SChunk
input_dtype = np.dtype(np.int64)
cparams = blosc2.CParams(typesize=input_dtype.itemsize)
dparams = blosc2.DParams(nthreads=1)
schunk = blosc2.SChunk(
    chunksize=20_000 * input_dtype.itemsize, cparams=cparams, dparams=dparams
)

# Create postfilter and associate it to the schunk
@schunk.postfilter(input_dtype)
def postfilter(input, output, offset):
    output[:] = offset + np.arange(input.size)
- prefilter(input_dtype: dtype, output_dtype: dtype = None) None [source]¶
Decorator to set a function as a prefilter.
This function will be executed each time before compressing the data. It will receive three parameters:
The actual data as a ndarray from which to read,
The ndarray to be filled,
The offset inside the SChunk instance where the corresponding block begins (see example below).
- Parameters:
input_dtype¶ (np.dtype) – Data type of the input that will be processed by the prefilter function.
output_dtype¶ (np.dtype, optional) – Data type of the output that will be filled by the prefilter function. If None (default), it will be the same as input_dtype.
- Returns:
out
- Return type:
None
Notes
nthreads must be 1 when compressing.
The input_dtype itemsize must be the same as the output_dtype itemsize.
Examples
import blosc2
import numpy as np

# Set the compression and decompression parameters
input_dtype = np.dtype(np.int32)
output_dtype = np.dtype(np.float32)
cparams = blosc2.CParams(typesize=output_dtype.itemsize, nthreads=1)
# Create schunk
schunk = blosc2.SChunk(chunksize=200 * 1000 * input_dtype.itemsize, cparams=cparams)

# Set prefilter with decorator
@schunk.prefilter(input_dtype, output_dtype)
def prefilter(input, output, offset):
    output[:] = input - np.pi
- remove_postfilter(func_name: str, _new_ctx: bool = True) None [source]¶
Remove the postfilter from the SChunk instance.
- Parameters:
func_name¶ (str) – The name of the postfilter function to remove.
- Returns:
out
- Return type:
None
Examples
>>> import blosc2
>>> import numpy as np
>>> dtype = np.dtype(np.int32)
>>> cparams = blosc2.CParams(typesize=dtype.itemsize)
>>> dparams = blosc2.DParams(nthreads=1)
>>> data = np.arange(500, dtype=np.int32)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams, dparams=dparams)
>>> # Define the postfilter function
>>> @schunk.postfilter(dtype)
... def postfilter(input, output, offset):
...     output[:] = input + offset + np.arange(input.size)
>>> out = np.empty(data.size, dtype=dtype)
>>> schunk.get_slice(out=out)
>>> f"Data slice with postfilter applied (first 8 elements): {out[:8]}"
Data slice with postfilter applied (first 8 elements): [ 0  2  4  6  8 10 12 14]
>>> schunk.remove_postfilter('postfilter')
>>> retrieved_data = np.empty(data.size, dtype=dtype)
>>> schunk.get_slice(out=retrieved_data)
>>> f"Original data (first 8 elements): {data[:8]}"
Original data (first 8 elements): [0 1 2 3 4 5 6 7]
- remove_prefilter(func_name: str, _new_ctx: bool = True) None [source]¶
Remove the prefilter from the SChunk instance.
- Parameters:
func_name¶ (str) – Name of the prefilter function.
- Returns:
out
- Return type:
None
Examples
>>> import blosc2
>>> import numpy as np
>>> dtype = np.dtype(np.int32)
>>> cparams = blosc2.CParams(typesize=dtype.itemsize, nthreads=1)
>>> data = np.arange(1000, dtype=np.int32)
>>> output_dtype = np.float32
>>> schunk = blosc2.SChunk(cparams=cparams)
>>> # Define the prefilter function
>>> @schunk.prefilter(dtype, output_dtype)
... def prefilter(input, output, offset):
...     output[:] = input - np.pi
>>> schunk[:1000] = data
>>> # Retrieve and convert the data compressed with the prefilter to a NumPy array
>>> compressed_array_with_filter = np.frombuffer(schunk.get_slice(), dtype=output_dtype)
>>> f"Compressed data with prefilter applied (first 8 elements): {compressed_array_with_filter[:8]}"
Compressed data with prefilter applied (first 8 elements): [-3.1415927 -2.1415927 -1.1415926 -0.14159265 0.8584073 1.8584074 2.8584073 3.8584073]
>>> schunk.remove_prefilter('prefilter')
>>> schunk[:1000] = data
>>> compressed_array_without_filter = np.frombuffer(schunk.get_slice(), dtype=dtype)
>>> f"Compressed data without prefilter (first 8 elements): {compressed_array_without_filter[:8]}"
Compressed data without prefilter (first 8 elements): [0 1 2 3 4 5 6 7]
- to_cframe() bytes [source]¶
Get a bytes object containing the serialized SChunk instance.
- Returns:
out – The buffer containing the serialized SChunk instance.
- Return type:
bytes
Examples
>>> import blosc2
>>> import numpy as np
>>> nchunks = 4
>>> chunk_size = 200 * 1000 * 4
>>> data = np.arange(nchunks * chunk_size // 4, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> # Serialize the SChunk instance to a bytes object
>>> serialized_schunk = schunk.to_cframe()
>>> f"Serialized SChunk length: {len(serialized_schunk)} bytes"
Serialized SChunk length: 14129 bytes
>>> # Create a new SChunk from the serialized data
>>> deserialized_schunk = blosc2.schunk_from_cframe(serialized_schunk)
>>> start = 500
>>> stop = 505
>>> sl_bytes = deserialized_schunk[start:stop]
>>> sl = np.frombuffer(sl_bytes, dtype=np.int32)
>>> res = data[start:stop]
>>> f"Original slice: {res}"
Original slice: [500 501 502 503 504]
>>> f"Deserialized slice: {sl}"
Deserialized slice: [500 501 502 503 504]
- update_chunk(nchunk: int, chunk: bytes) int [source]¶
Update an existing chunk in the SChunk.
- Parameters:
nchunk¶ (int) – The position identifying the chunk that will be updated.
chunk¶ (bytes object) – The new compressed chunk that will replace the content of the old one.
- Returns:
out – The number of chunks in the SChunk.
- Return type:
int
- Raises:
RuntimeError – If a problem is detected.
Examples
>>> import blosc2
>>> import numpy as np
>>> nchunks = 5
>>> chunk_size = 200 * 1000 * 4
>>> data = np.arange(nchunks * chunk_size // 4, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(chunksize=chunk_size, data=data, cparams=cparams)
>>> f"Initial number of chunks: {schunk.nchunks}"
Initial number of chunks: 5
>>> c_index = 1
>>> new_data = np.full(chunk_size // 4, fill_value=c_index, dtype=np.int32).tobytes()
>>> compressed_data = blosc2.compress2(new_data, typesize=4)
>>> # Update the 2nd chunk (index 1) with new data
>>> nchunks = schunk.update_chunk(c_index, compressed_data)
>>> f"Number of chunks after update: {nchunks}"
Number of chunks after update: 5
- update_data(nchunk: int, data: object, copy: bool) int [source]¶
Update the chunk in the specified position with the given data.
- Parameters:
nchunk¶ (int) – The position identifying the chunk that will be updated.
data¶ (bytes-like object) – The data to be compressed, replacing the content of the old chunk.
copy¶ (bool) – Whether to internally make a copy of the data or not.
- Returns:
out – The number of chunks in the SChunk.
- Return type:
int
- Raises:
RuntimeError – If a problem is detected.
Examples
>>> import blosc2
>>> import numpy as np
>>> nchunks = 4
>>> chunk_size = 200 * 1000 * 4
>>> data = np.arange(nchunks * chunk_size // 4, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(chunksize=chunk_size, data=data, cparams=cparams)
>>> f"Initial number of chunks: {schunk.nchunks}"
Initial number of chunks: 4
>>> c_index = 1  # Update the 2nd chunk (index 1)
>>> new_data = np.full(chunk_size // 4, fill_value=c_index, dtype=np.int32).tobytes()
>>> nchunks = schunk.update_data(c_index, new_data, copy=True)
>>> f"Number of chunks after update: {schunk.nchunks}"
Number of chunks after update: 4
- property blocksize: int¶
The block size (in bytes).
- property cbytes: int¶
Amount of compressed data bytes (data size + chunk headers size).
- property chunkshape: int¶
Number of elements per chunk.
- property chunksize: int¶
Number of bytes in each chunk.
- property contiguous: bool¶
Whether the SChunk is stored contiguously or sparsely.
- property cparams: CParams¶
blosc2.CParams instance with the compression parameters.
- property cratio: float¶
Compression ratio.
- property dparams: DParams¶
blosc2.DParams instance with the decompression parameters.
- property meta: Meta¶
Access to the fixed-length metadata of the SChunk.
- property nbytes: int¶
Amount of uncompressed data bytes.
- property nchunks: int¶
The number of chunks.
- property typesize: int¶
Type size of the SChunk.
- property urlpath: str¶
Path where the SChunk is stored.
- property vlmeta: vlmeta¶
Access to the variable-length metadata of the SChunk.
Constructors¶
- blosc2.schunk_from_cframe(cframe: bytes | str, copy: bool = False) SChunk [source]¶
Create a SChunk instance from a contiguous frame buffer.
- Parameters:
cframe¶ (bytes or str) – The bytes object containing the in-memory cframe.
copy¶ (bool) – Whether to internally make a copy or not. If False, the user is responsible for keeping a reference to cframe. Default is False.
- Returns:
out – A new SChunk containing the data passed.
- Return type:
SChunk
See also
to_cframe()
Examples
>>> import numpy as np
>>> import blosc2
>>> nchunks = 4
>>> chunk_size = 200 * 1000 * 4
>>> data = np.arange(nchunks * chunk_size // 4, dtype=np.int32)
>>> cparams = blosc2.CParams(typesize=4)
>>> schunk = blosc2.SChunk(data=data, cparams=cparams)
>>> serialized_schunk = schunk.to_cframe()
>>> print(f"Serialized SChunk length: {len(serialized_schunk)} bytes")
Serialized SChunk length: 14129 bytes
>>> deserialized_schunk = blosc2.schunk_from_cframe(serialized_schunk)
>>> start = 1000
>>> stop = 1005
>>> sl_bytes = deserialized_schunk[start:stop]
>>> sl = np.frombuffer(sl_bytes, dtype=np.int32)
>>> print("Slice from deserialized SChunk:", sl)
Slice from deserialized SChunk: [1000 1001 1002 1003 1004]
>>> expected_slice = data[start:stop]
>>> print("Expected slice:", expected_slice)
Expected slice: [1000 1001 1002 1003 1004]