Add message batching (debounce) for rapid-fire senders
Buffer messages per user+channel and wait for a configurable window (batch_window_seconds: 3) before analyzing. Combines burst messages into a single LLM call instead of analyzing each one separately. Replaces cooldown_between_analyses with the debounce approach. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -21,12 +21,22 @@ class SentimentCog(commands.Cog):
|
||||
self._dirty_users: set[int] = set()
|
||||
# Per-user redirect cooldown: {user_id: last_redirect_datetime}
|
||||
self._redirect_cooldowns: dict[int, datetime] = {}
|
||||
# Debounce buffer: keyed by (channel_id, user_id), stores list of messages
|
||||
self._message_buffer: dict[tuple[int, int], list[discord.Message]] = {}
|
||||
# Pending debounce timer tasks
|
||||
self._debounce_tasks: dict[tuple[int, int], asyncio.Task] = {}
|
||||
|
||||
async def cog_load(self):
|
||||
self._flush_states.start()
|
||||
|
||||
async def cog_unload(self):
|
||||
self._flush_states.cancel()
|
||||
# Cancel all pending debounce timers and process remaining buffers
|
||||
for task in self._debounce_tasks.values():
|
||||
task.cancel()
|
||||
self._debounce_tasks.clear()
|
||||
for key in list(self._message_buffer):
|
||||
await self._process_buffered(key)
|
||||
# Final flush on shutdown
|
||||
await self._flush_dirty_states()
|
||||
|
||||
@@ -75,21 +85,67 @@ class SentimentCog(commands.Cog):
|
||||
if not message.content or not message.content.strip():
|
||||
return
|
||||
|
||||
# Check per-user analysis cooldown
|
||||
sentiment_config = config.get("sentiment", {})
|
||||
cooldown = sentiment_config.get("cooldown_between_analyses", 2)
|
||||
if not self.bot.drama_tracker.can_analyze(message.author.id, cooldown):
|
||||
# Buffer the message and start/reset debounce timer
|
||||
key = (message.channel.id, message.author.id)
|
||||
if key not in self._message_buffer:
|
||||
self._message_buffer[key] = []
|
||||
self._message_buffer[key].append(message)
|
||||
|
||||
# Cancel existing debounce timer for this user+channel
|
||||
existing_task = self._debounce_tasks.get(key)
|
||||
if existing_task and not existing_task.done():
|
||||
existing_task.cancel()
|
||||
|
||||
# Start new debounce timer
|
||||
batch_window = config.get("sentiment", {}).get("batch_window_seconds", 3)
|
||||
self._debounce_tasks[key] = asyncio.create_task(
|
||||
self._debounce_then_process(key, batch_window)
|
||||
)
|
||||
|
||||
async def _debounce_then_process(self, key: tuple[int, int], delay: float):
|
||||
"""Sleep for the debounce window, then process the buffered messages."""
|
||||
try:
|
||||
await asyncio.sleep(delay)
|
||||
await self._process_buffered(key)
|
||||
except asyncio.CancelledError:
|
||||
pass # Timer was reset by a new message — expected
|
||||
|
||||
async def _process_buffered(self, key: tuple[int, int]):
|
||||
"""Combine buffered messages and run the analysis pipeline once."""
|
||||
messages = self._message_buffer.pop(key, [])
|
||||
self._debounce_tasks.pop(key, None)
|
||||
|
||||
if not messages:
|
||||
return
|
||||
|
||||
# Use the last message as the reference for channel, author, guild, etc.
|
||||
message = messages[-1]
|
||||
combined_content = "\n".join(m.content for m in messages if m.content and m.content.strip())
|
||||
|
||||
if not combined_content.strip():
|
||||
return
|
||||
|
||||
batch_count = len(messages)
|
||||
if batch_count > 1:
|
||||
logger.info(
|
||||
"Batched %d messages from %s in #%s",
|
||||
batch_count, message.author.display_name,
|
||||
getattr(message.channel, 'name', 'unknown'),
|
||||
)
|
||||
|
||||
config = self.bot.config
|
||||
monitoring = config.get("monitoring", {})
|
||||
sentiment_config = config.get("sentiment", {})
|
||||
|
||||
# Build channel context for game detection
|
||||
game_channels = config.get("game_channels", {})
|
||||
channel_context = self._build_channel_context(message, game_channels)
|
||||
|
||||
# Analyze the message
|
||||
# Analyze the combined message
|
||||
context = self._get_context(message)
|
||||
user_notes = self.bot.drama_tracker.get_user_notes(message.author.id)
|
||||
result = await self.bot.llm.analyze_message(
|
||||
message.content, context, user_notes=user_notes,
|
||||
combined_content, context, user_notes=user_notes,
|
||||
channel_context=channel_context,
|
||||
)
|
||||
|
||||
@@ -128,7 +184,7 @@ class SentimentCog(commands.Cog):
|
||||
channel_id=message.channel.id,
|
||||
user_id=message.author.id,
|
||||
username=message.author.display_name,
|
||||
content=message.content,
|
||||
content=combined_content,
|
||||
message_ts=message.created_at.replace(tzinfo=timezone.utc),
|
||||
toxicity_score=score,
|
||||
drama_score=drama_score,
|
||||
|
||||
@@ -17,7 +17,7 @@ sentiment:
|
||||
context_messages: 3 # Number of previous messages to include as context
|
||||
rolling_window_size: 10 # Number of messages to track per user
|
||||
rolling_window_minutes: 15 # Time window for tracking
|
||||
cooldown_between_analyses: 2 # Seconds between analyzing same user's messages
|
||||
batch_window_seconds: 3 # Wait this long for more messages before analyzing (debounce)
|
||||
|
||||
game_channels:
|
||||
gta-online: "GTA Online"
|
||||
|
||||
Reference in New Issue
Block a user