From 5c448a16762f89a659653d8cb340ee51e3515209 Mon Sep 17 00:00:00 2001
From: haseebmalik18
Date: Mon, 12 Jan 2026 23:23:08 -0500
Subject: [PATCH] Add Kerberos authentication support to HadoopFileSystem #20719

---
 CHANGES.md                                     |  1 +
 .../python/apache_beam/io/hadoopfilesystem.py  | 28 +++++-
 .../apache_beam/io/hadoopfilesystem_test.py    | 86 +++++++++++++++++++
 .../apache_beam/options/pipeline_options.py    |  8 ++
 4 files changed, 121 insertions(+), 2 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index aea2693d1fa5..e82c9686f168 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -70,6 +70,7 @@
 ## New Features / Improvements
 
 * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* HadoopFileSystem now supports Kerberos authentication via the `--hdfs_client=KERBEROS` pipeline option (Python) ([#20719](https://github.com/apache/beam/issues/20719)).
 
 ## Breaking Changes
 
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py
index 3287644eed8c..2cdf0dabf43f 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -40,6 +40,11 @@
 except ImportError:
   hdfs = None
 
+try:
+  from hdfs.ext.kerberos import KerberosClient
+except ImportError:
+  KerberosClient = None
+
 __all__ = ['HadoopFileSystem']
 
 _HDFS_PREFIX = 'hdfs:/'
@@ -123,11 +128,13 @@ def __init__(self, pipeline_options):
       hdfs_host = hdfs_options.hdfs_host
       hdfs_port = hdfs_options.hdfs_port
       hdfs_user = hdfs_options.hdfs_user
+      hdfs_client = hdfs_options.hdfs_client
       self._full_urls = hdfs_options.hdfs_full_urls
     else:
       hdfs_host = pipeline_options.get('hdfs_host')
       hdfs_port = pipeline_options.get('hdfs_port')
       hdfs_user = pipeline_options.get('hdfs_user')
+      hdfs_client = pipeline_options.get('hdfs_client', 'INSECURE')
       self._full_urls = pipeline_options.get('hdfs_full_urls', False)
 
     if hdfs_host is None:
@@ -139,8 +146,25 @@ def __init__(self, pipeline_options):
     if not isinstance(self._full_urls, bool):
       raise ValueError(
           'hdfs_full_urls should be bool, got: %s', self._full_urls)
-    self._hdfs_client = hdfs.InsecureClient(
-        'http://%s:%s' % (hdfs_host, str(hdfs_port)), user=hdfs_user)
+
+    # Create the HDFS client based on the requested authentication type.
+    url = 'http://%s:%s' % (hdfs_host, str(hdfs_port))
+    if hdfs_client == 'KERBEROS':
+      if KerberosClient is None:
+        raise ImportError(
+            'Kerberos authentication requires the requests-kerberos library. '
+            'Install it with: pip install requests-kerberos')
+      _LOGGER.info('Using KerberosClient for HDFS authentication')
+      try:
+        self._hdfs_client = KerberosClient(url)
+      except Exception as e:
+        raise RuntimeError(
+            'Failed to create KerberosClient. Ensure you have valid Kerberos '
+            'credentials (run kinit) or have configured a keytab. '
+            'Error: %s' % str(e)) from e
+    else:
+      # Default to INSECURE for backward compatibility.
+      self._hdfs_client = hdfs.InsecureClient(url, user=hdfs_user)
 
   @classmethod
   def scheme(cls):
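Reviewer note: a minimal sketch (not part of the patch) of the two configuration paths the new constructor logic distinguishes. Host, port, and user names below are placeholders, and the KERBEROS variant assumes requests-kerberos is installed and a ticket has been obtained with kinit or a keytab.

    from apache_beam.io.hadoopfilesystem import HadoopFileSystem

    # Dict-based options fall back to INSECURE when 'hdfs_client' is omitted.
    fs_insecure = HadoopFileSystem(pipeline_options={
        'hdfs_host': 'namenode.example.com',  # placeholder host
        'hdfs_port': 9870,
        'hdfs_user': 'beam-user',  # placeholder user
    })

    # 'KERBEROS' selects hdfs.ext.kerberos.KerberosClient instead of
    # hdfs.InsecureClient; the hdfs_user value is not passed to that client.
    fs_kerberos = HadoopFileSystem(pipeline_options={
        'hdfs_host': 'namenode.example.com',
        'hdfs_port': 9870,
        'hdfs_user': 'beam-user',
        'hdfs_client': 'KERBEROS',
    })

Either object is then used through the normal FileSystem interface; only the underlying WebHDFS client differs.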
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
index eb0925224dd3..eaa6728e17fd 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -668,6 +668,92 @@ def test_dict_options_full_urls(self):
     self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
     self.assertTrue(self.fs._full_urls)
 
+  def test_insecure_client_default(self):
+    """Test that InsecureClient is used by default."""
+    pipeline_options = PipelineOptions()
+    hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
+    hdfs_options.hdfs_host = 'localhost'
+    hdfs_options.hdfs_port = 9870
+    hdfs_options.hdfs_user = 'testuser'
+    # hdfs_client is not specified and should default to INSECURE.
+
+    self.fs = hdfs.HadoopFileSystem(pipeline_options)
+    self.assertIsInstance(self.fs._hdfs_client, FakeHdfs)
+
+  def test_insecure_client_explicit(self):
+    """Test that InsecureClient is used when explicitly specified."""
+    pipeline_options = PipelineOptions()
+    hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
+    hdfs_options.hdfs_host = 'localhost'
+    hdfs_options.hdfs_port = 9870
+    hdfs_options.hdfs_user = 'testuser'
+    hdfs_options.hdfs_client = 'INSECURE'
+
+    self.fs = hdfs.HadoopFileSystem(pipeline_options)
+    self.assertIsInstance(self.fs._hdfs_client, FakeHdfs)
+
+  def test_kerberos_client_missing_library(self):
+    """Test that a clear ImportError is raised when requests-kerberos is missing."""
+    pipeline_options = PipelineOptions()
+    hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
+    hdfs_options.hdfs_host = 'localhost'
+    hdfs_options.hdfs_port = 9870
+    hdfs_options.hdfs_user = 'testuser'
+    hdfs_options.hdfs_client = 'KERBEROS'
+
+    # Temporarily set KerberosClient to None to simulate the missing library.
+    original_kerberos_client = hdfs.KerberosClient
+    hdfs.KerberosClient = None
+
+    try:
+      with self.assertRaisesRegex(ImportError, r'requests-kerberos'):
+        hdfs.HadoopFileSystem(pipeline_options)
+    finally:
+      hdfs.KerberosClient = original_kerberos_client
+
+  def test_kerberos_client_creation(self):
+    """Test that KerberosClient is created when specified."""
+    pipeline_options = PipelineOptions()
+    hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
+    hdfs_options.hdfs_host = 'localhost'
+    hdfs_options.hdfs_port = 9870
+    hdfs_options.hdfs_user = 'testuser'
+    hdfs_options.hdfs_client = 'KERBEROS'
+
+    # Replace KerberosClient with a factory returning our FakeHdfs; this works
+    # whether or not the real requests-kerberos extension is installed.
+    original_kerberos_client = hdfs.KerberosClient
+    hdfs.KerberosClient = lambda *args, **kwargs: self._fake_hdfs
+
+    try:
+      self.fs = hdfs.HadoopFileSystem(pipeline_options)
+      self.assertIsInstance(self.fs._hdfs_client, FakeHdfs)
+    finally:
+      hdfs.KerberosClient = original_kerberos_client
+
+  def test_dict_options_insecure_client(self):
+    """Test InsecureClient with dict-based pipeline options."""
+    pipeline_options = {
+        'hdfs_host': 'localhost',
+        'hdfs_port': 9870,
+        'hdfs_user': 'testuser',
+        'hdfs_client': 'INSECURE',
+    }
+
+    self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
+    self.assertIsInstance(self.fs._hdfs_client, FakeHdfs)
+
+  def test_dict_options_default_client(self):
+    """Test that dict options default to INSECURE without hdfs_client."""
+    pipeline_options = {
+        'hdfs_host': 'localhost',
+        'hdfs_port': 9870,
+        'hdfs_user': 'testuser',
+    }
+
+    self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
+    self.assertIsInstance(self.fs._hdfs_client, FakeHdfs)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
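The tests above patch and restore hdfs.KerberosClient by hand. An equivalent sketch using unittest.mock (a suggestion, not part of this patch) keeps the restore automatic; `hdfs` is the test module's alias for apache_beam.io.hadoopfilesystem, and PipelineOptions, HadoopFileSystemOptions, FakeHdfs, and self._fake_hdfs come from the existing fixtures, so this method would sit inside the same test class:

    from unittest import mock  # at the top of the test module

    def test_kerberos_client_creation_with_mock(self):
      """Same check as test_kerberos_client_creation, via mock.patch.object."""
      pipeline_options = PipelineOptions()
      hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
      hdfs_options.hdfs_host = 'localhost'
      hdfs_options.hdfs_port = 9870
      hdfs_options.hdfs_user = 'testuser'
      hdfs_options.hdfs_client = 'KERBEROS'

      # patch.object swaps the module-level KerberosClient symbol and restores
      # it when the with-block exits, even if construction raises.
      with mock.patch.object(
          hdfs, 'KerberosClient', return_value=self._fake_hdfs):
        fs = hdfs.HadoopFileSystem(pipeline_options)
      self.assertIsInstance(fs._hdfs_client, FakeHdfs)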
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 0e1012b2de65..4293ed9f3c37 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1328,6 +1328,14 @@ def _add_argparse_args(cls, parser):
             'If set, URLs will be parsed as "hdfs://server/path/...", instead '
             'of "hdfs://path/...". The "server" part will be unused (use '
             '--hdfs_host and --hdfs_port).'))
+    parser.add_argument(
+        '--hdfs_client',
+        default='INSECURE',
+        choices=['INSECURE', 'KERBEROS'],
+        help=(
+            'HDFS client type for authentication. INSECURE uses simple '
+            'username-based authentication (default). KERBEROS uses Kerberos '
+            'authentication (requires kinit or keytab configuration).'))
 
   def validate(self, validator):
     errors = []
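End to end, the new flag is passed like the existing HDFS pipeline options. A hedged sketch (cluster address, paths, and user are placeholders; a Kerberos ticket from kinit, or a configured keytab, is assumed):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--hdfs_host=namenode.example.com',  # placeholder WebHDFS endpoint
        '--hdfs_port=9870',
        '--hdfs_user=beam-user',  # unused by the Kerberos client
        '--hdfs_client=KERBEROS',
    ])

    with beam.Pipeline(options=options) as p:
      _ = (
          p
          | 'Read' >> beam.io.ReadFromText('hdfs://tmp/input.txt')
          | 'Write' >> beam.io.WriteToText('hdfs://tmp/counts'))

Omitting --hdfs_client (or passing INSECURE) keeps today's behaviour, so existing pipelines are unaffected.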