diff --git a/backend/apps/datasource/api/datasource.py b/backend/apps/datasource/api/datasource.py index 69cad28f3..9c182e345 100644 --- a/backend/apps/datasource/api/datasource.py +++ b/backend/apps/datasource/api/datasource.py @@ -193,6 +193,7 @@ def inner(): return await asyncio.to_thread(inner) +# not used @router.post("/fieldEnum/{id}") async def field_enum(session: SessionDep, id: int): def inner(): diff --git a/backend/apps/db/db.py b/backend/apps/db/db.py index 89d40fcc4..1a22dc355 100644 --- a/backend/apps/db/db.py +++ b/backend/apps/db/db.py @@ -72,7 +72,21 @@ def get_uri_from_config(type: str, conf: DatasourceConf) -> str: return db_url +def get_extra_config(conf: DatasourceConf): + config_dict = {} + if conf.extraJdbc: + config_arr = conf.extraJdbc.split("&") + for config in config_arr: + kv = config.split("=") + if len(kv) == 2 and kv[0] and kv[1]: + config_dict[kv[0]] = kv[1] + else: + raise Exception(f'param: {config} is error') + return config_dict + + def get_origin_connect(type: str, conf: DatasourceConf): + extra_config_dict = get_extra_config(conf) if type == "sqlServer": return pymssql.connect( server=conf.host, @@ -81,10 +95,12 @@ def get_origin_connect(type: str, conf: DatasourceConf): password=conf.password, database=conf.database, timeout=conf.timeout, - tds_version='7.0' # options: '4.2', '7.0', '8.0' ... + tds_version='7.0', # options: '4.2', '7.0', '8.0' ..., + **extra_config_dict ) +# use sqlalchemy def get_engine(ds: CoreDatasource, timeout: int = 0) -> Engine: conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration))) if ds.type != "excel" else get_engine_config() if conf.timeout is None: @@ -135,9 +151,10 @@ def check_connection(trans: Optional[Trans], ds: CoreDatasource | AssistantOutDs return False else: conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration))) + extra_config_dict = get_extra_config(conf) if ds.type == 'dm': with dmPython.connect(user=conf.username, password=conf.password, server=conf.host, - port=conf.port) as conn, conn.cursor() as cursor: + port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor: try: cursor.execute('select 1', timeout=10).fetchall() SQLBotLogUtil.info("success") @@ -150,7 +167,7 @@ def check_connection(trans: Optional[Trans], ds: CoreDatasource | AssistantOutDs elif ds.type == 'doris': with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host, port=conf.port, db=conf.database, connect_timeout=10, - read_timeout=10) as conn, conn.cursor() as cursor: + read_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor: try: cursor.execute('select 1') SQLBotLogUtil.info("success") @@ -164,7 +181,7 @@ def check_connection(trans: Optional[Trans], ds: CoreDatasource | AssistantOutDs with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database, user=conf.username, password=conf.password, - timeout=10) as conn, conn.cursor() as cursor: + timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor: try: cursor.execute('select 1') SQLBotLogUtil.info("success") @@ -221,16 +238,17 @@ def get_version(ds: CoreDatasource | AssistantOutDsSchema): res = result.fetchall() version = res[0][0] else: + extra_config_dict = get_extra_config(conf) if ds.type == 'dm': with dmPython.connect(user=conf.username, password=conf.password, server=conf.host, port=conf.port) as conn, conn.cursor() as cursor: - cursor.execute(sql, timeout=10) + cursor.execute(sql, timeout=10, **extra_config_dict) res = cursor.fetchall() version = res[0][0] elif ds.type == 'doris': with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host, port=conf.port, db=conf.database, connect_timeout=10, - read_timeout=10) as conn, conn.cursor() as cursor: + read_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor: cursor.execute(sql) res = cursor.fetchall() version = res[0][0] @@ -260,9 +278,10 @@ def get_schema(ds: CoreDatasource): res_list = [item[0] for item in res] return res_list else: + extra_config_dict = get_extra_config(conf) if ds.type == 'dm': with dmPython.connect(user=conf.username, password=conf.password, server=conf.host, - port=conf.port) as conn, conn.cursor() as cursor: + port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor: cursor.execute("""select OBJECT_NAME from dba_objects where object_type='SCH'""", timeout=conf.timeout) res = cursor.fetchall() res_list = [item[0] for item in res] @@ -270,7 +289,7 @@ def get_schema(ds: CoreDatasource): elif ds.type == 'redshift': with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database, user=conf.username, password=conf.password, - timeout=conf.timeout) as conn, conn.cursor() as cursor: + timeout=conf.timeout, **extra_config_dict) as conn, conn.cursor() as cursor: cursor.execute("""SELECT nspname FROM pg_namespace""") res = cursor.fetchall() res_list = [item[0] for item in res] @@ -288,9 +307,10 @@ def get_tables(ds: CoreDatasource): res_list = [TableSchema(*item) for item in res] return res_list else: + extra_config_dict = get_extra_config(conf) if ds.type == 'dm': with dmPython.connect(user=conf.username, password=conf.password, server=conf.host, - port=conf.port) as conn, conn.cursor() as cursor: + port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor: cursor.execute(sql, {"param": sql_param}, timeout=conf.timeout) res = cursor.fetchall() res_list = [TableSchema(*item) for item in res] @@ -298,7 +318,7 @@ def get_tables(ds: CoreDatasource): elif ds.type == 'doris': with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host, port=conf.port, db=conf.database, connect_timeout=conf.timeout, - read_timeout=conf.timeout) as conn, conn.cursor() as cursor: + read_timeout=conf.timeout, **extra_config_dict) as conn, conn.cursor() as cursor: cursor.execute(sql, (sql_param,)) res = cursor.fetchall() res_list = [TableSchema(*item) for item in res] @@ -306,7 +326,7 @@ def get_tables(ds: CoreDatasource): elif ds.type == 'redshift': with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database, user=conf.username, password=conf.password, - timeout=conf.timeout) as conn, conn.cursor() as cursor: + timeout=conf.timeout, **extra_config_dict) as conn, conn.cursor() as cursor: cursor.execute(sql, (sql_param,)) res = cursor.fetchall() res_list = [TableSchema(*item) for item in res] @@ -328,9 +348,10 @@ def get_fields(ds: CoreDatasource, table_name: str = None): res_list = [ColumnSchema(*item) for item in res] return res_list else: + extra_config_dict = get_extra_config(conf) if ds.type == 'dm': with dmPython.connect(user=conf.username, password=conf.password, server=conf.host, - port=conf.port) as conn, conn.cursor() as cursor: + port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor: cursor.execute(sql, {"param1": p1, "param2": p2}, timeout=conf.timeout) res = cursor.fetchall() res_list = [ColumnSchema(*item) for item in res] @@ -338,7 +359,7 @@ def get_fields(ds: CoreDatasource, table_name: str = None): elif ds.type == 'doris': with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host, port=conf.port, db=conf.database, connect_timeout=conf.timeout, - read_timeout=conf.timeout) as conn, conn.cursor() as cursor: + read_timeout=conf.timeout, **extra_config_dict) as conn, conn.cursor() as cursor: cursor.execute(sql, (p1, p2)) res = cursor.fetchall() res_list = [ColumnSchema(*item) for item in res] @@ -346,7 +367,7 @@ def get_fields(ds: CoreDatasource, table_name: str = None): elif ds.type == 'redshift': with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database, user=conf.username, password=conf.password, - timeout=conf.timeout) as conn, conn.cursor() as cursor: + timeout=conf.timeout, **extra_config_dict) as conn, conn.cursor() as cursor: cursor.execute(sql, (p1, p2)) res = cursor.fetchall() res_list = [ColumnSchema(*item) for item in res] @@ -379,9 +400,10 @@ def exec_sql(ds: CoreDatasource | AssistantOutDsSchema, sql: str, origin_column= raise ParseSQLResultError(str(ex)) else: conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration))) + extra_config_dict = get_extra_config(conf) if ds.type == 'dm': with dmPython.connect(user=conf.username, password=conf.password, server=conf.host, - port=conf.port) as conn, conn.cursor() as cursor: + port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor: try: cursor.execute(sql, timeout=conf.timeout) res = cursor.fetchall() @@ -400,7 +422,7 @@ def exec_sql(ds: CoreDatasource | AssistantOutDsSchema, sql: str, origin_column= elif ds.type == 'doris': with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host, port=conf.port, db=conf.database, connect_timeout=conf.timeout, - read_timeout=conf.timeout) as conn, conn.cursor() as cursor: + read_timeout=conf.timeout, **extra_config_dict) as conn, conn.cursor() as cursor: try: cursor.execute(sql) res = cursor.fetchall() @@ -419,7 +441,7 @@ def exec_sql(ds: CoreDatasource | AssistantOutDsSchema, sql: str, origin_column= elif ds.type == 'redshift': with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database, user=conf.username, password=conf.password, - timeout=conf.timeout) as conn, conn.cursor() as cursor: + timeout=conf.timeout, **extra_config_dict) as conn, conn.cursor() as cursor: try: cursor.execute(sql) res = cursor.fetchall()