我在玩protobuf-net和WCF。这是我创建的代码:
/// <summary>
/// WCF REST service that returns protobuf-net encoded payloads.
/// </summary>
public class MobileServiceV2
{
    /// <summary>
    /// Serializes a sample <see cref="MyResponse"/> with protobuf-net and returns
    /// it as the response body with content type "application/x-protobuf".
    /// </summary>
    /// <returns>
    /// A <see cref="MemoryStream"/> positioned at its start. It is intentionally
    /// not disposed here because it is handed back to the caller for reading.
    /// </returns>
    [WebGet(UriTemplate = "/some-data")]
    [Description("returns test data")]
    public Stream GetSomeData()
    {
        WebOperationContext.Current.OutgoingResponse.ContentType = "application/x-protobuf";
        var ms = new MemoryStream();
        ProtoBuf.Serializer.Serialize(ms, new MyResponse { SomeData = "Test data here" });
        // Rewind: after Serialize the position is at the end of the written
        // data, so a reader starting from the current position would see
        // nothing and the response body would be empty.
        ms.Position = 0;
        return ms;
    }
}
/// <summary>
/// Response payload. The data-contract attributes assign the member order (1)
/// used when the object is serialized.
/// </summary>
[DataContract]
public class MyResponse
{
    /// <summary>Text payload carried by the response.</summary>
    [DataMember(Order = 1)]
    public string SomeData
    {
        get;
        set;
    }
}
当我在Fiddler中查看时,我可以看到合适的传出内容类型,所有内容看起来都很好,但我得到的回复是空的。IE提示下载文件,但此文件为空。序列化程序不工作吗?或者我就是做得不对?
编辑:
我在方法中添加了以下代码,结果表明序列化本身是正常的。问题出在我从WCF返回流的方式上。
// Dump the serialized object to disk to check the serializer in isolation.
// The path is a verbatim string: in a regular literal "\t" is the TAB escape,
// so "C:\test.bin" would not name the intended file.
using (var file = File.Create(@"C:\test.bin"))
{
    Serializer.Serialize(file, new MyResponse { SomeData = "Test data here" });
}
只需写入MemoryStream,然后将其倒回到起始位置(把Position设为0)即可。在这种情况下,不要对该流调用Dispose():
// Serialize into an in-memory buffer, then move back to the beginning so the
// consumer of the returned stream reads the payload from its first byte.
MemoryStream buffer = new MemoryStream();
Serializer.Serialize(buffer, obj);
buffer.Seek(0, SeekOrigin.Begin);
return buffer;
然而,这确实意味着整个响应会先在内存中缓冲。我可以尝试想出一些“黑魔法”式的技巧来避免这种情况,但那会非常复杂。
如果我正确理解这个问题,那么您正在尝试使用流式WCF绑定。在这种情况下,您可以尝试将数据拆分为块,这些块在客户端上以相同的方式单独序列化和反序列化。唯一需要注意的是接收端WCF提供的Stream实现——您需要自己包装它并管理读取。以下是我用来促进这一点的类:
public static class StreamingUtility
{
/// <summary>
/// Reads length-prefixed (Base128) protobuf messages of type
/// <typeparamref name="T"/> from <paramref name="value"/> until the wrapping
/// proxy reports that nothing more can be read.
/// </summary>
/// <param name="value">Stream to read from; must not be null.</param>
/// <param name="perItemCallback">
/// Optional callback invoked for each item right after it is deserialized and
/// before it is added to the result.
/// </param>
/// <returns>
/// The items read so far. Reading is best-effort: if deserialization or the
/// callback throws, the exception is swallowed and the items collected up to
/// that point are returned.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="value"/> is null.</exception>
public static IEnumerable<T> FromStream<T>(this Stream value, Action<T> perItemCallback = null)
{
    // Fail with a clear argument error instead of a NullReferenceException
    // raised from inside the proxy constructor.
    if (value == null)
    {
        throw new ArgumentNullException("value");
    }

    List<T> result = new List<T>();
    StreamProxy sp = new StreamProxy(value);
    try
    {
        while (sp.CanRead)
        {
            T item = ProtoBuf.Serializer.DeserializeWithLengthPrefix<T>((Stream)sp, ProtoBuf.PrefixStyle.Base128);
            if (perItemCallback != null)
            {
                perItemCallback(item);
            }
            result.Add(item);
        }
    }
    catch
    {
        // Deliberately best-effort: a failure part-way through ends the read
        // and the items gathered so far are still returned to the caller.
    }
    return result;
}
/// <summary>
/// Wraps a single item in a one-element array and exposes it as a
/// <see cref="StreamingContent{T}"/>.
/// </summary>
/// <param name="value">The item to stream.</param>
/// <returns>A new streaming content instance over the single item.</returns>
public static StreamingContent<T> SingleToStream<T>(this T value)
{
    T[] singleItem = { value };
    return new StreamingContent<T>(singleItem);
}
/// <summary>
/// Exposes a sequence of items as a <see cref="StreamingContent{T}"/>.
/// </summary>
/// <param name="value">The items to stream.</param>
/// <returns>A new streaming content instance over the sequence.</returns>
public static StreamingContent<T> ToStream<T>(this IEnumerable<T> value)
{
    StreamingContent<T> content = new StreamingContent<T>(value);
    return content;
}
/// <summary>
/// Read-only, non-seekable stream whose content is a sequence of items written
/// as length-prefixed (Base128) protobuf messages. Items are serialized into an
/// internal <see cref="MemoryStream"/> by a background thread while the
/// consumer calls <see cref="Read"/>.
/// </summary>
/// <typeparam name="T">Type of the items being streamed.</typeparam>
public class StreamingContent<T> : Stream
{
    private readonly ManualResetEventSlim _dataIsReady = new ManualResetEventSlim(false);
    // Internal buffer; also used as the lock object guarding buffer access.
    private readonly MemoryStream _stream = new MemoryStream();
    private List<T> _source = null;
    private bool _canRead = true;
    // Polled without a lock by Length, hence volatile.
    private volatile bool _noMoreAdditions = false;
    private long _readingOffset = 0;

    /// <summary>
    /// Creates an instance whose content is a full copy of the bytes of
    /// <paramref name="origin"/>, read until it reports end of stream.
    /// </summary>
    public static StreamingContent<T> Clone(Stream origin)
    {
        return new StreamingContent<T>(origin);
    }

    private StreamingContent(Stream origin)
    {
        byte[] chunk = new byte[65536];
        int count;
        while ((count = origin.Read(chunk, 0, chunk.Length)) > 0)
        {
            _stream.Write(chunk, 0, count);
        }
        _noMoreAdditions = true;
    }

    /// <summary>
    /// Materializes <paramref name="source"/> and, when it is not empty, starts
    /// a background thread that serializes the items into the internal buffer.
    /// </summary>
    /// <param name="source">Items to stream; enumerated exactly once.</param>
    public StreamingContent(IEnumerable<T> source)
    {
        // One-time initialization owned by the enclosing type (not visible here).
        if (!s_initialized)
        {
            StreamingUtility.Initialize();
            StreamingUtility.s_initialized = true;
        }

        _source = source.ToList();

        // Use the materialized list's Count: calling Count() on the caller's
        // sequence would enumerate it a second time.
        if (_source.Count == 0)
        {
            _canRead = false;
            // Nothing will ever be produced; without this, Length would wait forever.
            _noMoreAdditions = true;
            return;
        }

        new Thread(ProduceItems) { IsBackground = true }.Start();
    }

    // Producer loop: appends each item to the buffer and signals the reader;
    // stops early if the stream was closed in the meantime.
    private void ProduceItems()
    {
        foreach (T item in _source)
        {
            lock (_stream)
            {
                if (_noMoreAdditions) break;
                // The reader moves the buffer position, so always append at the end.
                _stream.Seek(0, SeekOrigin.End);
                ProtoBuf.Serializer.SerializeWithLengthPrefix<T>(_stream, item, ProtoBuf.PrefixStyle.Base128);
                _dataIsReady.Set();
            }
        }
        lock (_stream)
        {
            _noMoreAdditions = true;
            // Wake a reader that may be waiting for data that will never come.
            _dataIsReady.Set();
        }
    }

    public override bool CanRead
    {
        get { return _canRead; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return false; }
    }

    /// <summary>
    /// Total number of buffered bytes. Blocks (polling every 20 ms) until no
    /// more items will be added.
    /// </summary>
    public override long Length
    {
        get
        {
            while (!_noMoreAdditions) Thread.Sleep(20);
            return _stream.Length;
        }
    }

    public override long Position
    {
        get
        {
            throw new NotSupportedException("This stream does not support getting the Position property.");
        }
        set
        {
            throw new NotSupportedException("This stream does not support setting the Position property.");
        }
    }

    /// <summary>Stops further additions and closes the internal buffer.</summary>
    public override void Close()
    {
        lock (_stream)
        {
            _noMoreAdditions = true;
            _stream.Close();
        }
    }

    public override void Flush()
    {
    }

    /// <summary>
    /// Copies up to <paramref name="count"/> buffered bytes into
    /// <paramref name="buffer"/> starting at <paramref name="offset"/>. Waits
    /// for the producer when no data has been signalled and more may be added.
    /// </summary>
    /// <returns>The number of bytes copied, or 0 when nothing is available.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (!CanRead) return 0;

        bool wait;
        lock (_stream)
        {
            wait = !_dataIsReady.IsSet && !_noMoreAdditions;
        }
        if (wait)
        {
            _dataIsReady.Wait();
        }

        lock (_stream)
        {
            // Once production has finished the event stays set so reads never block again.
            if (!_noMoreAdditions)
                _dataIsReady.Reset();

            if (_stream.Length > _readingOffset)
            {
                _stream.Seek(_readingOffset, SeekOrigin.Begin);
                // Honor the caller's offset; writing at index 0 would overwrite
                // data the caller already holds at the front of its buffer.
                int res = _stream.Read(buffer, offset, count);
                _readingOffset += res;
                if (_noMoreAdditions && _readingOffset >= _stream.Length)
                    _canRead = false;
                return res;
            }
        }
        return 0;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException("This stream does not support seeking.");
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException("This stream does not support setting the Length.");
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException("This stream does not support writing.");
    }

    protected override void Dispose(bool disposing)
    {
        try
        {
            lock (_stream)
            {
                _noMoreAdditions = true;
                _stream.Close();
            }
        }
        catch
        {
            // Dispose must not throw; failures while closing the buffer are ignored.
        }
    }
}
/// <summary>
/// Read-only wrapper that copies everything it pulls from an inner stream into
/// an internal <see cref="MemoryStream"/> and serves reads from that copy. It
/// reads ahead of the caller so that <see cref="CanRead"/> turns false once the
/// inner stream has reported end of data and all buffered bytes were consumed.
/// </summary>
private class StreamProxy : Stream
{
    private const int InitialReadSize = 1000;
    private const int MinimumFetchSize = 1024;

    private bool _canRead = true;
    private bool _endOfMessage = false;
    private Stream _internalStream;
    private int _readPosition = 0;
    private MemoryStream _storage = new MemoryStream();
    private int _writePosition = 0;

    /// <summary>
    /// Wraps <paramref name="internalStream"/> and performs one initial read;
    /// if that read yields no bytes the proxy is marked as not readable.
    /// </summary>
    public StreamProxy(Stream internalStream)
    {
        _internalStream = internalStream;
        byte[] initialRequest = new byte[InitialReadSize];
        int length = _internalStream.Read(initialRequest, 0, InitialReadSize);
        if (length != 0)
            _storage.Write(initialRequest, 0, length);
        else
            _canRead = false;
        _writePosition = length;
    }

    public override bool CanRead
    {
        get { return _canRead; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return false; }
    }

    public override long Length
    {
        get { throw new NotSupportedException(); }
    }

    /// <summary>Number of bytes handed out to the caller so far.</summary>
    public override long Position
    {
        get
        {
            return _readPosition;
        }
        set
        {
            throw new NotSupportedException();
        }
    }

    public override void Flush()
    {
    }

    /// <summary>
    /// Reads one byte, or returns -1 when no byte could be read, as the
    /// <see cref="Stream.ReadByte"/> contract requires. Returning the untouched
    /// buffer value (0) at end of stream would look like a real data byte.
    /// </summary>
    public override int ReadByte()
    {
        byte[] single = new byte[1];
        int got = Read(single, 0, 1);
        return got == 0 ? -1 : single[0];
    }

    /// <summary>
    /// Serves up to <paramref name="count"/> bytes from the internal copy,
    /// first pulling more data from the inner stream when the copy does not
    /// hold enough.
    /// </summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (_readPosition + count > _writePosition)
        {
            // Ask for at least MinimumFetchSize bytes: the extra bytes reveal
            // whether more data follows, so a further read can be allowed.
            int readSize = Math.Max(_readPosition + count - _writePosition, MinimumFetchSize);
            int fetched = FetchFromInternal(readSize);
            if (fetched > 0 && fetched < readSize)
            {
                // Short read: probe once more so the end of data is noticed now.
                FetchFromInternal(MinimumFetchSize);
            }
        }

        _storage.Seek(_readPosition, SeekOrigin.Begin);
        int res = _storage.Read(buffer, offset, count);
        _readPosition += res;

        // End of data reached and every buffered byte consumed: nothing left to read.
        if (_readPosition >= _writePosition && _endOfMessage)
            _canRead = false;
        return res;
    }

    // Reads up to 'size' bytes from the inner stream and appends them to the
    // internal copy; a zero-byte read marks the end of the message.
    private int FetchFromInternal(int size)
    {
        byte[] chunk = new byte[size];
        int got = _internalStream.Read(chunk, 0, size);
        if (got > 0)
        {
            _storage.Seek(_writePosition, SeekOrigin.Begin);
            _storage.Write(chunk, 0, got);
            _writePosition += got;
        }
        else if (got == 0)
        {
            _endOfMessage = true;
        }
        return got;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }
}
}
要使用它,只需调用WCF接口方法,如下所示:
// Sending side: wrap the collection in a stream and pass it to the WCF operation.
IEnumerable<SomeType> collection = ...
channel.Method(collection.ToStream());
并在接收端这样读取:
// Receiving side: turn the incoming stream back into the items it carries.
public void Method(Stream stream)
{
    IEnumerable<SomeType> coll = StreamingUtility.FromStream<SomeType>(stream);
}
此实现仍在测试中,因此我将感谢您的任何意见。
请尝试这种方式
var ms = new MemoryStream();
// Verbatim string: in a regular literal "\t" is the TAB escape, so
// "C:\test.bin" would not name the intended file.
using (var file = File.Create(@"C:\test.bin"))
{
    Serializer.Serialize(file, new MyResponse { SomeData = "Test data here" });
    // After serializing, the file position is at the end; go back to the
    // start or CopyTo would copy zero bytes.
    file.Position = 0;
    file.CopyTo(ms);
}
// Rewind the copy as well so the consumer reads from the first byte.
ms.Position = 0;
return ms;//stream