返回目录
5.3.5 Events
对于远程服务提供的每个事件,Proxy Class包含一个特定于事件的包装类(事件类)的成员。在示例中,Proxy Class的成员名为BrakeEvent,类型为events::BrakeEvent。 正如在5.2中看到的,Proxy Class的所有事件类都是在包含在代理命名空间中特定事件的命名空间中生成的。 代理中的事件成员用于访问事件/事件数据,这些数据是由我们的代理所连接的服务实例发送的。下面的示例是一个事件类:
class BrakeEvent
{/*** \brief Shortcut for the events data type.*/using SampleType = RadarObjects;/*** \brief The application expects the CM to subscribe the event.** The Communication Management shall try to subscribe and resubscribe* until \see Unsubscribe() is called explicitly.* The error handling shall be kept within the Communication Management** The function returns immediately. If the user wants to get notified,* when subscription has succeeded, he needs to register a handler* via \see SetSubscriptionStateChangeHandler(). This handler gets* then called after subscription was successful.** \param maxSampleCount maximum number of samples, which can be held.*/ara::core::Result<void> Subscribe(size_t maxSampleCount);/*** \brief Query current subscription state.** \return Current state of the subscription.*/ara::com::SubscriptionState GetSubscriptionState() const;/*** \brief Unsubscribe from the service.*/void Unsubscribe();/*** \brief Get the number of currently free/available sample slots.** \return number from 0 - N (N = count given in call to Subscribe())* or an ErrorCode in case of number of currently held samples* already exceeds the max number given in Subscribe().*/size_t GetFreeSampleCount() const noexcept;/*** Setting a receive handler signals the Communication Management* implementation to use event style mode.* I.e. the registered handler gets called asynchronously by the* Communication Management as soon as new event data arrives for* that event. If the user wants to have strict polling behavior,* where no handler is called, NO handler should be registered.** Handler may be overwritten anytime during runtime.** Provided Handler needs not to be re-entrant since the* Communication Management implementation has to serialize calls* to the handler: Handler gets called once by the MW, when new* events arrived since the last call to GetNewSamples().** When application calls GetNewSamples() again in the context of the* receive handler, MW must - in case new events arrived in the* meantime - defer next call to receive handler until after* the previous call to receive handler has been completed.*/ara::core::Result<void> SetReceiveHandler(ara::com::EventReceiveHandler handler);/*** Remove handler set by SetReceiveHandler()*/ara::core::Result<void> UnsetReceiveHandler();/*** Setting a subscription state change handler, which shall get* called by the Communication Management implementation as soon* as the subscription state of this event has changed.** Communication Management implementation will serialize calls* to the registered handler. If multiple changes of the* subscription state take place during the runtime of a* previous call to a handler, the Communication Management* aggregates all changes to one call with the last/effective* state.** Handler may be overwritten during runtime.*/ara::core::Result<void> SetSubscriptionStateChangeHandler(ara::com::SubscriptionStateChangeHandler handler);/*** Remove handler set by SetSubscriptionStateChangeHandler()*/void UnsetSubscriptionStateChangeHandler();/*** \brief Get new data from the Communication Management* buffers and provide it in callbacks to the given callable f.** \pre BrakeEvent::Subscribe has been called before* (and not be withdrawn by BrakeEvent::Unsubscribe)** \param f* \parblock* callback, which shall be called with new sample.** This callable has to fulfill signature* void(ara::com::SamplePtr<SampleType const>)* \parblockend** \param maxNumberOfSamples* \parblock* upper bound of samples to be fetched from middleware buffers.* Default value means "no restriction", i.e. all newly arrived samples* are fetched as long as there are free sample slots.* \parblockend** \return Result, which contains the number of samples,* which have been fetched and presented to user via calls to f or an* ErrorCode in case of error (e.g. precondition not fullfilled)*/template <typename F>ara::core::Result<size_t> GetNewSamples(F&& f,size_t maxNumberOfSamples = std::numeric_limits<size_t>::max());};
在我们的事件类示例中,事件数据的类型是RadarObjects(参见5.1)。首先遇到的是using指令,它指定了类名SampleType 转换为具体类型,然后在整个接口中使用。
5.3.5.1 Event Subscription and Local Cache
仅仅在Proxy实例中存在一个事件包装类的成员,并不意味着用户能够立即访问由服务实例发出的事件。首先,你必须 “订阅” 该事件,以便告知CM中间件你现在有兴趣接收事件。为此,ara::com 的事件包装类提供了如下订阅方法。
/**
* \brief The application expects the CM to subscribe the event.
*
* ....
*
* \param maxSampleCount maximum number of samples, which can be held.
*/
ara::core::Result<void> Subscribe(size_t maxSampleCount);
这个方法需要一个参数 “maxSampleCount”,该参数主要是告知通信管理(Communication Management)应用程序最多容纳多少个事件样本。因此,通过调用这个方法,你不仅告诉通信管理你现在有兴趣接收事件更新,同时还为最大样本数量的事件包装器实例绑定的事件设置了一个 “本地缓存”。这个缓存由通信管理实现进行分配和填充,通信管理实现会向应用程序提供智能指针,以便访问事件样本数据。其具体的工作方式在子小节 5.3.5.3 中进行了描述。
5.3.5.2 Monitoring Event Subscription
对Subscribe()方法的调用本质上是异步的。这意味着在Subscribe()方法返回时,通信管理已经接受了处理订阅的指令。订阅过程本身可能(很有可能,但取决于底层的进程间通信(IPC)实现)涉及事件提供方。对于位于远程的服务来说,订阅过程可能需要花费一些时间。即使远程服务实例还没有确认订阅(如果底层的 IPC 支持确认机制的话),也允许订阅的绑定实现在接受订阅后立即返回。如果用户在调用了Subscribe()之后想要获得关于订阅成功的反馈,他可以调用:
/*** \brief query current subscription state.** \return current state of the subscription.*/ara::com::SubscriptionState GetSubscriptionState() const;
如果进程间通信(IPC)实现使用了服务端的订阅确认的机制,那么在调用Subscribe()之后立即调用GetSubscriptionState(),如果确认还未到达,可能会返回kSubscriptionPending(订阅待处理)状态。否则,如果 IPC 实现能立即获得反馈(对于本地通信来说很有可能),那么这个调用快速返回kSubscribed(已订阅)状态。如果用户需要监控订阅状态,有两种可能性:
-
通过GetSubscriptionState()进行轮询。
-
注册一个处理程序,当订阅状态改变时被调用。
我们已经在上面描述了第一种使用GetSubscriptionState()的可能性。第二种可能性依赖于在事件包装器实例上使用以下方法:
/*** Setting a subscription state change handler, which shall get called* by the Communication Management implementation as soon as the* subscription state of this event has changed.** Handler may be overwritten during runtime.*/ara::core::Result<void> SetSubscriptionStateChangeHandler(ara::com::SubscriptionStateChangeHandler handler);
用户可以注册一个处理程序函数,其签名需满足特定要求:
enum class SubscriptionState
{kSubscribed, kNotSubscribed,kSubscriptionPending
};using SubscriptionStateChangeHandler = std::function<void(SubscriptionState)>;
每当订阅状态发生变化时,通信管理实现会调用注册的处理程序。对于希望获得最新订阅状态通知的应用开发者来说,典型的使用模式是: 在首次调用Subscribe()之前注册处理程序。通信管理实现接受 “订阅指令” 后,以kSubscriptionPending作为参数调用注册处理程序,之后,当从服务端获得确认时,通信管理实现会以kSubscribed作为参数调用处理程序。
注意:如果底层实现不支持来自服务端的订阅确认,实现也可以跳过以kSubscriptionPending为参数的首次调用,直接以kSubscribed为参数调用处理程序。
对注册的 “订阅状态变化” 处理程序的调用是完全异步的,这意味着即使在Subscribe()调用尚未返回时,也可能再次调用Subscribe()。用户需要注意这一点。一旦用户为某个事件注册了这样的 “订阅状态变化” 处理程序,可能会对该处理程序的多次调用。不仅在初始状态从kNotSubscribed变为kSubscribed(可能通过中间步骤kSubscriptionPending)时会调用,而且在以后任何时候,只要提供该事件的服务具有特定的生命周期(可能与特定的车辆模式相关),也可能会调用Subscribe()。
服务可能在可用和(暂时)不可用之间切换,甚至可能意外崩溃并重新启动。提供事件的服务实例的可用性变化可能对代理端的通信管理实现可见。因此,每当通信管理检测到对事件订阅状态有影响的此类变化时,就会触发注册的 “订阅状态变化” 处理程序。此外,通信管理实现会在需要时负责更新用户完成的事件订阅。
此机制与前面已经描述的 “自动更新代理实例” 机制紧密耦合(5.3.4.1):由于通信管理实现会监视服务实例的可用性,所以一旦服务可用,服务代理就会自动连接到它。 该机制不仅在需要时 “自动更新” 其代理,而且在更新代理实例后,还会 “默默地” 重新订阅用户已经完成的任何事件订阅。这可以大致看作是一个非常有用的便利功能,如果没有 “更新后重新订阅”,那么 “仅自动更新” 似乎是一种不彻底的方法。通过注册 “订阅状态变化” 处理程序,用户现在有了另一种监视服务当前可用性的方法。除了可以注册如 5.3.4 中所述的FindServiceHandler之外,注册了 “订阅状态变化” 处理程序的用户可以通过对其处理程序的调用来间接监视服务可用性。
如果代理连接的服务实例停止运行,通信管理会以kSubscriptionPending作为参数调用处理程序。一旦 “更新后重新订阅” 成功,通信管理会以kSubscribed作为参数调用处理程序。
符合ara::com的通信管理实现必须序列化对用户注册的处理程序的调用。即:如果在用户提供的上一个状态变化处理程序仍在运行时发生新的订阅状态变化,通信管理实现必须推迟下一次调用,直到上一次调用返回。在用户注册的状态变化处理程序运行期间发生的多个订阅状态变化应聚合为对用户注册的处理程序的一次调用,并使用有效的 / 最后的状态作为参数。