diff --git a/roboflow/core/project.py b/roboflow/core/project.py index 374c864f..e9da7f6a 100644 --- a/roboflow/core/project.py +++ b/roboflow/core/project.py @@ -812,3 +812,57 @@ def image(self, image_id: str) -> Dict: image_details = data["image"] return image_details + + def create_annotation_job( + self, name: str, batch_id: str, num_images: int, labeler_email: str, reviewer_email: str + ) -> Dict: + """ + Create a new annotation job in the project. + + Args: + name (str): The name of the annotation job + batch_id (str): The ID of the batch that contains the images to annotate + num_images (int): The number of images to include in the job + labeler_email (str): The email of the user who will label the images + reviewer_email (str): The email of the user who will review the annotations + + Returns: + Dict: A dictionary containing the created job details + + Example: + >>> import roboflow + + >>> rf = roboflow.Roboflow(api_key="YOUR_API_KEY") + + >>> project = rf.workspace().project("PROJECT_ID") + + >>> job = project.create_annotation_job( + ... name="Job created by API", + ... batch_id="batch123", + ... num_images=10, + ... labeler_email="user@example.com", + ... reviewer_email="reviewer@example.com" + ... ) + """ + url = f"{API_URL}/{self.__workspace}/{self.__project_name}/jobs?api_key={self.__api_key}" + + payload = { + "name": name, + "batch": batch_id, + "num_images": num_images, + "labelerEmail": labeler_email, + "reviewerEmail": reviewer_email, + } + + response = requests.post(url, headers={"Content-Type": "application/json"}, json=payload) + + if response.status_code != 200: + try: + error_data = response.json() + if "error" in error_data: + raise RuntimeError(error_data["error"]) + raise RuntimeError(response.text) + except ValueError: + raise RuntimeError(f"Failed to create annotation job: {response.text}") + + return response.json() diff --git a/tests/test_project.py b/tests/test_project.py index a7cd55de..afa69437 100644 --- a/tests/test_project.py +++ b/tests/test_project.py @@ -1,5 +1,6 @@ import requests import responses +from responses.matchers import json_params_matcher from roboflow import API_URL from roboflow.adapters.rfapi import AnnotationSaveError, ImageUploadError @@ -144,3 +145,81 @@ def test_image_invalid_json_response(self): self.project.image(image_id) self.assertIn("Expecting value", str(context.exception)) + + def test_create_annotation_job_success(self): + job_name = "Test Job" + batch_id = "test-batch-123" + num_images = 10 + labeler_email = "labeler@example.com" + reviewer_email = "reviewer@example.com" + + expected_response = { + "success": True, + "job": { + "id": "job-123", + "name": job_name, + "batch": batch_id, + "num_images": num_images, + "labeler": labeler_email, + "reviewer": reviewer_email, + "status": "created", + "created": 1616161616, + }, + } + + expected_url = f"{API_URL}/{WORKSPACE_NAME}/{PROJECT_NAME}/jobs?api_key={ROBOFLOW_API_KEY}" + + responses.add( + responses.POST, + expected_url, + json=expected_response, + status=200, + match=[ + json_params_matcher( + { + "name": job_name, + "batch": batch_id, + "num_images": num_images, + "labelerEmail": labeler_email, + "reviewerEmail": reviewer_email, + } + ) + ], + ) + + result = self.project.create_annotation_job( + name=job_name, + batch_id=batch_id, + num_images=num_images, + labeler_email=labeler_email, + reviewer_email=reviewer_email, + ) + + self.assertEqual(result, expected_response) + self.assertTrue(result["success"]) + self.assertEqual(result["job"]["id"], "job-123") + self.assertEqual(result["job"]["name"], job_name) + + def test_create_annotation_job_error(self): + job_name = "Test Job" + batch_id = "invalid-batch" + num_images = 10 + labeler_email = "labeler@example.com" + reviewer_email = "reviewer@example.com" + + error_response = {"error": "Batch not found"} + + expected_url = f"{API_URL}/{WORKSPACE_NAME}/{PROJECT_NAME}/jobs?api_key={ROBOFLOW_API_KEY}" + + responses.add(responses.POST, expected_url, json=error_response, status=404) + + with self.assertRaises(RuntimeError) as context: + self.project.create_annotation_job( + name=job_name, + batch_id=batch_id, + num_images=num_images, + labeler_email=labeler_email, + reviewer_email=reviewer_email, + ) + + self.assertEqual(str(context.exception), "Batch not found")