Android MQTTv5
一、导入依赖
implementation 'org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5' implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
二、service代码
package com.zg.fragmentdemo.mqtt; import android.app.Service; import android.content.Context; import android.content.Intent; import android.os.Handler; import android.os.IBinder; import android.os.Looper; import android.util.Log; import android.widget.Toast; import com.google.gson.Gson; import org.eclipse.paho.mqttv5.client.IMqttToken; import org.eclipse.paho.mqttv5.client.MqttAsyncClient; import org.eclipse.paho.mqttv5.client.MqttCallback; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse; import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.UUID; public class MessageQueueService extends Service { private static final String TAG = MessageQueueService.class.getName();private final Gson gson = new Gson();int qos = 2;private String clientId;private String topic;private MqttAsyncClient mqttAsyncClient;private final MqttCallback mqttCallback = new MqttCallback() { @Overridepublic void disconnected(MqttDisconnectResponse disconnectResponse) {Log.d(TAG, "disconnected " + disconnectResponse.toString());} @Overridepublic void mqttErrorOccurred(MqttException exception) {Log.d(TAG, "mqttErrorOccurred " + exception.getMessage());} @Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String msg = new String(message.getPayload());Log.d(TAG, String.format("messageArrived Topic: %s, Id: %s, QoS: %s, Message: %s, ", topic, message.getId(), message.getQos(), message));//这里是收到消息} @Overridepublic void deliveryComplete(IMqttToken token) {Log.d(TAG, "deliveryComplete");} @Overridepublic void connectComplete(boolean reconnect, String serverURI) {Log.d(TAG, "connectComplete: " + topic + " reconnect: " + reconnect + " serverURI: " + serverURI);} @Overridepublic void authPacketArrived(int reasonCode, MqttProperties properties) {Log.d(TAG, "authPacketArrived reasonCode: " + reasonCode + " properties: " + properties.toString());}}; /*** 开启服务*/public static void startService(Context context) {Log.i("zxd", "startService: ");context.startService(new Intent(context, MessageQueueService.class));} public MessageQueueService() { } @Overridepublic IBinder onBind(Intent intent) {// TODO: Return the communication channel to the service.throw new UnsupportedOperationException("Not yet implemented");} @Overridepublic void onCreate() {super.onCreate(); topic = "ReceivePayMessage/9/1";clientId = UUID.randomUUID().toString();//这里也可以使用AndroidId try { MemoryPersistence persistence = new MemoryPersistence();mqttAsyncClient = new MqttAsyncClient("tcp://x.y.z.t:1884", clientId, persistence);MqttConnectionOptions mqttConnectionOptions = new MqttConnectionOptions();mqttConnectionOptions.setUserName("admin123");mqttConnectionOptions.setPassword("admin123".getBytes());mqttConnectionOptions.setCleanStart(false);mqttConnectionOptions.setConnectionTimeout(60);mqttConnectionOptions.setKeepAliveInterval(30);mqttConnectionOptions.setAutomaticReconnect(true);mqttAsyncClient.setCallback(mqttCallback);IMqttToken token = mqttAsyncClient.connect(mqttConnectionOptions); token.waitForCompletion();if (token.isComplete()) {Log.d(TAG, "Subscribe: " + topic);mqttAsyncClient.subscribe(topic, qos);} } catch (MqttException me) {Log.e(TAG, String.format("MqttException: %s, msg: %s, loc: %s, cause: %s", me.getReasonCode(), me.getMessage(), me.getLocalizedMessage(), me.getCause()));} catch (Exception e) {Log.e(TAG, e.getMessage());}} @Overridepublic void onDestroy() {super.onDestroy();Log.d(TAG, "isConnected " + mqttAsyncClient.isConnected()); if (mqttAsyncClient.isConnected()) {try {mqttAsyncClient.disconnect();mqttAsyncClient.close();} catch (MqttException e) {Log.d(TAG, "onDestroy " + e);}Log.d(TAG, "Close client." + mqttAsyncClient.isConnected());} Log.d(TAG, "onDestroy.");} private void broadcastImage(String session, String url) {Intent intent = new Intent();intent.setAction("main.mqtt");intent.putExtra("session", session);intent.putExtra("image", url);sendBroadcast(intent);} private void broadcastAudio(String session, String url) {Intent intent = new Intent();intent.setAction("main.mqtt");intent.putExtra("session", session);intent.putExtra("audio", url);sendBroadcast(intent);} private void broadcastStory(String session, String text) {Intent intent = new Intent();intent.setAction("main.mqtt");intent.putExtra("session", session);intent.putExtra("story", text);sendBroadcast(intent);} private void broadcastProgress(String progress) {Intent intent = new Intent();intent.setAction("main.progress");intent.putExtra("progress", Integer.valueOf(progress));sendBroadcast(intent);} private void toast(String message) {Handler handler = new Handler(Looper.getMainLooper());handler.post(() -> {Toast.makeText(getApplicationContext(), message, Toast.LENGTH_LONG).show();});} public String logcat(String filter) {StringBuffer buffer = new StringBuffer();String command = "logcat -t 100";if (!filter.isEmpty()) {command.concat(String.format(" | grep %s", filter));} else {command.concat("");}try {Process process = Runtime.getRuntime().exec(command);BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));String temp;while ((temp = reader.readLine()) != null) {buffer.append(temp).append("\n");} } catch (IOException e) {Log.e(TAG, e.toString());}return buffer.toString();} }
在AndroidManifest.xml中添加服务
<serviceandroid:name=".mqtt.MessageQueueService"android:enabled="true"android:exported="true" />
三、使用service
package cn.netkiller.student;import static cn.netkiller.student.cloud.Api.token; import static cn.netkiller.student.utils.AndroidManager.fullscreen;import cn.netkiller.student.service.MessageQueueService;public class MainActivity extends AppCompatActivity {private static final String TAG = MainActivity.class.getSimpleName();private ActivityMainBinding binding;private Intent messageQueueService;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);binding = ActivityMainBinding.inflate(getLayoutInflater());setContentView(binding.getRoot());messageQueueService = new Intent(this, MessageQueueService.class);startService(messageQueueService); // if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) { // startForegroundService(new Intent(MainActivity.this, MessageQueueService.class)); // } else { // startService(new Intent(MainActivity.this, MessageQueueService.class)); // }}@Overrideprotected void onDestroy() {super.onDestroy();stopService(messageQueueService);}@Overridepublic void onResume() {super.onResume();Config.Api.token = token();startService(messageQueueService);}@Overridepublic void onStop() {super.onStop();stopService(messageQueueService);} }
四、参考链接
MQTTV5 异步
MQTTV5 同步
GitHub
如果要查看V3或V5的版本,可以点击Tags查看(在code界面,点Tags)
可以google搜索 “client.mqttv5 Android”、“mqttv5 android”
-
MQTTv5 Specification (WIP)
-
Current Paho Java Client JavaDoc
-
The Branch where the MQTTv5 client is currently being built (mqttv5-new)
Android MQTTv5 代码示例
5步让你的Android应用集成MQTT功能(MQTT V5版本) vip免费
Android开发,通过使用mqtt3.1.1版本和mqtt5.0版本进行mqtt数据的发送和接收,实现和服务端数据的交互 需要积分下载
MQTT5.0
MQTT4、5所有消息解析与对比