"""Source code for dpdispatcher.utils.hdfs_cli."""
#!/usr/bin/python
import os
from dpdispatcher.utils.utils import run_cmd_with_all_output
class HDFS:
    """Fundamental class for HDFS basic manipulation.

    Each method shells out to the ``hadoop fs`` CLI via
    ``run_cmd_with_all_output`` and raises :class:`RuntimeError` on failure.
    The ``hadoop`` executable must be on ``PATH``.
    """

    @staticmethod
    def exists(uri):
        """Check existence of an hdfs uri.

        Parameters
        ----------
        uri : str
            The hdfs path to test.

        Returns
        -------
        bool
            True if the uri exists, False if it does not.

        Raises
        ------
        RuntimeError
            If the command cannot be run or returns an unexpected code.
        """
        cmd = f"hadoop fs -test -e {uri}"
        # Keep the try narrow: only the subprocess call is wrapped, so the
        # detailed RuntimeError below is not swallowed and re-wrapped.
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError(
                f"Cannot check existence of hdfs uri[{uri}] with cmd[{cmd}]"
            ) from e
        if ret == 0:
            return True
        elif ret == 1:
            # `hadoop fs -test -e` returns 1 when the path does not exist.
            return False
        else:
            raise RuntimeError(
                f"Cannot check existence of hdfs uri[{uri}] "
                f"with cmd[{cmd}]; ret[{ret}] stdout[{out}] stderr[{err}]"
            )

    @staticmethod
    def remove(uri):
        """Recursively remove an hdfs uri.

        Parameters
        ----------
        uri : str
            The hdfs path to delete.

        Returns
        -------
        bool
            True on success.

        Raises
        ------
        RuntimeError
            If the command cannot be run or fails.
        """
        cmd = f"hadoop fs -rm -r {uri}"
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError(
                f"Cannot remove hdfs uri[{uri}] with cmd[{cmd}]"
            ) from e
        if ret == 0:
            return True
        raise RuntimeError(
            f"Cannot remove hdfs uri[{uri}] "
            f"with cmd[{cmd}]; ret[{ret}] output[{out}] stderr[{err}]"
        )

    @staticmethod
    def mkdir(uri):
        """Make a new hdfs directory (with parents, like ``mkdir -p``).

        Parameters
        ----------
        uri : str
            The hdfs directory path to create.

        Returns
        -------
        bool
            True on success.

        Raises
        ------
        RuntimeError
            If the command cannot be run or fails.
        """
        cmd = f"hadoop fs -mkdir -p {uri}"
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError(
                f"Cannot mkdir of hdfs uri[{uri}] with cmd[{cmd}]"
            ) from e
        if ret == 0:
            return True
        raise RuntimeError(
            f"Cannot mkdir of hdfs uri[{uri}] "
            f"with cmd[{cmd}]; ret[{ret}] output[{out}] stderr[{err}]"
        )

    @staticmethod
    def copy_from_local(local_path, to_uri):
        """Copy a local file or directory to hdfs, overwriting the target.

        Parameters
        ----------
        local_path : str
            Readable local path to upload.
        to_uri : str
            Destination hdfs uri.

        Returns
        -------
        tuple
            ``(True, out)`` on success, where ``out`` is the command stdout.

        Raises
        ------
        RuntimeError
            If ``local_path`` is unreadable, or the command fails.
        """
        # Fail fast with a clear message if the source is missing/unreadable.
        if not os.path.exists(local_path) or not os.access(local_path, os.R_OK):
            raise RuntimeError(f"try to access local_path[{local_path}] " "but failed")
        cmd = f"hadoop fs -copyFromLocal -f {local_path} {to_uri}"
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError(
                f"Cannot copy local[{local_path}] to remote[{to_uri}] with cmd[{cmd}]"
            ) from e
        if ret == 0:
            return True, out
        raise RuntimeError(
            f"Cannot copy local[{local_path}] to remote[{to_uri}] with cmd[{cmd}]; "
            f"ret[{ret}] output[{out}] stderr[{err}]"
        )

    @staticmethod
    def copy_to_local(from_uri, local_path):
        """Copy one or more hdfs uris to a local path.

        Parameters
        ----------
        from_uri : str or list or tuple
            A single hdfs uri, or a sequence of uris joined into one command.
        local_path : str
            Local destination path.

        Returns
        -------
        bool
            True on success.

        Raises
        ------
        RuntimeError
            If the command cannot be run or fails.
        """
        remote = ""
        if isinstance(from_uri, str):
            remote = from_uri
        elif isinstance(from_uri, (list, tuple)):
            remote = " ".join(from_uri)
        # NOTE(review): any other from_uri type leaves `remote` empty and the
        # hadoop command will fail with a RuntimeError below.
        cmd = f"hadoop fs -copyToLocal {remote} {local_path}"
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError(
                f"Cannot copy remote[{from_uri}] to local[{local_path}] with cmd[{cmd}]"
            ) from e
        if ret == 0:
            return True
        raise RuntimeError(
            f"Cannot copy remote[{from_uri}] to local[{local_path}] with cmd[{cmd}]; "
            f"ret[{ret}] output[{out}] stderr[{err}]"
        )

    @staticmethod
    def read_hdfs_file(uri):
        """Read the text content of an hdfs uri (via ``hadoop fs -text``).

        Parameters
        ----------
        uri : str
            The hdfs path to read.

        Returns
        -------
        str
            The command's stdout, i.e. the file content.

        Raises
        ------
        RuntimeError
            If the command cannot be run or fails.
        """
        cmd = f"hadoop fs -text {uri}"
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError(
                f"Cannot read text from uri[{uri}] cmd[{cmd}]"
            ) from e
        if ret == 0:
            return out
        raise RuntimeError(
            f"Cannot read text from uri[{uri}] "
            f"cmd[{cmd}] ret[{ret}] output[{out}] stderr[{err}]"
        )

    @staticmethod
    def move(from_uri, to_uri):
        """Move (rename) an hdfs uri to another hdfs uri.

        Parameters
        ----------
        from_uri : str
            Source hdfs uri.
        to_uri : str
            Destination hdfs uri.

        Returns
        -------
        bool
            True on success.

        Raises
        ------
        RuntimeError
            If the command cannot be run or fails.
        """
        cmd = f"hadoop fs -mv {from_uri} {to_uri}"
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError(
                f"Cannot move from_uri[{from_uri}] to "
                f"to_uri[{to_uri}] with cmd[{cmd}]"
            ) from e
        if ret == 0:
            return True
        raise RuntimeError(
            f"Cannot move from_uri[{from_uri}] to "
            f"to_uri[{to_uri}] with cmd[{cmd}]; "
            f"ret[{ret}] output[{out}] stderr[{err}]"
        )