@@ -298,3 +298,135 @@ def test_load_microbatch_required_only(
298298 column = exp .to_column ("ds" , quoted = True ), format = "%Y-%m-%d"
299299 )
300300 assert model .kind .batch_size is None
301+
302+
@pytest.mark.slow
def test_load_microbatch_with_ref(
    tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project
) -> None:
    """A microbatch model that refs another microbatch model gets a time-range
    filter applied both on its source and on the upstream model it refs.

    Builds a tiny dbt project with a source (event_time=ds), a microbatch model
    reading from that source, and a second microbatch model reading from the
    first via ref(); then renders both through a SQLMesh Context and checks the
    event-time filters in the generated SQL.
    """
    yaml = YAML()
    project_dir, model_dir = create_empty_project()

    # Source declares `event_time` so downstream microbatch models can filter on it.
    source_schema = {
        "version": 2,
        "sources": [
            {
                "name": "my_source",
                "tables": [{"name": "my_table", "config": {"event_time": "ds"}}],
            }
        ],
    }
    source_schema_file = model_dir / "source_schema.yml"
    with open(source_schema_file, "w", encoding="utf-8") as f:
        yaml.dump(source_schema, f)

    # First microbatch model: reads directly from the source.
    microbatch_contents = """
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='ds',
        begin='2020-01-01',
        batch_size='day'
    )
}}

SELECT cola, ds FROM {{ source('my_source', 'my_table') }}
"""
    microbatch_model_file = model_dir / "microbatch.sql"
    with open(microbatch_model_file, "w", encoding="utf-8") as f:
        f.write(microbatch_contents)

    # Second microbatch model: reads from the first via ref().
    microbatch_two_contents = """
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='ds',
        begin='2020-01-05',
        batch_size='day'
    )
}}

SELECT cola, ds FROM {{ ref('microbatch') }}
"""
    microbatch_two_model_file = model_dir / "microbatch_two.sql"
    with open(microbatch_two_model_file, "w", encoding="utf-8") as f:
        f.write(microbatch_two_contents)

    microbatch_snapshot_fqn = '"local"."main"."microbatch"'
    microbatch_two_snapshot_fqn = '"local"."main"."microbatch_two"'
    context = Context(paths=project_dir)

    # The source reference is wrapped in a subquery filtered to the render window.
    assert (
        context.render(microbatch_snapshot_fqn, start="2025-01-01", end="2025-01-10").sql()
        == 'SELECT "cola" AS "cola", "ds" AS "ds" FROM (SELECT * FROM "local"."my_source"."my_table" AS "my_table" WHERE "ds" BETWEEN \'2025-01-01 00:00:00+00:00\' AND \'2025-01-10 23:59:59.999999+00:00\') AS "_q_0"'
    )
    # The ref'd upstream microbatch model is likewise filtered to the render window.
    assert (
        context.render(microbatch_two_snapshot_fqn, start="2025-01-01", end="2025-01-10").sql()
        == 'SELECT "_q_0"."cola" AS "cola", "_q_0"."ds" AS "ds" FROM (SELECT "microbatch"."cola" AS "cola", "microbatch"."ds" AS "ds" FROM "local"."main"."microbatch" AS "microbatch" WHERE "microbatch"."ds" <= \'2025-01-10 23:59:59.999999+00:00\' AND "microbatch"."ds" >= \'2025-01-01 00:00:00+00:00\') AS "_q_0"'
    )
367+
368+
@pytest.mark.slow
def test_load_microbatch_with_ref_no_filter(
    tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project
) -> None:
    """Calling `.render()` on source()/ref() opts the relation out of the
    microbatch event-time filter.

    Same project layout as `test_load_microbatch_with_ref`, but both models
    invoke `.render()` on their upstream relation; the rendered SQL must then
    reference the table directly, with no time-range predicate.
    """
    yaml = YAML()
    project_dir, model_dir = create_empty_project()

    # Source declares `event_time` so filtering WOULD apply if not opted out.
    source_schema = {
        "version": 2,
        "sources": [
            {
                "name": "my_source",
                "tables": [{"name": "my_table", "config": {"event_time": "ds"}}],
            }
        ],
    }
    source_schema_file = model_dir / "source_schema.yml"
    with open(source_schema_file, "w", encoding="utf-8") as f:
        yaml.dump(source_schema, f)

    # First microbatch model: `.render()` on source() bypasses the event-time filter.
    microbatch_contents = """
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='ds',
        begin='2020-01-01',
        batch_size='day'
    )
}}

SELECT cola, ds FROM {{ source('my_source', 'my_table').render() }}
"""
    microbatch_model_file = model_dir / "microbatch.sql"
    with open(microbatch_model_file, "w", encoding="utf-8") as f:
        f.write(microbatch_contents)

    # Second microbatch model: `.render()` on ref() bypasses the filter as well.
    microbatch_two_contents = """
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='ds',
        begin='2020-01-01',
        batch_size='day'
    )
}}

SELECT cola, ds FROM {{ ref('microbatch').render() }}
"""
    microbatch_two_model_file = model_dir / "microbatch_two.sql"
    with open(microbatch_two_model_file, "w", encoding="utf-8") as f:
        f.write(microbatch_two_contents)

    microbatch_snapshot_fqn = '"local"."main"."microbatch"'
    microbatch_two_snapshot_fqn = '"local"."main"."microbatch_two"'
    context = Context(paths=project_dir)

    # No subquery and no WHERE clause: the source is referenced directly.
    assert (
        context.render(microbatch_snapshot_fqn, start="2025-01-01", end="2025-01-10").sql()
        == 'SELECT "cola" AS "cola", "ds" AS "ds" FROM "local"."my_source"."my_table" AS "my_table"'
    )
    # Likewise for the ref'd model: direct reference, no event-time predicate.
    assert (
        context.render(microbatch_two_snapshot_fqn, start="2025-01-01", end="2025-01-10").sql()
        == 'SELECT "microbatch"."cola" AS "cola", "microbatch"."ds" AS "ds" FROM "local"."main"."microbatch" AS "microbatch"'
    )