Files
MoneyMap/MoneyMap.Core/Services/ReceiptParseQueue.cs
T
2026-04-20 18:18:20 -04:00

57 lines
1.7 KiB
C#

using System.Threading.Channels;
namespace MoneyMap.Services
{
public interface IReceiptParseQueue
{
ValueTask EnqueueAsync(long receiptId, CancellationToken ct = default);
ValueTask EnqueueManyAsync(IEnumerable<long> receiptIds, CancellationToken ct = default);
ValueTask<long> DequeueAsync(CancellationToken ct);
int QueueLength { get; }
long? CurrentlyProcessingId { get; }
void SetCurrentlyProcessing(long? receiptId);
}
public class ReceiptParseQueue : IReceiptParseQueue
{
private readonly Channel<long> _channel = Channel.CreateUnbounded<long>(
new UnboundedChannelOptions { SingleReader = true });
private long _currentlyProcessingId;
public int QueueLength => _channel.Reader.Count;
public long? CurrentlyProcessingId
{
get
{
var val = Interlocked.Read(ref _currentlyProcessingId);
return val == 0 ? null : val;
}
}
public void SetCurrentlyProcessing(long? receiptId)
{
Interlocked.Exchange(ref _currentlyProcessingId, receiptId ?? 0);
}
public async ValueTask EnqueueAsync(long receiptId, CancellationToken ct = default)
{
await _channel.Writer.WriteAsync(receiptId, ct);
}
public async ValueTask EnqueueManyAsync(IEnumerable<long> receiptIds, CancellationToken ct = default)
{
foreach (var id in receiptIds)
{
await _channel.Writer.WriteAsync(id, ct);
}
}
public async ValueTask<long> DequeueAsync(CancellationToken ct)
{
return await _channel.Reader.ReadAsync(ct);
}
}
}