# Source code for the dpdispatcher.hdfs_cli module.

# /usr/bin/python
# -*- encoding=utf-8 -*-

import os
import sys
from dpdispatcher.utils import run_cmd_with_all_output

class HDFS(object):
    """Fundamental class for HDFS basic manipulation.

    Every operation builds a ``hadoop fs ...`` command string, runs it through
    ``run_cmd_with_all_output`` and reports failures as ``RuntimeError``.

    NOTE(review): URIs and paths are interpolated into the command string
    without any quoting. If ``run_cmd_with_all_output`` executes the command
    through a shell, arguments containing spaces or shell metacharacters will
    be split or expanded -- confirm before passing untrusted paths.
    """

    @staticmethod
    def _run(cmd, failure_msg):
        """Run *cmd* and return its ``(ret, out, err)`` triple.

        Only the command execution itself is guarded here, so a caller's own
        detailed ``RuntimeError`` for a bad exit status is raised directly
        instead of being caught and re-wrapped by a less informative one.

        Raises:
            RuntimeError: carrying *failure_msg*, chained to whatever
                exception running the command (or unpacking its result)
                raised.
        """
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError(failure_msg) from e
        return ret, out, err

    @staticmethod
    def exists(uri):
        """Check existence of an hdfs uri via ``hadoop fs -test -e``.

        Returns:
            True if the command exits with 0, False if it exits with 1.

        Raises:
            RuntimeError: on any other exit status, or if the command
                cannot be run.
        """
        cmd = 'hadoop fs -test -e {uri}'.format(uri=uri)
        ret, out, err = HDFS._run(
            cmd, 'Cannot check existence of hdfs uri[{}] '
                 'with cmd[{}]'.format(uri, cmd))
        if ret == 0:
            return True
        if ret == 1:
            return False
        raise RuntimeError('Cannot check existence of hdfs uri[{}] '
                           'with cmd[{}]; ret[{}] stdout[{}] stderr[{}]'.format(
                               uri, cmd, ret, out, err))

    @staticmethod
    def remove(uri):
        """Remove an hdfs uri via ``hadoop fs -rm -r``.

        Returns:
            True on success (exit status 0).

        Raises:
            RuntimeError: on a non-zero exit status, or if the command
                cannot be run.
        """
        cmd = 'hadoop fs -rm -r {uri}'.format(uri=uri)
        ret, out, err = HDFS._run(
            cmd, 'Cannot remove hdfs uri[{}] '
                 'with cmd[{}]'.format(uri, cmd))
        if ret != 0:
            raise RuntimeError('Cannot remove hdfs uri[{}] '
                               'with cmd[{}]; ret[{}] output[{}] stderr[{}]'.format(
                                   uri, cmd, ret, out, err))
        return True

    @staticmethod
    def mkdir(uri):
        """Make a new hdfs directory via ``hadoop fs -mkdir -p``.

        Returns:
            True on success (exit status 0).

        Raises:
            RuntimeError: on a non-zero exit status, or if the command
                cannot be run.
        """
        cmd = 'hadoop fs -mkdir -p {uri}'.format(uri=uri)
        ret, out, err = HDFS._run(
            cmd, 'Cannot mkdir of hdfs uri[{}] '
                 'with cmd[{}]'.format(uri, cmd))
        if ret != 0:
            raise RuntimeError('Cannot mkdir of hdfs uri[{}] '
                               'with cmd[{}]; ret[{}] output[{}] stderr[{}]'.format(
                                   uri, cmd, ret, out, err))
        return True

    @staticmethod
    def copy_from_local(local_path, to_uri):
        """Upload *local_path* to *to_uri* via ``hadoop fs -copyFromLocal -f``.

        Returns:
            A ``(True, out)`` tuple on success, where ``out`` is the command's
            captured standard output. (Unlike the other methods this returns
            a tuple; kept for backward compatibility.)

        Raises:
            RuntimeError: if *local_path* does not exist or is not readable,
                on a non-zero exit status, or if the command cannot be run.
        """
        # Fail fast with a clear message instead of letting hadoop report it.
        if not os.path.exists(local_path) or not os.access(local_path, os.R_OK):
            raise RuntimeError('try to access local_path[{}] '
                               'but failed'.format(local_path))
        cmd = 'hadoop fs -copyFromLocal -f {local} {remote}'.format(
            local=local_path, remote=to_uri)
        ret, out, err = HDFS._run(
            cmd, 'Cannot copy local[{}] to remote[{}] with cmd[{}]'.format(
                local_path, to_uri, cmd))
        if ret != 0:
            raise RuntimeError('Cannot copy local[{}] to remote[{}] with cmd[{}]; '
                               'ret[{}] output[{}] stderr[{}]'.format(
                                   local_path, to_uri, cmd, ret, out, err))
        return True, out

    @staticmethod
    def copy_to_local(from_uri, local_path):
        """Download hdfs uri(s) to *local_path* via ``hadoop fs -copyToLocal``.

        Args:
            from_uri: a single uri string, or a list/tuple of uri strings that
                are joined with spaces into one command.
            local_path: local destination.

        Returns:
            True on success (exit status 0).

        Raises:
            TypeError: if *from_uri* is not a str, list or tuple.
            RuntimeError: on a non-zero exit status, or if the command
                cannot be run.
        """
        # ``str`` replaces the undefined Python-2 era ``string_types`` name,
        # which made every call fail with NameError.
        if isinstance(from_uri, str):
            remote = from_uri
        elif isinstance(from_uri, (list, tuple)):
            remote = ' '.join(from_uri)
        else:
            raise TypeError('from_uri must be a str, list or tuple, '
                            'got {}'.format(type(from_uri).__name__))
        cmd = 'hadoop fs -copyToLocal {remote} {local}'.format(
            remote=remote, local=local_path)
        ret, out, err = HDFS._run(
            cmd, 'Cannot copy remote[{}] to local[{}] with cmd[{}]'.format(
                from_uri, local_path, cmd))
        if ret != 0:
            raise RuntimeError('Cannot copy remote[{}] to local[{}] with cmd[{}]; '
                               'ret[{}] output[{}] stderr[{}]'.format(
                                   from_uri, local_path, cmd, ret, out, err))
        return True

    @staticmethod
    def read_hdfs_file(uri):
        """Read the content of an hdfs file via ``hadoop fs -text``.

        Returns:
            The command's captured standard output, exactly as returned by
            ``run_cmd_with_all_output``.

        Raises:
            RuntimeError: on a non-zero exit status, or if the command
                cannot be run.
        """
        cmd = 'hadoop fs -text {uri}'.format(uri=uri)
        ret, out, err = HDFS._run(
            cmd, 'Cannot read text from uri[{}] '
                 'with cmd[{}]'.format(uri, cmd))
        if ret != 0:
            raise RuntimeError('Cannot read text from uri[{}] '
                               'with cmd[{}]; ret[{}] output[{}] stderr[{}]'.format(
                                   uri, cmd, ret, out, err))
        return out

    @staticmethod
    def move(from_uri, to_uri):
        """Move *from_uri* to *to_uri* via ``hadoop fs -mv``.

        Returns:
            True on success (exit status 0).

        Raises:
            RuntimeError: on a non-zero exit status, or if the command
                cannot be run.
        """
        cmd = 'hadoop fs -mv {furi} {turi}'.format(furi=from_uri, turi=to_uri)
        ret, out, err = HDFS._run(
            cmd, 'Cannot move from_uri[{}] to '
                 'to_uri[{}] with cmd[{}]'.format(from_uri, to_uri, cmd))
        if ret != 0:
            raise RuntimeError('Cannot move from_uri[{}] to '
                               'to_uri[{}] with cmd[{}]; '
                               'ret[{}] output[{}] stderr[{}]'.format(
                                   from_uri, to_uri, cmd, ret, out, err))
        return True