Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 28 additions & 23 deletions reverse_engineering/helpers/fetchRequestHelper.js
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,12 @@ const fetchDatabaseViewsNames = ({ dbName, connectionInfo, logger }) =>
executeCommand({ connectionInfo, command: `SHOW VIEWS IN \`${dbName}\``, logger });

/**
 * Lists view names of a database by running the Python notebook variant of
 * the "show views" command (used when the SQL variant is not available).
 * @param {{ dbName: string, connectionInfo: object, logger: object }} params
 * @returns {Promise<*>} resolves with the raw command execution result
 */
const fetchDatabaseViewsNamesViaPython = ({ dbName, connectionInfo, logger }) => {
	const command = getViewNamesCommand({ dbName });
	return executeCommand({ connectionInfo, command, language: SPARK_LANGUAGE.python, logger });
};

/**
 * Lists table names of a database via the SQL `SHOW TABLES` statement.
 * @param {{ dbName: string, connectionInfo: object, logger: object }} params
 * @returns {Promise<*>} resolves with the raw command execution result
 */
const fetchClusterTablesNames = ({ dbName, connectionInfo, logger }) => {
	const command = `SHOW TABLES IN \`${dbName}\``;
	return executeCommand({ connectionInfo, command, logger });
};
Expand Down Expand Up @@ -491,7 +496,7 @@ const fetchClusterFieldMetadataInBatches = async ({ columnPlan, connectionInfo,

const rows = await async.mapLimit(tasks, 10, async ({ dbName, tableName, batch }) => {
const columnsJson = JSON.stringify(batch);
const command = getClusterFieldMetadataBatch(dbName, tableName, columnsJson);
const command = getClusterFieldMetadataBatch({ dbName, tableName, columnsJson });
const out = await executeCommand({ connectionInfo, command, language: SPARK_LANGUAGE.python, logger });
const parsed = JSON.parse(coercePythonNotebookOutput(out));
return { dbName, tableName, table: parsed };
Expand Down Expand Up @@ -522,21 +527,13 @@ const fetchClusterFieldMetadataInBatches = async ({ columnPlan, connectionInfo,
return clusterData;
};

const fetchFieldMetadataBatched = async (
databasesNames,
collectionsNames,
connectionInfo,
logger,
previousData = {},
) => {
const { tableNames, dbNames } = prepareNamesForInsertionIntoScalaCode(databasesNames, collectionsNames);

const columnListCommand = getClusterColumnNames(tableNames.join(', '), dbNames.join(', '));
const fetchFieldMetadataBatched = async (namesJoined, connectionInfo, logger, previousData = {}) => {
const columnListCommand = getClusterColumnNames(namesJoined);

logger.log(
'info',
'',
`Start retrieving tables info (batched): \nDatabases: ${dbNames.join(', ')} \nTables: ${tableNames.join(', ')}`,
`Start retrieving tables info (batched): \nDatabases: ${namesJoined.databasesNames} \nTables: ${namesJoined.tablesNames}`,
);

const namesRaw = await executeCommand({
Expand Down Expand Up @@ -571,15 +568,23 @@ const fetchFieldMetadataBatched = async (
const fetchFieldMetadata = async (databasesNames, collectionsNames, connectionInfo, logger, previousData = {}) => {
const { tableNames, dbNames } = prepareNamesForInsertionIntoScalaCode(databasesNames, collectionsNames);

logger.log(
'info',
'',
`Start retrieving tables info: \nDatabases: ${dbNames.join(', ')} \nTables: ${tableNames.join(', ')}`,
);
const tableNamesJoined = tableNames.join(', ');
const dbNamesJoined = dbNames.join(', ');

const getFullClusterInfoCommand = getClusterData(tableNames.join(', '), dbNames.join(', '));
const namesJoined = {
tablesNames: tableNamesJoined,
databasesNames: dbNamesJoined,
};

try {
const getFullClusterInfoCommand = getClusterData(namesJoined);

logger.log(
'info',
'',
`Start retrieving tables info: \nDatabases: ${dbNames.join(', ')} \nTables: ${tableNames.join(', ')}`,
);

const rawOutput = await executeCommand({
connectionInfo,
command: getFullClusterInfoCommand,
Expand All @@ -591,7 +596,7 @@ const fetchFieldMetadata = async (databasesNames, collectionsNames, connectionIn

if (isNotebookOutputTruncation(str)) {
logger.log('info', '', 'Cluster field metadata output truncated; using batched retrieval.');
return fetchFieldMetadataBatched(databasesNames, collectionsNames, connectionInfo, logger, previousData);
return fetchFieldMetadataBatched(namesJoined, connectionInfo, logger, previousData);
}

try {
Expand All @@ -606,7 +611,7 @@ const fetchFieldMetadata = async (databasesNames, collectionsNames, connectionIn
'Single-pass metadata JSON parse failed; using batched retrieval.',
);

return fetchFieldMetadataBatched(databasesNames, collectionsNames, connectionInfo, logger, previousData);
return fetchFieldMetadataBatched(namesJoined, connectionInfo, logger, previousData);
}
} catch (error) {
const msg = stringifyErrorMessage(error);
Expand All @@ -616,7 +621,7 @@ const fetchFieldMetadata = async (databasesNames, collectionsNames, connectionIn
{ message: msg.slice(0, 300) },
'Single-pass cluster metadata failed; using batched retrieval.',
);
return fetchFieldMetadataBatched(databasesNames, collectionsNames, connectionInfo, logger, previousData);
return fetchFieldMetadataBatched(namesJoined, connectionInfo, logger, previousData);
}
throw error;
}
Expand Down Expand Up @@ -707,7 +712,7 @@ const fetchCreateStatementRequest = async (entityName, connectionInfo, logger, d
});

try {
const script = getTableSchemaColumnsForDdlFallback(fullName);
const script = getTableSchemaColumnsForDdlFallback({ fullName });
const raw = await executeCommand({
connectionInfo,
command: script,
Expand Down
22 changes: 13 additions & 9 deletions reverse_engineering/helpers/pythonScriptGeneratorHelper.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const { FIELD_METADATA_STRING_MAX } = require('../../shared/constants');

/** Full cluster field metadata in one notebook exit (preferred when output fits). */
const getClusterData = (tablesNames, databasesNames) => `
const getClusterData = ({ tablesNames, databasesNames, sizeOnly = false }) => `
import json

databasesNames = [${databasesNames}]
Expand Down Expand Up @@ -38,10 +38,14 @@ def getDatabaseMetadata(dbName):

clusterData = { dbName: getDatabaseMetadata(dbName) for dbName in databasesNames }

dbutils.notebook.exit(json.dumps(clusterData))
payload = json.dumps(clusterData)

size_bytes = len(payload.encode("utf-8"))

dbutils.notebook.exit(${sizeOnly ? 'size_bytes' : 'payload'})
`;

const getClusterColumnNames = (tablesNames, databasesNames) => `
const getClusterColumnNames = ({ tablesNames, databasesNames }) => `
import json

databasesNames = [${databasesNames}]
Expand All @@ -68,7 +72,7 @@ dbutils.notebook.exit(json.dumps(cluster_column_names))
* @param {string} tableName
* @param {string} columnsJson - JSON array string ('["a","b"]')
*/
const getClusterFieldMetadataBatch = (dbName, tableName, columnsJson) => `
const getClusterFieldMetadataBatch = ({ dbName, tableName, columnsJson }) => `
import json
_db = ${JSON.stringify(dbName)}
_table = ${JSON.stringify(tableName)}
Expand Down Expand Up @@ -105,21 +109,21 @@ except Exception:
dbutils.notebook.exit(json.dumps(out))
`;

/**
 * Builds the Python notebook command that lists the view names of a database
 * and exits the notebook with a JSON array of those names.
 * The database name is backtick-quoted, matching the SQL variant
 * (SHOW VIEWS IN \`db\`), so database names containing special characters
 * (e.g. hyphens) still resolve as identifiers.
 * @param {object} params
 * @param {string} params.dbName - database (schema) name to list views from
 * @returns {string} Python source for the notebook command
 */
const getViewNamesCommand = ({ dbName }) => `
import json

viewNames = spark.sql("show views in \`${dbName}\`").rdd.map(lambda p: p.viewName).collect()
dbutils.notebook.exit(json.dumps(viewNames))
`;

/**
 * Compact column name + Spark type list for building a minimal CREATE TABLE
 * when SHOW CREATE TABLE output is too large.
 * Use a 3-part name (catalog.schema.table) when Unity Catalog applies — the
 * Python command context does not share SQL session catalog state.
 * @param {object} params
 * @param {string} params.fullName - e.g. "hive_metastore.default.my_table" or "default.my_table"
 * @returns {string} Python source that exits the notebook with a JSON column list
 */
const getTableSchemaColumnsForDdlFallback = ({ fullName }) => {
	// JSON.stringify safely embeds the name as a quoted Python string literal.
	const quotedFullName = JSON.stringify(fullName);
	return [
		'',
		'import json',
		`_fqn = ${quotedFullName}`,
		'_cols = [{"name": f.name, "colType": f.dataType.simpleString()} for f in spark.table(_fqn).schema.fields]',
		'dbutils.notebook.exit(json.dumps(_cols))',
		'',
	].join('\n');
};
Expand Down
23 changes: 11 additions & 12 deletions reverse_engineering/helpers/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,21 @@ const splitTableAndViewNames = names => {

/**
 * Resolves how many records to sample from a table.
 * In 'absolute' mode the configured count is returned as-is; otherwise the
 * configured percentage of the total record count is taken, rounded to the
 * nearest integer.
 * @param {number} count - total number of records available
 * @param {object} recordSamplingSettings - { active, absolute: { value }, relative: { value } }
 * @returns {number} number of records to sample
 */
const getCount = (count, recordSamplingSettings) => {
	const percentage = recordSamplingSettings.relative.value;
	const isAbsoluteSampling = recordSamplingSettings.active === 'absolute';

	return isAbsoluteSampling ? recordSamplingSettings.absolute.value : Math.round((count / 100) * percentage);
};

const prepareNamesForInsertionIntoScalaCode = (databasesNames, collectionsNames) =>
databasesNames.reduce(
(entities, dbName) => {
const { tables } = splitTableAndViewNames(collectionsNames[dbName]);
const tableNames = tables.map(tableName => `\"${tableName}\"`).join(', ');
const tableNames = tables.map(tableName => `"${tableName}"`).join(', ');

return {
tableNames: [...entities.tableNames, `\"${dbName}\": [${tableNames}]`],
dbNames: databasesNames.map(name => `\"${name}\"`),
tableNames: [...entities.tableNames, `"${dbName}": [${tableNames}]`],
dbNames: databasesNames.map(name => `"${name}"`),
};
},
{ viewNames: [], tableNames: [] },
Expand All @@ -34,8 +33,8 @@ const convertCustomTags = (custom_tags, logger) => {
return Object.keys(custom_tags).reduce((tags, tagKey) => {
return [...tags, { customTagKey: tagKey, customtagvalue: custom_tags[tagKey] }];
}, []);
} catch (e) {
logger.log('error', custom_tags, 'Error converting custom tags');
} catch (err) {
logger.log('error', custom_tags, `Error converting custom tags: ${JSON.stringify(err, null, 2)}`);
return [];
}
};
Expand All @@ -52,7 +51,7 @@ const cleanEntityName = (sparkVersion, name = '') => {

/**
 * Reports whether the Databricks Runtime supports listing views
 * (major runtime version 7 and above).
 * @param {string} [sparkVersionString=''] - runtime version, e.g. "7.3.x-scala2.12"
 * @returns {boolean} true when the major version is greater than 6; false for
 *   unparsable input (NaN comparison yields false)
 */
const isSupportGettingListOfViews = (sparkVersionString = '') => {
	const MAX_NOT_SUPPORT_VERSION = 6;
	const majorVersionText = sparkVersionString.slice(0, sparkVersionString.indexOf('.'));
	// Explicit radix 10 guards against legacy octal/hex interpretation of odd input.
	const databricksRuntimeMajorVersion = Number.parseInt(majorVersionText, 10);
	return databricksRuntimeMajorVersion > MAX_NOT_SUPPORT_VERSION;
};
};

Expand All @@ -68,7 +67,7 @@ const getErrorMessage = (error = {}) => {
return error.message || 'Reverse Engineering error';
};

// Strips one leading "(" and/or one trailing ")" from the string; inner parentheses are kept.
const removeParentheses = string => string.replace(/^\(/, '').replace(/\)$/, '');

const getTemplateDocByJsonSchema = schema => {
if (!schema) {
Expand Down
Loading