Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* HadoopFileSystem now supports Kerberos authentication via `--hdfs_client=KERBEROS` flag (Python) ([#20719](https://github.com/apache/beam/issues/20719)).

## Breaking Changes

Expand Down Expand Up @@ -2349,4 +2350,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss

## Highlights

- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
- - For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

It seems like there's an extra hyphen - here, which will render as a nested list item. Was this intentional? If not, it should probably be removed to maintain consistent formatting.

Suggested change
- - For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).

28 changes: 26 additions & 2 deletions sdks/python/apache_beam/io/hadoopfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
except ImportError:
hdfs = None

try:
from hdfs.ext.kerberos import KerberosClient
except ImportError:
KerberosClient = None

__all__ = ['HadoopFileSystem']

_HDFS_PREFIX = 'hdfs:/'
Expand Down Expand Up @@ -123,11 +128,13 @@ def __init__(self, pipeline_options):
hdfs_host = hdfs_options.hdfs_host
hdfs_port = hdfs_options.hdfs_port
hdfs_user = hdfs_options.hdfs_user
hdfs_client = hdfs_options.hdfs_client
self._full_urls = hdfs_options.hdfs_full_urls
else:
hdfs_host = pipeline_options.get('hdfs_host')
hdfs_port = pipeline_options.get('hdfs_port')
hdfs_user = pipeline_options.get('hdfs_user')
hdfs_client = pipeline_options.get('hdfs_client', 'INSECURE')
self._full_urls = pipeline_options.get('hdfs_full_urls', False)

if hdfs_host is None:
Expand All @@ -139,8 +146,25 @@ def __init__(self, pipeline_options):
if not isinstance(self._full_urls, bool):
raise ValueError(
'hdfs_full_urls should be bool, got: %s', self._full_urls)
self._hdfs_client = hdfs.InsecureClient(
'http://%s:%s' % (hdfs_host, str(hdfs_port)), user=hdfs_user)

# Create HDFS client based on authentication type
url = 'http://%s:%s' % (hdfs_host, str(hdfs_port))
if hdfs_client == 'KERBEROS':
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To make the client selection more robust, it would be good to perform a case-insensitive comparison. While argparse enforces the choices case-sensitively, options can also be passed as a dictionary where case might not be consistent. Using .upper() would handle KERBEROS, kerberos, etc.

Suggested change
if hdfs_client == 'KERBEROS':
if hdfs_client.upper() == 'KERBEROS':

if KerberosClient is None:
raise ImportError(
'Kerberos authentication requires the requests-kerberos library. '
'Install it with: pip install requests-kerberos')
_LOGGER.info('Using KerberosClient for HDFS authentication')
try:
self._hdfs_client = KerberosClient(url)
except Exception as e:
raise RuntimeError(
'Failed to create KerberosClient. Ensure you have valid Kerberos '
'credentials (run kinit) or have configured a keytab. '
'Error: %s' % str(e))
Comment on lines +160 to +164
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When re-raising the exception, it's better to use raise from to preserve the original exception's traceback. This is very helpful for debugging. The current implementation hides the original exception type and stack trace.

Also, consider using an f-string for formatting the error message for better readability, as the project seems to be on a modern Python version.

Suggested change
except Exception as e:
raise RuntimeError(
'Failed to create KerberosClient. Ensure you have valid Kerberos '
'credentials (run kinit) or have configured a keytab. '
'Error: %s' % str(e))
except Exception as e:
raise RuntimeError(
'Failed to create KerberosClient. Ensure you have valid Kerberos '
'credentials (run kinit) or have configured a keytab. '
f'Error: {e}') from e

else:
# Default to INSECURE for backward compatibility
self._hdfs_client = hdfs.InsecureClient(url, user=hdfs_user)

@classmethod
def scheme(cls):
Expand Down
86 changes: 86 additions & 0 deletions sdks/python/apache_beam/io/hadoopfilesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,92 @@ def test_dict_options_full_urls(self):
self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
self.assertTrue(self.fs._full_urls)

def test_insecure_client_default(self):
"""Test that InsecureClient is used by default."""
pipeline_options = PipelineOptions()
hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
hdfs_options.hdfs_host = 'localhost'
hdfs_options.hdfs_port = 9870
hdfs_options.hdfs_user = 'testuser'
# hdfs_client not specified, should default to INSECURE

self.fs = hdfs.HadoopFileSystem(pipeline_options)
self.assertIsInstance(self.fs._hdfs_client, FakeHdfs)

def test_insecure_client_explicit(self):
"""Test that InsecureClient is used when explicitly specified."""
pipeline_options = PipelineOptions()
hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
hdfs_options.hdfs_host = 'localhost'
hdfs_options.hdfs_port = 9870
hdfs_options.hdfs_user = 'testuser'
hdfs_options.hdfs_client = 'INSECURE'

self.fs = hdfs.HadoopFileSystem(pipeline_options)
self.assertIsInstance(self.fs._hdfs_client, FakeHdfs)

def test_kerberos_client_missing_library(self):
"""Test that Kerberos client fails gracefully when library not installed."""
pipeline_options = PipelineOptions()
hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
hdfs_options.hdfs_host = 'localhost'
hdfs_options.hdfs_port = 9870
hdfs_options.hdfs_user = 'testuser'
hdfs_options.hdfs_client = 'KERBEROS'

# Temporarily set KerberosClient to None to simulate missing library
original_kerberos_client = hdfs.KerberosClient
hdfs.KerberosClient = None

try:
with self.assertRaisesRegex(ImportError, r'requests-kerberos'):
hdfs.HadoopFileSystem(pipeline_options)
finally:
hdfs.KerberosClient = original_kerberos_client

def test_kerberos_client_creation(self):
"""Test that KerberosClient is created when specified."""
pipeline_options = PipelineOptions()
hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
hdfs_options.hdfs_host = 'localhost'
hdfs_options.hdfs_port = 9870
hdfs_options.hdfs_user = 'testuser'
hdfs_options.hdfs_client = 'KERBEROS'

# Mock KerberosClient to return our FakeHdfs
if hdfs.KerberosClient is not None:
original_kerberos_client = hdfs.KerberosClient
hdfs.KerberosClient = lambda *args, **kwargs: self._fake_hdfs

try:
self.fs = hdfs.HadoopFileSystem(pipeline_options)
self.assertIsInstance(self.fs._hdfs_client, FakeHdfs)
finally:
hdfs.KerberosClient = original_kerberos_client

def test_dict_options_insecure_client(self):
"""Test InsecureClient with dict-based pipeline options."""
pipeline_options = {
'hdfs_host': 'localhost',
'hdfs_port': 9870,
'hdfs_user': 'testuser',
'hdfs_client': 'INSECURE',
}

self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
self.assertIsInstance(self.fs._hdfs_client, FakeHdfs)

def test_dict_options_default_client(self):
"""Test dict options default to INSECURE without hdfs_client."""
pipeline_options = {
'hdfs_host': 'localhost',
'hdfs_port': 9870,
'hdfs_user': 'testuser',
}

self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
self.assertIsInstance(self.fs._hdfs_client, FakeHdfs)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,14 @@ def _add_argparse_args(cls, parser):
'If set, URLs will be parsed as "hdfs://server/path/...", instead '
'of "hdfs://path/...". The "server" part will be unused (use '
'--hdfs_host and --hdfs_port).'))
parser.add_argument(
'--hdfs_client',
default='INSECURE',
choices=['INSECURE', 'KERBEROS'],
help=(
'HDFS client type for authentication. INSECURE uses simple '
'username-based authentication (default). KERBEROS uses Kerberos '
'authentication (requires kinit or keytab configuration).'))

def validate(self, validator):
errors = []
Expand Down
Loading