信息
我正在尝试找到一种方法以设定的时间间隔从传入的套接字流中读取数据块,但忽略其余数据并且不关闭读取之间的连接。 我想知道是否有人有一些建议?
我问的原因是我得到了一个网络连接的模数转换器(ADC),我想编写一个简单的示波器应用程序。
基本上,一旦我连接到ADC并发送一些初始化命令,就需要几分钟才能稳定下来,此时它开始以字节流的形式抛出测量值。
我想每隔几秒钟读取 1MB 的数据并丢弃其余数据,如果我不丢弃其余数据,ADC 将缓冲 512kB 的读数,然后暂停,因此任何后续读取都将是旧数据。 如果我关闭读取之间的连接,ADC 则需要一段时间才能再次发送数据。
问题
我写了一个简单的 Python 脚本作为测试,在这个脚本中,我使用了一个连续运行的线程,如果设置了一个标志,它会将字节读取到未使用的缓冲区,这似乎工作正常。
当我在Android上尝试此操作时,我遇到了问题,因为似乎只有部分数据被丢弃,如果更新间隔太长,ADC仍会暂停。
我在哪里犯了错误?我的第一个猜测是同步,因为我不确定它是否按预期工作(请参阅 ThreadBucket 类)。 我不得不承认花了很多时间玩这个,尝试不同的同步排列、缓冲区大小、BufferedInputStream
和 NIO,但没有运气。
对此的任何输入将不胜感激,我不确定使用这样的线程是否是在 Java 中正确的方法。
法典
Reader
类设置线程,连接到 ADC,根据请求读取数据,并在两者之间激活位桶线程(为了清楚起见,我省略了初始化和关闭)。
class Reader {
private static final int READ_SIZE = 1024 * 1024;
private String mServer;
private int mPort;
private Socket mSocket;
private InputStream mIn;
private ThreadBucket mThreadBucket;
private byte[] mData = new byte[1];
private final byte[] mBuffer = new byte[READ_SIZE];
Reader(String server, int port) {
mServer = server;
mPort = port;
}
void setup() throws IOException {
mSocket = new Socket(mServer, mPort);
mIn = mSocket.getInputStream();
mThreadBucket = new ThreadBucket(mIn);
mThreadBucket.start();
// Omitted: Send a few init commands a look at the response
// Start discarding data
mThreadBucket.bucket(true);
}
private int readRaw(int samples) throws IOException {
int current = 0;
// Probably fixed size but may change
if (mData.length != samples)
mData = new byte[samples];
// Stop discarding data
mThreadBucket.bucket(false);
// Read in number of samples to mData
while (current < samples) {
int len = mIn.read(mBuffer);
if (current > samples)
current = samples;
if (current + len > samples)
len = samples - current;
System.arraycopy(mBuffer, 0, mData, current, len);
current += mBuffer.length;
}
// Discard data again until the next read
mThreadBucket.bucket(true);
return current;
}
}
ThreadBucket
类连续运行,如果 mBucket
为 true,则将数据传送到位桶。
同步旨在阻止任一线程读取数据,而另一个线程正在读取数据。
public class ThreadBucket extends Thread {
private static final int BUFFER_SIZE = 1024;
private final InputStream mIn;
private Boolean mBucket = false;
private boolean mCancel = false;
public ThreadBucket(final InputStream in) throws IOException {
mIn = in;
}
@Override
public void run() {
while (!mCancel && !Thread.currentThread().isInterrupted()) {
synchronized (this) {
if (mBucket)
try {
mIn.skip(BUFFER_SIZE);
} catch (final IOException e) {
break;
}
}
}
}
public synchronized void bucket(final boolean on) {
mBucket = on;
}
public void cancel() {
mCancel = true;
}
}
谢谢。
您需要以尽可能快的速度连续阅读,然后分别管理对数据的处理。不要将两者混为一谈。