瀏覽代碼

添加发送接收编解码,中继处理

wuyunfeng 2 年之前
父節點
當前提交
d3049a94bc

+ 151 - 0
app/src/main/java/com/wdkl/ncs/s433/receiver/MainActivity.java

@@ -37,6 +37,8 @@ import com.wdkl.ncs.s433.receiver.utils.NetUtil;
 import com.wdkl.ncs.s433.receiver.utils.SpeechUtil;
 import com.wdkl.ncs.s433.receiver.utils.VoiceManagerUtil;
 
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.greenrobot.eventbus.EventBus;
 import org.greenrobot.eventbus.Subscribe;
 import org.greenrobot.eventbus.ThreadMode;
@@ -44,12 +46,20 @@ import org.greenrobot.eventbus.ThreadMode;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
 import java.util.TimeZone;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.zip.CRC32;
 
 import static android.serialport.SerialPort.DEFAULT_SU_PATH;
 
@@ -66,6 +76,11 @@ public class MainActivity extends SerialPortActivity  {
     private SmdtManager smdtManager;
     private boolean debugShow = false;
 
+    //缓存本机收到的消息和时间
+    Map<String,Long> receivedCache = new HashMap<>();
+    //调度延迟任务
+    Timer timer = new Timer();
+
     @Override
     protected void onCreate(Bundle savedInstanceState) {
         super.onCreate(savedInstanceState);
@@ -314,12 +329,148 @@ public class MainActivity extends SerialPortActivity  {
                     });
                 //}
             }
+
+
+
+            //中继实现,每个机器收到一条消息后发出两次,第一次2秒内随机延迟,第二次3-4秒内随机延迟发送。可能会收到很多次。
+            //为防止多个设备同一时间发送导致信号互相干扰(实测两台设备同时发送时,第三个接收者会收不到消息),每个设备在收到消息后
+            //随机延迟一段时间发送,未减少碰撞分2次随机发送
+            long ctime = new Date().getTime();
+            //清理掉1分钟之前收到的消息
+            Iterator<Map.Entry<String, Long>> iterator = receivedCache.entrySet().iterator();
+            while (iterator.hasNext()){
+                if(iterator.next().getValue()-ctime>60000){
+                    iterator.remove();
+                }
+            }
+
+            Random random = new Random();
+            final String org = data;
+            int i = random.nextInt(200)+1;
+            int j=random.nextInt(200)+200;
+            if(!receivedCache.containsKey(data)||receivedCache.containsKey(data)&&receivedCache.get(data)-ctime>60000) {
+                receivedCache.put(data,ctime);
+                Integer[] taskTime = new Integer[]{i,j};
+                for (Integer t : taskTime) {
+                    timer.schedule(new TimerTask() {
+                        @Override
+                        public void run() {
+                            try {
+                                Log.i(TAG, "run: " + org);
+                                if (StringUtils.isNotEmpty(org)) {
+                                    sendMsg(org, mOutputStream, 120);
+                                }
+                            } catch (IOException e) {
+                                e.printStackTrace();
+                            }
+                        }
+                    }, t * 10);
+
+                }
+
+            }
+          //中继完成
+
         } catch (Exception e) {
             e.printStackTrace();
             Log.e("Application", "onDataReceivedString Exception...");
         }
     }
 
+
+    /**
+     * 消息分包发送,包格式如下:每个包可传输的数据内容是发送缓冲区的大小减去21个字节。21个字节为包控制数据
+     * 保证接收端接收后可校验包的正确性。假设发送缓冲区是60个字节,那么每个包最多可发送39个字节的消息数据,
+     * 因为包已经校验准确性,接收端只需合并包内容数据即可。消息id使用整个消息内容的crc校验值
+     * 帧头 有效数据长度 包序 总包数 消息id 包内容        crc包校验
+     * 2B      1B      1B  1B     8B    MAX-18          8B
+     *
+     * @param msg
+     * @param outputStream 输出
+     * @param packageLength 芯片发射分包长度(60,修改后为120)
+     */
+    private void sendMsg(String msg, OutputStream outputStream, int packageLength) throws IOException {
+
+
+        char[] alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_".toCharArray();
+
+        try {
+            byte[] msgBytes = msg.getBytes();
+            //msgBytesLength 要发送的消息总长度,
+            int msgBytesLength = msgBytes.length;
+            //packageContentLength 每个包可发送的长度
+            int packageContentLength = packageLength - 21;
+            //要发送包的个数
+            int packages = msgBytesLength / packageContentLength + (msgBytesLength % packageContentLength != 0 ? 1 : 0);
+//            String msgId = NanoIdUtils.randomNanoId(new Random(), alphabet, 5);
+            CRC32 crc32Msg = new CRC32();
+            crc32Msg.update(msgBytes);
+            long msgId = crc32Msg.getValue();
+//            byte[] packageContent = new byte[packageContentLength];
+            byte[] start = String2Byte("AA55");
+            for (int i = 0; i < packages; i++) {
+                ByteBuffer buffer = ByteBuffer.allocate(packageLength - 8); //校验数据之前的数据
+                buffer.put(start);
+                //消息内容实际数据长度
+                byte contentLength = (byte) Math.min(packageContentLength, msgBytesLength - i * packageContentLength);
+                buffer.put(contentLength);
+                //包序
+                buffer.put((byte) (i + 1));
+                //总包数
+                buffer.put((byte) packages);
+                //消息id字节
+                byte[] msgIdBytes = ByteBuffer.allocate(8).putLong(msgId).array(); //msgId.getBytes();
+                buffer.put(msgIdBytes);
+                //包内容
+                byte[] subarray = ArrayUtils.subarray(msgBytes, i * packageContentLength, i * packageContentLength + contentLength);
+                ByteBuffer wrap = ByteBuffer.allocate(packageContentLength).put(subarray);
+                if (subarray.length < packageContentLength) {
+                    wrap.put(new byte[packageContentLength - subarray.length]);
+                }
+                buffer.put(wrap.array());
+                CRC32 crc32 = new CRC32();
+                //计算校验
+                crc32.update(buffer.array());
+                Log.i(TAG, "sendMsg: "+ArrayUtils.toString(buffer.array()));
+                long value = crc32.getValue();
+                Log.i(TAG, "Crc: " + value);
+                byte[] crc = ByteBuffer.allocate(8).putLong(value).array();
+                ByteBuffer bufferSend = ByteBuffer.allocate(packageLength);
+                //把crc校验加入到包尾
+                bufferSend.put(buffer.array());
+                bufferSend.put(crc);
+                //发送包
+                Log.i(TAG, "sendMsg: 发送:"+ArrayUtils.toString(bufferSend.array()));
+                outputStream.write(bufferSend.array());
+            }
+
+
+        } catch (IOException e) {
+            throw e;
+        }
+
+    }
+
+
+    /**
+     * 16进制字符串转换成byte数组
+     */
+    public byte[] String2Byte(String s) {
+        s = s.replace(" ", "");
+        s = s.replace("#", "");
+        byte[] baKeyword = new byte[s.length() / 2];
+        for (int i = 0; i < baKeyword.length; i++) {
+            try {
+                baKeyword[i] = (byte) (0xff & Integer.parseInt(s.substring(i * 2, i * 2 + 2), 16));
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+        return baKeyword;
+    }
+
+
+
     private void parseTcpData(TcpModel tcpModel) {
         Log.e("Application", "action: " + tcpModel.getAction() + ", data: " + tcpModel.getData());
         try {

+ 131 - 3
app/src/main/java/com/wdkl/ncs/s433/receiver/SerialPortActivity.java

@@ -1,5 +1,7 @@
 package com.wdkl.ncs.s433.receiver;
 
+import static androidx.constraintlayout.motion.utils.Oscillator.TAG;
+
 import android.app.Activity;
 import android.app.AlertDialog;
 import android.content.DialogInterface;
@@ -10,12 +12,21 @@ import android.util.Log;
 import com.wdkl.ncs.s433.receiver.common.Constants;
 import com.wdkl.ncs.s433.receiver.utils.ByteUtil;
 
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.security.InvalidParameterException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.CRC32;
 
 public abstract class SerialPortActivity extends Activity {
 
@@ -24,6 +35,8 @@ public abstract class SerialPortActivity extends Activity {
     protected OutputStream mOutputStream;
     private InputStream mInputStream;
     private ReadThread mReadThread;
+    //数据包接收缓存
+    private Map<Long, List<byte[]>> msgCache = new HashMap<>();
 
     private byte[] buffer = new byte[1024];
     private byte[] re_buffer = new byte[2048];
@@ -31,7 +44,7 @@ public abstract class SerialPortActivity extends Activity {
     public int writeIndex = 0;
 
     private class ReadThread extends Thread {
-
+        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
         @Override
         public void run() {
             super.run();
@@ -53,10 +66,18 @@ public abstract class SerialPortActivity extends Activity {
                             onDataReceived(buffer, size);
                         }
                     }*/
+                    int size;
 
                     if (mInputStream.available() > 0) {
-                        int size = mInputStream.read(buffer);
-                        parseData(size);
+                        size = mInputStream.read(buffer);
+                        Log.e(TAG, "run: buffer length:" + byteBuffer.position());
+                        byteBuffer.put(ArrayUtils.subarray(buffer, 0, size));
+                        String msg= handlePackage(byteBuffer,120);
+                        if(StringUtils.isNotEmpty(msg)){
+                            onDataReceivedString(msg);
+                        }
+//                         size = mInputStream.read(buffer);
+//                        parseData(size);
                     }
                 } catch (Exception e) {
                     e.printStackTrace();
@@ -66,6 +87,113 @@ public abstract class SerialPortActivity extends Activity {
         }
     }
 
+    /**
+     * 处理数据包,收到数据包头时开始,读取指定数据包长度后校验数据包是否完整,数据包校验不通过时丢弃。
+     * 数据包校验通过,需要获取数据包内容合成数据包内数据消息。
+     * @param buffer 接收缓冲区
+     * @param packageSize 数据包大小
+     * @return
+     */
+    private String handlePackage(ByteBuffer buffer,int packageSize) {
+        try {
+            buffer.flip();
+            Log.e(TAG, "buff信息:position="+buffer.position()+",limit="+buffer.limit() );
+            if (buffer.position() == 0 && buffer.limit() == 0) { //buffer是空的无需处理
+                buffer.compact();
+                return null;
+            }
+            for (int i = 0; i < buffer.limit() - 1; i++) {
+                //监测到包头
+                if (byteArrToHexString(new byte[]{buffer.get(i), buffer.get(i + 1)}).equalsIgnoreCase("AA55")) {
+                    if (i > 0) {
+                        byte[] incorrect = new byte[i + 1];
+                        buffer.get(incorrect, 0, i + 1);
+                    }
+                    //读取120个字节
+                    if (buffer.limit() - i >= packageSize) {
+                        byte[] bytes = new byte[packageSize];
+                        buffer.get(bytes, 0, packageSize);
+                        Log.i(TAG, "handlePackage:  收到:" + ArrayUtils.toString(bytes));
+                        //校验数据包
+                        CRC32 crc32 = new CRC32();
+                        byte[] subarray1 = ArrayUtils.subarray(bytes, 0, bytes.length - 8);
+                        Log.i(TAG, "handlePackage: " + ArrayUtils.toString(subarray1));
+                        crc32.update(subarray1);
+                        //计算出的crc校验
+                        long caculateCheck = crc32.getValue();
+                        byte[] subarray = ArrayUtils.subarray(bytes, bytes.length - 8, bytes.length);
+                        //传输过来的校验
+                        long transCheck = ByteBuffer.wrap(subarray).getLong();
+                        if (caculateCheck == transCheck) {
+                            long msgId = ByteBuffer.wrap(ArrayUtils.subarray(bytes, 5, 13)).getLong();
+                            int packageTotal = bytes[4];
+                            int packageIndex = bytes[3];
+                            int availableLength = bytes[2];
+                            byte[] availalbeData = ArrayUtils.subarray(bytes, 13, 13 + availableLength);
+                            if (packageTotal == 1) { //单包消息
+                                String s = new String(availalbeData);
+                                Log.i(TAG, "收到完整消息: " + s);
+                                buffer.compact();
+                                return s;
+
+                            } else {
+                                if (!msgCache.containsKey(msgId)) {
+                                    List<byte[]> list = new ArrayList<>(packageTotal);
+                                    for (int j = 0; j < packageTotal; j++) {
+                                        list.add(null);
+                                    }
+                                    list.set(packageIndex - 1, availalbeData);
+                                    msgCache.put(msgId, list);
+                                } else {
+                                    List<byte[]> exist = msgCache.get(msgId);
+                                    exist.set(packageIndex - 1, availalbeData);
+                                    if (exist.stream().allMatch(p->p!=null)) { //消息已经接收完整
+                                        int sum = exist.stream().mapToInt(p -> p.length).sum();
+                                        ByteBuffer combine = ByteBuffer.allocate(sum);
+                                        for (byte[] bytes1 : exist) {
+                                            combine.put(bytes1);
+                                        }
+                                        String s = new String(combine.array());
+                                        msgCache.remove(msgId);
+                                        Log.i(TAG, "收到完整消息: " + s);
+                                        buffer.compact();
+                                        return s;
+                                    }
+
+                                }
+                                Log.i(TAG, "handlePackage: 消息ID:" + msgId + "crc-c:" + caculateCheck + ",crc-t:" + transCheck);
+                            }
+                        }
+
+                    }
+                    break;
+                }
+            }
+            if(buffer.limit()>=360){ //如果在3个数据包的缓冲区长度内都没有发现一个包头,清空buffer,防止其他信号干扰
+                buffer.get(new byte[buffer.limit()],0,buffer.limit());
+            }
+            buffer.compact();
+            return null;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+
+    /**
+     * 字节数组转16进制字符串
+     *
+     * @param b
+     * @return
+     */
+    private String byteArrToHexString(byte[] b) {
+        String result = "";
+        for (int i = 0; i < b.length; i++) {
+            result += Integer.toString((b[i] & 0xff) + 0x100, 16).substring(1);
+        }
+        return result;
+    }
+
     public void parseData(int size) {
         Log.e("Serial", "received data size: " + size);
         if (size > 0) {