diff --git a/cognition_objects/integration.py b/cognition_objects/integration.py
index 1df57fb..53771ce 100644
--- a/cognition_objects/integration.py
+++ b/cognition_objects/integration.py
@@ -183,13 +183,21 @@ def get_all_etl_tasks(
 
 def get_integration_progress(
     integration_id: str,
 ) -> float:
-    count_all_records = integration_records_bo.count(integration_id)
+    integration = get_by_id(integration_id)
+    count_all_records = integration_records_bo.count(integration)
     all_tasks = get_all_etl_tasks(integration_id)
     finished_tasks = [task for task in all_tasks if task.state in FINISHED_STATES]
-    if count_all_records == 0:
+    if (
+        count_all_records == 0
+        or integration.state == enums.CognitionMarkdownFileState.FAILED.value
+    ):
         return 0.0
-    return round((len(finished_tasks) / count_all_records) * 100.0, 2)
+    integration_progress = round((len(finished_tasks) / count_all_records) * 100.0, 2)
+    if integration.state not in FINISHED_STATES:
+        # cap just below 100 while unfinished; min() here would zero out progress
+        integration_progress = max(integration_progress - 1, 0)
+    return integration_progress
 
 
 def count_org_integrations(org_id: str) -> Dict[str, int]:
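Reviewer note: the clamping above is easier to sanity-check in isolation. A minimal, self-contained sketch of the intended semantics — `FINISHED_STATES` and the state strings here are stand-ins for the real enums, not the module's actual values:

```python
FINISHED_STATES = {"FINISHED"}  # assumption: mirrors the module-level constant

def progress(finished: int, total: int, state: str) -> float:
    if total == 0 or state == "FAILED":
        return 0.0
    value = round((finished / total) * 100.0, 2)
    if state not in FINISHED_STATES:
        # never report 100% while the integration itself is still running
        value = max(value - 1, 0)
    return value

assert progress(10, 10, "RUNNING") == 99.0   # capped until the state is finished
assert progress(10, 10, "FINISHED") == 100.0
assert progress(3, 0, "RUNNING") == 0.0      # no records -> no progress
```

With the original `min(value - 1, 0)`, the first assertion would yield 0 instead of 99, erasing all partial progress.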
"overwriteVisionPrompt": extraction_config.get("overwriteVisionPrompt"), } + elif extraction_config.get("azureDiApiBase"): + llm_config = { + "azureDiApiBase": extraction_config["azureDiApiBase"], + "azureDiEnvVarId": extraction_config["azureDiEnvVarId"], + } full_config = [ { "task_type": enums.CognitionMarkdownFileState.EXTRACTING.value, @@ -56,7 +65,7 @@ def get_full_config_and_tokenizer_from_config_id( "minio_path": file_reference.minio_path, "fallback": None, # later filled by config of project }, - **llm_config, + "llm_config": llm_config, }, ] @@ -68,8 +77,9 @@ def get_full_config_and_tokenizer_from_config_id( "llmIdentifier": transformation_config.get("llmIdentifier"), } + # splitting strategy "CHUNK" needs llm_config to execute `split_large_sections_via_llm` splitting_config = { - "llm_config": transformation_llm_config, # splitting strategy "CHUNK" needs llm_config to execute `split_large_sections_via_llm` + "llm_config": transformation_llm_config, "task_type": enums.CognitionMarkdownFileState.SPLITTING.value, "task_config": { "use_cache": True, @@ -79,8 +89,6 @@ def get_full_config_and_tokenizer_from_config_id( } if transformation_type == "COMMON_ETL": - # add default splitting for common etl - full_config.append(splitting_config) transformers = [ { # NOTE: __call_gpt_with_key only reads user_prompt @@ -119,6 +127,7 @@ def get_full_config_and_tokenizer_from_config_id( }, } ) + if for_project: full_config.append( { @@ -139,14 +148,17 @@ def get_full_config_and_tokenizer_from_config_id( }, }, ) - else: + elif for_dataset: full_config.append( { "task_type": enums.CognitionMarkdownFileState.LOADING.value, "task_config": { "markdown_file": { "enabled": True, - "id": file_reference.meta_data["markdown_file_id"], + "id": ( + markdown_file_id + or file_reference.meta_data["markdown_file_id"] + ), } }, }, @@ -154,94 +166,6 @@ def get_full_config_and_tokenizer_from_config_id( return full_config, etl_preset_item.etl_config.get("tokenizer") -def get_full_config_for_markdown_file( - file_reference: FileReference, - markdown_dataset: CognitionMarkdownDataset, - markdown_file: CognitionMarkdownFile, - chunk_size: Optional[int] = 1000, -) -> List[Dict[str, Any]]: - extraction_llm_config, transformation_llm_config = __get_llm_config_from_dataset( - markdown_dataset - ) - extractor = markdown_file.meta_data.get("extractor") - if extractor is None: - print( - f"WARNING: {__name__} - no extractor found in markdown_file meta_data for {file_reference.original_file_name}, will infer default" - ) - - full_config = [ - { - "llm_config": extraction_llm_config, - "task_type": enums.CognitionMarkdownFileState.EXTRACTING.value, - "task_config": { - "use_cache": True, - "extractor": extractor, - "minio_path": file_reference.minio_path, - "fallback": None, # later filled by config of project - }, - }, - { - "llm_config": extraction_llm_config, - "task_type": enums.CognitionMarkdownFileState.SPLITTING.value, - "task_config": { - "use_cache": True, - "strategy": enums.ETLSplitStrategy.CHUNK.value, - "chunk_size": chunk_size, - }, - }, - { - "llm_config": transformation_llm_config, - "task_type": enums.CognitionMarkdownFileState.TRANSFORMING.value, - "task_config": { - "use_cache": True, - "transformers": [ - { # NOTE: __call_gpt_with_key only reads user_prompt - "enabled": False, - "name": enums.ETLTransformer.CLEANSE.value, - "system_prompt": None, - "user_prompt": None, - }, - { - "enabled": True, - "name": enums.ETLTransformer.TEXT_TO_TABLE.value, - "system_prompt": None, - "user_prompt": None, - }, - { - 
"enabled": False, - "name": enums.ETLTransformer.SUMMARIZE.value, - "system_prompt": None, - "user_prompt": None, - }, - ], - }, - }, - { - "task_type": enums.CognitionMarkdownFileState.LOADING.value, - "task_config": { - "markdown_file": { - "enabled": True, - "id": str(markdown_file.id), - }, - }, - }, - ] - return full_config - - -def __get_llm_config_from_dataset( - markdown_dataset: CognitionMarkdownDataset, -) -> Tuple[Dict[str, Any], str]: - extraction_llm_config = markdown_dataset.llm_config.get("extraction", {}) - transformation_llm_config = markdown_dataset.llm_config.get("transformation", {}) - if not extraction_llm_config or not transformation_llm_config: - raise ValueError( - f"Dataset with id {markdown_dataset.id} has incomplete llm_config" - ) - - return extraction_llm_config, transformation_llm_config - - def get_full_config_for_integration( integration: CognitionIntegration, record: IntegrationSharepoint, @@ -385,6 +309,9 @@ def rm_tree(path: Path): rm_tree(etl_cache_dir) +# TODO: delete_etl_tasks for related file_reference_id + + def get_download_key(org_id: str, download_id: str) -> Path: return Path(org_id) / download_id / "download" @@ -490,10 +417,16 @@ def get_transformation_key( return transformation_key -def get_hashed_string(*args, delimiter: str = "_") -> str: - hash_string = delimiter.join(map(str, args)) - hasher = hashlib.new("sha256") - hasher.update(hash_string.encode()) +def get_hashed_string(*args, delimiter: str = "_", from_bytes: bool = False) -> str: + if not from_bytes: + _hash = delimiter.join(map(str, args)).encode() + else: + try: + _hash = next(map(bytes, args)) + except StopIteration: + raise ValueError("ERROR: A 'bytes' argument is required to hash") + + hasher = hashlib.sha256(_hash) return hasher.hexdigest() diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index fa87262..6e8323d 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -1,5 +1,7 @@ from typing import Any, List, Optional, Dict, Union +from sqlalchemy.sql.expression import cast from sqlalchemy.orm.attributes import flag_modified +from sqlalchemy.dialects.postgresql import UUID import datetime import mimetypes @@ -117,6 +119,70 @@ def get_supported_file_extensions() -> Dict[str, List[str]]: return file_extensions +def get_or_create( + org_id: str, + user_id: str, + original_file_name: str, + file_size_bytes: int, + tokenizer: Optional[str] = None, + full_config: Optional[Dict[str, Any]] = None, + file_path: Optional[str] = None, + meta_data: Optional[Dict[str, Any]] = None, + priority: Optional[int] = -1, + id: Optional[str] = None, + with_commit: bool = True, +): + if id: + return get_by_id(id) + + file_reference_id = meta_data.get("file_reference_id") if meta_data else None + integration_id = meta_data.get("integration_id") if meta_data else None + markdown_file_id = meta_data.get("markdown_file_id") if meta_data else None + query: EtlTask = session.query(EtlTask).filter( + EtlTask.organization_id == org_id, + EtlTask.original_file_name == original_file_name, + EtlTask.file_size_bytes == file_size_bytes, + ) + + if file_path: + query = query.filter(EtlTask.file_path == file_path) + if file_reference_id: + query = query.filter( + file_reference_id + == cast(EtlTask.meta_data.op("->>")("file_reference_id"), UUID) + ) + if markdown_file_id: + query = query.filter( + markdown_file_id + == cast(EtlTask.meta_data.op("->>")("markdown_file_id"), UUID) + ) + if integration_id: + query = query.filter( + integration_id == 
diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py
index fa87262..6e8323d 100644
--- a/global_objects/etl_task.py
+++ b/global_objects/etl_task.py
@@ -1,5 +1,7 @@
 from typing import Any, List, Optional, Dict, Union
+from sqlalchemy.sql.expression import cast
 from sqlalchemy.orm.attributes import flag_modified
+from sqlalchemy.dialects.postgresql import UUID
 import datetime
 import mimetypes
@@ -117,6 +119,70 @@ def get_supported_file_extensions() -> Dict[str, List[str]]:
     return file_extensions
 
 
+def get_or_create(
+    org_id: str,
+    user_id: str,
+    original_file_name: str,
+    file_size_bytes: int,
+    tokenizer: Optional[str] = None,
+    full_config: Optional[Dict[str, Any]] = None,
+    file_path: Optional[str] = None,
+    meta_data: Optional[Dict[str, Any]] = None,
+    priority: Optional[int] = -1,
+    id: Optional[str] = None,
+    with_commit: bool = True,
+):
+    if id:
+        return get_by_id(id)
+
+    file_reference_id = meta_data.get("file_reference_id") if meta_data else None
+    integration_id = meta_data.get("integration_id") if meta_data else None
+    markdown_file_id = meta_data.get("markdown_file_id") if meta_data else None
+    query = session.query(EtlTask).filter(
+        EtlTask.organization_id == org_id,
+        EtlTask.original_file_name == original_file_name,
+        EtlTask.file_size_bytes == file_size_bytes,
+    )
+
+    if file_path:
+        query = query.filter(EtlTask.file_path == file_path)
+    if file_reference_id:
+        query = query.filter(
+            cast(EtlTask.meta_data.op("->>")("file_reference_id"), UUID)
+            == file_reference_id
+        )
+    if markdown_file_id:
+        query = query.filter(
+            cast(EtlTask.meta_data.op("->>")("markdown_file_id"), UUID)
+            == markdown_file_id
+        )
+    if integration_id:
+        query = query.filter(
+            cast(EtlTask.meta_data.op("->>")("integration_id"), UUID)
+            == integration_id
+        )
+
+    # TODO: enhance
+    if with_commit is False:
+        return query.first()
+
+    if etl_task := query.first():
+        return etl_task
+
+    return create(
+        org_id=org_id,
+        user_id=user_id,
+        original_file_name=original_file_name,
+        file_size_bytes=file_size_bytes,
+        tokenizer=tokenizer,
+        full_config=full_config,
+        meta_data=meta_data,
+        priority=priority,
+        file_path=file_path,
+        id=id,
+        with_commit=with_commit,
+    )
+
+
 def create(
     org_id: str,
     user_id: str,
diff --git a/integration_objects/manager.py b/integration_objects/manager.py
index 25c819a..d002ad8 100644
--- a/integration_objects/manager.py
+++ b/integration_objects/manager.py
@@ -14,6 +14,7 @@
     IntegrationPdf,
     IntegrationGithubIssue,
     IntegrationGithubFile,
+    CognitionIntegration,
 )
 
 
@@ -31,12 +32,12 @@ def get(
     return query.order_by(IntegrationModel.created_at.desc()).all()
 
 
-def count(integration_id: str) -> Union[List[object], object]:
-    IntegrationModel = integration_model(integration_id)
+def count(integration: CognitionIntegration) -> int:
+    IntegrationModel = integration_model(integration=integration)
     return (
         session.query(IntegrationModel)
         .filter(
-            IntegrationModel.integration_id == integration_id,
+            IntegrationModel.integration_id == integration.id,
         )
         .count()
     )
@@ -105,8 +106,13 @@ def get_all_by_integration_id(
     )
 
 
-def integration_model(integration_id: str) -> Type:
-    integration = integration_db_bo.get_by_id(integration_id)
+def integration_model(
+    integration_id: Optional[str] = None,
+    integration: Optional[CognitionIntegration] = None,
+) -> Type:
+    if not integration_id and not integration:
+        raise ValueError("Either integration_id or integration must be provided")
+    integration = integration or integration_db_bo.get_by_id(integration_id)
     if integration.type == CognitionIntegrationType.SHAREPOINT.value:
         return IntegrationSharepoint
     elif integration.type == CognitionIntegrationType.PDF.value:
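With the `integration_model` refactor, call sites that already hold a loaded `CognitionIntegration` can skip the extra lookup by id. A hypothetical call site (the surrounding variables are illustrative):

```python
integration = integration_db_bo.get_by_id(integration_id)

# resolve the concrete record model once from the loaded object ...
IntegrationModel = integration_model(integration=integration)

# ... and reuse the same object for counting instead of re-fetching by id,
# as count() now does internally
n_records = count(integration)

# passing neither integration_id nor integration raises ValueError
```

The same pattern motivates `count(integration)` receiving the full object in `get_integration_progress` above: one `get_by_id` call serves both the state check and the record count.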