Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 96 additions & 47 deletions src/confluent_kafka/schema_registry/_async/avro.py

Large diffs are not rendered by default.

257 changes: 150 additions & 107 deletions src/confluent_kafka/schema_registry/_async/json_schema.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,15 @@
from threading import Lock
from typing import Dict, List, Literal, Optional, Union

from ..common.schema_registry_client import RegisteredSchema, Schema, ServerConfig
from ..common.schema_registry_client import (
Association,
AssociationCreateOrUpdateRequest,
AssociationInfo,
AssociationResponse,
RegisteredSchema,
Schema,
ServerConfig,
)
from ..error import SchemaRegistryError
from .schema_registry_client import AsyncSchemaRegistryClient

Expand Down Expand Up @@ -142,11 +150,119 @@
self.subject_schemas.clear()


class _AssociationStore(object):

def __init__(self):
self.lock = Lock()
# Key: resource_id -> List[Association]
self.associations_by_resource_id: Dict[str, List[Association]] = defaultdict(list)
# Key: (resource_namespace, resource_name) -> resource_id
self.resource_id_index: Dict[tuple, str] = {}

def create_association(self, request: AssociationCreateOrUpdateRequest) -> AssociationResponse:
with self.lock:
resource_id = request.resource_id
resource_name = request.resource_name
resource_namespace = request.resource_namespace
resource_type = request.resource_type

# Index resource_id by (namespace, name)
if resource_name and resource_namespace and resource_id:
self.resource_id_index[(resource_namespace, resource_name)] = resource_id

created_associations = []
if request.associations and resource_id is not None:
for assoc_info in request.associations:
association = Association(
subject=assoc_info.subject,
guid=None,
resource_name=resource_name,
resource_namespace=resource_namespace,
resource_id=resource_id,
resource_type=resource_type,
association_type=assoc_info.association_type,
frozen=assoc_info.frozen if assoc_info.frozen is not None else False,
)
self.associations_by_resource_id[resource_id].append(association)
created_associations.append(
AssociationInfo(
subject=assoc_info.subject,
association_type=assoc_info.association_type,
lifecycle=assoc_info.lifecycle,
frozen=assoc_info.frozen if assoc_info.frozen is not None else False,
schema=assoc_info.schema,
)
)

return AssociationResponse(
resource_name=resource_name,
resource_namespace=resource_namespace,
resource_id=resource_id,
resource_type=resource_type,
associations=created_associations,
)

def delete_associations(

Check failure on line 205 in src/confluent_kafka/schema_registry/_async/mock_schema_registry_client.py

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this function to reduce its Cognitive Complexity from 17 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=confluent-kafka-python&pullRequest=2194&issues=eefe7ad3-5b7c-4b3d-af7e-cb17084ac43b&open=eefe7ad3-5b7c-4b3d-af7e-cb17084ac43b
self,
resource_id: str,
resource_type: Optional[str] = None,
association_types: Optional[List[str]] = None,
) -> None:
with self.lock:
if resource_id not in self.associations_by_resource_id:
return

if association_types is None and resource_type is None:
# Delete all associations for this resource
del self.associations_by_resource_id[resource_id]
else:
# Filter and keep only non-matching associations
remaining = []
for assoc in self.associations_by_resource_id[resource_id]:
keep = False
if resource_type is not None and assoc.resource_type != resource_type:
keep = True
if association_types is not None and assoc.association_type not in association_types:
keep = True
if keep:
remaining.append(assoc)
self.associations_by_resource_id[resource_id] = remaining

def get_associations_by_resource_name(

Check failure on line 231 in src/confluent_kafka/schema_registry/_async/mock_schema_registry_client.py

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this function to reduce its Cognitive Complexity from 18 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=confluent-kafka-python&pullRequest=2194&issues=a97eb796-5c55-418c-a08c-8a868c9ec39a&open=a97eb796-5c55-418c-a08c-8a868c9ec39a
self,
resource_name: str,
resource_namespace: str,
resource_type: Optional[str] = None,
association_types: Optional[List[str]] = None,
) -> List[Association]:
with self.lock:
result = []
for resource_id, associations in self.associations_by_resource_id.items():
for assoc in associations:
# Check if namespace matches (or is wildcard)
if resource_namespace != "-" and assoc.resource_namespace != resource_namespace:
continue
if assoc.resource_name != resource_name:
continue
if resource_type is not None and assoc.resource_type != resource_type:
continue
if association_types is not None and assoc.association_type not in association_types:
continue
result.append(assoc)
return result

def clear(self):
with self.lock:
self.associations_by_resource_id.clear()
self.resource_id_index.clear()


class AsyncMockSchemaRegistryClient(AsyncSchemaRegistryClient):

def __init__(self, conf: dict):
super().__init__(conf)
self._store = _SchemaStore()
self._association_store = _AssociationStore()

async def register_schema(self, subject_name: str, schema: 'Schema', normalize_schemas: bool = False) -> int:
registered_schema = await self.register_schema_full_response(
Expand Down Expand Up @@ -300,3 +416,46 @@

async def get_config(self, subject_name: Optional[str] = None) -> 'ServerConfig': # noqa F821
return None # type: ignore[return-value]

async def get_associations_by_resource_name(
self,
resource_name: str,
resource_namespace: str,
resource_type: Optional[str] = None,
association_types: Optional[List[str]] = None,
offset: int = 0,
limit: int = -1,
) -> List['Association']:
return self._association_store.get_associations_by_resource_name(
resource_name, resource_namespace, resource_type, association_types
)

async def create_association(self, request: 'AssociationCreateOrUpdateRequest') -> 'AssociationResponse':
"""
Creates an association between a subject and a resource.

Args:
request (AssociationCreateOrUpdateRequest): The association create or update request.

Returns:
AssociationResponse: The response containing the created associations.
"""
return self._association_store.create_association(request)

async def delete_associations(
self,
resource_id: str,
resource_type: Optional[str] = None,
association_types: Optional[List[str]] = None,
cascade_lifecycle: bool = False,
) -> None:
"""
Deletes associations for a resource.

Args:
resource_id (str): The resource identifier.
resource_type (str, optional): The type of resource (e.g., "topic").
association_types (List[str], optional): The types of associations to delete.
cascade_lifecycle (bool): Whether to cascade the lifecycle policy to dependent schemas.
"""
self._association_store.delete_associations(resource_id, resource_type, association_types)
Loading