@@ -3,6 +3,7 @@ import { env } from './env.mjs';
33import { prisma } from "@/prisma" ;
44import { SINGLE_TENANT_USER_ID , SINGLE_TENANT_ORG_ID , SINGLE_TENANT_ORG_DOMAIN , SINGLE_TENANT_ORG_NAME , SINGLE_TENANT_USER_EMAIL } from './lib/constants' ;
55import { readFile } from 'fs/promises' ;
6+ import { watch } from 'fs' ;
67import stripJsonComments from 'strip-json-comments' ;
78import { SourcebotConfig } from "@sourcebot/schemas/v3/index.type" ;
89import { ConnectionConfig } from '@sourcebot/schemas/v3/connection.type' ;
@@ -21,6 +22,114 @@ const isRemotePath = (path: string) => {
2122 return path . startsWith ( 'https://' ) || path . startsWith ( 'http://' ) ;
2223}
2324
25+ const scheduleDeclarativeConfigSync = async ( configPath : string ) => {
26+ const configContent = await ( async ( ) => {
27+ if ( isRemotePath ( configPath ) ) {
28+ const response = await fetch ( configPath ) ;
29+ if ( ! response . ok ) {
30+ throw new Error ( `Failed to fetch config file ${ configPath } : ${ response . statusText } ` ) ;
31+ }
32+ return response . text ( ) ;
33+ } else {
34+ return readFile ( configPath , {
35+ encoding : 'utf-8' ,
36+ } ) ;
37+ }
38+ } ) ( ) ;
39+
40+ const config = JSON . parse ( stripJsonComments ( configContent ) ) as SourcebotConfig ;
41+ const isValidConfig = ajv . validate ( indexSchema , config ) ;
42+ if ( ! isValidConfig ) {
43+ throw new Error ( `Config file '${ configPath } ' is invalid: ${ ajv . errorsText ( ajv . errors ) } ` ) ;
44+ }
45+
46+ if ( config . connections ) {
47+ for ( const [ key , newConnectionConfig ] of Object . entries ( config . connections ) ) {
48+ const currentConnection = await prisma . connection . findUnique ( {
49+ where : {
50+ name_orgId : {
51+ name : key ,
52+ orgId : SINGLE_TENANT_ORG_ID ,
53+ }
54+ } ,
55+ include : {
56+ repos : {
57+ include : {
58+ repo : true ,
59+ }
60+ }
61+ }
62+ } ) ;
63+
64+ const currentConnectionConfig = currentConnection ? currentConnection . config as unknown as ConnectionConfig : undefined ;
65+ const syncNeededOnUpdate =
66+ ( currentConnectionConfig && JSON . stringify ( currentConnectionConfig ) !== JSON . stringify ( newConnectionConfig ) ) ||
67+ ( currentConnection ?. syncStatus === ConnectionSyncStatus . FAILED ) ;
68+
69+ const connectionDb = await prisma . connection . upsert ( {
70+ where : {
71+ name_orgId : {
72+ name : key ,
73+ orgId : SINGLE_TENANT_ORG_ID ,
74+ }
75+ } ,
76+ update : {
77+ config : newConnectionConfig as unknown as Prisma . InputJsonValue ,
78+ syncStatus : syncNeededOnUpdate ? ConnectionSyncStatus . SYNC_NEEDED : undefined ,
79+ isDeclarative : true ,
80+ } ,
81+ create : {
82+ name : key ,
83+ connectionType : newConnectionConfig . type ,
84+ config : newConnectionConfig as unknown as Prisma . InputJsonValue ,
85+ isDeclarative : true ,
86+ org : {
87+ connect : {
88+ id : SINGLE_TENANT_ORG_ID ,
89+ }
90+ }
91+ }
92+ } ) ;
93+
94+ console . log ( `Upserted connection with name '${ key } '. Connection ID: ${ connectionDb . id } ` ) ;
95+
96+ // Re-try any repos that failed to index.
97+ const failedRepos = currentConnection ?. repos . filter ( repo => repo . repo . repoIndexingStatus === RepoIndexingStatus . FAILED ) . map ( repo => repo . repo . id ) ?? [ ] ;
98+ if ( failedRepos . length > 0 ) {
99+ await prisma . repo . updateMany ( {
100+ where : {
101+ id : {
102+ in : failedRepos ,
103+ }
104+ } ,
105+ data : {
106+ repoIndexingStatus : RepoIndexingStatus . NEW ,
107+ }
108+ } )
109+ }
110+ }
111+
112+ const deletedConnections = await prisma . connection . findMany ( {
113+ where : {
114+ isDeclarative : true ,
115+ name : {
116+ notIn : Object . keys ( config . connections ) ,
117+ } ,
118+ orgId : SINGLE_TENANT_ORG_ID ,
119+ }
120+ } ) ;
121+
122+ for ( const connection of deletedConnections ) {
123+ console . log ( `Deleting connection with name '${ connection . name } '. Connection ID: ${ connection . id } ` ) ;
124+ await prisma . connection . delete ( {
125+ where : {
126+ id : connection . id ,
127+ }
128+ } )
129+ }
130+ }
131+ }
132+
24133const initSingleTenancy = async ( ) => {
25134 await prisma . org . upsert ( {
26135 where : {
@@ -77,110 +186,14 @@ const initSingleTenancy = async () => {
77186 // Load any connections defined declaratively in the config file.
78187 const configPath = env . CONFIG_PATH ;
79188 if ( configPath ) {
80- const configContent = await ( async ( ) => {
81- if ( isRemotePath ( configPath ) ) {
82- const response = await fetch ( configPath ) ;
83- if ( ! response . ok ) {
84- throw new Error ( `Failed to fetch config file ${ configPath } : ${ response . statusText } ` ) ;
85- }
86- return response . text ( ) ;
87- } else {
88- return readFile ( configPath , {
89- encoding : 'utf-8' ,
90- } ) ;
91- }
92- } ) ( ) ;
93-
94- const config = JSON . parse ( stripJsonComments ( configContent ) ) as SourcebotConfig ;
95- const isValidConfig = ajv . validate ( indexSchema , config ) ;
96- if ( ! isValidConfig ) {
97- throw new Error ( `Config file '${ configPath } ' is invalid: ${ ajv . errorsText ( ajv . errors ) } ` ) ;
98- }
99-
100- if ( config . connections ) {
101- for ( const [ key , newConnectionConfig ] of Object . entries ( config . connections ) ) {
102- const currentConnection = await prisma . connection . findUnique ( {
103- where : {
104- name_orgId : {
105- name : key ,
106- orgId : SINGLE_TENANT_ORG_ID ,
107- }
108- } ,
109- include : {
110- repos : {
111- include : {
112- repo : true ,
113- }
114- }
115- }
116- } ) ;
117-
118- const currentConnectionConfig = currentConnection ? currentConnection . config as unknown as ConnectionConfig : undefined ;
119- const syncNeededOnUpdate =
120- ( currentConnectionConfig && JSON . stringify ( currentConnectionConfig ) !== JSON . stringify ( newConnectionConfig ) ) ||
121- ( currentConnection ?. syncStatus === ConnectionSyncStatus . FAILED ) ;
122-
123- const connectionDb = await prisma . connection . upsert ( {
124- where : {
125- name_orgId : {
126- name : key ,
127- orgId : SINGLE_TENANT_ORG_ID ,
128- }
129- } ,
130- update : {
131- config : newConnectionConfig as unknown as Prisma . InputJsonValue ,
132- syncStatus : syncNeededOnUpdate ? ConnectionSyncStatus . SYNC_NEEDED : undefined ,
133- isDeclarative : true ,
134- } ,
135- create : {
136- name : key ,
137- connectionType : newConnectionConfig . type ,
138- config : newConnectionConfig as unknown as Prisma . InputJsonValue ,
139- isDeclarative : true ,
140- org : {
141- connect : {
142- id : SINGLE_TENANT_ORG_ID ,
143- }
144- }
145- }
146- } ) ;
147-
148- console . log ( `Upserted connection with name '${ key } '. Connection ID: ${ connectionDb . id } ` ) ;
149-
150- // Re-try any repos that failed to index.
151- const failedRepos = currentConnection ?. repos . filter ( repo => repo . repo . repoIndexingStatus === RepoIndexingStatus . FAILED ) . map ( repo => repo . repo . id ) ?? [ ] ;
152- if ( failedRepos . length > 0 ) {
153- await prisma . repo . updateMany ( {
154- where : {
155- id : {
156- in : failedRepos ,
157- }
158- } ,
159- data : {
160- repoIndexingStatus : RepoIndexingStatus . NEW ,
161- }
162- } )
163- }
164- }
189+ await scheduleDeclarativeConfigSync ( configPath ) ;
165190
166- const deletedConnections = await prisma . connection . findMany ( {
167- where : {
168- isDeclarative : true ,
169- name : {
170- notIn : Object . keys ( config . connections ) ,
171- } ,
172- orgId : SINGLE_TENANT_ORG_ID ,
173- }
191+ // watch for changes assuming it is a local file
192+ if ( ! isRemotePath ( configPath ) ) {
193+ watch ( configPath , ( ) => {
194+ console . log ( `Config file ${ configPath } changed. Re-syncing...` ) ;
195+ scheduleDeclarativeConfigSync ( configPath ) ;
174196 } ) ;
175-
176- for ( const connection of deletedConnections ) {
177- console . log ( `Deleting connection with name '${ connection . name } '. Connection ID: ${ connection . id } ` ) ;
178- await prisma . connection . delete ( {
179- where : {
180- id : connection . id ,
181- }
182- } )
183- }
184197 }
185198 }
186199}
0 commit comments