在处理文件系统时,我们经常需要遍历目录结构以查找特定文件或执行某些操作。Python 提供了一个非常有用的函数 os.walk()
,它可以帮助我们轻松地遍历目录树。本文将详细介绍 os.walk()
的使用,并提供一个实际的应用示例。
os.walk()
的基本用法
os.walk()
是一个生成器,它会递归地遍历目录树中的所有目录和文件。它的基本语法如下:
import osfor dirpath, dirnames, filenames in os.walk('your_directory'):print('Current Path:', dirpath)print('Directories:', dirnames)print('Files:', filenames)
参数说明
dirpath
:当前遍历的目录路径。dirnames
:当前目录下的子目录列表。filenames
:当前目录下的文件列表。
示例
假设我们的目录结构如下:
example_dir/
│
├── subdir1/
│ ├── file1.txt
│ └── file2.txt
│
├── subdir2/
│ └── file3.txt
│
└── file4.txt
我们使用 os.walk()
来遍历这个目录:
import osfor dirpath, dirnames, filenames in os.walk('example_dir'):print('Current Path:', dirpath)print('Directories:', dirnames)print('Files:', filenames)
输出结果将如下所示:
Current Path: example_dir
Directories: ['subdir1', 'subdir2']
Files: ['file4.txt']Current Path: example_dir/subdir1
Directories: []
Files: ['file1.txt', 'file2.txt']Current Path: example_dir/subdir2
Directories: []
Files: ['file3.txt']
可以看到,os.walk()
递归地遍历了 example_dir
目录中的所有子目录和文件。
实际应用示例:同步文件到 MongoDB
我们通过一个实际的示例来展示 os.walk()
的应用。假设我们需要同步一个目录中的 PDF 和 XML 文件到 MongoDB,并且 XML 文件和对应的 PDF 文件在同一个目录中,但名称不一定相同。我们可以使用 os.walk()
来遍历目录,并找到每个 XML 文件对应的 PDF 文件。
完整代码示例
import logging
import os
import re
from logging import handlers
from datetime import datetime
import xml.etree.ElementTree as ET
from pymongo import MongoClient, errors
import gridfs# 用户输入部分
sync_file_path = ''
sync_mongo_url = ''
sync_mongo_username = ''
sync_mongo_password = ''
sync_mongo_db = ''
sync_mongo_collection = ''if sync_file_path == '':sync_file_path = r'D:\WG\202304'
if sync_mongo_db == '':sync_mongo_db = 'FMSQ'
if sync_mongo_collection == '':sync_mongo_collection = 'test'
if sync_mongo_url == '':sync_mongo_url = '192.168.0.234:27017'
if sync_mongo_username == '':sync_mongo_username = 'admin'
if sync_mongo_password == '':sync_mongo_password = '123456'# 日志文件配置
log_dir = f'./log/sync_pdf_xml_to_mongo_0001/{sync_mongo_db}-{sync_mongo_collection}'
if not os.path.exists(log_dir):os.makedirs(log_dir)class Logger(object):level_relations = {'debug': logging.DEBUG,'info': logging.INFO,'warning': logging.WARNING,'error': logging.ERROR,'crit': logging.CRITICAL}def __init__(self, filename, level='info', when='D', back_count=3,fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'):self.logger = logging.getLogger(filename)format_str = logging.Formatter(fmt)self.logger.setLevel(self.level_relations.get(level))sh = logging.StreamHandler()sh.setFormatter(format_str)th = handlers.TimedRotatingFileHandler(filename=filename, when=when, backupCount=back_count, encoding='utf-8')th.setFormatter(format_str)self.logger.addHandler(sh)self.logger.addHandler(th)log_all_path = f'{log_dir}/sync_pdf_to_mongo_all.log'
log_error_path = f'{log_dir}/sync_pdf_to_mongo_error.log'
log = Logger(log_all_path, level='debug')
error_log = Logger(log_error_path, level='error')def find_pdf_file(xml_file_path, files):directory = os.path.dirname(xml_file_path)for file in files:if file.endswith(".PDF"):return os.path.join(directory, file)return Nonedef upload_files():log.logger.info(f"Sync directory path: {sync_file_path}")log.logger.info(f"MongoDB URL: {sync_mongo_url}")log.logger.info(f"MongoDB username: {sync_mongo_username}")log.logger.info(f"MongoDB password: {sync_mongo_password}")log.logger.info(f"MongoDB database: {sync_mongo_db}")log.logger.info(f"MongoDB collection: {sync_mongo_collection}")# MongoDB 连接client = MongoClient(f"mongodb://{sync_mongo_username}:{sync_mongo_password}@{sync_mongo_url}/?retryWrites=true&w=majority",serverSelectionTimeoutMS=5000)db = client[sync_mongo_db]fs = gridfs.GridFS(db)for root, dirs, files in os.walk(sync_file_path):for file in files:if file.endswith(".XML"):xml_file_path = os.path.join(root, file)pdf_file_path = find_pdf_file(xml_file_path, files)if not pdf_file_path:error_log.logger.error(f"PDF file not found for XML: {xml_file_path}")continuetry:tree = ET.parse(xml_file_path)root_element = tree.getroot()doc_number_match = re.search(r'docNumber="([\s\S]*?)"', ET.tostring(root_element, encoding='utf-8').decode('utf-8'))if not doc_number_match:error_log.logger.error(f"docNumber not found in XML: {xml_file_path}")continuedoc_number = doc_number_match.group(1)new_pdf_filename = f"{doc_number}.PDF"new_pdf_file_path = os.path.join(root, new_pdf_filename)os.rename(pdf_file_path, new_pdf_file_path)log.logger.info(f"Renamed {pdf_file_path} to {new_pdf_file_path}")query = {"filename": new_pdf_filename}log.logger.info(f"{new_pdf_filename} uploaded to MongoDB.")if fs.exists(query):log.logger.info(f"{new_pdf_filename} already exists in MongoDB.")else:with open(new_pdf_file_path, 'rb') as pdf_file:content = pdf_file.read()fs.put(content, filename=new_pdf_filename, upload_time=datetime.now())log.logger.info(f"{new_pdf_filename} uploaded to MongoDB.")except (errors.ServerSelectionTimeoutError, errors.AutoReconnect) as conn_error:error_log.logger.error(f"Error connecting to MongoDB while processing XML: {xml_file_path}: {conn_error}")except Exception as e:error_log.logger.error(f"Error processing XML file: {xml_file_path}: {e}")log.logger.info("All files processed.")upload_files()
在这个示例中,我们使用 os.walk()
递归地遍历指定目录中的所有文件和子目录,并对每个 XML 文件执行以下操作:
- 查找同目录中的 PDF 文件。
- 解析 XML 文件以提取
docNumber
。 - 重命名 PDF 文件,并将其上传到 MongoDB。
通过这种方式,我们能够有效地处理目录中的所有文件,并确保每个 XML 文件都有对应的 PDF 文件。