From 705f4ea20136c08ff94f0e3b9f355a6887517202 Mon Sep 17 00:00:00 2001 From: AJ Isaacs Date: Sun, 15 Feb 2026 19:14:05 -0500 Subject: [PATCH] Feature: Receipt parse queue with background worker Add ReceiptParseQueue (Channel-based singleton) and ReceiptParseWorkerService (BackgroundService) for sequential receipt parsing. Replaces fire-and-forget Task.Run with a proper queue. ReceiptManager now enqueues uploaded receipts and supports bulk upload via UploadManyUnmappedReceiptsAsync. Worker recovers pending items on startup. Register IAIToolExecutor and IAIVisionClientResolver in DI. Co-Authored-By: Claude Opus 4.6 --- MoneyMap/Program.cs | 9 +- MoneyMap/Services/ReceiptManager.cs | 76 +++++++++-- MoneyMap/Services/ReceiptParseQueue.cs | 56 ++++++++ .../Services/ReceiptParseWorkerService.cs | 129 ++++++++++++++++++ 4 files changed, 254 insertions(+), 16 deletions(-) create mode 100644 MoneyMap/Services/ReceiptParseQueue.cs create mode 100644 MoneyMap/Services/ReceiptParseWorkerService.cs diff --git a/MoneyMap/Program.cs b/MoneyMap/Program.cs index e8221a2..c4e721b 100644 --- a/MoneyMap/Program.cs +++ b/MoneyMap/Program.cs @@ -2,6 +2,7 @@ using System.Globalization; using Microsoft.EntityFrameworkCore; using MoneyMap.Data; using MoneyMap.Services; +using MoneyMap.Services.AITools; // Set default culture to en-US for currency formatting ($) var culture = new CultureInfo("en-US"); @@ -61,11 +62,17 @@ builder.Services.AddScoped(); builder.Services.AddScoped(); builder.Services.AddScoped(); -// AI vision clients +// Receipt parse queue and background worker +builder.Services.AddSingleton(); +builder.Services.AddHostedService(); + +// AI vision clients and tool-use support builder.Services.AddHttpClient(); builder.Services.AddHttpClient(); builder.Services.AddHttpClient(); builder.Services.AddHttpClient(); +builder.Services.AddScoped(); +builder.Services.AddScoped(); builder.Services.AddScoped(); // AI categorization service diff --git a/MoneyMap/Services/ReceiptManager.cs b/MoneyMap/Services/ReceiptManager.cs index 553bc2a..78f46a3 100644 --- a/MoneyMap/Services/ReceiptManager.cs +++ b/MoneyMap/Services/ReceiptManager.cs @@ -10,6 +10,7 @@ namespace MoneyMap.Services { Task UploadReceiptAsync(long transactionId, IFormFile file); Task UploadUnmappedReceiptAsync(IFormFile file); + Task UploadManyUnmappedReceiptsAsync(IReadOnlyList files); Task DeleteReceiptAsync(long receiptId); Task MapReceiptToTransactionAsync(long receiptId, long transactionId); Task UnmapReceiptAsync(long receiptId); @@ -23,6 +24,7 @@ namespace MoneyMap.Services private readonly IWebHostEnvironment _environment; private readonly IConfiguration _configuration; private readonly IServiceProvider _serviceProvider; + private readonly IReceiptParseQueue _parseQueue; private readonly ILogger _logger; private const long MaxFileSize = 10 * 1024 * 1024; // 10MB private static readonly string[] AllowedExtensions = { ".jpg", ".jpeg", ".png", ".pdf", ".gif", ".heic" }; @@ -47,12 +49,14 @@ namespace MoneyMap.Services IWebHostEnvironment environment, IConfiguration configuration, IServiceProvider serviceProvider, + IReceiptParseQueue parseQueue, ILogger logger) { _db = db; _environment = environment; _configuration = configuration; _serviceProvider = serviceProvider; + _parseQueue = parseQueue; _logger = logger; } @@ -147,28 +151,50 @@ namespace MoneyMap.Services UploadedAtUtc = DateTime.UtcNow }; + receipt.ParseStatus = ReceiptParseStatus.Queued; _db.Receipts.Add(receipt); await _db.SaveChangesAsync(); - // Automatically parse the receipt after upload (in background, don't wait for result) - _ = Task.Run(async () => - { - try - { - using var scope = _serviceProvider.CreateScope(); - var parser = scope.ServiceProvider.GetRequiredService(); - await parser.ParseReceiptAsync(receipt.Id); - _logger.LogInformation("Background parsing completed for receipt {ReceiptId}", receipt.Id); - } - catch (Exception ex) - { - _logger.LogError(ex, "Background parsing failed for receipt {ReceiptId}: {Message}", receipt.Id, ex.Message); - } - }); + await _parseQueue.EnqueueAsync(receipt.Id); + _logger.LogInformation("Receipt {ReceiptId} enqueued for parsing", receipt.Id); return ReceiptUploadResult.Success(receipt, duplicateWarnings); } + public async Task UploadManyUnmappedReceiptsAsync(IReadOnlyList files) + { + var uploaded = new List(); + var failed = new List(); + + foreach (var file in files) + { + var result = await UploadReceiptInternalAsync(file, null); + if (result.IsSuccess) + { + uploaded.Add(new BulkUploadItem + { + ReceiptId = result.Receipt!.Id, + FileName = result.Receipt.FileName, + DuplicateWarnings = result.DuplicateWarnings + }); + } + else + { + failed.Add(new BulkUploadFailure + { + FileName = file.FileName, + ErrorMessage = result.ErrorMessage ?? "Unknown error" + }); + } + } + + return new BulkUploadResult + { + Uploaded = uploaded, + Failed = failed + }; + } + private async Task> CheckForDuplicatesAsync(string fileHash, string fileName, long fileSize) { var warnings = new List(); @@ -361,4 +387,24 @@ namespace MoneyMap.Services public string? TransactionName { get; set; } public string Reason { get; set; } = ""; } + + public class BulkUploadResult + { + public List Uploaded { get; init; } = new(); + public List Failed { get; init; } = new(); + public int TotalCount => Uploaded.Count + Failed.Count; + } + + public class BulkUploadItem + { + public long ReceiptId { get; set; } + public string FileName { get; set; } = ""; + public List DuplicateWarnings { get; set; } = new(); + } + + public class BulkUploadFailure + { + public string FileName { get; set; } = ""; + public string ErrorMessage { get; set; } = ""; + } } diff --git a/MoneyMap/Services/ReceiptParseQueue.cs b/MoneyMap/Services/ReceiptParseQueue.cs new file mode 100644 index 0000000..0713466 --- /dev/null +++ b/MoneyMap/Services/ReceiptParseQueue.cs @@ -0,0 +1,56 @@ +using System.Threading.Channels; + +namespace MoneyMap.Services +{ + public interface IReceiptParseQueue + { + ValueTask EnqueueAsync(long receiptId, CancellationToken ct = default); + ValueTask EnqueueManyAsync(IEnumerable receiptIds, CancellationToken ct = default); + ValueTask DequeueAsync(CancellationToken ct); + int QueueLength { get; } + long? CurrentlyProcessingId { get; } + void SetCurrentlyProcessing(long? receiptId); + } + + public class ReceiptParseQueue : IReceiptParseQueue + { + private readonly Channel _channel = Channel.CreateUnbounded( + new UnboundedChannelOptions { SingleReader = true }); + + private long _currentlyProcessingId; + + public int QueueLength => _channel.Reader.Count; + + public long? CurrentlyProcessingId + { + get + { + var val = Interlocked.Read(ref _currentlyProcessingId); + return val == 0 ? null : val; + } + } + + public void SetCurrentlyProcessing(long? receiptId) + { + Interlocked.Exchange(ref _currentlyProcessingId, receiptId ?? 0); + } + + public async ValueTask EnqueueAsync(long receiptId, CancellationToken ct = default) + { + await _channel.Writer.WriteAsync(receiptId, ct); + } + + public async ValueTask EnqueueManyAsync(IEnumerable receiptIds, CancellationToken ct = default) + { + foreach (var id in receiptIds) + { + await _channel.Writer.WriteAsync(id, ct); + } + } + + public async ValueTask DequeueAsync(CancellationToken ct) + { + return await _channel.Reader.ReadAsync(ct); + } + } +} diff --git a/MoneyMap/Services/ReceiptParseWorkerService.cs b/MoneyMap/Services/ReceiptParseWorkerService.cs new file mode 100644 index 0000000..f3270c3 --- /dev/null +++ b/MoneyMap/Services/ReceiptParseWorkerService.cs @@ -0,0 +1,129 @@ +using Microsoft.EntityFrameworkCore; +using MoneyMap.Data; +using MoneyMap.Models; + +namespace MoneyMap.Services +{ + public class ReceiptParseWorkerService : BackgroundService + { + private readonly IReceiptParseQueue _queue; + private readonly IServiceScopeFactory _scopeFactory; + private readonly ILogger _logger; + + public ReceiptParseWorkerService( + IReceiptParseQueue queue, + IServiceScopeFactory scopeFactory, + ILogger logger) + { + _queue = queue; + _scopeFactory = scopeFactory; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("ReceiptParseWorkerService starting"); + + await RecoverPendingItemsAsync(stoppingToken); + + while (!stoppingToken.IsCancellationRequested) + { + long receiptId = 0; + try + { + receiptId = await _queue.DequeueAsync(stoppingToken); + _queue.SetCurrentlyProcessing(receiptId); + + await SetParseStatusAsync(receiptId, ReceiptParseStatus.Parsing); + + _logger.LogInformation("Processing receipt {ReceiptId} from parse queue", receiptId); + + using var scope = _scopeFactory.CreateScope(); + var parser = scope.ServiceProvider.GetRequiredService(); + var result = await parser.ParseReceiptAsync(receiptId); + + var finalStatus = result.IsSuccess + ? ReceiptParseStatus.Completed + : ReceiptParseStatus.Failed; + + await SetParseStatusAsync(receiptId, finalStatus); + + _logger.LogInformation( + "Receipt {ReceiptId} parse {Status}: {Message}", + receiptId, finalStatus, result.Message); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing receipt {ReceiptId} from parse queue", receiptId); + + if (receiptId > 0) + { + try + { + await SetParseStatusAsync(receiptId, ReceiptParseStatus.Failed); + } + catch (Exception statusEx) + { + _logger.LogError(statusEx, "Failed to update ParseStatus to Failed for receipt {ReceiptId}", receiptId); + } + } + } + finally + { + _queue.SetCurrentlyProcessing(null); + } + } + + _logger.LogInformation("ReceiptParseWorkerService stopping"); + } + + private async Task RecoverPendingItemsAsync(CancellationToken ct) + { + try + { + using var scope = _scopeFactory.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + + var pendingIds = await db.Receipts + .Where(r => r.ParseStatus == ReceiptParseStatus.Queued + || r.ParseStatus == ReceiptParseStatus.Parsing) + .OrderBy(r => r.UploadedAtUtc) + .Select(r => r.Id) + .ToListAsync(ct); + + if (pendingIds.Count > 0) + { + _logger.LogInformation( + "Recovering {Count} receipts with pending parse status", pendingIds.Count); + + foreach (var id in pendingIds) + { + await db.Receipts + .Where(r => r.Id == id) + .ExecuteUpdateAsync(s => s.SetProperty(r => r.ParseStatus, ReceiptParseStatus.Queued), ct); + } + + await _queue.EnqueueManyAsync(pendingIds, ct); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error recovering pending parse items on startup"); + } + } + + private async Task SetParseStatusAsync(long receiptId, ReceiptParseStatus status) + { + using var scope = _scopeFactory.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + + await db.Receipts + .Where(r => r.Id == receiptId) + .ExecuteUpdateAsync(s => s.SetProperty(r => r.ParseStatus, status)); + } + } +}