|
| 1 | +--- |
| 2 | +title: "Data Evolution" |
| 3 | +weight: 5 |
| 4 | +type: docs |
| 5 | +aliases: |
| 6 | + - /pypaimon/data-evolution.html |
| 7 | +--- |
| 8 | + |
| 9 | +<!-- |
| 10 | +Licensed to the Apache Software Foundation (ASF) under one |
| 11 | +or more contributor license agreements. See the NOTICE file |
| 12 | +distributed with this work for additional information |
| 13 | +regarding copyright ownership. The ASF licenses this file |
| 14 | +to you under the Apache License, Version 2.0 (the |
| 15 | +"License"); you may not use this file except in compliance |
| 16 | +with the License. You may obtain a copy of the License at |
| 17 | +
|
| 18 | + http://www.apache.org/licenses/LICENSE-2.0 |
| 19 | +
|
| 20 | +Unless required by applicable law or agreed to in writing, |
| 21 | +software distributed under the License is distributed on an |
| 22 | +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 23 | +KIND, either express or implied. See the License for the |
| 24 | +specific language governing permissions and limitations |
| 25 | +under the License. |
| 26 | +--> |
| 27 | + |
| 28 | +# Data Evolution |
| 29 | + |
| 30 | +PyPaimon for Data Evolution mode. See [Data Evolution]({{< ref "append-table/data-evolution" >}}). |
| 31 | + |
| 32 | +## Update Columns By Row ID |
| 33 | + |
| 34 | +You can create `TableUpdate.update_by_arrow_with_row_id` to update columns to data evolution tables. |
| 35 | + |
| 36 | +The input data should include the `_ROW_ID` column, update operation will automatically sort and match each `_ROW_ID` to |
| 37 | +its corresponding `first_row_id`, then groups rows with the same `first_row_id` and writes them to a separate file. |
| 38 | + |
| 39 | +```python |
| 40 | +simple_pa_schema = pa.schema([ |
| 41 | + ('f0', pa.int8()), |
| 42 | + ('f1', pa.int16()), |
| 43 | +]) |
| 44 | +schema = Schema.from_pyarrow_schema(simple_pa_schema, |
| 45 | + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}) |
| 46 | +catalog.create_table('default.test_row_tracking', schema, False) |
| 47 | +table = catalog.get_table('default.test_row_tracking') |
| 48 | + |
| 49 | +# write all columns |
| 50 | +write_builder = table.new_batch_write_builder() |
| 51 | +table_write = write_builder.new_write() |
| 52 | +table_commit = write_builder.new_commit() |
| 53 | +expect_data = pa.Table.from_pydict({ |
| 54 | + 'f0': [-1, 2], |
| 55 | + 'f1': [-1001, 1002] |
| 56 | +}, schema=simple_pa_schema) |
| 57 | +table_write.write_arrow(expect_data) |
| 58 | +table_commit.commit(table_write.prepare_commit()) |
| 59 | +table_write.close() |
| 60 | +table_commit.close() |
| 61 | + |
| 62 | +# update partial columns |
| 63 | +write_builder = table.new_batch_write_builder() |
| 64 | +table_update = write_builder.new_update().with_update_type(['f0']) |
| 65 | +table_commit = write_builder.new_commit() |
| 66 | +data2 = pa.Table.from_pydict({ |
| 67 | + '_ROW_ID': [0, 1], |
| 68 | + 'f0': [5, 6], |
| 69 | +}, schema=pa.schema([ |
| 70 | + ('_ROW_ID', pa.int64()), |
| 71 | + ('f0', pa.int8()), |
| 72 | +])) |
| 73 | +cmts = table_update.update_by_arrow_with_row_id(data2) |
| 74 | +table_commit.commit(cmts) |
| 75 | +table_commit.close() |
| 76 | + |
| 77 | +# content should be: |
| 78 | +# 'f0': [5, 6], |
| 79 | +# 'f1': [-1001, 1002] |
| 80 | +``` |
0 commit comments