11use super :: { DataSource , DatabaseOps } ;
22use anyhow:: Result ;
3- use tiberius:: { Client , Config } ;
3+ use tiberius:: { Client , Config , Query } ;
44use tokio:: net:: TcpStream ;
55use tokio_util:: compat:: TokioAsyncWriteCompatExt ;
66
77pub struct SqlServerOps ;
88
9+ const TABLE_DDL_SQL : & str = r#"
10+ DECLARE @TableName NVARCHAR(128) = '{tableName}';
11+ DECLARE @SchemaName NVARCHAR(128) = '{schemaName}';
12+ -- 生成列定义
13+ SELECT
14+ 'CREATE TABLE [' + @SchemaName + '].[' + @TableName + '] (' + CHAR(13) +
15+ STRING_AGG(
16+ ' [' + c.name + '] ' +
17+ UPPER(t.name) +
18+ CASE
19+ WHEN t.name IN ('varchar','nvarchar','char','nchar','varbinary')
20+ THEN '(' + IIF(c.max_length = -1, 'MAX', CAST(c.max_length AS VARCHAR)) + ')'
21+ WHEN t.name IN ('decimal','numeric')
22+ THEN '(' + CAST(c.precision AS VARCHAR) + ',' + CAST(c.scale AS VARCHAR) + ')'
23+ ELSE ''
24+ END +
25+ ' ' +
26+ IIF(c.is_nullable = 0, 'NOT NULL', 'NULL') +
27+ IIF(ic.column_id IS NOT NULL, ' IDENTITY(1,1)', ''),
28+ ',' + CHAR(13)
29+ )
30+ + CHAR(13) + ');'
31+ FROM sys.columns c
32+ JOIN sys.types t ON c.user_type_id = t.user_type_id
33+ LEFT JOIN sys.identity_columns ic ON c.object_id = ic.object_id AND c.column_id = ic.column_id
34+ WHERE c.object_id = OBJECT_ID(@SchemaName + '.' + @TableName);
35+ "# ;
36+
937#[ async_trait:: async_trait]
1038impl DatabaseOps for SqlServerOps {
1139 async fn get_tables ( & self , ds : DataSource ) -> Result < Vec < String > > {
@@ -15,22 +43,21 @@ impl DatabaseOps for SqlServerOps {
1543 config. port ( ds. port as u16 ) ;
1644 config. database ( & ds. database ) ;
1745 config. authentication ( tiberius:: AuthMethod :: sql_server ( ds. username , ds. password ) ) ;
46+ config. trust_cert ( ) ;
1847
1948 let tcp = TcpStream :: connect ( config. get_addr ( ) ) . await ?;
2049 tcp. set_nodelay ( true ) ?;
2150
2251 let mut client = Client :: connect ( config, tcp. compat_write ( ) ) . await ?;
2352
24- let query = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_CATALOG = ?" ;
25-
26- let stream = client. query ( query, & [ & ds. database ] ) . await ?;
53+ let select = Query :: new ( format ! ( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_CATALOG = '{}'" , ds. database) ) ;
54+ let stream = select. query ( & mut client) . await ?;
2755 let tables = stream
2856 . into_first_result ( )
2957 . await ?
3058 . into_iter ( )
3159 . filter_map ( |row| row. get :: < & str , _ > ( 0 ) . map ( |s| s. to_string ( ) ) )
3260 . collect ( ) ;
33-
3461 Ok ( tables)
3562 }
3663
@@ -41,17 +68,20 @@ impl DatabaseOps for SqlServerOps {
4168 config. port ( ds. port as u16 ) ;
4269 config. database ( ds. database ) ;
4370 config. authentication ( tiberius:: AuthMethod :: sql_server ( ds. username , ds. password ) ) ;
71+ config. trust_cert ( ) ;
4472
4573 let tcp = TcpStream :: connect ( config. get_addr ( ) ) . await ?;
4674 tcp. set_nodelay ( true ) ?;
4775
4876 let mut client = Client :: connect ( config, tcp. compat_write ( ) ) . await ?;
4977
50- let query = format ! ( "SELECT OBJECT_DEFINITION(OBJECT_ID('{}'))" , table_name) ;
78+ let sql = TABLE_DDL_SQL
79+ . replace ( "{schemaName}" , "dbo" )
80+ . replace ( "{tableName}" , & table_name) ;
81+ let select = Query :: new ( sql) ;
82+ let stream = select. query ( & mut client) . await ?;
5183
52- let schema = client
53- . query ( query, & [ ] )
54- . await ?
84+ let schema = stream
5585 . into_first_result ( )
5686 . await ?
5787 . into_iter ( )
0 commit comments