-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfunctional_pipeline.py
More file actions
69 lines (50 loc) · 1.73 KB
/
functional_pipeline.py
File metadata and controls
69 lines (50 loc) · 1.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""
Functional pipeline example for FMTools.
Demonstrates both pure functional transforms and schema extraction steps.
"""
from __future__ import annotations
import asyncio
from examples._support import AppleFMSetupError, raise_sdk_setup_error, require_apple_fm
try:
import apple_fm_sdk as fm
except Exception as exc: # pragma: no cover - import error path
raise_sdk_setup_error("functional_pipeline.py", exc)
try:
from fmtools.functional import collect, extract, filter_fn, map_fn, pipe, source
except Exception as exc: # pragma: no cover - import error path
raise_sdk_setup_error("functional_pipeline.py", exc)
@fm.generable()
class Person:
name: str = fm.guide(description="Person name")
age: int = fm.guide(description="Estimated age as an integer")
async def run_transform_only_pipeline() -> None:
result = await pipe(
source(range(1, 11)),
map_fn(lambda n: n * 3),
filter_fn(lambda n: n % 2 == 0),
collect(),
)()
print("Transform-only result:", result)
async def run_extraction_pipeline() -> None:
require_apple_fm("functional_pipeline.py")
text_rows = [
"Alice is 31 and works in product engineering.",
"Bob is 27 and runs operations.",
]
people = await pipe(
source(text_rows),
extract(Person, instructions="Extract person name and integer age."),
collect(),
)()
print("Extraction result:")
for person in people:
print(f" - {person}")
async def main() -> None:
await run_transform_only_pipeline()
await run_extraction_pipeline()
if __name__ == "__main__":
try:
asyncio.run(main())
except AppleFMSetupError as exc:
print(exc)
raise SystemExit(2) from exc