您的位置:首页 > 财经 > 产业 > 深入理解 Python 中的 `os.walk()`

深入理解 Python 中的 `os.walk()`

2024/12/23 11:34:52 来源:https://blog.csdn.net/weixin_39973810/article/details/139270681  浏览:    关键词:深入理解 Python 中的 `os.walk()`

在处理文件系统时,我们经常需要遍历目录结构以查找特定文件或执行某些操作。Python 提供了一个非常有用的函数 os.walk(),它可以帮助我们轻松地遍历目录树。本文将详细介绍 os.walk() 的使用,并提供一个实际的应用示例。

os.walk() 的基本用法

os.walk() 是一个生成器,它会递归地遍历目录树中的所有目录和文件。它的基本语法如下:

import osfor dirpath, dirnames, filenames in os.walk('your_directory'):print('Current Path:', dirpath)print('Directories:', dirnames)print('Files:', filenames)

参数说明

  • dirpath:当前遍历的目录路径。
  • dirnames:当前目录下的子目录列表。
  • filenames:当前目录下的文件列表。

示例

假设我们的目录结构如下:

example_dir/
│
├── subdir1/
│   ├── file1.txt
│   └── file2.txt
│
├── subdir2/
│   └── file3.txt
│
└── file4.txt

我们使用 os.walk() 来遍历这个目录:

import osfor dirpath, dirnames, filenames in os.walk('example_dir'):print('Current Path:', dirpath)print('Directories:', dirnames)print('Files:', filenames)

输出结果将如下所示:

Current Path: example_dir
Directories: ['subdir1', 'subdir2']
Files: ['file4.txt']Current Path: example_dir/subdir1
Directories: []
Files: ['file1.txt', 'file2.txt']Current Path: example_dir/subdir2
Directories: []
Files: ['file3.txt']

可以看到,os.walk() 递归地遍历了 example_dir 目录中的所有子目录和文件。

实际应用示例:同步文件到 MongoDB

我们通过一个实际的示例来展示 os.walk() 的应用。假设我们需要同步一个目录中的 PDF 和 XML 文件到 MongoDB,并且 XML 文件和对应的 PDF 文件在同一个目录中,但名称不一定相同。我们可以使用 os.walk() 来遍历目录,并找到每个 XML 文件对应的 PDF 文件。

完整代码示例

import logging
import os
import re
from logging import handlers
from datetime import datetime
import xml.etree.ElementTree as ET
from pymongo import MongoClient, errors
import gridfs# 用户输入部分
sync_file_path = ''
sync_mongo_url = ''
sync_mongo_username = ''
sync_mongo_password = ''
sync_mongo_db = ''
sync_mongo_collection = ''if sync_file_path == '':sync_file_path = r'D:\WG\202304'
if sync_mongo_db == '':sync_mongo_db = 'FMSQ'
if sync_mongo_collection == '':sync_mongo_collection = 'test'
if sync_mongo_url == '':sync_mongo_url = '192.168.0.234:27017'
if sync_mongo_username == '':sync_mongo_username = 'admin'
if sync_mongo_password == '':sync_mongo_password = '123456'# 日志文件配置
log_dir = f'./log/sync_pdf_xml_to_mongo_0001/{sync_mongo_db}-{sync_mongo_collection}'
if not os.path.exists(log_dir):os.makedirs(log_dir)class Logger(object):level_relations = {'debug': logging.DEBUG,'info': logging.INFO,'warning': logging.WARNING,'error': logging.ERROR,'crit': logging.CRITICAL}def __init__(self, filename, level='info', when='D', back_count=3,fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'):self.logger = logging.getLogger(filename)format_str = logging.Formatter(fmt)self.logger.setLevel(self.level_relations.get(level))sh = logging.StreamHandler()sh.setFormatter(format_str)th = handlers.TimedRotatingFileHandler(filename=filename, when=when, backupCount=back_count, encoding='utf-8')th.setFormatter(format_str)self.logger.addHandler(sh)self.logger.addHandler(th)log_all_path = f'{log_dir}/sync_pdf_to_mongo_all.log'
log_error_path = f'{log_dir}/sync_pdf_to_mongo_error.log'
log = Logger(log_all_path, level='debug')
error_log = Logger(log_error_path, level='error')def find_pdf_file(xml_file_path, files):directory = os.path.dirname(xml_file_path)for file in files:if file.endswith(".PDF"):return os.path.join(directory, file)return Nonedef upload_files():log.logger.info(f"Sync directory path: {sync_file_path}")log.logger.info(f"MongoDB URL: {sync_mongo_url}")log.logger.info(f"MongoDB username: {sync_mongo_username}")log.logger.info(f"MongoDB password: {sync_mongo_password}")log.logger.info(f"MongoDB database: {sync_mongo_db}")log.logger.info(f"MongoDB collection: {sync_mongo_collection}")# MongoDB 连接client = MongoClient(f"mongodb://{sync_mongo_username}:{sync_mongo_password}@{sync_mongo_url}/?retryWrites=true&w=majority",serverSelectionTimeoutMS=5000)db = client[sync_mongo_db]fs = gridfs.GridFS(db)for root, dirs, files in os.walk(sync_file_path):for file in files:if file.endswith(".XML"):xml_file_path = os.path.join(root, file)pdf_file_path = find_pdf_file(xml_file_path, files)if not pdf_file_path:error_log.logger.error(f"PDF file not found for XML: {xml_file_path}")continuetry:tree = ET.parse(xml_file_path)root_element = tree.getroot()doc_number_match = re.search(r'docNumber="([\s\S]*?)"', ET.tostring(root_element, encoding='utf-8').decode('utf-8'))if not doc_number_match:error_log.logger.error(f"docNumber not found in XML: {xml_file_path}")continuedoc_number = doc_number_match.group(1)new_pdf_filename = f"{doc_number}.PDF"new_pdf_file_path = os.path.join(root, new_pdf_filename)os.rename(pdf_file_path, new_pdf_file_path)log.logger.info(f"Renamed {pdf_file_path} to {new_pdf_file_path}")query = {"filename": new_pdf_filename}log.logger.info(f"{new_pdf_filename} uploaded to MongoDB.")if fs.exists(query):log.logger.info(f"{new_pdf_filename} already exists in MongoDB.")else:with open(new_pdf_file_path, 'rb') as pdf_file:content = pdf_file.read()fs.put(content, filename=new_pdf_filename, upload_time=datetime.now())log.logger.info(f"{new_pdf_filename} uploaded to MongoDB.")except (errors.ServerSelectionTimeoutError, errors.AutoReconnect) as conn_error:error_log.logger.error(f"Error connecting to MongoDB while processing XML: {xml_file_path}: {conn_error}")except Exception as e:error_log.logger.error(f"Error processing XML file: {xml_file_path}: {e}")log.logger.info("All files processed.")upload_files()

在这个示例中,我们使用 os.walk() 递归地遍历指定目录中的所有文件和子目录,并对每个 XML 文件执行以下操作:

  1. 查找同目录中的 PDF 文件。
  2. 解析 XML 文件以提取 docNumber
  3. 重命名 PDF 文件,并将其上传到 MongoDB。

通过这种方式,我们能够有效地处理目录中的所有文件,并确保每个 XML 文件都有对应的 PDF 文件。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com