diff --git a/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts b/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts index 93786aae7f..a987232f48 100644 --- a/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts +++ b/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts @@ -65,6 +65,11 @@ export class DynamicIntegrationsController { ? SyncDefinitionSchema.parse(rawSyncDef) : undefined; + const rawDeviceSyncDef = body.deviceSyncDefinition; + const validatedDeviceSyncDef = rawDeviceSyncDef + ? SyncDefinitionSchema.parse(rawDeviceSyncDef) + : undefined; + // Upsert the integration const integration = await this.dynamicIntegrationRepo.upsertBySlug({ slug: def.slug, @@ -83,6 +88,11 @@ export class DynamicIntegrationsController { JSON.stringify(validatedSyncDef), ) as Prisma.InputJsonValue) : null, + deviceSyncDefinition: validatedDeviceSyncDef + ? (JSON.parse( + JSON.stringify(validatedDeviceSyncDef), + ) as Prisma.InputJsonValue) + : null, services: (def.services as unknown as Prisma.InputJsonValue) ?? undefined, }); @@ -167,6 +177,12 @@ export class DynamicIntegrationsController { const validatedSyncDefCreate = rawSyncDefCreate ? SyncDefinitionSchema.parse(rawSyncDefCreate) : undefined; + + const rawDeviceSyncDefCreate = body.deviceSyncDefinition; + const validatedDeviceSyncDefCreate = rawDeviceSyncDefCreate + ? SyncDefinitionSchema.parse(rawDeviceSyncDefCreate) + : undefined; + const integration = await this.dynamicIntegrationRepo.create({ slug: def.slug, name: def.name, @@ -184,6 +200,11 @@ export class DynamicIntegrationsController { JSON.stringify(validatedSyncDefCreate), ) as Prisma.InputJsonValue) : undefined, + deviceSyncDefinition: validatedDeviceSyncDefCreate + ? (JSON.parse( + JSON.stringify(validatedDeviceSyncDefCreate), + ) as Prisma.InputJsonValue) + : undefined, }); for (const [index, check] of def.checks.entries()) { diff --git a/apps/api/src/integration-platform/controllers/sync.controller.ts b/apps/api/src/integration-platform/controllers/sync.controller.ts index cd52b77103..13a599d5d5 100644 --- a/apps/api/src/integration-platform/controllers/sync.controller.ts +++ b/apps/api/src/integration-platform/controllers/sync.controller.ts @@ -34,11 +34,13 @@ import { matchesSyncFilterTerms, parseSyncFilterTerms, interpretDeclarativeSync, + interpretDeclarativeDeviceSync, type OAuthConfig, type SyncDefinition, } from '@trycompai/integration-platform'; import { IntegrationSyncLoggerService } from '../services/integration-sync-logger.service'; import { GenericEmployeeSyncService } from '../services/generic-employee-sync.service'; +import { GenericDeviceSyncService } from '../services/generic-device-sync.service'; import { DynamicIntegrationRepository } from '../repositories/dynamic-integration.repository'; import { CheckRunRepository } from '../repositories/check-run.repository'; import { createCheckContext } from '@trycompai/integration-platform'; @@ -105,6 +107,7 @@ export class SyncController { private readonly oauthCredentialsService: OAuthCredentialsService, private readonly syncLoggerService: IntegrationSyncLoggerService, private readonly genericSyncService: GenericEmployeeSyncService, + private readonly genericDeviceSyncService: GenericDeviceSyncService, private readonly dynamicIntegrationRepo: DynamicIntegrationRepository, private readonly checkRunRepo: CheckRunRepository, ) {} @@ -1671,6 +1674,87 @@ export class SyncController { }; } + /** + * Get the current device sync provider for an organization + */ + @Get('device-sync-provider') + @ApiOperation({ summary: 'Get the currently configured device sync provider' }) + @RequirePermission('integration', 'read') + async getDeviceSyncProvider(@OrganizationId() organizationId: string) { + const org = await db.organization.findUnique({ + where: { id: organizationId }, + select: { deviceSyncProvider: true }, + }); + + if (!org) { + throw new HttpException('Organization not found', HttpStatus.NOT_FOUND); + } + + return { provider: org.deviceSyncProvider }; + } + + /** + * Set the device sync provider for an organization + */ + @Post('device-sync-provider') + @ApiOperation({ summary: 'Set the device sync provider' }) + @RequirePermission('integration', 'update') + async setDeviceSyncProvider( + @OrganizationId() organizationId: string, + @Body() body: { provider: string | null }, + ) { + // Only an explicit string (set) or null (clear) are valid. Reject anything + // else — non-string, or blank/whitespace — with a 400 rather than silently + // coercing it to null and clearing the org's configured provider. + const rawProvider = body?.provider; + if ( + rawProvider !== null && + (typeof rawProvider !== 'string' || rawProvider.trim().length === 0) + ) { + throw new HttpException( + 'provider must be a non-empty string or null', + HttpStatus.BAD_REQUEST, + ); + } + const provider = rawProvider === null ? null : rawProvider.trim(); + + if (provider) { + const allManifests = registry.getActiveManifests(); + const validProviders = allManifests + .filter((m) => m.capabilities?.includes('device_sync')) + .map((m) => m.id); + if (!validProviders.includes(provider)) { + throw new HttpException( + `Invalid device sync provider. Must be one of: ${validProviders.join(', ')}`, + HttpStatus.BAD_REQUEST, + ); + } + + const connection = await this.connectionRepository.findBySlugAndOrg( + provider, + organizationId, + ); + + if (!connection || connection.status !== 'active') { + throw new HttpException( + `Provider ${provider} is not connected`, + HttpStatus.BAD_REQUEST, + ); + } + } + + await db.organization.update({ + where: { id: organizationId }, + data: { deviceSyncProvider: provider }, + }); + + this.logger.log( + `Set device sync provider to ${provider || 'none'} for org ${organizationId}`, + ); + + return { success: true, provider }; + } + // ============================================================================ // Dynamic sync endpoints (for dynamic integrations with syncDefinition) // ============================================================================ @@ -1680,12 +1764,16 @@ export class SyncController { * Used by the frontend to render the provider selector dynamically. */ @Get('available-providers') - @ApiOperation({ summary: 'List employee sync providers available to the org' }) + @ApiOperation({ summary: 'List sync providers available to the org' }) @RequirePermission('integration', 'read') - async getAvailableSyncProviders(@OrganizationId() organizationId: string) { + async getAvailableSyncProviders( + @OrganizationId() organizationId: string, + @Query('syncType') syncType?: 'employee' | 'device', + ) { + const capability = syncType === 'device' ? 'device_sync' : 'sync'; const allManifests = registry.getActiveManifests(); const syncProviders = allManifests.filter((m) => - m.capabilities?.includes('sync'), + m.capabilities?.includes(capability), ); const results = await Promise.all( @@ -1955,4 +2043,253 @@ export class SyncController { ); } } + + /** + * Generic device sync endpoint for dynamic integrations. + * Runs the deviceSyncDefinition (DSL/code steps) and processes the resulting devices. + */ + @Post('dynamic/:providerSlug/devices') + @ApiOperation({ summary: 'Sync devices for a dynamic provider' }) + @RequirePermission('integration', 'update') + async syncDynamicProviderDevices( + @OrganizationId() organizationId: string, + @Param('providerSlug') providerSlug: string, + @Query('connectionId') connectionId: string, + ) { + if (!connectionId) { + throw new HttpException( + 'connectionId is required', + HttpStatus.BAD_REQUEST, + ); + } + + this.logger.log( + `[DeviceSync] Starting sync for provider="${providerSlug}" connection="${connectionId}" org="${organizationId}"`, + ); + + // 1. Validate connection + const connection = await this.connectionRepository.findById(connectionId); + if (!connection || connection.organizationId !== organizationId) { + throw new HttpException('Connection not found', HttpStatus.NOT_FOUND); + } + + // Verify the connection actually belongs to the requested provider, so a + // connectionId for one provider can't be driven through another's manifest + // and sync logic. + const expectedProvider = await db.integrationProvider.findUnique({ + where: { slug: providerSlug }, + select: { id: true }, + }); + if (!expectedProvider || connection.providerId !== expectedProvider.id) { + throw new HttpException( + `Connection does not belong to provider "${providerSlug}"`, + HttpStatus.BAD_REQUEST, + ); + } + + // 2. Get manifest from registry — must have 'device_sync' capability + const manifest = getManifest(providerSlug); + if (!manifest) { + throw new HttpException( + `Integration "${providerSlug}" not found`, + HttpStatus.NOT_FOUND, + ); + } + if (!manifest.capabilities?.includes('device_sync')) { + throw new HttpException( + `Integration "${providerSlug}" does not support device sync`, + HttpStatus.BAD_REQUEST, + ); + } + + // 3. Get dynamic integration — must have deviceSyncDefinition + const dynamicIntegration = + await this.dynamicIntegrationRepo.findBySlug(providerSlug); + if (!dynamicIntegration?.deviceSyncDefinition) { + throw new HttpException( + `Integration "${providerSlug}" has no device sync definition`, + HttpStatus.BAD_REQUEST, + ); + } + + // 4. Get & refresh credentials + let credentials = + await this.credentialVaultService.getDecryptedCredentials(connectionId); + if (!credentials) { + throw new HttpException( + 'No valid credentials found. Please reconnect the integration.', + HttpStatus.UNAUTHORIZED, + ); + } + + // Try to refresh OAuth token if applicable + if (manifest.auth.type === 'oauth2' && credentials.refresh_token) { + const oauthConfig = manifest.auth.config; + try { + const oauthCredentials = + await this.oauthCredentialsService.getCredentials( + providerSlug, + organizationId, + ); + if (oauthCredentials) { + const newToken = await this.credentialVaultService.refreshOAuthTokens( + connectionId, + { + tokenUrl: oauthConfig.tokenUrl, + refreshUrl: oauthConfig.refreshUrl, + clientId: oauthCredentials.clientId, + clientSecret: oauthCredentials.clientSecret, + clientAuthMethod: oauthConfig.clientAuthMethod, + }, + ); + if (newToken) { + credentials = + await this.credentialVaultService.getDecryptedCredentials( + connectionId, + ); + } + } + } catch (refreshError) { + this.logger.warn( + `Token refresh failed for ${providerSlug}, trying with existing token: ${refreshError}`, + ); + } + } + + const accessToken = credentials?.access_token; + this.logger.log( + `[DeviceSync] Credentials ready for "${providerSlug}" (auth=${manifest.auth.type}, hasToken=${!!accessToken})`, + ); + + // 5. Create a sync run record + const syncRun = await this.checkRunRepo.create({ + connectionId, + checkId: `device-sync:${providerSlug}`, + checkName: `Device Sync: ${manifest.name}`, + }); + + // 6. Create CheckContext with logging that captures to the run + const { ctx, getResults } = createCheckContext({ + manifest, + accessToken: typeof accessToken === 'string' ? accessToken : undefined, + credentials: (credentials ?? {}) as Record, + variables: ((connection.variables as Record) ?? + {}) as Record, + connectionId, + organizationId, + metadata: (connection.metadata as Record) ?? {}, + logger: { + info: (msg, data) => this.logger.log(msg, data), + warn: (msg, data) => this.logger.warn(msg, data), + error: (msg, data) => this.logger.error(msg, data), + }, + }); + + try { + // 7. Run device sync definition → get validated device list + const syncDefinition = + dynamicIntegration.deviceSyncDefinition as unknown as SyncDefinition; + const syncRunner = interpretDeclarativeDeviceSync({ + definition: syncDefinition, + }); + + const validDevices = await syncRunner.run(ctx); + + this.logger.log( + `[DeviceSync] Device sync definition produced ${validDevices.length} valid devices for "${providerSlug}"`, + ); + + // 8. Process devices via generic service + const result = await this.genericDeviceSyncService.processDevices({ + organizationId, + connectionId, + devices: validDevices, + options: { + providerName: manifest.name, + isDirectorySource: + (syncDefinition as { isDirectorySource?: boolean }) + .isDirectorySource ?? false, + }, + }); + + // 9. Persist execution logs + results to the run record + const executionLogs = getResults().logs.map((log) => ({ + level: log.level, + message: log.message, + ...(log.data ? { data: log.data } : {}), + timestamp: log.timestamp.toISOString(), + })); + + const startTime = syncRun.startedAt?.getTime() || Date.now(); + await this.checkRunRepo.complete(syncRun.id, { + status: result.errors > 0 ? 'failed' : 'success', + durationMs: Date.now() - startTime, + totalChecked: result.totalFound, + passedCount: result.imported + result.updated, + failedCount: result.errors, + logs: + executionLogs.length > 0 + ? (executionLogs as unknown as Prisma.InputJsonValue) + : undefined, + }); + + // Record that a device sync ran so the People → Devices selector can show + // "Last synced" (mirrors the employee sync path). + await this.connectionRepository.update(connectionId, { + lastSyncAt: new Date(), + }); + + this.logger.log( + `[DeviceSync] Sync complete for "${providerSlug}": imported=${result.imported} updated=${result.updated} removed=${result.removed} skipped=${result.skipped} errors=${result.errors}`, + ); + + return { + ...result, + syncRunId: syncRun.id, + }; + } catch (error) { + // Persist error + whatever logs were captured before the failure + const executionLogs = getResults().logs.map((log) => ({ + level: log.level, + message: log.message, + ...(log.data ? { data: log.data } : {}), + timestamp: log.timestamp.toISOString(), + })); + + const errorMessage = + error instanceof Error ? error.message : String(error); + const errorStack = error instanceof Error ? error.stack : undefined; + + const startTime = syncRun.startedAt?.getTime() || Date.now(); + await this.checkRunRepo.complete(syncRun.id, { + status: 'failed', + durationMs: Date.now() - startTime, + totalChecked: 0, + passedCount: 0, + failedCount: 0, + errorMessage, + logs: [ + ...executionLogs, + { + level: 'error', + message: `Device sync execution failed: ${errorMessage}`, + ...(errorStack ? { data: { stack: errorStack } } : {}), + timestamp: new Date().toISOString(), + }, + ] as unknown as Prisma.InputJsonValue, + }); + + this.logger.error( + `[DeviceSync] Sync failed for "${providerSlug}": ${errorMessage}`, + ); + + throw new HttpException( + { + message: `Device sync execution failed: ${errorMessage}`, + syncRunId: syncRun.id, + }, + HttpStatus.INTERNAL_SERVER_ERROR, + ); + } + } } diff --git a/apps/api/src/integration-platform/integration-platform.module.ts b/apps/api/src/integration-platform/integration-platform.module.ts index 36e52e415d..4ed3260faa 100644 --- a/apps/api/src/integration-platform/integration-platform.module.ts +++ b/apps/api/src/integration-platform/integration-platform.module.ts @@ -31,6 +31,7 @@ import { DynamicIntegrationRepository } from './repositories/dynamic-integration import { DynamicCheckRepository } from './repositories/dynamic-check.repository'; import { IntegrationSyncLoggerService } from './services/integration-sync-logger.service'; import { GenericEmployeeSyncService } from './services/generic-employee-sync.service'; +import { GenericDeviceSyncService } from './services/generic-device-sync.service'; @Module({ imports: [AuthModule, forwardRef(() => CloudSecurityModule)], @@ -59,6 +60,7 @@ import { GenericEmployeeSyncService } from './services/generic-employee-sync.ser TaskIntegrationChecksService, IntegrationSyncLoggerService, GenericEmployeeSyncService, + GenericDeviceSyncService, // Repositories ProviderRepository, ConnectionRepository, diff --git a/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts b/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts index 3f28cc5125..e45d2eb175 100644 --- a/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts +++ b/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts @@ -56,6 +56,7 @@ export class DynamicIntegrationRepository { capabilities?: Prisma.InputJsonValue; supportsMultipleConnections?: boolean; syncDefinition?: Prisma.InputJsonValue; + deviceSyncDefinition?: Prisma.InputJsonValue; services?: Prisma.InputJsonValue; }): Promise { return db.dynamicIntegration.create({ @@ -72,6 +73,7 @@ export class DynamicIntegrationRepository { capabilities: data.capabilities ?? ['checks'], supportsMultipleConnections: data.supportsMultipleConnections ?? false, syncDefinition: data.syncDefinition ?? undefined, + deviceSyncDefinition: data.deviceSyncDefinition ?? undefined, services: data.services ?? undefined, }, }); @@ -104,6 +106,7 @@ export class DynamicIntegrationRepository { capabilities?: Prisma.InputJsonValue; supportsMultipleConnections?: boolean; syncDefinition?: Prisma.InputJsonValue | null; + deviceSyncDefinition?: Prisma.InputJsonValue | null; services?: Prisma.InputJsonValue; }): Promise { return db.dynamicIntegration.upsert({ @@ -121,6 +124,7 @@ export class DynamicIntegrationRepository { capabilities: data.capabilities ?? ['checks'], supportsMultipleConnections: data.supportsMultipleConnections ?? false, syncDefinition: data.syncDefinition ?? undefined, + deviceSyncDefinition: data.deviceSyncDefinition ?? undefined, services: data.services ?? undefined, }, update: { @@ -138,6 +142,10 @@ export class DynamicIntegrationRepository { data.syncDefinition === null ? Prisma.DbNull : (data.syncDefinition ?? undefined), + deviceSyncDefinition: + data.deviceSyncDefinition === null + ? Prisma.DbNull + : (data.deviceSyncDefinition ?? undefined), services: data.services ?? undefined, }, }); diff --git a/apps/api/src/integration-platform/services/generic-device-sync.service.spec.ts b/apps/api/src/integration-platform/services/generic-device-sync.service.spec.ts new file mode 100644 index 0000000000..6aff923564 --- /dev/null +++ b/apps/api/src/integration-platform/services/generic-device-sync.service.spec.ts @@ -0,0 +1,469 @@ +import { Prisma } from '@prisma/client'; +import type { SyncDevice } from '@trycompai/integration-platform'; + +const mockMemberFindFirst = jest.fn(); +const mockDeviceFindFirst = jest.fn(); +const mockDeviceCreate = jest.fn(); +const mockDeviceUpdate = jest.fn(); +const mockDeviceDeleteMany = jest.fn(); +const mockDeviceFindMany = jest.fn(); + +jest.mock('@db', () => ({ + db: { + member: { + findFirst: (...args: unknown[]) => mockMemberFindFirst(...args), + }, + device: { + findFirst: (...args: unknown[]) => mockDeviceFindFirst(...args), + create: (...args: unknown[]) => mockDeviceCreate(...args), + update: (...args: unknown[]) => mockDeviceUpdate(...args), + deleteMany: (...args: unknown[]) => mockDeviceDeleteMany(...args), + findMany: (...args: unknown[]) => mockDeviceFindMany(...args), + }, + }, +})); + +import { GenericDeviceSyncService } from './generic-device-sync.service'; + +describe('GenericDeviceSyncService', () => { + let service: GenericDeviceSyncService; + + const ORG_ID = 'org_1'; + const CONN_ID = 'conn_1'; + + const baseDevice = ( + overrides: Partial = {}, + ): SyncDevice => ({ + name: 'Test MacBook', + platform: 'macos', + serialNumber: 'SN-001', + userEmail: 'alice@example.com', + status: 'active', + ...overrides, + }); + + beforeEach(() => { + jest.clearAllMocks(); + service = new GenericDeviceSyncService(); + + // Defaults: member exists, no existing device, Phase 2 returns empty. + mockMemberFindFirst.mockResolvedValue({ + id: 'mem_1', + organizationId: ORG_ID, + }); + mockDeviceFindFirst.mockResolvedValue(null); + mockDeviceCreate.mockResolvedValue({ id: 'dev_1' }); + mockDeviceUpdate.mockResolvedValue({ id: 'dev_1' }); + mockDeviceFindMany.mockResolvedValue([]); + mockDeviceDeleteMany.mockResolvedValue({ count: 0 }); + }); + + // ======================================================================== + // Phase 1 — Import + // ======================================================================== + + describe('Phase 1 — Import', () => { + it('creates a new device when member exists and device is new', async () => { + const result = await service.processDevices({ + organizationId: ORG_ID, + connectionId: CONN_ID, + devices: [baseDevice()], + }); + + expect(mockMemberFindFirst).toHaveBeenCalledWith( + expect.objectContaining({ + where: expect.objectContaining({ + organizationId: ORG_ID, + deactivated: false, + }), + select: { id: true }, + }), + ); + + expect(mockDeviceCreate).toHaveBeenCalledWith( + expect.objectContaining({ + data: expect.objectContaining({ + name: 'Test MacBook', + platform: 'macos', + serialNumber: 'SN-001', + memberId: 'mem_1', + organizationId: ORG_ID, + source: 'integration', + integrationConnectionId: CONN_ID, + }), + }), + ); + + expect(result.imported).toBe(1); + expect(result.totalFound).toBe(1); + expect(result.details).toContainEqual( + expect.objectContaining({ + status: 'imported', + }), + ); + }); + + it('updates an existing integration device matched by serial number', async () => { + mockDeviceFindFirst.mockResolvedValue({ + id: 'dev_existing', + source: 'integration', + serialNumber: 'SN-001', + organizationId: ORG_ID, + }); + + const result = await service.processDevices({ + organizationId: ORG_ID, + connectionId: CONN_ID, + devices: [ + baseDevice({ + osVersion: '15.0', + hardwareModel: 'MacBookPro18,1', + }), + ], + }); + + expect(mockDeviceUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + where: { id: 'dev_existing' }, + data: expect.objectContaining({ + name: 'Test MacBook', + osVersion: '15.0', + hardwareModel: 'MacBookPro18,1', + source: 'integration', + integrationConnectionId: CONN_ID, + }), + }), + ); + + expect(mockDeviceCreate).not.toHaveBeenCalled(); + expect(result.updated).toBe(1); + expect(result.imported).toBe(0); + }); + + it('updates memberId when device ownership changes', async () => { + mockDeviceFindFirst.mockResolvedValue({ + id: 'dev_existing', + source: 'integration', + serialNumber: 'SN-001', + organizationId: ORG_ID, + }); + mockMemberFindFirst.mockResolvedValue({ id: 'mem_new_owner' }); + + const result = await service.processDevices({ + organizationId: ORG_ID, + connectionId: CONN_ID, + devices: [baseDevice()], + }); + + expect(mockDeviceUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + where: { id: 'dev_existing' }, + data: expect.objectContaining({ + memberId: 'mem_new_owner', + }), + }), + ); + expect(result.updated).toBe(1); + }); + + it('refreshes memberId on the P2002 unique-constraint fallback update', async () => { + // existingDevice lookup → null (forces a create), then the create hits a + // unique constraint and the conflicting-row lookup returns the existing device. + mockDeviceFindFirst + .mockResolvedValueOnce(null) + .mockResolvedValueOnce({ + id: 'dev_conflict', + source: 'integration', + serialNumber: 'SN-001', + organizationId: ORG_ID, + }); + mockMemberFindFirst.mockResolvedValue({ id: 'mem_new_owner' }); + mockDeviceCreate.mockRejectedValue( + new Prisma.PrismaClientKnownRequestError('Unique constraint failed', { + code: 'P2002', + clientVersion: 'test', + }), + ); + + const result = await service.processDevices({ + organizationId: ORG_ID, + connectionId: CONN_ID, + devices: [baseDevice()], + }); + + // The fallback update must apply the current owner, not just static fields. + expect(mockDeviceUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + where: { id: 'dev_conflict' }, + data: expect.objectContaining({ memberId: 'mem_new_owner' }), + }), + ); + expect(result.updated).toBe(1); + }); + + it('falls back via externalDeviceId when create hits the externalId unique constraint', async () => { + // externalId-only device (no serial): a concurrent create hits P2002 on + // (integrationConnectionId, externalDeviceId); the fallback must re-find by + // that key (not by an undefined serialNumber) and update. + mockDeviceFindFirst + .mockResolvedValueOnce(null) // Phase 1 externalId lookup → none yet + .mockResolvedValueOnce({ id: 'dev_ext', source: 'integration' }); // fallback by externalId + mockMemberFindFirst.mockResolvedValue({ id: 'mem_1' }); + mockDeviceCreate.mockRejectedValue( + new Prisma.PrismaClientKnownRequestError('Unique constraint failed', { + code: 'P2002', + clientVersion: 'test', + }), + ); + + const result = await service.processDevices({ + organizationId: ORG_ID, + connectionId: CONN_ID, + devices: [baseDevice({ serialNumber: undefined, externalId: 'ext-123' })], + }); + + expect(mockDeviceUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + where: { id: 'dev_ext' }, + data: expect.objectContaining({ + memberId: 'mem_1', + externalDeviceId: 'ext-123', + }), + }), + ); + expect(result.updated).toBe(1); + expect(result.errors).toBe(0); + }); + + it('backfills serialNumber when updating a device matched by externalId', async () => { + // No serial match, but an externalId match exists; the update should + // backfill the now-reported serial so the row becomes serial-linkable. + mockDeviceFindFirst + .mockResolvedValueOnce(null) // serial lookup → no match + .mockResolvedValueOnce({ id: 'dev_ext', source: 'integration' }); // externalId match + + const result = await service.processDevices({ + organizationId: ORG_ID, + connectionId: CONN_ID, + devices: [baseDevice({ serialNumber: 'SN-NEW', externalId: 'ext-1' })], + }); + + expect(mockDeviceUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + where: { id: 'dev_ext' }, + data: expect.objectContaining({ + serialNumber: 'SN-NEW', + externalDeviceId: 'ext-1', + }), + }), + ); + expect(result.updated).toBe(1); + }); + + it('skips a device whose serial is already managed by the agent (no hijack)', async () => { + mockDeviceFindFirst.mockResolvedValue({ + id: 'dev_agent', + source: 'agent', + serialNumber: 'SN-001', + organizationId: ORG_ID, + }); + + const result = await service.processDevices({ + organizationId: ORG_ID, + connectionId: CONN_ID, + devices: [baseDevice()], + }); + + expect(mockDeviceUpdate).not.toHaveBeenCalled(); + expect(mockDeviceCreate).not.toHaveBeenCalled(); + expect(result.skipped).toBe(1); + expect(result.details).toContainEqual( + expect.objectContaining({ + status: 'skipped', + reason: expect.stringContaining('agent'), + }), + ); + }); + + it('skips a device that has neither serialNumber nor externalId', async () => { + const result = await service.processDevices({ + organizationId: ORG_ID, + connectionId: CONN_ID, + devices: [ + baseDevice({ serialNumber: undefined, externalId: undefined }), + ], + }); + + expect(mockMemberFindFirst).not.toHaveBeenCalled(); + expect(mockDeviceCreate).not.toHaveBeenCalled(); + expect(result.skipped).toBe(1); + expect(result.details).toContainEqual( + expect.objectContaining({ + status: 'skipped', + reason: expect.stringContaining('identifier'), + }), + ); + }); + + it('skips devices when no matching member exists', async () => { + mockMemberFindFirst.mockResolvedValue(null); + + const result = await service.processDevices({ + organizationId: ORG_ID, + connectionId: CONN_ID, + devices: [baseDevice({ userEmail: 'unknown@example.com' })], + }); + + expect(mockDeviceCreate).not.toHaveBeenCalled(); + expect(mockDeviceUpdate).not.toHaveBeenCalled(); + expect(result.skipped).toBe(1); + expect(result.details).toContainEqual( + expect.objectContaining({ + status: 'skipped', + reason: expect.stringContaining('member'), + }), + ); + }); + + it('only processes devices with status active', async () => { + const result = await service.processDevices({ + organizationId: ORG_ID, + connectionId: CONN_ID, + devices: [ + baseDevice({ status: 'active', serialNumber: 'SN-ACTIVE' }), + baseDevice({ status: 'inactive', serialNumber: 'SN-INACTIVE' }), + ], + }); + + // Only the active device should trigger a member lookup + create + expect(mockMemberFindFirst).toHaveBeenCalledTimes(1); + expect(result.imported).toBe(1); + expect(result.totalFound).toBe(2); + }); + }); + + // ======================================================================== + // Phase 2 — Remove disappeared (only when isDirectorySource = true) + // ======================================================================== + + describe('Phase 2 — Remove disappeared', () => { + it('is skipped entirely when isDirectorySource is not set (default)', async () => { + mockDeviceFindMany.mockResolvedValue([ + { + id: 'dev_old', + serialNumber: 'SN-OLD', + externalDeviceId: null, + integrationConnectionId: CONN_ID, + }, + ]); + + const result = await service.processDevices({ + organizationId: ORG_ID, + connectionId: CONN_ID, + devices: [baseDevice({ serialNumber: 'SN-001' })], + }); + + // No pruning by default — a non-authoritative provider must never delete. + expect(mockDeviceFindMany).not.toHaveBeenCalled(); + expect(mockDeviceDeleteMany).not.toHaveBeenCalled(); + expect(result.removed).toBe(0); + }); + + it('deletes stale devices when isDirectorySource = true', async () => { + mockDeviceFindMany.mockResolvedValue([ + { + id: 'dev_old', + serialNumber: 'SN-OLD', + externalDeviceId: null, + integrationConnectionId: CONN_ID, + }, + { + id: 'dev_current', + serialNumber: 'SN-001', + externalDeviceId: null, + integrationConnectionId: CONN_ID, + }, + ]); + + const result = await service.processDevices({ + organizationId: ORG_ID, + connectionId: CONN_ID, + devices: [baseDevice({ serialNumber: 'SN-001' })], + options: { isDirectorySource: true }, + }); + + expect(mockDeviceDeleteMany).toHaveBeenCalledWith({ + where: { id: { in: ['dev_old'] } }, + }); + expect(result.removed).toBe(1); + }); + + it('should NOT delete existing devices when all sync devices were skipped (member not found)', async () => { + mockMemberFindFirst.mockResolvedValue(null); + mockDeviceFindMany.mockResolvedValue([ + { + id: 'dev_existing', + serialNumber: 'SN-001', + externalDeviceId: null, + integrationConnectionId: CONN_ID, + }, + ]); + + const result = await service.processDevices({ + organizationId: ORG_ID, + connectionId: CONN_ID, + devices: [baseDevice({ serialNumber: 'SN-001' })], + options: { isDirectorySource: true }, + }); + + // Device was skipped because member doesn't exist, but its identifier + // is still tracked so Phase 2 won't remove it from the DB. + expect(result.skipped).toBe(1); + expect(result.removed).toBe(0); + expect(mockDeviceDeleteMany).not.toHaveBeenCalled(); + }); + + it('should skip Phase 2 when sync payload contains only inactive devices', async () => { + mockDeviceFindMany.mockResolvedValue([ + { + id: 'dev_existing', + serialNumber: 'EXISTING', + externalDeviceId: null, + integrationConnectionId: CONN_ID, + }, + ]); + + const result = await service.processDevices({ + organizationId: ORG_ID, + connectionId: CONN_ID, + devices: [baseDevice({ status: 'inactive', serialNumber: 'SN-INACTIVE' })], + options: { isDirectorySource: true }, + }); + + // No active devices means no identifiers tracked → Phase 2 guard skips removal + expect(result.removed).toBe(0); + expect(mockDeviceDeleteMany).not.toHaveBeenCalled(); + }); + + it('does NOT delete devices that are still in the sync result', async () => { + mockDeviceFindMany.mockResolvedValue([ + { + id: 'dev_current', + serialNumber: 'SN-001', + externalDeviceId: null, + integrationConnectionId: CONN_ID, + }, + ]); + + const result = await service.processDevices({ + organizationId: ORG_ID, + connectionId: CONN_ID, + devices: [baseDevice({ serialNumber: 'SN-001' })], + options: { isDirectorySource: true }, + }); + + expect(mockDeviceDeleteMany).not.toHaveBeenCalled(); + expect(result.removed).toBe(0); + }); + }); +}); diff --git a/apps/api/src/integration-platform/services/generic-device-sync.service.ts b/apps/api/src/integration-platform/services/generic-device-sync.service.ts new file mode 100644 index 0000000000..370fc4f71e --- /dev/null +++ b/apps/api/src/integration-platform/services/generic-device-sync.service.ts @@ -0,0 +1,359 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Prisma } from '@prisma/client'; +import { db } from '@db'; +import type { SyncDevice } from '@trycompai/integration-platform'; + +// ============================================================================ +// Types +// ============================================================================ + +export interface DeviceSyncResultDetail { + identifier: string; + status: 'imported' | 'updated' | 'skipped' | 'removed' | 'error'; + reason?: string; +} + +export interface DeviceSyncResult { + success: boolean; + totalFound: number; + imported: number; + updated: number; + skipped: number; + removed: number; + errors: number; + details: DeviceSyncResultDetail[]; +} + +interface SyncedIdentifier { + serialNumber?: string; + externalId?: string; +} + +// ============================================================================ +// Service +// ============================================================================ + +/** + * Generic device sync service that handles platform-generic operations: + * - Creating or updating Device records from a standardized device list + * - Removing devices no longer present in the provider + * + * Mirrors GenericEmployeeSyncService but for the device import flow. + */ +@Injectable() +export class GenericDeviceSyncService { + private readonly logger = new Logger(GenericDeviceSyncService.name); + + async processDevices({ + organizationId, + connectionId, + devices, + options = {}, + }: { + organizationId: string; + connectionId: string; + devices: SyncDevice[]; + options?: { providerName?: string; isDirectorySource?: boolean }; + }): Promise { + const providerName = options.providerName ?? 'provider'; + const isDirectorySource = options.isDirectorySource ?? false; + + const result: DeviceSyncResult = { + success: true, + totalFound: devices.length, + imported: 0, + updated: 0, + skipped: 0, + removed: 0, + errors: 0, + details: [], + }; + + this.logger.log( + `[DeviceSync] Processing ${devices.length} devices for org="${organizationId}" provider="${providerName}"`, + ); + + const activeDevices = devices.filter((d) => d.status === 'active'); + const syncedIdentifiers: SyncedIdentifier[] = []; + + // ================================================================== + // Phase 1: Import active devices + // ================================================================== + for (const device of activeDevices) { + const identifier = + device.serialNumber ?? device.externalId ?? device.name; + + // A device with neither serialNumber nor externalId can never be matched + // on a later sync, so importing it would create an untrackable orphan. + if (!device.serialNumber && !device.externalId) { + result.skipped++; + result.details.push({ + identifier, + status: 'skipped', + reason: 'No stable identifier (serialNumber or externalId required)', + }); + continue; + } + + try { + const normalizedEmail = device.userEmail.toLowerCase(); + + // Track ALL sync identifiers for Phase 2 (even if member doesn't exist yet) + syncedIdentifiers.push({ + serialNumber: device.serialNumber, + externalId: device.externalId, + }); + + // Find member by email in this org + const member = await db.member.findFirst({ + where: { + organizationId, + deactivated: false, + user: { email: normalizedEmail }, + }, + select: { id: true }, + }); + + if (!member) { + result.skipped++; + result.details.push({ + identifier, + status: 'skipped', + reason: `No matching member for email ${normalizedEmail}`, + }); + continue; + } + + // Find existing device — serialNumber match takes priority + let existingDevice: { id: string; source: string } | null = null; + if (device.serialNumber) { + existingDevice = await db.device.findFirst({ + where: { + serialNumber: device.serialNumber, + organizationId, + }, + select: { id: true, source: true }, + }); + } + + // Never overwrite a device owned by the agent or Fleet that happens to + // share a hardware serial — doing so would hijack (and later expose to + // deletion) a device managed by a richer source. Leave it untouched. + if (existingDevice && existingDevice.source !== 'integration') { + result.skipped++; + result.details.push({ + identifier, + status: 'skipped', + reason: `Serial already managed by "${existingDevice.source}" — left untouched`, + }); + continue; + } + + if (!existingDevice && device.externalId) { + existingDevice = await db.device.findFirst({ + where: { + externalDeviceId: device.externalId, + integrationConnectionId: connectionId, + source: 'integration', + }, + select: { id: true, source: true }, + }); + } + + const updateData = { + name: device.name, + hostname: device.hostname ?? device.name, + platform: device.platform, + osVersion: device.osVersion ?? 'Unknown', + hardwareModel: device.hardwareModel, + lastCheckIn: new Date(), + source: 'integration' as const, + integrationConnectionId: connectionId, + externalDeviceId: device.externalId, + // Backfill the serial on updates too, so an externalId-matched row + // becomes serial-linkable once the provider reports one. When the + // device has no serial, Prisma omits `undefined` and leaves it as-is. + serialNumber: device.serialNumber, + }; + + if (existingDevice) { + await db.device.update({ + where: { id: existingDevice.id }, + data: { ...updateData, memberId: member.id }, + }); + result.updated++; + result.details.push({ identifier, status: 'updated' }); + } else { + try { + await db.device.create({ + data: { + ...updateData, + memberId: member.id, + organizationId, + }, + }); + result.imported++; + result.details.push({ identifier, status: 'imported' }); + } catch (createError) { + if ( + createError instanceof Prisma.PrismaClientKnownRequestError && + createError.code === 'P2002' + ) { + this.logger.warn( + `[DeviceSync] Unique constraint hit for ${identifier} — falling back to update`, + ); + // The conflict can be on (serialNumber, organizationId) OR on + // (integrationConnectionId, externalDeviceId). Re-find by whichever + // identifier is present — never query with an undefined value + // (that would drop the filter and match an arbitrary row). + let conflicting: { id: string; source: string } | null = null; + if (device.serialNumber) { + conflicting = await db.device.findFirst({ + where: { + serialNumber: device.serialNumber, + organizationId, + }, + select: { id: true, source: true }, + }); + } + if (!conflicting && device.externalId) { + conflicting = await db.device.findFirst({ + where: { + externalDeviceId: device.externalId, + integrationConnectionId: connectionId, + }, + select: { id: true, source: true }, + }); + } + + if (conflicting && conflicting.source !== 'integration') { + result.skipped++; + result.details.push({ + identifier, + status: 'skipped', + reason: `Serial already managed by "${conflicting.source}" — left untouched`, + }); + } else if (conflicting) { + await db.device.update({ + where: { id: conflicting.id }, + data: { ...updateData, memberId: member.id }, + }); + result.updated++; + result.details.push({ identifier, status: 'updated' }); + } else { + result.errors++; + result.details.push({ + identifier, + status: 'error', + reason: 'Unique conflict but conflicting device not found', + }); + } + } else { + throw createError; + } + } + } + } catch (error) { + this.logger.error( + `Error processing device ${identifier}: ${error}`, + ); + result.errors++; + result.details.push({ + identifier, + status: 'error', + reason: error instanceof Error ? error.message : 'Unknown error', + }); + } + } + + this.logger.log( + `[DeviceSync] Phase 1 complete: imported=${result.imported} updated=${result.updated} skipped=${result.skipped} errors=${result.errors}`, + ); + + // ================================================================== + // Phase 2: Remove disappeared devices + // ================================================================== + + // Phase 2 removal only runs when the provider is the authoritative device + // source (isDirectorySource). Like the employee sync, a non-authoritative + // or partial provider response must NEVER delete devices it simply didn't + // return this run. + // WARNING: the removal below is a HARD delete that cascades to the devices' + // Finding rows. It must be converted to a recoverable soft-removal before + // any provider sets isDirectorySource=true. + if (!isDirectorySource) { + this.logger.log( + `[DeviceSync] Phase 2 skipped for "${providerName}": isDirectorySource=false. Devices absent from the sync payload were left alone.`, + ); + } else if (syncedIdentifiers.length === 0) { + this.logger.log( + '[DeviceSync] No devices successfully processed — skipping Phase 2 removal', + ); + } else { + const existingIntegrationDevices = await db.device.findMany({ + where: { + organizationId, + integrationConnectionId: connectionId, + source: 'integration', + }, + select: { + id: true, + serialNumber: true, + externalDeviceId: true, + }, + }); + + const syncedSerials = new Set( + syncedIdentifiers + .map((s) => s.serialNumber) + .filter((v): v is string => Boolean(v)), + ); + const syncedExternalIds = new Set( + syncedIdentifiers + .map((s) => s.externalId) + .filter((v): v is string => Boolean(v)), + ); + + const toRemove = existingIntegrationDevices.filter((d) => { + const matchedBySerial = + d.serialNumber && syncedSerials.has(d.serialNumber); + const matchedByExternal = + d.externalDeviceId && syncedExternalIds.has(d.externalDeviceId); + return !matchedBySerial && !matchedByExternal; + }); + + if (toRemove.length > 0) { + const idsToDelete = toRemove.map((d) => d.id); + + try { + await db.device.deleteMany({ + where: { id: { in: idsToDelete } }, + }); + result.removed = toRemove.length; + + for (const device of toRemove) { + result.details.push({ + identifier: + device.serialNumber ?? device.externalDeviceId ?? device.id, + status: 'removed', + reason: `Device no longer reported by ${providerName}`, + }); + } + } catch (deleteError) { + this.logger.error( + `[DeviceSync] Failed to delete ${idsToDelete.length} devices: ${deleteError}`, + ); + result.errors += idsToDelete.length; + } + } + } + + result.success = result.errors === 0; + + this.logger.log( + `[DeviceSync] Sync complete for ${providerName}: ${result.imported} imported, ${result.updated} updated, ${result.removed} removed, ${result.skipped} skipped, ${result.errors} errors`, + ); + + return result; + } +} diff --git a/apps/api/src/trigger/integration-platform/run-device-sync.spec.ts b/apps/api/src/trigger/integration-platform/run-device-sync.spec.ts new file mode 100644 index 0000000000..47c8be0e45 --- /dev/null +++ b/apps/api/src/trigger/integration-platform/run-device-sync.spec.ts @@ -0,0 +1,21 @@ +// Mock the Trigger.dev SDK at the module boundary so importing the task does +// not require a trigger runtime. `task()` simply returns its config object, +// which lets us assert on the static configuration (e.g. maxDuration). +jest.mock('@trigger.dev/sdk', () => ({ + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + tags: { add: jest.fn() }, + task: (config: unknown) => config, +})); + +import { runDeviceSync } from './run-device-sync'; + +const config = runDeviceSync as unknown as { id: string; maxDuration: number }; + +describe('runDeviceSync task config', () => { + it('declares maxDuration in SECONDS, not milliseconds', () => { + // Trigger.dev maxDuration is in seconds. 10 minutes = 600. + // The ms form (1000 * 60 * 10 = 600_000) would be ~7 days. + expect(config.maxDuration).toBe(600); + expect(config.maxDuration).toBeLessThan(24 * 60 * 60); + }); +}); diff --git a/apps/api/src/trigger/integration-platform/run-device-sync.ts b/apps/api/src/trigger/integration-platform/run-device-sync.ts new file mode 100644 index 0000000000..196a957d4d --- /dev/null +++ b/apps/api/src/trigger/integration-platform/run-device-sync.ts @@ -0,0 +1,90 @@ +import { logger, tags, task } from '@trigger.dev/sdk'; + +const API_BASE_URL = process.env.BASE_URL || 'http://localhost:3333'; + +/** + * Trigger.dev task that runs device sync for a single org+connection. + * Calls the existing API endpoint which handles credential refresh, + * DSL interpretation, and device processing. + * + * Triggered by the daily integration-checks-schedule orchestrator. + */ +export const runDeviceSync = task({ + id: 'run-device-sync', + maxDuration: 60 * 10, // 10 minutes — Trigger.dev maxDuration is in SECONDS + run: async (payload: { + organizationId: string; + connectionId: string; + providerSlug: string; + }) => { + const { organizationId, connectionId, providerSlug } = payload; + + await tags.add([`org:${organizationId}`]); + + logger.info(`Starting device sync for provider "${providerSlug}"`, { + connectionId, + organizationId, + }); + + try { + const url = new URL( + `${API_BASE_URL}/v1/integrations/sync/dynamic/${providerSlug}/devices`, + ); + url.searchParams.set('connectionId', connectionId); + + const response = await fetch(url.toString(), { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-service-token': process.env.SERVICE_TOKEN_TRIGGER!, + 'x-organization-id': organizationId, + }, + }); + + if (!response.ok) { + const errorBody = await response.text(); + logger.error( + `Device sync API call failed: ${response.status} - ${errorBody}`, + ); + + return { + success: false, + error: `Device sync failed: ${response.status} - ${errorBody}`, + }; + } + + const result = (await response.json()) as { + success: boolean; + totalFound: number; + imported: number; + updated: number; + removed: number; + skipped: number; + errors: number; + syncRunId?: string; + }; + + logger.info(`Device sync completed for "${providerSlug}"`, { + imported: result.imported, + updated: result.updated, + removed: result.removed, + skipped: result.skipped, + errors: result.errors, + }); + + return result; + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + + logger.error(`Device sync failed for "${providerSlug}"`, { + error: errorMessage, + }); + + return { + success: false, + error: errorMessage, + }; + } + }, +}); diff --git a/apps/api/src/trigger/integration-platform/run-integration-checks-schedule.spec.ts b/apps/api/src/trigger/integration-platform/run-integration-checks-schedule.spec.ts index 9343a609c9..6e7e55d22b 100644 --- a/apps/api/src/trigger/integration-platform/run-integration-checks-schedule.spec.ts +++ b/apps/api/src/trigger/integration-platform/run-integration-checks-schedule.spec.ts @@ -34,6 +34,10 @@ jest.mock('./run-task-integration-checks', () => ({ runTaskIntegrationChecks: { batchTrigger: jest.fn() }, })); +jest.mock('./run-device-sync', () => ({ + runDeviceSync: { trigger: jest.fn() }, +})); + const atUtc = (iso: string) => new Date(`${iso}T00:00:00.000Z`); describe('filterDueTasks (integration orchestrator)', () => { diff --git a/apps/api/src/trigger/integration-platform/run-integration-checks-schedule.ts b/apps/api/src/trigger/integration-platform/run-integration-checks-schedule.ts index 600e03ef94..bec40c6a0d 100644 --- a/apps/api/src/trigger/integration-platform/run-integration-checks-schedule.ts +++ b/apps/api/src/trigger/integration-platform/run-integration-checks-schedule.ts @@ -2,6 +2,7 @@ import { getManifest } from '@trycompai/integration-platform'; import { db, TaskFrequency } from '@db'; import { logger, schedules } from '@trigger.dev/sdk'; import { runTaskIntegrationChecks } from './run-task-integration-checks'; +import { runDeviceSync } from './run-device-sync'; import { parseDisabledTaskChecks } from '../../integration-platform/utils/disabled-task-checks'; import { isDueToday } from '../shared/is-due-today'; @@ -54,11 +55,6 @@ export const integrationChecksSchedule = schedules.task({ }, }); - if (activeConnections.length === 0) { - logger.info('No active integration connections found'); - return { success: true, tasksTriggered: 0 }; - } - logger.info(`Found ${activeConnections.length} active connections`); // For each connection, find tasks that have checks mapped to them @@ -143,48 +139,90 @@ export const integrationChecksSchedule = schedules.task({ } } + // Trigger integration checks in batches + let totalTriggered = 0; + if (tasksToRun.length === 0) { logger.info('No tasks with mapped integration checks found'); - return { success: true, tasksTriggered: 0 }; + } else { + logger.info( + `Found ${tasksToRun.length} tasks with integration checks to run`, + ); + + const BATCH_SIZE = 500; + const triggerPayloads = tasksToRun.map((t) => ({ payload: t })); + + try { + for (let i = 0; i < triggerPayloads.length; i += BATCH_SIZE) { + const batch = triggerPayloads.slice(i, i + BATCH_SIZE); + await runTaskIntegrationChecks.batchTrigger(batch); + totalTriggered += batch.length; + + logger.info( + `Triggered batch ${Math.floor(i / BATCH_SIZE) + 1}: ${batch.length} tasks`, + ); + } + + logger.info(`Triggered ${totalTriggered} task integration check runs`); + } catch (error) { + logger.error('Failed to trigger task integration checks', { + error: error instanceof Error ? error.message : String(error), + triggeredBeforeError: totalTriggered, + }); + } } - logger.info( - `Found ${tasksToRun.length} tasks with integration checks to run`, - ); + // === Device Sync === + // Find orgs with deviceSyncProvider set and trigger device sync + const orgsWithDeviceSync = await db.organization.findMany({ + where: { deviceSyncProvider: { not: null } }, + select: { id: true, deviceSyncProvider: true }, + }); - // Trigger in batches of 500 - const BATCH_SIZE = 500; - const triggerPayloads = tasksToRun.map((t) => ({ payload: t })); - let totalTriggered = 0; + let deviceSyncsTriggered = 0; + let deviceSyncFailures = 0; - try { - for (let i = 0; i < triggerPayloads.length; i += BATCH_SIZE) { - const batch = triggerPayloads.slice(i, i + BATCH_SIZE); - await runTaskIntegrationChecks.batchTrigger(batch); - totalTriggered += batch.length; + for (const org of orgsWithDeviceSync) { + const connection = await db.integrationConnection.findFirst({ + where: { + organizationId: org.id, + status: 'active', + provider: { slug: org.deviceSyncProvider! }, + }, + select: { id: true }, + }); - logger.info( - `Triggered batch ${Math.floor(i / BATCH_SIZE) + 1}: ${batch.length} tasks`, + if (!connection) { + logger.warn( + `No active connection for device sync provider ${org.deviceSyncProvider} in org ${org.id}`, ); + continue; } - logger.info(`Triggered ${totalTriggered} task integration check runs`); + try { + await runDeviceSync.trigger({ + organizationId: org.id, + connectionId: connection.id, + providerSlug: org.deviceSyncProvider!, + }); + deviceSyncsTriggered++; + } catch (error) { + deviceSyncFailures++; + logger.error( + `Failed to trigger device sync for org ${org.id}`, + { error: error instanceof Error ? error.message : String(error) }, + ); + } + } - return { - success: true, - tasksTriggered: totalTriggered, - }; - } catch (error) { - logger.error('Failed to trigger task integration checks', { - error: error instanceof Error ? error.message : String(error), - triggeredBeforeError: totalTriggered, - }); + logger.info(`Triggered ${deviceSyncsTriggered} device syncs`); - return { - success: false, - tasksTriggered: totalTriggered, - error: error instanceof Error ? error.message : String(error), - }; - } + return { + // Report failure when not every queued task batch was dispatched OR a + // device-sync dispatch threw, so partial/failed runs aren't masked. + success: totalTriggered === tasksToRun.length && deviceSyncFailures === 0, + tasksTriggered: totalTriggered, + deviceSyncsTriggered, + }; }, }); diff --git a/apps/app/src/app/(app)/[orgId]/people/devices/components/DeviceSyncProviderSelector.test.tsx b/apps/app/src/app/(app)/[orgId]/people/devices/components/DeviceSyncProviderSelector.test.tsx new file mode 100644 index 0000000000..4366fe3d35 --- /dev/null +++ b/apps/app/src/app/(app)/[orgId]/people/devices/components/DeviceSyncProviderSelector.test.tsx @@ -0,0 +1,117 @@ +import { render, screen } from '@testing-library/react'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { DeviceSyncProviderSelector } from './DeviceSyncProviderSelector'; +import type { DeviceSyncProviderInfo } from '../hooks/useDeviceSync'; + +const { mockHasPermission, mockUseDeviceSync } = vi.hoisted(() => ({ + mockHasPermission: vi.fn(), + mockUseDeviceSync: vi.fn(), +})); + +vi.mock('next/navigation', () => ({ + useParams: () => ({ orgId: 'org_1' }), +})); + +vi.mock('@/hooks/use-permissions', () => ({ + usePermissions: () => ({ hasPermission: mockHasPermission }), +})); + +vi.mock('../hooks/useDeviceSync', () => ({ + useDeviceSync: (opts: { organizationId: string; enabled?: boolean }) => + mockUseDeviceSync(opts), +})); + +const provider: DeviceSyncProviderInfo = { + slug: 'jamf', + name: 'Jamf', + logoUrl: 'https://example.com/jamf.png', + connected: true, + connectionId: 'icn_1', + lastSyncAt: null, + nextSyncAt: null, +}; + +beforeEach(() => { + vi.clearAllMocks(); + mockUseDeviceSync.mockReturnValue({ + selectedProvider: 'jamf', + isSyncing: false, + isLoading: false, + availableProviders: [provider], + syncDevices: vi.fn(), + setSyncProvider: vi.fn(), + getProviderName: (slug: string) => (slug === 'jamf' ? 'Jamf' : slug), + getProviderLogo: () => provider.logoUrl, + hasAnyConnection: true, + }); +}); + +describe('DeviceSyncProviderSelector — RBAC gating', () => { + it('renders the sync controls for a user with integration:update', () => { + mockHasPermission.mockImplementation( + (resource: string, action: string) => + resource === 'integration' && action === 'update', + ); + + render(); + + expect( + screen.getByRole('button', { name: /Sync now/i }), + ).toBeInTheDocument(); + expect(screen.getByText('Jamf')).toBeInTheDocument(); + // Hook is enabled (and therefore allowed to hit the device-sync APIs). + expect(mockUseDeviceSync).toHaveBeenCalledWith( + expect.objectContaining({ enabled: true }), + ); + }); + + it('renders nothing for a user without integration:update and disables the hook', () => { + mockHasPermission.mockReturnValue(false); + + const { container } = render(); + + expect(container).toBeEmptyDOMElement(); + expect( + screen.queryByRole('button', { name: /Sync now/i }), + ).not.toBeInTheDocument(); + // The hook must be disabled so no device-sync API is called without permission. + expect(mockUseDeviceSync).toHaveBeenCalledWith( + expect.objectContaining({ enabled: false }), + ); + }); + + it('shows the provider picker when the saved provider is no longer connected', () => { + mockHasPermission.mockImplementation( + (resource: string, action: string) => + resource === 'integration' && action === 'update', + ); + mockUseDeviceSync.mockReturnValue({ + selectedProvider: 'jamf', // saved, but no longer in the connected list + isSyncing: false, + isLoading: false, + availableProviders: [{ ...provider, slug: 'kandji', name: 'Kandji' }], + syncDevices: vi.fn(), + setSyncProvider: vi.fn(), + getProviderName: (slug: string) => (slug === 'kandji' ? 'Kandji' : slug), + getProviderLogo: () => provider.logoUrl, + hasAnyConnection: true, + }); + + render(); + + // The picker must be available so the user can switch to a connected provider. + expect(screen.getByRole('combobox')).toBeInTheDocument(); + expect(screen.getByRole('option', { name: 'Kandji' })).toBeInTheDocument(); + }); + + it('does not render for read-only integration access (integration:read only)', () => { + mockHasPermission.mockImplementation( + (resource: string, action: string) => + resource === 'integration' && action === 'read', + ); + + const { container } = render(); + + expect(container).toBeEmptyDOMElement(); + }); +}); diff --git a/apps/app/src/app/(app)/[orgId]/people/devices/components/DeviceSyncProviderSelector.tsx b/apps/app/src/app/(app)/[orgId]/people/devices/components/DeviceSyncProviderSelector.tsx new file mode 100644 index 0000000000..1d899a7019 --- /dev/null +++ b/apps/app/src/app/(app)/[orgId]/people/devices/components/DeviceSyncProviderSelector.tsx @@ -0,0 +1,124 @@ +'use client'; + +import { useParams } from 'next/navigation'; +import { Button, Skeleton } from '@trycompai/design-system'; +import { Renew } from '@trycompai/design-system/icons'; +import { usePermissions } from '@/hooks/use-permissions'; +import { useDeviceSync } from '../hooks/useDeviceSync'; + +export function DeviceSyncProviderSelector() { + const { orgId } = useParams<{ orgId: string }>(); + const { hasPermission } = usePermissions(); + // Selecting a provider and triggering syncs are integration:update actions. + // Gate the hook itself so users without the permission never hit any + // device-sync API, and hide the controls below. + const canManageDeviceSync = hasPermission('integration', 'update'); + const { + selectedProvider, + isSyncing, + isLoading, + availableProviders, + syncDevices, + setSyncProvider, + getProviderName, + getProviderLogo, + hasAnyConnection, + } = useDeviceSync({ organizationId: orgId, enabled: canManageDeviceSync }); + + if (!canManageDeviceSync) { + return null; + } + + if (isLoading) { + return ; + } + + if (!hasAnyConnection) { + return null; + } + + const connectedProviders = availableProviders.filter((p) => p.connected); + + const handleSync = async () => { + if (!selectedProvider) return; + await syncDevices(selectedProvider); + }; + + const handleProviderChange = (e: React.ChangeEvent) => { + void setSyncProvider(e.target.value || null); + }; + + return ( +
+
+ {selectedProvider ? ( + <> + +
+
+ {getProviderName(selectedProvider)} +
+ {(() => { + const info = availableProviders.find( + (p) => p.slug === selectedProvider, + ); + if (!info?.lastSyncAt) return null; + const lastSync = new Date(info.lastSyncAt); + return ( +
+ Last synced{' '} + {lastSync.toLocaleDateString(undefined, { + month: 'short', + day: 'numeric', + hour: 'numeric', + minute: '2-digit', + })} +
+ ); + })()} +
+ + ) : ( +
+ Select an integration to sync devices +
+ )} +
+ +
+ {connectedProviders.length > 1 || + !connectedProviders.some((p) => p.slug === selectedProvider) ? ( + + ) : null} + + {selectedProvider && ( + + )} +
+
+ ); +} diff --git a/apps/app/src/app/(app)/[orgId]/people/devices/components/DevicesTabContent.tsx b/apps/app/src/app/(app)/[orgId]/people/devices/components/DevicesTabContent.tsx index 05234729a1..45b0818592 100644 --- a/apps/app/src/app/(app)/[orgId]/people/devices/components/DevicesTabContent.tsx +++ b/apps/app/src/app/(app)/[orgId]/people/devices/components/DevicesTabContent.tsx @@ -6,6 +6,7 @@ import { useAgentDevices } from '../hooks/useAgentDevices'; import { useFleetHosts } from '../hooks/useFleetHosts'; import { DeviceAgentDevicesList } from './DeviceAgentDevicesList'; import { DeviceComplianceChart } from './DeviceComplianceChart'; +import { DeviceSyncProviderSelector } from './DeviceSyncProviderSelector'; import { EmployeeDevicesList } from './EmployeeDevicesList'; interface DevicesTabContentProps { @@ -60,6 +61,7 @@ export function DevicesTabContent({ isCurrentUserOwner }: DevicesTabContentProps return (
+ ({ + apiClient: { + get: (url: string) => getMock(url), + post: (url: string, body?: unknown) => postMock(url, body), + }, +})); + +vi.mock('sonner', () => ({ + toast: { + success: vi.fn(), + error: vi.fn(), + info: vi.fn(), + warning: vi.fn(), + }, +})); + +function wrapper({ children }: { children: React.ReactNode }) { + return ( + new Map(), dedupingInterval: 0, revalidateOnFocus: false }} + > + {children} + + ); +} + +const jamf = { + slug: 'jamf', + name: 'Jamf', + logoUrl: '/jamf.png', + connected: true, + connectionId: 'icn_1', + lastSyncAt: null, + nextSyncAt: null, +}; + +beforeEach(() => { + getMock.mockReset(); + postMock.mockReset(); + // No provider selected yet; one connected provider available. + getMock.mockImplementation((url: string) => { + if (url.includes('device-sync-provider')) { + return Promise.resolve({ data: { provider: null }, status: 200 }); + } + if (url.includes('available-providers')) { + return Promise.resolve({ data: { providers: [jamf] }, status: 200 }); + } + return Promise.resolve({ data: null, status: 200 }); + }); +}); + +describe('useDeviceSync', () => { + it('does not run the sync when setting the provider fails', async () => { + // Persisting the provider choice fails. + postMock.mockImplementation((url: string) => { + if (url.includes('device-sync-provider')) { + return Promise.resolve({ error: 'nope', status: 500 }); + } + return Promise.resolve({ data: { success: true }, status: 200 }); + }); + + const { result } = renderHook(() => useDeviceSync({ organizationId: 'org_1' }), { + wrapper, + }); + + await waitFor(() => expect(result.current.availableProviders).toHaveLength(1)); + + await act(async () => { + await result.current.syncDevices('jamf'); + }); + + // setSyncProvider was attempted... + expect( + postMock.mock.calls.some(([url]) => url.includes('device-sync-provider')), + ).toBe(true); + // ...but the device sync POST must NOT fire with a stale/unsaved provider. + expect(postMock.mock.calls.some(([url]) => url.includes('/devices'))).toBe(false); + }); + + it('makes no API calls when disabled (no integration:update permission)', () => { + renderHook(() => useDeviceSync({ organizationId: 'org_1', enabled: false }), { + wrapper, + }); + + expect(getMock).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/app/src/app/(app)/[orgId]/people/devices/hooks/useDeviceSync.ts b/apps/app/src/app/(app)/[orgId]/people/devices/hooks/useDeviceSync.ts new file mode 100644 index 0000000000..20b4e8e532 --- /dev/null +++ b/apps/app/src/app/(app)/[orgId]/people/devices/hooks/useDeviceSync.ts @@ -0,0 +1,194 @@ +'use client'; + +import { apiClient } from '@/lib/api-client'; +import { useState } from 'react'; +import { toast } from 'sonner'; +import useSWR from 'swr'; + +export interface DeviceSyncProviderInfo { + slug: string; + name: string; + logoUrl: string; + connected: boolean; + connectionId: string | null; + lastSyncAt: string | null; + nextSyncAt: string | null; +} + +interface DeviceSyncResult { + success: boolean; + totalFound: number; + imported: number; + updated: number; + skipped: number; + removed: number; + errors: number; +} + +interface UseDeviceSyncOptions { + organizationId: string; + /** + * When false, the hook makes no API calls (used to fully disable it for users + * who lack the integration:update permission, so no device-sync API is hit). + */ + enabled?: boolean; +} + +interface UseDeviceSyncReturn { + selectedProvider: string | null; + isSyncing: boolean; + isLoading: boolean; + availableProviders: DeviceSyncProviderInfo[]; + syncDevices: (provider: string) => Promise; + setSyncProvider: (provider: string | null) => Promise; + getProviderName: (provider: string) => string; + getProviderLogo: (provider: string) => string; + hasAnyConnection: boolean; +} + +export function useDeviceSync({ + organizationId, + enabled = true, +}: UseDeviceSyncOptions): UseDeviceSyncReturn { + const [isSyncing, setIsSyncing] = useState(false); + + // Fetch current device sync provider + const { data: providerData, mutate: mutateProvider } = useSWR<{ provider: string | null }>( + enabled + ? `/v1/integrations/sync/device-sync-provider?organizationId=${organizationId}` + : null, + async (url: string) => { + const res = await apiClient.get<{ provider: string | null }>(url); + if (res.error) throw new Error(res.error); + return res.data as { provider: string | null }; + }, + ); + + // Fetch available device sync providers + const { data: availableData, isLoading, mutate: mutateAvailable } = useSWR<{ providers: DeviceSyncProviderInfo[] }>( + enabled + ? `/v1/integrations/sync/available-providers?organizationId=${organizationId}&syncType=device` + : null, + async (url: string) => { + const res = await apiClient.get<{ providers: DeviceSyncProviderInfo[] }>(url); + if (res.error) throw new Error(res.error); + return res.data as { providers: DeviceSyncProviderInfo[] }; + }, + ); + + const selectedProvider = providerData?.provider ?? null; + const availableProviders = Array.isArray(availableData?.providers) + ? availableData.providers + : []; + + const getProviderName = (provider: string): string => { + return availableProviders.find((p) => p.slug === provider)?.name ?? provider; + }; + + const getProviderLogo = (provider: string): string => { + return availableProviders.find((p) => p.slug === provider)?.logoUrl ?? ''; + }; + + const setSyncProvider = async (provider: string | null): Promise => { + try { + const response = await apiClient.post( + `/v1/integrations/sync/device-sync-provider?organizationId=${organizationId}`, + { provider }, + ); + + if (response.error) { + toast.error('Failed to set device sync provider'); + return false; + } + + mutateProvider({ provider }, false); + + if (provider) { + const name = getProviderName(provider); + toast.success(`${name} set as your device sync provider`); + } + return true; + } catch { + toast.error('Failed to set device sync provider'); + return false; + } + }; + + const syncDevices = async (provider: string): Promise => { + const providerInfo = availableProviders.find((p) => p.slug === provider); + const connId = providerInfo?.connectionId; + + if (!connId) { + toast.error(`${getProviderName(provider)} is not connected`); + return null; + } + + setIsSyncing(true); + const providerName = getProviderName(provider); + + try { + if (selectedProvider !== provider) { + // Don't sync with a stale provider config if persisting the choice failed. + const providerSet = await setSyncProvider(provider); + if (!providerSet) { + return null; + } + } + + const response = await apiClient.post( + `/v1/integrations/sync/dynamic/${provider}/devices?organizationId=${organizationId}&connectionId=${connId}`, + ); + + // Branch on the presence of a result body, NOT on `success`: a partial + // sync returns HTTP 200 with success=false (errors > 0) but still imports + // some devices — we must surface both the summary and the warning. + if (response.data) { + // The sync updated connection.lastSyncAt server-side; revalidate so the + // selector's "Last synced" reflects it instead of staying stale. + void mutateAvailable(); + const { totalFound, imported, updated, removed, skipped, errors } = + response.data; + const parts: string[] = []; + if (imported > 0) parts.push(`${imported} new`); + if (updated > 0) parts.push(`${updated} updated`); + if (removed > 0) parts.push(`${removed} removed`); + if (skipped > 0) parts.push(`${skipped} skipped`); + + if (parts.length > 0) { + toast.success(`Synced ${totalFound} devices — ${parts.join(', ')}`); + } else if (errors === 0) { + toast.info('All devices are already synced'); + } + + if (errors > 0) { + toast.warning(`${errors} device${errors > 1 ? 's' : ''} failed to sync`); + } + + return response.data; + } + + if (response.error) { + toast.error(response.error); + } + + return null; + } catch { + toast.error(`Failed to sync devices from ${providerName}`); + return null; + } finally { + setIsSyncing(false); + } + }; + + return { + selectedProvider, + isSyncing, + isLoading, + availableProviders, + syncDevices, + setSyncProvider, + getProviderName, + getProviderLogo, + hasAnyConnection: availableProviders.some((p) => p.connected), + }; +} diff --git a/packages/db/prisma/migrations/20260508100534_add_device_sync_fields/migration.sql b/packages/db/prisma/migrations/20260508100534_add_device_sync_fields/migration.sql new file mode 100644 index 0000000000..a4d0b2917b --- /dev/null +++ b/packages/db/prisma/migrations/20260508100534_add_device_sync_fields/migration.sql @@ -0,0 +1,16 @@ +-- CreateEnum +CREATE TYPE "DeviceSource" AS ENUM ('agent', 'fleet', 'integration'); + +-- AlterTable +ALTER TABLE "Device" ADD COLUMN "externalDeviceId" TEXT, +ADD COLUMN "integrationConnectionId" TEXT, +ADD COLUMN "source" "DeviceSource" NOT NULL DEFAULT 'agent'; + +-- AlterTable +ALTER TABLE "DynamicIntegration" ADD COLUMN "deviceSyncDefinition" JSONB; + +-- AlterTable +ALTER TABLE "Organization" ADD COLUMN "deviceSyncProvider" TEXT; + +-- CreateIndex +CREATE INDEX "Device_integrationConnectionId_idx" ON "Device"("integrationConnectionId"); diff --git a/packages/db/prisma/migrations/20260603120000_add_device_external_id_unique/migration.sql b/packages/db/prisma/migrations/20260603120000_add_device_external_id_unique/migration.sql new file mode 100644 index 0000000000..52a60dc47d --- /dev/null +++ b/packages/db/prisma/migrations/20260603120000_add_device_external_id_unique/migration.sql @@ -0,0 +1,6 @@ +-- Prevent duplicate integration-sourced devices for the same connection and +-- back the device-sync fallback lookup (externalDeviceId + integrationConnectionId). +-- Postgres treats NULLs as distinct, so agent/serial-only rows (NULL +-- integrationConnectionId and/or NULL externalDeviceId) are unaffected. +-- CreateIndex +CREATE UNIQUE INDEX "Device_integrationConnectionId_externalDeviceId_key" ON "Device"("integrationConnectionId", "externalDeviceId"); diff --git a/packages/db/prisma/schema/device.prisma b/packages/db/prisma/schema/device.prisma index 3a609c3eb7..8c1b93fe7a 100644 --- a/packages/db/prisma/schema/device.prisma +++ b/packages/db/prisma/schema/device.prisma @@ -29,11 +29,20 @@ model Device { findings Finding[] + source DeviceSource @default(agent) + integrationConnectionId String? + externalDeviceId String? + @@unique([serialNumber, organizationId]) + // Prevents duplicate integration-sourced devices for the same connection and + // also serves the device-sync fallback lookup (externalDeviceId + connection). + // NULLs are distinct in Postgres, so agent/serial-only rows are unaffected. + @@unique([integrationConnectionId, externalDeviceId]) @@index([memberId]) @@index([organizationId]) @@index([isCompliant]) @@index([agentSessionId]) + @@index([integrationConnectionId]) } enum DevicePlatform { @@ -41,3 +50,9 @@ enum DevicePlatform { windows linux } + +enum DeviceSource { + agent + fleet + integration +} diff --git a/packages/db/prisma/schema/dynamic-integration.prisma b/packages/db/prisma/schema/dynamic-integration.prisma index 58163d17de..89b9a35838 100644 --- a/packages/db/prisma/schema/dynamic-integration.prisma +++ b/packages/db/prisma/schema/dynamic-integration.prisma @@ -36,6 +36,10 @@ model DynamicIntegration { /// When present and capabilities includes 'sync', enables employee sync syncDefinition Json? + /// Declarative device sync definition (JSON — DSL steps that produce device list) + /// When present and capabilities includes 'device_sync', enables device sync + deviceSyncDefinition Json? + /// Services metadata (JSON array of { id, name, description, enabledByDefault?, implemented? }) services Json? diff --git a/packages/db/prisma/schema/organization.prisma b/packages/db/prisma/schema/organization.prisma index 5fbb485609..079cf79052 100644 --- a/packages/db/prisma/schema/organization.prisma +++ b/packages/db/prisma/schema/organization.prisma @@ -25,6 +25,10 @@ model Organization { // When set, the scheduled sync will only use this provider employeeSyncProvider String? + // Device sync provider (e.g., 'jamf', 'kandji') + // When set, the scheduled sync will import devices from this provider + deviceSyncProvider String? + apiKeys ApiKey[] mcpOrgBindings McpOrgBinding[] auditLog AuditLog[] diff --git a/packages/integration-platform/src/dsl/__tests__/interpreter.test.ts b/packages/integration-platform/src/dsl/__tests__/interpreter.test.ts index 7b17d2ba8f..aed2a4fb34 100644 --- a/packages/integration-platform/src/dsl/__tests__/interpreter.test.ts +++ b/packages/integration-platform/src/dsl/__tests__/interpreter.test.ts @@ -1,7 +1,10 @@ import { describe, it, expect, beforeEach } from 'bun:test'; -import { interpretDeclarativeCheck } from '../interpreter'; +import { + interpretDeclarativeCheck, + interpretDeclarativeDeviceSync, +} from '../interpreter'; import type { CheckContext } from '../../types'; -import type { CheckDefinition } from '../types'; +import type { CheckDefinition, SyncDefinition } from '../types'; /** * Creates a mock CheckContext for testing. @@ -1083,3 +1086,72 @@ describe('interpretDeclarativeCheck', () => { }); }); }); + +describe('interpretDeclarativeDeviceSync', () => { + const deviceDef = (code: string, devicesPath = 'devices'): SyncDefinition => ({ + steps: [{ type: 'code', code }], + employeesPath: 'employees', + devicesPath, + isDirectorySource: false, + }); + + it('returns devices validated against the DEVICE schema (regression: not the employee schema)', async () => { + // A device object has userEmail + platform but no `email`; the employee + // interpreter would drop/strip it. The device interpreter keeps it intact. + const runner = interpretDeclarativeDeviceSync({ + definition: deviceDef(`scope.devices = [ + { name: 'MB Pro', platform: 'macos', serialNumber: 'SN1', userEmail: 'alice@x.com', status: 'active' }, + ];`), + }); + + const devices = await runner.run(createMockContext()); + + expect(devices).toHaveLength(1); + expect(devices[0]).toMatchObject({ + platform: 'macos', + userEmail: 'alice@x.com', + serialNumber: 'SN1', + }); + }); + + it('drops entries that fail SyncDeviceSchema and keeps the valid ones', async () => { + const runner = interpretDeclarativeDeviceSync({ + definition: deviceDef(`scope.devices = [ + { name: 'Good', platform: 'macos', userEmail: 'a@x.com', status: 'active' }, + { name: 'Missing platform + email', status: 'active' }, + { name: 'Bad platform', platform: 'ios', userEmail: 'b@x.com', status: 'active' }, + ];`), + }); + + const devices = await runner.run(createMockContext()); + + expect(devices).toHaveLength(1); + expect(devices[0]!.name).toBe('Good'); + }); + + it('reads from a custom devicesPath', async () => { + const runner = interpretDeclarativeDeviceSync({ + definition: deviceDef( + `scope.fleet = [ + { name: 'PC', platform: 'windows', userEmail: 'c@x.com', status: 'active' }, + ];`, + 'fleet', + ), + }); + + const devices = await runner.run(createMockContext()); + + expect(devices).toHaveLength(1); + expect(devices[0]!.platform).toBe('windows'); + }); + + it('throws when the devices path does not resolve to an array', async () => { + const runner = interpretDeclarativeDeviceSync({ + definition: deviceDef(`scope.devices = { not: 'an array' };`), + }); + + await expect(runner.run(createMockContext())).rejects.toThrow( + 'did not produce an array', + ); + }); +}); diff --git a/packages/integration-platform/src/dsl/index.ts b/packages/integration-platform/src/dsl/index.ts index 4a3544dcbc..13febcd40a 100644 --- a/packages/integration-platform/src/dsl/index.ts +++ b/packages/integration-platform/src/dsl/index.ts @@ -1,5 +1,9 @@ // DSL Engine — Declarative check and sync definitions -export { interpretDeclarativeCheck, interpretDeclarativeSync } from './interpreter'; +export { + interpretDeclarativeCheck, + interpretDeclarativeSync, + interpretDeclarativeDeviceSync, +} from './interpreter'; export { evaluateCondition, evaluateOperator, resolvePath } from './expression-evaluator'; export { interpolate, interpolateTemplate } from './template-engine'; export { validateIntegrationDefinition, type ValidationResult } from './validate'; @@ -16,6 +20,7 @@ export type { CodeStep, CheckDefinition, SyncEmployee, + SyncDevice, SyncDefinition, Condition, FieldCondition, @@ -31,6 +36,7 @@ export { DSLStepSchema, CheckDefinitionSchema, SyncEmployeeSchema, + SyncDeviceSchema, SyncDefinitionSchema, DynamicIntegrationDefinitionSchema, ConditionSchema, diff --git a/packages/integration-platform/src/dsl/interpreter.ts b/packages/integration-platform/src/dsl/interpreter.ts index e442aad223..5c0560ab54 100644 --- a/packages/integration-platform/src/dsl/interpreter.ts +++ b/packages/integration-platform/src/dsl/interpreter.ts @@ -4,6 +4,7 @@ import type { CheckDefinition, SyncDefinition, SyncEmployee, + SyncDevice, FetchStep, FetchPagesStep, ForEachStep, @@ -12,7 +13,7 @@ import type { EmitStep, CodeStep, } from './types'; -import { SyncEmployeeSchema } from './types'; +import { SyncEmployeeSchema, SyncDeviceSchema } from './types'; import { evaluateCondition, evaluateOperator, resolvePath } from './expression-evaluator'; import { interpolate, interpolateTemplate } from './template-engine'; @@ -115,6 +116,65 @@ export function interpretDeclarativeSync(opts: { }; } +/** + * Converts a declarative SyncDefinition (JSON DSL) into a function that + * produces a validated list of SyncDevice objects. + * + * Mirrors interpretDeclarativeSync but resolves the device list at + * `scope[devicesPath]` (default `devices`) and validates each item with + * SyncDeviceSchema. Kept separate from the employee interpreter so the two + * validation paths never bleed into each other. + */ +export function interpretDeclarativeDeviceSync(opts: { + definition: SyncDefinition; + defaultSeverity?: FindingSeverity; +}): { + run: (ctx: CheckContext) => Promise; +} { + return { + run: async (ctx: CheckContext) => { + const scope: Record = { + variables: ctx.variables, + credentials: ctx.credentials, + accessToken: ctx.accessToken, + connectionId: ctx.connectionId, + organizationId: ctx.organizationId, + metadata: ctx.metadata, + }; + + ctx.log('Running declarative device sync'); + + for (const step of opts.definition.steps) { + await executeStep(step, scope, ctx, opts.defaultSeverity || 'medium'); + } + + const devicesPath = opts.definition.devicesPath || 'devices'; + const raw = resolvePath(scope, devicesPath); + + if (!Array.isArray(raw)) { + throw new Error( + `Device sync definition did not produce an array at scope.${devicesPath}`, + ); + } + + const devices: SyncDevice[] = []; + for (let i = 0; i < raw.length; i++) { + const parsed = SyncDeviceSchema.safeParse(raw[i]); + if (!parsed.success) { + ctx.warn( + `Device at index ${i} failed validation: ${parsed.error.issues.map((iss) => iss.message).join(', ')}`, + ); + continue; + } + devices.push(parsed.data); + } + + ctx.log(`Device sync produced ${devices.length} validated devices`); + return devices; + }, + }; +} + /** * Execute a single DSL step. */ diff --git a/packages/integration-platform/src/dsl/types.ts b/packages/integration-platform/src/dsl/types.ts index 36665c83fb..b3d6a52168 100644 --- a/packages/integration-platform/src/dsl/types.ts +++ b/packages/integration-platform/src/dsl/types.ts @@ -290,6 +290,12 @@ export type SyncEmployee = z.infer; export const SyncDefinitionSchema = z.object({ steps: z.array(DSLStepSchema), employeesPath: z.string().default('employees'), + /** + * For device sync definitions: the scope path that the DSL steps populate + * with the standardized device list. Defaults to `devices`. Ignored by the + * employee interpreter (which uses `employeesPath`). + */ + devicesPath: z.string().optional().default('devices'), variables: z.array(VariableSchema).optional(), /** * Whether this provider is authoritative for "who works here" (directory of record). @@ -308,6 +314,25 @@ export const SyncDefinitionSchema = z.object({ export type SyncDefinition = z.infer; +// ============================================================================ +// Sync Device Schema (for dynamic device sync) +// ============================================================================ + +export const SyncDeviceSchema = z.object({ + name: z.string(), + platform: z.enum(['macos', 'windows', 'linux']), + serialNumber: z.string().optional(), + hostname: z.string().optional(), + osVersion: z.string().optional(), + hardwareModel: z.string().optional(), + userEmail: z.string(), + status: z.enum(['active', 'inactive']), + externalId: z.string().optional(), + metadata: z.record(z.string(), z.unknown()).optional(), +}); + +export type SyncDevice = z.infer; + // ============================================================================ // Dynamic Integration Definition (full manifest + checks as JSON) // ============================================================================ @@ -335,9 +360,12 @@ export const DynamicIntegrationDefinitionSchema = z.object({ type: z.enum(['oauth2', 'api_key', 'basic', 'jwt', 'custom']), config: z.record(z.string(), z.unknown()), }), - capabilities: z.array(z.enum(['checks', 'webhook', 'sync'])).default(['checks']), + capabilities: z + .array(z.enum(['checks', 'webhook', 'sync', 'device_sync'])) + .default(['checks']), supportsMultipleConnections: z.boolean().optional(), syncDefinition: SyncDefinitionSchema.optional(), + deviceSyncDefinition: SyncDefinitionSchema.optional(), services: z.array( z.object({ id: z.string(), diff --git a/packages/integration-platform/src/index.ts b/packages/integration-platform/src/index.ts index 42382cd7a5..6131578d9c 100644 --- a/packages/integration-platform/src/index.ts +++ b/packages/integration-platform/src/index.ts @@ -99,6 +99,7 @@ export { export { interpretDeclarativeCheck, interpretDeclarativeSync, + interpretDeclarativeDeviceSync, evaluateCondition, evaluateOperator, resolvePath, @@ -107,6 +108,7 @@ export { validateIntegrationDefinition, CheckDefinitionSchema, SyncEmployeeSchema, + SyncDeviceSchema, SyncDefinitionSchema, DynamicIntegrationDefinitionSchema, ConditionSchema, @@ -119,6 +121,7 @@ export type { CodeStep, CheckDefinition, SyncEmployee, + SyncDevice, SyncDefinition, Condition, DynamicIntegrationDefinition, diff --git a/packages/integration-platform/src/types.ts b/packages/integration-platform/src/types.ts index a2929b5099..e1cb010af9 100644 --- a/packages/integration-platform/src/types.ts +++ b/packages/integration-platform/src/types.ts @@ -1,4 +1,5 @@ import { z } from 'zod'; +import type { SyncDefinition } from './dsl/types'; import type { TaskTemplateId } from './task-mappings'; // ============================================================================ @@ -208,7 +209,7 @@ export type CredentialField = z.infer; // Integration Capabilities // ============================================================================ -export type IntegrationCapability = 'checks' | 'webhook' | 'sync'; +export type IntegrationCapability = 'checks' | 'webhook' | 'sync' | 'device_sync'; export const WebhookConfigSchema = z.object({ /** Webhook endpoint path suffix */ @@ -856,6 +857,9 @@ export interface IntegrationManifest { /** Runtime handler for webhooks */ handler?: IntegrationHandler; + /** Declarative device sync definition (same DSL as employee sync) */ + deviceSyncDefinition?: SyncDefinition; + /** Whether multiple connections per org are allowed */ supportsMultipleConnections?: boolean;