Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions behave_framework/src/minifi_behave/containers/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
#

class Directory:
def __init__(self, path, files: dict[str, str] | None = None, mode="rw"):
def __init__(self, path, files: dict[str, str | bytes] | None = None, mode="rw"):
self.path = path
self.files: dict[str, str] = {}
if files is not None:
self.files = files
self.mode = mode

def add_file(self, file_name: str, content: str):
def add_file(self, file_name: str, content: str | bytes):
self.files[file_name] = content
20 changes: 16 additions & 4 deletions behave_framework/src/minifi_behave/steps/core_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import humanfriendly
from behave import when, step, given

from pathlib import Path
from minifi_behave.containers.http_proxy_container import HttpProxy
from minifi_behave.containers.nifi_container import NifiContainer
from minifi_behave.containers.directory import Directory
Expand Down Expand Up @@ -59,14 +60,15 @@ def create_file_with_size_in_directory(context: MinifiTestContext, directory: st
dirs.append(new_dir)


def __add_directory_with_file_to_container(context: MinifiTestContext, directory: str, file_name: str, content: str, container_name: str):
def __add_directory_with_file_to_container(context: MinifiTestContext, directory: str, file_name: str, content: str | bytes, container_name: str):
dirs = context.get_or_create_minifi_container(container_name).dirs
new_content = content.replace("\\n", "\n")
if isinstance(content, str):
content = content.replace("\\n", "\n")
if directory in dirs:
dirs[directory].files[file_name] = new_content
dirs[directory].files[file_name] = content
return
new_dir = Directory(directory)
new_dir.files[file_name] = new_content
new_dir.files[file_name] = content
dirs.append(new_dir)


Expand All @@ -82,6 +84,16 @@ def create_file_with_content_in_directory(context: MinifiTestContext, directory:
context.execute_steps(f'given a directory at "{directory}" has a file with the content "{content}" in the "{DEFAULT_MINIFI_CONTAINER_NAME}" flow')


@step('a directory at "{directory}" has a file with the content from "{path}"')
@step("a directory at '{directory}' has a file with the content from '{path}'")
def create_file_with_content_in_directory(context: MinifiTestContext, directory: str, path: str):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should keep function names unique:

Suggested change
def create_file_with_content_in_directory(context: MinifiTestContext, directory: str, path: str):
def create_file_with_content_from_path_in_directory(context: MinifiTestContext, directory: str, path: str):

assert context.resource_dir is not None, "Cannot copy file if resource_dir is not set for the context"
content = None
with open(context.resource_dir / path, "rb") as f:
content = f.read()
__add_directory_with_file_to_container(context, directory, str(uuid.uuid4()), content, DEFAULT_MINIFI_CONTAINER_NAME)


@step('a directory at "{directory}" has a file "{file_name}" with the content "{content}"')
def create_file_with_name_and_content_in_directory(context: MinifiTestContext, directory: str, file_name: str, content: str):
__add_directory_with_file_to_container(context, directory, file_name, content, DEFAULT_MINIFI_CONTAINER_NAME)
Expand Down
23 changes: 15 additions & 8 deletions cmake/LlamaCpp.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ set(BUILD_SHARED_LIBS "OFF" CACHE STRING "" FORCE)
set(LLAMA_BUILD_TESTS "OFF" CACHE STRING "" FORCE)
set(LLAMA_BUILD_EXAMPLES "OFF" CACHE STRING "" FORCE)
set(LLAMA_BUILD_SERVER "OFF" CACHE STRING "" FORCE)
set(LLAMA_BUILD_COMMON "ON" CACHE STRING "" FORCE)
set(GGML_OPENMP "OFF" CACHE STRING "" FORCE)
set(GGML_METAL "OFF" CACHE STRING "" FORCE)
set(GGML_BLAS "OFF" CACHE STRING "" FORCE)
Expand All @@ -30,24 +31,30 @@ else()
set(GGML_NATIVE "ON" CACHE STRING "" FORCE)
endif()

set(PATCH_FILE_1 "${CMAKE_SOURCE_DIR}/thirdparty/llamacpp/lu8_macro_fix.patch") # https://github.com/ggml-org/llama.cpp/issues/12740
set(PATCH_FILE_2 "${CMAKE_SOURCE_DIR}/thirdparty/llamacpp/cpp-23-fixes.patch")
set(PATCH_FILE_1 "${CMAKE_SOURCE_DIR}/thirdparty/llamacpp/mtmd-fix.patch")

set(PC ${Bash_EXECUTABLE} -c "set -x &&\
(\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_1}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_1}\\\") &&\
(\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_2}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_2}\\\")")
(\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_1}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_1}\\\")")


FetchContent_Declare(llamacpp
URL https://github.com/ggml-org/llama.cpp/archive/refs/tags/b5958.tar.gz
URL_HASH SHA256=4e8a2abd83092aa446cd13556f6fe8777139da7b191bdaa0e1b79fe9740b36a6
PATCH_COMMAND "${PC}"
SYSTEM
URL https://github.com/ggml-org/llama.cpp/archive/refs/tags/b8944.tar.gz
URL_HASH SHA256=ca231c8aca086f56bad3ed371f6dc5b01e971e812a8ddf67564f087390c0e781
PATCH_COMMAND "${PC}"
SYSTEM
)

FetchContent_MakeAvailable(llamacpp)

if(MSVC AND TARGET llama)
target_compile_options(llama PRIVATE /Zc:__cplusplus)
endif()

set(LLAMACPP_INCLUDE_DIRS
"${llamacpp_SOURCE_DIR}/include"
"${llamacpp_SOURCE_DIR}/ggml/include"
"${llamacpp_SOURCE_DIR}/tools"
"${llamacpp_SOURCE_DIR}/common"
"${llamacpp_SOURCE_DIR}/vendor"
CACHE STRING "" FORCE
)
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ namespace org::apache::nifi::minifi::api::core {
struct EnsureMovedFromDeleter {
void operator()(MinifiFlowFile* ff) {
if (ff) {
throw std::logic_error("Each flowfile should be either transferred or removed");
if (std::uncaught_exceptions()) {
// there is already an exception in progress, do not terminate the process (although there are scenarios we could throw here)
} else {
throw std::logic_error("Each flowfile should be either transferred or removed");
}
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion extensions/llamacpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ add_minifi_library(minifi-llamacpp SHARED ${SOURCES})
target_include_directories(minifi-llamacpp PUBLIC "${CMAKE_SOURCE_DIR}/extensions/llamacpp")
target_include_directories(minifi-llamacpp PUBLIC "${LLAMACPP_INCLUDE_DIRS}")

target_link_libraries(minifi-llamacpp minifi-cpp-extension-lib llama)
target_link_libraries(minifi-llamacpp minifi-cpp-extension-lib llama mtmd llama-common)

register_c_api_extension(minifi-llamacpp "LLAMACPP EXTENSION" LLAMACPP-EXTENSION "Provides llama.cpp support" "extensions/llamacpp/tests")

154 changes: 120 additions & 34 deletions extensions/llamacpp/processors/DefaultLlamaContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
*/

#include "DefaultLlamaContext.h"

#include <range/v3/all.hpp>

#include "minifi-cpp/Exception.h"
#include "fmt/format.h"
#include "mtmd/mtmd-helper.h"

namespace org::apache::nifi::minifi::extensions::llamacpp::processors {

Expand All @@ -36,25 +40,26 @@ std::vector<llama_token> tokenizeInput(const llama_vocab* vocab, const std::stri
return tokenized_input;
}

constexpr size_t DEFAULT_BUFFER_SIZE = 4096;

} // namespace


DefaultLlamaContext::DefaultLlamaContext(const std::filesystem::path& model_path, const LlamaSamplerParams& llama_sampler_params, const LlamaContextParams& llama_ctx_params) {
DefaultLlamaContext::DefaultLlamaContext(const std::filesystem::path& model_path, const std::optional<std::filesystem::path>& multimodal_model_path,
const LlamaSamplerParams& llama_sampler_params, const LlamaContextParams& llama_ctx_params, const std::shared_ptr<core::logging::Logger>& logger) {
llama_model_ = llama_model_load_from_file(model_path.string().c_str(), llama_model_default_params()); // NOLINT(cppcoreguidelines-prefer-member-initializer)
if (!llama_model_) {
throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, fmt::format("Failed to load model from '{}'", model_path.string()));
}

chat_template_ = common_chat_templates_init(llama_model_, "");

llama_context_params ctx_params = llama_context_default_params();
ctx_params.n_ctx = llama_ctx_params.n_ctx;
ctx_params.n_batch = llama_ctx_params.n_batch;
ctx_params.n_ubatch = llama_ctx_params.n_ubatch;
ctx_params.n_seq_max = llama_ctx_params.n_seq_max;
ctx_params.n_threads = llama_ctx_params.n_threads;
ctx_params.n_threads_batch = llama_ctx_params.n_threads_batch;
ctx_params.flash_attn = false;
ctx_params.flash_attn_type = LLAMA_FLASH_ATTN_TYPE_DISABLED;
llama_ctx_ = llama_init_from_model(llama_model_, ctx_params);

auto sparams = llama_sampler_chain_default_params();
Expand All @@ -73,9 +78,27 @@ DefaultLlamaContext::DefaultLlamaContext(const std::filesystem::path& model_path
llama_sampler_chain_add(llama_sampler_, llama_sampler_init_temp(*llama_sampler_params.temperature));
}
llama_sampler_chain_add(llama_sampler_, llama_sampler_init_dist(LLAMA_DEFAULT_SEED));

if (!multimodal_model_path) {
logger->log_info("No multimodal model path provided");
return;
}

mtmd_context_params mparams = mtmd_context_params_default();
mparams.use_gpu = false;
mparams.flash_attn_type = LLAMA_FLASH_ATTN_TYPE_DISABLED;

multimodal_ctx_ = mtmd_init_from_file(multimodal_model_path->string().c_str(), llama_model_, mparams);
if (!multimodal_ctx_) {
throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, fmt::format("Failed to load multimodal model from '{}'", multimodal_model_path->string()));
}

logger->log_info("Successfully loaded multimodal model from '{}'", multimodal_model_path->string());
Comment on lines +82 to +96
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would extract this to a separate function and have something like

if (multimodal_model_path) {
  initializeMultimodalContext();
}

}

DefaultLlamaContext::~DefaultLlamaContext() {
mtmd_free(multimodal_ctx_);
multimodal_ctx_ = nullptr;
llama_sampler_free(llama_sampler_);
llama_sampler_ = nullptr;
llama_free(llama_ctx_);
Expand All @@ -85,47 +108,96 @@ DefaultLlamaContext::~DefaultLlamaContext() {
}

std::optional<std::string> DefaultLlamaContext::applyTemplate(const std::vector<LlamaChatMessage>& messages) {
std::vector<llama_chat_message> llama_messages;
llama_messages.reserve(messages.size());
std::transform(messages.begin(), messages.end(), std::back_inserter(llama_messages),
[](const LlamaChatMessage& msg) { return llama_chat_message{.role = msg.role.c_str(), .content = msg.content.c_str()}; });
std::string text;
text.resize(DEFAULT_BUFFER_SIZE);
const char * chat_template = llama_model_chat_template(llama_model_, nullptr);
int32_t res_size = llama_chat_apply_template(chat_template, llama_messages.data(), llama_messages.size(), true, text.data(), gsl::narrow<int32_t>(text.size()));
if (res_size < 0) {
if (!chat_template_) {
return std::nullopt;
}
if (res_size > gsl::narrow<int32_t>(text.size())) {
text.resize(res_size);
res_size = llama_chat_apply_template(chat_template, llama_messages.data(), llama_messages.size(), true, text.data(), gsl::narrow<int32_t>(text.size()));
if (res_size < 0) {
return std::nullopt;
}
common_chat_templates_inputs inputs;
for (auto& msg : messages) {
common_chat_msg chat_msg;
chat_msg.role = msg.role;
chat_msg.content = msg.content;
inputs.messages.push_back(std::move(chat_msg));
}
text.resize(res_size);
inputs.enable_thinking = false; // TODO(adebreceni): MINIFICPP-2800 common_chat_templates_support_enable_thinking(chat_template_.get());

return text;
return common_chat_templates_apply(chat_template_.get(), inputs).prompt;
}

std::expected<GenerationResult, std::string> DefaultLlamaContext::generate(const std::string& input, std::function<void(std::string_view/*token*/)> token_handler) {
namespace {

struct mtmd_bitmap_deleter {
void operator()(mtmd_bitmap* val) { mtmd_bitmap_free(val); }
};
using unique_bitmap_ptr = std::unique_ptr<mtmd_bitmap, mtmd_bitmap_deleter>;

struct mtmd_input_chunks_deleter {
void operator()(mtmd_input_chunks* val) { mtmd_input_chunks_free(val); }
};
using unique_mtmd_input_chunks_ptr = std::unique_ptr<mtmd_input_chunks, mtmd_input_chunks_deleter>;

} // namespace

std::expected<GenerationResult, std::string> DefaultLlamaContext::generate(const std::string& prompt, const std::vector<std::vector<std::byte>>& files,
std::function<void(std::string_view/*token*/)> token_handler) {
GenerationResult result{};
auto start_time = std::chrono::steady_clock::now();
llama_memory_seq_rm(llama_get_memory(llama_ctx_), 0, -1, -1);
const llama_vocab * vocab = llama_model_get_vocab(llama_model_);
std::vector<llama_token> tokenized_input = tokenizeInput(vocab, input);
result.num_tokens_in = gsl::narrow<uint64_t>(tokenized_input.size());
llama_pos n_past = 0;
std::vector<llama_token> tokenized_input;
llama_batch batch = llama_batch_init(1, 0, 1);
auto batch_deleter = gsl::finally([&] {llama_batch_free(batch);});
batch.n_tokens = 1;
batch.n_seq_id[0] = 1;
batch.seq_id[0][0] = 0;
batch.logits[0] = true;
Comment on lines +149 to +153
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be moved before the while (decode_status == 0) { line as it is only used in the loop. Also it might be better to use a wrapper object for automatic initialization and destruction.

int32_t decode_status = 0;
if (multimodal_ctx_) {
if (files.empty()) {
return std::unexpected{"Multimodal input requires at least one file"};
}
std::vector<unique_bitmap_ptr> bitmaps;
for (auto& file : files) {
unique_bitmap_ptr bitmap{mtmd_helper_bitmap_init_from_buf(multimodal_ctx_, reinterpret_cast<const unsigned char*>(file.data()), file.size())};
if (!bitmap) {
throw Exception(PROCESSOR_EXCEPTION, "Failed to create multimodal bitmap from buffer");
}
bitmaps.push_back(std::move(bitmap));
}
mtmd_input_text inp_txt = {
.text = prompt.c_str(),
.add_special = true,
.parse_special = true,
};
unique_mtmd_input_chunks_ptr chunks{mtmd_input_chunks_init()};
auto bitmap_c_ptrs = bitmaps | ranges::views::transform([] (auto& ptr) {return static_cast<const mtmd_bitmap*>(ptr.get());}) | ranges::to<std::vector>();
auto tokenized = mtmd_tokenize(multimodal_ctx_, chunks.get(), &inp_txt, bitmap_c_ptrs.data(), bitmap_c_ptrs.size());
if (tokenized != 0) {
throw Exception(PROCESSOR_EXCEPTION, fmt::format("Failed to tokenize multimodal prompt, error: {}", tokenized));
}
auto status = mtmd_helper_eval_chunks(multimodal_ctx_, llama_ctx_, chunks.get(), 0, 0, 1, true, &n_past);
if (status != 0) {
throw Exception(PROCESSOR_EXCEPTION, fmt::format("Failed to eval multimodal chunks, error: {}", status));
}
Comment thread
adamdebreceni marked this conversation as resolved.
Comment on lines +156 to +181
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would extract this to a separate function. Additionally why is llama_decode run in case of the of string tokenization, but not in the multimodal use case?

} else {
if (!files.empty()) {
return std::unexpected{"Model is not configured for multimodal input"};
}
try {
tokenized_input = tokenizeInput(vocab, prompt);
} catch (std::exception& e) {
return std::unexpected{fmt::format("Error during tokenization: {}", e.what())};
} catch (...) {
return std::unexpected{"Unknown error during tokenization"};
}
n_past = gsl::narrow<llama_pos>(tokenized_input.size());
decode_status = llama_decode(llama_ctx_, llama_batch_get_one(tokenized_input.data(), n_past));
}
result.num_tokens_in = gsl::narrow<uint64_t>(n_past);

llama_batch batch = llama_batch_get_one(tokenized_input.data(), gsl::narrow<int32_t>(tokenized_input.size()));
llama_token new_token_id = 0;
bool first_token_generated = false;
while (true) {
int32_t res = llama_decode(llama_ctx_, batch);
if (res == 1) {
return std::unexpected{"Could not find a KV slot for the batch (try reducing the size of the batch or increase the context)"};
} else if (res < 0) {
return std::unexpected{"Error occurred while executing llama decode"};
}

while (decode_status == 0) {
new_token_id = llama_sampler_sample(llama_sampler_, llama_ctx_, -1);
if (!first_token_generated) {
result.time_to_first_token = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
Expand All @@ -147,8 +219,22 @@ std::expected<GenerationResult, std::string> DefaultLlamaContext::generate(const
gsl_Assert(len < 128);

std::string_view token_str{buf.data(), gsl::narrow<std::string_view::size_type>(len)};
batch = llama_batch_get_one(&new_token_id, 1);
batch.token[0] = new_token_id;
batch.pos[0] = n_past;
++n_past;
token_handler(token_str);

decode_status = llama_decode(llama_ctx_, batch);
}

if (decode_status == 1) {
return std::unexpected("Could not find a KV slot for the batch (try reducing the size of the batch or increase the context)");
}
if (decode_status == 2) {
return std::unexpected("Llama decode aborted");
}
if (decode_status < 0) {
return std::unexpected("Error occurred while executing llama decode");
}

result.tokens_per_second =
Expand Down
11 changes: 9 additions & 2 deletions extensions/llamacpp/processors/DefaultLlamaContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,32 @@
#include "LlamaContext.h"
#include "llama.h"
#include "LlamaBackendInitializer.h"
#include "chat.h"
#include "mtmd/mtmd.h"
#include "minifi-cpp/core/logging/Logger.h"

namespace org::apache::nifi::minifi::extensions::llamacpp::processors {

class DefaultLlamaContext : public LlamaContext {
public:
DefaultLlamaContext(const std::filesystem::path& model_path, const LlamaSamplerParams& llama_sampler_params, const LlamaContextParams& llama_ctx_params);
DefaultLlamaContext(const std::filesystem::path& model_path, const std::optional<std::filesystem::path>& multimodal_model_path,
const LlamaSamplerParams& llama_sampler_params, const LlamaContextParams& llama_ctx_params, const std::shared_ptr<core::logging::Logger>& logger);
DefaultLlamaContext(const DefaultLlamaContext&) = delete;
DefaultLlamaContext(DefaultLlamaContext&&) = delete;
DefaultLlamaContext& operator=(const DefaultLlamaContext&) = delete;
DefaultLlamaContext& operator=(DefaultLlamaContext&&) = delete;
~DefaultLlamaContext() override;

std::optional<std::string> applyTemplate(const std::vector<LlamaChatMessage>& messages) override;
std::expected<GenerationResult, std::string> generate(const std::string& input, std::function<void(std::string_view/*token*/)> token_handler) override;
std::expected<GenerationResult, std::string> generate(const std::string& prompt, const std::vector<std::vector<std::byte>>& files,
std::function<void(std::string_view/*token*/)> token_handler) override;

private:
const LlamaBackendInitializer& llama_context_initializer_ = LlamaBackendInitializer::get();
llama_model* llama_model_{};
common_chat_templates_ptr chat_template_;
llama_context* llama_ctx_{};
mtmd_context* multimodal_ctx_{};
llama_sampler* llama_sampler_{};
};

Expand Down
3 changes: 2 additions & 1 deletion extensions/llamacpp/processors/LlamaContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ struct GenerationResult {
class LlamaContext {
public:
virtual std::optional<std::string> applyTemplate(const std::vector<LlamaChatMessage>& messages) = 0;
virtual std::expected<GenerationResult, std::string> generate(const std::string& input, std::function<void(std::string_view/*token*/)> token_handler) = 0;
virtual std::expected<GenerationResult, std::string> generate(const std::string& input, const std::vector<std::vector<std::byte>>& files,
std::function<void(std::string_view/*token*/)> token_handler) = 0;
virtual ~LlamaContext() = default;
};

Expand Down
Loading
Loading