您的位置:首页 > 财经 > 产业 > IPython 集群和 PicklingError

IPython 集群和 PicklingError

2025/1/22 16:01:11 来源:https://blog.csdn.net/weixin_44617651/article/details/141181917  浏览:    关键词:IPython 集群和 PicklingError

在使用 IPython 集群进行并行计算时,可能会遇到 PicklingError。这种错误通常与 Python 对象的序列化(即“pickling”)有关。Pickling 是将 Python 对象转换为字节流的过程,以便能够在不同的 Python 进程之间传递对象。在分布式计算环境中,如 IPython 集群,这种对象传递是常见的。

在这里插入图片描述

1、问题背景

我正在使用 IPython 的 notebook 使用 zipline,所以我首先创建了一个基于 zipline.TradingAlgorithm 的类。 我将该类发送到 IPython 集群引擎以在并行环境中运行。当我尝试在 IPython 集群上运行我的代码时,我遇到了一个错误。

在单元格 [3] 中,我使用 load_from_yahoo 从雅虎加载了股票数据。然后我创建了一个 AgentList,其中包含三个 Agent 的实例。Agent 类是一个基于 zipline.TradingAlgorithm 的自定义类。

在单元格 [4] 中,我定义了一个名为 testSystem 的函数,该函数接受一个 agent 和一个 data 作为参数。该函数使用 agentdata 上运行 zipline 模拟,并将最终的投资组合价值存储在 agent.valueHistory 中。

在单元格 [5] 中,我使用 lview.apply_asynctestSystem 函数异步地应用于每个 agentdata。然后我使用 ar.get() 获取每个任务的结果。

在单元格 [6] 中,我绘制了每个 agentvalueHistory

2、解决方案

PicklingError 是因为 zipline.TradingAlgorithm.run() 方法不能被 pickle。为了解决这个问题,我使用以下代码将 run() 方法从 zipline.TradingAlgorithm 复制到了 Agent 类:

def run(self, data):return zipline.TradingAlgorithm.run(self, data)

除了将 run() 方法复制到 Agent 类之外,我还在 Agent 类中添加了一个 __getstate__ 方法,该方法返回一个包含 Agent 状态的字典。

def __getstate__(self):state = super().__getstate__()state['valueHistory'] = self.valueHistoryreturn state

通过将 run() 方法复制到 Agent 类并添加一个 __getstate__ 方法,我能够成功地将 Agent 类发送到 IPython 集群并运行它。

以下是我修改后的代码:

from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()%%px --local  # This insures that the Class and modules exist on each engine
import zipline as zpl
import numpy as npclass Agent(zpl.TradingAlgorithm):  # must define initialize and handle_data methodsdef initialize(self):self.valueHistory = Nonepassdef handle_data(self, data):for security in data.keys():## Just randomly buy/sell/hold for each securitycoinflip = np.random.random()if coinflip < .25:self.order(security,100)elif coinflip > .75:self.order(security,-100)passdef run(self, data):return zipline.TradingAlgorithm.run(self, data)def __getstate__(self):state = super().__getstate__()state['valueHistory'] = self.valueHistoryreturn statefrom zipline.utils.factory import load_from_yahoostart = '2013-04-01'
end   = '2013-06-01'
sidList = ['SPY','GOOG']
data = load_from_yahoo(stocks=sidList,start=start,end=end)agentList = []
for i in range(3):agentList.append(Agent())def testSystem(agent,data):results = agent.run(data)  #-- This is how the zipline based class is executed#-- next I'm just storing the final value of the test so I can plot lateragent.valueHistory.append(results['portfolio_value'][len(results['portfolio_value'])-1])return agentfor i in range(10):tasks = []for agent in agentList:#agent = testSystem(agent,data)  ## On its own, this works!#-- To Test, uncomment the above line and comment out the next two tasks.append(lview.apply_async(testSystem,agent,data))agentList = [ar.get() for ar in tasks]for agent in agentList:plot(agent.valueHistory)

在使用 IPython 集群进行并行计算时,如果遇到 PicklingError,通常是因为你试图传递不可序列化的对象。解决方法包括确保函数在全局作用域中定义、使用 dill 代替 pickle、简化数据和代码,以及检查第三方库的兼容性。通过这些方法,你可以有效地避免或解决并行计算中的序列化问题。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com