diff --git a/notebooks/tutorials/01_introduction_to_orcapod.ipynb b/notebooks/tutorials/01_introduction_to_orcapod.ipynb
index ef11dc8..94f24dc 100644
--- a/notebooks/tutorials/01_introduction_to_orcapod.ipynb
+++ b/notebooks/tutorials/01_introduction_to_orcapod.ipynb
@@ -106,6 +106,16 @@
"stream = op.streams.TableStream(table, tag_columns=[\"a\", \"b\"])"
]
},
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "id": "b1e9393d",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "stream = op.sources.DataFrameSource(table, tag_columns=[\"a\", \"b\"])"
+ ]
+ },
{
"cell_type": "markdown",
"id": "93ac78cc",
@@ -124,7 +134,7 @@
},
{
"cell_type": "code",
- "execution_count": 6,
+ "execution_count": 7,
"id": "2d4a0812",
"metadata": {},
"outputs": [
@@ -153,7 +163,7 @@
},
{
"cell_type": "code",
- "execution_count": 7,
+ "execution_count": 8,
"id": "79e67bfc",
"metadata": {},
"outputs": [
@@ -165,7 +175,7 @@
" ({'a': 3, 'b': 'z'}, {'c': True, 'd': 3.3})]"
]
},
- "execution_count": 7,
+ "execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
@@ -175,17 +185,41 @@
]
},
{
- "cell_type": "markdown",
- "id": "20fa500e",
+ "cell_type": "code",
+ "execution_count": 9,
+ "id": "52baee9c",
"metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "DataFrameSource[DataFrameSource]\n",
+ "
\n",
+ "
shape: (3, 4)| *a | *b | c | d |
|---|
| i64 | str | bool | f64 |
| 1 | "x" | true | 1.1 |
| 2 | "y" | false | 2.2 |
| 3 | "z" | true | 3.3 |
"
+ ],
+ "text/plain": [
+ "DataFrameSource"
+ ]
+ },
+ "execution_count": 9,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
"source": [
- "Every stream can be converted into a Polars dataframe with `as_df()` method"
+ "stream"
]
},
{
"cell_type": "code",
- "execution_count": 8,
- "id": "52baee9c",
+ "execution_count": 10,
+ "id": "f83b5062",
"metadata": {},
"outputs": [
{
@@ -213,7 +247,7 @@
"└─────┴─────┴───────┴─────┘"
]
},
- "execution_count": 8,
+ "execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
@@ -240,7 +274,7 @@
},
{
"cell_type": "code",
- "execution_count": 9,
+ "execution_count": 11,
"id": "4648fbe9",
"metadata": {},
"outputs": [
@@ -254,22 +288,22 @@
" white-space: pre-wrap;\n",
"}\n",
"\n",
- "shape: (3, 6)| a | b | c | d | _source_c | _source_d |
|---|
| i64 | str | bool | f64 | str | str |
| 1 | "x" | true | 1.1 | null | null |
| 2 | "y" | false | 2.2 | null | null |
| 3 | "z" | true | 3.3 | null | null |
"
+ "shape: (3, 6)| a | b | c | d | _source_c | _source_d |
|---|
| i64 | str | bool | f64 | str | str |
| 1 | "x" | true | 1.1 | "data_frame:source_5a4c41dbd055… | "data_frame:source_5a4c41dbd055… |
| 2 | "y" | false | 2.2 | "data_frame:source_5a4c41dbd055… | "data_frame:source_5a4c41dbd055… |
| 3 | "z" | true | 3.3 | "data_frame:source_5a4c41dbd055… | "data_frame:source_5a4c41dbd055… |
"
],
"text/plain": [
"shape: (3, 6)\n",
- "┌─────┬─────┬───────┬─────┬───────────┬───────────┐\n",
- "│ a ┆ b ┆ c ┆ d ┆ _source_c ┆ _source_d │\n",
- "│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │\n",
- "│ i64 ┆ str ┆ bool ┆ f64 ┆ str ┆ str │\n",
- "╞═════╪═════╪═══════╪═════╪═══════════╪═══════════╡\n",
- "│ 1 ┆ x ┆ true ┆ 1.1 ┆ null ┆ null │\n",
- "│ 2 ┆ y ┆ false ┆ 2.2 ┆ null ┆ null │\n",
- "│ 3 ┆ z ┆ true ┆ 3.3 ┆ null ┆ null │\n",
- "└─────┴─────┴───────┴─────┴───────────┴───────────┘"
+ "┌─────┬─────┬───────┬─────┬─────────────────────────────────┬─────────────────────────────────┐\n",
+ "│ a ┆ b ┆ c ┆ d ┆ _source_c ┆ _source_d │\n",
+ "│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │\n",
+ "│ i64 ┆ str ┆ bool ┆ f64 ┆ str ┆ str │\n",
+ "╞═════╪═════╪═══════╪═════╪═════════════════════════════════╪═════════════════════════════════╡\n",
+ "│ 1 ┆ x ┆ true ┆ 1.1 ┆ data_frame:source_5a4c41dbd055… ┆ data_frame:source_5a4c41dbd055… │\n",
+ "│ 2 ┆ y ┆ false ┆ 2.2 ┆ data_frame:source_5a4c41dbd055… ┆ data_frame:source_5a4c41dbd055… │\n",
+ "│ 3 ┆ z ┆ true ┆ 3.3 ┆ data_frame:source_5a4c41dbd055… ┆ data_frame:source_5a4c41dbd055… │\n",
+ "└─────┴─────┴───────┴─────┴─────────────────────────────────┴─────────────────────────────────┘"
]
},
- "execution_count": 9,
+ "execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
@@ -288,7 +322,7 @@
},
{
"cell_type": "code",
- "execution_count": 10,
+ "execution_count": 12,
"id": "001b2a9c",
"metadata": {},
"outputs": [
@@ -317,7 +351,7 @@
"└─────┴─────┴───────┴─────┴─────────────────────────────────┘"
]
},
- "execution_count": 10,
+ "execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
@@ -336,7 +370,7 @@
},
{
"cell_type": "code",
- "execution_count": 11,
+ "execution_count": 13,
"id": "d3b9e394",
"metadata": {},
"outputs": [
@@ -365,7 +399,7 @@
"└─────┴─────┴───────┴─────┴─────────────────────────────────┘"
]
},
- "execution_count": 11,
+ "execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
@@ -384,7 +418,7 @@
},
{
"cell_type": "code",
- "execution_count": 12,
+ "execution_count": 14,
"id": "92cbfa50",
"metadata": {},
"outputs": [
@@ -413,7 +447,7 @@
"└─────┴─────┴───────┴─────┴──────────────────┘"
]
},
- "execution_count": 12,
+ "execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
@@ -432,7 +466,7 @@
},
{
"cell_type": "code",
- "execution_count": 13,
+ "execution_count": 15,
"id": "bf6acd59",
"metadata": {},
"outputs": [
@@ -441,7 +475,7 @@
"text/plain": [
"pyarrow.Table\n",
"a: int64\n",
- "b: string\n",
+ "b: large_string\n",
"c: bool\n",
"d: double\n",
"_context_key: large_string\n",
@@ -453,7 +487,7 @@
"_context_key: [[\"std:v0.1:default\",\"std:v0.1:default\",\"std:v0.1:default\"]]"
]
},
- "execution_count": 13,
+ "execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
@@ -488,13 +522,14 @@
},
{
"cell_type": "code",
- "execution_count": 14,
+ "execution_count": 16,
"id": "68bff9fb",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "DataFrameSource[DataFrameSource]\n",
"\n",
- "
shape: (3, 4)| a | b | c | d |
|---|
| i64 | str | bool | f64 |
| 1 | "x" | true | 1.1 |
| 2 | "y" | false | 2.2 |
| 3 | "z" | true | 3.3 |
"
+ "shape: (3, 4)| *a | *b | c | d |
|---|
| i64 | str | bool | f64 |
| 1 | "x" | true | 1.1 |
| 2 | "y" | false | 2.2 |
| 3 | "z" | true | 3.3 |
"
],
"text/plain": [
- "shape: (3, 4)\n",
- "┌─────┬─────┬───────┬─────┐\n",
- "│ a ┆ b ┆ c ┆ d │\n",
- "│ --- ┆ --- ┆ --- ┆ --- │\n",
- "│ i64 ┆ str ┆ bool ┆ f64 │\n",
- "╞═════╪═════╪═══════╪═════╡\n",
- "│ 1 ┆ x ┆ true ┆ 1.1 │\n",
- "│ 2 ┆ y ┆ false ┆ 2.2 │\n",
- "│ 3 ┆ z ┆ true ┆ 3.3 │\n",
- "└─────┴─────┴───────┴─────┘"
+ "DataFrameSource"
]
},
- "execution_count": 14,
+ "execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "stream.as_df()"
+ "stream"
]
},
{
"cell_type": "code",
- "execution_count": 15,
+ "execution_count": 17,
"id": "c78096a7",
"metadata": {},
"outputs": [],
@@ -538,7 +564,7 @@
},
{
"cell_type": "code",
- "execution_count": 16,
+ "execution_count": 18,
"id": "6f8a2f0b",
"metadata": {},
"outputs": [],
@@ -548,7 +574,7 @@
},
{
"cell_type": "code",
- "execution_count": 17,
+ "execution_count": 19,
"id": "e1ac13b1",
"metadata": {},
"outputs": [
@@ -558,7 +584,7 @@
"{'a': 1, 'b': 'x'}"
]
},
- "execution_count": 17,
+ "execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
@@ -569,7 +595,7 @@
},
{
"cell_type": "code",
- "execution_count": 18,
+ "execution_count": 20,
"id": "263fa1c5",
"metadata": {},
"outputs": [
@@ -579,7 +605,7 @@
"{'c': True, 'd': 1.1}"
]
},
- "execution_count": 18,
+ "execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
@@ -598,7 +624,7 @@
},
{
"cell_type": "code",
- "execution_count": 19,
+ "execution_count": 21,
"id": "42158816",
"metadata": {},
"outputs": [
@@ -608,7 +634,7 @@
"1"
]
},
- "execution_count": 19,
+ "execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
@@ -619,7 +645,7 @@
},
{
"cell_type": "code",
- "execution_count": 20,
+ "execution_count": 22,
"id": "6a792175",
"metadata": {},
"outputs": [
@@ -629,7 +655,7 @@
"'x'"
]
},
- "execution_count": 20,
+ "execution_count": 22,
"metadata": {},
"output_type": "execute_result"
}
@@ -640,7 +666,7 @@
},
{
"cell_type": "code",
- "execution_count": 21,
+ "execution_count": 23,
"id": "a28f2051",
"metadata": {},
"outputs": [
@@ -650,7 +676,7 @@
"True"
]
},
- "execution_count": 21,
+ "execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
@@ -661,7 +687,7 @@
},
{
"cell_type": "code",
- "execution_count": 22,
+ "execution_count": 24,
"id": "981e6c44",
"metadata": {},
"outputs": [
@@ -671,7 +697,7 @@
"1.1"
]
},
- "execution_count": 22,
+ "execution_count": 24,
"metadata": {},
"output_type": "execute_result"
}
@@ -690,7 +716,7 @@
},
{
"cell_type": "code",
- "execution_count": 23,
+ "execution_count": 25,
"id": "56423d2c",
"metadata": {},
"outputs": [
@@ -700,7 +726,7 @@
"{'c': bool, 'd': float}"
]
},
- "execution_count": 23,
+ "execution_count": 25,
"metadata": {},
"output_type": "execute_result"
}
@@ -712,7 +738,7 @@
},
{
"cell_type": "code",
- "execution_count": 24,
+ "execution_count": 26,
"id": "d5e02f81",
"metadata": {},
"outputs": [
@@ -722,7 +748,7 @@
"('c', 'd')"
]
},
- "execution_count": 24,
+ "execution_count": 26,
"metadata": {},
"output_type": "execute_result"
}
@@ -742,7 +768,7 @@
},
{
"cell_type": "code",
- "execution_count": 25,
+ "execution_count": 27,
"id": "b1b18ee4",
"metadata": {},
"outputs": [
@@ -757,7 +783,7 @@
"d: [[1.1]]"
]
},
- "execution_count": 25,
+ "execution_count": 27,
"metadata": {},
"output_type": "execute_result"
}
@@ -766,6 +792,27 @@
"packet.as_table()"
]
},
+ {
+ "cell_type": "code",
+ "execution_count": 28,
+ "id": "1cef6251",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "{'c': True, 'd': 1.1}"
+ ]
+ },
+ "execution_count": 28,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "packet.as_dict()"
+ ]
+ },
{
"cell_type": "markdown",
"id": "f4e4a38f",
@@ -776,7 +823,7 @@
},
{
"cell_type": "code",
- "execution_count": 26,
+ "execution_count": 29,
"id": "3aa4020e",
"metadata": {},
"outputs": [
@@ -787,7 +834,7 @@
"d: double"
]
},
- "execution_count": 26,
+ "execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
@@ -806,7 +853,7 @@
},
{
"cell_type": "code",
- "execution_count": 27,
+ "execution_count": 30,
"id": "bea6c771",
"metadata": {},
"outputs": [
@@ -816,7 +863,7 @@
"{'a': 1, 'b': 'x'}"
]
},
- "execution_count": 27,
+ "execution_count": 30,
"metadata": {},
"output_type": "execute_result"
}
@@ -835,17 +882,18 @@
},
{
"cell_type": "code",
- "execution_count": 28,
+ "execution_count": 31,
"id": "92f00feb",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
- "{'c': None, 'd': None}"
+ "{'c': 'data_frame:source_5a4c41dbd0552e031d5a::row_0::c',\n",
+ " 'd': 'data_frame:source_5a4c41dbd0552e031d5a::row_0::c::d'}"
]
},
- "execution_count": 28,
+ "execution_count": 31,
"metadata": {},
"output_type": "execute_result"
}
@@ -864,17 +912,20 @@
},
{
"cell_type": "code",
- "execution_count": 29,
+ "execution_count": 32,
"id": "bba2bc5c",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
- "{'c': True, 'd': 1.1, '_source_c': None, '_source_d': None}"
+ "{'c': True,\n",
+ " 'd': 1.1,\n",
+ " '_source_c': 'data_frame:source_5a4c41dbd0552e031d5a::row_0::c',\n",
+ " '_source_d': 'data_frame:source_5a4c41dbd0552e031d5a::row_0::c::d'}"
]
},
- "execution_count": 29,
+ "execution_count": 32,
"metadata": {},
"output_type": "execute_result"
}
@@ -885,7 +936,7 @@
},
{
"cell_type": "code",
- "execution_count": 30,
+ "execution_count": 33,
"id": "bd09d9d1",
"metadata": {},
"outputs": [
@@ -900,11 +951,11 @@
"----\n",
"c: [[true]]\n",
"d: [[1.1]]\n",
- "_source_c: [[null]]\n",
- "_source_d: [[null]]"
+ "_source_c: [[\"data_frame:source_5a4c41dbd0552e031d5a::row_0::c\"]]\n",
+ "_source_d: [[\"data_frame:source_5a4c41dbd0552e031d5a::row_0::c::d\"]]"
]
},
- "execution_count": 30,
+ "execution_count": 33,
"metadata": {},
"output_type": "execute_result"
}
@@ -923,7 +974,7 @@
},
{
"cell_type": "code",
- "execution_count": 31,
+ "execution_count": 34,
"id": "03219fd3",
"metadata": {},
"outputs": [
@@ -933,7 +984,7 @@
"ContentHash(method='arrow_v0.1', digest=b'n\\x11C\\x89ms\\xd3pux\\x11\\xb5,\\xee\\xea\\x8d\\x1dEl\\xd3\\x02\\x06Ao\\xbf\\x81uN\\x1c\\xeaUh')"
]
},
- "execution_count": 31,
+ "execution_count": 34,
"metadata": {},
"output_type": "execute_result"
}
@@ -968,7 +1019,7 @@
},
{
"cell_type": "code",
- "execution_count": 32,
+ "execution_count": 35,
"id": "11ee5130",
"metadata": {},
"outputs": [],
@@ -995,13 +1046,14 @@
},
{
"cell_type": "code",
- "execution_count": 33,
+ "execution_count": 36,
"id": "73b75816",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "TableStream[TableStream]\n",
"\n",
- "
shape: (3, 3)| id | a | b |
|---|
| i64 | i64 | str |
| 0 | 1 | "x" |
| 1 | 2 | "y" |
| 4 | 3 | "z" |
"
+ "shape: (3, 3)| *id | a | b |
|---|
| i64 | i64 | str |
| 0 | 1 | "x" |
| 1 | 2 | "y" |
| 4 | 3 | "z" |
"
],
"text/plain": [
- "shape: (3, 3)\n",
- "┌─────┬─────┬─────┐\n",
- "│ id ┆ a ┆ b │\n",
- "│ --- ┆ --- ┆ --- │\n",
- "│ i64 ┆ i64 ┆ str │\n",
- "╞═════╪═════╪═════╡\n",
- "│ 0 ┆ 1 ┆ x │\n",
- "│ 1 ┆ 2 ┆ y │\n",
- "│ 4 ┆ 3 ┆ z │\n",
- "└─────┴─────┴─────┘"
+ "TableStream(table=['id', 'a', 'b'], tag_columns=('id',))"
]
},
- "execution_count": 33,
+ "execution_count": 36,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "stream1.as_df()"
+ "stream1"
]
},
{
"cell_type": "code",
- "execution_count": 34,
+ "execution_count": 37,
"id": "519754a0",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "TableStream[TableStream]\n",
"\n",
- "
shape: (3, 3)| id | c | d |
|---|
| i64 | bool | f64 |
| 0 | true | 1.1 |
| 1 | false | 2.2 |
| 2 | true | 3.3 |
"
+ "shape: (3, 3)| *id | c | d |
|---|
| i64 | bool | f64 |
| 0 | true | 1.1 |
| 1 | false | 2.2 |
| 2 | true | 3.3 |
"
],
"text/plain": [
- "shape: (3, 3)\n",
- "┌─────┬───────┬─────┐\n",
- "│ id ┆ c ┆ d │\n",
- "│ --- ┆ --- ┆ --- │\n",
- "│ i64 ┆ bool ┆ f64 │\n",
- "╞═════╪═══════╪═════╡\n",
- "│ 0 ┆ true ┆ 1.1 │\n",
- "│ 1 ┆ false ┆ 2.2 │\n",
- "│ 2 ┆ true ┆ 3.3 │\n",
- "└─────┴───────┴─────┘"
+ "TableStream(table=['id', 'c', 'd'], tag_columns=('id',))"
]
},
- "execution_count": 34,
+ "execution_count": 37,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "stream2.as_df()"
+ "stream2"
]
},
{
@@ -1083,7 +1118,7 @@
},
{
"cell_type": "code",
- "execution_count": 35,
+ "execution_count": 38,
"id": "8299d4b1",
"metadata": {},
"outputs": [],
@@ -1093,7 +1128,7 @@
},
{
"cell_type": "code",
- "execution_count": 36,
+ "execution_count": 39,
"id": "dfc7ee9f",
"metadata": {},
"outputs": [],
@@ -1119,13 +1154,14 @@
},
{
"cell_type": "code",
- "execution_count": 37,
+ "execution_count": 40,
"id": "48ef0a8f",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "KernelStream[Join]\n",
"\n",
- "
shape: (2, 5)| id | a | b | c | d |
|---|
| i64 | i64 | str | bool | f64 |
| 0 | 1 | "x" | true | 1.1 |
| 1 | 2 | "y" | false | 2.2 |
"
+ "shape: (2, 5)| *id | a | b | c | d |
|---|
| i64 | i64 | str | bool | f64 |
| 0 | 1 | "x" | true | 1.1 |
| 1 | 2 | "y" | false | 2.2 |
"
],
"text/plain": [
- "shape: (2, 5)\n",
- "┌─────┬─────┬─────┬───────┬─────┐\n",
- "│ id ┆ a ┆ b ┆ c ┆ d │\n",
- "│ --- ┆ --- ┆ --- ┆ --- ┆ --- │\n",
- "│ i64 ┆ i64 ┆ str ┆ bool ┆ f64 │\n",
- "╞═════╪═════╪═════╪═══════╪═════╡\n",
- "│ 0 ┆ 1 ┆ x ┆ true ┆ 1.1 │\n",
- "│ 1 ┆ 2 ┆ y ┆ false ┆ 2.2 │\n",
- "└─────┴─────┴─────┴───────┴─────┘"
+ "KernelStream(kernel=Join, upstreams=(TableStream(table=['id', 'a', 'b'], tag_columns=('id',)), TableStream(table=['id', 'c', 'd'], tag_columns=('id',))))"
]
},
- "execution_count": 37,
+ "execution_count": 40,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "joined_stream.as_df()"
+ "joined_stream"
]
},
{
@@ -1174,13 +1202,14 @@
},
{
"cell_type": "code",
- "execution_count": 38,
+ "execution_count": 41,
"id": "fbc58246",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "KernelStream[Join]\n",
"\n",
- "
shape: (2, 5)| id | a | b | c | d |
|---|
| i64 | i64 | str | bool | f64 |
| 0 | 1 | "x" | true | 1.1 |
| 1 | 2 | "y" | false | 2.2 |
"
+ "shape: (2, 5)| *id | a | b | c | d |
|---|
| i64 | i64 | str | bool | f64 |
| 0 | 1 | "x" | true | 1.1 |
| 1 | 2 | "y" | false | 2.2 |
"
],
"text/plain": [
- "shape: (2, 5)\n",
- "┌─────┬─────┬─────┬───────┬─────┐\n",
- "│ id ┆ a ┆ b ┆ c ┆ d │\n",
- "│ --- ┆ --- ┆ --- ┆ --- ┆ --- │\n",
- "│ i64 ┆ i64 ┆ str ┆ bool ┆ f64 │\n",
- "╞═════╪═════╪═════╪═══════╪═════╡\n",
- "│ 0 ┆ 1 ┆ x ┆ true ┆ 1.1 │\n",
- "│ 1 ┆ 2 ┆ y ┆ false ┆ 2.2 │\n",
- "└─────┴─────┴─────┴───────┴─────┘"
+ "KernelStream(kernel=Join, upstreams=(TableStream(table=['id', 'a', 'b'], tag_columns=('id',)), TableStream(table=['id', 'c', 'd'], tag_columns=('id',))))"
]
},
- "execution_count": 38,
+ "execution_count": 41,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "stream1.join(stream2).as_df()"
+ "stream1.join(stream2)"
]
},
{
"cell_type": "code",
- "execution_count": 39,
+ "execution_count": 42,
"id": "c6b0b571",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "KernelStream[SemiJoin]\n",
""
+ "shape: (2, 3)"
],
"text/plain": [
- "shape: (2, 3)\n",
- "┌─────┬─────┬─────┐\n",
- "│ id ┆ a ┆ b │\n",
- "│ --- ┆ --- ┆ --- │\n",
- "│ i64 ┆ i64 ┆ str │\n",
- "╞═════╪═════╪═════╡\n",
- "│ 0 ┆ 1 ┆ x │\n",
- "│ 1 ┆ 2 ┆ y │\n",
- "└─────┴─────┴─────┘"
+ "KernelStream(kernel=SemiJoin, upstreams=(TableStream(table=['id', 'a', 'b'], tag_columns=('id',)), TableStream(table=['id', 'c', 'd'], tag_columns=('id',))))"
]
},
- "execution_count": 39,
+ "execution_count": 42,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "stream1.semi_join(stream2).as_df()"
+ "stream1.semi_join(stream2)"
]
},
{
"cell_type": "code",
- "execution_count": 40,
+ "execution_count": 43,
"id": "5be42490",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "KernelStream[MapPackets]\n",
"\n",
- "
shape: (3, 3)| id | a_mapped | b |
|---|
| i64 | i64 | str |
| 0 | 1 | "x" |
| 1 | 2 | "y" |
| 4 | 3 | "z" |
"
+ "shape: (3, 2)"
],
"text/plain": [
- "shape: (3, 3)\n",
- "┌─────┬──────────┬─────┐\n",
- "│ id ┆ a_mapped ┆ b │\n",
- "│ --- ┆ --- ┆ --- │\n",
- "│ i64 ┆ i64 ┆ str │\n",
- "╞═════╪══════════╪═════╡\n",
- "│ 0 ┆ 1 ┆ x │\n",
- "│ 1 ┆ 2 ┆ y │\n",
- "│ 4 ┆ 3 ┆ z │\n",
- "└─────┴──────────┴─────┘"
+ "KernelStream(kernel=MapPackets, upstreams=(TableStream(table=['id', 'a', 'b'], tag_columns=('id',)),))"
]
},
- "execution_count": 40,
+ "execution_count": 43,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "stream1.map_packets({\"a\": \"a_mapped\"}).as_df()"
+ "stream1.map_packets({\"a\": \"a_mapped\"})"
]
},
{
"cell_type": "code",
- "execution_count": 41,
+ "execution_count": 44,
"id": "c9c98304",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "KernelStream[MapTags]\n",
"\n",
- "
shape: (3, 3)| name | a | b |
|---|
| i64 | i64 | str |
| 0 | 1 | "x" |
| 1 | 2 | "y" |
| 4 | 3 | "z" |
"
+ "shape: (3, 3)| *name | a | b |
|---|
| i64 | i64 | str |
| 0 | 1 | "x" |
| 1 | 2 | "y" |
| 4 | 3 | "z" |
"
],
"text/plain": [
- "shape: (3, 3)\n",
- "┌──────┬─────┬─────┐\n",
- "│ name ┆ a ┆ b │\n",
- "│ --- ┆ --- ┆ --- │\n",
- "│ i64 ┆ i64 ┆ str │\n",
- "╞══════╪═════╪═════╡\n",
- "│ 0 ┆ 1 ┆ x │\n",
- "│ 1 ┆ 2 ┆ y │\n",
- "│ 4 ┆ 3 ┆ z │\n",
- "└──────┴─────┴─────┘"
+ "KernelStream(kernel=MapTags, upstreams=(TableStream(table=['id', 'a', 'b'], tag_columns=('id',)),))"
]
},
- "execution_count": 41,
+ "execution_count": 44,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "stream1.map_tags({\"id\": \"name\"}).as_df()"
+ "stream1.map_tags({\"id\": \"name\"})"
]
},
{
@@ -1348,17 +1346,59 @@
},
{
"cell_type": "code",
- "execution_count": 42,
+ "execution_count": 45,
"id": "35423d9a",
"metadata": {},
"outputs": [],
"source": [
- "@op.function_pod(output_keys=[\"sum\"])\n",
+ "@op.function_pod(\"sum\")\n",
"def add_numbers(a: int, b: int) -> int:\n",
" \"\"\"A simple function pod that adds two numbers.\"\"\"\n",
" return a + b"
]
},
+ {
+ "cell_type": "code",
+ "execution_count": 46,
+ "id": "36c077ef",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "15"
+ ]
+ },
+ "execution_count": 46,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "add_numbers(5, 10)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 47,
+ "id": "caa64a92",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "FunctionPod:add_numbers"
+ ]
+ },
+ "execution_count": 47,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "add_numbers.pod"
+ ]
+ },
{
"cell_type": "markdown",
"id": "f737eeac",
@@ -1377,7 +1417,7 @@
},
{
"cell_type": "code",
- "execution_count": 43,
+ "execution_count": 48,
"id": "119d33a3",
"metadata": {},
"outputs": [],
@@ -1390,18 +1430,19 @@
" }\n",
")\n",
"\n",
- "input_stream = op.streams.TableStream(input_table, tag_columns=[\"id\"])"
+ "input_stream = op.sources.ArrowTableSource(input_table, tag_columns=[\"id\"])"
]
},
{
"cell_type": "code",
- "execution_count": 44,
+ "execution_count": 49,
"id": "e3b60eca",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "ArrowTableSource[ArrowTableSource]\n",
"\n",
- "
shape: (5, 3)| id | a | b |
|---|
| i64 | i64 | i64 |
| 0 | 1 | 10 |
| 1 | 2 | 20 |
| 2 | 3 | 30 |
| 3 | 4 | 40 |
| 4 | 5 | 50 |
"
+ "shape: (5, 3)| *id | a | b |
|---|
| i64 | i64 | i64 |
| 0 | 1 | 10 |
| 1 | 2 | 20 |
| 2 | 3 | 30 |
| 3 | 4 | 40 |
| 4 | 5 | 50 |
"
],
"text/plain": [
- "shape: (5, 3)\n",
- "┌─────┬─────┬─────┐\n",
- "│ id ┆ a ┆ b │\n",
- "│ --- ┆ --- ┆ --- │\n",
- "│ i64 ┆ i64 ┆ i64 │\n",
- "╞═════╪═════╪═════╡\n",
- "│ 0 ┆ 1 ┆ 10 │\n",
- "│ 1 ┆ 2 ┆ 20 │\n",
- "│ 2 ┆ 3 ┆ 30 │\n",
- "│ 3 ┆ 4 ┆ 40 │\n",
- "│ 4 ┆ 5 ┆ 50 │\n",
- "└─────┴─────┴─────┘"
+ "ArrowTableSource"
]
},
- "execution_count": 44,
+ "execution_count": 49,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "input_stream.as_df()"
+ "input_stream"
]
},
{
"cell_type": "code",
- "execution_count": 45,
+ "execution_count": 50,
"id": "2b3b42ff",
"metadata": {},
"outputs": [],
"source": [
"# run the stream through the function pod!\n",
- "output_stream = add_numbers(input_stream)"
+ "output_stream = add_numbers.pod(input_stream)"
]
},
{
@@ -1456,34 +1486,14 @@
},
{
"cell_type": "code",
- "execution_count": 46,
+ "execution_count": 51,
"id": "ff05a8fc",
"metadata": {},
- "outputs": [
- {
- "data": {
- "text/plain": [
- "KernelStream(kernel=FunctionPod:add_numbers(a: int, b: int)-> , upstreams=(TableStream(table=['id', 'a', 'b'], tag_columns=('id',)),))"
- ]
- },
- "execution_count": 46,
- "metadata": {},
- "output_type": "execute_result"
- }
- ],
- "source": [
- "output_stream"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 47,
- "id": "35107c18",
- "metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "KernelStream[add_numbers]\n",
"\n",
- "
shape: (5, 2)| id | sum |
|---|
| i64 | i64 |
| 0 | 11 |
| 1 | 22 |
| 2 | 33 |
| 3 | 44 |
| 4 | 55 |
"
+ "shape: (5, 2)| *id | sum |
|---|
| i64 | i64 |
| 0 | 11 |
| 1 | 22 |
| 2 | 33 |
| 3 | 44 |
| 4 | 55 |
"
],
"text/plain": [
- "shape: (5, 2)\n",
- "┌─────┬─────┐\n",
- "│ id ┆ sum │\n",
- "│ --- ┆ --- │\n",
- "│ i64 ┆ i64 │\n",
- "╞═════╪═════╡\n",
- "│ 0 ┆ 11 │\n",
- "│ 1 ┆ 22 │\n",
- "│ 2 ┆ 33 │\n",
- "│ 3 ┆ 44 │\n",
- "│ 4 ┆ 55 │\n",
- "└─────┴─────┘"
+ "KernelStream(kernel=FunctionPod:add_numbers(a: int, b: int)-> , upstreams=(ArrowTableSource,))"
]
},
- "execution_count": 47,
+ "execution_count": 51,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "output_stream.as_df() # this triggers the computation!"
+ "output_stream"
]
},
{
@@ -1527,7 +1526,7 @@
},
{
"cell_type": "code",
- "execution_count": 48,
+ "execution_count": 52,
"id": "6431180f",
"metadata": {},
"outputs": [],
@@ -1553,7 +1552,7 @@
},
{
"cell_type": "code",
- "execution_count": 49,
+ "execution_count": 53,
"id": "9b7fcbbf",
"metadata": {},
"outputs": [],
@@ -1572,24 +1571,57 @@
},
{
"cell_type": "code",
- "execution_count": 50,
+ "execution_count": 54,
+ "id": "9371bbbf",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "ArrowTableSource[ArrowTableSource]\n",
+ "\n",
+ "
shape: (5, 3)| *id | a | b |
|---|
| i64 | i64 | i64 |
| 0 | 1 | 10 |
| 1 | 2 | 20 |
| 2 | 3 | 30 |
| 3 | 4 | 40 |
| 4 | 5 | 50 |
"
+ ],
+ "text/plain": [
+ "ArrowTableSource"
+ ]
+ },
+ "execution_count": 54,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "input_stream"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 55,
"id": "3e496c59",
"metadata": {},
"outputs": [],
"source": [
- "stats_output = compute_stats(input_stream)\n",
- "messages = build_message(stats_output)"
+ "stats_output = compute_stats.pod(input_stream)\n",
+ "messages = build_message.pod(stats_output)"
]
},
{
"cell_type": "code",
- "execution_count": 51,
- "id": "23c0fa92",
+ "execution_count": 56,
+ "id": "e3d9aad3",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "KernelStream[compute_stats]\n",
"\n",
- "
shape: (5, 2)| id | stats |
|---|
| i64 | list[struct[2]] |
| 0 | [{"sum",11}, {"difference",-9}, {"product",10}] |
| 1 | [{"sum",22}, {"difference",-18}, {"product",40}] |
| 2 | [{"sum",33}, {"difference",-27}, {"product",90}] |
| 3 | [{"sum",44}, {"difference",-36}, {"product",160}] |
| 4 | [{"sum",55}, {"difference",-45}, {"product",250}] |
"
+ "shape: (5, 2)| *id | stats |
|---|
| i64 | list[struct[2]] |
| 0 | [{"sum",11}, {"difference",-9}, {"product",10}] |
| 1 | [{"sum",22}, {"difference",-18}, {"product",40}] |
| 2 | [{"sum",33}, {"difference",-27}, {"product",90}] |
| 3 | [{"sum",44}, {"difference",-36}, {"product",160}] |
| 4 | [{"sum",55}, {"difference",-45}, {"product",250}] |
"
],
"text/plain": [
- "shape: (5, 2)\n",
- "┌─────┬─────────────────────────────────┐\n",
- "│ id ┆ stats │\n",
- "│ --- ┆ --- │\n",
- "│ i64 ┆ list[struct[2]] │\n",
- "╞═════╪═════════════════════════════════╡\n",
- "│ 0 ┆ [{\"sum\",11}, {\"difference\",-9}… │\n",
- "│ 1 ┆ [{\"sum\",22}, {\"difference\",-18… │\n",
- "│ 2 ┆ [{\"sum\",33}, {\"difference\",-27… │\n",
- "│ 3 ┆ [{\"sum\",44}, {\"difference\",-36… │\n",
- "│ 4 ┆ [{\"sum\",55}, {\"difference\",-45… │\n",
- "└─────┴─────────────────────────────────┘"
+ "KernelStream(kernel=FunctionPod:compute_stats(a: int, b: int)-> dict[str, int], upstreams=(ArrowTableSource,))"
]
},
- "execution_count": 51,
+ "execution_count": 56,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "stats_output.as_df()"
+ "stats_output"
]
},
{
"cell_type": "code",
- "execution_count": 52,
- "id": "bba7f8d3",
+ "execution_count": 57,
+ "id": "e04b9c62",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "KernelStream[build_message]\n",
"\n",
- "
shape: (5, 2)| id | message |
|---|
| i64 | str |
| 0 | "Hi! The sum was 11, the differ… |
| 1 | "Hi! The sum was 22, the differ… |
| 2 | "Hi! The sum was 33, the differ… |
| 3 | "Hi! The sum was 44, the differ… |
| 4 | "Hi! The sum was 55, the differ… |
"
+ "shape: (5, 2)| *id | message |
|---|
| i64 | str |
| 0 | "Hi! The sum was 11, the differ… |
| 1 | "Hi! The sum was 22, the differ… |
| 2 | "Hi! The sum was 33, the differ… |
| 3 | "Hi! The sum was 44, the differ… |
| 4 | "Hi! The sum was 55, the differ… |
"
],
"text/plain": [
- "shape: (5, 2)\n",
- "┌─────┬─────────────────────────────────┐\n",
- "│ id ┆ message │\n",
- "│ --- ┆ --- │\n",
- "│ i64 ┆ str │\n",
- "╞═════╪═════════════════════════════════╡\n",
- "│ 0 ┆ Hi! The sum was 11, the differ… │\n",
- "│ 1 ┆ Hi! The sum was 22, the differ… │\n",
- "│ 2 ┆ Hi! The sum was 33, the differ… │\n",
- "│ 3 ┆ Hi! The sum was 44, the differ… │\n",
- "│ 4 ┆ Hi! The sum was 55, the differ… │\n",
- "└─────┴─────────────────────────────────┘"
+ "KernelStream(kernel=FunctionPod:build_message(stats: dict[str, int])-> , upstreams=(KernelStream(kernel=FunctionPod:compute_stats(a: int, b: int)-> dict[str, int], upstreams=(ArrowTableSource,)),))"
]
},
- "execution_count": 52,
+ "execution_count": 57,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "messages.as_df()"
+ "messages"
]
},
{
@@ -1683,7 +1694,7 @@
},
{
"cell_type": "code",
- "execution_count": 54,
+ "execution_count": 58,
"id": "cb4bc91a",
"metadata": {},
"outputs": [],
@@ -1703,7 +1714,7 @@
},
{
"cell_type": "code",
- "execution_count": 55,
+ "execution_count": 59,
"id": "f371822b",
"metadata": {},
"outputs": [],
@@ -1728,39 +1739,74 @@
},
{
"cell_type": "code",
- "execution_count": 56,
+ "execution_count": 60,
"id": "e132fc93",
"metadata": {},
"outputs": [],
"source": [
"# now defien the pipeline\n",
"with pipeline:\n",
- " sum_results = add_numbers(input_stream)\n",
- " product_results = multiply_numbers(input_stream)\n",
- " final_results = combine_results(sum_results, product_results)"
+ " sum_results = add_numbers.pod(input_stream)\n",
+ " product_results = multiply_numbers.pod(input_stream)\n",
+ " final_results = combine_results.pod(sum_results, product_results)"
]
},
{
- "cell_type": "markdown",
- "id": "dad175c6",
+ "cell_type": "code",
+ "execution_count": 61,
+ "id": "ada3d448",
"metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "ArrowTableSource[ArrowTableSource]\n",
+ "\n",
+ "
shape: (5, 3)| *id | a | b |
|---|
| i64 | i64 | i64 |
| 0 | 1 | 10 |
| 1 | 2 | 20 |
| 2 | 3 | 30 |
| 3 | 4 | 40 |
| 4 | 5 | 50 |
"
+ ],
+ "text/plain": [
+ "ArrowTableSource"
+ ]
+ },
+ "execution_count": 61,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
"source": [
- "You can access individual elements of the pipeline as an attribute. By default, the attribute is named after the operator/pod name."
+ "input_stream"
]
},
{
"cell_type": "code",
- "execution_count": 57,
- "id": "cca9e0d0",
+ "execution_count": 62,
+ "id": "1e2d0a2a",
"metadata": {},
"outputs": [
{
"data": {
+ "text/html": [
+ "PodNode[add_numbers]\n",
+ "\n",
+ "
shape: (0, 5)| *id | _tag::source:57778e89cbc0 | sum | _source_sum | _context_key |
|---|
| i64 | str | i64 | str | str |
"
+ ],
"text/plain": [
"PodNode(pod=FunctionPod:add_numbers)"
]
},
- "execution_count": 57,
+ "execution_count": 62,
"metadata": {},
"output_type": "execute_result"
}
@@ -1771,54 +1817,75 @@
},
{
"cell_type": "code",
- "execution_count": 58,
- "id": "08add7d9",
+ "execution_count": 63,
+ "id": "9cc43f6a",
"metadata": {},
"outputs": [
{
"data": {
+ "image/png": "",
"text/plain": [
- "{'TableStream': KernelNode(kernel=StreamSource),\n",
- " 'multiply_numbers': PodNode(pod=FunctionPod:multiply_numbers),\n",
- " 'add_numbers': PodNode(pod=FunctionPod:add_numbers),\n",
- " 'Join': KernelNode(kernel=Join()),\n",
- " 'combine_results': PodNode(pod=FunctionPod:combine_results)}"
+ ""
]
},
- "execution_count": 58,
"metadata": {},
- "output_type": "execute_result"
+ "output_type": "display_data"
}
],
"source": [
- "pipeline.nodes"
+ "pipeline.show_graph()"
]
},
{
"cell_type": "markdown",
- "id": "5f33f5a9",
+ "id": "dad175c6",
"metadata": {},
"source": [
- "Notice that elements of the pipeline is wrapped in a `Node`, being either `PodNode` or `KernelNode`."
+ "You can access individual elements of the pipeline as an attribute. By default, the attribute is named after the operator/pod name."
]
},
{
- "cell_type": "markdown",
- "id": "2b6bc8df",
+ "cell_type": "code",
+ "execution_count": 64,
+ "id": "cca9e0d0",
"metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "PodNode[multiply_numbers]\n",
+ "\n",
+ "
shape: (0, 5)| *id | _tag::source:57778e89cbc0 | product | _source_product | _context_key |
|---|
| i64 | str | i64 | str | str |
"
+ ],
+ "text/plain": [
+ "PodNode(pod=FunctionPod:multiply_numbers)"
+ ]
+ },
+ "execution_count": 64,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
"source": [
- "You can fetch results of the pipeline through these nodes. For example, you can access the saved results of the pipeline as Polars dataframe by access the `df` attribute."
+ "pipeline.multiply_numbers"
]
},
{
"cell_type": "code",
- "execution_count": 59,
- "id": "21086f72",
+ "execution_count": 65,
+ "id": "635884c6",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "KernelNode[Join]\n",
"\n",
- "
shape: (5, 2)| id | sum |
|---|
| i64 | i64 |
| 0 | 11 |
| 1 | 22 |
| 2 | 33 |
| 3 | 44 |
| 4 | 55 |
"
+ "shape: (0, 4)| *id | sum | product | _context_key_right |
|---|
| i64 | i64 | i64 | str |
"
],
"text/plain": [
- "shape: (5, 2)\n",
- "┌─────┬─────┐\n",
- "│ id ┆ sum │\n",
- "│ --- ┆ --- │\n",
- "│ i64 ┆ i64 │\n",
- "╞═════╪═════╡\n",
- "│ 0 ┆ 11 │\n",
- "│ 1 ┆ 22 │\n",
- "│ 2 ┆ 33 │\n",
- "│ 3 ┆ 44 │\n",
- "│ 4 ┆ 55 │\n",
- "└─────┴─────┘"
+ "KernelNode(kernel=Join())"
]
},
- "execution_count": 59,
+ "execution_count": 65,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "pipeline.add_numbers.as_df()"
+ "pipeline.Join"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 66,
+ "id": "08add7d9",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "{'ArrowTableSource': KernelNode(kernel=ArrowTableSource),\n",
+ " 'add_numbers': PodNode(pod=FunctionPod:add_numbers),\n",
+ " 'multiply_numbers': PodNode(pod=FunctionPod:multiply_numbers),\n",
+ " 'Join': KernelNode(kernel=Join()),\n",
+ " 'combine_results': PodNode(pod=FunctionPod:combine_results)}"
+ ]
+ },
+ "execution_count": 66,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "pipeline.nodes"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "5f33f5a9",
+ "metadata": {},
+ "source": [
+ "Notice that elements of the pipeline is wrapped in a `Node`, being either `PodNode` or `KernelNode`."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "2b6bc8df",
+ "metadata": {},
+ "source": [
+ "You can fetch results of the pipeline through these nodes. For example, you can access the saved results of the pipeline as Polars dataframe by access the `df` attribute."
]
},
{
@@ -1862,8 +1959,8 @@
},
{
"cell_type": "code",
- "execution_count": 60,
- "id": "1e741659",
+ "execution_count": 67,
+ "id": "bb357c14",
"metadata": {},
"outputs": [],
"source": [
@@ -1888,13 +1985,14 @@
},
{
"cell_type": "code",
- "execution_count": 61,
+ "execution_count": 68,
"id": "c77154ec",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "PodNode[add_numbers]\n",
"\n",
- "
shape: (5, 2)| id | sum |
|---|
| i64 | i64 |
| 0 | 11 |
| 1 | 22 |
| 2 | 33 |
| 3 | 44 |
| 4 | 55 |
"
+ "shape: (5, 2)| *id | sum |
|---|
| i64 | i64 |
| 0 | 11 |
| 1 | 22 |
| 2 | 33 |
| 3 | 44 |
| 4 | 55 |
"
],
"text/plain": [
- "shape: (5, 2)\n",
- "┌─────┬─────┐\n",
- "│ id ┆ sum │\n",
- "│ --- ┆ --- │\n",
- "│ i64 ┆ i64 │\n",
- "╞═════╪═════╡\n",
- "│ 0 ┆ 11 │\n",
- "│ 1 ┆ 22 │\n",
- "│ 2 ┆ 33 │\n",
- "│ 3 ┆ 44 │\n",
- "│ 4 ┆ 55 │\n",
- "└─────┴─────┘"
+ "PodNode(pod=FunctionPod:add_numbers)"
]
},
- "execution_count": 61,
+ "execution_count": 68,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "pipeline.add_numbers.as_df()"
+ "pipeline.add_numbers"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 69,
+ "id": "38bfc68b",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "KernelNode[Join]\n",
+ "\n",
+ "
shape: (5, 3)| *id | sum | product |
|---|
| i64 | i64 | i64 |
| 0 | 11 | 10 |
| 1 | 22 | 40 |
| 2 | 33 | 90 |
| 3 | 44 | 160 |
| 4 | 55 | 250 |
"
+ ],
+ "text/plain": [
+ "KernelNode(kernel=Join())"
+ ]
+ },
+ "execution_count": 69,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "pipeline.Join"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 70,
+ "id": "7897be61",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/png": "",
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "pipeline.show_graph()"
]
},
{
@@ -1956,7 +2096,7 @@
},
{
"cell_type": "code",
- "execution_count": 62,
+ "execution_count": 71,
"id": "37e65e33",
"metadata": {},
"outputs": [],
@@ -1968,29 +2108,30 @@
},
{
"cell_type": "code",
- "execution_count": 63,
+ "execution_count": 72,
"id": "3bad8332",
"metadata": {},
"outputs": [],
"source": [
"# now defien the pipeline\n",
"with pipeline2:\n",
- " sum_results = add_numbers(input_stream, label=\"my_summation\")\n",
- " product_results = multiply_numbers(input_stream, label=\"my_product\")\n",
- " final_results = combine_results(\n",
+ " sum_results = add_numbers.pod(input_stream, label=\"my_summation\")\n",
+ " product_results = multiply_numbers.pod(input_stream, label=\"my_product\")\n",
+ " final_results = combine_results.pod(\n",
" sum_results, product_results, label=\"my_final_result\"\n",
" )"
]
},
{
"cell_type": "code",
- "execution_count": 64,
+ "execution_count": 73,
"id": "8f146ae7",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "PodNode[my_summation]\n",
"\n",
- "
shape: (5, 2)| id | sum |
|---|
| i64 | i64 |
| 0 | 11 |
| 1 | 22 |
| 2 | 33 |
| 3 | 44 |
| 4 | 55 |
"
+ "shape: (5, 2)| *id | sum |
|---|
| i64 | i64 |
| 0 | 11 |
| 1 | 22 |
| 2 | 33 |
| 3 | 44 |
| 4 | 55 |
"
],
"text/plain": [
- "shape: (5, 2)\n",
- "┌─────┬─────┐\n",
- "│ id ┆ sum │\n",
- "│ --- ┆ --- │\n",
- "│ i64 ┆ i64 │\n",
- "╞═════╪═════╡\n",
- "│ 0 ┆ 11 │\n",
- "│ 1 ┆ 22 │\n",
- "│ 2 ┆ 33 │\n",
- "│ 3 ┆ 44 │\n",
- "│ 4 ┆ 55 │\n",
- "└─────┴─────┘"
+ "PodNode(pod=FunctionPod:add_numbers)"
]
},
- "execution_count": 64,
+ "execution_count": 73,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "pipeline2.my_summation.as_df()"
+ "pipeline2.my_summation"
]
},
{
"cell_type": "code",
- "execution_count": 65,
+ "execution_count": 74,
"id": "8fd7bf4e",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "PodNode[my_product]\n",
"\n",
- "
shape: (5, 2)| id | product |
|---|
| i64 | i64 |
| 0 | 10 |
| 1 | 40 |
| 2 | 90 |
| 3 | 160 |
| 4 | 250 |
"
+ "shape: (5, 2)| *id | product |
|---|
| i64 | i64 |
| 0 | 10 |
| 1 | 40 |
| 2 | 90 |
| 3 | 160 |
| 4 | 250 |
"
],
"text/plain": [
- "shape: (5, 2)\n",
- "┌─────┬─────────┐\n",
- "│ id ┆ product │\n",
- "│ --- ┆ --- │\n",
- "│ i64 ┆ i64 │\n",
- "╞═════╪═════════╡\n",
- "│ 0 ┆ 10 │\n",
- "│ 1 ┆ 40 │\n",
- "│ 2 ┆ 90 │\n",
- "│ 3 ┆ 160 │\n",
- "│ 4 ┆ 250 │\n",
- "└─────┴─────────┘"
+ "PodNode(pod=FunctionPod:multiply_numbers)"
]
},
- "execution_count": 65,
+ "execution_count": 74,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "pipeline2.my_product.as_df()"
+ "pipeline2.my_product"
]
},
{
"cell_type": "code",
- "execution_count": 66,
+ "execution_count": 75,
"id": "2a918db1",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
+ "PodNode[my_final_result]\n",
"\n",
- "
shape: (5, 2)| id | result |
|---|
| i64 | str |
| 0 | "Sum: 11, Product: 10" |
| 1 | "Sum: 22, Product: 40" |
| 2 | "Sum: 33, Product: 90" |
| 3 | "Sum: 44, Product: 160" |
| 4 | "Sum: 55, Product: 250" |
"
+ "shape: (5, 2)| *id | result |
|---|
| i64 | str |
| 0 | "Sum: 11, Product: 10" |
| 1 | "Sum: 22, Product: 40" |
| 2 | "Sum: 33, Product: 90" |
| 3 | "Sum: 44, Product: 160" |
| 4 | "Sum: 55, Product: 250" |
"
],
"text/plain": [
- "shape: (5, 2)\n",
- "┌─────┬───────────────────────┐\n",
- "│ id ┆ result │\n",
- "│ --- ┆ --- │\n",
- "│ i64 ┆ str │\n",
- "╞═════╪═══════════════════════╡\n",
- "│ 0 ┆ Sum: 11, Product: 10 │\n",
- "│ 1 ┆ Sum: 22, Product: 40 │\n",
- "│ 2 ┆ Sum: 33, Product: 90 │\n",
- "│ 3 ┆ Sum: 44, Product: 160 │\n",
- "│ 4 ┆ Sum: 55, Product: 250 │\n",
- "└─────┴───────────────────────┘"
+ "PodNode(pod=FunctionPod:combine_results)"
]
},
- "execution_count": 66,
+ "execution_count": 75,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "pipeline2.my_final_result.as_df()"
+ "pipeline2.my_final_result"
]
},
{
@@ -2119,7 +2229,7 @@
],
"metadata": {
"kernelspec": {
- "display_name": ".venv",
+ "display_name": "orcapod",
"language": "python",
"name": "python3"
},
diff --git a/src/orcapod/core/pods.py b/src/orcapod/core/pods.py
index 02d3aa4..7587881 100644
--- a/src/orcapod/core/pods.py
+++ b/src/orcapod/core/pods.py
@@ -207,6 +207,7 @@ def call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> tuple[cp.Tag, cp.Packet | None]: ...
@abstractmethod
@@ -216,6 +217,7 @@ async def async_call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> tuple[cp.Tag, cp.Packet | None]: ...
def track_invocation(self, *streams: cp.Stream, label: str | None = None) -> None:
@@ -408,6 +410,7 @@ def call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> tuple[cp.Tag, DictPacket | None]:
if not self.is_active():
logger.info(
@@ -426,7 +429,11 @@ def call(
with self._tracker_manager.no_tracking():
if execution_engine is not None:
# use the provided execution engine to run the function
- values = execution_engine.submit_sync(self.function, **input_dict)
+ values = execution_engine.submit_sync(
+ self.function,
+ fn_kwargs=input_dict,
+ engine_opts=execution_engine_opts,
+ )
else:
values = self.function(**input_dict)
@@ -458,6 +465,7 @@ async def async_call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> tuple[cp.Tag, cp.Packet | None]:
"""
Asynchronous call to the function pod. This is a placeholder for future implementation.
@@ -481,7 +489,9 @@ async def async_call(
input_dict = packet
if execution_engine is not None:
# use the provided execution engine to run the function
- values = await execution_engine.submit_async(self.function, **input_dict)
+ values = await execution_engine.submit_async(
+ self.function, fn_kwargs=input_dict, engine_opts=execution_engine_opts
+ )
else:
values = self.function(**input_dict)
@@ -607,9 +617,14 @@ def call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> tuple[cp.Tag, cp.Packet | None]:
return self.pod.call(
- tag, packet, record_id=record_id, execution_engine=execution_engine
+ tag,
+ packet,
+ record_id=record_id,
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
async def async_call(
@@ -618,9 +633,14 @@ async def async_call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> tuple[cp.Tag, cp.Packet | None]:
return await self.pod.async_call(
- tag, packet, record_id=record_id, execution_engine=execution_engine
+ tag,
+ packet,
+ record_id=record_id,
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
def kernel_identity_structure(
@@ -683,6 +703,7 @@ def call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
skip_cache_lookup: bool = False,
skip_cache_insert: bool = False,
) -> tuple[cp.Tag, cp.Packet | None]:
@@ -700,7 +721,11 @@ def call(
print(f"Cache hit for {packet}!")
if output_packet is None:
tag, output_packet = super().call(
- tag, packet, record_id=record_id, execution_engine=execution_engine
+ tag,
+ packet,
+ record_id=record_id,
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
if (
output_packet is not None
@@ -717,6 +742,7 @@ async def async_call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
skip_cache_lookup: bool = False,
skip_cache_insert: bool = False,
) -> tuple[cp.Tag, cp.Packet | None]:
@@ -732,7 +758,11 @@ async def async_call(
output_packet = self.get_cached_output_for_packet(packet)
if output_packet is None:
tag, output_packet = await super().async_call(
- tag, packet, record_id=record_id, execution_engine=execution_engine
+ tag,
+ packet,
+ record_id=record_id,
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
if output_packet is not None and not skip_cache_insert:
self.record_packet(
@@ -740,6 +770,7 @@ async def async_call(
output_packet,
record_id=record_id,
execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
return tag, output_packet
@@ -754,11 +785,14 @@ def record_packet(
output_packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
skip_duplicates: bool = False,
) -> cp.Packet:
"""
Record the output packet against the input packet in the result store.
"""
+
+ # TODO: consider incorporating execution_engine_opts into the record
data_table = output_packet.as_table(include_context=True, include_source=True)
for i, (k, v) in enumerate(self.tiered_pod_id.items()):
diff --git a/src/orcapod/core/sources/base.py b/src/orcapod/core/sources/base.py
index 89c8ff9..b8e128f 100644
--- a/src/orcapod/core/sources/base.py
+++ b/src/orcapod/core/sources/base.py
@@ -119,9 +119,13 @@ def __iter__(self) -> Iterator[tuple[cp.Tag, cp.Packet]]:
def iter_packets(
self,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> Iterator[tuple[cp.Tag, cp.Packet]]:
"""Delegate to the cached KernelStream."""
- return self().iter_packets(execution_engine=execution_engine)
+ return self().iter_packets(
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
+ )
def as_table(
self,
@@ -131,6 +135,7 @@ def as_table(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pa.Table":
"""Delegate to the cached KernelStream."""
return self().as_table(
@@ -140,18 +145,25 @@ def as_table(
include_content_hash=include_content_hash,
sort_by_tags=sort_by_tags,
execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
def flow(
- self, execution_engine: cp.ExecutionEngine | None = None
+ self,
+ execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> Collection[tuple[cp.Tag, cp.Packet]]:
"""Delegate to the cached KernelStream."""
- return self().flow(execution_engine=execution_engine)
+ return self().flow(
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
+ )
def run(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
"""
@@ -159,12 +171,18 @@ def run(
This is a no-op for sources since they are not executed like pods.
"""
- self().run(*args, execution_engine=execution_engine, **kwargs)
+ self().run(
+ *args,
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
+ **kwargs,
+ )
async def run_async(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
"""
@@ -172,7 +190,12 @@ async def run_async(
This is a no-op for sources since they are not executed like pods.
"""
- await self().run_async(*args, execution_engine=execution_engine, **kwargs)
+ await self().run_async(
+ *args,
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
+ **kwargs,
+ )
# ==================== LiveStream Protocol (Delegation) ====================
@@ -339,9 +362,13 @@ def __iter__(self) -> Iterator[tuple[cp.Tag, cp.Packet]]:
def iter_packets(
self,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> Iterator[tuple[cp.Tag, cp.Packet]]:
"""Delegate to the cached KernelStream."""
- return self().iter_packets(execution_engine=execution_engine)
+ return self().iter_packets(
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
+ )
def as_table(
self,
@@ -351,6 +378,7 @@ def as_table(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pa.Table":
"""Delegate to the cached KernelStream."""
return self().as_table(
@@ -360,18 +388,25 @@ def as_table(
include_content_hash=include_content_hash,
sort_by_tags=sort_by_tags,
execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
def flow(
- self, execution_engine: cp.ExecutionEngine | None = None
+ self,
+ execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> Collection[tuple[cp.Tag, cp.Packet]]:
"""Delegate to the cached KernelStream."""
- return self().flow(execution_engine=execution_engine)
+ return self().flow(
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
+ )
def run(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
"""
@@ -379,12 +414,18 @@ def run(
This is a no-op for sources since they are not executed like pods.
"""
- self().run(*args, execution_engine=execution_engine, **kwargs)
+ self().run(
+ *args,
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
+ **kwargs,
+ )
async def run_async(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
"""
@@ -392,7 +433,12 @@ async def run_async(
This is a no-op for sources since they are not executed like pods.
"""
- await self().run_async(*args, execution_engine=execution_engine, **kwargs)
+ await self().run_async(
+ *args,
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
+ **kwargs,
+ )
# ==================== LiveStream Protocol (Delegation) ====================
diff --git a/src/orcapod/core/streams/base.py b/src/orcapod/core/streams/base.py
index 8cb1bbb..d96230a 100644
--- a/src/orcapod/core/streams/base.py
+++ b/src/orcapod/core/streams/base.py
@@ -173,6 +173,7 @@ def pop(self) -> cp.Stream:
def __init__(
self,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -181,6 +182,7 @@ def __init__(
# note that this is not necessary for Stream protocol, but is provided
# for convenience to resolve semantic types and other context-specific information
self._execution_engine = execution_engine
+ self._execution_engine_opts = execution_engine_opts
@property
def substream_identities(self) -> tuple[str, ...]:
@@ -206,6 +208,8 @@ def execution_engine(self, engine: cp.ExecutionEngine | None) -> None:
"""
self._execution_engine = engine
+ # TODO: add getter/setter for execution engine opts
+
def get_substream(self, substream_id: str) -> cp.Stream:
"""
Returns the substream with the given substream_id.
@@ -321,6 +325,7 @@ def __iter__(
def iter_packets(
self,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> Iterator[tuple[cp.Tag, cp.Packet]]: ...
@abstractmethod
@@ -328,6 +333,7 @@ def run(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None: ...
@@ -336,6 +342,7 @@ async def run_async(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None: ...
@@ -348,6 +355,7 @@ def as_table(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pa.Table": ...
def as_polars_df(
@@ -358,6 +366,7 @@ def as_polars_df(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pl.DataFrame":
"""
Convert the entire stream to a Polars DataFrame.
@@ -370,6 +379,7 @@ def as_polars_df(
include_content_hash=include_content_hash,
sort_by_tags=sort_by_tags,
execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
)
@@ -381,6 +391,7 @@ def as_df(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pl.DataFrame":
"""
Convert the entire stream to a Polars DataFrame.
@@ -392,6 +403,7 @@ def as_df(
include_content_hash=include_content_hash,
sort_by_tags=sort_by_tags,
execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
def as_lazy_frame(
@@ -402,6 +414,7 @@ def as_lazy_frame(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pl.LazyFrame":
"""
Convert the entire stream to a Polars LazyFrame.
@@ -413,6 +426,7 @@ def as_lazy_frame(
include_content_hash=include_content_hash,
sort_by_tags=sort_by_tags,
execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
return df.lazy()
@@ -425,6 +439,7 @@ def as_pandas_df(
sort_by_tags: bool = True,
index_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pd.DataFrame":
df = self.as_polars_df(
include_data_context=include_data_context,
@@ -433,6 +448,7 @@ def as_pandas_df(
include_content_hash=include_content_hash,
sort_by_tags=sort_by_tags,
execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
tag_keys, _ = self.keys()
pdf = df.to_pandas()
@@ -441,13 +457,21 @@ def as_pandas_df(
return pdf
def flow(
- self, execution_engine: cp.ExecutionEngine | None = None
+ self,
+ execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> Collection[tuple[cp.Tag, cp.Packet]]:
"""
Flow everything through the stream, returning the entire collection of
(Tag, Packet) as a collection. This will tigger any upstream computation of the stream.
"""
- return [e for e in self.iter_packets(execution_engine=execution_engine)]
+ return [
+ e
+ for e in self.iter_packets(
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
+ )
+ ]
def _repr_html_(self) -> str:
df = self.as_polars_df()
@@ -464,6 +488,7 @@ def view(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "StreamView":
df = self.as_polars_df(
include_data_context=include_data_context,
@@ -472,6 +497,7 @@ def view(
include_content_hash=include_content_hash,
sort_by_tags=sort_by_tags,
execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
tag_map = {t: f"*{t}" for t in self.tag_keys()}
# TODO: construct repr html better
diff --git a/src/orcapod/core/streams/cached_pod_stream.py b/src/orcapod/core/streams/cached_pod_stream.py
index 6e667e9..541af52 100644
--- a/src/orcapod/core/streams/cached_pod_stream.py
+++ b/src/orcapod/core/streams/cached_pod_stream.py
@@ -63,6 +63,7 @@ async def run_async(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
"""
@@ -127,7 +128,9 @@ async def run_async(
tag,
packet,
skip_cache_lookup=True,
- execution_engine=execution_engine,
+ execution_engine=execution_engine or self.execution_engine,
+ execution_engine_opts=execution_engine_opts
+ or self._execution_engine_opts,
)
pending_calls.append(pending)
import asyncio
@@ -143,6 +146,7 @@ def run(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
cached_results = []
@@ -221,8 +225,11 @@ def run(
tag,
packet,
skip_cache_lookup=True,
- execution_engine=execution_engine,
+ execution_engine=execution_engine or self.execution_engine,
+ execution_engine_opts=execution_engine_opts
+ or self._execution_engine_opts,
)
+ # TODO: use getter for execution engine opts
hash_to_output_lut[packet_hash] = output_packet
cached_results.append((tag, output_packet))
@@ -230,7 +237,9 @@ def run(
self._set_modified_time()
def iter_packets(
- self, execution_engine: cp.ExecutionEngine | None = None
+ self,
+ execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> Iterator[tuple[cp.Tag, cp.Packet]]:
"""
Processes the input stream and prepares the output stream.
@@ -244,7 +253,9 @@ def iter_packets(
include_system_tags=True,
include_source=True,
include_content_hash=constants.INPUT_PACKET_HASH,
- execution_engine=execution_engine,
+ execution_engine=execution_engine or self.execution_engine,
+ execution_engine_opts=execution_engine_opts
+ or self._execution_engine_opts,
)
existing_entries = self.pod.get_all_cached_outputs(
include_system_columns=True
@@ -331,7 +342,9 @@ def iter_packets(
tag,
packet,
skip_cache_lookup=True,
- execution_engine=execution_engine,
+ execution_engine=execution_engine or self.execution_engine,
+ execution_engine_opts=execution_engine_opts
+ or self._execution_engine_opts,
)
hash_to_output_lut[packet_hash] = output_packet
cached_results.append((tag, output_packet))
@@ -375,12 +388,17 @@ def as_table(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pa.Table":
if self._cached_output_table is None:
all_tags = []
all_packets = []
tag_schema, packet_schema = None, None
- for tag, packet in self.iter_packets(execution_engine=execution_engine):
+ for tag, packet in self.iter_packets(
+ execution_engine=execution_engine or self.execution_engine,
+ execution_engine_opts=execution_engine_opts
+ or self._execution_engine_opts,
+ ):
if tag_schema is None:
tag_schema = tag.arrow_schema(include_system_tags=True)
if packet_schema is None:
diff --git a/src/orcapod/core/streams/kernel_stream.py b/src/orcapod/core/streams/kernel_stream.py
index c3850a5..e5f60e3 100644
--- a/src/orcapod/core/streams/kernel_stream.py
+++ b/src/orcapod/core/streams/kernel_stream.py
@@ -141,18 +141,25 @@ def run(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
self.refresh()
assert self._cached_stream is not None, (
"Stream has not been updated or is empty."
)
- self._cached_stream.run(*args, execution_engine=execution_engine, **kwargs)
+ self._cached_stream.run(
+ *args,
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
+ **kwargs,
+ )
async def run_async(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
self.refresh()
@@ -160,7 +167,10 @@ async def run_async(
"Stream has not been updated or is empty."
)
await self._cached_stream.run_async(
- *args, execution_engine=execution_engine, **kwargs
+ *args,
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
+ **kwargs,
)
def as_table(
@@ -171,6 +181,7 @@ def as_table(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pa.Table":
self.refresh()
assert self._cached_stream is not None, (
@@ -183,17 +194,22 @@ def as_table(
include_content_hash=include_content_hash,
sort_by_tags=sort_by_tags,
execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
def iter_packets(
self,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> Iterator[tuple[cp.Tag, cp.Packet]]:
self.refresh()
assert self._cached_stream is not None, (
"Stream has not been updated or is empty."
)
- return self._cached_stream.iter_packets(execution_engine=execution_engine)
+ return self._cached_stream.iter_packets(
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
+ )
def __repr__(self) -> str:
return f"{self.__class__.__name__}(kernel={self.source}, upstreams={self.upstreams})"
diff --git a/src/orcapod/core/streams/lazy_pod_stream.py b/src/orcapod/core/streams/lazy_pod_stream.py
index 9eefc83..23f146a 100644
--- a/src/orcapod/core/streams/lazy_pod_stream.py
+++ b/src/orcapod/core/streams/lazy_pod_stream.py
@@ -49,7 +49,9 @@ def __init__(self, pod: cp.Pod, prepared_stream: cp.Stream, **kwargs):
self._cached_content_hash_column: pa.Array | None = None
def iter_packets(
- self, execution_engine: cp.ExecutionEngine | None = None
+ self,
+ execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> Iterator[tuple[cp.Tag, cp.Packet]]:
if self._prepared_stream_iterator is not None:
for i, (tag, packet) in enumerate(self._prepared_stream_iterator):
@@ -61,8 +63,13 @@ def iter_packets(
else:
# Process packet
processed = self.pod.call(
- tag, packet, execution_engine=execution_engine
+ tag,
+ packet,
+ execution_engine=execution_engine or self.execution_engine,
+ execution_engine_opts=execution_engine_opts
+ or self._execution_engine_opts,
)
+ # TODO: verify the proper use of execution engine opts
if processed is not None:
# Update shared cache for future iterators (optimization)
self._cached_output_packets[i] = processed
@@ -83,6 +90,7 @@ async def run_async(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
if self._prepared_stream_iterator is not None:
@@ -91,7 +99,11 @@ async def run_async(
if i not in self._cached_output_packets:
# Process packet
pending_call_lut[i] = self.pod.async_call(
- tag, packet, execution_engine=execution_engine
+ tag,
+ packet,
+ execution_engine=execution_engine or self.execution_engine,
+ execution_engine_opts=execution_engine_opts
+ or self._execution_engine_opts,
)
indices = list(pending_call_lut.keys())
@@ -108,10 +120,14 @@ def run(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
- **kwargs: Any
+ execution_engine_opts: dict[str, Any] | None = None,
+ **kwargs: Any,
) -> None:
# Fallback to synchronous run
- self.flow(execution_engine=execution_engine)
+ self.flow(
+ execution_engine=execution_engine or self.execution_engine,
+ execution_engine_opts=execution_engine_opts or self._execution_engine_opts,
+ )
def keys(
self, include_system_tags: bool = False
@@ -143,12 +159,17 @@ def as_table(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pa.Table":
if self._cached_output_table is None:
all_tags = []
all_packets = []
tag_schema, packet_schema = None, None
- for tag, packet in self.iter_packets(execution_engine=execution_engine):
+ for tag, packet in self.iter_packets(
+ execution_engine=execution_engine or self.execution_engine,
+ execution_engine_opts=execution_engine_opts
+ or self._execution_engine_opts,
+ ):
if tag_schema is None:
tag_schema = tag.arrow_schema(include_system_tags=True)
if packet_schema is None:
@@ -202,7 +223,10 @@ def as_table(
if self._cached_content_hash_column is None:
content_hashes = []
# TODO: verify that order will be preserved
- for tag, packet in self.iter_packets():
+ for tag, packet in self.iter_packets(
+ execution_engine=execution_engine or self.execution_engine,
+ execution_engine_opts=execution_engine_opts or self._execution_engine_opts,
+ ):
content_hashes.append(packet.content_hash().to_string())
self._cached_content_hash_column = pa.array(
content_hashes, type=pa.large_string()
diff --git a/src/orcapod/core/streams/pod_node_stream.py b/src/orcapod/core/streams/pod_node_stream.py
index b6ef449..d0e624c 100644
--- a/src/orcapod/core/streams/pod_node_stream.py
+++ b/src/orcapod/core/streams/pod_node_stream.py
@@ -56,7 +56,9 @@ def mode(self) -> str:
return self.pod_node.mode
async def run_async(
- self, execution_engine: cp.ExecutionEngine | None = None
+ self,
+ execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> None:
"""
Runs the stream, processing the input stream and preparing the output stream.
@@ -120,7 +122,9 @@ async def run_async(
tag,
packet,
skip_cache_lookup=True,
- execution_engine=execution_engine,
+ execution_engine=execution_engine or self.execution_engine,
+ execution_engine_opts=execution_engine_opts
+ or self._execution_engine_opts,
)
pending_calls.append(pending)
import asyncio
@@ -136,6 +140,7 @@ def run(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
cached_results = []
@@ -150,7 +155,8 @@ def run(
include_system_tags=True,
include_source=True,
include_content_hash=constants.INPUT_PACKET_HASH,
- execution_engine=execution_engine,
+ execution_engine=execution_engine or self.execution_engine,
+ execution_engine_opts=execution_engine_opts or self._execution_engine_opts,
)
existing_entries = self.pod_node.get_all_cached_outputs(
include_system_columns=True
@@ -230,7 +236,9 @@ def run(
packet,
record_id=packet_record_id,
skip_cache_lookup=True,
- execution_engine=execution_engine,
+ execution_engine=execution_engine or self.execution_engine,
+ execution_engine_opts=execution_engine_opts
+ or self._execution_engine_opts,
)
packet_record_to_output_lut[packet_record_id] = output_packet
self.pod_node.add_pipeline_record(
@@ -254,7 +262,9 @@ def clear_cache(self) -> None:
self._cached_content_hash_column = None
def iter_packets(
- self, execution_engine: cp.ExecutionEngine | None = None
+ self,
+ execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> Iterator[tuple[cp.Tag, cp.Packet]]:
"""
Processes the input stream and prepares the output stream.
@@ -422,12 +432,17 @@ def as_table(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pa.Table":
if self._cached_output_table is None:
all_tags = []
all_packets = []
tag_schema, packet_schema = None, None
- for tag, packet in self.iter_packets(execution_engine=execution_engine):
+ for tag, packet in self.iter_packets(
+ execution_engine=execution_engine or self.execution_engine,
+ execution_engine_opts=execution_engine_opts
+ or self._execution_engine_opts,
+ ):
if tag_schema is None:
tag_schema = tag.arrow_schema(include_system_tags=True)
if packet_schema is None:
@@ -499,7 +514,11 @@ def as_table(
if include_content_hash:
if self._cached_content_hash_column is None:
content_hashes = []
- for tag, packet in self.iter_packets(execution_engine=execution_engine):
+ for tag, packet in self.iter_packets(
+ execution_engine=execution_engine or self.execution_engine,
+ execution_engine_opts=execution_engine_opts
+ or self._execution_engine_opts,
+ ):
content_hashes.append(packet.content_hash().to_string())
self._cached_content_hash_column = pa.array(
content_hashes, type=pa.large_string()
diff --git a/src/orcapod/core/streams/table_stream.py b/src/orcapod/core/streams/table_stream.py
index a71ea5f..9df6289 100644
--- a/src/orcapod/core/streams/table_stream.py
+++ b/src/orcapod/core/streams/table_stream.py
@@ -200,6 +200,7 @@ def as_table(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pa.Table":
"""
Returns the underlying table representation of the stream.
@@ -250,7 +251,9 @@ def clear_cache(self) -> None:
self._cached_elements = None
def iter_packets(
- self, execution_engine: cp.ExecutionEngine | None = None
+ self,
+ execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> Iterator[tuple[cp.Tag, ArrowPacket]]:
"""
Iterates over the packets in the stream.
@@ -298,6 +301,7 @@ def run(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
"""
@@ -311,6 +315,7 @@ async def run_async(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
"""
diff --git a/src/orcapod/core/streams/wrapped_stream.py b/src/orcapod/core/streams/wrapped_stream.py
index 3f14203..6ba8530 100644
--- a/src/orcapod/core/streams/wrapped_stream.py
+++ b/src/orcapod/core/streams/wrapped_stream.py
@@ -58,6 +58,7 @@ def as_table(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pa.Table":
"""
Returns the underlying table representation of the stream.
@@ -70,17 +71,23 @@ def as_table(
include_content_hash=include_content_hash,
sort_by_tags=sort_by_tags,
execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
+ # TODO handle default execution engine
def iter_packets(
self,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> Iterator[tuple[cp.Tag, cp.Packet]]:
"""
Iterates over the packets in the stream.
Each packet is represented as a tuple of (Tag, Packet).
"""
- return self._stream.iter_packets(execution_engine=execution_engine)
+ return self._stream.iter_packets(
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
+ )
def identity_structure(self) -> Any:
return self._stream.identity_structure()
diff --git a/src/orcapod/execution_engines/ray_execution_engine.py b/src/orcapod/execution_engines/ray_execution_engine.py
index d3443df..d4e6727 100644
--- a/src/orcapod/execution_engines/ray_execution_engine.py
+++ b/src/orcapod/execution_engines/ray_execution_engine.py
@@ -29,42 +29,80 @@ class RayEngine:
def __init__(self, ray_address: str | None = None, **ray_init_kwargs):
"""Initialize Ray with native async support."""
- if not ray.is_initialized():
+ if not self.is_initialized():
ray.init(address=ray_address, **ray_init_kwargs)
+ # track whether ray engine was initialized in this engine
self._ray_initialized_here = True
+ logger.info("Native Ray async engine initialized")
else:
self._ray_initialized_here = False
+ logger.info("Working with an existing Ray async engine")
- logger.info("Native Ray async engine initialized")
logger.info(f"Cluster resources: {ray.cluster_resources()}")
+ def is_initialized(self) -> bool:
+ """Check if Ray is initialized."""
+ return ray.is_initialized()
+
@property
def name(self) -> str:
return "ray"
- def submit_sync(self, func: Callable[..., T], *args, **kwargs) -> T:
+ def submit_sync(
+ self,
+ func: Callable[..., T],
+ /,
+ *,
+ fn_args: tuple[Any, ...] = (),
+ fn_kwargs: dict[str, Any] | None = None,
+ **engine_opts: Any,
+ ) -> T:
"""
Submit a function synchronously using Ray.
- This is a blocking call that waits for the result.
+ Arguments destined for the function must be provided via
+ ``fn_args``/``fn_kwargs``. Engine-specific options (e.g., resources)
+ should be passed through ``engine_opts`` and are applied via
+ ``.options(**engine_opts)``.
"""
+ if fn_kwargs is None:
+ fn_kwargs = {}
+
# Create remote function and submit
remote_func = ray.remote(func)
- object_ref = remote_func.remote(*args, **kwargs)
+ if engine_opts:
+ remote_func = remote_func.options(**engine_opts) # type: ignore[arg-type]
+ object_ref = remote_func.remote(*fn_args, **fn_kwargs)
# Wait for the result - this is blocking
result = ray.get(object_ref)
return result
- async def submit_async(self, func: Callable[..., T], *args, **kwargs) -> T:
+ async def submit_async(
+ self,
+ func: Callable[..., T],
+ /,
+ *,
+ fn_args: tuple[Any, ...] = (),
+ fn_kwargs: dict[str, Any] | None = None,
+ **engine_opts: Any,
+ ) -> T:
"""
Submit a function using Ray's native async support.
- Uses ObjectRef.future() which Ray converts to asyncio.Future natively.
+ Arguments destined for the function must be provided via
+ ``fn_args``/``fn_kwargs``. Engine-specific options (e.g., resources)
+ should be passed through ``engine_opts`` and are applied via
+ ``.options(**engine_opts)``.
"""
+ if fn_kwargs is None:
+ fn_kwargs = {}
+
# Create remote function and submit
remote_func = ray.remote(func)
- object_ref = remote_func.remote(*args, **kwargs)
+ if engine_opts:
+ remote_func = remote_func.options(**engine_opts) # type: ignore[arg-type]
+ object_ref = remote_func.remote(*fn_args, **fn_kwargs)
# Use Ray's native async support - this is the key insight!
# ObjectRef.future() returns a concurrent.futures.Future that works with asyncio
diff --git a/src/orcapod/pipeline/graph.py b/src/orcapod/pipeline/graph.py
index ddb7422..b3534ca 100644
--- a/src/orcapod/pipeline/graph.py
+++ b/src/orcapod/pipeline/graph.py
@@ -179,6 +179,7 @@ def set_mode(self, mode: str) -> None:
def run(
self,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
run_async: bool | None = None,
) -> None:
"""Execute the pipeline by running all nodes in the graph.
@@ -203,9 +204,16 @@ def run(
for node in nx.topological_sort(self.graph):
if run_async:
- synchronous_run(node.run_async, execution_engine=execution_engine)
+ synchronous_run(
+ node.run_async,
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
+ )
else:
- node.run(execution_engine=execution_engine)
+ node.run(
+ execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
+ )
self.flush()
diff --git a/src/orcapod/pipeline/nodes.py b/src/orcapod/pipeline/nodes.py
index 08cd2ed..db9d0d4 100644
--- a/src/orcapod/pipeline/nodes.py
+++ b/src/orcapod/pipeline/nodes.py
@@ -1,7 +1,6 @@
from abc import abstractmethod
-from orcapod.core.datagrams import ArrowTag
from orcapod.core.kernels import KernelStream, WrappedKernel
-from orcapod.core.sources.base import SourceBase, InvocationBase
+from orcapod.core.sources.base import InvocationBase
from orcapod.core.pods import CachedPod
from orcapod.protocols import core_protocols as cp, database_protocols as dbp
from orcapod.types import PythonSchema
@@ -302,6 +301,7 @@ def call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
skip_cache_lookup: bool = False,
skip_cache_insert: bool = False,
) -> tuple[cp.Tag, cp.Packet | None]:
@@ -316,6 +316,7 @@ def call(
skip_cache_lookup=skip_cache_lookup,
skip_cache_insert=skip_cache_insert,
execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
# if output_packet is not None:
@@ -339,6 +340,7 @@ async def async_call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
skip_cache_lookup: bool = False,
skip_cache_insert: bool = False,
) -> tuple[cp.Tag, cp.Packet | None]:
@@ -353,6 +355,7 @@ async def async_call(
skip_cache_lookup=skip_cache_lookup,
skip_cache_insert=skip_cache_insert,
execution_engine=execution_engine,
+ execution_engine_opts=execution_engine_opts,
)
if output_packet is not None:
diff --git a/src/orcapod/protocols/core_protocols/base.py b/src/orcapod/protocols/core_protocols/base.py
index c44d52c..7faad25 100644
--- a/src/orcapod/protocols/core_protocols/base.py
+++ b/src/orcapod/protocols/core_protocols/base.py
@@ -5,20 +5,126 @@
@runtime_checkable
class ExecutionEngine(Protocol):
+ """
+ Abstract execution backend responsible for running user functions.
+
+ ExecutionEngine defines the minimal contract that any execution backend
+ must satisfy to be used by Orcapod. Concrete implementations may execute
+ work in the current process (synchronously), on background threads or
+ processes, or on remote/distributed systems (e.g., Ray, Dask, Slurm).
+
+ Responsibilities
+ - Accept a Python callable plus arguments and execute it.
+ - Provide both a synchronous API (blocking) and an asynchronous API
+ (awaitable) with consistent error semantics.
+ - Surface the original exception from the user function without
+ wrapping where practical, while preserving traceback information.
+ - Be safe to construct/read concurrently from the pipeline orchestration.
+
+ Contract
+ - Inputs: a Python callable and its positional/keyword arguments.
+ - Outputs: the callable's return value (or a coroutine result when awaited).
+ - Errors: exceptions raised by the callable must be propagated to the
+ caller of submit_sync/submit_async.
+ - Cancellation: implementations may optionally support task cancellation
+ in submit_async via standard asyncio cancellation; submit_sync is
+ expected to block until completion.
+
+ Notes
+ - Serialization: Distributed engines may require the function and its
+ arguments to be serializable (pickle/cloudpickle). Local engines have
+ no such requirement beyond normal Python callability.
+ - Resource usage: Engines may schedule work with resource hints
+ (CPU/GPU/memory) outside this minimal protocol; higher-level APIs can
+ extend this interface if needed.
+ - Naming: ``name`` should be a short, human-friendly identifier such as
+ "local", "threadpool", "processpool", or "ray" and is used for logging
+ and diagnostics.
+ """
+
@property
- def name(self) -> str: ...
+ def name(self) -> str:
+ """Return a short, human-friendly identifier for the engine.
- def submit_sync(self, function: Callable, *args, **kwargs) -> Any:
+ Examples: "local", "threadpool", "processpool", "ray".
+ Used for logging, metrics, and debugging output.
"""
- Run the given function with the provided arguments.
- This method should be implemented by the execution engine.
+ ...
+
+ def submit_sync(
+ self,
+ func: Callable[..., Any],
+ /,
+ *,
+ fn_args: tuple[Any, ...] = (),
+ fn_kwargs: dict[str, Any] | None = None,
+ **engine_opts: Any,
+ ) -> Any:
+ """
+ Execute a callable and return its result (blocking).
+
+ This call is blocking. Engines may choose where/how the function
+ executes (same thread, worker thread/process, remote node), but the
+ call does not return until the work completes or fails.
+
+ Parameters
+ - func: Python callable to execute.
+ - fn_args: Tuple of positional arguments to pass to ``func``.
+ - fn_kwargs: Mapping of keyword arguments to pass to ``func``.
+ - **engine_opts: Engine-specific options (e.g., resources, priority),
+ never forwarded to ``func``.
+
+ Returns:
+ Any: The return value of ``func``.
+
+ Raises:
+ Exception: Any exception raised by ``func`` must be propagated to
+ the caller. Engines should preserve the original traceback whenever
+ practical.
+
+ Notes
+ - This API separates function inputs from engine configuration.
+ ``fn_args``/``fn_kwargs`` are always applied to ``func``;
+ ``engine_opts`` configures the engine and is never forwarded.
"""
...
- async def submit_async(self, function: Callable, *args, **kwargs) -> Any:
+ async def submit_async(
+ self,
+ func: Callable[..., Any],
+ /,
+ *,
+ fn_args: tuple[Any, ...] = (),
+ fn_kwargs: dict[str, Any] | None = None,
+ **engine_opts: Any,
+ ) -> Any:
"""
- Asynchronously run the given function with the provided arguments.
- This method should be implemented by the execution engine.
+ Asynchronously execute a callable and return the result when awaited.
+
+ The returned awaitable resolves to the callable's return value or
+ raises the callable's exception. Implementations should integrate with
+ asyncio semantics: if the awaiting task is cancelled, the engine may
+ attempt to cancel the underlying work when supported.
+
+ Parameters
+ - func: Python callable to execute.
+ - fn_args: Tuple of positional arguments to pass to ``func``.
+ - fn_kwargs: Mapping of keyword arguments to pass to ``func``.
+ - **engine_opts: Engine-specific options (e.g., resources, priority),
+ never forwarded to ``func``.
+
+ Returns:
+ Any: The return value of ``func`` when awaited.
+
+ Raises:
+ asyncio.CancelledError: If the awaiting task is cancelled and the
+ implementation propagates cancellation.
+ Exception: Any exception raised by ``func`` must be propagated to
+ the awaiting caller, with traceback preserved where possible.
+
+ Notes
+ - Mirrors the sync API: ``fn_args``/``fn_kwargs`` target ``func``;
+ ``engine_opts`` configures the engine and is never forwarded.
"""
...
diff --git a/src/orcapod/protocols/core_protocols/pods.py b/src/orcapod/protocols/core_protocols/pods.py
index b3c9513..616fd79 100644
--- a/src/orcapod/protocols/core_protocols/pods.py
+++ b/src/orcapod/protocols/core_protocols/pods.py
@@ -1,4 +1,4 @@
-from typing import TYPE_CHECKING, Protocol, runtime_checkable
+from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
from orcapod.protocols.core_protocols.base import ExecutionEngine
from orcapod.protocols.core_protocols.datagrams import Packet, Tag
@@ -94,6 +94,7 @@ async def async_call(
packet: Packet,
record_id: str | None = None,
execution_engine: ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> tuple[Tag, Packet | None]: ...
def call(
@@ -102,6 +103,7 @@ def call(
packet: Packet,
record_id: str | None = None,
execution_engine: ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> tuple[Tag, Packet | None]:
"""
Process a single packet with its associated tag.
@@ -143,6 +145,7 @@ async def async_call(
packet: Packet,
record_id: str | None = None,
execution_engine: ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
skip_cache_lookup: bool = False,
skip_cache_insert: bool = False,
) -> tuple[Tag, Packet | None]: ...
@@ -161,6 +164,7 @@ def call(
packet: Packet,
record_id: str | None = None,
execution_engine: ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
skip_cache_lookup: bool = False,
skip_cache_insert: bool = False,
) -> tuple[Tag, Packet | None]:
diff --git a/src/orcapod/protocols/core_protocols/streams.py b/src/orcapod/protocols/core_protocols/streams.py
index 36cd369..0cd3fb4 100644
--- a/src/orcapod/protocols/core_protocols/streams.py
+++ b/src/orcapod/protocols/core_protocols/streams.py
@@ -233,7 +233,9 @@ def __iter__(self) -> Iterator[tuple[Tag, Packet]]:
...
def iter_packets(
- self, execution_engine: ExecutionEngine | None = None
+ self,
+ execution_engine: ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> Iterator[tuple[Tag, Packet]]:
"""
Alias for __iter__ for explicit packet iteration.
@@ -256,7 +258,11 @@ def iter_packets(
...
def run(
- self, *args: Any, execution_engine: ExecutionEngine | None = None, **kwargs: Any
+ self,
+ *args: Any,
+ execution_engine: ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
+ **kwargs: Any,
) -> None:
"""
Execute the stream using the provided execution engine.
@@ -272,7 +278,11 @@ def run(
...
async def run_async(
- self, *args: Any, execution_engine: ExecutionEngine | None = None, **kwargs: Any
+ self,
+ *args: Any,
+ execution_engine: ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
+ **kwargs: Any,
) -> None:
"""
Asynchronously execute the stream using the provided execution engine.
@@ -295,6 +305,7 @@ def as_df(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pl.DataFrame":
"""
Convert the entire stream to a Polars DataFrame.
@@ -309,6 +320,7 @@ def as_lazy_frame(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pl.LazyFrame":
"""
Load the entire stream to a Polars LazyFrame.
@@ -323,6 +335,7 @@ def as_polars_df(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pl.DataFrame": ...
def as_pandas_df(
@@ -334,6 +347,7 @@ def as_pandas_df(
sort_by_tags: bool = True,
index_by_tags: bool = True,
execution_engine: ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pd.DataFrame": ...
def as_table(
@@ -344,6 +358,7 @@ def as_table(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> "pa.Table":
"""
Convert the entire stream to a PyArrow Table.
@@ -364,6 +379,7 @@ def as_table(
def flow(
self,
execution_engine: ExecutionEngine | None = None,
+ execution_engine_opts: dict[str, Any] | None = None,
) -> Collection[tuple[Tag, Packet]]:
"""
Return the entire stream as a collection of (tag, packet) pairs.