11# -*- coding: utf-8 -*-
2- import posixpath
32import ydb
43import basic_example_data
54
6- # Table path prefix allows to put the working tables into the specific directory
7- # inside the YDB database. Putting `PRAGMA TablePathPrefix("some/path")`
8- # at the beginning of the query allows to reference the tables through
9- # their names "under" the specified directory.
10- #
11- # TablePathPrefix has to be defined as an absolute path, which has to be started
12- # with the current database location.
13- #
14- # https://ydb.tech/ru/docs/yql/reference/syntax/pragma#table-path-prefix
15-
16- DropTablesQuery = """PRAGMA TablePathPrefix("{}");
5+
6+ DropTablesQuery = """
177DROP TABLE IF EXISTS series;
188DROP TABLE IF EXISTS seasons;
199DROP TABLE IF EXISTS episodes;
2010"""
2111
22- FillDataQuery = """PRAGMA TablePathPrefix("{}");
23-
12+ FillDataQuery = """
2413DECLARE $seriesData AS List<Struct<
2514 series_id: Int64,
2615 title: Utf8,
6958"""
7059
7160
72- def fill_tables_with_data (pool : ydb .QuerySessionPool , path : str ):
61+ def fill_tables_with_data (pool : ydb .QuerySessionPool ):
7362 print ("\n Filling tables with data..." )
7463
75- query = FillDataQuery .format (path )
76-
7764 pool .execute_with_retries (
78- query ,
65+ FillDataQuery ,
7966 {
8067 "$seriesData" : (basic_example_data .get_series_data (), basic_example_data .get_series_data_type ()),
8168 "$seasonsData" : (basic_example_data .get_seasons_data (), basic_example_data .get_seasons_data_type ()),
@@ -84,11 +71,10 @@ def fill_tables_with_data(pool: ydb.QuerySessionPool, path: str):
8471 )
8572
8673
87- def select_simple (pool : ydb .QuerySessionPool , path : str ):
74+ def select_simple (pool : ydb .QuerySessionPool ):
8875 print ("\n Check series table..." )
8976 result_sets = pool .execute_with_retries (
90- f"""
91- PRAGMA TablePathPrefix("{ path } ");
77+ """
9278 SELECT
9379 series_id,
9480 title,
@@ -111,21 +97,23 @@ def select_simple(pool: ydb.QuerySessionPool, path: str):
11197 return first_set
11298
11399
114- def upsert_simple (pool : ydb .QuerySessionPool , path : str ):
100+ def upsert_simple (pool : ydb .QuerySessionPool ):
115101 print ("\n Performing UPSERT into episodes..." )
116102
117103 pool .execute_with_retries (
118- f"""
119- PRAGMA TablePathPrefix("{ path } ");
104+ """
120105 UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD");
121106 """
122107 )
123108
124109
125- def select_with_parameters (pool : ydb .QuerySessionPool , path : str , series_id , season_id , episode_id ):
110+ def select_with_parameters (pool : ydb .QuerySessionPool , series_id , season_id , episode_id ):
126111 result_sets = pool .execute_with_retries (
127- f"""
128- PRAGMA TablePathPrefix("{ path } ");
112+ """
113+ DECLARE $seriesId AS Int64;
114+ DECLARE $seasonId AS Int64;
115+ DECLARE $episodeId AS Int64;
116+
129117 SELECT
130118 title,
131119 air_date
@@ -151,10 +139,13 @@ def select_with_parameters(pool: ydb.QuerySessionPool, path: str, series_id, sea
151139# In most cases it's better to use transaction control settings in session.transaction
152140# calls instead to avoid additional hops to YDB cluster and allow more efficient
153141# execution of queries.
154- def explicit_transaction_control (pool : ydb .QuerySessionPool , path : str , series_id , season_id , episode_id ):
142+ def explicit_transaction_control (pool : ydb .QuerySessionPool , series_id , season_id , episode_id ):
155143 def callee (session : ydb .QuerySessionSync ):
156- query = f"""
157- PRAGMA TablePathPrefix("{ path } ");
144+ query = """
145+ DECLARE $seriesId AS Int64;
146+ DECLARE $seasonId AS Int64;
147+ DECLARE $episodeId AS Int64;
148+
158149 UPDATE episodes
159150 SET air_date = CurrentUtcDate()
160151 WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId;
@@ -183,12 +174,9 @@ def callee(session: ydb.QuerySessionSync):
183174 return pool .retry_operation_sync (callee )
184175
185176
186- def huge_select (pool : ydb .QuerySessionPool , path : str ):
177+ def huge_select (pool : ydb .QuerySessionPool ):
187178 def callee (session : ydb .QuerySessionSync ):
188- query = f"""
189- PRAGMA TablePathPrefix("{ path } ");
190- SELECT * from episodes;
191- """
179+ query = """SELECT * from episodes;"""
192180
193181 with session .transaction ().execute (
194182 query ,
@@ -202,16 +190,15 @@ def callee(session: ydb.QuerySessionSync):
202190 return pool .retry_operation_sync (callee )
203191
204192
205- def drop_tables (pool : ydb .QuerySessionPool , path : str ):
193+ def drop_tables (pool : ydb .QuerySessionPool ):
206194 print ("\n Cleaning up existing tables..." )
207- pool .execute_with_retries (DropTablesQuery . format ( path ) )
195+ pool .execute_with_retries (DropTablesQuery )
208196
209197
210- def create_tables (pool : ydb .QuerySessionPool , path : str ):
198+ def create_tables (pool : ydb .QuerySessionPool ):
211199 print ("\n Creating table series..." )
212200 pool .execute_with_retries (
213- f"""
214- PRAGMA TablePathPrefix("{ path } ");
201+ """
215202 CREATE table `series` (
216203 `series_id` Int64,
217204 `title` Utf8,
@@ -224,8 +211,7 @@ def create_tables(pool: ydb.QuerySessionPool, path: str):
224211
225212 print ("\n Creating table seasons..." )
226213 pool .execute_with_retries (
227- f"""
228- PRAGMA TablePathPrefix("{ path } ");
214+ """
229215 CREATE table `seasons` (
230216 `series_id` Int64,
231217 `season_id` Int64,
@@ -239,8 +225,7 @@ def create_tables(pool: ydb.QuerySessionPool, path: str):
239225
240226 print ("\n Creating table episodes..." )
241227 pool .execute_with_retries (
242- f"""
243- PRAGMA TablePathPrefix("{ path } ");
228+ """
244229 CREATE table `episodes` (
245230 `series_id` Int64,
246231 `season_id` Int64,
@@ -253,29 +238,7 @@ def create_tables(pool: ydb.QuerySessionPool, path: str):
253238 )
254239
255240
256- def is_directory_exists (driver : ydb .Driver , path : str ):
257- try :
258- return driver .scheme_client .describe_path (path ).is_directory ()
259- except ydb .SchemeError :
260- return False
261-
262-
263- def ensure_path_exists (driver , database , path ):
264- paths_to_create = list ()
265- path = path .rstrip ("/" )
266- while path not in ("" , database ):
267- full_path = posixpath .join (database , path )
268- if is_directory_exists (driver , full_path ):
269- break
270- paths_to_create .append (full_path )
271- path = posixpath .dirname (path ).rstrip ("/" )
272-
273- while len (paths_to_create ) > 0 :
274- full_path = paths_to_create .pop (- 1 )
275- driver .scheme_client .make_directory (full_path )
276-
277-
278- def run (endpoint , database , path ):
241+ def run (endpoint , database ):
279242 with ydb .Driver (
280243 endpoint = endpoint ,
281244 database = database ,
@@ -284,26 +247,19 @@ def run(endpoint, database, path):
284247 driver .wait (timeout = 5 , fail_fast = True )
285248
286249 with ydb .QuerySessionPool (driver ) as pool :
250+ drop_tables (pool )
287251
288- ensure_path_exists (driver , database , path )
289-
290- # absolute path - prefix to the table's names,
291- # including the database location
292- full_path = posixpath .join (database , path )
293-
294- drop_tables (pool , full_path )
295-
296- create_tables (pool , full_path )
252+ create_tables (pool )
297253
298- fill_tables_with_data (pool , full_path )
254+ fill_tables_with_data (pool )
299255
300- select_simple (pool , full_path )
256+ select_simple (pool )
301257
302- upsert_simple (pool , full_path )
258+ upsert_simple (pool )
303259
304- select_with_parameters (pool , full_path , 2 , 3 , 7 )
305- select_with_parameters (pool , full_path , 2 , 3 , 8 )
260+ select_with_parameters (pool , 2 , 3 , 7 )
261+ select_with_parameters (pool , 2 , 3 , 8 )
306262
307- explicit_transaction_control (pool , full_path , 2 , 6 , 1 )
308- select_with_parameters (pool , full_path , 2 , 6 , 1 )
309- huge_select (pool , full_path )
263+ explicit_transaction_control (pool , 2 , 6 , 1 )
264+ select_with_parameters (pool , 2 , 6 , 1 )
265+ huge_select (pool )
0 commit comments