博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MQTT---HiveMQ源码详解(十三)Netty-MQTT消息、事件处理(源码举例解读)
阅读量:3611 次
发布时间:2019-05-21

本文共 17209 字,大约阅读时间需要 57 分钟。

源博客地址:


MQTT交流群:221405150


前言

由于上一篇讲的都是大致的流程,所以这一篇我们从流程中抽取一步,给大家介绍Authentication部分的源码,帮助大家加深对上一篇内容的理解。

MqttConnectHandler

MqttConnectHandler是SimpleChannelInboundHandler&lt;Connect&gt;的子类,只处理Connect消息。

channelRead0

@Override    protected void channelRead0(ChannelHandlerContext ctx, Connect msg) throws Exception {        try {//加入MqttDisallowSecondConnect  ctx.pipeline().addAfter(Pipelines.MQTT_MESSAGE_DECODER, Pipelines.MQTT_DISALLOW_SECOND_CONNECT, this.disallowSecondConnect);        } catch (IllegalArgumentException e) {            ctx.pipeline().firstContext().fireChannelRead(msg);            return;        }        //校验clientid        if (!validIdentifier(ctx, msg)) {            return;        }        //标志是否接管ctx.channel().attr(AttributeKeys.MQTT_TAKEN_OVER).set(false);        //删除连接成功,未发connect消息超时handler        removeConnectIdleHandler(ctx);        //进入插件认证阶段        pluginOnAuthentication(ctx, msg);    }

pluginOnAuthentication

private void pluginOnAuthentication(ChannelHandlerContext ctx, Connect connect) {    //获得clienttoken,ClientToken是ClientCredentials实现类        ClientToken clientToken = ChannelUtils.clientToken(ctx.channel());        //判断callbackRegistry中是否存在可用的OnAuthenticationCallback        if (this.callbackRegistry.isAvailable(OnAuthenticationCallback.class)) {//添加PluginOnAuthenticationCallbackHandler,         ctx.pipeline().addLast(Pipelines.PLUGIN_ON_AUTHENTICATION_CALLBACK_HANDLER, this.pluginOnAuthenticationCallbackHandlerProvider.get());         //触发PluginOnAuthentication事件            ctx.fireUserEventTriggered(new PluginOnAuthentication(connect, clientToken));        } else {        //如果没有可用OnAuthenticationCallback,那么认为是不需要做Authentication,就去处理LWT,因为当client掉线后,会触发发送遗言,所以需要先判断对与该遗言发布到topic是否具备权限            pluginOnAuthorizationLWT(ctx, null, connect, clientToken, ReturnCode.ACCEPTED, true);        }    }

userEventTriggered

@Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        //当插件认证完成        if (evt instanceof PluginOnAuthenticationCompleted) {        //进行认证完成后的处理            pluginOnAuthenticationCompleted(ctx, (PluginOnAuthenticationCompleted) evt);        } else if (evt instanceof PluginRestrictionsAfterLoginCompleted) {            pluginRestrictionsAfterLoginCompleted(ctx, (PluginRestrictionsAfterLoginCompleted) evt);        } else if (evt instanceof PluginOnConnectCompleted) {            pluginOnConnectCompleted(ctx, (PluginOnConnectCompleted) evt);        } else if (evt instanceof PluginOnAuthorizationCompleted) {            pluginOnAuthorizationCompleted(ctx, (PluginOnAuthorizationCompleted) evt);        } else if (evt instanceof MqttConnectPersistenceHandler.OnConnectPersistenceCompleted) {            MqttConnectPersistenceHandler.OnConnectPersistenceCompleted event = (MqttConnectPersistenceHandler.OnConnectPersistenceCompleted) evt;            onConnectPersistenceCompleted(ctx, event.getConnect(), event.isSessionPresent());        } else {            super.userEventTriggered(ctx, evt);        }    }

pluginOnAuthenticationCompleted

private void pluginOnAuthenticationCompleted(ChannelHandlerContext ctx,                                                 PluginOnAuthenticationCompleted event) {        //获得处理完成的ReturnCode                                               ReturnCode returnCode = event.getReturnCode();        boolean accepted = returnCode == ReturnCode.ACCEPTED;        //处理LWT        pluginOnAuthorizationLWT(ctx, event.getException(), event.getConnect(),                event.getClientCredentials(), returnCode, accepted);    }

PluginOnAuthenticationCallbackHandler

@Singleton@ChannelHandler.Sharablepublic class PluginOnAuthenticationCallbackHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(PluginOnAuthenticationCallbackHandler.class); private final CallbackRegistry callbackRegistry; private final HiveMQConfigurationService hiveMQConfigurationService; private final Metrics metrics; private final CallbackExecutor callbackExecutor; @Inject public PluginOnAuthenticationCallbackHandler(CallbackRegistry callbackRegistry, HiveMQConfigurationService hiveMQConfigurationService, Metrics metrics, CallbackExecutor callbackExecutor) { this.callbackRegistry = callbackRegistry; this.hiveMQConfigurationService = hiveMQConfigurationService; this.metrics = metrics; this.callbackExecutor = callbackExecutor; } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //当需要插件认证 if (evt instanceof PluginOnAuthentication) { //进行认证 onAuthentication(ctx, (PluginOnAuthentication) evt); //当单一一个插件认证完成 } else if (evt instanceof PluginOnAuthenticationCallbackCompleted) { //对一个插件认证完成的结果进行处理 onAuthenticationCallbackCompleted(ctx, (PluginOnAuthenticationCallbackCompleted) evt); } else { super.userEventTriggered(ctx, evt); } } private void onAuthentication(ChannelHandlerContext ctx, PluginOnAuthentication event) { //判断是否存在可用OnAuthenticationCallback.class,这里再判断一次原因是因为有时差。 boolean available = this.callbackRegistry.isAvailable(OnAuthenticationCallback.class); if (available) { //获得所有已注册的OnAuthenticationCallback,并构建新的队列 Deque
leftCallbacks = new ArrayDeque(this.callbackRegistry.getCallbacks(OnAuthenticationCallback.class)); //获得认证证书 ClientCredentials clientCredentials = event.getClientCredentials(); //获得callback数量,并作为期待的返回结果数量,以后面处理完成作为一个判断条件 int expectedResultCount = leftCallbacks.size(); //poll一个OnAuthenticationCallback,进行认证 OnAuthenticationCallback callback = leftCallbacks.poll(); //构建存储结果的list List
results = new ArrayList(leftCallbacks.size()); //提交认证task submitTask(ctx, callback, clientCredentials, event.getConnect(), leftCallbacks, results, expectedResultCount); //如果用户配置需要所有插件都必须全部认证通过,才认为通过认证,并发布认证完成事件事件 } else if (needAllPluginsToReturnTrue()) { ctx.pipeline().fireUserEventTriggered(new PluginOnAuthenticationCompleted( event.getConnect(), event.getClientCredentials(), ReturnCode.REFUSED_NOT_AUTHORIZED, new AuthenticationException("No OnAuthenticationCallback available", ReturnCode.REFUSED_NOT_AUTHORIZED))); } else { //否则,认为认证通过,并发布认证完成事件 ctx.pipeline().fireUserEventTriggered(new PluginOnAuthenticationCompleted( event.getConnect(), event.getClientCredentials(), ReturnCode.ACCEPTED)); } } //当一个callback认证完成 private void onAuthenticationCallbackCompleted(ChannelHandlerContext ctx, PluginOnAuthenticationCallbackCompleted event) { //获得事件结果 List
results = event.getResults(); //获得最后一个result PluginOnAuthenticationResult lastResult = results.get(results.size() - 1); Connect connect = event.getConnect(); ClientCredentials clientCredentials = event.getClientCredentials(); //判断是否可以提前结束,也就是可以确定的到可以返回client端ConnAck if (lastResult.isRefused() || lastResult.isAuthenticated() && !needAllPluginsToReturnTrue() || !lastResult.isAuthenticated() && needAllPluginsToReturnTrue()) { //触发认证完成事件 ctx.pipeline().fireUserEventTriggered(new PluginOnAuthenticationCompleted( connect, clientCredentials, lastResult.getReturnCode(), lastResult.getException())); //判断当前pipeline中是否存在当前handler,并移除 if (ctx.pipeline().get(getClass()) != null) { ctx.pipeline().remove(this); } return; } //如果所有插件认证都完成了 if (results.size() == event.getExpectedResultCount()) { //如果认证通过 if (accepted(results)) { ctx.pipeline().fireUserEventTriggered(new PluginOnAuthenticationCompleted( connect, clientCredentials, ReturnCode.ACCEPTED)); //否则认证失败 } else { ctx.pipeline().fireUserEventTriggered(new PluginOnAuthenticationCompleted( connect, clientCredentials, ReturnCode.REFUSED_NOT_AUTHORIZED)); } //判断并移除 if (ctx.pipeline().get(getClass()) != null) { ctx.pipeline().remove(this); } return; } //如果还有插件未完成认证 Queue
leftCallbacks = event.getLeftCallbacks(); //poll一个OnAuthenticationCallback OnAuthenticationCallback callback = leftCallbacks.poll(); //继续提交认证task submitTask(ctx, callback, clientCredentials, connect, leftCallbacks, results, event.getExpectedResultCount()); } private void submitTask(ChannelHandlerContext ctx, OnAuthenticationCallback callback, ClientCredentials clientCredentials, Connect connect, Queue
leftCallbacks, List
results, int expectedResultCount) { //获得到Future ListenableFuture future = this.callbackExecutor.submit(createTask(callback, clientCredentials)); //创建获得结果的callback ResultCallback resultCallback = createResultCallback(ctx, clientCredentials, connect, leftCallbacks, results, expectedResultCount); //同步获得结果 Futures.addCallback(future, resultCallback, ctx.executor().parent()); } //创建认证task @NotNull @VisibleForTesting Task createTask(OnAuthenticationCallback callback, ClientCredentials clientCredentials) { return new Task(callback, clientCredentials, this.metrics); } //创建获得认证结果FutureCallback @NotNull @VisibleForTesting ResultCallback createResultCallback(ChannelHandlerContext ctx, ClientCredentials clientCredentials, Connect connect, Queue
leftCallbacks, List
results, int expectedResultCount) { return new ResultCallback(ctx, clientCredentials, connect, leftCallbacks, results, expectedResultCount); } //获得用户插件认证配置 private boolean needAllPluginsToReturnTrue() { return this.hiveMQConfigurationService.internalConfiguration() .getBoolean(Internals.PLUGIN_AUTHENTICATION_NEED_ALL_PLUGINS_TO_RETURN_TRUE); } //判断是否认证通过 private boolean accepted(List
results) { boolean needAllPluginsToReturnTrue = needAllPluginsToReturnTrue(); for (PluginOnAuthenticationResult result : results) { if (!needAllPluginsToReturnTrue && result.isAuthenticated()) { return true; } if (needAllPluginsToReturnTrue && !result.isAuthenticated()) { return false; } } return needAllPluginsToReturnTrue; } //同步获得结果的FutureCallback @VisibleForTesting static class ResultCallback implements FutureCallback
{ private final ChannelHandlerContext ctx; private final ClientCredentials clientCredentials; private final Connect connect; private final Queue
leftCallbacks; private final List
results; private final int expectedResultCount; public ResultCallback(ChannelHandlerContext ctx, ClientCredentials clientCredentials, Connect connect, Queue
leftCallbacks, List
results, int expectedResultCount) { this.ctx = ctx; this.clientCredentials = clientCredentials; this.connect = connect; this.leftCallbacks = leftCallbacks; this.results = results; this.expectedResultCount = expectedResultCount; } //没有异常,回调,并触发一个插件认证完成事件 @Override public void onSuccess(@Nullable PluginOnAuthenticationResult result) { this.results.add(result); this.ctx.pipeline().fireUserEventTriggered( new PluginOnAuthenticationCallbackCompleted(this.leftCallbacks, this.results, this.connect, this.clientCredentials, this.expectedResultCount)); } //有异常,回调,并触发一个插件认证完成事件 public void onFailure(Throwable t) { LOGGER.error("OnAuthenticationCallback failed. Skipping all other handlers"); this.results.add(new PluginOnAuthenticationResult(false, ReturnCode.REFUSED_NOT_AUTHORIZED, true, new AuthenticationException(t.getMessage() + " See log for more information", ReturnCode.REFUSED_NOT_AUTHORIZED))); this.ctx.pipeline().fireUserEventTriggered( new PluginOnAuthenticationCallbackCompleted(this.leftCallbacks, this.results, this.connect, this.clientCredentials, this.expectedResultCount)); } } //认证task @VisibleForTesting static class Task implements CallableTask
{ private final OnAuthenticationCallback callback; private final ClientCredentials clientCredentials; private final Metrics metrics; public Task(@NotNull OnAuthenticationCallback callback, ClientCredentials clientCredentials, Metrics metrics) { this.callback = callback; this.clientCredentials = clientCredentials; this.metrics = metrics; } @Override public PluginOnAuthenticationResult call() throws Exception { //获得埋点该插件执行时间的上下文 Timer.Context timer = this.metrics.pluginTimerAuthentication().time(); try { //调用callback,去认证 Boolean authenticated = this.callback.checkCredentials(this.clientCredentials); //构建认证结果 PluginOnAuthenticationResult result = new PluginOnAuthenticationResult(authenticated, authenticated ? ReturnCode.ACCEPTED : ReturnCode.REFUSED_NOT_AUTHORIZED, false); //停止插件执行时间计时 timer.stop(); return result; } catch (AuthenticationException e) { //当插件抛出认证失败exception LOGGER.debug("An exception was raised when calling the OnAuthenticationCallback {}:", this.callback.getClass(), e); //构建认证结果 PluginOnAuthenticationResult result = new PluginOnAuthenticationResult(false, e.getReturnCode(), true, e); //停止计时 timer.stop(); return result; } catch (Throwable t) { //当插件抛出其他Throwable LOGGER.error("Unhandled Exception in OnAuthenticationCallback {}. Skipping all other handlers", this.callback.getClass()); //停止计时 timer.stop(); //插件异常的处理器,去记录日志 PluginExceptionUtils.log(t); //构建认证结果 return new PluginOnAuthenticationResult(false, ReturnCode.REFUSED_NOT_AUTHORIZED, true, new AuthenticationException(t.getMessage() + " See log for more information", ReturnCode.REFUSED_NOT_AUTHORIZED)); } } @NotNull public Class callbackType() { return this.callback.getClass(); } }}

其他事件类

这几个类都是简单的POJO,它们的作用在前面的源码注释中都已经描述过,所以这里就不再逐行添加注释了。

/**
 * User event that requests plugin authentication for a CONNECT message.
 * Immutable holder of the CONNECT message and the client credentials.
 */
public class PluginOnAuthentication {

    /** The CONNECT message to authenticate. */
    private final Connect connect;

    /** The credentials handed to the authentication callbacks. */
    private final ClientCredentials clientCredentials;

    /**
     * @param connect           the CONNECT message
     * @param clientCredentials the credentials of the connecting client
     */
    public PluginOnAuthentication(Connect connect, ClientCredentials clientCredentials) {
        this.clientCredentials = clientCredentials;
        this.connect = connect;
    }

    /** @return the CONNECT message */
    public Connect getConnect() {
        return this.connect;
    }

    /** @return the credentials of the connecting client */
    public ClientCredentials getClientCredentials() {
        return this.clientCredentials;
    }
}
/**
 * User event fired after one single authentication callback has finished.
 * Carries the state needed to decide whether authentication is complete or the next
 * callback has to run.
 */
public class PluginOnAuthenticationCallbackCompleted {

    /** Callbacks that have not been executed yet. */
    private final Queue<OnAuthenticationCallback> leftCallbacks;

    /** Number of results expected once every callback has run. */
    private final int expectedResultCount;

    /** Results collected so far; the most recent one is the last element. */
    private final List<PluginOnAuthenticationResult> results;

    private final Connect connect;
    private final ClientCredentials clientCredentials;

    /**
     * @param leftCallbacks       the callbacks still to be executed
     * @param results             the results collected so far
     * @param connect             the CONNECT message being authenticated
     * @param clientCredentials   the credentials of the connecting client
     * @param expectedResultCount the number of results expected in total
     */
    public PluginOnAuthenticationCallbackCompleted(Queue<OnAuthenticationCallback> leftCallbacks,
                                                   List<PluginOnAuthenticationResult> results,
                                                   Connect connect,
                                                   ClientCredentials clientCredentials,
                                                   int expectedResultCount) {
        this.leftCallbacks = leftCallbacks;
        this.results = results;
        this.connect = connect;
        this.clientCredentials = clientCredentials;
        this.expectedResultCount = expectedResultCount;
    }

    /** @return the callbacks still to be executed */
    public Queue<OnAuthenticationCallback> getLeftCallbacks() {
        return leftCallbacks;
    }

    /** @return the number of results expected in total */
    public int getExpectedResultCount() {
        return expectedResultCount;
    }

    /** @return the results collected so far */
    public List<PluginOnAuthenticationResult> getResults() {
        return results;
    }

    /** @return the CONNECT message being authenticated */
    public Connect getConnect() {
        return connect;
    }

    /** @return the credentials of the connecting client */
    public ClientCredentials getClientCredentials() {
        return clientCredentials;
    }
}
/**
 * User event that reports the final outcome of the plugin authentication.
 * Immutable holder of the CONNECT message, the client credentials, the resulting
 * return code and an optional exception.
 */
public class PluginOnAuthenticationCompleted {

    private final Connect connect;
    private final ClientCredentials clientCredentials;
    private final ReturnCode returnCode;

    /** {@code null} when the event was created without an exception. */
    private final AuthenticationException exception;

    /**
     * Creates an event without an exception.
     *
     * @param connect           the CONNECT message
     * @param clientCredentials the credentials of the connecting client
     * @param returnCode        the outcome of the authentication
     */
    public PluginOnAuthenticationCompleted(Connect connect,
                                           ClientCredentials clientCredentials,
                                           ReturnCode returnCode) {
        this(connect, clientCredentials, returnCode, null);
    }

    /**
     * Creates an event that additionally carries an exception.
     *
     * @param connect           the CONNECT message
     * @param clientCredentials the credentials of the connecting client
     * @param returnCode        the outcome of the authentication
     * @param exception         the exception attached to the outcome, may be {@code null}
     */
    public PluginOnAuthenticationCompleted(Connect connect,
                                           ClientCredentials clientCredentials,
                                           ReturnCode returnCode,
                                           AuthenticationException exception) {
        this.exception = exception;
        this.returnCode = returnCode;
        this.clientCredentials = clientCredentials;
        this.connect = connect;
    }

    /** @return the CONNECT message */
    public Connect getConnect() {
        return this.connect;
    }

    /** @return the credentials of the connecting client */
    public ClientCredentials getClientCredentials() {
        return this.clientCredentials;
    }

    /** @return the outcome of the authentication */
    public ReturnCode getReturnCode() {
        return this.returnCode;
    }

    /** @return the attached exception, or {@code null} if there is none */
    public AuthenticationException getException() {
        return this.exception;
    }
}

转载地址:http://xjezn.baihongyu.com/

你可能感兴趣的文章
nginx和tomcat的ssl认证使用https协议访问
查看>>
docker使用tomcat部署应用
查看>>
linux服务器之间复制文件
查看>>
k8s集群搭建
查看>>
自己的阿里云镜像加速器查找
查看>>
spring-cloud-eureka初体验
查看>>
spring-cloud-ribbon简单使用
查看>>
spring-cloud-feign的简单使用
查看>>
spring-cloud-hystrix简单使用
查看>>
docker批量删除镜像
查看>>
deeping操作系统修改已挂载卷名称
查看>>
设计模式之神奇的单例模式
查看>>
linux系统设置oracle开机自启
查看>>
数据库的五种索引类型
查看>>
设计模式之原型模式
查看>>
设计模式之建造者模式
查看>>
设计模式之代理模式
查看>>
设计模式之门面模式
查看>>
设计模式之装饰器模式
查看>>
设计模式之享元模式
查看>>