-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathexample.py
More file actions
209 lines (169 loc) · 6.77 KB
/
example.py
File metadata and controls
209 lines (169 loc) · 6.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import asyncio
import time
import pandas as pd
import pyarrow as pa
import fluss
async def main():
    """End-to-end Fluss client example.

    Connects to a local Fluss cluster, creates a table, writes rows via
    three input formats (PyArrow Table, PyArrow RecordBatch, Pandas
    DataFrame), then scans the table back and exercises the poll() API.

    The connection is always closed, even when a step fails.
    """
    # Create connection configuration
    config_spec = {
        "bootstrap.servers": "127.0.0.1:9123",
        # Add other configuration options as needed
        "request.max.size": "10485760",  # 10 MB
        "writer.acks": "all",  # Wait for all replicas to acknowledge
        "writer.retries": "3",  # Retry up to 3 times on failure
        "writer.batch.size": "1000",  # Batch size for writes
    }
    config = fluss.Config(config_spec)

    # Create connection using the static connect method
    conn = await fluss.FlussConnection.connect(config)
    try:
        # Define fields for PyArrow
        fields = [
            pa.field("id", pa.int32()),
            pa.field("name", pa.string()),
            pa.field("score", pa.float32()),
            pa.field("age", pa.int32()),
        ]
        # Create a PyArrow schema
        schema = pa.schema(fields)

        # Create a Fluss Schema first (this is what TableDescriptor expects)
        fluss_schema = fluss.Schema(schema)
        # Create a Fluss TableDescriptor
        table_descriptor = fluss.TableDescriptor(fluss_schema)

        # Get the admin for Fluss
        admin = await conn.get_admin()

        # Create a Fluss table; the trailing True presumably means
        # "ignore if exists" — creation failure is reported, not fatal.
        table_path = fluss.TablePath("fluss", "sample_table")
        try:
            await admin.create_table(table_path, table_descriptor, True)
            print(f"Created table: {table_path}")
        except Exception as e:
            print(f"Table creation failed: {e}")

        # Get table information via admin
        try:
            table_info = await admin.get_table(table_path)
            print(f"Table info: {table_info}")
            print(f"Table ID: {table_info.table_id}")
            print(f"Schema ID: {table_info.schema_id}")
            print(f"Created time: {table_info.created_time}")
            print(f"Primary keys: {table_info.get_primary_keys()}")
        except Exception as e:
            print(f"Failed to get table info: {e}")

        # Get the table instance
        table = await conn.get_table(table_path)
        print(f"Got table: {table}")

        # Create a writer for the table
        append_writer = await table.new_append_writer()
        print(f"Created append writer: {append_writer}")

        try:
            # Test 1: Write PyArrow Table
            print("\n--- Testing PyArrow Table write ---")
            pa_table = pa.Table.from_arrays(
                [
                    pa.array([1, 2, 3], type=pa.int32()),
                    pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
                    pa.array([95.2, 87.2, 92.1], type=pa.float32()),
                    pa.array([25, 30, 35], type=pa.int32()),
                ],
                schema=schema,
            )
            append_writer.write_arrow(pa_table)
            print("Successfully wrote PyArrow Table")

            # Test 2: Write PyArrow RecordBatch
            print("\n--- Testing PyArrow RecordBatch write ---")
            pa_record_batch = pa.RecordBatch.from_arrays(
                [
                    pa.array([4, 5], type=pa.int32()),
                    pa.array(["David", "Eve"], type=pa.string()),
                    pa.array([88.5, 91.0], type=pa.float32()),
                    pa.array([28, 32], type=pa.int32()),
                ],
                schema=schema,
            )
            append_writer.write_arrow_batch(pa_record_batch)
            print("Successfully wrote PyArrow RecordBatch")

            # Test 3: Write Pandas DataFrame
            print("\n--- Testing Pandas DataFrame write ---")
            df = pd.DataFrame(
                {
                    "id": [6, 7],
                    "name": ["Frank", "Grace"],
                    "score": [89.3, 94.7],
                    "age": [29, 27],
                }
            )
            append_writer.write_pandas(df)
            print("Successfully wrote Pandas DataFrame")

            # Flush all pending data
            print("\n--- Flushing data ---")
            append_writer.flush()
            print("Successfully flushed data")
        except Exception as e:
            print(f"Error during writing: {e}")

        # Now scan the table to verify data was written
        print("\n--- Scanning table ---")
        try:
            log_scanner = await table.new_log_scanner()
            print(f"Created log scanner: {log_scanner}")

            # Subscribe to scan from earliest to latest
            # start_timestamp=None (earliest), end_timestamp=None (latest)
            log_scanner.subscribe(None, None)

            print("Scanning results using to_arrow():")
            # Try to get as PyArrow Table
            try:
                pa_table_result = log_scanner.to_arrow()
                print(f"\nAs PyArrow Table: {pa_table_result}")
            except Exception as e:
                print(f"Could not convert to PyArrow: {e}")

            # Let's subscribe from the beginning again.
            # Reset subscription
            log_scanner.subscribe(None, None)

            # Try to get as Pandas DataFrame
            try:
                df_result = log_scanner.to_pandas()
                print(f"\nAs Pandas DataFrame:\n{df_result}")
            except Exception as e:
                print(f"Could not convert to Pandas: {e}")

            # TODO: support to_arrow_batch_reader()
            # which is reserved for streaming use cases
            # TODO: support to_duckdb()

            # Test the new poll() method for incremental reading
            print("\n--- Testing poll() method ---")
            log_scanner.subscribe(None, None)

            # Poll with a timeout of 5000ms (5 seconds)
            # Note: poll() returns an empty table (not an error) on timeout
            try:
                poll_result = log_scanner.poll(5000)
                print(f"Number of rows: {poll_result.num_rows}")
                if poll_result.num_rows > 0:
                    poll_df = poll_result.to_pandas()
                    print(f"Polled data:\n{poll_df}")
                else:
                    print("Empty result (no records available)")
                    # Empty table still has schema
                    print(f"Schema: {poll_result.schema}")
            except Exception as e:
                print(f"Error during poll: {e}")
        except Exception as e:
            print(f"Error during scanning: {e}")
    finally:
        # Close connection even if any of the steps above raised,
        # so the client never leaks the connection.
        conn.close()
        print("\nConnection closed")
if __name__ == "__main__":
    # Script entry point: drive the async example to completion.
    asyncio.run(main())