|
1 | | -from typing import List |
| 1 | +from typing import List, Dict |
2 | 2 | from ..abcs.database_types import Float, TemporalType, FractionalType, DbPath |
3 | 3 | from ..abcs.mixins import AbstractMixin_MD5 |
4 | 4 | from .postgresql import ( |
@@ -70,3 +70,31 @@ def select_table_schema(self, path: DbPath) -> str: |
70 | 70 | "SELECT column_name, data_type, datetime_precision, numeric_precision, numeric_scale FROM information_schema.columns " |
71 | 71 | f"WHERE table_name = '{table.lower()}' AND table_schema = '{schema.lower()}'" |
72 | 72 | ) |
| 73 | + |
| 74 | + def select_external_table_schema(self, path: DbPath) -> str: |
| 75 | + schema, table = self._normalize_table_path(path) |
| 76 | + |
| 77 | + return f"""SELECT |
| 78 | + columnname AS column_name |
| 79 | + , CASE WHEN external_type = 'string' THEN 'varchar' ELSE external_type END AS data_type |
| 80 | + , NULL AS datetime_precision |
| 81 | + , NULL AS numeric_precision |
| 82 | + , NULL AS numeric_scale |
| 83 | + FROM svv_external_columns |
| 84 | + WHERE tablename = '{table.lower()}' AND schemaname = '{schema.lower()}' |
| 85 | + """ |
| 86 | + |
| 87 | + def query_external_table_schema(self, path: DbPath) -> Dict[str, tuple]: |
| 88 | + rows = self.query(self.select_external_table_schema(path), list) |
| 89 | + if not rows: |
| 90 | + raise RuntimeError(f"{self.name}: Table '{'.'.join(path)}' does not exist, or has no columns") |
| 91 | + |
| 92 | + d = {r[0]: r for r in rows} |
| 93 | + assert len(d) == len(rows) |
| 94 | + return d |
| 95 | + |
| 96 | + def query_table_schema(self, path: DbPath) -> Dict[str, tuple]: |
| 97 | + try: |
| 98 | + return super().query_table_schema(path) |
| 99 | + except RuntimeError: |
| 100 | + return self.query_external_table_schema(path) |
0 commit comments