🐋作者简介:博主是一位.Net开发者,同时也是RPA和低代码平台的践行者。
🐬个人主页:会敲键盘的肘子
🐰系列专栏:.Net实用方法总结
🦀专栏简介:博主针对.Net开发和C站问答过程中遇到的问题进行总结,形成本专栏,希望可以帮助到您解决问题。
🐶座右铭:总有一天你所坚持的会反过来拥抱你。

🌈写在前面:
System.IO.Pipelines 是一个库,旨在使在 .NET 中执行高性能 I/O 更加容易。 该库的目标为适用于所有 .NET 实现的 .NET Standard。
System.IO.Pipelines已构建为:
- 具有高性能的流数据分析功能。
- 减少代码复杂性。
👉本文关键字:System.IO.Pipelines、文件系统、方法示例、管道
文章目录
文件和流 I/O(输入/输出)是指在存储媒介中传入或传出数据。 在 .NET 中,System.IO 命名空间包含允许以异步方式和同步方式对数据流和文件进行读取和写入操作的类型。 这些命名空间还包含对文件执行压缩和解压缩的类型,以及通过管道和串行端口启用通信的类型。
文件是一个由字节组成的有序的命名集合,它具有永久存储。 在处理文件时,你将处理目录路径、磁盘存储、文件和目录名称。 相反,流是一个字节序列,可用于对后备存储进行读取和写入操作,后备存储可以是多个存储媒介之一(例如,磁盘或内存)。 正如存在除磁盘之外的多种后备存储一样,也存在除文件流之外的多种流(如网络、内存和管道流)。
文件和目录
流
读取器和编写器
下面的代码是典型的 TCP 服务器,它从客户机接收行分隔的消息(由 '\n' 分隔):
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
前面的代码有几个问题:
ReadAsync 可能无法接收整条消息(行尾)。stream.ReadAsync 的结果。 stream.ReadAsync 返回读取的数据量。ReadAsync 调用中读取多行的情况。byte 数组。要解决上述问题,需要进行以下更改:
缓冲传入的数据,直到找到新行。
分析缓冲区中返回的所有行。
该行可能大于 1KB(1024 字节)。 此代码需要调整输入缓冲区的大小,直到找到分隔符后,才能在缓冲区内容纳完整行。
请考虑使用缓冲池来避免重复分配内存。
下面的代码解决了其中一些问题:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
前面的代码很复杂,不能解决所识别的所有问题。 高性能网络通常意味着编写复杂的代码以使性能最大化。 System.IO.Pipelines 的设计目的是使编写此类代码更容易。
System.IO.Pipelines 已构建为:
Pipe 类可用于创建 PipeWriter/PipeReader 对。 写入 PipeWriter 的所有数据都可用于 PipeReader:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
我们已在上篇文章中介绍了Pipe 类的用法,大家可以去查看本专栏之前的文章。
示例
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
上述代码有两个循环:
FillPipeAsync 从 Socket 读取并写入 PipeWriter。ReadPipeAsync 从 PipeReader 读取并分析传入的行。没有分配显式缓冲区。 所有缓冲区管理都委托给 PipeReader 和 PipeWriter 实现。 委派缓冲区管理使使用代码更容易集中关注业务逻辑。
在第一个循环中:
PipeWriter 有多少数据已写入缓冲区。PipeReader。在第二个循环中,PipeReader 使用由 PipeWriter 写入的缓冲区。 缓冲区来自套接字。 对 PipeReader.ReadAsync 的调用:
ReadOnlySequence<byte> 形式读取的数据。IsCompleted,指示是否已到达数据结尾 (EOF)。找到行尾 (EOL) 分隔符并分析该行后:
PipeReader.AdvanceTo 以告知 PipeReader 已消耗和检查了多少数据。读取器和编写器循环通过调用 Complete 结束。 Complete 使基础管道释放其分配的内存。
理想情况下,读取和分析可协同工作:
通常,分析所花费的时间比仅从网络复制数据块所用时间更长:
为了获得最佳性能,需要在频繁暂停和分配更多内存之间取得平衡。
为解决上述问题,Pipe 提供了两个设置来控制数据流:
PauseWriterThreshold:确定在调用 FlushAsync 暂停之前应缓冲多少数据。
ResumeWriterThreshold:确定在恢复对 PipeWriter.FlushAsync 的调用之前,读取器必须观察多少数据。

Pipe 中的数据量超过 PauseWriterThreshold 时,返回不完整的 ValueTask<FlushResult>。ResumeWriterThreshold 时,返回完整的 ValueTask<FlushResult>。使用两个值可防止快速循环,如果只使用一个值,则可能发生这种循环。
示例
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
通常在使用 async 和 await 时,异步代码会在 TaskScheduler 或当前 SynchronizationContext 上恢复。
在执行 I/O 时,对执行 I/O 的位置进行细粒度控制非常重要。 此控件允许高效利用 CPU 缓存。 高效的缓存对于 Web 服务器等高性能应用至关重要。 PipeScheduler 提供对异步回调运行位置的控制。 默认情况下:
SynchronizationContext,它将使用线程池运行回调。示例
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool 是 PipeScheduler 实现,用于对线程池的回调进行排队。 PipeScheduler.ThreadPool 是默认选项,通常也是最佳选项。 PipeScheduler.Inline 可能会导致意外后果,如死锁。
通常重用 Pipe 对象即可重置。 要重置管道,请在完成 PipeReader 和 PipeWriter 时调用 PipeReaderReset。
PipeReader 代表调用方管理内存。 在调用 PipeReader.ReadAsync 之后始终调用 PipeReader.AdvanceTo。 这使 PipeReader 知道调用方何时用完内存,以便可以对其进行跟踪。 从 PipeReader.ReadAsync 返回的 ReadOnlySequence<byte> 仅在调用 PipeReader.AdvanceTo 之前有效。 调用 PipeReader.AdvanceTo 后,不能使用 ReadOnlySequence<byte>。
PipeReader.AdvanceTo 采用两个 SequencePosition 参数:
将数据标记为“已使用”意味着管道可以将内存返回到底层缓冲池。 将数据标记为“已观察”可控制对 PipeReader.ReadAsync 的下一个调用的操作。 将所有内容都标记为“已观察”意味着下次对 PipeReader.ReadAsync 的调用将不会返回,直到有更多数据写入管道。 任何其他值都将使对 PipeReader.ReadAsync 的下一次调用立即返回并包含已观察到的和未观察到的数据,但不是已被使用的数据。
尝试读取流数据时会出现以下几种典型模式:
以下示例使用 TryParseLines 方法分析来自 ReadOnlySequence<byte> 的消息。 TryParseLines 分析单条消息并更新输入缓冲区,以从缓冲区中剪裁已分析的消息。 TryParseLines 不是 .NET 的一部分,它是在以下部分中使用的用户编写的方法。
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
下面的代码从 PipeReader 读取一条消息并将其返回给调用方。
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
前面的代码:
SequencePosition 并检查 SequencePosition 以指向已剪裁的输入缓冲区的开始。因为 TryParseLines 从输入缓冲区中删除了已分析的消息,所以更新了两个 SequencePosition 参数。 通常,分析来自缓冲区的单条消息时,检查的位置应为以下位置之一:
单条消息案例最有可能出现错误。 将错误的值传递给“已检查”可能会导致内存不足异常或无限循环 。 有关详细信息,请参阅本文中的 PipeReader 常见问题部分。
以下代码从 PipeReader 读取所有消息,并在每条消息上调用 ProcessMessageAsync。
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader.ReadAsync:
CancellationToken,则会引发 OperationCanceledException。PipeReader.CancelPendingRead 将导致对 PipeReader.ReadAsync 的当前或下次调用返回 ReadResult,并将 IsCanceled 设置为 true。 这对于以非破坏性和非异常的方式停止现有的读取循环非常有用。private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
consumed 或 examined 可能会导致读取已读取的数据。buffer.End 作为检查对象可能会导致以下问题:
PipeReader.AdvanceTo(position, buffer.End)。consumed 或 examined 可能会导致无限循环。 例如,如果 buffer.Start 没有更改,则 PipeReader.AdvanceTo(buffer.Start) 将导致在下一个对 PipeReader.ReadAsync 的调用在新数据到来之前立即返回。consumed 或 examined 可能会导致无限缓冲(最终导致 OOM)。PipeReader.AdvanceTo 之后使用 ReadOnlySequence<byte> 可能会导致内存损坏(在释放之后使用)。PipeReader.Complete/CompleteAsync 可能会导致内存泄漏。ReadResult.Buffer.IsEmpty 和 ReadResult.IsCompleted。 如果错误执行此操作,可能会导致无限循环。当 IsCompleted 被设置为 true 时,ReadResult 可能会返回最后一段数据。 在退出读循环之前不读取该数据将导致数据丢失。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
如果 Result.IsCompleted 是 true,则以下逻辑可能会导致无限循环,但缓冲区中永远不会有完整的消息。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
下面是另一段具有相同问题的代码。 该代码在检查 ReadResult.IsCompleted 之前检查非空缓冲区。 由于该代码位于 else if 中,如果缓冲区中没有完整的消息,它将永远循环。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
在分析单条消息时,如果无条件调用 PipeReader.AdvanceTo 而 buffer.End 位于 examined 位置,则可能导致应用程序变为无响应。 对 PipeReader.AdvanceTo 的下次调用将在以下情况下返回:
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
在满足以下条件的情况下,以下代码将保持缓冲,直到发生 OutOfMemoryException:
PipeReader 返回的数据不会生成完整的消息。 例如,它不会生成完整的消息,因为另一端正在编写一条大消息(例如,一条为 4GB 的消息)。警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
当写入读取缓冲区的帮助程序时,应在调用 Advance 之前复制任何返回的有效负载。 下面的示例将返回 Pipe 已丢弃的内存,并可能将其重新用于下一个操作(读/写)。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。
PipeWriter 管理用于代表调用方写入的缓冲区。 PipeWriter 可实现 IBufferWriter。 IBufferWriter<byte> 使得无需额外的缓冲区副本就可以访问缓冲区来执行写入操作。
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
之前的代码:
PipeWriter 请求至少 5 个字节的缓冲区。"Hello" 的字节写入返回的 Memory<byte>。PipeWriter,以便将字节发送到基础设备。以前的写入方法使用 PipeWriter 提供的缓冲区。 它可能还使用了 PipeWriter.WriteAsync,该项执行以下操作:
PipeWriter。GetSpan``Advance,然后调用 FlushAsync。async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
FlushAsync 支持传递 CancellationToken。 如果令牌在刷新挂起时被取消,则传递 CancellationToken 将导致 OperationCanceledException。 PipeWriter.FlushAsync 支持通过 PipeWriter.CancelPendingFlush 取消当前刷新操作而不引发异常的方法。 调用 PipeWriter.CancelPendingFlush 将导致对 PipeWriter.FlushAsync 或 PipeWriter.WriteAsync 的当前或下次调用返回 FlushResult,并将 IsCanceled 设置为 true。 这对于以非破坏性和非异常的方式停止暂停刷新非常有用。
无法保证连续的调用将返回相同的缓冲区或相同大小的缓冲区。
在调用 Advance 之后,必须请求一个新的缓冲区来继续写入更多数据。 不能写入先前获得的缓冲区。
如果未完成对 FlushAsync 的调用,则调用 GetMemory 或 GetSpan 将不安全。
如果未刷新数据,则调用 Complete 或 CompleteAsync 可能导致内存损坏。
以下提示将帮助你成功使用 System.IO.Pipelines 类:
awaitPipeWriter.FlushAsync,并始终检查 FlushResult.IsCompleted。 如果 IsCompleted 为 true,则中止写入,因为这表示读取器已完成,不再关心所写入的内容。PipeReader 有权访问的内容后调用 PipeWriter.FlushAsync。FlushAsync 完成之前无法启动,请勿调用 FlushAsync,因为这可能会导致死锁。PipeReader 或 PipeWriter 或访问它们。 这些类型不是线程安全的。AdvanceTo 或完成 PipeReader 后,切勿访问 ReadResult.Buffer。IDuplexPipe 是支持读写的类型的协定。 例如,网络连接将由 IDuplexPipe 表示。
与包含 PipeReader 和 PipeWriter 的 Pipe 不同,IDuplexPipe 表示全双工连接的一侧。 这意味着写入 PipeWriter 的内容不会从 PipeReader 中读取。
在读取或写入流数据时,通常使用反序列化程序读取数据,并使用序列化程序写入数据。 大多数读取和写入流 API 都有一个 Stream 参数。 为了更轻松地与这些现有 API 集成,PipeReader 和 PipeWriter 公开了一个 AsStream 方法。 AsStream 返回围绕 PipeReader 或 PipeWriter 的 Stream 实现。
可使用给定了 Stream 对象和可选的相应创建选项的静态 Create 方法创建 PipeReader 和 PipeWriter 实例。
StreamPipeReaderOptions 允许使用以下参数控制 PipeReader 实例的创建:
4096。PipeReader 完成之后基础流是否保持打开状态,默认值为 false。1024。MemoryPool<byte>,默认值为 null。StreamPipeWriterOptions 允许使用以下参数控制 PipeWriter 实例的创建:
PipeWriter 完成之后基础流是否保持打开状态,默认值为 false。4096。MemoryPool<byte>,默认值为 null。重要
使用
Create方法创建PipeReader和PipeWriter实例时,需要考虑Stream对象的生存期。 如果在读取器或编写器使用该方法完成操作后,你需要访问流,则需要在创建选项上将LeaveOpen标志设置为true。 否则,流将关闭。
以下代码演示了使用 Create 方法从流中创建 PipeReader 和 PipeWriter 实例。
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
应用程序使用 StreamReader 以流形式读取 lorem-ipsum.txt 文件,并且必须以空白行结尾。 FileStream 传递给 PipeReader.Create,后者实例化 PipeReader 对象。 然后,控制台应用程序使用 Console.OpenStandardOutput() 将其标准输出流传递到 PipeWriter.Create。 示例支持取消。
⭐写在结尾:
文章中出现的任何错误请大家批评指出,一定及时修改。
希望写在这里的小伙伴能给个三连支持!
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
总的来说,我对ruby还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
作为我的Rails应用程序的一部分,我编写了一个小导入程序,它从我们的LDAP系统中吸取数据并将其塞入一个用户表中。不幸的是,与LDAP相关的代码在遍历我们的32K用户时泄漏了大量内存,我一直无法弄清楚如何解决这个问题。这个问题似乎在某种程度上与LDAP库有关,因为当我删除对LDAP内容的调用时,内存使用情况会很好地稳定下来。此外,不断增加的对象是Net::BER::BerIdentifiedString和Net::BER::BerIdentifiedArray,它们都是LDAP库的一部分。当我运行导入时,内存使用量最终达到超过1GB的峰值。如果问题存在,我需要找到一些方法来更正我的代
我正在尝试设置一个puppet节点,但rubygems似乎不正常。如果我通过它自己的二进制文件(/usr/lib/ruby/gems/1.8/gems/facter-1.5.8/bin/facter)在cli上运行facter,它工作正常,但如果我通过由rubygems(/usr/bin/facter)安装的二进制文件,它抛出:/usr/lib/ruby/1.8/facter/uptime.rb:11:undefinedmethod`get_uptime'forFacter::Util::Uptime:Module(NoMethodError)from/usr/lib/ruby
Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题
我想了解Ruby方法methods()是如何工作的。我尝试使用“ruby方法”在Google上搜索,但这不是我需要的。我也看过ruby-doc.org,但我没有找到这种方法。你能详细解释一下它是如何工作的或者给我一个链接吗?更新我用methods()方法做了实验,得到了这样的结果:'labrat'代码classFirstdeffirst_instance_mymethodenddefself.first_class_mymethodendendclassSecond使用类#returnsavailablemethodslistforclassandancestorsputsSeco
我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何
我想用ruby编写一个小的命令行实用程序并将其作为gem分发。我知道安装后,Guard、Sass和Thor等某些gem可以从命令行自行运行。为了让gem像二进制文件一样可用,我需要在我的gemspec中指定什么。 最佳答案 Gem::Specification.newdo|s|...s.executable='name_of_executable'...endhttp://docs.rubygems.org/read/chapter/20 关于ruby-在Ruby中编写命令行实用程序