39 changes: 28 additions & 11 deletions docs/flink-connector.md

# Flink Connector
Apache Flink supports creating an Iceberg table directly in Flink SQL, without creating an explicit Flink catalog. That means you can create an Iceberg table simply by specifying the `'connector'='iceberg'` table option in Flink SQL, similar to the usage described in the official Flink [documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/overview/).

In Flink, the SQL `CREATE TABLE test (..) WITH ('connector'='iceberg', ...)` will create a Flink table in the current Flink catalog ([GenericInMemoryCatalog](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/catalogs/#genericinmemorycatalog) by default), which simply maps to the underlying Iceberg table instead of maintaining the Iceberg table directly in the current Flink catalog.

To create a table in Flink SQL using the syntax `CREATE TABLE test (..) WITH ('connector'='iceberg', ...)`, the Flink Iceberg connector provides the following table properties:

* `connector`: Use the constant `iceberg`.
* `catalog-name`: User-specified catalog name. It's required because the connector doesn't have a default value.
* `catalog-type`: `hive`, `hadoop` or `rest` for built-in catalogs (defaults to `hive`), or left unset for custom catalog implementations using `catalog-impl`.
* `catalog-impl`: The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset. See also [custom catalog](https://iceberg.apache.org/docs/latest/flink-getting-started/#custom-catalog) for more details.
* `catalog-database`: The Iceberg database name in the backend catalog; uses the current Flink database name by default.
* `catalog-table`: The Iceberg table name in the backend catalog; defaults to the table name in the Flink `CREATE TABLE` statement.
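
To illustrate how the defaults for the last two properties play out, here is a minimal sketch; the catalog name, URI, and warehouse path are placeholders for illustration, not values from this document:

```sql
-- Hypothetical example: neither 'catalog-database' nor 'catalog-table' is set,
-- so the connector falls back to the current Flink database name and the
-- Flink table name ('sample') when resolving the backing Iceberg table.
CREATE TABLE sample (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='my_catalog',           -- required, no default
    'catalog-type'='hive',                 -- 'hive' is also the default
    'uri'='thrift://metastore-host:9083',  -- placeholder metastore URI
    'warehouse'='hdfs://nn:8020/warehouse' -- placeholder warehouse path
);
```

Since `catalog-database` and `catalog-table` are omitted here, this table maps to the Iceberg table named `sample` in the current Flink database of the `my_catalog` catalog.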

Before executing the following SQL, please make sure you've configured the Flink SQL client correctly according to the quick start [document](https://iceberg.apache.org/docs/latest/flink-getting-started).

## Table managed in Hive catalog

The following SQL will create a Flink table in the current Flink catalog, which maps to the Iceberg table `default_database.flink_table` managed in the Hive catalog.

```sql
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'catalog-database'='hive_db',
    'catalog-table'='hive_iceberg_table',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
```

{{< hint info >}}
The underlying catalog database (`hive_db` in the above example) will be created automatically if it does not exist when writing records into the Flink table.
{{< /hint >}}
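
Once declared, the table can be written to and queried like any other Flink table. A brief sketch (the row values here are illustrative only):

```sql
-- Writing triggers creation of the backing catalog database if it does not
-- exist yet, as noted in the hint above.
INSERT INTO flink_table VALUES (1, 'a'), (2, 'b');

-- Reads go through the same mapping to the underlying Iceberg table.
SELECT COUNT(*) FROM flink_table;
```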

## Table managed in Hadoop catalog

The following SQL will create a Flink table in the current Flink catalog, which maps to the Iceberg table `default_database.flink_table` managed in the Hadoop catalog.

```sql
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hadoop_prod',
    'catalog-type'='hadoop',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
```

## Table managed in REST catalog

The following SQL will create a Flink table in the current Flink catalog, which maps to the Iceberg table `default_database.flink_table` managed in a REST catalog.

```sql
CREATE TABLE flink_table (
id BIGINT,
data STRING
) WITH (
'connector'='iceberg',
'catalog-name'='rest_prod',
'catalog-type'='rest',
'uri'='https://localhost/'
    -- 'credential' and 'token' are optional
);
```
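
The optional `credential` and `token` properties mentioned in the comment above can be supplied when the REST catalog requires authentication. A hypothetical sketch follows; the values are placeholders, and typically only one of the two mechanisms is configured:

```sql
CREATE TABLE flink_table_authed (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='rest_prod',
    'catalog-type'='rest',
    'uri'='https://localhost/',
    -- placeholder values; use whichever mechanism your REST catalog expects
    'credential'='<client_id>:<client_secret>',
    'token'='<bearer_token>'
);
```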

## Table managed in custom catalog

The following SQL will create a Flink table in the current Flink catalog, which maps to the Iceberg table `default_database.flink_table` managed in a custom catalog of type `com.my.custom.CatalogImpl`.
```sql
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='custom_prod',
    'catalog-impl'='com.my.custom.CatalogImpl',
    -- More table properties for the customized catalog
    'my-additional-catalog-config'='my-value'
);
```

## A complete example

Take the Hive catalog as an example:

```sql
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='file:///path/to/warehouse'
);

INSERT INTO flink_table VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC');

SET execution.result-mode=tableau;
SELECT * FROM flink_table;

+----+------+
| id | data |
+----+------+
|  1 |  AAA |
|  2 |  BBB |
|  3 |  CCC |
+----+------+
3 rows in set
```

For more details, please refer to the Iceberg [Flink document](https://iceberg.apache.org/docs/latest/flink-getting-started).