将null char( 0)视为线路结束的eNtreamReader替代方案



我有一个客户端程序,该程序从远程服务器接收流XML消息。这些流提供了各种体育赛事的实时更新,并且它永远运行。即使没有传输的更新,它仍然每45秒发送一次保留消息。

所讨论的服务有两个提要,有不同的细节。第一个供稿用返回/线馈对(0D,0A)终止每个XML消息,这意味着使用StreamReader消费非常容易。例如:

using (TcpClient Client = new TcpClient(this.Host, this.Port))
{
   using (NetworkStream stream = Client.GetStream())
   {
      StreamReader reader = new StreamReader(stream);
      while (!reader.EndOfStream)
      {
         String message = reader.ReadLine();
         ProcessMessage(message);
      }
   }
}

但是,第二个feed不会用cr/lf终止,而仅以null字符( 0)终止,哪个streamReader.readline()不会将其视为一条线的末端,因此它会阻止永远等待的东西永远不会来。

我不能先将其写给文件,而且我不能让提供供稿的公司更改任何内容,因为它已经被其他人消耗了。

是否有替代流读器将null char视为线结束终结者?

customStreamReader类 - 在' 0'或' r n'上终止。我已经使用了基于文件的流和内存流进行了测试,我想网络流并没有真正不同。...让我知道您的行驶方式!

我还在这篇文章结尾处包含了我的测试中的重要代码,在使用MemoryStream时,我有一些极端的怪异,在读取之前,我有一些极端的怪异。

我真的无法获得太多信用,其中大部分只是从Microsoft参考源中从现有的StreamReader中提取必要的部分...几个小时左右才能成功修改和测试。

CustomStreamReader

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime;
using System.Runtime.CompilerServices;
using System.Security;
using System.Text;
using System.Threading.Tasks;
namespace StreamReaderOverride
{
    public class CustomStreamReader : StreamReader
    {
        private int charPos;
        private int charLen;
        private int byteLen;
        private Encoding encoding;
        private bool _isBlocked;
        private byte[] byteBuffer;
        private Decoder decoder;
        private int bytePos;
        private bool _checkPreamble;
        private bool _detectEncoding;
        private Stream stream;
        private volatile Task _asyncReadTask;
        private char[] charBuffer;
        private byte[] _preamble;
        private bool _closable;
        public bool EndOfStream;
        private int _maxCharsPerBuffer;
        public CustomStreamReader(Stream stream): this(stream, Encoding.UTF8, true, DefaultBufferSize, false)
        {
        }
        public CustomStreamReader(string path): this(path, Encoding.UTF8, true, DefaultBufferSize, false)
        {
        }
        public CustomStreamReader(Stream stream, Encoding encoding, bool detectEncodingFromByteOrderMarks, int bufferSize, bool leaveOpen): base(stream, Encoding.UTF8, detectEncodingFromByteOrderMarks, bufferSize, false)
        {
            if (stream == null || encoding == null)
            {
                throw new ArgumentNullException((stream == null) ? "stream" : "encoding");
            }
            if (!stream.CanRead)
            {
                throw new ArgumentException(GetResourceString("Argument_StreamNotReadable"));
            }
            if (bufferSize <= 0)
            {
                throw new ArgumentOutOfRangeException("bufferSize", GetResourceString("ArgumentOutOfRange_NeedPosNum"));
            }
            this.Init(stream, encoding, detectEncodingFromByteOrderMarks, bufferSize, leaveOpen);
        }
        internal CustomStreamReader(string path, Encoding encoding, bool detectEncodingFromByteOrderMarks, int bufferSize, bool checkHost): base(path, Encoding.UTF8, detectEncodingFromByteOrderMarks, bufferSize)
        {
            if (path == null || encoding == null)
            {
                throw new ArgumentNullException((path == null) ? "path" : "encoding");
            }
            if (path.Length == 0)
            {
                throw new ArgumentException(GetResourceString("Argument_EmptyPath"));
            }
            if (bufferSize <= 0)
            {
                throw new ArgumentOutOfRangeException("bufferSize", GetResourceString("ArgumentOutOfRange_NeedPosNum"));
            }
            Stream stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, false);
            this.Init(stream, encoding, detectEncodingFromByteOrderMarks, bufferSize, false);
        }
        internal static int DefaultBufferSize
        {
            get
            {
                return 1024;
            }
        }

        private void Init(Stream stream, Encoding encoding, bool detectEncodingFromByteOrderMarks, int bufferSize, bool leaveOpen)
        {
            this.stream = stream;
            this.encoding = encoding;
            this.decoder = encoding.GetDecoder();
            if (bufferSize < 128)
            {
                bufferSize = 128;
            }
            this.byteBuffer = new byte[bufferSize];
            this._maxCharsPerBuffer = encoding.GetMaxCharCount(bufferSize);
            this.charBuffer = new char[this._maxCharsPerBuffer];
            this.byteLen = 0;
            this.bytePos = 0;
            this._detectEncoding = detectEncodingFromByteOrderMarks;
            this._preamble = encoding.GetPreamble();
            this._checkPreamble = (this._preamble.Length > 0);
            this._isBlocked = false;
            this._closable = !leaveOpen;
        }
        private void CheckAsyncTaskInProgress()
        {
            Task asyncReadTask = this._asyncReadTask;
            if (asyncReadTask != null && !asyncReadTask.IsCompleted)
            {
                throw new InvalidOperationException(GetResourceString("InvalidOperation_AsyncIOInProgress"));
            }
        }
        public string ReadLineOrNullString()
        {            
            if (this.stream == null)
            {
                return null;
            }
            this.CheckAsyncTaskInProgress();
            if (this.charPos == this.charLen && this.ReadBuffer() == 0)
            {
                return null;
            }
            StringBuilder stringBuilder = null;
            int num;
            char c;
            while (true)
            {
                num = this.charPos;
                do
                {
                    c = this.charBuffer[num];
                    if (c == 'r' || c == 'n' || c == '')
                    {
                        goto IL_4A;
                    }
                    num++;
                }
                while (num < this.charLen);
                num = this.charLen - this.charPos;
                if (stringBuilder == null)
                {
                    stringBuilder = new StringBuilder(num + 80);
                }
                stringBuilder.Append(this.charBuffer, this.charPos, num);
                if (this.ReadBuffer() <= 0)
                {
                    goto Block_11;
                }
            }
        IL_4A:
            string result;
            if (stringBuilder != null)
            {
                stringBuilder.Append(this.charBuffer, this.charPos, num - this.charPos);
                result = stringBuilder.ToString();
            }
            else
            {
                result = new string(this.charBuffer, this.charPos, num - this.charPos);
            }
            this.charPos = num + 1;
            if ((c == 'r' && (this.charPos < this.charLen || this.ReadBuffer() > 0) && this.charBuffer[this.charPos] == 'n'))
            {
                this.charPos++;
            }
            if (this.charPos >= this.charLen)
            {
                this.EndOfStream = true;
            }
            return result;
        Block_11:
            return stringBuilder.ToString();
        }
        internal virtual int ReadBuffer()
        {
            this.charLen = 0;
            this.charPos = 0;
            if (!this._checkPreamble)
            {
                this.byteLen = 0;
            }
            while (true)
            {
                if (this._checkPreamble)
                {
                    int num = this.stream.Read(this.byteBuffer, this.bytePos, this.byteBuffer.Length - this.bytePos);
                    if (num == 0)
                    {
                        break;
                    }
                    this.byteLen += num;
                }
                else
                {
                    this.byteLen = this.stream.Read(this.byteBuffer, 0, this.byteBuffer.Length);
                    if (this.byteLen == 0)
                    {
                        goto Block_5;
                    }
                }
                this._isBlocked = (this.byteLen < this.byteBuffer.Length);
                if (!this.IsPreamble())
                {
                    if (this._detectEncoding && this.byteLen >= 2)
                    {
                        this.DetectEncoding();
                    }
                    this.charLen += this.decoder.GetChars(this.byteBuffer, 0, this.byteLen, this.charBuffer, this.charLen);
                }
                if (this.charLen != 0)
                {
                    goto Block_9;
                }
            }
            if (this.byteLen > 0)
            {
                this.charLen += this.decoder.GetChars(this.byteBuffer, 0, this.byteLen, this.charBuffer, this.charLen);
                this.bytePos = (this.byteLen = 0);
            }
            return this.charLen;
        Block_5:
            return this.charLen;
        Block_9:
            return this.charLen;
        }
        private bool IsPreamble()
        {
            if (!this._checkPreamble)
            {
                return this._checkPreamble;
            }
            int num = (this.byteLen >= this._preamble.Length) ? (this._preamble.Length - this.bytePos) : (this.byteLen - this.bytePos);
            int i = 0;
            while (i < num)
            {
                if (this.byteBuffer[this.bytePos] != this._preamble[this.bytePos])
                {
                    this.bytePos = 0;
                    this._checkPreamble = false;
                    break;
                }
                i++;
                this.bytePos++;
            }
            if (this._checkPreamble && this.bytePos == this._preamble.Length)
            {
                this.CompressBuffer(this._preamble.Length);
                this.bytePos = 0;
                this._checkPreamble = false;
                this._detectEncoding = false;
            }
            return this._checkPreamble;
        }
        private void DetectEncoding()
        {
            if (this.byteLen < 2)
            {
                return;
            }
            this._detectEncoding = false;
            bool flag = false;
            if (this.byteBuffer[0] == 254 && this.byteBuffer[1] == 255)
            {
                this.encoding = new UnicodeEncoding(true, true);
                this.CompressBuffer(2);
                flag = true;
            }
            else if (this.byteBuffer[0] == 255 && this.byteBuffer[1] == 254)
            {
                if (this.byteLen < 4 || this.byteBuffer[2] != 0 || this.byteBuffer[3] != 0)
                {
                    this.encoding = new UnicodeEncoding(false, true);
                    this.CompressBuffer(2);
                    flag = true;
                }
                else
                {
                    this.encoding = new UTF32Encoding(false, true);
                    this.CompressBuffer(4);
                    flag = true;
                }
            }
            else if (this.byteLen >= 3 && this.byteBuffer[0] == 239 && this.byteBuffer[1] == 187 && this.byteBuffer[2] == 191)
            {
                this.encoding = Encoding.UTF8;
                this.CompressBuffer(3);
                flag = true;
            }
            else if (this.byteLen >= 4 && this.byteBuffer[0] == 0 && this.byteBuffer[1] == 0 && this.byteBuffer[2] == 254 && this.byteBuffer[3] == 255)
            {
                this.encoding = new UTF32Encoding(true, true);
                this.CompressBuffer(4);
                flag = true;
            }
            else if (this.byteLen == 2)
            {
                this._detectEncoding = true;
            }
            if (flag)
            {
                this.decoder = this.encoding.GetDecoder();
                this._maxCharsPerBuffer = this.encoding.GetMaxCharCount(this.byteBuffer.Length);
                this.charBuffer = new char[this._maxCharsPerBuffer];
            }
        }
        private void CompressBuffer(int n)
        {
            InternalBlockCopy(this.byteBuffer, n, this.byteBuffer, 0, this.byteLen - n);
            this.byteLen -= n;
        }

        [TargetedPatchingOptOut("Performance critical to inline this type of method across NGen image boundaries"), SecuritySafeCritical]
        internal static string GetResourceString(string key)
        {
            return GetResourceFromDefault(key);
        }
        [SecurityCritical]
        [MethodImpl(MethodImplOptions.InternalCall)]
        internal static extern string GetResourceFromDefault(string key);
        [SecuritySafeCritical]
        [MethodImpl(MethodImplOptions.InternalCall)]
        internal static extern void InternalBlockCopy(Array src, int srcOffsetBytes, Array dst, int dstOffsetBytes, int byteCount);
    }
}

testharness

        string testString = "A String with a newline herern and a null here then another newlinern then another null";
        StreamWriter writer = new StreamWriter(@"c:temptestfile123.txt");
        writer.Write(testString);
        writer.Flush();
        writer.Close();            
        writer.Dispose();
        List<string> strings = new List<string>();
        List<string> strings2 = new List<string>();
        MemoryStream memStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(testString));
        using (CustomStreamReader reader = new CustomStreamReader(memStream))
        {
            bool setPos = true;
            while (reader.EndOfStream == false)
            {
                if (setPos) 
                { 
                    memStream.Position = 0;
                    setPos = false;
                }                    
                strings.Add(reader.ReadLineOrNullString());
            }
        }
        using (CustomStreamReader reader = new CustomStreamReader(@"c:temptestfile123.txt"))
        {
            bool setPos = true;
            while (reader.EndOfStream == false)
            {
                /*if (setPos)
                {
                    memStream.Position = 0;
                    setPos = false;
                }*/
                strings2.Add(reader.ReadLineOrNullString());
            }
        }
        System.Diagnostics.Debugger.Break();
        using (StreamReader read = new StreamReader(txtCSfileName.Text.Trim()))
        {
            while (read.Peek() > 0)
            {
                listSMtext.Items.Add(read.ReadLine());
            }
        };

使用上述代码查找自定义结束字符。

使用波纹码来查找自定义结束char:

using (TcpClient Client = new TcpClient(this.Host, this.Port))
{
   using (NetworkStream stream = Client.GetStream())
   {
      StreamReader reader = new StreamReader(stream);
      //just use the bellow code like(from sr.EndOfStream to reader.Peek()>=0)
      while (reader.Peek() >= 0)
      {
         String message = reader.ReadLine();
         ProcessMessage(message);
      }
   }
}

相关内容

最新更新