hdfs provider: restore HA support for webhdfs (#19711)
Aakcht authored Nov 24, 2021
1 parent 1199884 commit 1b97d65
Showing 3 changed files with 44 additions and 31 deletions.
48 changes: 25 additions & 23 deletions airflow/providers/apache/hdfs/hooks/webhdfs.py
@@ -26,7 +26,6 @@
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
-from airflow.models.connection import Connection
 
 log = logging.getLogger(__name__)

@@ -71,36 +70,39 @@ def get_conn(self) -> Any:

     def _find_valid_server(self) -> Any:
         connection = self.get_connection(self.webhdfs_conn_id)
-        host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.log.info("Trying to connect to %s:%s", connection.host, connection.port)
-        try:
-            conn_check = host_socket.connect_ex((connection.host, connection.port))
-            if conn_check == 0:
-                self.log.info('Trying namenode %s', connection.host)
-                client = self._get_client(connection)
-                client.status('/')
-                self.log.info('Using namenode %s for hook', connection.host)
-                host_socket.close()
-                return client
-            else:
-                self.log.error("Could not connect to %s:%s", connection.host, connection.port)
-                host_socket.close()
-        except HdfsError as hdfs_error:
-            self.log.error('Read operation on namenode %s failed with error: %s', connection.host, hdfs_error)
+        namenodes = connection.host.split(',')
+        for namenode in namenodes:
+            host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self.log.info("Trying to connect to %s:%s", namenode, connection.port)
+            try:
+                conn_check = host_socket.connect_ex((namenode, connection.port))
+                if conn_check == 0:
+                    self.log.info('Trying namenode %s', namenode)
+                    client = self._get_client(
+                        namenode, connection.port, connection.login, connection.extra_dejson
+                    )
+                    client.status('/')
+                    self.log.info('Using namenode %s for hook', namenode)
+                    host_socket.close()
+                    return client
+                else:
+                    self.log.warning("Could not connect to %s:%s", namenode, connection.port)
+            except HdfsError as hdfs_error:
+                self.log.info('Read operation on namenode %s failed with error: %s', namenode, hdfs_error)
         return None

-    def _get_client(self, connection: Connection) -> Any:
-        connection_str = f'http://{connection.host}:{connection.port}'
+    def _get_client(self, namenode: str, port: int, login: str, extra_dejson: dict) -> Any:
+        connection_str = f'http://{namenode}:{port}'
         session = requests.Session()
 
-        if connection.extra_dejson.get('use_ssl', False):
-            connection_str = f'https://{connection.host}:{connection.port}'
-            session.verify = connection.extra_dejson.get('verify', True)
+        if extra_dejson.get('use_ssl', False):
+            connection_str = f'https://{namenode}:{port}'
+            session.verify = extra_dejson.get('verify', True)
 
         if _kerberos_security_mode:
             client = KerberosClient(connection_str, session=session)
         else:
-            proxy_user = self.proxy_user or connection.login
+            proxy_user = self.proxy_user or login
             client = InsecureClient(connection_str, user=proxy_user, session=session)
 
         return client
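
The restored loop gives _find_valid_server simple failover semantics: each comma-separated namenode is probed with a raw socket check and then a WebHDFS status('/') read, and the first healthy one wins. A minimal usage sketch; the connection id webhdfs_ha and the hostnames are illustrative, not from this commit:

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

# Assumes an Airflow connection "webhdfs_ha" whose host field holds a
# comma-separated namenode list, e.g. "nn1.example.com,nn2.example.com".
hook = WebHDFSHook(webhdfs_conn_id='webhdfs_ha')

# get_conn() walks the namenodes in order and returns the first hdfs client
# whose status('/') read succeeds; if none respond, the hook raises
# AirflowWebHDFSHookException (see the error test further down).
client = hook.get_conn()
print(client.status('/'))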
2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-apache-hdfs/connections.rst
@@ -31,7 +31,7 @@ parameter as ``webhdfs_default`` by default.
 Configuring the Connection
 --------------------------
 Host
-    The host to connect to, it can be ``local``, ``yarn`` or an URL.
+    The host to connect to; it can be ``local``, ``yarn``, or a URL. For the WebHDFS hook, multiple hosts can be specified as a comma-separated list.
 
 Port
     Specify the port if the host is a URL.
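
For illustration, such a connection could be created programmatically; the connection id, hostnames, and port below are invented for the example (9870 is the default WebHDFS HTTP port on Hadoop 3, older clusters often use 50070):

from airflow.models.connection import Connection

# Hypothetical HA connection: both namenodes share the single host field,
# separated by a comma; the hook splits on ',' and tries each in order.
conn = Connection(
    conn_id='webhdfs_default',
    conn_type='webhdfs',
    host='namenode1.example.com,namenode2.example.com',
    port=9870,
    login='hdfs',
)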
25 changes: 18 additions & 7 deletions tests/providers/apache/hdfs/hooks/test_webhdfs.py
@@ -17,7 +17,7 @@
 # under the License.
 
 import unittest
-from unittest.mock import patch
+from unittest.mock import call, patch
 
 import pytest
 from hdfs import HdfsError
@@ -34,24 +34,35 @@ def setUp(self):
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')
     @patch(
         'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
-        return_value=Connection(host='host_2', port=321, login='user'),
+        return_value=Connection(host='host_1.com,host_2.com', port=321, login='user'),
     )
     @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
     def test_get_conn(self, socket_mock, mock_get_connection, mock_insecure_client, mock_session):
+        mock_insecure_client.side_effect = [HdfsError('Error'), mock_insecure_client.return_value]
         socket_mock.socket.return_value.connect_ex.return_value = 0
         conn = self.webhdfs_hook.get_conn()
         connection = mock_get_connection.return_value
-        mock_insecure_client.assert_called_once_with(
-            f'http://{connection.host}:{connection.port}',
-            user=connection.login,
-            session=mock_session.return_value,
+        hosts = connection.host.split(',')
+        mock_insecure_client.assert_has_calls(
+            [
+                call(
+                    f'http://{host}:{connection.port}',
+                    user=connection.login,
+                    session=mock_session.return_value,
+                )
+                for host in hosts
+            ]
         )
         mock_insecure_client.return_value.status.assert_called_once_with('/')
         assert conn == mock_insecure_client.return_value
 
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient', side_effect=HdfsError('Error'))
+    @patch(
+        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+        return_value=Connection(host='host_2', port=321, login='user'),
+    )
     @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
-    def test_get_conn_hdfs_error(self, socket_mock, mock_insecure_client):
+    def test_get_conn_hdfs_error(self, socket_mock, mock_get_connection, mock_insecure_client):
         socket_mock.socket.return_value.connect_ex.return_value = 0
         with pytest.raises(AirflowWebHDFSHookException):
             self.webhdfs_hook.get_conn()
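
The reworked test_get_conn leans on a unittest.mock detail worth noting: when side_effect is a list, exception instances in it are raised and other entries are returned, one per call. That is how the mock makes the first namenode's client construction fail while the second succeeds. A standalone sketch of that behavior, with illustrative names:

from unittest.mock import MagicMock

client_factory = MagicMock()
# One entry per expected call: the first raises, the second is returned.
client_factory.side_effect = [RuntimeError('namenode 1 down'), 'client_2']

try:
    client_factory('http://host_1.com:321')
except RuntimeError:
    pass  # the hook logs this and moves on to the next namenode

assert client_factory('http://host_2.com:321') == 'client_2'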