Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 91 additions & 3 deletions ciftools/binary/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ class StringArray(BinaryCIFEncoder):
def encode(self, data: Union[np.ndarray, List[str]]) -> EncodedCIFData:
strings: List[str] = []
offsets = [0]
indices = np.empty(len(data), dtype="<i4")
# indices = np.empty(len(data), dtype="<i4")
indices = [None] * len(data)

_pack_strings(
data,
Expand All @@ -315,6 +316,32 @@ def encode(self, data: Union[np.ndarray, List[str]]) -> EncodedCIFData:
offsets,
)

encoded_offsets = _OFFSET_ENCODER.encode(np.array(offsets, dtype="<i4"))
encoded_data = _DATA_ENCODER.encode(np.array(indices, dtype='<i4'))

encoding: StringArrayEncoding = {
"dataEncoding": encoded_data["encoding"],
"kind": EncodingEnun.StringArray,
"stringData": "".join(strings),
"offsetEncoding": encoded_offsets["encoding"],
"offsets": encoded_offsets["data"], # type: ignore
}

return EncodedCIFData(data=encoded_data["data"], encoding=[encoding])

def encode_not_optimized(self, data: Union[np.ndarray, List[str]]) -> EncodedCIFData:
# list of strings
strings: List[str] = []
offsets = [0]
indices = np.empty(len(data), dtype="<i4")

_pack_strings_not_optimized(
data,
indices,
strings,
offsets,
)

encoded_offsets = _OFFSET_ENCODER.encode(np.array(offsets, dtype="<i4"))
encoded_data = _DATA_ENCODER.encode(indices)

Expand All @@ -330,8 +357,69 @@ def encode(self, data: Union[np.ndarray, List[str]]) -> EncodedCIFData:


# TODO: benchmark if JIT helps here
@jit(nopython=False, forceobj=True)
def _pack_strings(data: List[str], indices: np.ndarray, strings: List[str], offsets: List[int]) -> None:
# @jit(nopython=False, forceobj=True)
def _pack_strings(data: List[str], indices: list, strings: List[str], offsets: List[int]) -> None:
    """Deduplicate *data* into ``strings``/``offsets`` and fill ``indices``.

    All output containers are mutated in place; see ``_packing_loop`` for the
    exact packing contract.
    """
    lookup: Dict[str, int] = {}
    _packing_loop(data, indices, strings, offsets, lookup, 0)



# @jit(nopython=False, forceobj=True)
# @jit(nopython=True)
def _packing_loop(data: List[str], indices: list, strings: List[str], offsets: List[int], str_map: Dict[str, int], acc_len: int) -> None:
    """Core string-packing loop: deduplicate the strings in ``data``.

    For every element of ``data``, write the element's index within
    ``strings`` into ``indices`` (``-1`` for falsy/null strings). The first
    occurrence of each string appends the string to ``strings``, records its
    index in ``str_map``, and appends the cumulative character length to
    ``offsets``.

    :param data: input strings; falsy entries are treated as null
    :param indices: pre-sized output list, ``indices[i]`` receives the index
    :param strings: output list of unique strings (mutated in place)
    :param offsets: output list of cumulative end offsets (mutated in place)
    :param str_map: string -> index lookup table (mutated in place)
    :param acc_len: starting accumulated character length (normally 0)
    """
    # NOTE: a plain ``for ... in enumerate`` over the list benchmarked fastest
    # here (the np.ndenumerate variant was slower); see
    # https://medium.com/@giorgiosgl1997/python-fastest-way-to-iterate-a-list-6ea5348243bb
    for i, s in enumerate(data):
        # handle null strings.
        if not s:
            indices[i] = -1
            continue

        index = str_map.get(s)
        if index is None:
            # first occurrence: remember the string, its index, and the
            # cumulative end offset of the concatenated string data
            acc_len += len(s)
            index = len(strings)
            strings.append(s)
            str_map[s] = index
            offsets.append(acc_len)

        indices[i] = index


def _pack_strings_not_optimized(data: List[str], indices: np.ndarray, strings: List[str], offsets: List[int]) -> None:
acc_len = 0
str_map: Dict[str, int] = dict()

Expand Down
108 changes: 106 additions & 2 deletions ciftools/binary/writer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Any, List, Optional

from numba import jit
import msgpack
import numpy as np
from ciftools.binary.encoded_data import (
Expand Down Expand Up @@ -65,12 +65,57 @@ def write_category(self, category: CIFCategoryDesc, data: List[Any]) -> None:

self._data_blocks[-1]["categories"].append(encoded)

def write_category_not_optimized(self, category: CIFCategoryDesc, data: List[Any]) -> None:
    """Encode *data* as a CIF category (reference, non-optimized path) and
    append it to the current data block. A no-op when every instance is empty.
    """
    if not self._data:
        raise Exception("The writer contents have already been encoded, no more writing.")
    if not self._data_blocks:
        raise Exception("No data block created.")

    wrapped = [_DataWrapper(data=d, count=category.get_row_count(d)) for d in data]
    instances = [w for w in wrapped if w.count > 0]
    if not instances:
        return

    total_count = sum(w.count for w in instances)
    if not total_count:
        return

    # field descriptors are derived from the first non-empty instance
    fields = category.get_field_descriptors(instances[0].data)
    encoded: EncodedCIFCategory = {
        "name": f"_{category.name}",
        "rowCount": total_count,
        "columns": [_encode_field_not_optimized(f, instances, total_count) for f in fields],
    }
    self._data_blocks[-1]["categories"].append(encoded)

def encode(self) -> bytes:
    """Serialize all written data blocks with msgpack and invalidate the writer."""
    payload = msgpack.dumps(self._data)
    # drop the contents so that further write calls are rejected
    self._data = None
    self._data_blocks = []
    return payload

def _first_loop(presence, mask, category, array, field, d, offset):
    """Fill ``array``/``mask`` for one category using a per-row presence check.

    For each of ``category.count`` rows, ``presence(d, i)`` returns a truthy
    mask code when the row's value is absent. Absent rows write the code into
    ``mask``; present rows write ``field.value(d, i)`` into ``array``.

    Returns ``(array, all_present, mask, offset)`` where ``all_present`` is
    True iff no row was masked and ``offset`` has advanced by
    ``category.count``.
    """
    # BUG FIX: ``all_present`` was previously assigned only inside the masked
    # branch, so a category with every value present (or zero rows) raised
    # UnboundLocalError at the return statement.
    all_present = True
    for i in range(category.count):
        p = presence(d, i)
        if p:
            mask[offset] = p
            all_present = False
        else:
            array[offset] = field.value(d, i)  # type: ignore
        offset += 1

    return array, all_present, mask, offset

@jit(forceobj=True)
def _second_loop(category, array, offset, field, d):
    """Copy ``category.count`` field values from ``d`` into ``array``.

    Writes start at ``offset``; returns the (mutated) array and the offset
    advanced by ``category.count``.
    """
    for row in range(category.count):
        array[offset + row] = field.value(d, row)  # type: ignore
    return array, offset + category.count

def _encode_field(field: CIFFieldDesc, data: List[_DataWrapper], total_count: int) -> EncodedCIFColumn:
array = field.create_array(total_count)
Expand All @@ -79,6 +124,65 @@ def _encode_field(field: CIFFieldDesc, data: List[_DataWrapper], total_count: in
presence = field.presence
all_present = True

offset = 0
for category in data:
d = category.data

category_array = field.value_array and field.value_array(d)
if category_array is not None:
if len(category_array) != category.count:
raise ValueError(f"provided values array must have the same length as the category count field")

array[offset : offset + category.count] = category_array # type: ignore

category_mask = field.presence_array and field.presence_array(d)
if category_mask is not None:
if len(category_mask) != category.count:
raise ValueError(f"provided mask array must have the same length as the category count field")
mask[offset : offset + category.count] = category_mask

offset += category.count

elif presence is not None:
# TODO: check if JIT will help
for i in range(category.count):
p = presence(d, i)
if p:
mask[offset] = p
all_present = False
else:
array[offset] = field.value(d, i) # type: ignore
offset += 1
else:
# TODO: check if JIT will help
array, offset = _second_loop(category, array, offset, field, d)

encoder = field.encoder(data[0].data) if len(data) > 0 else BYTE_ARRAY
encoded = encoder.encode(array)

if not isinstance(encoded["data"], bytes):
raise ValueError(
f"The encoding must result in bytes but it was {str(type(encoded['data']))}. Fix the encoding chain."
)

mask_data: Optional[EncodedCIFData] = None

if not all_present:
mask_rle = RUN_LENGTH.encode(mask)
if len(mask_rle["data"]) < len(mask):
mask_data = mask_rle
else:
mask_data = BYTE_ARRAY.encode(mask)

return {"name": field.name, "data": encoded, "mask": mask_data}

def _encode_field_not_optimized(field: CIFFieldDesc, data: List[_DataWrapper], total_count: int) -> EncodedCIFColumn:
array = field.create_array(total_count)

mask = np.zeros(total_count, dtype=np.dtype(np.uint8))
presence = field.presence
all_present = True

offset = 0
for category in data:
d = category.data
Expand Down Expand Up @@ -131,4 +235,4 @@ def _encode_field(field: CIFFieldDesc, data: List[_DataWrapper], total_count: in
else:
mask_data = BYTE_ARRAY.encode(mask)

return {"name": field.name, "data": encoded, "mask": mask_data}
return {"name": field.name, "data": encoded, "mask": mask_data}
62 changes: 62 additions & 0 deletions temp/benchmark_binary_CIF_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@

import numpy as np
from timeit import default_timer as timer
from ciftools.binary.writer import BinaryCIFWriter
from ciftools.serialization import create_binary_writer
from tests._encoding import prepare_test_data, LatticeIdsCategory

INPUTS_FOR_TESTING = [
prepare_test_data(500000, 40)
]

print(f'Test data')
print(INPUTS_FOR_TESTING[0])

WRITER = create_binary_writer(encoder="ciftools-test")

# write lattice ids
WRITER.start_data_block("lattice_ids")

test_input = INPUTS_FOR_TESTING[0]

def benchmark_writer(test_input, optimization, writer: BinaryCIFWriter):
if not optimization:
writer.write_category_not_optimized(LatticeIdsCategory, [test_input])
else:
writer.write_category(LatticeIdsCategory, [test_input])



start_not_optimized = timer()
benchmark_writer(test_input=test_input, optimization=False, writer=WRITER)
stop_not_optimized = timer()

start_not_optimized_2nd = timer()
benchmark_writer(test_input=test_input, optimization=False, writer=WRITER)
stop_not_optimized_2nd = timer()

start_not_optimized_3rd = timer()
benchmark_writer(test_input=test_input, optimization=False, writer=WRITER)
stop_not_optimized_3rd = timer()

start_optimized = timer()
benchmark_writer(test_input=test_input, optimization=True, writer=WRITER)
stop_optimized = timer()

start_optimized_2nd = timer()
benchmark_writer(test_input=test_input, optimization=True, writer=WRITER)
stop_optimized_2nd = timer()

start_optimized_3rd = timer()
benchmark_writer(test_input=test_input, optimization=True, writer=WRITER)
stop_optimized_3rd = timer()

not_optimized = stop_not_optimized - start_not_optimized
not_optimized_2nd = stop_not_optimized_2nd - start_not_optimized_2nd
not_optimized_3rd = stop_not_optimized_3rd - start_not_optimized_3rd

optimized = stop_optimized - start_optimized
optimized_2nd = stop_optimized_2nd - start_optimized_2nd
optimized_3rd = stop_optimized_3rd - start_optimized_3rd

print(not_optimized, not_optimized_2nd, not_optimized_3rd, optimized, optimized_2nd, optimized_3rd)
71 changes: 71 additions & 0 deletions temp/benchmark_string_array_jit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from argparse import ArgumentError
from ciftools.binary.encoder import STRING_ARRAY
import pytest
import msgpack
import numpy as np
import random
import string
from timeit import default_timer as timer

# Maximum length of each randomly generated test string.
MAX_LENGTH = 30

# Number of timed repetitions per variant; the first run of each variant
# absorbs any JIT warm-up / caching cost.
N_RUNS = 3


def _random_string(length):
    """Return a random ASCII-letter string of the given length."""
    return ''.join(random.choice(string.ascii_letters) for _ in range(length))


def _generate_random_strings_list(size: int):
    """Return ``size`` random strings with lengths in [1, MAX_LENGTH]."""
    return [_random_string(random.randint(1, MAX_LENGTH)) for _ in range(size)]


INPUTS_FOR_ENCODING = [
    # roughly 0.8, 8, 80 MB
    _generate_random_strings_list(10**5),
    # _generate_random_strings_list(10**6),
    # _generate_random_strings_list(10**7)
]


def encoding(encoding_input, optimization):
    """Encode once with either the optimized or the reference StringArray path."""
    encoder = STRING_ARRAY
    if not optimization:
        encoder.encode_not_optimized(encoding_input)
    else:
        encoder.encode(encoding_input)


def _time_runs(encoding_input, optimization, n_runs=N_RUNS):
    """Return wall-clock durations (seconds) for ``n_runs`` encoding calls."""
    durations = []
    for _ in range(n_runs):
        start = timer()
        encoding(encoding_input=encoding_input, optimization=optimization)
        durations.append(timer() - start)
    return durations


if __name__ == "__main__":
    encoding_input = INPUTS_FOR_ENCODING[0]

    # previously six hand-unrolled start/stop stanzas; same measurements,
    # same output order: three non-optimized runs then three optimized runs
    not_optimized_times = _time_runs(encoding_input, optimization=False)
    optimized_times = _time_runs(encoding_input, optimization=True)

    print(*not_optimized_times, *optimized_times)
    # sample results:
    # 0.678212083876133 0.07555452641099691 0.0717478571459651 0.8585679298266768 0.12347683496773243 0.1249676188454032
    # quite close to pytest-benchmark
Loading