我已经使用System.IO.Pipes(控制台应用程序和GUI应用程序)在同一台计算机上的两个独立进程之间建立了通信。控制台应用程序NamedPipeServerStream创建一个管道,而GUI应用程序NamedPipeClientStream连接到现有管道。我经常更新GUI。我的问题是,命名管道技术是处理这种情况的最有效方法吗。第二个问题是Reactive Extensions RX是否更适合这种情况,以及如何适应?提前谢谢。
服务器
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
namespace PipeApplicationSender
{
class ProgramPipeTest
{
static void Main(string[] args)
{
ProgramPipeTest Server = new ProgramPipeTest();
Thread ServerThread = new Thread( Server.ThreadStartServer );
ServerThread.Start();
}
public void ThreadStartServer()
{
// Create a name pipe
using (NamedPipeServerStream pipeStream = new NamedPipeServerStream("mytestpipe"))
{
// Wait for a connection
pipeStream.WaitForConnection();
Console.WriteLine("[Server] Pipe connection established");
using (StreamReader sr = new StreamReader(pipeStream))
{
string temp;
// We read a line from the pipe and print it together with the current time
while ((temp = sr.ReadLine()) != null)
{
Console.WriteLine("{0}: {1}", DateTime.Now, temp);
}
}
}
}
客户端
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
namespace PipeApplicationReceiver
{
class ProgramPipeReceive
{
static void Main(string[] args)
{
ProgramPipeReceive Server = new ProgramPipeReceive ();
Thread ServerThread = new Thread( Server.ThreadStartServer );
ServerThread.Start();
}
public void ThreadStartClient(object obj)
{
// Ensure that we only start the client after the server has created the pipe
ManualResetEvent SyncClientServer = (ManualResetEvent)obj;
// Only continue after the server was created -- otherwise we just fail badly
// SyncClientServer.WaitOne();
using (NamedPipeClientStream pipeStream = new NamedPipeClientStream("mytestpipe"))
{
// The connect function will indefinately wait for the pipe to become available
// If that is not acceptable specify a maximum waiting time (in ms)
pipeStream.Connect();
Console.WriteLine("[Client] Pipe connection established");
using (StreamWriter sw = new StreamWriter(pipeStream))
{
sw.AutoFlush = true;
string temp;
while ((temp = Console.ReadLine()) != null)
{
if (temp == "quit") break;
sw.WriteLine(temp);
}
}
}
}
}
}
https://gist.github.com/hanishi/7139122
我刚刚用Rx做了一个完整的IpcServer和IpcClient。我希望你喜欢。
我自己需要写一个,所以这是我所拥有的。这个不会阻塞调用线程。
用法:
PipeStreamObservable.Create(out stream, "testpipe")
.Subscribe(str =>
{
Console.WriteLine(str);
},
Console.WriteLine,
Console.WriteLine);
OR
var formatter = new BinaryFormatter();
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
NamedPipeServerStream stream;
PipeStreamObservable.Create<Hashtable>(out stream, "testpipe", formatter)
.Subscribe(table =>
{
Console.WriteLine(table["abc"]);
},
Console.WriteLine,
Console.WriteLine);
Code:
public static class PipeStreamObservable
{
private static readonly PipeSecurity PipeSecurity;
private static readonly ILog Logger = LogManager.GetLogger("weldR");
static PipeStreamObservable()
{
PipeSecurity = new PipeSecurity();
PipeSecurity.AddAccessRule(
new PipeAccessRule(WindowsIdentity.GetCurrent().User, PipeAccessRights.FullControl, AccessControlType.Allow)
);
PipeSecurity.AddAccessRule(
new PipeAccessRule(
new SecurityIdentifier(WellKnownSidType.AuthenticatedUserSid, null), PipeAccessRights.ReadWrite, AccessControlType.Allow
)
);
}
public static void Write<T>(this NamedPipeClientStream stream, T type, IFormatter formatter) where T : ISerializable
{
formatter.Serialize(stream, type);
}
public static void Write<T>(this NamedPipeServerStream stream, T type, IFormatter formatter) where T : ISerializable
{
formatter.Serialize(stream, type);
}
public static int Write(this NamedPipeClientStream stream, string str)
{
var buffer = Encoding.Unicode.GetBytes(str);
var len = buffer.Length;
if (len > UInt16.MaxValue)
{
len = (int)UInt16.MaxValue;
}
stream.WriteByte((byte)(len / 256));
stream.WriteByte((byte)(len & 255));
stream.Write(buffer, 0, len);
stream.Flush();
return buffer.Length + 2;
}
public static int Write(this NamedPipeServerStream stream, string str)
{
var buffer = Encoding.Unicode.GetBytes(str);
var len = buffer.Length;
if (len > UInt16.MaxValue)
{
len = (int)UInt16.MaxValue;
}
stream.WriteByte((byte)(len / 256));
stream.WriteByte((byte)(len & 255));
stream.Write(buffer, 0, len);
stream.Flush();
return buffer.Length + 2;
}
public static IObservable<T> Create<T>(out NamedPipeServerStream stream, string pipeName, IFormatter formatter)
{
stream = new NamedPipeServerStream(pipeName, PipeDirection.InOut,
1, PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
var serverStream = stream;
return Observable.Create<T>(o =>
{
var currentStateSubscription = new SerialDisposable();
return NewThreadScheduler.Default.Schedule(
new FormatterIterator<T>(serverStream, formatter), (state, self) =>
currentStateSubscription.Disposable = state.ReadNext()
.Subscribe(str =>
{
self(state);
o.OnNext(str);
},
o.OnError,
() =>
{
currentStateSubscription.Dispose();
o.OnCompleted();
})
);
});
}
public static IObservable<string> Create(out NamedPipeServerStream stream, string pipeName)
{
stream = new NamedPipeServerStream(pipeName, PipeDirection.InOut,
1, PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
var serverStream = stream;
return Observable.Create<string>(o =>
{
var currentStateSubscription = new SerialDisposable();
return NewThreadScheduler.Default.Schedule(
new StringIterator(serverStream), (state, self) =>
currentStateSubscription.Disposable = state.ReadNext()
.Subscribe(str =>
{
self(state);
o.OnNext(str);
},
o.OnError,
() =>
{
currentStateSubscription.Dispose();
o.OnCompleted();
})
);
});
}
public static IObservable<string> Create(out NamedPipeClientStream stream, string server, string pipeName)
{
stream = new NamedPipeClientStream(server, pipeName, PipeDirection.InOut,
PipeOptions.Asynchronous);
var serverStream = stream;
return Observable.Create<string>(o =>
{
var currentStateSubscription = new SerialDisposable();
return NewThreadScheduler.Default.Schedule(
new StringIterator(serverStream), (state, self) =>
currentStateSubscription.Disposable = state.ReadNext()
.Subscribe(str =>
{
self(state);
o.OnNext(str);
},
o.OnError,
() =>
{
currentStateSubscription.Dispose();
o.OnCompleted();
})
);
});
}
public static IObservable<T> Create<T>(out NamedPipeClientStream stream,
string server,
string pipeName,
IFormatter formatter)
{
stream = new NamedPipeClientStream(server, pipeName, PipeDirection.InOut,
PipeOptions.Asynchronous);
var serverStream = stream;
return Observable.Create<T>(o =>
{
var currentStateSubscription = new SerialDisposable();
return NewThreadScheduler.Default.Schedule(
new FormatterIterator<T>(serverStream, formatter), (state, self) =>
currentStateSubscription.Disposable = state.ReadNext()
.Subscribe(str =>
{
self(state);
o.OnNext(str);
},
o.OnError,
() =>
{
currentStateSubscription.Dispose();
o.OnCompleted();
})
);
});
}
private delegate bool StreamHandler<T>(Stream input, out T output);
public interface IIterator<out T>
{
IObservable<T> ReadNext();
}
private abstract class ReaderState<T> : IIterator<T>
{
public abstract IObservable<T> ReadNext();
}
private class ReadReadyState<T> : ReaderState<T>
{
private readonly Stream _stream;
private readonly StreamHandler<T> _handler;
internal ReadReadyState(Stream stream, StreamHandler<T> handler)
{
_stream = stream;
_handler = handler;
}
public override IObservable<T> ReadNext()
{
return Observable.Create<T>(o =>
{
try
{
T value;
if (_handler(_stream, out value))
o.OnNext(value);
}
catch (SerializationException e)
{
Logger.Debug(e);
o.OnCompleted();
}
catch (Exception e)
{
Logger.Debug(e);
o.OnError(e);
return _stream;
}
return Disposable.Empty;
});
}
}
private class ServerStreamReader<T> : IIterator<T>
{
private ReaderState<T> _currentState;
private readonly NamedPipeServerStream _stream;
private readonly StreamHandler<T> _handler;
internal ServerStreamReader(NamedPipeServerStream stream, StreamHandler<T> handler)
{
_stream = stream;
_handler = handler;
_currentState = new ConnectionWaitState<T>(this);
}
private class ConnectionWaitState<T1> : ReaderState<T1>
{
private readonly ServerStreamReader<T1> _parent;
internal ConnectionWaitState(ServerStreamReader<T1> parent)
{
_parent = parent;
}
public override IObservable<T1> ReadNext()
{
try
{
_parent._stream.WaitForConnection();
}
catch (IOException)
{
}
_parent._currentState = new ReadReadyState<T1>(_parent._stream, _parent._handler);
return _parent._currentState.ReadNext();
}
}
public IObservable<T> ReadNext()
{
return _currentState.ReadNext();
}
}
private class ClientStreamReader<T> : IIterator<T>
{
private ReaderState<T> _currentState;
private readonly NamedPipeClientStream _stream;
private readonly StreamHandler<T> _handler;
internal ClientStreamReader(NamedPipeClientStream stream, StreamHandler<T> handler)
{
_stream = stream;
_handler = handler;
_currentState = new ConnectionWaitState<T>(this);
}
private class ConnectionWaitState<T1> : ReaderState<T1>
{
private readonly ClientStreamReader<T1> _parent;
internal ConnectionWaitState(ClientStreamReader<T1> parent)
{
_parent = parent;
}
public override IObservable<T1> ReadNext()
{
try
{
_parent._stream.Connect();
}
catch (IOException)
{
}
_parent._currentState = new ReadReadyState<T1>(_parent._stream, _parent._handler);
return _parent._currentState.ReadNext();
}
}
public IObservable<T> ReadNext()
{
return _currentState.ReadNext();
}
}
private sealed class FormatterIterator<T> : IIterator<T>
{
private readonly IIterator<T> _iterator;
private readonly IFormatter _formatter;
public FormatterIterator(NamedPipeServerStream source, IFormatter formatter)
{
_iterator = new ServerStreamReader<T>(source, DeserializeWithFormatter);
_formatter = formatter;
}
public FormatterIterator(NamedPipeClientStream source, IFormatter formatter)
{
_iterator = new ClientStreamReader<T>(source, DeserializeWithFormatter);
_formatter = formatter;
}
public IObservable<T> ReadNext()
{
return _iterator.ReadNext();
}
private bool DeserializeWithFormatter(Stream stream, out T value)
{
try
{
value = (T) _formatter.Deserialize(stream);
return true;
}
catch (SerializationException)
{
value = default(T);
return false;
}
}
}
private sealed class StringIterator : IIterator<string>
{
private readonly IIterator<string> _iterator;
private static bool DeserializeToString(Stream stream, out string value)
{
var len = stream.ReadByte();
if (len < 0)
{
value = null;
return false;
}
len *= 256;
len += stream.ReadByte();
var buffer = new byte[len];
stream.Read(buffer, 0, len);
value = Encoding.Unicode.GetString(buffer);
return true;
}
public StringIterator(NamedPipeServerStream source)
{
_iterator = new ServerStreamReader<string>(source, DeserializeToString);
}
public StringIterator(NamedPipeClientStream source)
{
_iterator = new ClientStreamReader<string>(source, DeserializeToString);
}
public IObservable<string> ReadNext()
{
return _iterator.ReadNext();
}
}
}