From 56a4ac3c1593604ba357507dbf5c0a607440e459 Mon Sep 17 00:00:00 2001 From: Lina Date: Wed, 3 Dec 2025 09:21:04 +0100 Subject: [PATCH 1/8] Links in the notification center navigate to a non existing docs --- enums.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/enums.py b/enums.py index 552f9bd..dcc09c8 100644 --- a/enums.py +++ b/enums.py @@ -400,17 +400,6 @@ class Pages(Enum): SETTINGS = "settings" -class DOCS(Enum): - UPLOADING_DATA = "https://docs.kern.ai/refinery/project-creation-and-data-upload" - KNOWLEDGE_BASE = "https://docs.kern.ai/refinery/heuristics#labeling-functions" - WORKFLOW = "https://docs.kern.ai/refinery/manual-labeling#labeling-workflow" - CREATING_PROJECTS = "https://docs.kern.ai/refinery/project-creation-and-data-upload#project-creation-workflow" - WEAK_SUPERVISION = "https://docs.kern.ai/refinery/weak-supervision" - CREATE_EMBEDDINGS = "https://docs.kern.ai/refinery/embedding-integration" - INFORMATION_SOURCES = "https://docs.kern.ai/refinery/heuristics#labeling-functions" - DATA_BROWSER = "https://docs.kern.ai/refinery/data-management" - - class SliceTypes(Enum): STATIC_DEFAULT = "STATIC_DEFAULT" STATIC_OUTLIER = "STATIC_OUTLIER" From 6e8d7f28b5867325e66b0c217bcf18ce1e7f2043 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Wed, 3 Dec 2025 17:41:17 +0100 Subject: [PATCH 2/8] fix: integration progress calculation --- cognition_objects/integration.py | 13 ++++++++++--- integration_objects/manager.py | 16 +++++++++++----- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/cognition_objects/integration.py b/cognition_objects/integration.py index 1df57fb..53771ce 100644 --- a/cognition_objects/integration.py +++ b/cognition_objects/integration.py @@ -183,13 +183,20 @@ def get_all_etl_tasks( def get_integration_progress( integration_id: str, ) -> float: - count_all_records = integration_records_bo.count(integration_id) + integration = get_by_id(integration_id) + count_all_records = 
integration_records_bo.count(integration)
     all_tasks = get_all_etl_tasks(integration_id)
     finished_tasks = [task for task in all_tasks if task.state in FINISHED_STATES]
 
-    if count_all_records == 0:
+    if (
+        count_all_records == 0
+        or integration.state == enums.CognitionMarkdownFileState.FAILED.value
+    ):
         return 0.0
 
-    return round((len(finished_tasks) / count_all_records) * 100.0, 2)
+    integration_progress = round((len(finished_tasks) / count_all_records) * 100.0, 2)
+    if integration.state not in FINISHED_STATES:
+        integration_progress = max(integration_progress - 1, 0)
+    return integration_progress
 
 
 def count_org_integrations(org_id: str) -> Dict[str, int]:
diff --git a/integration_objects/manager.py b/integration_objects/manager.py
index 25c819a..d002ad8 100644
--- a/integration_objects/manager.py
+++ b/integration_objects/manager.py
@@ -14,6 +14,7 @@
     IntegrationPdf,
     IntegrationGithubIssue,
     IntegrationGithubFile,
+    CognitionIntegration,
 )
 
 
@@ -31,12 +32,12 @@ def get(
     return query.order_by(IntegrationModel.created_at.desc()).all()
 
 
-def count(integration_id: str) -> Union[List[object], object]:
-    IntegrationModel = integration_model(integration_id)
+def count(integration: CognitionIntegration) -> int:
+    IntegrationModel = integration_model(integration=integration)
     return (
         session.query(IntegrationModel)
         .filter(
-            IntegrationModel.integration_id == integration_id,
+            IntegrationModel.integration_id == integration.id,
         )
         .count()
     )
@@ -105,8 +106,13 @@ def get_all_by_integration_id(
     )
 
 
-def integration_model(integration_id: str) -> Type:
-    integration = integration_db_bo.get_by_id(integration_id)
+def integration_model(
+    integration_id: Optional[str] = None,
+    integration: Optional[CognitionIntegration] = None,
+) -> Type:
+    if not integration_id and not integration:
+        raise ValueError("Either integration_id or integration must be provided")
+    integration = integration or integration_db_bo.get_by_id(integration_id)
     if integration.type == 
CognitionIntegrationType.SHAREPOINT.value: return IntegrationSharepoint elif integration.type == CognitionIntegrationType.PDF.value: From 1fcdb079ed3ce9b9ed3c867f23115fe69bc7731b Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 4 Dec 2025 10:32:10 +0100 Subject: [PATCH 3/8] fix: llm_config for extraction configs --- etl_utils.py | 90 +--------------------------------------------------- 1 file changed, 1 insertion(+), 89 deletions(-) diff --git a/etl_utils.py b/etl_utils.py index 49affd3..4dda6d1 100644 --- a/etl_utils.py +++ b/etl_utils.py @@ -56,7 +56,7 @@ def get_full_config_and_tokenizer_from_config_id( "minio_path": file_reference.minio_path, "fallback": None, # later filled by config of project }, - **llm_config, + "llm_config": llm_config, }, ] @@ -154,94 +154,6 @@ def get_full_config_and_tokenizer_from_config_id( return full_config, etl_preset_item.etl_config.get("tokenizer") -def get_full_config_for_markdown_file( - file_reference: FileReference, - markdown_dataset: CognitionMarkdownDataset, - markdown_file: CognitionMarkdownFile, - chunk_size: Optional[int] = 1000, -) -> List[Dict[str, Any]]: - extraction_llm_config, transformation_llm_config = __get_llm_config_from_dataset( - markdown_dataset - ) - extractor = markdown_file.meta_data.get("extractor") - if extractor is None: - print( - f"WARNING: {__name__} - no extractor found in markdown_file meta_data for {file_reference.original_file_name}, will infer default" - ) - - full_config = [ - { - "llm_config": extraction_llm_config, - "task_type": enums.CognitionMarkdownFileState.EXTRACTING.value, - "task_config": { - "use_cache": True, - "extractor": extractor, - "minio_path": file_reference.minio_path, - "fallback": None, # later filled by config of project - }, - }, - { - "llm_config": extraction_llm_config, - "task_type": enums.CognitionMarkdownFileState.SPLITTING.value, - "task_config": { - "use_cache": True, - "strategy": enums.ETLSplitStrategy.CHUNK.value, - "chunk_size": chunk_size, - }, - }, - 
{ - "llm_config": transformation_llm_config, - "task_type": enums.CognitionMarkdownFileState.TRANSFORMING.value, - "task_config": { - "use_cache": True, - "transformers": [ - { # NOTE: __call_gpt_with_key only reads user_prompt - "enabled": False, - "name": enums.ETLTransformer.CLEANSE.value, - "system_prompt": None, - "user_prompt": None, - }, - { - "enabled": True, - "name": enums.ETLTransformer.TEXT_TO_TABLE.value, - "system_prompt": None, - "user_prompt": None, - }, - { - "enabled": False, - "name": enums.ETLTransformer.SUMMARIZE.value, - "system_prompt": None, - "user_prompt": None, - }, - ], - }, - }, - { - "task_type": enums.CognitionMarkdownFileState.LOADING.value, - "task_config": { - "markdown_file": { - "enabled": True, - "id": str(markdown_file.id), - }, - }, - }, - ] - return full_config - - -def __get_llm_config_from_dataset( - markdown_dataset: CognitionMarkdownDataset, -) -> Tuple[Dict[str, Any], str]: - extraction_llm_config = markdown_dataset.llm_config.get("extraction", {}) - transformation_llm_config = markdown_dataset.llm_config.get("transformation", {}) - if not extraction_llm_config or not transformation_llm_config: - raise ValueError( - f"Dataset with id {markdown_dataset.id} has incomplete llm_config" - ) - - return extraction_llm_config, transformation_llm_config - - def get_full_config_for_integration( integration: CognitionIntegration, record: IntegrationSharepoint, From 5f8bc622c18c95815ab909b19126ca8185c01f45 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 5 Dec 2025 12:05:11 +0100 Subject: [PATCH 4/8] perf: always perform md chunking regardless of transform config --- etl_utils.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/etl_utils.py b/etl_utils.py index 49affd3..787bab6 100644 --- a/etl_utils.py +++ b/etl_utils.py @@ -60,7 +60,17 @@ def get_full_config_and_tokenizer_from_config_id( }, ] - if transformation_config := etl_preset_item.etl_config.get("transformation"): + 
splitting_config = { + "task_type": enums.CognitionMarkdownFileState.SPLITTING.value, + "task_config": { + "use_cache": True, + "strategy": enums.ETLSplitStrategy.CHUNK.value, + "chunk_size": chunk_size, + }, + } + transformation_config = etl_preset_item.etl_config.get("transformation") + + if transformation_config: transformation_type = transformation_config.get("type", "NO_TRANSFORMATION") if transformation_type != "NO_TRANSFORMATION": transformation_llm_config = { @@ -68,20 +78,11 @@ def get_full_config_and_tokenizer_from_config_id( "llmIdentifier": transformation_config.get("llmIdentifier"), } - splitting_config = { - "llm_config": transformation_llm_config, # splitting strategy "CHUNK" needs llm_config to execute `split_large_sections_via_llm` - "task_type": enums.CognitionMarkdownFileState.SPLITTING.value, - "task_config": { - "use_cache": True, - "strategy": enums.ETLSplitStrategy.CHUNK.value, - "chunk_size": chunk_size, - }, - } + # splitting strategy "CHUNK" needs llm_config to execute `split_large_sections_via_llm` + splitting_config.update({"llm_config": transformation_llm_config}) + full_config.append(splitting_config) if transformation_type == "COMMON_ETL": - # add default splitting for common etl - - full_config.append(splitting_config) transformers = [ { # NOTE: __call_gpt_with_key only reads user_prompt "enabled": True, @@ -97,7 +98,6 @@ def get_full_config_and_tokenizer_from_config_id( }, ] elif transformation_type == "SUMMARIZE": - full_config.append(splitting_config) transformers = [ { "enabled": True, @@ -119,6 +119,7 @@ def get_full_config_and_tokenizer_from_config_id( }, } ) + if for_project: full_config.append( { From 1f318392ce78130e86cc4e4c953238256e02ce4f Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 8 Dec 2025 12:49:17 +0100 Subject: [PATCH 5/8] fix: revert splitting on NO_TRANSFORMATION --- etl_utils.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/etl_utils.py b/etl_utils.py 
index 7a79e81..3770472 100644 --- a/etl_utils.py +++ b/etl_utils.py @@ -60,17 +60,7 @@ def get_full_config_and_tokenizer_from_config_id( }, ] - splitting_config = { - "task_type": enums.CognitionMarkdownFileState.SPLITTING.value, - "task_config": { - "use_cache": True, - "strategy": enums.ETLSplitStrategy.CHUNK.value, - "chunk_size": chunk_size, - }, - } - transformation_config = etl_preset_item.etl_config.get("transformation") - - if transformation_config: + if transformation_config := etl_preset_item.etl_config.get("transformation"): transformation_type = transformation_config.get("type", "NO_TRANSFORMATION") if transformation_type != "NO_TRANSFORMATION": transformation_llm_config = { @@ -79,10 +69,18 @@ def get_full_config_and_tokenizer_from_config_id( } # splitting strategy "CHUNK" needs llm_config to execute `split_large_sections_via_llm` - splitting_config.update({"llm_config": transformation_llm_config}) - full_config.append(splitting_config) + splitting_config = { + "llm_config": transformation_llm_config, + "task_type": enums.CognitionMarkdownFileState.SPLITTING.value, + "task_config": { + "use_cache": True, + "strategy": enums.ETLSplitStrategy.CHUNK.value, + "chunk_size": chunk_size, + }, + } if transformation_type == "COMMON_ETL": + full_config.append(splitting_config) transformers = [ { # NOTE: __call_gpt_with_key only reads user_prompt "enabled": True, @@ -98,6 +96,7 @@ def get_full_config_and_tokenizer_from_config_id( }, ] elif transformation_type == "SUMMARIZE": + full_config.append(splitting_config) transformers = [ { "enabled": True, From 83c94f935037f81376fedac31d2cb8e7fb4d775d Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Tue, 9 Dec 2025 00:36:22 +0100 Subject: [PATCH 6/8] perf: get_or_create etl task --- etl_utils.py | 34 ++++++++++++++++----- global_objects/etl_task.py | 60 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 8 deletions(-) diff --git a/etl_utils.py b/etl_utils.py index 3770472..167400b 100644 --- 
a/etl_utils.py +++ b/etl_utils.py @@ -24,14 +24,20 @@ def get_full_config_and_tokenizer_from_config_id( etl_config_id: Optional[str] = None, # or in file_reference.meta_data content_type: Optional[str] = None, # or in file_reference.content_type chunk_size: Optional[int] = 1000, + # only set for markdown datasets + markdown_file_id: Optional[str] = None, # or in file_reference.meta_data # only set for chat messages - project_id: Optional[str] = None, - conversation_id: Optional[str] = None, + project_id: Optional[str] = None, # or in file_reference.meta_data + conversation_id: Optional[str] = None, # or in file_reference.meta_data ) -> Tuple[Dict[str, Any], str]: + for_dataset = False for_project = False if project_id and conversation_id: # project related load for_project = True + elif markdown_file_id: + # dataset related load + for_dataset = True etl_preset_item = etl_config_presets_db_co.get( etl_config_id or file_reference.meta_data.get("etl_config_id") @@ -139,14 +145,17 @@ def get_full_config_and_tokenizer_from_config_id( }, }, ) - else: + elif for_dataset: full_config.append( { "task_type": enums.CognitionMarkdownFileState.LOADING.value, "task_config": { "markdown_file": { "enabled": True, - "id": file_reference.meta_data["markdown_file_id"], + "id": ( + markdown_file_id + or file_reference.meta_data["markdown_file_id"] + ), } }, }, @@ -297,6 +306,9 @@ def rm_tree(path: Path): rm_tree(etl_cache_dir) +# TODO: delete_etl_tasks for related file_reference_id + + def get_download_key(org_id: str, download_id: str) -> Path: return Path(org_id) / download_id / "download" @@ -402,10 +414,16 @@ def get_transformation_key( return transformation_key -def get_hashed_string(*args, delimiter: str = "_") -> str: - hash_string = delimiter.join(map(str, args)) - hasher = hashlib.new("sha256") - hasher.update(hash_string.encode()) +def get_hashed_string(*args, delimiter: str = "_", from_bytes: bool = False) -> str: + if not from_bytes: + _hash = delimiter.join(map(str, 
args)).encode() + else: + try: + _hash = next(map(bytes, args)) + except StopIteration: + raise ValueError("ERROR: A 'bytes' argument is required to hash") + + hasher = hashlib.sha256(_hash) return hasher.hexdigest() diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index fa87262..03e8bb3 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -1,5 +1,7 @@ from typing import Any, List, Optional, Dict, Union +from sqlalchemy.sql.expression import cast from sqlalchemy.orm.attributes import flag_modified +from sqlalchemy.dialects.postgresql import UUID import datetime import mimetypes @@ -117,6 +119,64 @@ def get_supported_file_extensions() -> Dict[str, List[str]]: return file_extensions +def get_or_create( + org_id: str, + user_id: str, + original_file_name: str, + file_size_bytes: int, + tokenizer: str, + full_config: Dict[str, Any], + file_path: Optional[str] = None, + meta_data: Optional[Dict[str, Any]] = None, + priority: Optional[int] = -1, + id: Optional[str] = None, + with_commit: bool = True, +): + if id: + return get_by_id(id) + + file_reference_id = meta_data.get("file_reference_id") if meta_data else None + integration_id = meta_data.get("integration_id") if meta_data else None + query: EtlTask = session.query(EtlTask).filter( + EtlTask.organization_id == org_id, + EtlTask.original_file_name == original_file_name, + EtlTask.file_size_bytes == file_size_bytes, + ) + + if file_path: + query = query.filter(EtlTask.file_path == file_path) + if file_reference_id: + query = query.filter( + file_reference_id + == cast(EtlTask.meta_data.op("->>")("file_reference_id"), UUID) + ) + if integration_id: + query = query.filter( + integration_id == cast(EtlTask.meta_data.op("->>")("integration_id"), UUID) + ) + + # TODO: enhance + if with_commit is False: + return query.first() + + if etl_task := query.first(): + return etl_task + + return create( + org_id=org_id, + user_id=user_id, + original_file_name=original_file_name, + 
file_size_bytes=file_size_bytes, + tokenizer=tokenizer, + full_config=full_config, + meta_data=meta_data, + priority=priority, + file_path=file_path, + id=id, + with_commit=with_commit, + ) + + def create( org_id: str, user_id: str, From 05b4b1d9b9763003ee1f57f7d1f71201f9c50640 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Tue, 9 Dec 2025 02:21:50 +0100 Subject: [PATCH 7/8] fix: azure di config presets --- etl_utils.py | 7 +++++-- global_objects/etl_task.py | 10 ++++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/etl_utils.py b/etl_utils.py index 167400b..39e2901 100644 --- a/etl_utils.py +++ b/etl_utils.py @@ -11,8 +11,6 @@ FileReference, CognitionIntegration, IntegrationSharepoint, - CognitionMarkdownDataset, - CognitionMarkdownFile, ) ETL_DIR = Path(os.getenv("ETL_DIR", "/app/data/etl")) @@ -52,6 +50,11 @@ def get_full_config_and_tokenizer_from_config_id( "llmIdentifier": llm_indicator_extract, "overwriteVisionPrompt": extraction_config.get("overwriteVisionPrompt"), } + elif extraction_config.get("azureDiApiBase"): + llm_config = { + "azureDiApiBase": extraction_config["azureDiApiBase"], + "azureDiEnvVarId": extraction_config["azureDiEnvVarId"], + } full_config = [ { "task_type": enums.CognitionMarkdownFileState.EXTRACTING.value, diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 03e8bb3..6e8323d 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -124,8 +124,8 @@ def get_or_create( user_id: str, original_file_name: str, file_size_bytes: int, - tokenizer: str, - full_config: Dict[str, Any], + tokenizer: Optional[str] = None, + full_config: Optional[Dict[str, Any]] = None, file_path: Optional[str] = None, meta_data: Optional[Dict[str, Any]] = None, priority: Optional[int] = -1, @@ -137,6 +137,7 @@ def get_or_create( file_reference_id = meta_data.get("file_reference_id") if meta_data else None integration_id = meta_data.get("integration_id") if meta_data else None + markdown_file_id = 
meta_data.get("markdown_file_id") if meta_data else None query: EtlTask = session.query(EtlTask).filter( EtlTask.organization_id == org_id, EtlTask.original_file_name == original_file_name, @@ -150,6 +151,11 @@ def get_or_create( file_reference_id == cast(EtlTask.meta_data.op("->>")("file_reference_id"), UUID) ) + if markdown_file_id: + query = query.filter( + markdown_file_id + == cast(EtlTask.meta_data.op("->>")("markdown_file_id"), UUID) + ) if integration_id: query = query.filter( integration_id == cast(EtlTask.meta_data.op("->>")("integration_id"), UUID) From c907b192bd18679deec9dadfd876d05434ec0118 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Tue, 9 Dec 2025 10:31:22 +0100 Subject: [PATCH 8/8] fix: update supported file extensions --- enums.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enums.py b/enums.py index dcc09c8..7452e66 100644 --- a/enums.py +++ b/enums.py @@ -1080,7 +1080,7 @@ def get_supported_file_extensions(self) -> List[str]: ".webp", ".avif", ] - return ["txt"] + return [".txt"] @staticmethod def from_extension(value: str):