// Parallel Pipeline Processor public class ParallelPipelineProcessor { private readonly List<IPipelineStage<WorkItem, WorkItem>> _stages; private readonly int _parallelismLevel;
// Simulate enrichment (e.g., database lookup, API call) await Task.Delay(80, token); input.ProcessedData = $"[ENRICHED] {input.ProcessedData} - Length: {input.ProcessedData.Length}"; Console.WriteLine($"[{StageName}] Item {input.Id}: Enriched data"); return input; } } net fx 4.0
public async Task<List<WorkItem>> ProcessItemsAsync( IEnumerable<WorkItem> items, CancellationToken cancellationToken, IProgress<string> progress = null) { var inputQueue = new BlockingCollection<WorkItem>(); var results = new ConcurrentBag<WorkItem>(); // Start producer task var producerTask = Task.Run(() => { foreach (var item in items) { cancellationToken.ThrowIfCancellationRequested(); inputQueue.Add(item, cancellationToken); } inputQueue.CompleteAdding(); }, cancellationToken); private readonly int _parallelismLevel