如何写有大小限制(100KB)的json文件,一旦大小交叉,就写新文件



我有一个blocking集合,它由一些app1填充数据。

我已经订阅了blocking集合,需要写一个包含以下案例的文件,

  1. 开始编写文件
  2. 如果文件大小超过100kb,请关闭第一个文件并启动一个新文件
  3. 如果没有来自app1的数据,比如说1分钟,那么关闭文件

目前使用以下代码,我只能将每个blocking集合写入每个文件,请建议如何继续执行我的上述要求。

class Program
{
private static BlockingCollection<Message> messages = new BlockingCollection<Message>();
private static void Producer()
{
int ctr = 1;
while (true)
{
messages.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
Thread.Sleep(1000);
ctr++;
}
}
private static void Consumer()
{
foreach (var message in messages.GetConsumingEnumerable())
{
Console.WriteLine(JsonConvert.SerializeObject(message));
using (var streamWriter = File.CreateText(Path.Combine(@"C:TEMP", $"File-{ DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json")))
{
using (var writer = new JsonTextWriter(streamWriter))
{
writer.Formatting = Formatting.Indented;
writer.WriteStartObject();
writer.WritePropertyName("Data");
writer.WriteStartArray();
writer.WriteRawValue(JsonConvert.SerializeObject(message));
writer.WriteEndArray();
writer.WriteEndObject();
}
}
}
}
static void Main(string[] args)
{
var producer = Task.Factory.StartNew(() => Producer());
var consumer = Task.Factory.StartNew(() => Consumer());
Console.Read();
}
}

这将获取任何大小的消息,并将其划分到不同的JSON文件中。您需要使用maxchar指定大小。在此之前,您必须像这样检查最后一个文件的大小,如果else创建一个新文件并分割消息,则必须传递相同的文件名和新大小。

using System;
using System.IO;
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;

using System.Collections;

using Newtonsoft.Json;
using System.Runtime.Remoting.Messaging;
using System.Text;
namespace Program
{
class Program
{
public static string last_path = "";
public static readonly string BYE = "bye";
private static BlockingCollection<Message> messages = new BlockingCollection<Message>();
private static void Producer()
{
int ctr = 1;
while (true)
{
messages.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
Thread.Sleep(1000);
ctr++;
}
}
private static void Consumer()
{
foreach (var message in messages.GetConsumingEnumerable())
{
Console.WriteLine(JsonConvert.SerializeObject(message));
string json = JsonConvert.SerializeObject(message);
int maxchar = 102400;
if (last_path != "")
{
long length = new FileInfo(last_path).Length;
if (length < maxchar)
{
maxchar = maxchar - unchecked((int)length);
dividefile(last_path, maxchar, json);
}
else
{
dividefile("", maxchar, json);
}
}
else
{
dividefile("", maxchar, json);
}
}
}
public static void dividefile(string path, int maxchar, string message)
{
//FileStream fileStream = new FileStream(yourfile, FileMode.Open, FileAccess.Read);
byte[] byteArray = Encoding.UTF8.GetBytes(message);
MemoryStream stream = new MemoryStream(byteArray);
using (StreamReader streamReader = new StreamReader(stream))
{
Int64 x1 = stream.Length;
char[] fileContents = new char[maxchar];
int charsRead = streamReader.Read(fileContents, 0, maxchar);
// Can't do much with 0 bytes
if (charsRead == 0)
throw new Exception("File is 0 bytes");
while (charsRead > 0)
{
x1 = x1 - maxchar;
if (x1 > 0)
{
string s = new string(fileContents);
if (path == "")
{
last_path = Path.Combine(@"C:TEMP", $"File-{ DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json");
path = "";
}
else
{
last_path = path;
}
AppendTransaction(last_path, s);
charsRead = streamReader.Read(fileContents, 0, maxchar);
}
else
{
int m = (int)(((x1 + maxchar) % maxchar));
string messagechunk = new string(fileContents, 0, m);
if (path == "")
{
last_path = Path.Combine(@"C:TEMP", $"File-{ DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json");
path = "";
}
else
{
last_path = path;
}
AppendTransaction(last_path, messagechunk);
charsRead = streamReader.Read(fileContents, 0, m);
}
}
}
}
private static void AppendTransaction(string path , string  transaction)
{
string filename = path;
bool firstTransaction = !File.Exists(filename);
JsonSerializer ser = new JsonSerializer();
ser.Formatting = Formatting.Indented;
ser.TypeNameHandling = TypeNameHandling.Auto;
using (var fs = new FileStream(filename, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read))
{
Encoding enc = firstTransaction ? new UTF8Encoding(true) : new UTF8Encoding(false);
using (var sw = new StreamWriter(fs, enc))
using (var jtw = new JsonTextWriter(sw))
{
if (firstTransaction)
{
sw.Write("[");
sw.Flush();
}
else
{
fs.Seek(-Encoding.UTF8.GetByteCount("]"), SeekOrigin.End);
sw.Write(",");
sw.Flush();
}
ser.Serialize(jtw, transaction);
sw.Write(']');
}
}

}
static void Main(string[] args)
{
var producer = Task.Factory.StartNew(() => Producer());
var consumer = Task.Factory.StartNew(() => Consumer());
Console.Read();
}
class Message
{
public int ProductorThreadID { get; set; }
public int CustomerThreadID { get; set; }
public string key { get; set; }
public string content { get; set; }
public string Name { get; internal set; }
public int Id { get; internal set; }
public bool endThread()
{
return string.Compare(key, Program.BYE) == 0;
}
public string ToString(bool isProductor)
{
return string.Format("{0} Thread ID {1} : {2}", isProductor ? "Productor" : "Customer",
isProductor ? ProductorThreadID.ToString() : CustomerThreadID.ToString(),
content);
}
}
}
}

相关内容

  • 没有找到相关文章

最新更新