diff --git a/.mvn/wrapper/maven-wrapper.jar b/.mvn/wrapper/maven-wrapper.jar
deleted file mode 100644
index 9cc84ea..0000000
Binary files a/.mvn/wrapper/maven-wrapper.jar and /dev/null differ
diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties
deleted file mode 100644
index 6c8c0e0..0000000
--- a/.mvn/wrapper/maven-wrapper.properties
+++ /dev/null
@@ -1 +0,0 @@
-distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.4/apache-maven-3.5.4-bin.zip
diff --git a/README.md b/README.md
index da3b9b3..684d80f 100644
--- a/README.md
+++ b/README.md
@@ -1,112 +1,32 @@
-# InChat(当前版本1.6.0)
+# InChat-IM-API(当前版本0.0.2)
## 简介
->(InChat)Iot Netty Chat
+本项目为InChat核心项目,服务端项目,以API形式作为对外功能,类似腾讯IM的服务端作用,本文也将着重讲解本项目的各个API,目前还没有嵌入Iot通信模块,仅以WebSocket的聊天室作为初期发展,需要使用到Iot的朋友可以先去[Master](https://github.com/UncleCatMySelf/InChat/tree/master)项目了解。
-仿微信聊天应用,一步一步更新,基于SpringBoot-WebSocket通用框架,结合Netty进行聊天社交,并记录聊天日志,
-异步存储,前端暂用SUI Mobile,添加实现TCP/IP后端通信端口(MQTT协议、可实时与单片机等TCP硬件通信)、加入图片处理流,
-聊天实现文字与图片发送功能、API调用Netty长链接执行发送消息(在线数、用户列表)
+## swagger-ui
-## 基本架构图(1.5.2版)
+前端对接公告,目前推出API,请均已此文档说明的为主,其余API非正式版或测试版,误用
+查看API列表
+> http://localhost:8080/susu/swagger-ui.html
-
+## API列表详情
-## 功能
+* 1、账号注册
+> POST http://loclhost:8080/susu/user/to_register
+- 参数:username(用户名)
+- 参数:password(密码)
+- 前端Tip:传值判断,参数均不能为空,密码限制在前端做判断
->实时聊天
->异步CRUD处理消息日志
->获取聊天历史
->用户登录、记录登录用户聊天历史
->防止二次登录
->SUI Mobile仿微信样式
->TCP/IP软硬件通信(8092)
->MQTT协议下的Iot物联网通信(8094)
->图片发送聊天功能
->API调用Netty长链接执行发送消息(在线用户数、用户列表)
->下版(1.7.0):好友功能等
-## 版本迭代介绍
+## 返回码与信息值
-* 1.0.0版本
+| 返回码 | 信息内容 | 备注 |
+|------|---------|------|
+| 200 | 成功 | |
+| 555 | 参数错误| |
+| 556 | 用户名存在| |
-用户登录,聊天历史,随机用户名,异步数据写入:https://segmentfault.com/a/1190000016615063
+## 提示
-* 1.2.0版本
-
-修复聊天记录功能,实现重复信息录入,完善前端页面,回车监听等:https://segmentfault.com/a/1190000016637814
-
-* 1.3.0版本
-
-用户注册登录功能,系统聊天绑定用户,禁止二次登录等,前端页面大改
-
-* 1.4.1版本
-
-本人主导SUI Mobile构建仿微信样式页面版,使用时开F12手机界面
-
-* 1.5.2版本
-
-TCP/IP软硬件通信-单片机等应用的TCP通信,Netty处理二进制图片发送聊天功能
-
-* 1.5.8版本
-
-MQTT协议软硬件通信等,Iot物联网
-
-* 1.6.0版本
-
-API调用Netty长链接执行发送消息(在线数、用户列表):https://segmentfault.com/a/1190000016603392
-
-
-## 配置
-
->application.yml 数据库配置、Netty参数配置
-
->TCP需先去com.myself.nettychat.tcptest包下执行CRC16myself获取发送数据,
-
->再执行TCPTestClient发送数据,请勿随意更改发送格式(通信协议来的)
-
->http://localhost:8080/susu/admin/loginsui 启动访问路径
-
->mqtt协议测试在mqttclient包下
-
->http://localhost:8080/susu/swagger-ui.html 查看API文档
-
-## 效果图
-
-.png)
-.png)
-.png)
-.png)
-.png)
-
-
-
-
-## 预留BUG
-
-```
-io.netty.handler.codec.CorruptedFrameException: Max frame length of 65536 has been exceeded.
-图片过大,需要在前端做图片上传压缩
-
-Uncaught TypeError: msg.substring is not a function at WebSocket.socket.onmessage (newChat.js:38)
-前端代码的一点问题,不影响项目正常运行
-
-java.io.IOException: 远程主机强迫关闭了一个现有的连接。
-TCP客户端连接主动关闭,不影响,良性报错
-```
-
-## 下载地址
-
-下载地址:https://github.com/UncleCatMySelf/SBToNettyChat/releases
-
-## 交流与提问
-
-提问与Bug上报:https://github.com/UncleCatMySelf/SBToNettyChat/issues
-
-QQ群:628793702(仅供交流,不提供问题解答)
-
-## 关于作者
-
-个人公众号:UncleCatMySelf
-
-
+仅API列表详情中的API处于可用状态,其余API请勿使用,暂未基本完成,使用请详看文档
\ No newline at end of file
diff --git a/h5/chat.html b/h5/chat.html
deleted file mode 100644
index 91d15b2..0000000
--- a/h5/chat.html
+++ /dev/null
@@ -1,105 +0,0 @@
-
-
-
-
- WebSocket Chat
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/h5/home.html b/h5/home.html
deleted file mode 100644
index 541e6b7..0000000
--- a/h5/home.html
+++ /dev/null
@@ -1,47 +0,0 @@
-
-
-
-
-
- 酥酥
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/h5/index.html b/h5/index.html
deleted file mode 100644
index 32e88ea..0000000
--- a/h5/index.html
+++ /dev/null
@@ -1,67 +0,0 @@
-
-
-
-
-
- 酥酥
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/h5/logoSmall.png b/h5/logoSmall.png
deleted file mode 100644
index 08b5d51..0000000
Binary files a/h5/logoSmall.png and /dev/null differ
diff --git a/mqttclient/mqttclient/.gitignore b/mqttclient/mqttclient/.gitignore
deleted file mode 100644
index 82eca33..0000000
--- a/mqttclient/mqttclient/.gitignore
+++ /dev/null
@@ -1,25 +0,0 @@
-/target/
-!.mvn/wrapper/maven-wrapper.jar
-
-### STS ###
-.apt_generated
-.classpath
-.factorypath
-.project
-.settings
-.springBeans
-.sts4-cache
-
-### IntelliJ IDEA ###
-.idea
-*.iws
-*.iml
-*.ipr
-
-### NetBeans ###
-/nbproject/private/
-/build/
-/nbbuild/
-/dist/
-/nbdist/
-/.nb-gradle/
\ No newline at end of file
diff --git a/mqttclient/mqttclient/.mvn/wrapper/maven-wrapper.jar b/mqttclient/mqttclient/.mvn/wrapper/maven-wrapper.jar
deleted file mode 100644
index 9cc84ea..0000000
Binary files a/mqttclient/mqttclient/.mvn/wrapper/maven-wrapper.jar and /dev/null differ
diff --git a/mqttclient/mqttclient/.mvn/wrapper/maven-wrapper.properties b/mqttclient/mqttclient/.mvn/wrapper/maven-wrapper.properties
deleted file mode 100644
index 6c8c0e0..0000000
--- a/mqttclient/mqttclient/.mvn/wrapper/maven-wrapper.properties
+++ /dev/null
@@ -1 +0,0 @@
-distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.4/apache-maven-3.5.4-bin.zip
diff --git a/mqttclient/mqttclient/mvnw b/mqttclient/mqttclient/mvnw
deleted file mode 100644
index 5bf251c..0000000
--- a/mqttclient/mqttclient/mvnw
+++ /dev/null
@@ -1,225 +0,0 @@
-#!/bin/sh
-# ----------------------------------------------------------------------------
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-# ----------------------------------------------------------------------------
-
-# ----------------------------------------------------------------------------
-# Maven2 Start Up Batch script
-#
-# Required ENV vars:
-# ------------------
-# JAVA_HOME - location of a JDK home dir
-#
-# Optional ENV vars
-# -----------------
-# M2_HOME - location of maven2's installed home dir
-# MAVEN_OPTS - parameters passed to the Java VM when running Maven
-# e.g. to debug Maven itself, use
-# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
-# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
-# ----------------------------------------------------------------------------
-
-if [ -z "$MAVEN_SKIP_RC" ] ; then
-
- if [ -f /etc/mavenrc ] ; then
- . /etc/mavenrc
- fi
-
- if [ -f "$HOME/.mavenrc" ] ; then
- . "$HOME/.mavenrc"
- fi
-
-fi
-
-# OS specific support. $var _must_ be set to either true or false.
-cygwin=false;
-darwin=false;
-mingw=false
-case "`uname`" in
- CYGWIN*) cygwin=true ;;
- MINGW*) mingw=true;;
- Darwin*) darwin=true
- # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
- # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
- if [ -z "$JAVA_HOME" ]; then
- if [ -x "/usr/libexec/java_home" ]; then
- export JAVA_HOME="`/usr/libexec/java_home`"
- else
- export JAVA_HOME="/Library/Java/Home"
- fi
- fi
- ;;
-esac
-
-if [ -z "$JAVA_HOME" ] ; then
- if [ -r /etc/gentoo-release ] ; then
- JAVA_HOME=`java-config --jre-home`
- fi
-fi
-
-if [ -z "$M2_HOME" ] ; then
- ## resolve links - $0 may be a link to maven's home
- PRG="$0"
-
- # need this for relative symlinks
- while [ -h "$PRG" ] ; do
- ls=`ls -ld "$PRG"`
- link=`expr "$ls" : '.*-> \(.*\)$'`
- if expr "$link" : '/.*' > /dev/null; then
- PRG="$link"
- else
- PRG="`dirname "$PRG"`/$link"
- fi
- done
-
- saveddir=`pwd`
-
- M2_HOME=`dirname "$PRG"`/..
-
- # make it fully qualified
- M2_HOME=`cd "$M2_HOME" && pwd`
-
- cd "$saveddir"
- # echo Using m2 at $M2_HOME
-fi
-
-# For Cygwin, ensure paths are in UNIX format before anything is touched
-if $cygwin ; then
- [ -n "$M2_HOME" ] &&
- M2_HOME=`cygpath --unix "$M2_HOME"`
- [ -n "$JAVA_HOME" ] &&
- JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
- [ -n "$CLASSPATH" ] &&
- CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
-fi
-
-# For Migwn, ensure paths are in UNIX format before anything is touched
-if $mingw ; then
- [ -n "$M2_HOME" ] &&
- M2_HOME="`(cd "$M2_HOME"; pwd)`"
- [ -n "$JAVA_HOME" ] &&
- JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
- # TODO classpath?
-fi
-
-if [ -z "$JAVA_HOME" ]; then
- javaExecutable="`which javac`"
- if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
- # readlink(1) is not available as standard on Solaris 10.
- readLink=`which readlink`
- if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
- if $darwin ; then
- javaHome="`dirname \"$javaExecutable\"`"
- javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
- else
- javaExecutable="`readlink -f \"$javaExecutable\"`"
- fi
- javaHome="`dirname \"$javaExecutable\"`"
- javaHome=`expr "$javaHome" : '\(.*\)/bin'`
- JAVA_HOME="$javaHome"
- export JAVA_HOME
- fi
- fi
-fi
-
-if [ -z "$JAVACMD" ] ; then
- if [ -n "$JAVA_HOME" ] ; then
- if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
- # IBM's JDK on AIX uses strange locations for the executables
- JAVACMD="$JAVA_HOME/jre/sh/java"
- else
- JAVACMD="$JAVA_HOME/bin/java"
- fi
- else
- JAVACMD="`which java`"
- fi
-fi
-
-if [ ! -x "$JAVACMD" ] ; then
- echo "Error: JAVA_HOME is not defined correctly." >&2
- echo " We cannot execute $JAVACMD" >&2
- exit 1
-fi
-
-if [ -z "$JAVA_HOME" ] ; then
- echo "Warning: JAVA_HOME environment variable is not set."
-fi
-
-CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
-
-# traverses directory structure from process work directory to filesystem root
-# first directory with .mvn subdirectory is considered project base directory
-find_maven_basedir() {
-
- if [ -z "$1" ]
- then
- echo "Path not specified to find_maven_basedir"
- return 1
- fi
-
- basedir="$1"
- wdir="$1"
- while [ "$wdir" != '/' ] ; do
- if [ -d "$wdir"/.mvn ] ; then
- basedir=$wdir
- break
- fi
- # workaround for JBEAP-8937 (on Solaris 10/Sparc)
- if [ -d "${wdir}" ]; then
- wdir=`cd "$wdir/.."; pwd`
- fi
- # end of workaround
- done
- echo "${basedir}"
-}
-
-# concatenates all lines of a file
-concat_lines() {
- if [ -f "$1" ]; then
- echo "$(tr -s '\n' ' ' < "$1")"
- fi
-}
-
-BASE_DIR=`find_maven_basedir "$(pwd)"`
-if [ -z "$BASE_DIR" ]; then
- exit 1;
-fi
-
-export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
-echo $MAVEN_PROJECTBASEDIR
-MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
-
-# For Cygwin, switch paths to Windows format before running java
-if $cygwin; then
- [ -n "$M2_HOME" ] &&
- M2_HOME=`cygpath --path --windows "$M2_HOME"`
- [ -n "$JAVA_HOME" ] &&
- JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
- [ -n "$CLASSPATH" ] &&
- CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
- [ -n "$MAVEN_PROJECTBASEDIR" ] &&
- MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
-fi
-
-WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
-
-exec "$JAVACMD" \
- $MAVEN_OPTS \
- -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
- "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
- ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
diff --git a/mqttclient/mqttclient/mvnw.cmd b/mqttclient/mqttclient/mvnw.cmd
deleted file mode 100644
index 019bd74..0000000
--- a/mqttclient/mqttclient/mvnw.cmd
+++ /dev/null
@@ -1,143 +0,0 @@
-@REM ----------------------------------------------------------------------------
-@REM Licensed to the Apache Software Foundation (ASF) under one
-@REM or more contributor license agreements. See the NOTICE file
-@REM distributed with this work for additional information
-@REM regarding copyright ownership. The ASF licenses this file
-@REM to you under the Apache License, Version 2.0 (the
-@REM "License"); you may not use this file except in compliance
-@REM with the License. You may obtain a copy of the License at
-@REM
-@REM http://www.apache.org/licenses/LICENSE-2.0
-@REM
-@REM Unless required by applicable law or agreed to in writing,
-@REM software distributed under the License is distributed on an
-@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-@REM KIND, either express or implied. See the License for the
-@REM specific language governing permissions and limitations
-@REM under the License.
-@REM ----------------------------------------------------------------------------
-
-@REM ----------------------------------------------------------------------------
-@REM Maven2 Start Up Batch script
-@REM
-@REM Required ENV vars:
-@REM JAVA_HOME - location of a JDK home dir
-@REM
-@REM Optional ENV vars
-@REM M2_HOME - location of maven2's installed home dir
-@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
-@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
-@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
-@REM e.g. to debug Maven itself, use
-@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
-@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
-@REM ----------------------------------------------------------------------------
-
-@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
-@echo off
-@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
-@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
-
-@REM set %HOME% to equivalent of $HOME
-if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
-
-@REM Execute a user defined script before this one
-if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
-@REM check for pre script, once with legacy .bat ending and once with .cmd ending
-if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
-if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
-:skipRcPre
-
-@setlocal
-
-set ERROR_CODE=0
-
-@REM To isolate internal variables from possible post scripts, we use another setlocal
-@setlocal
-
-@REM ==== START VALIDATION ====
-if not "%JAVA_HOME%" == "" goto OkJHome
-
-echo.
-echo Error: JAVA_HOME not found in your environment. >&2
-echo Please set the JAVA_HOME variable in your environment to match the >&2
-echo location of your Java installation. >&2
-echo.
-goto error
-
-:OkJHome
-if exist "%JAVA_HOME%\bin\java.exe" goto init
-
-echo.
-echo Error: JAVA_HOME is set to an invalid directory. >&2
-echo JAVA_HOME = "%JAVA_HOME%" >&2
-echo Please set the JAVA_HOME variable in your environment to match the >&2
-echo location of your Java installation. >&2
-echo.
-goto error
-
-@REM ==== END VALIDATION ====
-
-:init
-
-@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
-@REM Fallback to current working directory if not found.
-
-set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
-IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
-
-set EXEC_DIR=%CD%
-set WDIR=%EXEC_DIR%
-:findBaseDir
-IF EXIST "%WDIR%"\.mvn goto baseDirFound
-cd ..
-IF "%WDIR%"=="%CD%" goto baseDirNotFound
-set WDIR=%CD%
-goto findBaseDir
-
-:baseDirFound
-set MAVEN_PROJECTBASEDIR=%WDIR%
-cd "%EXEC_DIR%"
-goto endDetectBaseDir
-
-:baseDirNotFound
-set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
-cd "%EXEC_DIR%"
-
-:endDetectBaseDir
-
-IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
-
-@setlocal EnableExtensions EnableDelayedExpansion
-for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
-@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
-
-:endReadAdditionalConfig
-
-SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
-
-set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
-set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
-
-%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
-if ERRORLEVEL 1 goto error
-goto end
-
-:error
-set ERROR_CODE=1
-
-:end
-@endlocal & set ERROR_CODE=%ERROR_CODE%
-
-if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
-@REM check for post script, once with legacy .bat ending and once with .cmd ending
-if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
-if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
-:skipRcPost
-
-@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
-if "%MAVEN_BATCH_PAUSE%" == "on" pause
-
-if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
-
-exit /B %ERROR_CODE%
diff --git a/mqttclient/mqttclient/pom.xml b/mqttclient/mqttclient/pom.xml
deleted file mode 100644
index 2d1b502..0000000
--- a/mqttclient/mqttclient/pom.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-
-
- 4.0.0
-
- com.myself
- mqttclient
- 0.0.1-SNAPSHOT
- jar
-
- mqttclient
- Demo project for Spring Boot
-
-
- org.springframework.boot
- spring-boot-starter-parent
- 2.0.5.RELEASE
-
-
-
-
- UTF-8
- UTF-8
- 1.8
-
-
-
-
- org.springframework.boot
- spring-boot-starter
-
-
-
- org.springframework.boot
- spring-boot-starter-test
- test
-
-
-
-
-
-
- org.springframework.boot
- spring-boot-maven-plugin
-
-
-
-
-
-
diff --git a/mqttclient/mqttclient/src/main/java/com/myself/mqttclient/MqttclientApplication.java b/mqttclient/mqttclient/src/main/java/com/myself/mqttclient/MqttclientApplication.java
deleted file mode 100644
index 569cf76..0000000
--- a/mqttclient/mqttclient/src/main/java/com/myself/mqttclient/MqttclientApplication.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package com.myself.mqttclient;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-@SpringBootApplication
-public class MqttclientApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(MqttclientApplication.class, args);
- }
-}
diff --git a/mqttclient/mqttclient/src/main/resources/application.properties b/mqttclient/mqttclient/src/main/resources/application.properties
deleted file mode 100644
index e69de29..0000000
diff --git a/mqttclient/mqttclient/src/test/java/com/myself/mqttclient/MqttclientApplicationTests.java b/mqttclient/mqttclient/src/test/java/com/myself/mqttclient/MqttclientApplicationTests.java
deleted file mode 100644
index 5c938a0..0000000
--- a/mqttclient/mqttclient/src/test/java/com/myself/mqttclient/MqttclientApplicationTests.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package com.myself.mqttclient;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest
-public class MqttclientApplicationTests {
-
- @Test
- public void contextLoads() {
- }
-
-}
diff --git a/mvnw b/mvnw
deleted file mode 100644
index 5bf251c..0000000
--- a/mvnw
+++ /dev/null
@@ -1,225 +0,0 @@
-#!/bin/sh
-# ----------------------------------------------------------------------------
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-# ----------------------------------------------------------------------------
-
-# ----------------------------------------------------------------------------
-# Maven2 Start Up Batch script
-#
-# Required ENV vars:
-# ------------------
-# JAVA_HOME - location of a JDK home dir
-#
-# Optional ENV vars
-# -----------------
-# M2_HOME - location of maven2's installed home dir
-# MAVEN_OPTS - parameters passed to the Java VM when running Maven
-# e.g. to debug Maven itself, use
-# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
-# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
-# ----------------------------------------------------------------------------
-
-if [ -z "$MAVEN_SKIP_RC" ] ; then
-
- if [ -f /etc/mavenrc ] ; then
- . /etc/mavenrc
- fi
-
- if [ -f "$HOME/.mavenrc" ] ; then
- . "$HOME/.mavenrc"
- fi
-
-fi
-
-# OS specific support. $var _must_ be set to either true or false.
-cygwin=false;
-darwin=false;
-mingw=false
-case "`uname`" in
- CYGWIN*) cygwin=true ;;
- MINGW*) mingw=true;;
- Darwin*) darwin=true
- # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
- # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
- if [ -z "$JAVA_HOME" ]; then
- if [ -x "/usr/libexec/java_home" ]; then
- export JAVA_HOME="`/usr/libexec/java_home`"
- else
- export JAVA_HOME="/Library/Java/Home"
- fi
- fi
- ;;
-esac
-
-if [ -z "$JAVA_HOME" ] ; then
- if [ -r /etc/gentoo-release ] ; then
- JAVA_HOME=`java-config --jre-home`
- fi
-fi
-
-if [ -z "$M2_HOME" ] ; then
- ## resolve links - $0 may be a link to maven's home
- PRG="$0"
-
- # need this for relative symlinks
- while [ -h "$PRG" ] ; do
- ls=`ls -ld "$PRG"`
- link=`expr "$ls" : '.*-> \(.*\)$'`
- if expr "$link" : '/.*' > /dev/null; then
- PRG="$link"
- else
- PRG="`dirname "$PRG"`/$link"
- fi
- done
-
- saveddir=`pwd`
-
- M2_HOME=`dirname "$PRG"`/..
-
- # make it fully qualified
- M2_HOME=`cd "$M2_HOME" && pwd`
-
- cd "$saveddir"
- # echo Using m2 at $M2_HOME
-fi
-
-# For Cygwin, ensure paths are in UNIX format before anything is touched
-if $cygwin ; then
- [ -n "$M2_HOME" ] &&
- M2_HOME=`cygpath --unix "$M2_HOME"`
- [ -n "$JAVA_HOME" ] &&
- JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
- [ -n "$CLASSPATH" ] &&
- CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
-fi
-
-# For Migwn, ensure paths are in UNIX format before anything is touched
-if $mingw ; then
- [ -n "$M2_HOME" ] &&
- M2_HOME="`(cd "$M2_HOME"; pwd)`"
- [ -n "$JAVA_HOME" ] &&
- JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
- # TODO classpath?
-fi
-
-if [ -z "$JAVA_HOME" ]; then
- javaExecutable="`which javac`"
- if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
- # readlink(1) is not available as standard on Solaris 10.
- readLink=`which readlink`
- if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
- if $darwin ; then
- javaHome="`dirname \"$javaExecutable\"`"
- javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
- else
- javaExecutable="`readlink -f \"$javaExecutable\"`"
- fi
- javaHome="`dirname \"$javaExecutable\"`"
- javaHome=`expr "$javaHome" : '\(.*\)/bin'`
- JAVA_HOME="$javaHome"
- export JAVA_HOME
- fi
- fi
-fi
-
-if [ -z "$JAVACMD" ] ; then
- if [ -n "$JAVA_HOME" ] ; then
- if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
- # IBM's JDK on AIX uses strange locations for the executables
- JAVACMD="$JAVA_HOME/jre/sh/java"
- else
- JAVACMD="$JAVA_HOME/bin/java"
- fi
- else
- JAVACMD="`which java`"
- fi
-fi
-
-if [ ! -x "$JAVACMD" ] ; then
- echo "Error: JAVA_HOME is not defined correctly." >&2
- echo " We cannot execute $JAVACMD" >&2
- exit 1
-fi
-
-if [ -z "$JAVA_HOME" ] ; then
- echo "Warning: JAVA_HOME environment variable is not set."
-fi
-
-CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
-
-# traverses directory structure from process work directory to filesystem root
-# first directory with .mvn subdirectory is considered project base directory
-find_maven_basedir() {
-
- if [ -z "$1" ]
- then
- echo "Path not specified to find_maven_basedir"
- return 1
- fi
-
- basedir="$1"
- wdir="$1"
- while [ "$wdir" != '/' ] ; do
- if [ -d "$wdir"/.mvn ] ; then
- basedir=$wdir
- break
- fi
- # workaround for JBEAP-8937 (on Solaris 10/Sparc)
- if [ -d "${wdir}" ]; then
- wdir=`cd "$wdir/.."; pwd`
- fi
- # end of workaround
- done
- echo "${basedir}"
-}
-
-# concatenates all lines of a file
-concat_lines() {
- if [ -f "$1" ]; then
- echo "$(tr -s '\n' ' ' < "$1")"
- fi
-}
-
-BASE_DIR=`find_maven_basedir "$(pwd)"`
-if [ -z "$BASE_DIR" ]; then
- exit 1;
-fi
-
-export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
-echo $MAVEN_PROJECTBASEDIR
-MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
-
-# For Cygwin, switch paths to Windows format before running java
-if $cygwin; then
- [ -n "$M2_HOME" ] &&
- M2_HOME=`cygpath --path --windows "$M2_HOME"`
- [ -n "$JAVA_HOME" ] &&
- JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
- [ -n "$CLASSPATH" ] &&
- CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
- [ -n "$MAVEN_PROJECTBASEDIR" ] &&
- MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
-fi
-
-WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
-
-exec "$JAVACMD" \
- $MAVEN_OPTS \
- -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
- "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
- ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
diff --git a/mvnw.cmd b/mvnw.cmd
deleted file mode 100644
index 019bd74..0000000
--- a/mvnw.cmd
+++ /dev/null
@@ -1,143 +0,0 @@
-@REM ----------------------------------------------------------------------------
-@REM Licensed to the Apache Software Foundation (ASF) under one
-@REM or more contributor license agreements. See the NOTICE file
-@REM distributed with this work for additional information
-@REM regarding copyright ownership. The ASF licenses this file
-@REM to you under the Apache License, Version 2.0 (the
-@REM "License"); you may not use this file except in compliance
-@REM with the License. You may obtain a copy of the License at
-@REM
-@REM http://www.apache.org/licenses/LICENSE-2.0
-@REM
-@REM Unless required by applicable law or agreed to in writing,
-@REM software distributed under the License is distributed on an
-@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-@REM KIND, either express or implied. See the License for the
-@REM specific language governing permissions and limitations
-@REM under the License.
-@REM ----------------------------------------------------------------------------
-
-@REM ----------------------------------------------------------------------------
-@REM Maven2 Start Up Batch script
-@REM
-@REM Required ENV vars:
-@REM JAVA_HOME - location of a JDK home dir
-@REM
-@REM Optional ENV vars
-@REM M2_HOME - location of maven2's installed home dir
-@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
-@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
-@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
-@REM e.g. to debug Maven itself, use
-@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
-@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
-@REM ----------------------------------------------------------------------------
-
-@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
-@echo off
-@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
-@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
-
-@REM set %HOME% to equivalent of $HOME
-if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
-
-@REM Execute a user defined script before this one
-if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
-@REM check for pre script, once with legacy .bat ending and once with .cmd ending
-if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
-if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
-:skipRcPre
-
-@setlocal
-
-set ERROR_CODE=0
-
-@REM To isolate internal variables from possible post scripts, we use another setlocal
-@setlocal
-
-@REM ==== START VALIDATION ====
-if not "%JAVA_HOME%" == "" goto OkJHome
-
-echo.
-echo Error: JAVA_HOME not found in your environment. >&2
-echo Please set the JAVA_HOME variable in your environment to match the >&2
-echo location of your Java installation. >&2
-echo.
-goto error
-
-:OkJHome
-if exist "%JAVA_HOME%\bin\java.exe" goto init
-
-echo.
-echo Error: JAVA_HOME is set to an invalid directory. >&2
-echo JAVA_HOME = "%JAVA_HOME%" >&2
-echo Please set the JAVA_HOME variable in your environment to match the >&2
-echo location of your Java installation. >&2
-echo.
-goto error
-
-@REM ==== END VALIDATION ====
-
-:init
-
-@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
-@REM Fallback to current working directory if not found.
-
-set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
-IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
-
-set EXEC_DIR=%CD%
-set WDIR=%EXEC_DIR%
-:findBaseDir
-IF EXIST "%WDIR%"\.mvn goto baseDirFound
-cd ..
-IF "%WDIR%"=="%CD%" goto baseDirNotFound
-set WDIR=%CD%
-goto findBaseDir
-
-:baseDirFound
-set MAVEN_PROJECTBASEDIR=%WDIR%
-cd "%EXEC_DIR%"
-goto endDetectBaseDir
-
-:baseDirNotFound
-set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
-cd "%EXEC_DIR%"
-
-:endDetectBaseDir
-
-IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
-
-@setlocal EnableExtensions EnableDelayedExpansion
-for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
-@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
-
-:endReadAdditionalConfig
-
-SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
-
-set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
-set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
-
-%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
-if ERRORLEVEL 1 goto error
-goto end
-
-:error
-set ERROR_CODE=1
-
-:end
-@endlocal & set ERROR_CODE=%ERROR_CODE%
-
-if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
-@REM check for post script, once with legacy .bat ending and once with .cmd ending
-if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
-if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
-:skipRcPost
-
-@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
-if "%MAVEN_BATCH_PAUSE%" == "on" pause
-
-if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
-
-exit /B %ERROR_CODE%
diff --git a/src/main/java/com/myself/nettychat/DefaultAutoService.java b/src/main/java/com/myself/nettychat/DefaultAutoService.java
deleted file mode 100644
index e4b4cc7..0000000
--- a/src/main/java/com/myself/nettychat/DefaultAutoService.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.myself.nettychat;
-
-import com.myself.nettychat.bootstrap.BaseAuthService;
-import org.springframework.stereotype.Service;
-
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 默认权限
- **/
-@Service
-public class DefaultAutoService implements BaseAuthService {
-
- @Override
- public boolean authorized(String username, String password) {
- return true;
- }
-}
diff --git a/src/main/java/com/myself/nettychat/auto/InitServer.java b/src/main/java/com/myself/nettychat/auto/InitServer.java
deleted file mode 100644
index 3729fc3..0000000
--- a/src/main/java/com/myself/nettychat/auto/InitServer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package com.myself.nettychat.auto;
-
-
-import com.myself.nettychat.bootstrap.BootstrapServer;
-import com.myself.nettychat.bootstrap.NettyBootstrapServer;
-import com.myself.nettychat.common.properties.InitNetty;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 初始化服务
- **/
-public class InitServer {
-
- private InitNetty serverBean;
-
- public InitServer(InitNetty serverBean) {
- this.serverBean = serverBean;
- }
-
- BootstrapServer bootstrapServer;
-
- public void open(){
- if(serverBean!=null){
- bootstrapServer = new NettyBootstrapServer();
- bootstrapServer.setServerBean(serverBean);
- bootstrapServer.start();
- }
- }
-
-
- public void close(){
- if(bootstrapServer!=null){
- bootstrapServer.shutdown();
- }
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/auto/ServerAutoConfigure.java b/src/main/java/com/myself/nettychat/auto/ServerAutoConfigure.java
deleted file mode 100644
index 961e458..0000000
--- a/src/main/java/com/myself/nettychat/auto/ServerAutoConfigure.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package com.myself.nettychat.auto;
-
-import com.myself.nettychat.bootstrap.scan.SacnScheduled;
-import com.myself.nettychat.bootstrap.scan.ScanRunnable;
-import com.myself.nettychat.common.enums.ProtocolEnum;
-import com.myself.nettychat.common.properties.InitNetty;
-import org.apache.commons.lang3.ObjectUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 自动化配置初始化服务
- **/
-@Configuration
-@ConditionalOnClass
-@EnableConfigurationProperties({InitNetty.class})
-public class ServerAutoConfigure {
-
- private static final int _BLACKLOG = 1024;
-
- private static final int CPU =Runtime.getRuntime().availableProcessors();
-
- private static final int SEDU_DAY =10;
-
- private static final int TIMEOUT =120;
-
- private static final int BUF_SIZE=10*1024*1024;
-
-
- public ServerAutoConfigure(){
-
- }
-
- @Bean
- @ConditionalOnMissingBean(name = "sacnScheduled")
- public ScanRunnable initRunable(@Autowired InitNetty serverBean){
- long time =(serverBean==null || serverBean.getPeriod()<5)?10:serverBean.getPeriod();
- ScanRunnable sacnScheduled = new SacnScheduled(time);
- Thread scanRunnable = new Thread(sacnScheduled);
- scanRunnable.setDaemon(true);
- scanRunnable.start();
- return sacnScheduled;
- }
-
-
- @Bean(initMethod = "open", destroyMethod = "close")
- @ConditionalOnMissingBean
- public InitServer initServer(InitNetty serverBean){
- if(!ObjectUtils.allNotNull(serverBean.getMqttport(),serverBean.getServerName())){
- throw new NullPointerException("not set port");
- }
- if(serverBean.getBacklog()<1){
- serverBean.setBacklog(_BLACKLOG);
- }
- if(serverBean.getBossThread()<1){
- serverBean.setBossThread(CPU);
- }
- if(serverBean.getInitalDelay()<0){
- serverBean.setInitalDelay(SEDU_DAY);
- }
- if(serverBean.getPeriod()<1){
- serverBean.setPeriod(SEDU_DAY);
- }
- if(serverBean.getHeart()<1){
- serverBean.setHeart(TIMEOUT);
- }
- if(serverBean.getRevbuf()<1){
- serverBean.setRevbuf(BUF_SIZE);
- }
- if(serverBean.getWorkerThread()<1){
- serverBean.setWorkerThread(CPU*2);
- }
- if(serverBean.getProtocol()==null){
- serverBean.setProtocol(ProtocolEnum.MQTT);
- }
- return new InitServer(serverBean);
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/AbstractBootstrapServer.java b/src/main/java/com/myself/nettychat/bootstrap/AbstractBootstrapServer.java
deleted file mode 100644
index a647848..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/AbstractBootstrapServer.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package com.myself.nettychat.bootstrap;
-
-
-import com.myself.nettychat.bootstrap.coder.ByteBufToWebSocketFrameEncoder;
-import com.myself.nettychat.bootstrap.coder.WebSocketFrameToByteBufDecoder;
-import com.myself.nettychat.common.properties.InitNetty;
-import com.myself.nettychat.common.ssl.SecureSocketSslContextFactory;
-import com.myself.nettychat.common.utils.SpringBeanUtils;
-import io.netty.channel.ChannelPipeline;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.HttpServerCodec;
-import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
-import io.netty.handler.codec.mqtt.MqttDecoder;
-import io.netty.handler.codec.mqtt.MqttEncoder;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.timeout.IdleStateHandler;
-import org.apache.commons.lang3.ObjectUtils;
-import org.jboss.netty.util.internal.SystemPropertyUtil;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import java.security.KeyStore;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 抽象类,负责加载edec handler
- **/
-public abstract class AbstractBootstrapServer implements BootstrapServer {
-
- private String PROTOCOL = "TLS";
-
- private SSLContext SERVER_CONTEXT;
-
- private static final String MQTT_CSV_LIST = "mqtt, mqttv3.1, mqttv3.1.1";
-
-
- /**
- *
- * @param channelPipeline channelPipeline
- * @param serverBean 服务配置参数
- */
- protected void initHandler(ChannelPipeline channelPipeline, InitNetty serverBean){
- if(serverBean.isSsl()){
- if(!ObjectUtils.allNotNull(serverBean.getJksCertificatePassword(),serverBean.getJksFile(),serverBean.getJksStorePassword())){
- throw new NullPointerException("SSL file and password is null");
- }
- initSsl(serverBean);
- SSLEngine engine =
- SERVER_CONTEXT.createSSLEngine();
- engine.setUseClientMode(false);
- channelPipeline.addLast("ssl", new SslHandler(engine));
- }
-
- intProtocolHandler(channelPipeline,serverBean);
- channelPipeline.addLast(new IdleStateHandler(serverBean.getHeart(),0,0));
- channelPipeline.addLast( SpringBeanUtils.getBean(serverBean.getMqttHander()));
-
- }
-
- private void intProtocolHandler(ChannelPipeline channelPipeline,InitNetty serverBean){
- switch (serverBean.getProtocol()){
- case MQTT:
- channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
- channelPipeline.addLast("decoder", new MqttDecoder());
- break;
- case MQTT_WS_MQTT:
- channelPipeline.addLast("httpCode", new HttpServerCodec());
- channelPipeline.addLast("aggregator", new HttpObjectAggregator(65536));
- channelPipeline.addLast("webSocketHandler",
- new WebSocketServerProtocolHandler("/", MQTT_CSV_LIST));
- channelPipeline.addLast("wsDecoder", new WebSocketFrameToByteBufDecoder());
- channelPipeline.addLast("wsEncoder", new ByteBufToWebSocketFrameEncoder());
- channelPipeline.addLast("decoder", new MqttDecoder());
- channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
- break;
- case MQTT_WS_PAHO:
- channelPipeline.addLast("httpCode", new HttpServerCodec());
- channelPipeline.addLast("aggregator", new HttpObjectAggregator(65536));
- channelPipeline.addLast("webSocketHandler",
- new WebSocketServerProtocolHandler("/mqtt", MQTT_CSV_LIST));
- channelPipeline.addLast("wsDecoder", new WebSocketFrameToByteBufDecoder());
- channelPipeline.addLast("wsEncoder", new ByteBufToWebSocketFrameEncoder());
- channelPipeline.addLast("decoder", new MqttDecoder());
- channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
- break;
- }
- }
-
- private void initSsl(InitNetty serverBean){
- ExecutorService executorService = Executors.newCachedThreadPool();
- executorService.submit(() -> {});
- String algorithm = SystemPropertyUtil.get("ssl.KeyManagerFactory.algorithm");
- if (algorithm == null) {
- algorithm = "SunX509";
- }
- SSLContext serverContext;
- try {
- //
- KeyStore ks = KeyStore.getInstance("JKS");
- ks.load( SecureSocketSslContextFactory.class.getResourceAsStream(serverBean.getJksFile()),
- serverBean.getJksStorePassword().toCharArray());
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(algorithm);
- kmf.init(ks,serverBean.getJksCertificatePassword().toCharArray());
- serverContext = SSLContext.getInstance(PROTOCOL);
- serverContext.init(kmf.getKeyManagers(), null, null);
- } catch (Exception e) {
- throw new Error(
- "Failed to initialize the server-side SSLContext", e);
- }
- SERVER_CONTEXT = serverContext;
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/BaseApi.java b/src/main/java/com/myself/nettychat/bootstrap/BaseApi.java
deleted file mode 100644
index edd8720..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/BaseApi.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package com.myself.nettychat.bootstrap;
-
-import javax.validation.constraints.NotNull;
-import java.util.Arrays;
-import java.util.Optional;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
-
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 逻辑操作封装
- **/
-public interface BaseApi {
-
- default void doIfElse(T t, Predicate predicate, Consumer consumer){
- if(t!=null){
- if(predicate.test(t)){
- consumer.accept(t);
- }
- }
- }
-
-
- default void doIfElse(T t, Predicate predicate, Consumer consumer, Consumer consumer2){
- if(t!=null){
- if(predicate.test(t)){
- consumer.accept(t);
- }
- else{
- consumer2.accept(t);
- }
- }
- }
- default boolean doIf(T t, Predicate... predicates){
- if(t!=null){
- for(Predicate p:predicates){
- if(!p.test(t)){
- return false;
- }
- }
- return true;
- }
- return false;
- }
-
- default void doIfAnd(T t, Consumer consumer2, Predicate... predicates){
- boolean flag =true;
- if(t!=null){
- for(Predicate p:predicates){
- if(!p.test(t)){
- flag= false;
- break;
- }
- }
- }
- if(flag){
- consumer2.accept(t);
- }
- }
-
- default void doIfAnd1(@NotNull T t, @NotNull Consumer consumer2, @NotNull Predicate... predicates){
- Predicate one = predicates[0];
- int l;
- if((l=predicates.length)>1){
- for(int i=1;i topics);
-
- void loginSuccess(Channel channel, String deviceId, MqttConnectMessage mqttConnectMessage);
-
- void publishSuccess(Channel channel, MqttPublishMessage mqttPublishMessage);
-
- void closeSuccess(String deviceId,boolean isDisconnect);
-
- void sendWillMsg(WillMeaasge willMeaasge);
-
- String getDeviceId(Channel channel);
-
- void unsubscribe(String deviceId, List topics1);
-
- void doPubrel(Channel channel, int mqttMessage);
-
- void doPubrec(Channel channel, int mqttMessage);
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/NettyBootstrapServer.java b/src/main/java/com/myself/nettychat/bootstrap/NettyBootstrapServer.java
deleted file mode 100644
index 816eea7..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/NettyBootstrapServer.java
+++ /dev/null
@@ -1,134 +0,0 @@
-package com.myself.nettychat.bootstrap;
-
-import com.myself.nettychat.common.ip.IpUtils;
-import com.myself.nettychat.common.properties.InitNetty;
-import com.myself.nettychat.common.utils.RemotingUtil;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc mtqq netty启动服务类
- **/
-@Slf4j
-@Data
-public class NettyBootstrapServer extends AbstractBootstrapServer {
-
- private InitNetty serverBean;
-
- public InitNetty getServerBean() {
- return serverBean;
- }
-
- public void setServerBean(InitNetty serverBean) {
- this.serverBean = serverBean;
- }
-
- private EventLoopGroup bossGroup;
-
- private EventLoopGroup workGroup;
-
- ServerBootstrap bootstrap=null ;// 启动辅助类
-
- /**
- * 服务开启
- */
- public void start() {
- initEventPool();
- bootstrap.group(bossGroup, workGroup)
- .channel(useEpoll()?EpollServerSocketChannel.class:NioServerSocketChannel.class)
- .option(ChannelOption.SO_REUSEADDR, serverBean.isReuseaddr())
- .option(ChannelOption.SO_BACKLOG, serverBean.getBacklog())
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .option(ChannelOption.SO_RCVBUF, serverBean.getRevbuf())
- .childHandler(new ChannelInitializer() {
- protected void initChannel(SocketChannel ch) throws Exception {
- initHandler(ch.pipeline(),serverBean);
- }
- })
- .childOption(ChannelOption.TCP_NODELAY, serverBean.isNodelay())
- .childOption(ChannelOption.SO_KEEPALIVE, serverBean.isKeepalive())
- .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- bootstrap.bind(IpUtils.getHost(),serverBean.getMqttport()).addListener((ChannelFutureListener) channelFuture -> {
- if (channelFuture.isSuccess())
- log.info("服务端启动成功【" + IpUtils.getHost() + ":" + serverBean.getMqttport() + "】");
- else
- log.info("服务端启动失败【" + IpUtils.getHost() + ":" + serverBean.getMqttport() + "】");
- });
- }
- /**
- * 初始化EnentPool 参数
- */
- private void initEventPool(){
- bootstrap= new ServerBootstrap();
- if(useEpoll()){
- bossGroup = new EpollEventLoopGroup(serverBean.getBossThread(), new ThreadFactory() {
- private AtomicInteger index = new AtomicInteger(0);
-
- public Thread newThread(Runnable r) {
- return new Thread(r, "LINUX_BOSS_" + index.incrementAndGet());
- }
- });
- workGroup = new EpollEventLoopGroup(serverBean.getWorkerThread(), new ThreadFactory() {
- private AtomicInteger index = new AtomicInteger(0);
-
- public Thread newThread(Runnable r) {
- return new Thread(r, "LINUX_WORK_" + index.incrementAndGet());
- }
- });
-
- }
- else {
- bossGroup = new NioEventLoopGroup(serverBean.getBossThread(), new ThreadFactory() {
- private AtomicInteger index = new AtomicInteger(0);
-
- public Thread newThread(Runnable r) {
- return new Thread(r, "BOSS_" + index.incrementAndGet());
- }
- });
- workGroup = new NioEventLoopGroup(serverBean.getWorkerThread(), new ThreadFactory() {
- private AtomicInteger index = new AtomicInteger(0);
-
- public Thread newThread(Runnable r) {
- return new Thread(r, "WORK_" + index.incrementAndGet());
- }
- });
- }
- }
-
- /**
- * 关闭资源
- */
- public void shutdown() {
- if(workGroup!=null && bossGroup!=null ){
- try {
- bossGroup.shutdownGracefully().sync();// 优雅关闭
- workGroup.shutdownGracefully().sync();
- } catch (InterruptedException e) {
- log.info("服务端关闭资源失败【" + IpUtils.getHost() + ":" + serverBean.getMqttport() + "】");
- }
- }
- }
-
- private boolean useEpoll() {
- return RemotingUtil.isLinuxPlatform()
- && Epoll.isAvailable();
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/bean/MqttChannel.java b/src/main/java/com/myself/nettychat/bootstrap/bean/MqttChannel.java
deleted file mode 100644
index 9c830ce..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/bean/MqttChannel.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package com.myself.nettychat.bootstrap.bean;
-
-import io.netty.channel.Channel;
-import io.netty.util.AttributeKey;
-import lombok.Builder;
-import lombok.Data;
-
-import com.myself.nettychat.common.enums.SubStatus;
-import com.myself.nettychat.common.enums.SessionStatus;
-
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc channel 封装类
- **/
-@Builder
-@Data
-public class MqttChannel {
-
- private transient volatile Channel channel;
-
-
- private String deviceId;
-
-
- private boolean isWill;
-
-
- private volatile SubStatus subStatus; // 是否订阅过主题
-
-
- private Set topic ;
-
-
-
- private volatile SessionStatus sessionStatus; // 在线 - 离线
-
-
-
- private volatile boolean cleanSession; // 当为 true 时 channel close 时 从缓存中删除 此channel
-
-
-
-
- private ConcurrentHashMap message ; // messageId - message(qos1) // 待确认消息
-
-
- private Set receive;
-
- public void addRecevice(int messageId){
- receive.add(messageId);
- }
-
- public boolean checkRecevice(int messageId){
- return receive.contains(messageId);
- }
-
- public boolean removeRecevice(int messageId){
- return receive.remove(messageId);
- }
-
-
- public void addSendMqttMessage(int messageId,SendMqttMessage msg){
- message.put(messageId,msg);
- }
-
-
- public SendMqttMessage getSendMqttMessage(int messageId){
- return message.get(messageId);
- }
-
-
- public void removeSendMqttMessage(int messageId){
- message.remove(messageId);
- }
-
-
- /**
- * 判断当前channel 是否登录过
- * @return
- */
- public boolean isLogin(){
- return Optional.ofNullable(this.channel).map(channel1 -> {
- AttributeKey _login = AttributeKey.valueOf("login");
- return channel1.isActive() && channel1.hasAttr(_login);
- }).orElse(false);
- }
-
- /**
- * 非正常关闭
- */
- public void close(){
- Optional.ofNullable(this.channel).ifPresent(channel1 -> channel1.close());
- }
-
- /**
- * 通道是否活跃
- * @return
- */
- public boolean isActive(){
- return channel!=null&&this.channel.isActive();
- }
-
-
-
- public boolean addTopic(Set topics){
- return topic.addAll(topics);
- }
-
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/bean/RetainMessage.java b/src/main/java/com/myself/nettychat/bootstrap/bean/RetainMessage.java
deleted file mode 100644
index 814696a..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/bean/RetainMessage.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.myself.nettychat.bootstrap.bean;
-
-import io.netty.handler.codec.mqtt.MqttQoS;
-import lombok.Builder;
-import lombok.Data;
-
-@Builder
-@Data
-public class RetainMessage {
-
- private byte[] byteBuf;
-
- private MqttQoS qoS;
- public String getString(){
- return new String(byteBuf);
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/bean/SendMqttMessage.java b/src/main/java/com/myself/nettychat/bootstrap/bean/SendMqttMessage.java
deleted file mode 100644
index ac5faef..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/bean/SendMqttMessage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.myself.nettychat.bootstrap.bean;
-
-
-import io.netty.channel.Channel;
-import io.netty.handler.codec.mqtt.MqttQoS;
-import lombok.Builder;
-import lombok.Data;
-
-import com.myself.nettychat.common.enums.ConfirmStatus;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc mqtts 消息
- **/
-@Builder
-@Data
-public class SendMqttMessage {
-
-
- private int messageId;
-
- private Channel channel;
-
- private volatile ConfirmStatus confirmStatus;
-
- private long time;
-
- private byte[] byteBuf;
-
- private boolean isRetain;
-
- private MqttQoS qos;
-
- private String topic;
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/bean/SessionMessage.java b/src/main/java/com/myself/nettychat/bootstrap/bean/SessionMessage.java
deleted file mode 100644
index 2613cd6..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/bean/SessionMessage.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package com.myself.nettychat.bootstrap.bean;
-
-
-import io.netty.handler.codec.mqtt.MqttQoS;
-import lombok.Builder;
-import lombok.Data;
-
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc Session会话数据保存
- **/
-@Builder
-@Data
-public class SessionMessage {
-
- private byte[] byteBuf;
-
- private MqttQoS qoS;
-
- private String topic;
-
-
- public String getString(){
- return new String(byteBuf);
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/bean/WillMeaasge.java b/src/main/java/com/myself/nettychat/bootstrap/bean/WillMeaasge.java
deleted file mode 100644
index 4022d28..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/bean/WillMeaasge.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.myself.nettychat.bootstrap.bean;
-
-import lombok.Builder;
-import lombok.Data;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 遗嘱消息
- **/
-@Builder
-@Data
-public class WillMeaasge {
-
- private String willTopic;
-
- private String willMessage;
-
- private boolean isRetain;
-
- private int qos;
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/channel/AbstractChannelService.java b/src/main/java/com/myself/nettychat/bootstrap/channel/AbstractChannelService.java
deleted file mode 100644
index c0e51f2..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/channel/AbstractChannelService.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package com.myself.nettychat.bootstrap.channel;
-
-import com.myself.nettychat.bootstrap.BaseApi;
-import com.myself.nettychat.bootstrap.ChannelService;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.myself.nettychat.bootstrap.bean.MqttChannel;
-import com.myself.nettychat.bootstrap.bean.RetainMessage;
-import com.myself.nettychat.bootstrap.channel.cache.CacheMap;
-import com.myself.nettychat.bootstrap.scan.ScanRunnable;
-import io.netty.channel.Channel;
-import io.netty.util.AttributeKey;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Collection;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 抽象类
- **/
-@Slf4j
-public abstract class AbstractChannelService extends PublishApiSevice implements ChannelService , BaseApi {
-
- protected AttributeKey _login = AttributeKey.valueOf("login");
-
- protected AttributeKey _deviceId = AttributeKey.valueOf("deviceId");
-
- protected static char SPLITOR = '/';
-
- protected ExecutorService executorService =Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);
-
-
- protected static CacheMap cacheMap= new CacheMap<>();
-
-
- protected static ConcurrentHashMap mqttChannels = new ConcurrentHashMap<>(); // deviceId - mqChannel 登录
-
-
- protected static ConcurrentHashMap> retain = new ConcurrentHashMap<>(); // topic - 保留消息
-
-
-
- protected static Cache> mqttChannelCache = CacheBuilder.newBuilder().maximumSize(100).build();
-
- public AbstractChannelService(ScanRunnable scanRunnable) {
- super(scanRunnable);
- }
-
-
- protected Collection getChannels(String topic,TopicFilter topicFilter){
- try {
- return mqttChannelCache.get(topic, () -> topicFilter.filter(topic));
- } catch (Exception e) {
- log.info(String.format("guava cache key topic【%s】 channel value== null ",topic));
- }
- return null;
- }
-
-
- @FunctionalInterface
- interface TopicFilter{
- Collection filter(String topic);
- }
-
- protected boolean deleteChannel(String topic,MqttChannel mqttChannel){
- return Optional.ofNullable(topic).map(s -> {
- mqttChannelCache.invalidate(s);
- return cacheMap.delete(getTopic(s),mqttChannel);
- }).orElse(false);
- }
-
- protected boolean addChannel(String topic,MqttChannel mqttChannel)
- {
- return Optional.ofNullable(topic).map(s -> {
- mqttChannelCache.invalidate(s);
- return cacheMap.putData(getTopic(s),mqttChannel);
- }).orElse(false);
- }
-
- /**
- * 获取channel
- */
- public MqttChannel getMqttChannel(String deviceId){
- return Optional.ofNullable(deviceId).map(s -> mqttChannels.get(s))
- .orElse(null);
-
- }
-
- /**
- * 获取channelId
- */
- public String getDeviceId(Channel channel){
- return Optional.ofNullable(channel).map( channel1->channel1.attr(_deviceId).get())
- .orElse(null);
- }
-
-
-
- protected String[] getTopic(String topic) {
- return Optional.ofNullable(topic).map(s ->
- StringUtils.split(topic,SPLITOR)
- ).orElse(null);
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/channel/ClientSessionService.java b/src/main/java/com/myself/nettychat/bootstrap/channel/ClientSessionService.java
deleted file mode 100644
index 9a23c8b..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/channel/ClientSessionService.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.myself.nettychat.bootstrap.channel;
-
-import com.myself.nettychat.bootstrap.bean.SessionMessage;
-import org.springframework.stereotype.Service;
-
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 会话保留处理
- **/
-@Service
-public class ClientSessionService {
-
- private static ConcurrentHashMap> queueSession = new ConcurrentHashMap<>(); // 连接关闭后 保留此session 数据 deviceId
-
-
- public void saveSessionMsg(String deviceId, SessionMessage sessionMessage) {
- ConcurrentLinkedQueue sessionMessages = queueSession.getOrDefault(deviceId, new ConcurrentLinkedQueue<>());
- boolean flag;
- do{
- flag = sessionMessages.add(sessionMessage);
- }
- while (!flag);
- queueSession.put(deviceId,sessionMessages);
- }
-
- public ConcurrentLinkedQueue getByteBuf(String deviceId){
- return Optional.ofNullable(deviceId).map(s -> queueSession.get(s))
- .orElse(null);
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/channel/MqttChannelService.java b/src/main/java/com/myself/nettychat/bootstrap/channel/MqttChannelService.java
deleted file mode 100644
index fa5ec21..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/channel/MqttChannelService.java
+++ /dev/null
@@ -1,440 +0,0 @@
-package com.myself.nettychat.bootstrap.channel;
-
-import com.myself.nettychat.bootstrap.bean.*;
-import com.myself.nettychat.bootstrap.scan.ScanRunnable;
-import com.myself.nettychat.common.enums.ConfirmStatus;
-import com.myself.nettychat.common.enums.SessionStatus;
-import com.myself.nettychat.common.enums.SubStatus;
-import com.myself.nettychat.common.exception.ConnectionException;
-import com.myself.nettychat.common.utils.ByteBufUtil;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.handler.codec.mqtt.*;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.util.CollectionUtils;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc Channel事件处理service
- **/
-@Slf4j
-@Component
-public class MqttChannelService extends AbstractChannelService {
-
- @Autowired
- private ClientSessionService clientSessionService;
-
- @Autowired
- private WillService willService;
-
- private final ScanRunnable scanRunnable;
-
- public MqttChannelService(ScanRunnable scanRunnable) {
- super(scanRunnable);
- this.scanRunnable = scanRunnable;
- }
-
-
- /**
- * 取消订阅
- */
- @Override
- public void unsubscribe(String deviceId, List topics1) {
- Optional.ofNullable(mqttChannels.get(deviceId)).ifPresent(mqttChannel -> {
- topics1.forEach(topic -> {
- deleteChannel(topic,mqttChannel);
- });
- });
- }
-
- /**
- * 登录成功后 回复
- */
- private void replyLogin(Channel channel, MqttConnectMessage mqttConnectMessage) {
- MqttFixedHeader mqttFixedHeader1 = mqttConnectMessage.fixedHeader();
- MqttConnectVariableHeader mqttConnectVariableHeader = mqttConnectMessage.variableHeader();
- final MqttConnectPayload payload = mqttConnectMessage.payload();
- String deviceId = getDeviceId(channel);
- MqttChannel build = MqttChannel.builder().channel(channel).cleanSession(mqttConnectVariableHeader.isCleanSession())
- .deviceId(payload.clientIdentifier())
- .sessionStatus(SessionStatus.OPEN)
- .isWill(mqttConnectVariableHeader.isWillFlag())
- .subStatus(SubStatus.NO)
- .topic(new CopyOnWriteArraySet<>())
- .message(new ConcurrentHashMap<>())
- .receive(new CopyOnWriteArraySet<>())
- .build();
- if (connectSuccess(deviceId, build)) { // 初始化存储mqttchannel
- if (mqttConnectVariableHeader.isWillFlag()) { // 遗嘱消息标志
- boolean b = doIf(mqttConnectVariableHeader, mqttConnectVariableHeader1 -> (payload.willMessage() != null)
- , mqttConnectVariableHeader1 -> (payload.willTopic() != null));
- if (!b) {
- throw new ConnectionException("will message and will topic is not null");
- }
- // 处理遗嘱消息
- final WillMeaasge buildWill = WillMeaasge.builder().
- qos(mqttConnectVariableHeader.willQos())
- .willMessage(deviceId)
- .willTopic(payload.willTopic())
- .isRetain(mqttConnectVariableHeader.isWillRetain())
- .build();
- willService.save(payload.clientIdentifier(), buildWill);
- } else {
- willService.del(payload.clientIdentifier());
- boolean b = doIf(mqttConnectVariableHeader, mqttConnectVariableHeader1 -> (!mqttConnectVariableHeader1.isWillRetain()),
- mqttConnectVariableHeader1 -> (mqttConnectVariableHeader1.willQos() == 0));
- if (!b) {
- throw new ConnectionException("will retain should be null and will QOS equal 0");
- }
- }
- doIfElse(mqttConnectVariableHeader, mqttConnectVariableHeader1 -> (mqttConnectVariableHeader1.isCleanSession()), mqttConnectVariableHeader1 -> {
- MqttConnectReturnCode connectReturnCode = MqttConnectReturnCode.CONNECTION_ACCEPTED;
- MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, false);
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(
- MqttMessageType.CONNACK, mqttFixedHeader1.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeader1.isRetain(), 0x02);
- MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
- channel.writeAndFlush(connAck);// 清理会话
- }, mqttConnectVariableHeader1 -> {
- MqttConnectReturnCode connectReturnCode = MqttConnectReturnCode.CONNECTION_ACCEPTED;
- MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, true);
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(
- MqttMessageType.CONNACK, mqttFixedHeader1.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeader1.isRetain(), 0x02);
- MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
- channel.writeAndFlush(connAck);// 非清理会话
-
- }); //发送 session 数据
- ConcurrentLinkedQueue sessionMessages = clientSessionService.getByteBuf(payload.clientIdentifier());
- doIfElse(sessionMessages, messages -> messages != null && !messages.isEmpty(), byteBufs -> {
- SessionMessage sessionMessage;
- while ((sessionMessage = byteBufs.poll()) != null) {
- switch (sessionMessage.getQoS()) {
- case EXACTLY_ONCE:
- sendQosConfirmMsg(MqttQoS.EXACTLY_ONCE,getMqttChannel(deviceId), sessionMessage.getTopic(), sessionMessage.getByteBuf());
- break;
- case AT_MOST_ONCE:
- sendQos0Msg(channel, sessionMessage.getTopic(), sessionMessage.getByteBuf());
- break;
- case AT_LEAST_ONCE:
- sendQosConfirmMsg(MqttQoS.AT_LEAST_ONCE,getMqttChannel(deviceId), sessionMessage.getTopic(), sessionMessage.getByteBuf());
- break;
- }
- }
-
- });
- }
- }
-
-
-
- /**
- * qos2 第二步
- */
- @Override
- public void doPubrel(Channel channel, int messageId) {
- MqttChannel mqttChannel = getMqttChannel(getDeviceId(channel));
- doIfElse(mqttChannel,mqttChannel1 ->mqttChannel1.isLogin(),mqttChannel1 -> {
- mqttChannel1.removeRecevice(messageId);
- sendToPubComp(channel,messageId);
- });
- }
-
-
-
- /**
- * qos2 第三步
- */
- @Override
- public void doPubrec(Channel channel, int mqttMessage) {
- sendPubRel(channel,false,mqttMessage);
- }
-
- /**
- * 连接成功后
- * @param deviceId
- * @param build
- */
- @Override
- public boolean connectSuccess(String deviceId, MqttChannel build) {
- return Optional.ofNullable(mqttChannels.get(deviceId))
- .map(mqttChannel -> {
- switch (mqttChannel.getSessionStatus()){
- case OPEN:
- return false;
- case CLOSE:
- switch (mqttChannel.getSubStatus()){
- case YES: // 清除订阅 topic
- deleteSubTopic(mqttChannel).stream()
- .forEach(s -> cacheMap.putData(getTopic(s),build));
- break;
- }
- }
- mqttChannels.put(deviceId,build);
- return true;
- }).orElseGet(() -> {
- mqttChannels.put(deviceId,build);
- return true;
- });
- }
-
-
- /**
- * 订阅成功后 (发送保留消息)
- */
- public void suscribeSuccess(String deviceId, Set topics){
- doIfElse(topics,topics1->!CollectionUtils.isEmpty(topics1),strings -> {
- MqttChannel mqttChannel = mqttChannels.get(deviceId);
- mqttChannel.setSubStatus(SubStatus.YES); // 设置订阅主题标识
- mqttChannel.addTopic(strings);
- executorService.execute(() -> {
- Optional.ofNullable(mqttChannel).ifPresent(mqttChannel1 -> {
- if(mqttChannel1.isLogin()){
- strings.parallelStream().forEach(topic -> {
- addChannel(topic,mqttChannel);
- sendRetain(topic,mqttChannel); // 发送保留消息
- });
- }
- });
- });
- });
- }
-
-
- /**
- *成功登陆 (发送会话消息)
- * @param channel
- * @param deviceId
- * @param mqttConnectMessage
- */
- @Override
- public void loginSuccess(Channel channel, String deviceId, MqttConnectMessage mqttConnectMessage) {
- channel.attr(_login).set(true);
- channel.attr(_deviceId).set(deviceId);
- replyLogin(channel, mqttConnectMessage);
- }
-
-
- /**
- * 发布消息成功 ()
- * @param channel
- * @param mqttPublishMessage
- */
- @Override
- public void publishSuccess(Channel channel, MqttPublishMessage mqttPublishMessage) {
- MqttFixedHeader mqttFixedHeader = mqttPublishMessage.fixedHeader();
- MqttPublishVariableHeader mqttPublishVariableHeader = mqttPublishMessage.variableHeader();
- MqttChannel mqttChannel = getMqttChannel(getDeviceId(channel));
- ByteBuf payload = mqttPublishMessage.payload();
- byte[] bytes = ByteBufUtil.copyByteBuf(payload); //
- int messageId = mqttPublishVariableHeader.messageId();
- executorService.execute(() -> {
- if (channel.hasAttr(_login) && mqttChannel != null) {
- boolean isRetain;
- switch (mqttFixedHeader.qosLevel()) {
- case AT_MOST_ONCE: // 至多一次
- break;
- case AT_LEAST_ONCE:
- sendPubBack(channel, messageId);
- break;
- case EXACTLY_ONCE:
- sendPubRec(mqttChannel, messageId);
- break;
- }
- if ((isRetain=mqttFixedHeader.isRetain()) && mqttFixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE) { //是保留消息 qos >0
- saveRetain(mqttPublishVariableHeader.topicName(),
- RetainMessage.builder()
- .byteBuf(bytes)
- .qoS(mqttFixedHeader.qosLevel())
- .build(), false);
- } else if (mqttFixedHeader.isRetain() && mqttFixedHeader.qosLevel() == MqttQoS.AT_MOST_ONCE) { // 是保留消息 qos=0 清除之前保留消息 保留现在
- saveRetain(mqttPublishVariableHeader.topicName(),
- RetainMessage.builder()
- .byteBuf(bytes)
- .qoS(mqttFixedHeader.qosLevel())
- .build(), true);
- }
- if (!mqttChannel.checkRecevice(messageId)) {
- push(mqttPublishVariableHeader.topicName(), mqttFixedHeader.qosLevel(), bytes,isRetain);
- mqttChannel.addRecevice(messageId);
- }
- }
- });
-
- }
- /**
- * 推送消息给订阅者
- */
- private void push(String topic, MqttQoS qos, byte[] bytes, boolean isRetain){
- Collection subChannels = getChannels(topic, topic1 -> cacheMap.getData(getTopic(topic1)));
- if(!CollectionUtils.isEmpty(subChannels)){
- subChannels.parallelStream().forEach(subChannel -> {
- switch (subChannel.getSessionStatus()){
- case OPEN: // 在线
- if(subChannel.isActive()){ // 防止channel失效 但是离线状态没更改
- switch (qos){
- case AT_LEAST_ONCE:
- sendQosConfirmMsg(MqttQoS.AT_LEAST_ONCE,subChannel,topic,bytes);
- break;
- case AT_MOST_ONCE:
- sendQos0Msg(subChannel.getChannel(),topic,bytes);
- break;
- case EXACTLY_ONCE:
- sendQosConfirmMsg(MqttQoS.EXACTLY_ONCE,subChannel,topic,bytes);
- break;
- }
- }
- else{
- if(!subChannel.isCleanSession() & !isRetain){
- clientSessionService.saveSessionMsg(subChannel.getDeviceId(),
- SessionMessage.builder().byteBuf(bytes).qoS(qos).topic(topic).build() );
- break;
- }
- }
- break;
- case CLOSE: // 连接 设置了 clean session =false
- clientSessionService.saveSessionMsg(subChannel.getDeviceId(),
- SessionMessage.builder().byteBuf(bytes).qoS(qos).topic(topic).build() );
- break;
- }
- });
- }
- }
-
- /**
- * 关闭channel 操作
- * @param deviceId
- */
- @Override
- public void closeSuccess(String deviceId,boolean isDisconnect) {
- if(StringUtils.isNotBlank(deviceId)){
- executorService.execute(() -> {
- MqttChannel mqttChannel = mqttChannels.get(deviceId);
- Optional.ofNullable(mqttChannel).ifPresent(mqttChannel1 -> {
- mqttChannel1.setSessionStatus(SessionStatus.CLOSE); // 设置关闭
- mqttChannel1.close(); // 关闭channel
- mqttChannel1.setChannel(null);
- if(!mqttChannel1.isCleanSession()){ // 保持会话
- // 处理 qos1 未确认数据
- ConcurrentHashMap message = mqttChannel1.getMessage();
- Optional.ofNullable(message).ifPresent(integerConfirmMessageConcurrentHashMap -> {
- integerConfirmMessageConcurrentHashMap.forEach((integer, confirmMessage) -> doIfElse(confirmMessage, sendMqttMessage ->sendMqttMessage.getConfirmStatus()== ConfirmStatus.PUB, sendMqttMessage ->{
- clientSessionService.saveSessionMsg(mqttChannel.getDeviceId(), SessionMessage.builder()
- .byteBuf(sendMqttMessage.getByteBuf())
- .qoS(sendMqttMessage.getQos())
- .topic(sendMqttMessage.getTopic())
- .build()); // 把待确认数据转入session中
- }
- ));
-
- });
- }
- else{ // 删除sub topic-消息
- mqttChannels.remove(deviceId); // 移除channelId 不保持会话 直接删除 保持会话 旧的在重新connect时替换
- switch (mqttChannel1.getSubStatus()){
- case YES:
- deleteSubTopic(mqttChannel1);
- break;
- }
- }
- if(mqttChannel1.isWill()){ // 发送遗言
- if(!isDisconnect){ // 不是disconnection操作
- willService.doSend(deviceId);
- }
- }
- });
- });
- }
- }
-
- /**
- * 清除channel 订阅主题
- * @param mqttChannel
- */
- public Set deleteSubTopic(MqttChannel mqttChannel){
- Set topics = mqttChannel.getTopic();
- topics.parallelStream().forEach(topic -> cacheMap.delete(getTopic(topic),mqttChannel));
- return topics;
- }
-
- /**
- * 发送 遗嘱消息(有的channel 已经关闭 但是保持了 session 此时加入session 数据中 )
- * @param willMeaasge 遗嘱消息
- */
- public void sendWillMsg(WillMeaasge willMeaasge){
- Collection mqttChannels = getChannels(willMeaasge.getWillTopic(), topic -> cacheMap.getData(getTopic(topic)));
- if(!CollectionUtils.isEmpty(mqttChannels)){
- mqttChannels.forEach(mqttChannel -> {
- switch (mqttChannel.getSessionStatus()){
- case CLOSE:
- clientSessionService.saveSessionMsg(mqttChannel.getDeviceId(),
- SessionMessage.builder()
- .topic(willMeaasge.getWillTopic())
- .qoS(MqttQoS.valueOf(willMeaasge.getQos()))
- .byteBuf(willMeaasge.getWillMessage().getBytes()).build());
- break;
- case OPEN:
- writeWillMsg(mqttChannel,willMeaasge);
- break;
- }
- });
- }
- }
-
- /**
- * 保存保留消息
- * @param topic 主题
- * @param retainMessage 信息
- */
- private void saveRetain(String topic, RetainMessage retainMessage, boolean isClean){
- ConcurrentLinkedQueue retainMessages = retain.getOrDefault(topic, new ConcurrentLinkedQueue<>());
- if(!retainMessages.isEmpty() && isClean){
- retainMessages.clear();
- }
- boolean flag;
- do{
- flag = retainMessages.add(retainMessage);
- }
- while (!flag);
- retain.put(topic, retainMessages);
- }
-
- /**
- * 发送保留消息
- */
- public void sendRetain(String topic,MqttChannel mqttChannel){
- retain.forEach((_topic, retainMessages) -> {
- if(StringUtils.startsWith(_topic,topic)){
- Optional.ofNullable(retainMessages).ifPresent(pubMessages1 -> {
- retainMessages.parallelStream().forEach(retainMessage -> {
- log.info("【发送保留消息】"+mqttChannel.getChannel().remoteAddress()+":"+retainMessage.getString()+"【成功】");
- switch (retainMessage.getQoS()){
- case AT_MOST_ONCE:
- sendQos0Msg(mqttChannel.getChannel(),_topic,retainMessage.getByteBuf());
- break;
- case AT_LEAST_ONCE:
- sendQosConfirmMsg(MqttQoS.AT_LEAST_ONCE,mqttChannel,_topic,retainMessage.getByteBuf());
- break;
- case EXACTLY_ONCE:
- sendQosConfirmMsg(MqttQoS.EXACTLY_ONCE,mqttChannel,_topic,retainMessage.getByteBuf());
- break;
- }
- });
- });
- }
- });
-
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/channel/MqttHandlerService.java b/src/main/java/com/myself/nettychat/bootstrap/channel/MqttHandlerService.java
deleted file mode 100644
index abc4d56..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/channel/MqttHandlerService.java
+++ /dev/null
@@ -1,223 +0,0 @@
-package com.myself.nettychat.bootstrap.channel;
-
-import com.myself.nettychat.bootstrap.BaseApi;
-import com.myself.nettychat.bootstrap.BaseAuthService;
-import com.myself.nettychat.bootstrap.ChannelService;
-import com.myself.nettychat.bootstrap.bean.SendMqttMessage;
-import com.myself.nettychat.common.enums.ConfirmStatus;
-import com.myself.nettychat.common.mqtts.ServerMqttHandlerService;
-import io.netty.channel.Channel;
-import io.netty.handler.codec.mqtt.*;
-import io.netty.handler.timeout.IdleStateEvent;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-@Slf4j
-@Component
-public class MqttHandlerService extends ServerMqttHandlerService implements BaseApi {
-
- @Autowired
- ChannelService mqttChannelService;
-
- private final BaseAuthService baseAuthService;
-
- public MqttHandlerService(BaseAuthService baseAuthService) {
- this.baseAuthService = baseAuthService;
- }
-
- /**
- * 登录
- *
- */
- @Override
- public boolean login(Channel channel, MqttConnectMessage mqttConnectMessage) {
-// 校验规则 自定义校验规则
- MqttConnectPayload payload = mqttConnectMessage.payload();
- String deviceId = payload.clientIdentifier();
- if (StringUtils.isBlank(deviceId)) {
- MqttConnectReturnCode connectReturnCode = MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
- connectBack(channel,connectReturnCode);
- return false;
- }
-
- if(mqttConnectMessage.variableHeader().hasPassword() && mqttConnectMessage.variableHeader().hasUserName()
- && !baseAuthService.authorized(payload.userName(),payload.password())){
- MqttConnectReturnCode connectReturnCode = MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
- connectBack(channel,connectReturnCode);
- return false;
- }
- return Optional.ofNullable(mqttChannelService.getMqttChannel(deviceId))
- .map(mqttChannel -> {
- switch (mqttChannel.getSessionStatus()){
- case OPEN:
- return false;
- }
- mqttChannelService.loginSuccess(channel, deviceId, mqttConnectMessage);
- return true;
- }).orElseGet(() -> {
- mqttChannelService.loginSuccess(channel, deviceId, mqttConnectMessage);
- return true;
- });
-
- }
-
- private void connectBack(Channel channel, MqttConnectReturnCode connectReturnCode){
- MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, true);
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(
- MqttMessageType.CONNACK,false, MqttQoS.AT_MOST_ONCE, false, 0x02);
- MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
- channel.writeAndFlush(connAck);
- }
-
-
- /**
- * 发布
- */
- @Override
- public void publish(Channel channel, MqttPublishMessage mqttPublishMessage) {
- mqttChannelService.publishSuccess(channel, mqttPublishMessage);
- }
-
- /**
- * 订阅
- */
- @Override
- public void subscribe(Channel channel, MqttSubscribeMessage mqttSubscribeMessage) {
- Set topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription ->
- mqttTopicSubscription.topicName()
- ).collect(Collectors.toSet());
- mqttChannelService.suscribeSuccess(mqttChannelService.getDeviceId(channel), topics);
- subBack(channel, mqttSubscribeMessage, topics.size());
- }
-
- private void subBack(Channel channel, MqttSubscribeMessage mqttSubscribeMessage, int num) {
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
- MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(mqttSubscribeMessage.variableHeader().messageId());
- List grantedQoSLevels = new ArrayList<>(num);
- for (int i = 0; i < num; i++) {
- grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
- }
- MqttSubAckPayload payload = new MqttSubAckPayload(grantedQoSLevels);
- MqttSubAckMessage mqttSubAckMessage = new MqttSubAckMessage(mqttFixedHeader, variableHeader, payload);
- channel.writeAndFlush(mqttSubAckMessage);
- }
-
-
- /**
- * 关闭通道
- */
- @Override
- public void close(Channel channel) {
- mqttChannelService.closeSuccess(mqttChannelService.getDeviceId(channel), false);
- channel.close();
- }
-
- /**
- * 回复pong消息
- */
- @Override
- public void pong(Channel channel) {
- if (channel.isOpen() && channel.isActive() && channel.isWritable()) {
- log.info("收到来自:【" + channel.remoteAddress().toString() + "】心跳");
- MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
- channel.writeAndFlush(new MqttMessage(fixedHeader));
- }
- }
-
- /**
- * 取消订阅
- */
- @Override
- public void unsubscribe(Channel channel, MqttUnsubscribeMessage mqttMessage) {
- List topics1 = mqttMessage.payload().topics();
- mqttChannelService.unsubscribe(mqttChannelService.getDeviceId(channel), topics1);
- unSubBack(channel, mqttMessage.variableHeader().messageId());
- }
-
- /**
- * 回复取消订阅
- */
- private void unSubBack(Channel channel, int messageId) {
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
- MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
- MqttUnsubAckMessage mqttUnsubAckMessage = new MqttUnsubAckMessage(mqttFixedHeader, variableHeader);
- channel.writeAndFlush(mqttUnsubAckMessage);
- }
-
-
- /**
- * 消息回复确认(qos1 级别 保证收到消息 但是可能会重复)
- */
- @Override
- public void puback(Channel channel, MqttMessage mqttMessage) {
- MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
- int messageId = messageIdVariableHeader.messageId();
- mqttChannelService.getMqttChannel(mqttChannelService.getDeviceId(channel)).getSendMqttMessage(messageId).setConfirmStatus(ConfirmStatus.COMPLETE); // 复制为空
- }
-
-
- /**
- * disconnect 主动断线
- */
- @Override
- public void disconnect(Channel channel) {
- mqttChannelService.closeSuccess(mqttChannelService.getDeviceId(channel), true);
- }
-
-
- /**
- * qos2 发布收到
- */
- @Override
- public void pubrec(Channel channel, MqttMessage mqttMessage ) {
- MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
- int messageId = messageIdVariableHeader.messageId();
- mqttChannelService.getMqttChannel(mqttChannelService.getDeviceId(channel)).getSendMqttMessage(messageId).setConfirmStatus(ConfirmStatus.PUBREL); // 复制为空
- mqttChannelService.doPubrec(channel, messageId);
- }
-
- /**
- * qos2 发布释放
- */
- @Override
- public void pubrel(Channel channel, MqttMessage mqttMessage ) {
- MqttMessageIdVariableHeader mqttMessageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
- int messageId = mqttMessageIdVariableHeader.messageId();
- mqttChannelService.getMqttChannel(mqttChannelService.getDeviceId(channel)).getSendMqttMessage(messageId).setConfirmStatus(ConfirmStatus.COMPLETE); // 复制为空
- mqttChannelService.doPubrel(channel, messageId);
-
- }
-
- /**
- * qos2 发布完成
- */
- @Override
- public void pubcomp(Channel channel, MqttMessage mqttMessage ) {
- MqttMessageIdVariableHeader mqttMessageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
- int messageId = mqttMessageIdVariableHeader.messageId();
- SendMqttMessage sendMqttMessage = mqttChannelService.getMqttChannel(mqttChannelService.getDeviceId(channel)).getSendMqttMessage(messageId);
- sendMqttMessage.setConfirmStatus(ConfirmStatus.COMPLETE); // 复制为空
- }
-
- @Override
- public void doTimeOut(Channel channel, IdleStateEvent evt) {
- log.info("【PingPongService:doTimeOut 心跳超时】" + channel.remoteAddress() + "【channel 关闭】");
- switch (evt.state()) {
- case READER_IDLE:
- close(channel);
- case WRITER_IDLE:
- close(channel);
- case ALL_IDLE:
- close(channel);
- }
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/channel/PublishApiSevice.java b/src/main/java/com/myself/nettychat/bootstrap/channel/PublishApiSevice.java
deleted file mode 100644
index 4af733f..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/channel/PublishApiSevice.java
+++ /dev/null
@@ -1,162 +0,0 @@
-package com.myself.nettychat.bootstrap.channel;
-
-import com.myself.nettychat.bootstrap.bean.MqttChannel;
-import com.myself.nettychat.bootstrap.bean.SendMqttMessage;
-import com.myself.nettychat.bootstrap.bean.WillMeaasge;
-import com.myself.nettychat.bootstrap.scan.ScanRunnable;
-import com.myself.nettychat.common.utils.MessageId;
-import com.myself.nettychat.common.enums.ConfirmStatus;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.handler.codec.mqtt.*;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 发送消息以及确认
- **/
-@Slf4j
-public class PublishApiSevice {
-
- private final ScanRunnable scanRunnable;
-
- public PublishApiSevice(ScanRunnable scanRunnable) {
- this.scanRunnable = scanRunnable;
- }
-
-
- /**
- * 写入遗嘱消息
- */
- protected void writeWillMsg(MqttChannel mqttChannel, WillMeaasge willMeaasge) {
-// dup保证消息可靠传输,默认为0,只占用一个字节,表示第一次发送。不能用于检测消息重复发送等
- switch (willMeaasge.getQos()){
- case 0: // qos0
- sendQos0Msg(mqttChannel.getChannel(),willMeaasge.getWillTopic(),willMeaasge.getWillMessage().getBytes());
- break;
- case 1: // qos1
- sendQosConfirmMsg(MqttQoS.AT_LEAST_ONCE,mqttChannel,willMeaasge.getWillTopic(),willMeaasge.getWillMessage().getBytes());
- break;
- case 2: // qos2
- sendQosConfirmMsg(MqttQoS.EXACTLY_ONCE,mqttChannel,willMeaasge.getWillTopic(),willMeaasge.getWillMessage().getBytes());
- break;
- }
-
-
- }
-
- protected void sendQosConfirmMsg(MqttQoS qos, MqttChannel mqttChannel, String topic, byte[] bytes) {
- if(mqttChannel.isLogin()){
- int messageId = MessageId.messageId();
- switch (qos){
- case AT_LEAST_ONCE:
- mqttChannel.addSendMqttMessage(messageId,sendQos1Msg(mqttChannel.getChannel(),topic,false,bytes,messageId));
- break;
- case EXACTLY_ONCE:
- mqttChannel.addSendMqttMessage(messageId,sendQos2Msg(mqttChannel.getChannel(),topic,false,bytes,messageId));
- break;
- }
- }
-
- }
-
-
- /**
- * 发送 qos1 类的消息
- */
- private SendMqttMessage sendQos1Msg(Channel channel, String topic, boolean isDup, byte[] byteBuf, int messageId){
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,isDup, MqttQoS.AT_LEAST_ONCE,false,0);
- MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic,messageId );
- MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader,mqttPublishVariableHeader, Unpooled.wrappedBuffer(byteBuf));
- channel.writeAndFlush(mqttPublishMessage);
- return addQueue(channel,messageId,topic,byteBuf,MqttQoS.AT_LEAST_ONCE, ConfirmStatus.PUB);
- }
-
-
-
- /**
- * 发送 qos0 类的消息 byte
- */
- protected void sendQos0Msg(Channel channel, String topic, byte[] byteBuf){
- if(channel!=null){
- sendQos0Msg(channel,topic,byteBuf,0);
- }
- }
- private void sendQos0Msg(Channel channel, String topic, byte[] byteBuf,int messageId){
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.AT_MOST_ONCE,false,0);
- MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic,messageId );
- MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader,mqttPublishVariableHeader,Unpooled.wrappedBuffer(byteBuf));
- channel.writeAndFlush(mqttPublishMessage);
- }
-
-
-
-
- private SendMqttMessage sendQos2Msg(Channel channel, String topic,boolean isDup, byte[] byteBuf, int messageId) {
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,isDup, MqttQoS.EXACTLY_ONCE,false,0);
- MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic,messageId );
- MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader,mqttPublishVariableHeader, Unpooled.wrappedBuffer(byteBuf));
- channel.writeAndFlush(mqttPublishMessage);
- return addQueue(channel,messageId,topic,byteBuf,MqttQoS.EXACTLY_ONCE,ConfirmStatus.PUB);
- }
-
-
- /**
- * 发送qos1 publish 确认消息
- */
- protected void sendPubBack(Channel channel,int messageId){
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK,false, MqttQoS.AT_MOST_ONCE,false,0x02);
- MqttMessageIdVariableHeader from = MqttMessageIdVariableHeader.from(messageId);
- MqttPubAckMessage mqttPubAckMessage = new MqttPubAckMessage(mqttFixedHeader,from);
- channel.writeAndFlush(mqttPubAckMessage);
- }
-
-
- /**
- * 发送qos2 publish 确认消息 第一步
- */
- protected void sendPubRec( MqttChannel mqttChannel,int messageId){
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);
- MqttMessageIdVariableHeader from = MqttMessageIdVariableHeader.from(messageId);
- MqttPubAckMessage mqttPubAckMessage = new MqttPubAckMessage(mqttFixedHeader,from);
- Channel channel = mqttChannel.getChannel();
- channel.writeAndFlush(mqttPubAckMessage);
- SendMqttMessage sendMqttMessage = addQueue(channel, messageId, null, null, null, ConfirmStatus.PUBREC);
- mqttChannel.addSendMqttMessage(messageId,sendMqttMessage);
- }
-
- /**
- * 发送qos2 publish 确认消息 第二步
- */
- protected void sendPubRel(Channel channel,boolean isDup,int messageId){
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL,isDup, MqttQoS.AT_LEAST_ONCE,false,0x02);
- MqttMessageIdVariableHeader from = MqttMessageIdVariableHeader.from(messageId);
- MqttPubAckMessage mqttPubAckMessage = new MqttPubAckMessage(mqttFixedHeader,from);
- channel.writeAndFlush(mqttPubAckMessage);
- }
-
- /**
- * 发送qos2 publish 确认消息 第三步
- */
- protected void sendToPubComp(Channel channel,int messageId){
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02);
- MqttMessageIdVariableHeader from = MqttMessageIdVariableHeader.from(messageId);
- MqttPubAckMessage mqttPubAckMessage = new MqttPubAckMessage(mqttFixedHeader,from);
- channel.writeAndFlush(mqttPubAckMessage);
- }
-
- private SendMqttMessage addQueue(Channel channel,int messageId,String topic,byte[] datas,MqttQoS mqttQoS,ConfirmStatus confirmStatus){
- SendMqttMessage build = SendMqttMessage.builder().
- channel(channel).
- confirmStatus(confirmStatus).
- messageId(messageId)
- .topic(topic)
- .qos(mqttQoS)
- .byteBuf(datas)
- .time(System.currentTimeMillis()).build();
- scanRunnable.addQueue(build);
- return build;
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/channel/WillService.java b/src/main/java/com/myself/nettychat/bootstrap/channel/WillService.java
deleted file mode 100644
index 3aa4213..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/channel/WillService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.myself.nettychat.bootstrap.channel;
-
-import com.myself.nettychat.bootstrap.BaseApi;
-import com.myself.nettychat.bootstrap.ChannelService;
-import com.myself.nettychat.bootstrap.bean.WillMeaasge;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-@Slf4j
-@Component
-@Data
-@NoArgsConstructor
-public class WillService implements BaseApi {
-
- @Autowired
- ChannelService channelService;
-
- private static ConcurrentHashMap willMeaasges = new ConcurrentHashMap<>(); // deviceid -WillMeaasge
-
-
-
- /**
- * 保存遗嘱消息
- */
- public void save(String deviceid, WillMeaasge build) {
- willMeaasges.put(deviceid,build); // 替换旧的
- }
-
-
- public void doSend( String deviceId) { // 客户端断开连接后 开启遗嘱消息发送
- if(StringUtils.isNotBlank(deviceId)&&(willMeaasges.get(deviceId))!=null){
- WillMeaasge willMeaasge = willMeaasges.get(deviceId);
- channelService.sendWillMsg(willMeaasge); // 发送遗嘱消息
- if(!willMeaasge.isRetain()){ // 移除
- willMeaasges.remove(deviceId);
- log.info("deviceId will message["+willMeaasge.getWillMessage()+"] is removed");
- }
- }
- }
-
- /**
- * 删除遗嘱消息
- */
- public void del(String deviceid ) {willMeaasges.remove(deviceid);}
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/channel/cache/CacheMap.java b/src/main/java/com/myself/nettychat/bootstrap/channel/cache/CacheMap.java
deleted file mode 100644
index 68f805e..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/channel/cache/CacheMap.java
+++ /dev/null
@@ -1,137 +0,0 @@
-package com.myself.nettychat.bootstrap.channel.cache;
-
-import lombok.extern.slf4j.Slf4j;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 缓存操作
- **/
-@Slf4j
-public class CacheMap {
-
- private ConcurrentHashMap> datas = new ConcurrentHashMap<>();
-
- public boolean putData(K[] topic, V v){
- if(topic.length==1){
- Node kvNode = buildOne(topic[0], v);
- if(kvNode!=null && kvNode.topic.equals(topic[0])){
- return true;
- }
- }
- else{
- Node kvNode = buildOne(topic[0], null);
- for(int i=1;i kvNode = datas.get(ks[0]);
- for(int i=1;i getData(K[] ks){
- if(ks.length==1){
- return datas.get(ks[0]).get();
- }
- else{
- Node node = datas.get(ks[0]);
- if(node!=null){
- List all = new ArrayList<>();
- all.addAll(node.get());
- for(int i=1;i buildOne(K k,V v){
-
- Node node = this.datas.computeIfAbsent(k, key -> {
- Node kObjectNode = new Node<>(k);
- return kObjectNode;
- });
- if(v!=null){
- node.put(v);
- }
- return node;
- }
-
-
-
- class Node{
-
- private final K topic;
-
-
- private volatile ConcurrentHashMap> map =new ConcurrentHashMap<>() ;
-
-
- List vs = new CopyOnWriteArrayList<>();
-
-
- public K getTopic() {return topic;}
-
- Node(K topic) {
- this.topic = topic;
- }
-
- public boolean delValue(V v){
- return vs.remove(v);
- }
-
- public Node putNextValue(K k,V v){
- Node kvNode = map.computeIfAbsent(k, key -> {
- Node node = new Node<>(k);
- return node;
- });
- if(v!=null){
- kvNode.put(v);
- }
- return kvNode;
- }
-
-
- public Node getNext(K k){
- return map.get(k);
- }
-
-
- public boolean put(V v){
- return vs.add(v);
- }
-
-
- public List get(){
- return vs;
- }
- }
-
-}
diff --git a/src/main/java/com/myself/nettychat/bootstrap/coder/ByteBufToWebSocketFrameEncoder.java b/src/main/java/com/myself/nettychat/bootstrap/coder/ByteBufToWebSocketFrameEncoder.java
deleted file mode 100644
index 6c34606..0000000
--- a/src/main/java/com/myself/nettychat/bootstrap/coder/ByteBufToWebSocketFrameEncoder.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.myself.nettychat.bootstrap.coder;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToMessageEncoder;
-import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
-
-import java.util.List;
-
-/**
- * @author MySelf
- * @create 2018/9/22
- * @desc 转换
- **/
-public class ByteBufToWebSocketFrameEncoder extends MessageToMessageEncoder {
-
- @Override
- protected void encode(ChannelHandlerContext ctx, ByteBuf byteBuf, List