using System.Threading.Channels; namespace MoneyMap.Services { public interface IReceiptParseQueue { ValueTask EnqueueAsync(long receiptId, CancellationToken ct = default); ValueTask EnqueueManyAsync(IEnumerable receiptIds, CancellationToken ct = default); ValueTask DequeueAsync(CancellationToken ct); int QueueLength { get; } long? CurrentlyProcessingId { get; } void SetCurrentlyProcessing(long? receiptId); } public class ReceiptParseQueue : IReceiptParseQueue { private readonly Channel _channel = Channel.CreateUnbounded( new UnboundedChannelOptions { SingleReader = true }); private long _currentlyProcessingId; public int QueueLength => _channel.Reader.Count; public long? CurrentlyProcessingId { get { var val = Interlocked.Read(ref _currentlyProcessingId); return val == 0 ? null : val; } } public void SetCurrentlyProcessing(long? receiptId) { Interlocked.Exchange(ref _currentlyProcessingId, receiptId ?? 0); } public async ValueTask EnqueueAsync(long receiptId, CancellationToken ct = default) { await _channel.Writer.WriteAsync(receiptId, ct); } public async ValueTask EnqueueManyAsync(IEnumerable receiptIds, CancellationToken ct = default) { foreach (var id in receiptIds) { await _channel.Writer.WriteAsync(id, ct); } } public async ValueTask DequeueAsync(CancellationToken ct) { return await _channel.Reader.ReadAsync(ct); } } }