Skip to content

Commit da8c50d

Browse files
authored
Add pagination for snowflake usage and lineage queries sql (open-metadata#23781)
* Add pagination for snowflake usage and lineage queries sql
* py_format
1 parent 4708c2b commit da8c50d

4 files changed

Lines changed: 155 additions & 23 deletions

File tree

ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py

Lines changed: 44 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222
SNOWFLAKE_SQL_STATEMENT,
2323
)
2424
from metadata.ingestion.source.database.snowflake.query_parser import (
25+
SNOWFLAKE_QUERY_BATCH_SIZE,
2526
SnowflakeQueryParserSource,
2627
)
2728
from metadata.ingestion.source.database.stored_procedures_mixin import (
@@ -72,26 +73,48 @@ def yield_table_query(self) -> Iterator[TableQuery]:
7273
yield a TableQuery with query parsing info
7374
"""
7475
for engine in self.get_engine():
75-
rows = []
76-
with engine.connect() as conn:
77-
rows = conn.execution_options(
78-
stream_results=True, max_row_buffer=100
79-
).execute(
80-
self.get_sql_statement(start_time=self.start, end_time=self.end)
76+
offset = 0
77+
total_fetched = 0
78+
max_results = self.source_config.resultLimit
79+
while total_fetched < max_results:
80+
batch_size = min(
81+
SNOWFLAKE_QUERY_BATCH_SIZE, max_results - total_fetched
8182
)
82-
# exit from active connection after fetching rows & during
83-
# further process of `yield_query_lineage`
84-
for row in rows:
85-
query_dict = dict(row)
86-
query_dict.update({k.lower(): v for k, v in query_dict.items()})
87-
try:
88-
yield TableQuery(
89-
dialect=self.dialect.value,
90-
query=query_dict["query_text"],
91-
databaseName=self.get_database_name(query_dict),
92-
serviceName=self.config.serviceName,
93-
databaseSchema=self.get_schema_name(query_dict),
83+
rows = []
84+
row_count = 0
85+
with engine.connect() as conn:
86+
rows = conn.execution_options(
87+
stream_results=True, max_row_buffer=100
88+
).execute(
89+
self.get_sql_statement(
90+
start_time=self.start,
91+
end_time=self.end,
92+
offset=offset,
93+
limit=batch_size,
94+
)
9495
)
95-
except Exception as exc:
96-
logger.debug(traceback.format_exc())
97-
logger.warning(f"Error processing query_dict {query_dict}: {exc}")
96+
for row in rows:
97+
query_dict = dict(row)
98+
query_dict.update({k.lower(): v for k, v in query_dict.items()})
99+
row_count += 1
100+
try:
101+
yield TableQuery(
102+
dialect=self.dialect.value,
103+
query=query_dict["query_text"],
104+
databaseName=self.get_database_name(query_dict),
105+
serviceName=self.config.serviceName,
106+
databaseSchema=self.get_schema_name(query_dict),
107+
)
108+
except Exception as exc:
109+
logger.debug(traceback.format_exc())
110+
logger.warning(
111+
f"Error processing query_dict {query_dict}: {exc}"
112+
)
113+
total_fetched += row_count
114+
if row_count < batch_size:
115+
break
116+
offset += batch_size
117+
logger.info(
118+
f"Fetching next page with offset {offset} (fetched {total_fetched}/{max_results}) "
119+
f"for lineage queries"
120+
)

ingestion/src/metadata/ingestion/source/database/snowflake/queries.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,9 @@
3131
AND query_text NOT LIKE '/* {{"app": "dbt", %%}} */%%'
3232
AND start_time between to_timestamp_ltz('{start_time}') and to_timestamp_ltz('{end_time}')
3333
{filters}
34+
ORDER BY start_time
3435
LIMIT {result_limit}
36+
OFFSET {offset}
3537
"""
3638
)
3739

ingestion/src/metadata/ingestion/source/database/snowflake/query_parser.py

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@
3232

3333
logger = ingestion_logger()
3434
SNOWFLAKE_ABORTED_CODE = "1969"
35+
SNOWFLAKE_QUERY_BATCH_SIZE = 1000
3536

3637

3738
class SnowflakeQueryParserSource(QueryParserSource, ABC):
@@ -51,18 +52,27 @@ def create(
5152
)
5253
return cls(config, metadata)
5354

54-
def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str:
55+
def get_sql_statement(
56+
self,
57+
start_time: datetime,
58+
end_time: datetime,
59+
offset: int = 0,
60+
limit: int = None,
61+
) -> str:
5562
"""
5663
returns sql statement to fetch query logs
5764
"""
65+
if limit is None:
66+
limit = self.config.sourceConfig.config.resultLimit
5867
return self.sql_stmt.format(
5968
start_time=start_time,
6069
end_time=end_time,
61-
result_limit=self.config.sourceConfig.config.resultLimit,
70+
result_limit=limit,
6271
filters=self.get_filters(),
6372
account_usage=self.service_connection.accountUsageSchema,
6473
credit_cost=self.service_connection.creditCost
6574
* self.service_connection.creditCost,
75+
offset=offset,
6676
)
6777

6878
def check_life_cycle_query(

ingestion/src/metadata/ingestion/source/database/snowflake/usage.py

Lines changed: 97 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -11,11 +11,22 @@
1111
"""
1212
Snowflake usage module
1313
"""
14+
import traceback
15+
from datetime import timedelta
16+
from typing import Iterable
17+
18+
from metadata.generated.schema.type.basic import DateTime
19+
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
20+
from metadata.ingestion.lineage.masker import mask_query
1421
from metadata.ingestion.source.database.snowflake.queries import SNOWFLAKE_SQL_STATEMENT
1522
from metadata.ingestion.source.database.snowflake.query_parser import (
23+
SNOWFLAKE_QUERY_BATCH_SIZE,
1624
SnowflakeQueryParserSource,
1725
)
1826
from metadata.ingestion.source.database.usage_source import UsageSource
27+
from metadata.utils.logger import ingestion_logger
28+
29+
logger = ingestion_logger()
1930

2031

2132
class SnowflakeUsageSource(SnowflakeQueryParserSource, UsageSource):
@@ -41,3 +52,89 @@ class SnowflakeUsageSource(SnowflakeQueryParserSource, UsageSource):
4152
"INSERT",
4253
"MERGE",
4354
]
55+
56+
def yield_table_queries(self) -> Iterable[TableQuery]:
57+
"""
58+
Given an Engine, iterate over the day range and
59+
query the results with pagination
60+
"""
61+
daydiff = self.end - self.start
62+
for days in range(daydiff.days):
63+
logger.info(
64+
f"Scanning query logs for {(self.start + timedelta(days=days)).date()} - "
65+
f"{(self.start + timedelta(days=days + 1)).date()}"
66+
)
67+
query = None
68+
offset = 0
69+
total_fetched = 0
70+
max_results = self.source_config.resultLimit
71+
try:
72+
for engine in self.get_engine():
73+
while total_fetched < max_results:
74+
batch_size = min(
75+
SNOWFLAKE_QUERY_BATCH_SIZE, max_results - total_fetched
76+
)
77+
query = self.get_sql_statement(
78+
start_time=self.start + timedelta(days=days),
79+
end_time=self.start + timedelta(days=days + 1),
80+
offset=offset,
81+
limit=batch_size,
82+
)
83+
with engine.connect() as conn:
84+
rows = conn.execute(query)
85+
queries = []
86+
row_count = 0
87+
for row in rows:
88+
row = dict(row)
89+
row_count += 1
90+
try:
91+
row.update({k.lower(): v for k, v in row.items()})
92+
logger.debug(f"Processing row: {row}")
93+
query_type = row.get("query_type")
94+
query_text = self.format_query(row["query_text"])
95+
queries.append(
96+
TableQuery(
97+
query=query_text,
98+
query_type=query_type,
99+
exclude_usage=self.check_life_cycle_query(
100+
query_type=query_type,
101+
query_text=query_text,
102+
),
103+
dialect=self.dialect.value,
104+
userName=row["user_name"],
105+
startTime=str(row["start_time"]),
106+
endTime=str(row["end_time"]),
107+
analysisDate=DateTime(row["start_time"]),
108+
aborted=self.get_aborted_status(row),
109+
databaseName=self.get_database_name(row),
110+
duration=row.get("duration"),
111+
serviceName=self.config.serviceName,
112+
databaseSchema=self.get_schema_name(row),
113+
cost=row.get("cost"),
114+
)
115+
)
116+
except Exception as exc:
117+
logger.debug(traceback.format_exc())
118+
logger.warning(
119+
f"Unexpected exception processing row [{row}]: {exc}"
120+
)
121+
if queries:
122+
yield TableQueries(queries=queries)
123+
total_fetched += row_count
124+
if row_count < batch_size:
125+
break
126+
offset += batch_size
127+
logger.info(
128+
f"Fetching next page with offset {offset} (fetched {total_fetched}/{max_results}) "
129+
f"for {(self.start + timedelta(days=days)).date()}"
130+
)
131+
except Exception as exc:
132+
if query:
133+
logger.debug(
134+
(
135+
f"###### USAGE QUERY #######\n{mask_query(query, self.dialect.value) or query}"
136+
"\n##########################"
137+
)
138+
)
139+
logger.debug(traceback.format_exc())
140+
logger.error(f"Source usage processing error: {exc}")

0 commit comments

Comments
 (0)