如何处理传入的数据以在通道中发挥作用?



我编写了一个函数,它将根据数组的数字1将传递给它的数组拆分为其他数组。例如,3,2,1,1,1,8,2,2,2,9,9将被转换为[2,1,1],[8],[9,9]。

static async Task Main(string[] args) {
List<int> list = new List<int>() { 1, 2, 2, 5, 6, 3, 9, 9, 9 };
var source1 = Try(list);
foreach (var x in source1)
{
x.ForEach(i => Console.Write("{0} ", i));
Console.WriteLine();
}
}
static List<List<int>> Try(List<int> data, int d = 0)
{
var myChannel = Channel.CreateUnbounded<int>();
List<List<int>> outList = new List<List<int>>();
for (int p = 0; p < data.Count;)
{
d = data[p++];    // Get the size of the array
d = (d <= 0) ? 0 : d; // If the size is not correct, then we correct
List<int> tempList = new List<int>(new int[d]); //creating a temp list
for (var i = 0; i < d && p < data.Count; i++, p++)
tempList[i] = data[p];   // Filling the array with data
outList.Add(tempList);           // Result, Combining split arrays into one single sheet
}
return outList;
}

我想重写上面的代码以使用Сhannel。也就是说,函数的输入接收到一个带有数字序列的Сhannel。该函数处理它们并依次返回分隔的数组。但是如何使用Channel呢?

我想尝试读取传输的Сhannel的大小,但这是不可能的,以及迭代传输的Сhannel的元素。我想读取它们,然后将它们赋值给List,处理它们并逐个返回分隔的数组。

List<int> list = new List<int>() { 1, 2, 2, 5, 6, 3, 9, 9, 9 };
var myChannel = Channel.CreateUnbounded<int>() ;
_ = Task.Factory.StartNew(async () =>
{
foreach (var j in list)
{
await myChannel.Writer.WriteAsync(j);
await Task.Delay(500); 
}
});
await foreach (var item in FetchItems2(myChannel))
{
Console.WriteLine($"{item}");
}
static async IAsyncEnumerable<int[]> FetchItems2(Channel<int> data, int d = 0)
{
List<int> innerList = new List<int>();  
/*broken code taken from a past solution without using channels
var item = await data.Reader.ReadAsync();
for (int p = 0; p < data.Count;)
{
d = data[p++];    // Get the size of the array
d = (d <= 0) ? 0 : d; // If the size is not correct, then we correct
int[] arr = new int[d]; // Create an array
for (var i = 0; i < d && p < data.Count; i++, p++)
arr[i] = data[p];   // Filling the array with data
*/
yield return arr;           // returning split arrays
}
}

寻找解决方案:1函数最终确定,并在传递所有元素

时添加await
static async Task Main(string[] args)
{
List<int> list = new List<int>() { 1, 2, 2, 5, 6, 3, 9, 9, 9 };
var myChannel = Channel.CreateUnbounded<int>() ;
_ = Task.Factory.StartNew(async () =>
{
foreach (var j in list)
{
await myChannel.Writer.WriteAsync(j);
await Task.Delay(100); // just to see
}
}); // data in channel

Console.WriteLine($"Start");
await foreach (var item in FetchItems2(myChannel, list.Count))
{
//Console.WriteLine($"{item}");
foreach(var x in item)
{
Console.Write($"{x} ");
}
Console.WriteLine();
}
Console.WriteLine($"End");
}
static async IAsyncEnumerable<int[]> FetchItems2(Channel<int> data, int size, int d = 0)
{
List<int> innerList = new List<int>();
for(int i = 0; i < size; i++)
{
innerList.Add(await data.Reader.ReadAsync());
}
for (int p = 0; p < innerList.Count;)
{
d = innerList[p++];    // Get the size of the array
d = (d <= 0) ? 0 : d; // If the size is not correct, then we correct
int[] arr = new int[d]; // Create an array
// create new list

for (var i = 0; i < d && p < innerList.Count; i++, p++)
arr[i] = innerList[p];   // Filling the array with data
await Task.Delay(750);
yield return arr;           // Result
}
}

代码几乎相同。以另一种方式,数据从通道写入到List。数组被正确读取,然后通过yield

逐行输出数据

最新更新