HDFS Connections

  1. Connecting to HDFS with pyarrow
  2. Connecting to HDFS with hdfs3
    1. Installing libhdfs3
    2. API
  • Shell is recommended

    Connecting to HDFS with pyarrow

    • Java-based connection: requires a Java environment and the hadoop command to be installed
    • C-based connection: requires the libhdfs native library (environment setup sketched below)
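
    Before the class below can connect, pyarrow's libhdfs-based client typically needs the Java/Hadoop environment exported first. A minimal sketch, assuming a standard Hadoop client install; the paths, host and port are placeholders for your own cluster:

    import os
    import subprocess

    import pyarrow

    # Placeholder locations -- point these at your own JDK and Hadoop install.
    os.environ.setdefault("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk-amd64")
    os.environ.setdefault("HADOOP_HOME", "/opt/hadoop")
    os.environ.setdefault("ARROW_LIBHDFS_DIR", "/opt/hadoop/lib/native")
    # libhdfs drives a JVM, so the Hadoop jars must be on the classpath.
    os.environ["CLASSPATH"] = subprocess.check_output(
        ["hadoop", "classpath", "--glob"]).decode().strip()

    client = pyarrow.hdfs.connect(host="namenode-host", port=8020)  # placeholder host/port
    print(client.ls("/"))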
    import os
    import logging

    import pandas as pd
    import pyarrow
    from krbcontext import krbContext  # used for Kerberos authentication

    logger = logging.getLogger(__name__)


    class HdfsConnect(object):
        def __init__(self, info, keytab_info, path):
            print(keytab_info["ip"], keytab_info["hdfs_port"])
            self.client = pyarrow.hdfs.connect(host=keytab_info["ip"],
                                               port=keytab_info["hdfs_port"],
                                               kerb_ticket="./tmp/cache")
            self.data_info = info
            self.path = path

        @staticmethod
        def _parquet_to_csv(file_name):
            try:
                if file_name.endswith(".parquet"):
                    result = pd.read_parquet(file_name)
                    result.to_csv(file_name.replace(".parquet", ".csv"), index=False)
            except Exception:
                print(file_name)

        def file_to_list(self, local_path=None, hdfs_path=None):
            """Expand a local or HDFS directory into a flat list of file paths."""
            path_list = []
            if local_path is not None:
                if os.path.isdir(local_path):
                    for i in os.listdir(local_path):
                        _path = local_path + '/' + i
                        path_list.extend(self.file_to_list(local_path=_path))
                else:
                    path_list.append(local_path)

            if hdfs_path is not None:
                if self.client.isdir(hdfs_path):
                    for i in self.client.ls(hdfs_path):
                        path_list.extend(self.file_to_list(hdfs_path=i))
                else:
                    path_list.append(hdfs_path)
            return path_list

        def _download_data(self):
            """Download a file or a whole directory from HDFS."""
            list_path = self.file_to_list(hdfs_path=self.data_info['path'])
            if list_path == []:
                return
            _path = ''
            for _path in list_path:
                local_path = self.path + _path[len("/".join(self.data_info['path'].split('/')[:-1])):]
                local_path_dir = "/".join(local_path.split('/')[:-1])
                if not os.path.exists(local_path_dir):
                    os.makedirs(local_path_dir)
                logger.info('Pulling HDFS file %s to local path %s' % (_path, local_path))
                with open(local_path, 'wb') as f:
                    self.client.download(_path, f)

            reverse_path = self.path + "/" + _path.split('/')[len(self.data_info['path'].split('/')) - 1]
            return reverse_path

        def _put_data(self):
            """Upload a file or a whole directory to HDFS."""
            list_path = self.file_to_list(local_path=self.path)
            for _path in list_path:
                hdfs_path = self.data_info['path'] + _path[len("/".join(self.path.split('/')[:-1])):]
                logger.info('Uploading file %s to HDFS path %s' % (_path, hdfs_path))
                with open(_path, 'rb') as f:
                    self.client.upload(hdfs_path, f)

        def download(self, key_name, key_value):
            result_file = self._download_data()
            try:
                # Downloaded files live under the directory returned by _download_data().
                file_list = [result_file + '/' + i for i in os.listdir(result_file)]
                for file_name in file_list:
                    self._parquet_to_csv(file_name)
            except Exception:
                print(result_file)
            return result_file

        def upload(self):
            self._put_data()


    # Obtain a Kerberos ticket from the keytab, then build the client.
    with krbContext(using_keytab=True, keytab_file=keytab_info['keytab_file'],
                    principal=keytab_info['principal'], ccache_file='./tmp/cache'):
        HdfsConnect(info, keytab_info, path)
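
    The snippet above assumes info, keytab_info and path already exist in scope. Their shape can be inferred from the keys the class reads; hypothetical example values:

    # Hypothetical example values -- adjust to your cluster and keytab.
    keytab_info = {"ip": "namenode-host", "hdfs_port": 8020,
                   "keytab_file": "/etc/security/keytabs/user.keytab",
                   "principal": "user@EXAMPLE.COM"}
    info = {"path": "/user/someone/dataset"}  # HDFS directory to pull
    path = "/tmp/local_workdir"               # local working directory

    Inside the krbContext block the instance can then call download() to pull the HDFS directory and convert parquet files to CSV, or upload() to push the local directory back; key_name and key_value are unused by the class as written.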

    Connecting to HDFS with hdfs3

    Installing libhdfs3

    https://dl.bintray.com/wangzw/deb  # download the two packages from here
    dpkg -i  # install the downloaded files
    apt-get upgrade  # after installing, run this to see which dependencies are missing; then download http://mirrors.163.com/ubuntu/ls-lR.gz, search it for the missing packages, and install them
    apt --fix-broken install -y
    # installation complete
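
    Once libhdfs3 installs cleanly, the Python binding can be added with pip install hdfs3 (it is also available on conda-forge), and a quick sanity check looks roughly like this; host and port are placeholders:

    # Sanity check that libhdfs3 loads and the NameNode is reachable.
    from hdfs3 import HDFileSystem

    hdfs = HDFileSystem(host='namenode-host', port=8020)  # placeholder host/port
    print(hdfs.ls('/', detail=False))  # should list the HDFS root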

    API

    from hdfs3 import HDFileSystem

    # High-availability (multi-NameNode) connection
    host = "nameservice1"
    # Configuration keys follow https://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml
    conf = {"dfs.nameservices": "nameservice1",
            "dfs.ha.namenodes.nameservice1": "namenode113,namenode188",
            "dfs.namenode.rpc-address.nameservice1.namenode113": "hostname_of_server1:8020",
            "dfs.namenode.rpc-address.nameservice1.namenode188": "hostname_of_server2:8020",
            "dfs.namenode.http-address.nameservice1.namenode113": "hostname_of_server1:50070",
            "dfs.namenode.http-address.nameservice1.namenode188": "hostname_of_server2:50070",
            "hadoop.security.authentication": "kerberos"
            }
    hdfs = HDFileSystem(host=host, pars=conf)

    # Commonly used API
    hdfs = HDFileSystem(host='127.0.0.1', port=8020)
    hdfs.cancel_token(token=None)
    hdfs.cat(path)  # return the contents of the given file
    hdfs.chmod(path, mode)  # change the permissions of the given path
    hdfs.chown(path, owner, group)  # change the owner and group of the given path
    hdfs.concat(destination, paths)  # concatenate the files in paths into a single file at destination; the source files are deleted on successful completion
    hdfs.connect()  # connect to the NameNode; this happens automatically at construction time, i.e. when HDFileSystem(host='127.0.0.1', port=8020) is called
    hdfs.delegate_token(user=None)
    hdfs.df()  # used/free disk space on the HDFS cluster
    hdfs.disconnect()  # the opposite of connect(): drop the connection
    hdfs.du(path, total=False, deep=False)  # file sizes under the given path; total sums them into one number, deep recurses into subdirectories
    hdfs.exists(path)  # whether the path exists
    hdfs.get(hdfs_path, local_path, blocksize=65536)  # copy an HDFS file to the local filesystem; blocksize sets the read chunk size
    hdfs.get_block_locations(path, start=0, length=0)  # physical locations of the file's blocks
    hdfs.getmerge(path, filename, blocksize=65536)  # concatenate all files under the given directory into a single local file
    hdfs.glob(path)  # e.g. /user/spark/abc-*.txt -- list of paths matching the pattern
    hdfs.head(path, size=1024)  # first bytes of the file
    hdfs.info(path)  # metadata about the given path
    hdfs.isdir(path)  # whether the path is a directory
    hdfs.isfile(path)  # whether the path is a file
    hdfs.list_encryption_zones()  # list all encryption zones
    hdfs.ls(path, detail=False)  # list paths under the given directory; detail=True returns per-file metadata
    hdfs.makedirs(path, mode=457)  # create a directory tree, like mkdir -p
    hdfs.mkdir(path)  # create a directory
    hdfs.mv(path1, path2)  # move path1 to path2
    hdfs.open(path, mode='rb', replication=0, buff=0, block_size=0)  # open a file, similar to Python's built-in open()
    hdfs.put(filename, path, chunk=65536, replication=0, block_size=0)  # upload a local file to the given HDFS path
    hdfs.read_block(fn, offset, length, delimiter=None)  # read length bytes starting at offset; delimiter ensures the read starts and stops on the delimiter bytestring
    #   >>> hdfs.read_block('/data/file.csv', 0, 13)
    #   b'Alice, 100\nBo'
    #   >>> hdfs.read_block('/data/file.csv', 0, 13, delimiter=b'\n')
    #   b'Alice, 100\nBob, 200'
    hdfs.rm(path, recursive=True)  # delete the path; recursive controls whether to delete recursively
    hdfs.tail(path, size=1024)  # last bytes of the file
    hdfs.touch(path)  # create an empty file
    hdfs.walk(path)  # walk the file tree
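
    A short sketch tying a few of the calls above together; the host and paths are placeholders:

    from hdfs3 import HDFileSystem

    hdfs = HDFileSystem(host='namenode-host', port=8020)  # placeholder host/port

    hdfs.makedirs('/tmp/hdfs3_demo')                       # like mkdir -p
    with hdfs.open('/tmp/hdfs3_demo/hello.txt', 'wb') as f:
        f.write(b'Alice, 100\nBob, 200\n')

    print(hdfs.ls('/tmp/hdfs3_demo'))                      # list what was just written
    print(hdfs.head('/tmp/hdfs3_demo/hello.txt'))          # first bytes of the file
    hdfs.get('/tmp/hdfs3_demo/hello.txt', '/tmp/hello_local.txt')  # copy to local disk
    hdfs.rm('/tmp/hdfs3_demo', recursive=True)             # clean up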

    Shell is recommended
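
    For simple transfers the shell client is often the easiest option, which is what this recommendation comes down to. A minimal sketch of calling it from Python, assuming the hadoop client is on PATH and a Kerberos ticket has already been obtained with kinit; the paths are placeholders:

    import subprocess

    # Assumes the `hdfs` command is on PATH and `kinit` has already been run.
    subprocess.run(['hdfs', 'dfs', '-ls', '/user'], check=True)
    subprocess.run(['hdfs', 'dfs', '-get', '/user/someone/dataset', '/tmp/dataset'], check=True)
    subprocess.run(['hdfs', 'dfs', '-put', '/tmp/results.csv', '/user/someone/results.csv'], check=True)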

