Feature: Receipt parse queue with background worker

Add ReceiptParseQueue (Channel-based singleton) and
ReceiptParseWorkerService (BackgroundService) for sequential receipt
parsing. Replaces fire-and-forget Task.Run with a proper queue.
ReceiptManager now enqueues uploaded receipts and supports bulk upload
via UploadManyUnmappedReceiptsAsync. Worker recovers pending items on
startup. Register IAIToolExecutor and IAIVisionClientResolver in DI.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-15 19:14:05 -05:00
parent 16c8d121d4
commit 705f4ea201
4 changed files with 254 additions and 16 deletions

View File

@@ -2,6 +2,7 @@ using System.Globalization;
using Microsoft.EntityFrameworkCore;
using MoneyMap.Data;
using MoneyMap.Services;
using MoneyMap.Services.AITools;
// Set default culture to en-US for currency formatting ($)
var culture = new CultureInfo("en-US");
@@ -61,11 +62,17 @@ builder.Services.AddScoped<IReceiptManager, ReceiptManager>();
builder.Services.AddScoped<IReceiptAutoMapper, ReceiptAutoMapper>();
builder.Services.AddScoped<IPdfToImageConverter, PdfToImageConverter>();
// AI vision clients
// Receipt parse queue and background worker
builder.Services.AddSingleton<IReceiptParseQueue, ReceiptParseQueue>();
builder.Services.AddHostedService<ReceiptParseWorkerService>();
// AI vision clients and tool-use support
builder.Services.AddHttpClient<OpenAIVisionClient>();
builder.Services.AddHttpClient<ClaudeVisionClient>();
builder.Services.AddHttpClient<OllamaVisionClient>();
builder.Services.AddHttpClient<LlamaCppVisionClient>();
builder.Services.AddScoped<IAIVisionClientResolver, AIVisionClientResolver>();
builder.Services.AddScoped<IAIToolExecutor, AIToolExecutor>();
builder.Services.AddScoped<IReceiptParser, AIReceiptParser>();
// AI categorization service

View File

@@ -10,6 +10,7 @@ namespace MoneyMap.Services
{
Task<ReceiptUploadResult> UploadReceiptAsync(long transactionId, IFormFile file);
Task<ReceiptUploadResult> UploadUnmappedReceiptAsync(IFormFile file);
Task<BulkUploadResult> UploadManyUnmappedReceiptsAsync(IReadOnlyList<IFormFile> files);
Task<bool> DeleteReceiptAsync(long receiptId);
Task<bool> MapReceiptToTransactionAsync(long receiptId, long transactionId);
Task<bool> UnmapReceiptAsync(long receiptId);
@@ -23,6 +24,7 @@ namespace MoneyMap.Services
private readonly IWebHostEnvironment _environment;
private readonly IConfiguration _configuration;
private readonly IServiceProvider _serviceProvider;
private readonly IReceiptParseQueue _parseQueue;
private readonly ILogger<ReceiptManager> _logger;
private const long MaxFileSize = 10 * 1024 * 1024; // 10MB
private static readonly string[] AllowedExtensions = { ".jpg", ".jpeg", ".png", ".pdf", ".gif", ".heic" };
@@ -47,12 +49,14 @@ namespace MoneyMap.Services
IWebHostEnvironment environment,
IConfiguration configuration,
IServiceProvider serviceProvider,
IReceiptParseQueue parseQueue,
ILogger<ReceiptManager> logger)
{
_db = db;
_environment = environment;
_configuration = configuration;
_serviceProvider = serviceProvider;
_parseQueue = parseQueue;
_logger = logger;
}
@@ -147,28 +151,50 @@ namespace MoneyMap.Services
UploadedAtUtc = DateTime.UtcNow
};
receipt.ParseStatus = ReceiptParseStatus.Queued;
_db.Receipts.Add(receipt);
await _db.SaveChangesAsync();
// Automatically parse the receipt after upload (in background, don't wait for result)
_ = Task.Run(async () =>
{
try
{
using var scope = _serviceProvider.CreateScope();
var parser = scope.ServiceProvider.GetRequiredService<IReceiptParser>();
await parser.ParseReceiptAsync(receipt.Id);
_logger.LogInformation("Background parsing completed for receipt {ReceiptId}", receipt.Id);
}
catch (Exception ex)
{
_logger.LogError(ex, "Background parsing failed for receipt {ReceiptId}: {Message}", receipt.Id, ex.Message);
}
});
await _parseQueue.EnqueueAsync(receipt.Id);
_logger.LogInformation("Receipt {ReceiptId} enqueued for parsing", receipt.Id);
return ReceiptUploadResult.Success(receipt, duplicateWarnings);
}
public async Task<BulkUploadResult> UploadManyUnmappedReceiptsAsync(IReadOnlyList<IFormFile> files)
{
var uploaded = new List<BulkUploadItem>();
var failed = new List<BulkUploadFailure>();
foreach (var file in files)
{
var result = await UploadReceiptInternalAsync(file, null);
if (result.IsSuccess)
{
uploaded.Add(new BulkUploadItem
{
ReceiptId = result.Receipt!.Id,
FileName = result.Receipt.FileName,
DuplicateWarnings = result.DuplicateWarnings
});
}
else
{
failed.Add(new BulkUploadFailure
{
FileName = file.FileName,
ErrorMessage = result.ErrorMessage ?? "Unknown error"
});
}
}
return new BulkUploadResult
{
Uploaded = uploaded,
Failed = failed
};
}
private async Task<List<DuplicateWarning>> CheckForDuplicatesAsync(string fileHash, string fileName, long fileSize)
{
var warnings = new List<DuplicateWarning>();
@@ -361,4 +387,24 @@ namespace MoneyMap.Services
public string? TransactionName { get; set; }
public string Reason { get; set; } = "";
}
public class BulkUploadResult
{
public List<BulkUploadItem> Uploaded { get; init; } = new();
public List<BulkUploadFailure> Failed { get; init; } = new();
public int TotalCount => Uploaded.Count + Failed.Count;
}
public class BulkUploadItem
{
public long ReceiptId { get; set; }
public string FileName { get; set; } = "";
public List<DuplicateWarning> DuplicateWarnings { get; set; } = new();
}
public class BulkUploadFailure
{
public string FileName { get; set; } = "";
public string ErrorMessage { get; set; } = "";
}
}

View File

@@ -0,0 +1,56 @@
using System.Threading.Channels;
namespace MoneyMap.Services
{
public interface IReceiptParseQueue
{
ValueTask EnqueueAsync(long receiptId, CancellationToken ct = default);
ValueTask EnqueueManyAsync(IEnumerable<long> receiptIds, CancellationToken ct = default);
ValueTask<long> DequeueAsync(CancellationToken ct);
int QueueLength { get; }
long? CurrentlyProcessingId { get; }
void SetCurrentlyProcessing(long? receiptId);
}
public class ReceiptParseQueue : IReceiptParseQueue
{
private readonly Channel<long> _channel = Channel.CreateUnbounded<long>(
new UnboundedChannelOptions { SingleReader = true });
private long _currentlyProcessingId;
public int QueueLength => _channel.Reader.Count;
public long? CurrentlyProcessingId
{
get
{
var val = Interlocked.Read(ref _currentlyProcessingId);
return val == 0 ? null : val;
}
}
public void SetCurrentlyProcessing(long? receiptId)
{
Interlocked.Exchange(ref _currentlyProcessingId, receiptId ?? 0);
}
public async ValueTask EnqueueAsync(long receiptId, CancellationToken ct = default)
{
await _channel.Writer.WriteAsync(receiptId, ct);
}
public async ValueTask EnqueueManyAsync(IEnumerable<long> receiptIds, CancellationToken ct = default)
{
foreach (var id in receiptIds)
{
await _channel.Writer.WriteAsync(id, ct);
}
}
public async ValueTask<long> DequeueAsync(CancellationToken ct)
{
return await _channel.Reader.ReadAsync(ct);
}
}
}

View File

@@ -0,0 +1,129 @@
using Microsoft.EntityFrameworkCore;
using MoneyMap.Data;
using MoneyMap.Models;
namespace MoneyMap.Services
{
public class ReceiptParseWorkerService : BackgroundService
{
private readonly IReceiptParseQueue _queue;
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<ReceiptParseWorkerService> _logger;
public ReceiptParseWorkerService(
IReceiptParseQueue queue,
IServiceScopeFactory scopeFactory,
ILogger<ReceiptParseWorkerService> logger)
{
_queue = queue;
_scopeFactory = scopeFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("ReceiptParseWorkerService starting");
await RecoverPendingItemsAsync(stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
long receiptId = 0;
try
{
receiptId = await _queue.DequeueAsync(stoppingToken);
_queue.SetCurrentlyProcessing(receiptId);
await SetParseStatusAsync(receiptId, ReceiptParseStatus.Parsing);
_logger.LogInformation("Processing receipt {ReceiptId} from parse queue", receiptId);
using var scope = _scopeFactory.CreateScope();
var parser = scope.ServiceProvider.GetRequiredService<IReceiptParser>();
var result = await parser.ParseReceiptAsync(receiptId);
var finalStatus = result.IsSuccess
? ReceiptParseStatus.Completed
: ReceiptParseStatus.Failed;
await SetParseStatusAsync(receiptId, finalStatus);
_logger.LogInformation(
"Receipt {ReceiptId} parse {Status}: {Message}",
receiptId, finalStatus, result.Message);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing receipt {ReceiptId} from parse queue", receiptId);
if (receiptId > 0)
{
try
{
await SetParseStatusAsync(receiptId, ReceiptParseStatus.Failed);
}
catch (Exception statusEx)
{
_logger.LogError(statusEx, "Failed to update ParseStatus to Failed for receipt {ReceiptId}", receiptId);
}
}
}
finally
{
_queue.SetCurrentlyProcessing(null);
}
}
_logger.LogInformation("ReceiptParseWorkerService stopping");
}
private async Task RecoverPendingItemsAsync(CancellationToken ct)
{
try
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<MoneyMapContext>();
var pendingIds = await db.Receipts
.Where(r => r.ParseStatus == ReceiptParseStatus.Queued
|| r.ParseStatus == ReceiptParseStatus.Parsing)
.OrderBy(r => r.UploadedAtUtc)
.Select(r => r.Id)
.ToListAsync(ct);
if (pendingIds.Count > 0)
{
_logger.LogInformation(
"Recovering {Count} receipts with pending parse status", pendingIds.Count);
foreach (var id in pendingIds)
{
await db.Receipts
.Where(r => r.Id == id)
.ExecuteUpdateAsync(s => s.SetProperty(r => r.ParseStatus, ReceiptParseStatus.Queued), ct);
}
await _queue.EnqueueManyAsync(pendingIds, ct);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error recovering pending parse items on startup");
}
}
private async Task SetParseStatusAsync(long receiptId, ReceiptParseStatus status)
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<MoneyMapContext>();
await db.Receipts
.Where(r => r.Id == receiptId)
.ExecuteUpdateAsync(s => s.SetProperty(r => r.ParseStatus, status));
}
}
}