有一个这样的任务:在网页上流式输出执行一个函数在终端产生的日志,但是目前只有终端日志,可以通过 自定义 loguru 的 Sink 将日志消息定向到线程安全的队列中,主线程从队列中实时获取日志。
import threading
import queue
from loguru import logger
import asyncio
import sys# 创建线程安全的队列用于存储日志
log_queue = queue.Queue()# 自定义 Sink 将日志写入队列
def log_sink(message):"""将 loguru 日志消息推送到队列"""log_queue.put(message)# 配置 loguru(保留终端输出,同时添加队列 Sink)
logger.remove() # 移除默认配置(可选)
logger.add(log_sink, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
logger.add(sys.stderr, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}") # 保留终端输出# 异步任务函数
async def main(prompt):logger.info("任务启动,参数: {}", prompt)await asyncio.sleep(2)logger.success("任务完成")# 线程包装器
def run_async_task(message):loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)loop.run_until_complete(main(message))loop.close()# 启动异步线程
prompt = {"message": "测试日志捕获"}
task_thread = threading.Thread(target=run_async_task,args=(prompt["message"],)
)
task_thread.start()# 主线程实时捕获日志
try:while task_thread.is_alive() or not log_queue.empty():try:log_record = log_queue.get(timeout=0.1)print(f"[捕获的日志] {log_record}") # 这里可以替换为写入文件或其他处理except queue.Empty:pass
finally:task_thread.join()