diff --git a/CMakeLists.txt b/CMakeLists.txt index 664e2ea..ece475e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -274,27 +274,32 @@ if(BUILD_PYTHON) message(STATUS "Subdirectory 'extern/pybind11' added.") endif() - # Build the Python module - pybind11_add_module(daisy pybinds/setup.cpp) + # Build the Python module as daisy._core + pybind11_add_module(daisy_core pybinds/setup.cpp) + set_target_properties(daisy_core PROPERTIES + OUTPUT_NAME "_core" + PREFIX "" # no 'lib' prefix, produce _core.cpython-*.so + LIBRARY_OUTPUT_DIRECTORY "${PROJECT_SOURCE_DIR}/daisy" + ) if(DEBUG_MSG) - message(STATUS "Python module 'daisy' created from pybinds/setup.cpp.") + message(STATUS "Python module 'daisy._core' created from pybinds/setup.cpp.") endif() - target_link_libraries(daisy PRIVATE dino_lib) + target_link_libraries(daisy_core PRIVATE dino_lib) if(DEBUG_MSG) - message(STATUS "Python module 'daisy' linked to 'dino_lib'.") + message(STATUS "Python module 'daisy._core' linked to 'dino_lib'.") endif() # Pass BUILD_SING_ENABLED definition to Python module if (BUILD_SING_AVAILABLE) - target_compile_definitions(daisy PRIVATE SING_CUDA_ENABLED=1) + target_compile_definitions(daisy_core PRIVATE SING_CUDA_ENABLED=1) if(DEBUG_MSG) - message(STATUS "Python module 'daisy' compiled with SING_CUDA_ENABLED=1.") + message(STATUS "Python module 'daisy._core' compiled with SING_CUDA_ENABLED=1.") endif() else() - target_compile_definitions(daisy PRIVATE SING_CUDA_ENABLED=0) + target_compile_definitions(daisy_core PRIVATE SING_CUDA_ENABLED=0) if(DEBUG_MSG) - message(STATUS "Python module 'daisy' compiled with SING_CUDA_ENABLED=0.") + message(STATUS "Python module 'daisy._core' compiled with SING_CUDA_ENABLED=0.") endif() endif() @@ -314,13 +319,7 @@ if(BUILD_PYTHON) endif() endif() - # Set output directory for the .so file (demos/ so that import daisy finds daisy.*.so when run from demos) - set_target_properties(daisy PROPERTIES - LIBRARY_OUTPUT_DIRECTORY "${CMAKE_SOURCE_DIR}/demos" - ) - if(DEBUG_MSG) - message(STATUS "Python module 'daisy' output directory set to: ${CMAKE_SOURCE_DIR}/demos") - endif() + # (Python module 'daisy._core' already configured to output into ${PROJECT_SOURCE_DIR}/daisy above) else() if(DEBUG_MSG) message(STATUS "BUILD_PYTHON is OFF. Skipping Python bindings.") diff --git a/demos/CMakeLists.txt b/demos/CMakeLists.txt index d69e454..e82d8d3 100644 --- a/demos/CMakeLists.txt +++ b/demos/CMakeLists.txt @@ -202,6 +202,28 @@ if(BUILD_DEMO) message(STATUS "Include directories added for demo_Messi_DTW.") endif() + # ////// SING DTW ////// + if(BUILD_SING_AVAILABLE) + if(DEBUG_MSG) + message(STATUS "---") + message(STATUS "## Demo: Sing DTW") + message(STATUS "Attempting to add executable: demo_Sing_DTW") + endif() + add_executable(demo_Sing_DTW demo_Sing_DTW.cpp) + target_link_libraries(demo_Sing_DTW PRIVATE dino_lib commons_lib) + target_include_directories(demo_Sing_DTW PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/../lib + ${CMAKE_CURRENT_SOURCE_DIR}/../commons + ) + if(DEBUG_MSG) + message(STATUS "Executable demo_Sing_DTW added and configured (CUDA/Sing available).") + endif() + else() + if(DEBUG_MSG) + message(STATUS "BUILD_SING_AVAILABLE is FALSE. demo_Sing_DTW will NOT be built.") + endif() + endif() + # ////// ODYSSEY L2Square ////// if(DEBUG_MSG) message(STATUS "---") @@ -325,25 +347,23 @@ if(BUILD_DEMO) # ////// SING L2Square ////// if(DEBUG_MSG) message(STATUS "---") - message(STATUS "## Demo: Sing") + message(STATUS "## Demo: Sing L2Square") endif() - if(SING_CUDA_AVAILABLE) + if(BUILD_SING_AVAILABLE) if(DEBUG_MSG) - message(STATUS "SING_CUDA_AVAILABLE is TRUE. Building demo_Sing_L2Square.") + message(STATUS "BUILD_SING_AVAILABLE is TRUE. Building demo_Sing_L2Square.") endif() add_executable(demo_Sing_L2Square demo_Sing_L2Square.cpp) - - target_link_libraries(demo_Sing_L2Square PRIVATE dino_lib commons_lib CUDA::cudart) - + target_link_libraries(demo_Sing_L2Square PRIVATE dino_lib commons_lib) target_include_directories(demo_Sing_L2Square PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../lib ${CMAKE_CURRENT_SOURCE_DIR}/../commons ) else() if(DEBUG_MSG) - message(STATUS "SING_CUDA_AVAILABLE is FALSE. demo_Sing_L2Square will NOT be built.") + message(STATUS "BUILD_SING_AVAILABLE is FALSE. demo_Sing_L2Square will NOT be built.") endif() endif() diff --git a/demos/demo_Messi_DTW.cpp b/demos/demo_Messi_DTW.cpp index 62ba640..baf35db 100644 --- a/demos/demo_Messi_DTW.cpp +++ b/demos/demo_Messi_DTW.cpp @@ -16,7 +16,7 @@ int main(){ printf("Loaded %llu database points and %llu query points with dimension %llu\n", n_database, n_query, dim); daisy::Messi messi_search(daisy::DistanceType::DTW); - messi_search.setNumThreads(4); + messi_search.setNumThreads(1); int warp_window = std::max(1, static_cast(dim * 0.1)); messi_search.setWarpingWindow(warp_window); diff --git a/demos/demo_Messi_DTW.py b/demos/demo_Messi_DTW.py index e2acefd..78e3f09 100644 --- a/demos/demo_Messi_DTW.py +++ b/demos/demo_Messi_DTW.py @@ -8,9 +8,9 @@ def main(): - n_database = 200000 - dim = 96 - n_query = 10 + n_database = 840000 + dim = 128 + n_query = 20 k = 5 np.random.seed(100) diff --git a/demos/demo_Sing_DTW.cpp b/demos/demo_Sing_DTW.cpp new file mode 100644 index 0000000..571d332 --- /dev/null +++ b/demos/demo_Sing_DTW.cpp @@ -0,0 +1,45 @@ + #include "../commons/dataloaders.hpp" + #include "../lib/daisy.hpp" + #include + #include + + int main() { + daisy::idx_t n_database = 200000; + unsigned long long dim = 96; + unsigned long long n_query = 10; + daisy::idx_t k = 1; // current Sing DTW implementation supports only k = 1 + + float *database = loadRandomData(n_database, dim, 100, true); + float *query = loadRandomData(n_query, dim, 50, true); + + daisy::Sing sing_search(daisy::DistanceType::DTW); + sing_search.setNumThreads(1); + + // Sing::buildIndex expects a DataSource*, use InMemoryDataSource as in other components. + daisy::InMemoryDataSource data_source(database, n_database, dim); + printf("indexing\n"); + sing_search.buildIndex(&data_source); + printf("finished indexing\n"); + + daisy::idx_t *I = new daisy::idx_t[n_query * k]; + float *D = new float[n_query * k]; + printf("querying\n"); + sing_search.searchIndex(query, n_query, k, I, D); + printf("finished querying\n"); + + for (daisy::idx_t i = 0; i < n_query; i++) { + printf("Query %llu:\n", i); + printf(" Distances: "); + for (daisy::idx_t j = 0; j < k; j++) { + printf("%f ", D[i * k + j]); + } + } + + delete[] database; + delete[] query; + delete[] I; + delete[] D; + + return 0; + } + diff --git a/demos/demo_Sing_DTW.py b/demos/demo_Sing_DTW.py new file mode 100644 index 0000000..7973047 --- /dev/null +++ b/demos/demo_Sing_DTW.py @@ -0,0 +1,52 @@ +""" +Demo Sing DTW — build index from NumPy array and run DTW search. +Requires Sing to be built with CUDA (SING_CUDA_ENABLED). +Note: current Sing DTW implementation only supports k = 1. +""" +import sys +import os +import time +import numpy as np + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +from daisy import DistanceType, Sing + + +def main(): + # Small-ish sizes so the demo runs quickly + n_database = 50000 + dim = 128 + n_query = 5 + k = 1 # DTW implementation in Sing currently supports only k = 1 + + np.random.seed(100) + db = np.random.randn(n_database, dim).astype(np.float32) + np.random.seed(50) + query = np.random.randn(n_query, dim).astype(np.float32) + print(f"Loaded {n_database} database points and {n_query} query points with dimension {dim}") + + sing_search = Sing(DistanceType.DTW) + # Optional: use multiple threads + sing_search.setNumThreads(4) + + sing_search.buildIndex(db) + print(f"buildIndex (DTW) done") + + print(f"Starting DTW search (n_query={n_query}, k={k})...") + I, D = sing_search.searchIndex(query, k) + print(f"Search done)") + + print("\nPython Sing DTW Search Results:") + for i in range(n_query): + print(f"Query {i}:") + print("Distances:", D[i]) + print("Indices:", I[i]) + print() + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/lib/algos/CMakeLists.txt b/lib/algos/CMakeLists.txt index a9a763d..d566240 100644 --- a/lib/algos/CMakeLists.txt +++ b/lib/algos/CMakeLists.txt @@ -128,9 +128,9 @@ if(DEBUG_MSG) endif() # ////// SING ////// -if (SING_CUDA_AVAILABLE) +if (BUILD_SING_AVAILABLE) if(DEBUG_MSG) - message(STATUS "SING_CUDA_AVAILABLE is TRUE. Configuring CUDA for SING components.") + message(STATUS "BUILD_SING_AVAILABLE is TRUE. Configuring CUDA for SING components.") message(STATUS "Adding CUDA Toolkit include directories to dino_lib...") endif() @@ -171,7 +171,7 @@ if (SING_CUDA_AVAILABLE) endif() else() if(DEBUG_MSG) - message(STATUS "SING_CUDA_AVAILABLE is FALSE. CUDA configuration for SING skipped.") + message(STATUS "BUILD_SING_AVAILABLE is FALSE. CUDA configuration for SING skipped.") endif() target_compile_definitions(dino_lib PUBLIC SING_CUDA_ENABLED=0) endif() diff --git a/lib/algos/Sing.cpp b/lib/algos/Sing.cpp index c5face1..bf18f6d 100644 --- a/lib/algos/Sing.cpp +++ b/lib/algos/Sing.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #if SING_CUDA_ENABLED #include "singlib.hpp" #include @@ -205,14 +206,14 @@ namespace daisy free(sax); } - static float nodedistance(float *paa, sax_type *sax, - sax_type *sax_cardinalities, - sax_type max_bit_cardinality, - int max_cardinality, - int number_of_segments, - int min_val, - int max_val, - float ratio_sqrt) + float nodedistance(float *paa, sax_type *sax, + sax_type *sax_cardinalities, + sax_type max_bit_cardinality, + int max_cardinality, + int number_of_segments, + int min_val, + int max_val, + float ratio_sqrt) { (void)max_bit_cardinality; (void)max_cardinality; @@ -365,6 +366,563 @@ namespace daisy } } + float nodedistancedtw(float *paa, float *paaU, float *paaL, sax_type *sax, + sax_type *sax_cardinalities, + sax_type max_bit_cardinality, + int max_cardinality, + int number_of_segments, + int min_val, + int max_val, + float ratio_sqrt) + { + (void)sax_cardinalities; + (void)max_bit_cardinality; + (void)max_cardinality; + (void)min_val; + (void)max_val; + float distance = 0; + int i; + for (i = 0; i < number_of_segments; i++) + { + sax_type c_c = sax[i]; + if ((c_c == 0 && paaL[i] > 0.0f)) + { + distance += paaL[i] * paaL[i]; + } + else if (c_c != 0 && paaU[i] < 0.0f) + { + distance += paaU[i] * paaU[i]; + } + } + distance = ratio_sqrt * distance; + (void)paa; + return distance; + } + + void insert_tree_node_m_hybridpqueueBreakpolydtw(float *paaU, float *paaL, isax_node *node, isax_index *index, + float bsf, pqueue_t **pq, pthread_mutex_t *lock_queue, + int *tnumber, int n_pqueue) + { + float distance = minidist_paa_to_isax_DTWbp(paaU, paaL, node->isax_values, + node->isax_cardinalities, + (sax_type)index->settings->sax_bit_cardinality, + index->settings->sax_alphabet_cardinality, + index->settings->paa_segments, + MINVAL, MAXVAL, + index->settings->mindist_sqrt); + if (distance < bsf) + { + if (node->is_leaf) + { + query_result *mindist_result = (query_result *)malloc(sizeof(query_result)); + mindist_result->node = node; + mindist_result->distance = distance; + pthread_mutex_lock(&lock_queue[*tnumber]); + pqueue_insert(pq[*tnumber], mindist_result); + pthread_mutex_unlock(&lock_queue[*tnumber]); + *tnumber = (*tnumber + 1) % n_pqueue; + added_tree_node++; + } + else + { + if (node->left_child != nullptr && node->left_child->isax_cardinalities != nullptr) + { + insert_tree_node_m_hybridpqueueBreakpolydtw(paaU, paaL, node->left_child, index, bsf, pq, lock_queue, tnumber, n_pqueue); + } + if (node->right_child != nullptr && node->right_child->isax_cardinalities != nullptr) + { + insert_tree_node_m_hybridpqueueBreakpolydtw(paaU, paaL, node->right_child, index, bsf, pq, lock_queue, tnumber, n_pqueue); + } + } + } + } + + float minidist_paa_to_isax_DTWbp(float *paaU, float *paaL, sax_type *sax, + sax_type *sax_cardinalities, + sax_type max_bit_cardinality, + int max_cardinality, + int number_of_segments, + int min_val, + int max_val, + float ratio_sqrt) + { + (void)max_bit_cardinality; + (void)max_cardinality; + (void)min_val; + (void)max_val; + float distance = 0; + int offset = ((max_cardinality - 1) * (max_cardinality - 2)) / 2; + (void)offset; + int i; + for (i = 0; i < number_of_segments; i++) + { + sax_type c_c = sax_cardinalities[i]; + sax_type c_m = max_bit_cardinality; + sax_type v = sax[i]; + sax_type region_lower = (sax_type)(v << (c_m - c_c)); + sax_type mask = (sax_type)((1u << (c_m - c_c)) - 1u); + sax_type region_upper = (sax_type)((~((sax_type)0 << (c_m - c_c))) | region_lower); + float breakpoint_lower = 0; + float breakpoint_upper = 0; + + if (region_lower == 0) + { + breakpoint_lower = (float)min_val; + } + else + { + breakpoint_lower = sax_breakpointsnew3[region_lower - 1]; + } + if (region_upper == (sax_type)(max_cardinality - 1)) + { + breakpoint_upper = (float)max_val; + } + else + { + breakpoint_upper = sax_breakpointsnew3[region_upper]; + } + + (void)mask; + + if (breakpoint_lower > paaU[i]) + { + distance += (float)pow((double)(breakpoint_lower - paaU[i]), 2.0); + } + else if (breakpoint_upper < paaL[i]) + { + distance += (float)pow((double)(breakpoint_upper - paaL[i]), 2.0); + } + } + distance = ratio_sqrt * distance; + return distance; + } + + float calculate_node_distance_SING_short_dtw(isax_index *index, isax_node *node, ts_type *query, float *uo, + float *lo, ts_type *paa, ts_type *paaU, ts_type *paaL, float bsf, + float shortrate, int warpWind, float *rawfile) + { + (void)paa; + (void)paaU; + (void)paaL; + if (node->buffer == nullptr) + return bsf; + float distmin, dist; + int k; + float *cb = (float *)malloc(sizeof(float) * (size_t)index->settings->timeseries_size); + float *cb1 = (float *)malloc(sizeof(float) * (size_t)index->settings->timeseries_size); + int length = 2 * warpWind + 1; + float *tSum = (float *)malloc(sizeof(float) * (size_t)length); + float *pCost = (float *)malloc(sizeof(float) * (size_t)length); + float *rDist = (float *)malloc(sizeof(float) * (size_t)length); + if (cb == nullptr || cb1 == nullptr || tSum == nullptr || pCost == nullptr || rDist == nullptr) + { + free(cb); + free(cb1); + free(tSum); + free(pCost); + free(rDist); + return bsf; + } + short bsfshort = (short)(bsf * shortrate); + for (int i = 0; i < node->buffer->partial_buffer_size; i++) + { + if (node->buffer->lbdarrayshort == nullptr) + continue; + if (node->buffer->lbdarrayshort[i] < bsfshort) + { + if (node->buffer->partial_position_buffer == nullptr || node->buffer->partial_position_buffer[i] == nullptr) + continue; + distmin = lb_keogh_data_bound(&(rawfile[*node->buffer->partial_position_buffer[i]]), uo, lo, cb1, + index->settings->timeseries_size, bsf); + if (distmin < bsf) + { + cb[index->settings->timeseries_size - 1] = cb1[index->settings->timeseries_size - 1]; + for (k = index->settings->timeseries_size - 2; k >= 0; k--) + cb[k] = cb[k + 1] + cb1[k]; + + dist = dtwsimdPruned(query, &(rawfile[*node->buffer->partial_position_buffer[i]]), cb, + index->settings->timeseries_size, warpWind, bsf, tSum, pCost, rDist); + if (dist < bsf) + bsf = dist; + } + } + } + free(tSum); + free(pCost); + free(rDist); + free(cb); + free(cb1); + return bsf; + } + + float calculate_node_DTW2_inmemorysing(isax_index *index, isax_node *node, ts_type *query, float *uo, + float *lo, ts_type *paa, ts_type *paaU, ts_type *paaL, float bsf, + int warpWind, float *rawfile, idx_t *best_position) + { + (void)paa; + if (node->buffer == nullptr) + return bsf; + float distmin, dist; + int k; + float *cb = (float *)malloc(sizeof(float) * (size_t)index->settings->timeseries_size); + float *cb1 = (float *)malloc(sizeof(float) * (size_t)index->settings->timeseries_size); + int length = 2 * warpWind + 1; + float *tSum = (float *)malloc(sizeof(float) * (size_t)length); + float *pCost = (float *)malloc(sizeof(float) * (size_t)length); + float *rDist = (float *)malloc(sizeof(float) * (size_t)length); + if (cb == nullptr || cb1 == nullptr || tSum == nullptr || pCost == nullptr || rDist == nullptr) + { + free(cb); + free(cb1); + free(tSum); + free(pCost); + free(rDist); + return bsf; + } + for (int i = 0; i < node->buffer->partial_buffer_size; i++) + { + if (node->buffer->partial_position_buffer == nullptr || node->buffer->partial_position_buffer[i] == nullptr) + continue; + + distmin = minidist_paa_to_isax_raw_DTW_SING_SIMD(paaU, paaL, node->buffer->partial_sax_buffer[i], + index->settings->max_sax_cardinalities, + index->settings->sax_bit_cardinality, + index->settings->sax_alphabet_cardinality, + index->settings->paa_segments, MINVAL, MAXVAL, + index->settings->mindist_sqrt); + + if (distmin < bsf) + { + distmin = lb_keogh_data_bound(&(rawfile[*node->buffer->partial_position_buffer[i]]), uo, lo, cb1, + index->settings->timeseries_size, bsf); + if (distmin < bsf) + { + cb[index->settings->timeseries_size - 1] = cb1[index->settings->timeseries_size - 1]; + for (k = index->settings->timeseries_size - 2; k >= 0; k--) + cb[k] = cb[k + 1] + cb1[k]; + + dist = dtwsimdPruned(query, &(rawfile[*node->buffer->partial_position_buffer[i]]), cb, + index->settings->timeseries_size, warpWind, bsf, tSum, pCost, rDist); + if (dist < bsf) + { + bsf = dist; + if (best_position != nullptr) + { + *best_position = (idx_t)(*node->buffer->partial_position_buffer[i] / index->settings->timeseries_size); + } + } + } + } + } + free(tSum); + free(pCost); + free(rDist); + free(cb); + free(cb1); + return bsf; + } + + void *uniworkershort_v4dtw(void *uniworkerdata) + { + int amountnode = ((uni_workerdata *)uniworkerdata)->amountnode; + isax_node **nodelist = ((uni_workerdata *)uniworkerdata)->nodelist; + isax_index *index = ((uni_workerdata *)uniworkerdata)->index; + ts_type *paa = ((uni_workerdata *)uniworkerdata)->paa; + ts_type *paaU = ((uni_workerdata *)uniworkerdata)->paaU; + ts_type *paaL = ((uni_workerdata *)uniworkerdata)->paaL; + ts_type *uo = ((uni_workerdata *)uniworkerdata)->uo; + ts_type *lo = ((uni_workerdata *)uniworkerdata)->lo; + int warpWind = ((uni_workerdata *)uniworkerdata)->warpWind; + + float bsf = ((uni_workerdata *)uniworkerdata)->bsf; + unsigned long *offsetarray = ((uni_workerdata *)uniworkerdata)->offsetarray; + bool *activechunk = ((uni_workerdata *)uniworkerdata)->activechunk; + bool *activenode = ((uni_workerdata *)uniworkerdata)->activenode; + int current_root_node_number; + int chunknumber = ((uni_workerdata *)uniworkerdata)->chunknumber; + int *chunkcounter = ((uni_workerdata *)uniworkerdata)->chunkcounter; + (void)amountnode; + (void)chunkcounter; + + isax_node *current_root_node; + query_result *n; + ts_type *ts = ((uni_workerdata *)uniworkerdata)->ts; + pqueue_t *pq = ((uni_workerdata *)uniworkerdata)->pq; + (void)pq; + query_result *do_not_remove = ((uni_workerdata *)uniworkerdata)->bsf_result; + float minimum_distance = ((uni_workerdata *)uniworkerdata)->minimum_distance; + int limit = ((uni_workerdata *)uniworkerdata)->limit; + int checks = 0; + bool finished = true; + int tight_bound = index->settings->tight_bound; + int aggressive_check = index->settings->aggressive_check; + (void)do_not_remove; + (void)limit; + (void)tight_bound; + (void)aggressive_check; + query_result *bsf_result = (((uni_workerdata *)uniworkerdata)->bsf_result); + float bsfdisntance = bsf; + float distance; + int calculate_node = 0, calculate_node_quque = 0; + (void)calculate_node; + (void)calculate_node_quque; + int startqueuenumber = ((uni_workerdata *)uniworkerdata)->startqueuenumber; + int tnumber = startqueuenumber; + short *lbdmap = ((uni_workerdata *)uniworkerdata)->lbdmapshort; + (void)lbdmap; + int localqueuecounter = 0; + (void)localqueuecounter; + pthread_mutex_t *alllock = ((uni_workerdata *)uniworkerdata)->alllock; + int n_pqueue = ((uni_workerdata *)uniworkerdata)->n_pqueue; + int *allqueuelabel = ((uni_workerdata *)uniworkerdata)->allqueuelabel; + + for (current_root_node_number = ((uni_workerdata *)uniworkerdata)->workerstartnode; + current_root_node_number < ((uni_workerdata *)uniworkerdata)->workerstopnode; + current_root_node_number++) + { + isax_node *node = nodelist[current_root_node_number]; + distance = nodedistancedtw(paa, paaU, paaL, node->isax_values, + node->isax_cardinalities, + (sax_type)index->settings->sax_bit_cardinality, + index->settings->sax_alphabet_cardinality, + index->settings->paa_segments, + MINVAL, MAXVAL, + index->settings->mindist_sqrt); + if (distance < bsf) + { + activechunk[(int)(offsetarray[current_root_node_number] / (index->sax_cache_size / chunknumber))] = true; + activechunk[(int)(offsetarray[current_root_node_number + 1] / (index->sax_cache_size / chunknumber))] = true; + activenode[current_root_node_number] = true; + } + else + { + activenode[current_root_node_number] = false; + } + } + + pthread_barrier_wait(((uni_workerdata *)uniworkerdata)->lock_barrierfirst); + + current_root_node_number = 0; + while (1) + { + current_root_node_number = __sync_fetch_and_add(((uni_workerdata *)uniworkerdata)->node_counter, 1); + if (current_root_node_number >= ((uni_workerdata *)uniworkerdata)->amountnode) + break; + current_root_node = ((uni_workerdata *)uniworkerdata)->nodelist[current_root_node_number]; + if (activenode[current_root_node_number] == true) + { + insert_tree_node_m_hybridpqueueBreakpolydtw(paaU, paaL, current_root_node, index, bsfdisntance, + ((uni_workerdata *)uniworkerdata)->allpq, + ((uni_workerdata *)uniworkerdata)->alllock, + &tnumber, n_pqueue); + } + } + pthread_barrier_wait(((uni_workerdata *)uniworkerdata)->lock_barrier); + + int queuenumber; + int offset = startqueuenumber; + while (1) + { + finished = true; + for (int i = 0; i < n_pqueue; i++) + { + queuenumber = (i + offset) % n_pqueue; + if (allqueuelabel[queuenumber] == 1) + { + finished = false; + bsfdisntance = bsf_result->distance; + pthread_mutex_lock(&alllock[queuenumber]); + n = (query_result *)pqueue_pop(((uni_workerdata *)uniworkerdata)->allpq[queuenumber]); + if (n == nullptr) + { + allqueuelabel[queuenumber] = 0; + pthread_mutex_unlock(&alllock[queuenumber]); + continue; + } + else + { + pthread_mutex_unlock(&alllock[queuenumber]); + } + + if (n->distance > bsfdisntance || n->distance > minimum_distance) + { + allqueuelabel[queuenumber] = 0; + free(n); + continue; + } + else + { + if (n->node->is_leaf) + { + checks++; + float *rawfile = ((uni_workerdata *)uniworkerdata)->rawfile; + if (n->node->buffer->arrayposition < *((uni_workerdata *)uniworkerdata)->gpuoffset) + { + distance = calculate_node_distance_SING_short_dtw(index, n->node, ts, uo, lo, + paa, paaU, paaL, + bsfdisntance, + ((uni_workerdata *)uniworkerdata)->shortrate, + ((uni_workerdata *)uniworkerdata)->warpWind, + rawfile); + } + else + { + distance = calculate_node_DTW2_inmemorysing(index, n->node, ts, uo, lo, + paa, paaU, paaL, + bsfdisntance, warpWind, + rawfile, nullptr); + } + if (distance < bsfdisntance) + { + pthread_rwlock_wrlock(((uni_workerdata *)uniworkerdata)->lock_bsf); + if (distance < bsf_result->distance) + { + bsf_result->distance = distance; + bsf_result->node = n->node; + } + pthread_rwlock_unlock(((uni_workerdata *)uniworkerdata)->lock_bsf); + } + } + } + free(n); + } + else if (allqueuelabel[queuenumber] == 0) + { + pthread_mutex_lock(&(((uni_workerdata *)uniworkerdata)->alllock[queuenumber])); + n = (query_result *)pqueue_pop(((uni_workerdata *)uniworkerdata)->allpq[queuenumber]); + pthread_mutex_unlock(&(((uni_workerdata *)uniworkerdata)->alllock[queuenumber])); + if (n != nullptr) + { + free(n); + } + else + { + allqueuelabel[queuenumber] = 2; + } + } + } + if (finished) + { + break; + } + } + + (void)checks; + pthread_exit(nullptr); + } + + void pass_tree_node_m_short(isax_node *node, isax_index *index, pthread_mutex_t *lock_queue, + unsigned long int *currentposition, sax_type *saxarray, sax_type *sortsaxarray, + short *lbdarray) + { + (void)lock_queue; + (void)saxarray; + if (node == nullptr) + return; + if (node->is_leaf) + { + if (node->buffer == nullptr || node->buffer->partial_sax_buffer == nullptr || node->buffer->partial_buffer_size <= 0) + return; + unsigned long int arrayposition = __sync_fetch_and_add(currentposition, (unsigned long)node->buffer->partial_buffer_size); + node->buffer->lbdarrayshort = &lbdarray[arrayposition]; + node->buffer->arrayposition = arrayposition + (unsigned long)node->buffer->partial_buffer_size; + const int paa_segments = index->settings->paa_segments; + for (int i = 0; i < node->buffer->partial_buffer_size; i++) + { + for (int j = 0; j < paa_segments; j++) + { + sortsaxarray[(arrayposition + (unsigned long)i) * (unsigned long)paa_segments + (unsigned long)j] = + (sax_type)node->buffer->partial_sax_buffer[i][j]; + } + } + } + else + { + if (node->left_child != nullptr && node->left_child->isax_cardinalities != nullptr) + pass_tree_node_m_short(node->left_child, index, lock_queue, currentposition, saxarray, sortsaxarray, lbdarray); + if (node->right_child != nullptr && node->right_child->isax_cardinalities != nullptr) + pass_tree_node_m_short(node->right_child, index, lock_queue, currentposition, saxarray, sortsaxarray, lbdarray); + } + } + + float calculate_node_DTW_inmemory(isax_index *index, isax_node *node, ts_type *query, float bsf, int warpWind, float *rawfile) + { + if (node->buffer == nullptr) + return bsf; + float *cb = (float *)calloc((size_t)index->settings->timeseries_size, sizeof(float)); + if (cb == nullptr) + return bsf; + int i; + for (i = 0; i < node->buffer->full_buffer_size; i++) + { + float dist = ts_euclidean_distance(query, node->buffer->full_ts_buffer[i], + index->settings->timeseries_size, bsf); + if (dist < bsf) + bsf = dist; + } + for (i = 0; i < node->buffer->tmp_full_buffer_size; i++) + { + float dist = ts_euclidean_distance(query, node->buffer->tmp_full_ts_buffer[i], + index->settings->timeseries_size, bsf); + if (dist < bsf) + bsf = dist; + } + if (rawfile != nullptr && node->buffer->partial_position_buffer != nullptr) + { + for (i = 0; i < node->buffer->partial_buffer_size; i++) + { + if (node->buffer->partial_position_buffer[i] == nullptr) + continue; + float dist = dtw(query, &(rawfile[*node->buffer->partial_position_buffer[i]]), cb, + index->settings->timeseries_size, warpWind, bsf); + if (dist < bsf) + bsf = dist; + } + } + free(cb); + return bsf; + } + + query_result approximate_DTW_inmemory_sing(ts_type *ts, ts_type *paa, isax_index *index, int warpWind, float *rawfile) + { + query_result result; + result.node = nullptr; + result.distance = FLT_MAX; + result.pqueue_position = 0; + result.pq_bsf = nullptr; + result.total_time = 0.0f; + + sax_type *sax = (sax_type *)malloc(sizeof(sax_type) * (size_t)index->settings->paa_segments); + if (sax == nullptr) + return result; + sax_from_paa(paa, sax, index->settings->paa_segments, + index->settings->sax_alphabet_cardinality, + index->settings->sax_bit_cardinality); + + root_mask_type root_mask = 0; + CREATE_MASK(root_mask, index, sax); + + if ((&((first_buffer_layer2 *)(index->fbl))->soft_buffers[(int)root_mask])->initialized) + { + isax_node *node = (&((first_buffer_layer2 *)(index->fbl))->soft_buffers[(int)root_mask])->node; + while (!node->is_leaf) + { + int location = index->settings->sax_bit_cardinality - 1 - + node->split_data->split_mask[node->split_data->splitpoint]; + root_mask_type mask = index->settings->bit_masks[location]; + if (sax[node->split_data->splitpoint] & mask) + node = node->right_child; + else + node = node->left_child; + } + result.distance = calculate_node_DTW_inmemory(index, node, ts, FLT_MAX, warpWind, rawfile); + result.node = node; + } + + free(sax); + return result; + } + void insert_tree_node_m_hybridpqueueBreakpolyroot(float *paa, isax_node *node, isax_index *index, float bsf, pqueue_t **pq, pthread_mutex_t *lock_queue, int *tnumber, int n_pqueue) { @@ -397,6 +955,15 @@ namespace daisy } } + float nodedistance(float *paa, sax_type *sax, + sax_type *sax_cardinalities, + sax_type max_bit_cardinality, + int max_cardinality, + int number_of_segments, + int min_val, + int max_val, + float ratio_sqrt); + void calculate_node_topk_SING(isax_index *index, isax_node *node, ts_type *query, ts_type *paa, pqueue_bsf *pq_bsf, pthread_rwlock_t *lock_queue, float *rawfile) { @@ -480,6 +1047,221 @@ namespace daisy } } + query_result exact_DTW_SING_new(ts_type *ts, ts_type *paa, ts_type *paaU, ts_type *paaL, + isax_index *index, float minimum_distance, int min_checked_leaves, + float *qts, float *gqts, float *gqtsU, float *gqtsL, + sax_type *gsaxarray, short *positionmap, short *gpositionmap, + float *gdictionary, node_list *nodelist, + unsigned long int *offsetarray, int **arrangenodearray, + int *arrangenodearraynumber, int warpWind, + int N_PQUEUE, int maxquerythread, int chunknumber, + float *rawfile) + { + + //query_result approximate_result = approximate_search_inmemory_messi(ts, paa, index); + //printf("this is check point 02\n"); + + query_result approximate_result = approximate_DTW_inmemory_sing(ts, paa, index, warpWind, rawfile); + //printf("this is check point 1\n"); + int tight_bound = index->settings->tight_bound; + int aggressive_check = index->settings->aggressive_check; + + bool labelvalue=false; + // Early termination... + if (approximate_result.distance == 0) { + return approximate_result; + } + float *lowerLemire= (float *)malloc(sizeof(float)*index->settings->timeseries_size); + float *upperLemire= (float *)malloc(sizeof(float)*index->settings->timeseries_size); + lower_upper_lemire(ts,index->settings->timeseries_size,warpWind,lowerLemire,upperLemire); + + if(approximate_result.distance == FLT_MAX || min_checked_leaves > 1) { + //approximate_result = refine_answer_inmemory(ts, paa, index, approximate_result, minimum_distance, min_checked_leaves); + } + query_result bsf_result = approximate_result; + pqueue_t **allpq=(pqueue_t**)malloc(sizeof(pqueue_t*)*N_PQUEUE); + + + pthread_mutex_t ququelock[N_PQUEUE]; + int queuelabel[N_PQUEUE]; + + query_result *do_not_remove = &approximate_result; + + //printf("this is check point 2\n"); + + if(approximate_result.node != NULL) { + // Insert approximate result in heap. + //pqueue_insert(pq, &approximate_result); + //GOOD: if(approximate_result.node->filename != NULL) + //GOOD: printf("POPS: %.5lf\t", approximate_result.distance); + } + // Insert all root nodes in heap. + isax_node *current_root_node = index->first_node; + + pthread_t threadid[maxquerythread],threadid2[maxquerythread]; + SING_workerdata workerdata[maxquerythread]; + pthread_mutex_t lock_queue=PTHREAD_MUTEX_INITIALIZER,lock_current_root_node=PTHREAD_MUTEX_INITIALIZER; + pthread_rwlock_t lock_bsf=PTHREAD_RWLOCK_INITIALIZER; + pthread_barrier_t lock_barrier,lock_barrierfirst; + pthread_barrier_init(&lock_barrier, NULL, maxquerythread+1); + pthread_barrier_init(&lock_barrierfirst, NULL, maxquerythread+1); + + int queueoffset[N_PQUEUE]; + unsigned long int gpuoffset=0; + int loopnumber = chunknumber; + for (int i = 0; i < N_PQUEUE; i++) + { + allpq[i]=pqueue_init(index->settings->root_nodes_size/N_PQUEUE, + cmp_pri, get_pri, set_pri, get_pos, set_pos); + pthread_mutex_init(&ququelock[i], NULL); + queuelabel[i]=1; + } + + bool *activechunk=(bool*)malloc(sizeof(bool)*(chunknumber+1)); + int *chunkcounter=(int*)malloc(sizeof(int)*(chunknumber)); + int *chunkfinishcounter=(int*)malloc(sizeof(int)*(chunknumber)); + + for (int i = 0; i < chunknumber; i++) + { + activechunk[i]=false; + chunkcounter[i]=0; + chunkfinishcounter[i]=0; + } + activechunk[loopnumber]=true; + gap_workerdata gapworkerdata[maxquerythread]; + int startnode=nodelist->node_amount, stopnode=0,nodecounter=0,nodecounter2=nodelist->node_amount-1;//, *gapstartnode,*gapstopnode + bool *activenode=(bool*)malloc(sizeof(bool)*nodelist->node_amount); + uni_workerdata uniworkerdata[maxquerythread]; + int node_counter=0; + unsigned long gpuoffsetnum=0; + float shortrate=5000.0f/approximate_result.distance; + //printf("the shortrate is %f\n",shortrate); + for (int i = 0; i < maxquerythread; i++) + { + uniworkerdata[i].nodelist = nodelist->nlist; + uniworkerdata[i].amountnode = nodelist->node_amount; + uniworkerdata[i].startnode = &startnode; + uniworkerdata[i].stopnode = &stopnode; // *gapstartnode,*gapstopnode; + uniworkerdata[i].chunkcounter = chunkcounter; + uniworkerdata[i].chunkfinishcounter = chunkfinishcounter; + uniworkerdata[i].nodecounter = &nodecounter; + uniworkerdata[i].arrangenodearray = arrangenodearray; + uniworkerdata[i].arrangenodearraynumber = arrangenodearraynumber; + uniworkerdata[i].nodecounter2 = &nodecounter2; + uniworkerdata[i].index = index; + uniworkerdata[i].bsf = approximate_result.distance; + uniworkerdata[i].paa = paa; + uniworkerdata[i].paaU = paaU; + uniworkerdata[i].paaL = paaL; + uniworkerdata[i].uo = upperLemire; + uniworkerdata[i].lo = lowerLemire; + uniworkerdata[i].lockposition = &lock_queue; + uniworkerdata[i].offsetarray = offsetarray; + uniworkerdata[i].activechunk = activechunk; + uniworkerdata[i].chunknumber = chunknumber; + uniworkerdata[i].activenode = activenode; + uniworkerdata[i].workerstartnode = (i)*nodelist->node_amount / maxquerythread; + uniworkerdata[i].workerstopnode = (i + 1) * nodelist->node_amount / maxquerythread; + uniworkerdata[i].shortrate = shortrate; + uniworkerdata[i].ts = ts; + uniworkerdata[i].lock_queue = &lock_queue; + uniworkerdata[i].lock_current_root_node = &lock_current_root_node; + uniworkerdata[i].lock_bsf = &lock_bsf; + uniworkerdata[i].minimum_distance = minimum_distance; + uniworkerdata[i].node_counter = &node_counter; + uniworkerdata[i].pq = allpq[i]; + uniworkerdata[i].bsf_result = &bsf_result; + uniworkerdata[i].lock_barrier = &lock_barrier; + uniworkerdata[i].lock_barrierfirst = &lock_barrierfirst; + uniworkerdata[i].warpWind = warpWind; + + uniworkerdata[i].alllock = ququelock; + uniworkerdata[i].allqueuelabel = queuelabel; + uniworkerdata[i].allpq = allpq; + uniworkerdata[i].startqueuenumber = (i % N_PQUEUE); + uniworkerdata[i].lbdmapshort = positionmap; + uniworkerdata[i].labelvalue = &labelvalue; + uniworkerdata[i].gpuoffset = &gpuoffset; + uniworkerdata[i].activenode = activenode; + uniworkerdata[i].rawfile = rawfile; + uniworkerdata[i].n_pqueue = N_PQUEUE; + } + uniworkerdata[maxquerythread-1].workerstopnode=nodelist->node_amount; + //printf("this is check point 3\n"); + + for (int i = 0; i < maxquerythread; i++) + { + + pthread_create(&(threadid2[i]),NULL,uniworkershort_v4dtw,(void*)&(uniworkerdata[i])); + } + // cudaStream_t *streams=LBDfloatstreamGPUdevidebegin( qts, gqts,datastartnumber,datasizerun,loopnumber); + pthread_barrier_wait(&lock_barrierfirst); + if(activechunk[loopnumber]) + activechunk[loopnumber-1]=true; + //printf("this is check point 4\n"); + + //printf("this is check point 5\n"); + LBDshortstreamGPUinsidedynamicratev2dtw(gsaxarray, positionmap, paaU, paaL, gqtsU, gqtsL, approximate_result.distance,index->sax_cache_size,gpositionmap,index->settings->paa_segments,index->settings->mindist_sqrt, &gpuoffset,shortrate,loopnumber,activechunk); + + pthread_barrier_wait(&lock_barrier); + //printf("this is check point 6\n"); + + //for (int i = 0; i < loopnumber; i++) + { + //if(activechunk[i]) + { + //LBDshortstreamGPUinsidedynamicrate(&gsaxarray[i*index->sax_cache_size/loopnumber*index->settings->paa_segments], &positionmap[i*index->sax_cache_size/loopnumber], paa, gqts, approximate_result.distance,index->sax_cache_size/loopnumber,&gpositionmap[i*index->sax_cache_size/loopnumber],index->settings->paa_segments,index->settings->mindist_sqrt, &gpuoffsetnum,shortrate); + //LBDcalculationnumber+=index->sax_cache_size/loopnumber; + } + // __sync_fetch_and_add(&gpuoffset,index->sax_cache_size/loopnumber); + //gpuoffset=(i+1)*index->sax_cache_size/loopnumber; + } + //LBDfloatstreamGPU(gsaxarray, positionmap, paa, gqts, approximate_result.distance,index->sax_cache_size,gpositionmap, gdictionary); + + + for (int i = 0; i < maxquerythread; i++) + { + pthread_join(threadid2[i],NULL); + } + + + //printf("this is check point 7\n"); + + + + + + + + + + + + + // Free the nodes that where not popped. + // Free the priority queue. + pthread_barrier_destroy(&lock_barrier); + + //pqueue_free(pq); + for (int i = 0; i < N_PQUEUE; i++) + { + pqueue_free(allpq[i]); + } + free(allpq); + free(activechunk); + free(chunkcounter); + free(chunkfinishcounter); + //free(rfdata); + //LBDcalculationnumber=(datasizerun-datastartnumber)*index->sax_cache_size/loopnumber; + //printf("the number of node added is \t%ld\t\t and real distance calculation is \t%ld\n ",LBDcalculationnumber,RDcalculationnumber); + //printf("the number of lower bound distance calculation is \t%ld\t\t and the delete node is \t%ld\n ",LBDcalculationnumber,RDcalculationnumber); + + return bsf_result; + + // Free the nodes that where not popped. + + } + void *exact_knn_SING_worker(void *arg) { SING_workerdata *wd = (SING_workerdata *)arg; @@ -637,8 +1419,7 @@ namespace daisy isax_node_record *r = (isax_node_record *)malloc(sizeof(isax_node_record)); int worker_id = ((buffer_data_inmemory *)transferdata)->workernumber; - int buffers_processed = 0; - static const int debug_print_max = 10; + (void)worker_id; while (1) { @@ -652,9 +1433,6 @@ namespace daisy if (current_fbl_node->buffer_size > 0) { - if (buffers_processed < debug_print_max) - fprintf(stderr, " [worker %d] buffer j=%d buffer_size=%d\n", worker_id, j, current_fbl_node->buffer_size); - buffers_processed++; for (k = 0; k < current_fbl_node->buffer_size; k++) { r->sax = &(index->sax_cache[current_fbl_node->pos_records[k] / index->settings->timeseries_size * index->settings->paa_segments]); @@ -757,7 +1535,7 @@ namespace daisy long int ts_num = (long int)dataset_size; int maxquerythread = this->index_workers; - fprintf(stderr, "[Sing buildIndex] index_creation_gpu: start, ts_num=%ld, index_workers=%d\n", (long)ts_num, maxquerythread); + // build-index debug logging removed (handled at demo level) idx->sax_file = NULL; long int ts_loaded = 0; @@ -801,7 +1579,6 @@ namespace daisy fprintf(stderr, "Sing::index_creation_gpu: initialize_simrec failed\n"); throw std::runtime_error("Sing::index_creation_gpu: initialize_simrec failed"); } - fprintf(stderr, "[Sing buildIndex] initialize_simrec ok, number_of_buffers=%d\n", ((first_buffer_layer2 *)idx->fbl)->number_of_buffers); for (i = 0; i < LOCK_SIZE; i++) pthread_mutex_init(&lockcbl[i], NULL); @@ -821,7 +1598,6 @@ namespace daisy __sync_fetch_and_add(&(current_buffer->max_buffer_size), 1); } } - fprintf(stderr, "[Sing buildIndex] phase 1 done: SAX count per buffer\n"); #pragma omp parallel for num_threads(maxquerythread) for (i = 0; i < ((first_buffer_layer2 *)(idx->fbl))->number_of_buffers; i++) @@ -836,7 +1612,6 @@ namespace daisy current_buffer->node->is_leaf = 1; } } - fprintf(stderr, "[Sing buildIndex] phase 2 done: buffers and nodes allocated\n"); #pragma omp parallel for num_threads(maxquerythread) for (long int j = 0; j < ts_num; j++) @@ -849,7 +1624,6 @@ namespace daisy memcpy(¤t_buffer->sax_records[buffersize * idx->settings->paa_segments], sax, sizeof(sax_type) * (size_t)idx->settings->paa_segments); current_buffer->pos_records[buffersize] = (file_position_type)(j * idx->settings->timeseries_size); } - fprintf(stderr, "[Sing buildIndex] phase 3 done: sax_records/pos_records filled\n"); for (i = 0; i < maxquerythread; i++) { @@ -877,12 +1651,10 @@ namespace daisy input_data[maxquerythread - 1].start_number = (maxquerythread - 1) * (int)(ts_num / maxquerythread); input_data[maxquerythread - 1].stop_number = (int)ts_num; - fprintf(stderr, "[Sing buildIndex] starting %d workers (add_record_to_node + flush)...\n", maxquerythread); for (i = 0; i < maxquerythread; i++) pthread_create(&(threadid[i]), NULL, index_creation_worker2_inmemory, (void *)&(input_data[i])); for (i = 0; i < maxquerythread; i++) pthread_join(threadid[i], NULL); - fprintf(stderr, "[Sing buildIndex] workers finished\n"); idx->sax_cache_size = (unsigned long)ts_num; @@ -902,7 +1674,11 @@ namespace daisy } if (this->distance_type == DistanceType::DTW) { - searchIndex_DTW(query, n_query, k, I, D); + if (k != 1) + { + fprintf(stderr, "[Sing][DTW] Warning: only k = 1 is supported for DTW in this version; proceeding with k = 1.\n"); + } + searchIndex_DTW(query, n_query, 1, I, D); return; } @@ -941,12 +1717,228 @@ namespace daisy void Sing::searchIndex_DTW(const float *query, const idx_t n_query, const idx_t k, idx_t *I, float *D) { - (void)query; - (void)n_query; - (void)k; - (void)I; - (void)D; - fprintf(stderr, "[Sing] searchIndex_DTW not implemented yet.\n"); + isax_index *index = this->index; + first_buffer_layer2 *fbl2 = (first_buffer_layer2 *)index->fbl; + if (fbl2 == nullptr || fbl2->soft_buffers == nullptr) + { + fprintf(stderr, "[Sing searchIndex_DTW] fbl not available, running brute-force.\n"); +#pragma omp parallel num_threads(num_threads) + { +#pragma omp for + for (idx_t qi = 0; qi < n_query; qi++) + { + std::priority_queue> pq; + const float *q_vec = query + qi * this->dim; + float bound = FLT_MAX; + for (idx_t dbi = 0; dbi < this->n_database; ++dbi) + { + const float *db_vec = this->database + dbi * this->dim; + float dist = this->distance_computer->compute_dist(const_cast(q_vec), + const_cast(db_vec), + this->dim, bound); + if ((idx_t)pq.size() < k) + pq.emplace(dist, dbi); + else if (dist < pq.top().first) + { + pq.pop(); + pq.emplace(dist, dbi); + bound = pq.top().first; + } + } + for (idx_t j = k; j > 0; --j) + { + D[qi * k + (j - 1)] = pq.top().first; + I[qi * k + (j - 1)] = pq.top().second; + pq.pop(); + } + } + } + return; + } + + node_list nodelist; + int root_nodes = index->settings->root_nodes_size; + nodelist.nlist = (isax_node **)malloc(sizeof(isax_node *) * (size_t)root_nodes); + nodelist.node_amount = 0; + for (int i = 0; i < root_nodes; i++) + { + fbl_soft_buffer2 *current_buffer = &fbl2->soft_buffers[i]; + if (current_buffer->initialized == 1 && current_buffer->node != nullptr) + { + nodelist.nlist[nodelist.node_amount++] = current_buffer->node; + } + } + if (nodelist.node_amount > 0) + index->first_node = nodelist.nlist[0]; + + const int paa_segments = index->settings->paa_segments; + const int ts_values_per_paa = index->settings->ts_values_per_paa_segment; + ts_type *paa = (ts_type *)malloc(sizeof(ts_type) * (size_t)paa_segments); + ts_type *paaU = (ts_type *)malloc(sizeof(ts_type) * (size_t)paa_segments); + ts_type *paaL = (ts_type *)malloc(sizeof(ts_type) * (size_t)paa_segments); + ts_type *upperLemire = (ts_type *)malloc(sizeof(ts_type) * (size_t)index->settings->timeseries_size); + ts_type *lowerLemire = (ts_type *)malloc(sizeof(ts_type) * (size_t)index->settings->timeseries_size); + + short *posbitmap = nullptr; + short *gposbitmap = nullptr; + float *qts = nullptr; + float *gqts = nullptr; + float *gqtsU = nullptr; + float *gqtsL = nullptr; + sax_type *gsaxarray = nullptr; + +#if SING_CUDA_ENABLED + initialdevice(); + posbitmap = initialposbitmapshort(posbitmap, (unsigned long)index->sax_cache_size); + gposbitmap = initialgposbitmapshort(gposbitmap, (unsigned long)index->sax_cache_size); + qts = (float *)malloc(sizeof(float) * (size_t)paa_segments); + gqts = initialgqts(gqts); + gqtsU = initialgqts(gqtsU); + gqtsL = initialgqts(gqtsL); + sax_type *gsaxarray_d = (sax_type *)initialgsaxarray(nullptr, (unsigned long)index->sax_cache_size); +#else + posbitmap = nullptr; + gposbitmap = nullptr; + qts = (float *)malloc(sizeof(float) * (size_t)paa_segments); + gqts = nullptr; + gqtsU = nullptr; + gqtsL = nullptr; + sax_type *gsaxarray_d = nullptr; +#endif + + unsigned long int *offsetarray = (unsigned long int *)malloc(sizeof(unsigned long int) * (size_t)(nodelist.node_amount + 1)); + sax_type *saxarrarysort = (sax_type *)malloc(sizeof(sax_type) * (size_t)index->sax_cache_size * (size_t)paa_segments); + unsigned long int currentposition = 0; + for (int i = 0; i < nodelist.node_amount; i++) + { + offsetarray[i] = currentposition; + pass_tree_node_m_short(nodelist.nlist[i], index, NULL, ¤tposition, index->sax_cache, saxarrarysort, posbitmap); + } + offsetarray[nodelist.node_amount] = (unsigned long int)index->sax_cache_size; + + if (currentposition > 0) + { +#if SING_CUDA_ENABLED + gpumemcpy(gsaxarray_d, saxarrarysort, (unsigned long)index->sax_cache_size); + gsaxarray = gsaxarray_d; +#else + gsaxarray = saxarrarysort; +#endif + } + else + { + for (int i = 0; i < nodelist.node_amount; i++) + offsetarray[i] = (unsigned long int)((unsigned long long)index->sax_cache_size * i / nodelist.node_amount); + offsetarray[nodelist.node_amount] = (unsigned long int)index->sax_cache_size; +#if SING_CUDA_ENABLED + gpumemcpy(gsaxarray_d, index->sax_cache, (unsigned long)index->sax_cache_size); + gsaxarray = gsaxarray_d; +#else + gsaxarray = index->sax_cache; +#endif + free(saxarrarysort); + saxarrarysort = nullptr; + } + + int chunknumber = 20; + int *arrangenodearraynumber = (int *)malloc(sizeof(int) * (size_t)chunknumber); + int *arrangernodearraynumbertemp = (int *)malloc(sizeof(int) * (size_t)chunknumber); + int **arrangenodearray = (int **)malloc(sizeof(int *) * (size_t)chunknumber); + for (int i = 0; i < chunknumber; i++) + { + arrangenodearraynumber[i] = 0; + arrangernodearraynumbertemp[i] = 0; + } + + for (int i = 0; i < nodelist.node_amount - 1; i++) + { + for (int j = (int)(offsetarray[i] / (index->sax_cache_size / chunknumber)); + j <= (int)(offsetarray[i + 1] / (index->sax_cache_size / chunknumber)); j++) + { + arrangenodearraynumber[j]++; + } + } + arrangenodearraynumber[chunknumber - 1]++; + + for (int i = 0; i < chunknumber; i++) + { + arrangenodearray[i] = (int *)malloc(sizeof(int) * (size_t)arrangenodearraynumber[i]); + } + + for (int i = 0; i < nodelist.node_amount - 1; i++) + { + for (int j = (int)(offsetarray[i] / (index->sax_cache_size / chunknumber)); + j <= (int)(offsetarray[i + 1] / (index->sax_cache_size / chunknumber)); j++) + { + arrangenodearray[j][arrangernodearraynumbertemp[j]] = i; + arrangernodearraynumbertemp[j]++; + } + } + arrangenodearray[chunknumber - 1][arrangernodearraynumbertemp[chunknumber - 1]] = nodelist.node_amount - 1; + + int N_PQUEUE = this->n_pqueue; + int maxquerythread = this->search_workers; + float minimum_distance = this->minimum_distance; + int min_checked_leaves = this->min_checked_leaves; + + for (idx_t q_loaded = 0; q_loaded < n_query; q_loaded++) + { + ts_type *ts = (ts_type *)(query + q_loaded * this->dim); + lower_upper_lemire(ts, index->settings->timeseries_size, this->warping_window, lowerLemire, upperLemire); + paa_from_ts(upperLemire, paaU, paa_segments, ts_values_per_paa); + paa_from_ts(lowerLemire, paaL, paa_segments, ts_values_per_paa); + paa_from_ts(ts, paa, paa_segments, ts_values_per_paa); + std::memcpy(qts, paa, sizeof(float) * (size_t)paa_segments); + + + int chunknumber = 20; // same value used in L2 search + + query_result result = exact_DTW_SING_new(ts, paa, paaU, paaL, index, + minimum_distance, min_checked_leaves, + qts, gqts, gqtsU, gqtsL, + gsaxarray, posbitmap, gposbitmap, + nullptr, &nodelist, offsetarray, + arrangenodearray, arrangenodearraynumber, + this->warping_window, + this->n_pqueue, this->search_workers, chunknumber, + this->database); + + // Use the final DTW result to recover the exact database index (k = 1). + idx_t best_pos = 0; + float best_dist = result.distance; + if (result.node != nullptr) + { + best_dist = calculate_node_DTW2_inmemorysing(index, result.node, ts, upperLemire, lowerLemire, + paa, paaU, paaL, result.distance, + this->warping_window, this->database, &best_pos); + } + D[q_loaded * k] = best_dist; + I[q_loaded * k] = best_pos; + } + + free(arrangenodearraynumber); + free(arrangernodearraynumbertemp); + for (int i = 0; i < chunknumber; i++) + free(arrangenodearray[i]); + free(arrangenodearray); + free(saxarrarysort); + free(offsetarray); + free(paa); + free(paaU); + free(paaL); + free(upperLemire); + free(lowerLemire); + free(qts); + free(nodelist.nlist); + +#if SING_CUDA_ENABLED + GPUfree(gsaxarray); + GPUfree(gposbitmap); + GPUfree(gqts); + GPUfree(gqtsU); + GPUfree(gqtsL); + cudaFreeHost(posbitmap); +#endif } void Sing::searchIndexL2Square(const float *query, const idx_t n_query, const idx_t k, idx_t *I, float *D) diff --git a/lib/algos/Sing.hpp b/lib/algos/Sing.hpp index 4e6c87c..f54c458 100644 --- a/lib/algos/Sing.hpp +++ b/lib/algos/Sing.hpp @@ -73,6 +73,54 @@ namespace daisy int *arrangenodearraynumber; } gap_workerdata; + typedef struct uni_workerdata + { + isax_node **nodelist; + int amountnode, workerstartnode, workerstopnode; + int *startnode, *stopnode, *gapstartnode, *gapstopnode; + int *nodecounter, *nodecounter2; + bool *activechunk; + bool *activenode; + isax_index *index; + int chunknumber; + float bsf; + ts_type *paa, *paaU, *paaL, *ts, *uo, *lo; + pthread_mutex_t *lockposition; + unsigned long *offsetarray; + int *chunkcounter; + int *chunkfinishcounter; + int **arrangenodearray; + int *arrangenodearraynumber; + isax_node *current_root_node; + pqueue_t *pq; + float minimum_distance; + int limit; + pthread_mutex_t *lock_current_root_node; + pthread_mutex_t *lock_queue; + pthread_barrier_t *lock_barrier, *lock_barrierfirst; + pthread_rwlock_t *lock_bsf; + query_result *bsf_result; + int *node_counter; + localStack *localstk; + localStack *allstk; + pthread_mutex_t *locallock, *alllock; + int *queuelabel, *allqueuelabel; + pqueue_t **allpq; + int startqueuenumber; + int warpWind; + int n_pqueue; + pqueue_bsf *pq_bsf; + float *lbdmap; + short *lbdmapshort; + float shortrate; + bool *labelvalue; + int *offsetvalue; + unsigned long int *gpuoffset; + unsigned long int *seriesnumber; + unsigned long int rawdatanumber; + float *rawfile; + } uni_workerdata; + struct SingConfig { int search_workers = 4; @@ -91,6 +139,53 @@ namespace daisy void calculate_node_cal_topk_inmemory(isax_index *index, isax_node *node, ts_type *query, ts_type *paa, pqueue_bsf *pq_bsf, pthread_rwlock_t *lock_queue, float *rawfile); + void pass_tree_node_m_short(isax_node *node, isax_index *index, pthread_mutex_t *lock_queue, + unsigned long int *currentposition, sax_type *saxarray, sax_type *sortsaxarray, + short *lbdarray); + + float nodedistance(float *paa, sax_type *sax, + sax_type *sax_cardinalities, + sax_type max_bit_cardinality, + int max_cardinality, + int number_of_segments, + int min_val, + int max_val, + float ratio_sqrt); + + float nodedistancedtw(float *paa, float *paaU, float *paaL, sax_type *sax, + sax_type *sax_cardinalities, + sax_type max_bit_cardinality, + int max_cardinality, + int number_of_segments, + int min_val, + int max_val, + float ratio_sqrt); + + void insert_tree_node_m_hybridpqueueBreakpolydtw(float *paaU, float *paaL, isax_node *node, isax_index *index, + float bsf, pqueue_t **pq, pthread_mutex_t *lock_queue, + int *tnumber, int n_pqueue); + + float minidist_paa_to_isax_DTWbp(float *paaU, float *paaL, sax_type *sax, + sax_type *sax_cardinalities, + sax_type max_bit_cardinality, + int max_cardinality, + int number_of_segments, + int min_val, + int max_val, + float ratio_sqrt); + + float calculate_node_distance_SING_short_dtw(isax_index *index, isax_node *node, ts_type *query, float *uo, + float *lo, ts_type *paa, ts_type *paaU, ts_type *paaL, float bsf, + float shortrate, int warpWind, float *rawfile); + + float calculate_node_DTW2_inmemorysing(isax_index *index, isax_node *node, ts_type *query, float *uo, + float *lo, ts_type *paa, ts_type *paaU, ts_type *paaL, float bsf, + int warpWind, float *rawfile, idx_t *best_position); + + float calculate_node_DTW_inmemory(isax_index *index, isax_node *node, ts_type *query, float bsf, int warpWind, float *rawfile); + + query_result approximate_DTW_inmemory_sing(ts_type *ts, ts_type *paa, isax_index *index, int warpWind, float *rawfile); + float minidist_paa_to_isax_Breakpoly(float *paa, sax_type *sax, sax_type *sax_cardinalities, sax_type max_bit_cardinality, int max_cardinality, int number_of_segments, int min_val, int max_val, float ratio_sqrt); diff --git a/lib/algos/singlib.cu b/lib/algos/singlib.cu index 6b47ff1..dfdacfa 100644 --- a/lib/algos/singlib.cu +++ b/lib/algos/singlib.cu @@ -63,6 +63,20 @@ extern "C" float *initialposbitmapfloat(float *posbitmap, unsigned long datasize return posbitmap; } +extern "C" short *initialposbitmapshort(short *posbitmap, unsigned long datasize) +{ + (void)posbitmap; + cudaMallocHost((void **)&posbitmap, sizeof(short) * (size_t)datasize); + return posbitmap; +} + +extern "C" short *initialgposbitmapshort(short *gposbitmap, unsigned long datasize) +{ + (void)gposbitmap; + cudaMalloc((void **)&gposbitmap, sizeof(short) * (size_t)datasize); + return gposbitmap; +} + extern "C" void gpumemcpy(singlib_sax_t *gsaxarray, const singlib_sax_t *saxarray, unsigned long datasize) { cudaMemcpy(gsaxarray, saxarray, sizeof(singlib_sax_t) * (size_t)datasize * PAA_SEGMENTS_SAX, cudaMemcpyHostToDevice); @@ -186,3 +200,142 @@ extern "C" void LBDfloatstreamGPU( cudaDeviceSynchronize(); } + +__global__ void calculate_lbdshortdynamicratedtw( + const sax_type *const saxarray, + const float *const paaU, + const float *const paaL, + const long int M, + const int N, + short *positionarray, + const float BSF, + float segmentsize, + float shortrate) +{ + const int thid = blockDim.x * blockIdx.x + threadIdx.x; + float distance = 0.0f; + int i = 0; + float breakpoint_lower = 0.0f; + float breakpoint_upper = 0.0f; + + for (int j = thid; j < M; j += gridDim.x * blockDim.x) + { + distance = 0.0f; + for (i = 0; i < N; i++) + { + if (segmentsize * distance < BSF) + { + sax_type v = saxarray[j * N + i]; + sax_type region_lower = v; + sax_type region_upper = (sax_type)(((sax_type)~0) | region_lower); + + if (region_lower == 0) + { + breakpoint_lower = -2000000.0f; + float breaku = ((float)region_lower - 127.0f) / 128.0f; + breakpoint_upper = breaku * (1.1362582192f * breaku * breaku + 0.99800f); + if (breakpoint_upper < paaL[i]) + { + float diff = breakpoint_upper - paaL[i]; + distance += diff * diff; + } + } + else if (region_upper == 256 - 1) + { + breakpoint_upper = +2000000.0f; + float breakx = ((float)region_lower - 128.0f) / 128.0f; + breakpoint_lower = breakx * (breakx * breakx * 1.1362582192f + 0.99800f); + if (breakpoint_lower > paaU[i]) + { + float diff = breakpoint_lower - paaU[i]; + distance += diff * diff; + } + } + else + { + float breakx = ((float)region_lower - 128.0f) / 128.0f; + breakpoint_lower = breakx * (breakx * breakx * 1.1362582192f + 0.99800f); + if (breakpoint_lower > paaU[i]) + { + float diff = breakpoint_lower - paaU[i]; + distance += diff * diff; + } + else + { + float breaku = ((float)region_lower - 127.0f) / 128.0f; + breakpoint_upper = breaku * (1.1362582192f * breaku * breaku + 0.99800f); + if (breakpoint_upper < paaL[i]) + { + float diff = breakpoint_upper - paaL[i]; + distance += diff * diff; + } + } + } + } + } + positionarray[j] = (short)floorf(segmentsize * distance * shortrate); + } +} + +extern "C" void LBDshortstreamGPUinsidedynamicratev2dtw( + sax_type *saxarray, + short *posbitmap, + float *qtsU, + float *qtsL, + float *gqtsU, + float *gqtsL, + float BSF, + unsigned long datasize, + short *gposbitmap, + int segmentnumber, + float segmentsize, + unsigned long *gpss, + float shortrate, + int chunknumber, + bool *activechunk) +{ + int num_streams = chunknumber; + cudaMemcpy(gqtsU, qtsU, sizeof(float) * (size_t)segmentnumber, cudaMemcpyHostToDevice); + cudaMemcpy(gqtsL, qtsL, sizeof(float) * (size_t)segmentnumber, cudaMemcpyHostToDevice); + cudaStream_t streams[num_streams]; + for (int i = 0; i < num_streams; i++) + { + cudaStreamCreate(&streams[i]); + } + + for (int i = 0; i < num_streams; i++) + { + if (activechunk[i]) + { + // calculate_lbdshortdynamicrate<<<500,800,20,streams[i]>>> (saxarray+i*datasize*segmentnumber/streamnumber,gqtsU, datasize/streamnumber, segmentnumber, gposbitmap+i*datasize/streamnumber,BSF,segmentsize,shortrate); + + calculate_lbdshortdynamicratedtw<<<500, 800, 20, streams[i]>>>( + saxarray + (size_t)i * datasize * (size_t)segmentnumber / (size_t)streamnumber, + gqtsU, + gqtsL, + datasize / (unsigned long)streamnumber, + segmentnumber, + gposbitmap + (size_t)i * datasize / (size_t)streamnumber, + BSF, + segmentsize, + shortrate); + + cudaMemcpyAsync( + posbitmap + (size_t)i * datasize / (size_t)streamnumber, + gposbitmap + (size_t)i * datasize / (size_t)streamnumber, + sizeof(short) * (size_t)(datasize / (unsigned long)streamnumber), + cudaMemcpyDeviceToHost, + streams[i]); + } + } + + for (int i = 0; i < num_streams; i++) + { + if (activechunk[i]) + { + cudaStreamSynchronize(streams[i]); + } + *gpss = *gpss + datasize / (unsigned long)streamnumber; + } + cudaDeviceSynchronize(); +} diff --git a/lib/algos/singlib.hpp b/lib/algos/singlib.hpp index f8b4c98..1d28cfb 100644 --- a/lib/algos/singlib.hpp +++ b/lib/algos/singlib.hpp @@ -18,12 +18,23 @@ singlib_sax_t *initialgsaxarray(singlib_sax_t *gsaxarray, unsigned long datasize singlib_sax_t *initialsaxarray(singlib_sax_t *saxarray, unsigned long datasize); float *initialgposbitmapfloat(float *gposbitmap, unsigned long datasize); float *initialposbitmapfloat(float *posbitmap, unsigned long datasize); +short *initialposbitmapshort(short *posbitmap, unsigned long datasize); +short *initialgposbitmapshort(short *gposbitmap, unsigned long datasize); void gpumemcpy(singlib_sax_t *gsaxarray, const singlib_sax_t *saxarray, unsigned long datasize); void LBDfloatstreamGPU(singlib_sax_t *saxarray, float *posbitmap, float *qts, float *gqts, float BSF, unsigned long datasize, float *gposbitmap, int segmentnumber, float segmentsize); +void LBDshortstreamGPUinsidedynamicratev2dtw(singlib_sax_t *saxarray, short *posbitmap, + float *qtsU, float *qtsL, + float *gqtsU, float *gqtsL, + float BSF, unsigned long datasize, + short *gposbitmap, int segmentnumber, + float segmentsize, unsigned long *gpss, + float shortrate, int chunknumber, + bool *activechunk); + #ifdef __cplusplus } #endif diff --git a/lib/isax/SAX.cpp b/lib/isax/SAX.cpp index 96c40fa..8eb8f91 100644 --- a/lib/isax/SAX.cpp +++ b/lib/isax/SAX.cpp @@ -336,6 +336,163 @@ namespace daisy } #endif +#if DAISY_SIMD_AVAILABLE + float minidist_paa_to_isax_raw_DTW_SING_SIMD(float *paaU, float *paaL, sax_type *sax, + sax_type *sax_cardinalities, + sax_type max_bit_cardinality, + int max_cardinality, + int number_of_segments, + int min_val, + int max_val, + float ratio_sqrt) + { + int region_upper[16], region_lower[16]; + float distancef[16]; + int offset = 0; + + __m256i vectorsignbit = _mm256_set1_epi32(0xffffffff); + __m256i vloweroffset = _mm256_set1_epi32(offset); + __m256i vupperoffset = _mm256_set1_epi32(offset + 1); + + __m128i sax_cardinalitiesv8 = _mm_lddqu_si128((const __m128i *)sax_cardinalities); + __m256i sax_cardinalitiesv16 = _mm256_cvtepu8_epi16(sax_cardinalitiesv8); + __m128i sax_cardinalitiesv16_0 = _mm256_extractf128_si256(sax_cardinalitiesv16, 0); + __m128i sax_cardinalitiesv16_1 = _mm256_extractf128_si256(sax_cardinalitiesv16, 1); + __m256i c_cv_0 = _mm256_cvtepu16_epi32(sax_cardinalitiesv16_0); + __m256i c_cv_1 = _mm256_cvtepu16_epi32(sax_cardinalitiesv16_1); + + __m128i saxv8 = _mm_lddqu_si128((const __m128i *)sax); + __m256i saxv16 = _mm256_cvtepu8_epi16(saxv8); + __m128i saxv16_0 = _mm256_extractf128_si256(saxv16, 0); + __m128i saxv16_1 = _mm256_extractf128_si256(saxv16, 1); + __m256i v_0 = _mm256_cvtepu16_epi32(saxv16_0); + __m256i v_1 = _mm256_cvtepu16_epi32(saxv16_1); + + __m256i c_m = _mm256_set1_epi32(max_bit_cardinality); + __m256i cm_ccv_0 = _mm256_sub_epi32(c_m, c_cv_0); + __m256i cm_ccv_1 = _mm256_sub_epi32(c_m, c_cv_1); + + __m256i region_lowerv_0 = _mm256_srlv_epi32(v_0, cm_ccv_0); + __m256i region_lowerv_1 = _mm256_srlv_epi32(v_1, cm_ccv_1); + region_lowerv_0 = _mm256_sllv_epi32(region_lowerv_0, cm_ccv_0); + region_lowerv_1 = _mm256_sllv_epi32(region_lowerv_1, cm_ccv_1); + + __m256i v1 = _mm256_andnot_si256(_mm256_setzero_si256(), vectorsignbit); + + __m256i region_upperv_0 = _mm256_sllv_epi32(v1, cm_ccv_0); + __m256i region_upperv_1 = _mm256_sllv_epi32(v1, cm_ccv_1); + region_upperv_0 = _mm256_andnot_si256(region_upperv_0, vectorsignbit); + region_upperv_1 = _mm256_andnot_si256(region_upperv_1, vectorsignbit); + + region_upperv_0 = _mm256_or_si256(region_upperv_0, region_lowerv_0); + region_upperv_1 = _mm256_or_si256(region_upperv_1, region_lowerv_1); + + region_lowerv_0 = _mm256_add_epi32(region_lowerv_0, vloweroffset); + region_lowerv_1 = _mm256_add_epi32(region_lowerv_1, vloweroffset); + region_upperv_0 = _mm256_add_epi32(region_upperv_0, vupperoffset); + region_upperv_1 = _mm256_add_epi32(region_upperv_1, vupperoffset); + + _mm256_storeu_si256((__m256i *)®ion_lower[0], region_lowerv_0); + _mm256_storeu_si256((__m256i *)®ion_lower[8], region_lowerv_1); + _mm256_storeu_si256((__m256i *)®ion_upper[0], region_upperv_0); + _mm256_storeu_si256((__m256i *)®ion_upper[8], region_upperv_1); + + __m256i lower_juge_zerov_0 = _mm256_cmpeq_epi32(region_lowerv_0, _mm256_setzero_si256()); + __m256i lower_juge_zerov_1 = _mm256_cmpeq_epi32(region_lowerv_1, _mm256_setzero_si256()); + + __m256i lower_juge_nzerov_0 = _mm256_andnot_si256(lower_juge_zerov_0, vectorsignbit); + __m256i lower_juge_nzerov_1 = _mm256_andnot_si256(lower_juge_zerov_1, vectorsignbit); + + __m256 minvalv = _mm256_set1_ps((float)min_val); + + __m256 lsax_breakpoints_shiftv_0 = _mm256_i32gather_ps(sax_breakpointsnew3, region_lowerv_0, 4); + __m256 lsax_breakpoints_shiftv_1 = _mm256_i32gather_ps(sax_breakpointsnew3, region_lowerv_1, 4); + + __m256 breakpoint_lowerv_0 = (__m256)_mm256_or_si256( + _mm256_and_si256(lower_juge_zerov_0, (__m256i)minvalv), + _mm256_and_si256(lower_juge_nzerov_0, (__m256i)lsax_breakpoints_shiftv_0)); + __m256 breakpoint_lowerv_1 = (__m256)_mm256_or_si256( + _mm256_and_si256(lower_juge_zerov_1, (__m256i)minvalv), + _mm256_and_si256(lower_juge_nzerov_1, (__m256i)lsax_breakpoints_shiftv_1)); + + __m256 usax_breakpoints_shiftv_0 = _mm256_i32gather_ps(sax_breakpointsnew3, region_upperv_0, 4); + __m256 usax_breakpoints_shiftv_1 = _mm256_i32gather_ps(sax_breakpointsnew3, region_upperv_1, 4); + + __m256i upper_juge_maxv_0 = _mm256_cmpeq_epi32(region_upperv_0, _mm256_set1_epi32(max_cardinality - 1)); + __m256i upper_juge_maxv_1 = _mm256_cmpeq_epi32(region_upperv_1, _mm256_set1_epi32(max_cardinality - 1)); + + __m256i upper_juge_nmaxv_0 = _mm256_andnot_si256(upper_juge_maxv_0, vectorsignbit); + __m256i upper_juge_nmaxv_1 = _mm256_andnot_si256(upper_juge_maxv_1, vectorsignbit); + + __m256 breakpoint_upperv_0 = (__m256)_mm256_or_si256( + _mm256_and_si256(upper_juge_maxv_0, (__m256i)_mm256_set1_ps((float)max_val)), + _mm256_and_si256(upper_juge_nmaxv_0, (__m256i)usax_breakpoints_shiftv_0)); + __m256 breakpoint_upperv_1 = (__m256)_mm256_or_si256( + _mm256_and_si256(upper_juge_maxv_1, (__m256i)_mm256_set1_ps((float)max_val)), + _mm256_and_si256(upper_juge_nmaxv_1, (__m256i)usax_breakpoints_shiftv_1)); + + __m256 paaUv_0 = _mm256_loadu_ps(paaU); + __m256 paaUv_1 = _mm256_loadu_ps(&paaU[8]); + __m256 paaLv_0 = _mm256_loadu_ps(paaL); + __m256 paaLv_1 = _mm256_loadu_ps(&paaL[8]); + + __m256 dis_juge_upv_0 = _mm256_cmp_ps(breakpoint_lowerv_0, paaUv_0, _CMP_GT_OS); + __m256 dis_juge_upv_1 = _mm256_cmp_ps(breakpoint_lowerv_1, paaUv_1, _CMP_GT_OS); + + __m256 dis_juge_lov_0 = _mm256_cmp_ps(breakpoint_upperv_0, paaLv_0, _CMP_LT_OS); + __m256 dis_juge_lov_1 = _mm256_cmp_ps(breakpoint_upperv_1, paaLv_1, _CMP_LT_OS); + + __m256 dis_juge_elv_0 = (__m256)_mm256_andnot_si256( + _mm256_or_si256((__m256i)dis_juge_upv_0, (__m256i)dis_juge_lov_0), vectorsignbit); + __m256 dis_juge_elv_1 = (__m256)_mm256_andnot_si256( + _mm256_or_si256((__m256i)dis_juge_upv_1, (__m256i)dis_juge_lov_1), vectorsignbit); + + __m256 dis_lowv_0 = _mm256_sub_ps(breakpoint_lowerv_0, paaUv_0); + __m256 dis_lowv_1 = _mm256_sub_ps(breakpoint_lowerv_1, paaUv_1); + __m256 dis_uppv_0 = _mm256_sub_ps(breakpoint_upperv_0, paaLv_0); + __m256 dis_uppv_1 = _mm256_sub_ps(breakpoint_upperv_1, paaLv_1); + + __m256 distancev_0 = (__m256)_mm256_or_si256( + _mm256_or_si256(_mm256_and_si256((__m256i)dis_juge_upv_0, (__m256i)dis_lowv_0), + _mm256_and_si256((__m256i)dis_juge_lov_0, (__m256i)dis_uppv_0)), + _mm256_and_si256((__m256i)dis_juge_elv_0, (__m256i)_mm256_set1_ps(0.0f))); + __m256 distancev_1 = (__m256)_mm256_or_si256( + _mm256_or_si256(_mm256_and_si256((__m256i)dis_juge_upv_1, (__m256i)dis_lowv_1), + _mm256_and_si256((__m256i)dis_juge_lov_1, (__m256i)dis_uppv_1)), + _mm256_and_si256((__m256i)dis_juge_elv_1, (__m256i)_mm256_set1_ps(0.0f))); + + __m256 distancesum_0 = _mm256_dp_ps(distancev_0, distancev_0, 0xff); + __m256 distancesum_1 = _mm256_dp_ps(distancev_1, distancev_1, 0xff); + __m256 distancevf = _mm256_add_ps(distancesum_0, distancesum_1); + + _mm256_storeu_ps(distancef, distancevf); + + return (distancef[0] + distancef[4]) * ratio_sqrt; + } +#else + float minidist_paa_to_isax_raw_DTW_SING_SIMD(float *paaU, float *paaL, sax_type *sax, + sax_type *sax_cardinalities, + sax_type max_bit_cardinality, + int max_cardinality, + int number_of_segments, + int min_val, + int max_val, + float ratio_sqrt) + { + (void)paaU; + (void)paaL; + (void)sax; + (void)sax_cardinalities; + (void)max_bit_cardinality; + (void)max_cardinality; + (void)number_of_segments; + (void)min_val; + (void)max_val; + (void)ratio_sqrt; + THROW_SIMD_NOT_AVAILABLE("minidist_paa_to_isax_raw_DTW_SING_SIMD"); + } +#endif + float ts_euclidean_distance(ts_type *t, ts_type *s, int size, float bound) { float distance = 0; diff --git a/lib/isax/SAX.hpp b/lib/isax/SAX.hpp index 60b439b..fa4d6b1 100644 --- a/lib/isax/SAX.hpp +++ b/lib/isax/SAX.hpp @@ -46,6 +46,15 @@ namespace daisy int max_val, float ratio_sqrt); + float minidist_paa_to_isax_raw_DTW_SING_SIMD(float *paaU, float *paaL, sax_type *sax, + sax_type *sax_cardinalities, + sax_type max_bit_cardinality, + int max_cardinality, + int number_of_segments, + int min_val, + int max_val, + float ratio_sqrt); + float minidist_paa_to_isax_raw_DTW_SIMD(float *paaU, float *paaL, sax_type *sax, sax_type *sax_cardinalities, sax_type max_bit_cardinality, diff --git a/lib/isax/iSAXIndex.hpp b/lib/isax/iSAXIndex.hpp index 6119383..87615be 100644 --- a/lib/isax/iSAXIndex.hpp +++ b/lib/isax/iSAXIndex.hpp @@ -109,6 +109,7 @@ namespace daisy int max_tmp_partial_buffer_size; float *lbdarray; + short *lbdarrayshort; unsigned long int arrayposition; } isax_node_buffer; diff --git a/setup.py b/setup.py index 602d505..ee7bd93 100644 --- a/setup.py +++ b/setup.py @@ -41,7 +41,8 @@ def check_mpi_available(): result = subprocess.run(["mpicc", "--version"], capture_output=True, timeout=5) if result.returncode == 0: return True - except (FileNotFoundError, subprocess.TimeoutExpired): + except (FileNotFoundError, subprocess.TimeoutExpired, PermissionError): + # Any problem running mpicc is treated as "MPI not available" pass # Try to import mpi4py @@ -79,22 +80,30 @@ def build_extensions(self): shutil.copy(str(src_file), str(dst_file)) # Define the extension module using pybind11 - # Odyssey requires MPI - skip on macOS/Windows, enable on Linux only if MPI is available - # Note: On Linux, the build expects MPI to be available. Install with: - # Linux: sudo apt-get install libopenmpi-dev openmpi-bin (or conda install openmpi) - # macOS: MPI not supported - will compile without Odyssey - # Windows: MPI not supported - will compile without Odyssey - ODYSSEY_ENABLED = (not IS_MACOS and not IS_WINDOWS) and check_mpi_available() + # Odyssey (distributed search with MPI) is enabled ONLY if explicitly requested + # via the ENABLE_MPI environment variable. This avoids probing MPI on systems + # where the user did not ask for it. + ENABLE_MPI = os.environ.get("ENABLE_MPI", "0").lower() in ("1", "true", "yes") + + if ENABLE_MPI: + if IS_MACOS or IS_WINDOWS: + raise RuntimeError( + "[DaiSy] ENABLE_MPI is set but Odyssey/MPI is not supported on this platform." + ) + if not check_mpi_available(): + raise RuntimeError( + "[DaiSy] ENABLE_MPI is set but MPI development libraries are not available.\n" + "[DaiSy] Please install an MPI implementation (e.g., OpenMPI/MPICH) or unset ENABLE_MPI." + ) + ODYSSEY_ENABLED = True + else: + ODYSSEY_ENABLED = False # Inform user about build configuration if ODYSSEY_ENABLED: - print("[DaiSy] Building with Odyssey (distributed search) support - MPI detected") + print("[DaiSy] Building with Odyssey (distributed search) support - MPI enabled (ENABLE_MPI=1)") else: - print("[DaiSy] Building without Odyssey support - MPI not available on this system") - print("[DaiSy] To enable Odyssey, install MPI development libraries:") - print("[DaiSy] - Ubuntu/Debian: sudo apt-get install libopenmpi-dev openmpi-bin") - print("[DaiSy] - macOS: Not supported (clang/ARM64 incompatible with OpenMP)") - print("[DaiSy] - Conda: conda install openmpi") + print("[DaiSy] Building without Odyssey support (ENABLE_MPI is not set or false)") sources = [ "pybinds/setup.cpp",