HTTP 认证使用外部自建 HTTP 应用认证数据源,根据 HTTP API 返回的数据判定认证结果,能够实现复杂的认证鉴权逻辑。



官方文档

https://docs.emqx.cn/cn/broker/latest/advanced/auth-http.html


  根目录/etc/plugins/emqx_auth_http.conf

##--------------------------------------------------------------------

## Authentication request.


## Value: URL

auth.http.auth_req = http://192.168.104.50:8787/mqtt/authHttp/auth


## Value: post | get | put

auth.http.auth_req.method = post


## It only works when method=post

## Value: json | x-www-form-urlencoded

auth.http.auth_req.content_type = x-www-form-urlencoded


## Variables:

##  - %u: username

##  - %c: clientid

##  - %a: ipaddress

##  - %r: protocol

##  - %P: password

##  - %p: sockport of server accepted

##  - %C: common name of client TLS cert

##  - %d: subject of client TLS cert

##

## Value: Params

auth.http.auth_req.params = clientid=%c,username=%u,password=%P


##--------------------------------------------------------------------

## Superuser request.


## Value: URL

auth.http.super_req = http://192.168.104.50:8787/mqtt/authHttp/superuser


## Value: post | get | put

auth.http.super_req.method = post


## It only works when method=pos

## Value: json | x-www-form-urlencoded

auth.http.super_req.content_type = x-www-form-urlencoded


## Variables:

##  - %u: username

##  - %c: clientid

##  - %a: ipaddress

##  - %r: protocol

##  - %P: password

##  - %p: sockport of server accepted

##  - %C: common name of client TLS cert

##  - %d: subject of client TLS cert

##

## Value: Params

auth.http.super_req.params = clientid=%c,username=%u


##--------------------------------------------------------------------

## ACL request.


## Variables:

##  - %A: 1 | 2, 1 = sub, 2 = pub

##  - %u: username

##  - %c: clientid

##  - %a: ipaddress

##  - %r: protocol

##  - %m: mountpoint

##  - %t: topic

##

## Value: URL

auth.http.acl_req = http://192.168.104.50:8787/mqtt/authHttp/acl


## Value: post | get | put

auth.http.acl_req.method = get


## It only works when method=pos

## Value: json | x-www-form-urlencoded

auth.http.acl_req.content_type = x-www-form-urlencoded


## Variables:

##  - %u: username

##  - %c: clientid

##  - %a: ipaddress

##  - %r: protocol

##  - %P: password

##  - %p: sockport of server accepted

##  - %C: common name of client TLS cert

##  - %d: subject of client TLS cert

##

## Value: Params

auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t,mountpoint=%m



启动插件




package com.simba.controller;


import javax.servlet.http.HttpServletResponse;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.http.HttpStatus;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.ResponseBody;


/**

 * 认证执行顺序为  Authentication request —> Superuser request —> ACL request

 * 当设备连接服务器的时候,首先执行Authentication request,返回值为200时认证通过,非200认证失败。

 * 当Authentication request 通过则会执行Superuser request 返回值为200时认证通过,非200认证失败。(该处失败不影响连接)

 * 当设备订阅主题,亦或是发布内容的时候 会执行ACL request,需要注意的是,如果Superuser request认证通过,则该处不执行,默认通过。

 * @author liusheng

 *

 */

@Controller

@RequestMapping("/mqtt/authHttp")

public class AuthHttpController {

 

private static final Log logger = LogFactory.getLog(AuthHttpController.class);

    /**

     * 设备连接权限校验

     * @param clientid 客户端ID

     * @param username 用户名

     * @param password 密码

     * @return 认证失败,API 返回4xx

     *         认证成功,API 返回200

     */

    @ResponseBody

    @PostMapping("/auth")

    public void auth(String clientid, String username, String password, HttpServletResponse response){

    logger.info("auth>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");

    logger.info("clientid:"+clientid);

    logger.info("username:"+username);

    logger.info("password:"+password);

    if(username.equals("king")||username.equals("unilink")){

    response.setStatus(HttpStatus.SC_OK);

    }else{

    response.setStatus(HttpStatus.SC_UNAUTHORIZED);

    }

    }

 

    /**

     * 验证是否为超级用户

     * @param clientid 客户端ID

     * @param username 用户名

     * @return 认证失败,API 返回4xx

     *         认证成功,API 返回200

     */

    @ResponseBody

    @PostMapping("/superuser")

    public void isSuper(String clientid, String username,HttpServletResponse response){

    logger.info("superuser>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");

    logger.info("clientid:"+clientid);

    logger.info("username:"+username);

        response.setStatus(HttpStatus.SC_UNAUTHORIZED);

    }

 

    /**

     * 设备订阅主题、发布内容时执行,当设备为超级用户时 不执行

     * @return ACL失败,API 返回4xx

     *         ACL成功,API 返回200

     */

    @ResponseBody

    @GetMapping("/acl")

    public void acl(String access, String username, String clientid, String ipaddr, String topic,HttpServletResponse response){

    logger.info("acl>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");

    logger.info("access:"+access);

    logger.info("username:"+username);

    logger.info("clientid:"+clientid);

    logger.info("ipaddr:"+ipaddr);

    logger.info("topic:"+topic);

        response.setStatus(HttpStatus.SC_OK);

    }

}   

        

说明

    认证执行顺序为  Authentication request —> Superuser request —> ACL request

    当设备连接服务器的时候,首先执行Authentication request,返回值为200时认证通过,非200认证失败。

    当Authentication request 通过则会执行Superuser request 返回值为200时认证通过,非200认证失败。(该处失败不影响连接)

    当设备订阅主题,亦或是发布内容的时候 会执行ACL request,需要注意的是,如果Superuser request认证通过,则该处不执行,默认通过。


这里简单的判断了用户名,后续可进行对应的权限判断

 auth:是在每次登录是都会验证一次。
 superuser:会在登录的时候调用判断是否有超级用户权限
 acl:这个会在发布或订阅数据的时候请求,判断是否允许发布或订阅,不会每次都判断,只对新Topic才进行判断,然后缓存在EMQ里面。

 WebHook采用就是标准的HTTP请求,利用HTTP response的返回状态码表示是否通过。
 通过这种方式,比之前通过查询数据有更强的业务扩展性。比如可以判断设备是否被禁用。设备的ACL权限问题,可以更细微级控制。但是毕竟通过HTTP方式会有性能损耗,这一点为鉴权功能单独划分模块。从业务服务器独立出来单独成为一个服务。


如果验证失败,在mqtt客户端,则会有下面错误:

2021-01-27 14:39:27.853 ERROR 8488 -- [sk-scheduler-10] .m.i.MqttPahoMessageDrivenChannelAdapter : Exception while connecting and subscribing

org.eclipse.paho.client.mqttv3.MqttSecurityException: 错误的用户名或密码

at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:28)

at org.eclipse.paho.client.mqttv3.internal.ClientState.notifyReceivedAck(ClientState.java:988)

at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:145)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)