配置好HADOOP
用python连接到HDFS,创建目录失败
from hdfs.client import Client
HDFSHOST = "http://192.168.216.132:50070"
client = Client(HDFSHOST)
# 返回目录下的文件
print(client.list('/'))
client.makedirs('/tmp',permission=755)
报错信息
['data']
Traceback (most recent call last):File "C:/Users/Administrator/PycharmProjects/pythonProject/main.py", line 6, in <module>client.makedirs('/tmp',permission=755)File "D:\yz\Anaconda3\lib\site-packages\hdfs\client.py", line 1029, in makedirsself._mkdirs(hdfs_path, permission=permission)File "D:\yz\Anaconda3\lib\site-packages\hdfs\client.py", line 118, in api_handlerraise err
hdfs.util.HdfsError: Permission denied: user=dr.who, access=WRITE, inode="/":root:supergroup:drwxr-xr-x
能正常浏览目录,但不能创建或上传报错
Permission denied: user=dr.who, access=WRITE, inode=“/”:root:supergroup:drwxr-xr-x
排查原因是因为当前用户缺少相关权限。
解决方法:进入/hadoop/etc/hadoop目录,使用vim命令编辑core-site.xml文件,在文件中加入下方代码:
<property><name>hadoop.http.staticuser.user</name><value>你的用户名</value>
</property>
保存后退出,重新启动集群,即可在前端页面中进行删除操作。
from hdfs.client import Client
HDFSHOST = "http://192.168.216.132:50070"
client = Client(HDFSHOST)
# 返回目录下的文件
print(client.list('/'))
# client.makedirs('/tmp',permission=755)
print(client.list('/tmp'))
client.delete('/tmp/yarn.pid')
print(client.list('/tmp'))
try:print(client.status('/tmp/hadoop-root-datanode.pid'))print(client.status('/tmp/jobs.csv'))path='/tmp/jobs.csv'client.delete(path)print(f"文件{path},delete成功")
except Exception as e:print("操作失败:", e)hdfs_path='/tmp'
local_path='data/jobs.csv'
try:client.upload(hdfs_path,local_path)print(f"文件{local_path}上传成功")
except Exception as e:print("操作失败:", e)
运行结果
['data', 'tmp']
['hadoop-root-datanode.pid']
['hadoop-root-datanode.pid']
{'accessTime': 1731170583096, 'blockSize': 134217728, 'childrenNum': 0, 'fileId': 16388, 'group': 'supergroup', 'length': 6, 'modificationTime': 1731170583908, 'owner': 'root', 'pathSuffix': '', 'permission': '644', 'replication': 3, 'storagePolicy': 0, 'type': 'FILE'}
操作失败: File does not exist: /tmp/jobs.csv
操作失败: <urllib3.connection.HTTPConnection object at 0x0000000004551640>: Failed to establish a new connection: [WinError 10061] 由于目标计算机积极拒绝,无法连接。
python操作hdfs
https://www.jianshu.com/p/bc7aff674e53
安装hdfs库
所有python的三方模块均采用pip来安装.
pip install hdfs
hdfs库的使用
下面将介绍hdfs库的方法列表,并会与hadoop自带的命令行工具进行比较
注:hdfs dfs开头是hadoop自带的命令行工具命令
连接hadoop
通过http协议连接hadoop的datanode节点,默认端口50070
from hdfs.client import Client
client = Client("http://127.0.0.1:50070/")
注:为了节省篇幅,下面的所有代码片段默认包含上两行,此外,后续所有的hdfs指代hadoop的hdfs模块,而非python的hdfs库
list()
list()会列出hdfs指定路径的所有文件信息,接收两个参数
hdfs_path 要列出的hdfs路径
status 默认为False,是否显示详细信息
print(“hdfs中的目录为:”, client.list(hdfs_path=“/”,status=True))
查看hdfs根目录下的文件信息,等同于hdfs dfs -ls /
status()
查看文件或者目录状态,接收两个参数
hdfs_path 要列出的hdfs路径
strict 是否开启严格模式,严格模式下目录或文件不存在不会返回None,而是raise
print(client.status(hdfs_path="/b.txt",strict=True))
checksum()
checksum() 计算目录下的文件数量,只有一个参数.
print("根目录下的文件数量为:", client.checksum(hdfs_path="/input.txt"))
parts()
列出路径下的part file,接收三个参数
hdfs_path 要列出的hdfs路径
parts 要显示的parts数量 默认全部显示
status 默认为False,是否显示详细信息
print("", client.parts(hdfs_path="/log", parts=0, status=True))
content()
列出目录或文件详情,接收两个参数
hdfs_path 要列出的hdfs路径
strict 是否开启严格模式,严格模式下目录或文件不存在不会返回None,而是raise
print(client.content(hdfs_path="/",strict=True))
makedirs()
创建目录,同hdfs dfs -mkdir与hdfs dfs -chmod的结合体,接收两个参数
hdfs_path hdfs路径
permission 文件权限
print("创建目录", client.makedirs(hdfs_path="/t", permission="755"))
rename()
文件或目录重命名,接收两个参数
hdfs_src_path 原始路径或名称
hdfs_dst_path 修改后的文件或路径
client.rename(hdfs_src_path="/d.txt",hdfs_dst_path="/d.bak.txt")
resolve()
返回绝对路径,接收一个参数hdfs_path
print(client.resolve("d.txt"))
set_replication()
设置文件在hdfs上的副本(datanode上)数量,接收两个参数,集群模式下的hadoop默认保存3份
hdfs_path hdfs路径
replication 副本数量
client.set_replication(hdfs_path="/b.txt",replication=2)
read()
读取文件信息 类似与 hdfs dfs -cat hfds_path,参数如下:
hdfs_path hdfs路径
offset 读取位置
length 读取长度
buffer_size 设置buffer_size 不设置使用hdfs默认100MB 对于大文件 buffer够大的化 sort与shuffle都更快
encoding 指定编码
chunk_size 字节的生成器,必须和encodeing一起使用 满足chunk_size设置即 yield
delimiter 设置分隔符 必须和encodeing一起设置
progress 读取进度回调函数 读取一个chunk_size回调一次
读取200长度
with client.read(“/input.txt”, length=200, encoding=‘utf-8’) as obj:
for i in obj:
print(i)
从200位置读取200长度
with client.read(“/input.txt”, offset=200, length=200, encoding=‘utf-8’) as obj:
for i in obj:
print(i)
设置buffer为1024,读取
with client.read(“/input.txt”, buffer_size=1024, encoding=‘utf-8’) as obj:
for i in obj:
print(i)
设置分隔符为换行
p = client.read(“/input.txt”, encoding=‘utf-8’, delimiter=‘\n’)
with p as d:
print(d, type(d), next(d))
设置读取每个块的大小为8
p = client.read(“/input.txt”, encoding=‘utf-8’, chunk_size=8)
with p as d:
print(d, type(d), next(d))
download()
从hdfs下载文件到本地,参数列表如下.
hdfs_path hdfs路径
local_path 下载到的本地路径
overwrite 是否覆盖(如果有同名文件) 默认为Flase
n_threads 启动线程数量,默认为1,不启用多线程
temp_dir下载过程中文件的临时路径
**kwargs其他属性
print(“下载文件结果input.txt:”, client.download(hdfs_path=“/input.txt”, local_path=“~/”,overwrite=True))
等同 hdfs dfs copyToLocal /input ~/
upload()
上传文件到hdfs 同hdfs dfs -copyFromLocal local_file hdfs_path,参数列表如下:
hdfs_path, hdfs上位置
local_path, 本地文件位置
n_threads=1 并行线程数量 temp_dir=None, overwrite=True或者文件已存在的情况下的临时路径
chunk_size=2 ** 16 块大小
progress=None, 报告进度的回调函数 完成一个chunk_size回调一次 chunk_size可以设置大点 如果大文件的话
cleanup=True, 上传错误时 是否删除已经上传的文件
**kwargs 上传的一些关键字 一般设置为 overwrite 来覆盖上传
def callback(filename, size):
print(filename, “完成了一个chunk上传”, “当前大小:”, size)
if size == -1:
print(“文件上传完成”)
上传成功返回 hdfs_path
client.upload(hdfs_path=“/a_bak14.txt”, local_path=“a.txt”, chunk_size=2 << 19, progress=callback,cleanup=True)
delete()
删除文件,接收三个参数
hdfs_path
recursive=False 是否递归删除
skip_trash=True 是否移到垃圾箱而不是直接删除 hadoop 2.9+版本支持
client.delete(“/a.s”)
等同hdfs dfs -rm (-r)
set_owner()
类似与 hdfs dfs -chown root root hdfs_path修改目录或文件的所属用户,用户组,接收三个参数
hdfs_path hdfs路径
owner 用户
group 用户组
注意:对于默认用户,只能修改自己的文件.
client.set_owner(hdfs_path="/a.txt", owner="root", group="root")
set_permission
修改权限,类似于hdfs dfs -chmod 777 hdfs_path,接收两个参数
hdfs_path hdfs路径
permission 权限
client.set_permission(hdfs_path="/b.txt",permission='755')
注意:对于默认用户,只能修改自己的文件.
set_acl()与acl_status()
查看和修改访问权限控制 需要开启acl支持
set_times()
设置文件时间,接收参数如下:
hdfs_path: hdfs路径.
access_time: 最后访问时间 时间戳 毫秒
modification_time: 最后修改时间 时间戳 毫秒
import time
client.set_times(hdfs_path=“/b.txt”, access_time=int(time.time())1000,
modification_time=int(time.time())1000)