Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions schemaregistry/mock-schemaregistry-client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@

import {
Association,
AssociationCreateOrUpdateRequest,
AssociationInfo,
AssociationResponse,
Client,
Compatibility,
LifecyclePolicy,
minimize,
SchemaInfo,
SchemaMetadata,
Expand Down Expand Up @@ -43,13 +48,21 @@

const noSubject = "";

interface AssociationCacheEntry {
resourceName: string;
resourceNamespace: string;
resourceType: string;
associations: Association[];
}

class MockClient implements Client {
private clientConfig?: ClientConfig;
private infoToSchemaCache: Map<string, MetadataCacheEntry>;
private idToSchemaCache: Map<string, InfoCacheEntry>;
private guidToSchemaCache: Map<string, InfoCacheEntry>;
private schemaToVersionCache: Map<string, VersionCacheEntry>;
private configCache: Map<string, ServerConfig>;
private associationCache: Map<string, AssociationCacheEntry>; // keyed by resourceId

Check warning on line 65 in schemaregistry/mock-schemaregistry-client.ts

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Member 'associationCache' is never reassigned; mark it as `readonly`.

[S2933] Fields that are only assigned in the constructor should be "readonly" See more on https://sonarqube.confluent.io/project/issues?id=confluent-kafka-javascript&pullRequest=454&issues=c854afd2-9e04-4f02-80c9-baba0d8de422&open=c854afd2-9e04-4f02-80c9-baba0d8de422
private counter: Counter;

constructor(config?: ClientConfig) {
Expand All @@ -59,6 +72,7 @@
this.guidToSchemaCache = new Map();
this.schemaToVersionCache = new Map();
this.configCache = new Map();
this.associationCache = new Map();
this.counter = new Counter();
}

Expand Down Expand Up @@ -138,6 +152,17 @@
}

async getBySubjectAndId(subject: string, id: number, format?: string): Promise<SchemaInfo> {
if (subject == null || subject === '') {
// Search for any entry where the id matches
for (const [key, value] of this.idToSchemaCache.entries()) {
const parsedKey = JSON.parse(key);
if (parsedKey.id === id && !value.softDeleted) {
return value.info;
}
}
throw new RestError("Schema not found", 404, 40400);
}
Comment thread
rayokota marked this conversation as resolved.

const cacheKey = stringify({ subject, id });
const cacheEntry = this.idToSchemaCache.get(cacheKey);

Expand Down Expand Up @@ -478,6 +503,132 @@
return config;
}

async getAssociationsByResourceName(

Check failure on line 506 in schemaregistry/mock-schemaregistry-client.ts

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this function to reduce its Cognitive Complexity from 21 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=confluent-kafka-javascript&pullRequest=454&issues=1c945c3d-8210-4a66-a659-d7565c203195&open=1c945c3d-8210-4a66-a659-d7565c203195
resourceName: string,
resourceNamespace: string,
resourceType: string | null,
associationTypes: string[],
lifecycle: LifecyclePolicy | null,
offset: number,
limit: number
): Promise<Association[]> {
const results: Association[] = [];

for (const [_, entry] of this.associationCache.entries()) {
if (entry.resourceName === resourceName && entry.resourceNamespace === resourceNamespace) {
if (resourceType != null && entry.resourceType !== resourceType) {
continue;
}
for (const assoc of entry.associations) {
if (associationTypes.length === 0 || associationTypes.includes(assoc.associationType)) {
if (lifecycle == null || assoc.lifecycle === lifecycle) {
results.push(assoc);
}
}
}
}
}

// Apply pagination
const start = offset;
const end = limit >= 1 ? start + limit : results.length;
return results.slice(start, end);
}

async createAssociation(request: AssociationCreateOrUpdateRequest): Promise<AssociationResponse> {
const resourceId = request.resourceId || v4();
const resourceName = request.resourceName || '';
const resourceNamespace = request.resourceNamespace || '-';
const resourceType = request.resourceType || 'topic';

// Get or create association cache entry
let cacheEntry = this.associationCache.get(resourceId);
if (!cacheEntry) {
cacheEntry = {
resourceName,
resourceNamespace,
resourceType,
associations: []
};
this.associationCache.set(resourceId, cacheEntry);
}

const responseAssociations: AssociationInfo[] = [];

// Process each association in the request
for (const assocInfo of request.associations || []) {
const association: Association = {
subject: assocInfo.subject || '',
resourceName,
resourceNamespace,
resourceId,
resourceType,
associationType: assocInfo.associationType || 'value',
lifecycle: assocInfo.lifecycle,
frozen: assocInfo.frozen
};

// Check if association already exists (same subject and associationType)
const existingIndex = cacheEntry.associations.findIndex(
a => a.subject === association.subject && a.associationType === association.associationType
);

if (existingIndex >= 0) {
// Update existing
cacheEntry.associations[existingIndex] = association;
} else {
// Add new
cacheEntry.associations.push(association);
}

responseAssociations.push({
subject: association.subject,
associationType: association.associationType,
lifecycle: association.lifecycle,
frozen: association.frozen
});
}

return {
resourceName,
resourceNamespace,
resourceId,
resourceType,
associations: responseAssociations
};
}

async deleteAssociations(
resourceId: string,
resourceType: string | null,
associationTypes: string[] | null,
cascadeLifecycle: boolean
): Promise<void> {
const cacheEntry = this.associationCache.get(resourceId);
if (!cacheEntry) {
return;
}

if (resourceType != null && cacheEntry.resourceType !== resourceType) {
return;
}

if (associationTypes == null || associationTypes.length === 0) {
// Delete all associations for this resourceId
this.associationCache.delete(resourceId);
} else {
// Filter out specified association types
cacheEntry.associations = cacheEntry.associations.filter(
a => !associationTypes.includes(a.associationType)
);

// If no associations left, remove the entry
if (cacheEntry.associations.length === 0) {
this.associationCache.delete(resourceId);
}
}
}

clearLatestCaches(): void {
return;
}
Expand Down
183 changes: 183 additions & 0 deletions schemaregistry/schemaregistry-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,74 @@ export interface ServerConfig {
overrideRuleSet?: RuleSet;
}

/**
* LifecyclePolicy represents the lifecycle policy for an association
*/
export enum LifecyclePolicy {
STRONG = 'STRONG',
WEAK = 'WEAK'
}

/**
* Association represents an association between a subject and a resource
*/
export interface Association {
subject: string;
guid?: string;
resourceName: string;
resourceNamespace: string;
resourceId?: string;
resourceType: string;
associationType: string;
lifecycle?: LifecyclePolicy;
frozen?: boolean;
}

/**
* AssociationInfo represents association info returned in a response
*/
export interface AssociationInfo {
subject?: string;
associationType?: string;
lifecycle?: LifecyclePolicy;
frozen?: boolean;
schema?: SchemaInfo;
}

/**
* AssociationCreateOrUpdateInfo represents an association to create or update
*/
export interface AssociationCreateOrUpdateInfo {
subject?: string;
associationType?: string;
lifecycle?: LifecyclePolicy;
frozen?: boolean;
schema?: SchemaInfo;
normalize?: boolean;
}

/**
* AssociationCreateOrUpdateRequest represents a request to create or update associations
*/
export interface AssociationCreateOrUpdateRequest {
resourceName?: string;
resourceNamespace?: string;
resourceId?: string;
resourceType?: string;
associations?: AssociationCreateOrUpdateInfo[];
}

/**
* AssociationResponse represents a response from creating/updating associations
*/
export interface AssociationResponse {
resourceName?: string;
resourceNamespace?: string;
resourceId?: string;
resourceType?: string;
associations?: AssociationInfo[];
}

export interface isCompatibleResponse {
is_compatible: boolean;
}
Expand Down Expand Up @@ -171,6 +239,22 @@ export interface Client {
updateConfig(subject: string, update: ServerConfig): Promise<ServerConfig>;
getDefaultConfig(): Promise<ServerConfig>;
updateDefaultConfig(update: ServerConfig): Promise<ServerConfig>;
getAssociationsByResourceName(
resourceName: string,
resourceNamespace: string,
resourceType: string | null,
associationTypes: string[],
lifecycle: LifecyclePolicy | null,
offset: number,
limit: number
): Promise<Association[]>;
createAssociation(request: AssociationCreateOrUpdateRequest): Promise<AssociationResponse>;
Comment thread
rayokota marked this conversation as resolved.
deleteAssociations(
resourceId: string,
resourceType: string | null,
associationTypes: string[] | null,
cascadeLifecycle: boolean
): Promise<void>;
clearLatestCaches(): void;
clearCaches(): void;
close(): void;
Expand Down Expand Up @@ -755,6 +839,105 @@ export class SchemaRegistryClient implements Client {
return response.data;
}

/**
* Get associations by resource name.
* @param resourceName - The resource name to query.
* @param resourceNamespace - The resource namespace.
* @param resourceType - The resource type (optional).
* @param associationTypes - The association types to filter by.
* @param lifecycle - The lifecycle policy to filter by (optional).
* @param offset - The offset for pagination.
* @param limit - The limit for pagination (-1 for no limit).
*/
async getAssociationsByResourceName(
resourceName: string,
resourceNamespace: string,
resourceType: string | null,
associationTypes: string[],
lifecycle: LifecyclePolicy | null,
offset: number,
limit: number
): Promise<Association[]> {
const encodedNamespace = encodeURIComponent(resourceNamespace);
const encodedName = encodeURIComponent(resourceName);

let path = `/associations/resources/${encodedNamespace}/${encodedName}`;
const queryParams: string[] = [];

if (resourceType != null) {
queryParams.push(`resourceType=${encodeURIComponent(resourceType)}`);
}
for (const associationType of associationTypes) {
queryParams.push(`associationType=${encodeURIComponent(associationType)}`);
}
if (lifecycle != null) {
queryParams.push(`lifecycle=${encodeURIComponent(lifecycle)}`);
}
if (offset > 0) {
queryParams.push(`offset=${offset}`);
}
if (limit >= 1) {
Comment thread
rayokota marked this conversation as resolved.
queryParams.push(`limit=${limit}`);
}

if (queryParams.length > 0) {
path += '?' + queryParams.join('&');
}

const response: AxiosResponse<Association[]> = await this.restService.handleRequest(
path,
'GET'
);
return response.data;
}

/**
* Create an association between a subject and a resource.
* @param request - The association create or update request.
*/
async createAssociation(request: AssociationCreateOrUpdateRequest): Promise<AssociationResponse> {
const response: AxiosResponse<AssociationResponse> = await this.restService.handleRequest(
'/associations',
'POST',
request
);
return response.data;
}

/**
* Delete associations for a resource.
* @param resourceId - The resource identifier.
* @param resourceType - The type of resource (e.g., "topic"). Can be null.
* @param associationTypes - The types of associations to delete (e.g., "key", "value"). Can be null to delete all.
* @param cascadeLifecycle - Whether to cascade the lifecycle policy to dependent schemas.
*/
async deleteAssociations(
resourceId: string,
resourceType: string | null,
associationTypes: string[] | null,
cascadeLifecycle: boolean
): Promise<void> {
const queryParams: string[] = [];

if (resourceType != null) {
queryParams.push(`resourceType=${encodeURIComponent(resourceType)}`);
}
if (associationTypes != null) {
for (const associationType of associationTypes) {
queryParams.push(`associationType=${encodeURIComponent(associationType)}`);
}
}
queryParams.push(`cascadeLifecycle=${cascadeLifecycle}`);

const queryString = queryParams.length > 0 ? '?' + queryParams.join('&') : '';
const endpoint = `/associations/resources/${encodeURIComponent(resourceId)}${queryString}`;

await this.restService.handleRequest(
endpoint,
'DELETE'
);
}

/**
* Clear the latest caches.
*/
Expand Down
Loading