Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions roboflow/core/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,3 +812,57 @@ def image(self, image_id: str) -> Dict:
image_details = data["image"]

return image_details

def create_annotation_job(
self, name: str, batch_id: str, num_images: int, labeler_email: str, reviewer_email: str
) -> Dict:
"""
Create a new annotation job in the project.

Args:
name (str): The name of the annotation job
batch_id (str): The ID of the batch that contains the images to annotate
num_images (int): The number of images to include in the job
labeler_email (str): The email of the user who will label the images
reviewer_email (str): The email of the user who will review the annotations

Returns:
Dict: A dictionary containing the created job details

Example:
>>> import roboflow

>>> rf = roboflow.Roboflow(api_key="YOUR_API_KEY")

>>> project = rf.workspace().project("PROJECT_ID")

>>> job = project.create_annotation_job(
... name="Job created by API",
... batch_id="batch123",
... num_images=10,
... labeler_email="user@example.com",
... reviewer_email="reviewer@example.com"
... )
"""
url = f"{API_URL}/{self.__workspace}/{self.__project_name}/jobs?api_key={self.__api_key}"

payload = {
"name": name,
"batch": batch_id,
"num_images": num_images,
"labelerEmail": labeler_email,
"reviewerEmail": reviewer_email,
}

response = requests.post(url, headers={"Content-Type": "application/json"}, json=payload)

if response.status_code != 200:
try:
error_data = response.json()
if "error" in error_data:
raise RuntimeError(error_data["error"])
raise RuntimeError(response.text)
except ValueError:
raise RuntimeError(f"Failed to create annotation job: {response.text}")

return response.json()
79 changes: 79 additions & 0 deletions tests/test_project.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import requests
import responses
from responses.matchers import json_params_matcher

from roboflow import API_URL
from roboflow.adapters.rfapi import AnnotationSaveError, ImageUploadError
Expand Down Expand Up @@ -144,3 +145,81 @@ def test_image_invalid_json_response(self):
self.project.image(image_id)

self.assertIn("Expecting value", str(context.exception))

def test_create_annotation_job_success(self):
job_name = "Test Job"
batch_id = "test-batch-123"
num_images = 10
labeler_email = "labeler@example.com"
reviewer_email = "reviewer@example.com"

expected_response = {
"success": True,
"job": {
"id": "job-123",
"name": job_name,
"batch": batch_id,
"num_images": num_images,
"labeler": labeler_email,
"reviewer": reviewer_email,
"status": "created",
"created": 1616161616,
},
}

expected_url = f"{API_URL}/{WORKSPACE_NAME}/{PROJECT_NAME}/jobs?api_key={ROBOFLOW_API_KEY}"

responses.add(
responses.POST,
expected_url,
json=expected_response,
status=200,
match=[
json_params_matcher(
{
"name": job_name,
"batch": batch_id,
"num_images": num_images,
"labelerEmail": labeler_email,
"reviewerEmail": reviewer_email,
}
)
],
)

result = self.project.create_annotation_job(
name=job_name,
batch_id=batch_id,
num_images=num_images,
labeler_email=labeler_email,
reviewer_email=reviewer_email,
)

self.assertEqual(result, expected_response)
self.assertTrue(result["success"])
self.assertEqual(result["job"]["id"], "job-123")
self.assertEqual(result["job"]["name"], job_name)

def test_create_annotation_job_error(self):
job_name = "Test Job"
batch_id = "invalid-batch"
num_images = 10
labeler_email = "labeler@example.com"
reviewer_email = "reviewer@example.com"

error_response = {"error": "Batch not found"}

expected_url = f"{API_URL}/{WORKSPACE_NAME}/{PROJECT_NAME}/jobs?api_key={ROBOFLOW_API_KEY}"

responses.add(responses.POST, expected_url, json=error_response, status=404)

with self.assertRaises(RuntimeError) as context:
self.project.create_annotation_job(
name=job_name,
batch_id=batch_id,
num_images=num_images,
labeler_email=labeler_email,
reviewer_email=reviewer_email,
)

self.assertEqual(str(context.exception), "Batch not found")