Social Signal Harvester: Real-Time Social Media Monitoring for Trading Signals
After building a crypto trading bot that relied solely on technical indicators, we realized we were missing massive market moves triggered by social sentiment. Celebrity tweets, Reddit community coordination, and Discord announcements were creating price swings we couldn't capture. Here's how we built a real-time social monitoring system that processes 50K+ posts per hour and generates actionable trading signals with a 3-5 minute early-warning advantage.
Problem
Our Twitter streaming API connection was dropping every 10-15 minutes, losing critical tweets during market-moving events. The error handling wasn't properly detecting disconnections, and reconnection attempts were hitting rate limits, causing gaps in our data stream.
Error: 429 Too Many Requests - Rate limit exceeded for filtered stream endpoint. Connection reset after 902s.
What I Tried
Attempt 1: Increased reconnection delay to 60 seconds - missed too many tweets during high-activity periods.
Attempt 2: Used multiple API keys with round-robin - still hit rate limits during volatility spikes.
Attempt 3: Switched to polling API (search/recent) - too slow, 15-second delay unacceptable for trading.
Actual Fix
Implemented a resilient streaming architecture with automatic backoff, exponential reconnection delays, and a buffer system that maintains connection health monitoring. The system now uses Twitter's filtered stream with dynamic rule updates, implements exponential backoff with jitter, and runs multiple redundant streams with deduplication.
import tweepy
import asyncio
import random
import time


class TwitterStreamMonitor:
    """Resilient Twitter streaming with rate limit handling"""

    def __init__(self, bearer_tokens, callback):
        self.bearer_tokens = bearer_tokens
        self.current_token_index = 0
        self.callback = callback
        self.tweet_buffer = []
        self.last_buffer_flush = time.time()
        self.streams = []

    async def create_stream(self, bearer_token):
        """Create a filtered stream with rules"""
        client = tweepy.StreamingClient(bearer_token)
        # Set up stream rules
        rules = [
            tweepy.StreamRule("$BTC OR $ETH OR #crypto"),
            tweepy.StreamRule("bitcoin OR ethereum OR cryptocurrency"),
            tweepy.StreamRule("Bullish OR Bearish lang:en"),
            tweepy.StreamRule("pump OR dump OR moon OR crash"),
        ]
        # Clear existing rules, then add the new ones in batched calls
        # (tweepy's StreamingClient exposes add_rules/delete_rules, plural)
        current_rules = client.get_rules()
        if current_rules.data:
            client.delete_rules([rule.id for rule in current_rules.data])
        client.add_rules(rules)
        return client

    async def on_tweet(self, tweet):
        """Handle incoming tweet with buffering"""
        self.tweet_buffer.append({
            'id': tweet.id,
            'text': tweet.text,
            'author_id': tweet.author_id,
            'created_at': tweet.created_at,
            'metrics': {
                'likes': tweet.public_metrics['like_count'],
                'retweets': tweet.public_metrics['retweet_count'],
                'replies': tweet.public_metrics['reply_count']
            }
        })
        # Flush buffer every 5 seconds or when full
        if len(self.tweet_buffer) >= 100 or \
                time.time() - self.last_buffer_flush > 5:
            await self.flush_buffer()

    async def flush_buffer(self):
        """Process buffered tweets"""
        if not self.tweet_buffer:
            return
        await self.callback(self.tweet_buffer)
        self.tweet_buffer.clear()
        self.last_buffer_flush = time.time()

    async def run_stream_with_retry(self, bearer_token, max_retries=5):
        """Run stream with exponential backoff retry logic"""
        retry_count = 0
        backoff_base = 2
        loop = asyncio.get_running_loop()
        while retry_count < max_retries:
            try:
                stream = await self.create_stream(bearer_token)
                # tweepy invokes on_tweet from its own thread, so bridge it
                # back onto the event loop instead of assigning the async
                # handler directly (that would create an un-awaited coroutine)
                stream.on_tweet = lambda tweet: asyncio.run_coroutine_threadsafe(
                    self.on_tweet(tweet), loop
                )
                # Run the blocking filter() call in a worker thread so the
                # event loop stays responsive
                await asyncio.to_thread(stream.filter)
                # If we get here, stream stopped normally
                retry_count = 0  # Reset on successful run
            except tweepy.errors.TooManyRequests:
                # Rate limit hit - exponential backoff with jitter, capped at 5 minutes
                wait_time = min(300, (backoff_base ** retry_count) + random.uniform(0, 1))
                print(f"Rate limit hit. Waiting {wait_time:.1f}s before retry.")
                await asyncio.sleep(wait_time)
                retry_count += 1
                # Rotate to next token if available
                self.current_token_index = (self.current_token_index + 1) % len(self.bearer_tokens)
            except tweepy.errors.TweepyException as e:
                print(f"Stream error: {e}. Reconnecting...")
                await asyncio.sleep(5)
                retry_count += 1
            except Exception as e:
                print(f"Unexpected error: {e}")
                await asyncio.sleep(10)
                retry_count += 1
        print(f"Max retries ({max_retries}) exceeded. Stream stopped.")

    async def start(self, num_streams=2):
        """Start multiple redundant streams"""
        tasks = []
        for i in range(num_streams):
            token = self.bearer_tokens[i % len(self.bearer_tokens)]
            task = asyncio.create_task(self.run_stream_with_retry(token))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)
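The retry schedule is easier to reason about in isolation. A minimal sketch of capped exponential backoff with jitter, using the same values as above (base 2, 300-second cap, 0-1 second of uniform jitter):

```python
import random

def backoff_delay(retry_count, base=2, cap=300, jitter=1.0):
    """Capped exponential backoff: base**retry plus uniform random jitter."""
    return min(cap, base ** retry_count + random.uniform(0, jitter))

# With jitter disabled, delays grow 1, 2, 4, 8, ... and flatten at the cap
delays = [backoff_delay(n, jitter=0) for n in range(12)]
print(delays[:4])   # [1.0, 2.0, 4.0, 8.0]
print(delays[-1])   # 300
```

The jitter matters when several redundant streams reconnect at once: without it, every stream retries at the same instant and hits the rate limit together.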
Problem
After running our Reddit monitor for 24+ hours, memory usage grew from 200MB to over 8GB, eventually causing OOM kills. The PRAW stream was accumulating comment/submission objects without proper cleanup, and our deduplication cache was growing unbounded.
What I Tried
Attempt 1: Called gc.collect() every hour - reduced memory growth but didn't stop it.
Attempt 2: Limited deduplication cache size with LRU - helped but objects still leaked from PRAW internals.
Attempt 3: Restarted process every 6 hours - worked but created gaps in monitoring.
Actual Fix
Implemented object lifecycle management with explicit deletion, bounded caches with TTL-based expiration, and periodic PRAW instance recycling. The system now extracts only necessary data, discards full PRAW objects immediately, and maintains fixed-size sliding windows for deduplication.
import praw
import asyncio
import gc
from collections import deque


class RedditStreamMonitor:
    """Memory-efficient Reddit stream monitoring"""

    def __init__(self, client_config, callback):
        self.client_config = client_config  # Kept so the instance can be recycled later
        self.callback = callback
        self.subreddits = ['cryptocurrency', 'bitcoin', 'ethereum', 'wallstreetbets']
        self.seen_ids = deque(maxlen=50000)  # Bounded deduplication cache
        self.instance_age = 0
        self.max_instance_age = 3600  # Recycle every hour
        self.reddit = self._create_reddit()

    def _create_reddit(self):
        """Build a fresh read-only Reddit instance"""
        return praw.Reddit(
            client_id=self.client_config['client_id'],
            client_secret=self.client_config['client_secret'],
            user_agent='CryptoMonitor/1.0',
            read_only=True
        )

    def extract_post_data(self, post):
        """Extract only necessary data, discard PRAW object"""
        return {
            'id': post.id,
            'title': post.title,
            'text': post.selftext[:1000] if hasattr(post, 'selftext') else '',
            'author': str(post.author) if post.author else '[deleted]',
            'subreddit': str(post.subreddit),
            'score': post.score,
            'num_comments': post.num_comments,
            'created_utc': post.created_utc,
            'url': post.url,
            'is_submission': True
        }

    def extract_comment_data(self, comment):
        """Extract comment data efficiently"""
        return {
            'id': comment.id,
            'body': comment.body[:500],
            'author': str(comment.author) if comment.author else '[deleted]',
            'subreddit': str(comment.subreddit),
            'score': comment.score,
            'created_utc': comment.created_utc,
            'parent_id': comment.parent_id,
            'is_submission': False
        }

    def is_duplicate(self, item_id):
        """Check if item already processed (bounded cache)"""
        if item_id in self.seen_ids:
            return True
        self.seen_ids.append(item_id)
        return False

    async def process_submission(self, submission):
        """Process submission with explicit cleanup"""
        if self.is_duplicate(submission.id):
            return
        data = self.extract_post_data(submission)
        # Drop our reference to the PRAW object before the callback runs
        del submission
        await self.callback(data)

    async def process_comment(self, comment):
        """Process comment with memory management"""
        if self.is_duplicate(comment.id):
            return
        data = self.extract_comment_data(comment)
        del comment
        await self.callback(data)

    async def monitor_submissions(self):
        """Monitor new submissions"""
        while True:
            # PRAW combines subreddits with '+'; pause_after=0 makes the stream
            # yield None when idle so control returns to the event loop
            subreddit = self.reddit.subreddit('+'.join(self.subreddits))
            for submission in subreddit.stream.submissions(skip_existing=True, pause_after=0):
                if submission is None:
                    await asyncio.sleep(1)
                    continue
                await self.process_submission(submission)
                # Check if we need to recycle the instance
                self.instance_age += 1
                if self.instance_age >= self.max_instance_age:
                    await self.recycle_instance()
                    break  # Rebuild the stream on the fresh instance

    async def monitor_comments(self):
        """Monitor new comments"""
        while True:
            subreddit = self.reddit.subreddit('+'.join(self.subreddits))
            for comment in subreddit.stream.comments(skip_existing=True, pause_after=0):
                if comment is None:
                    await asyncio.sleep(1)
                    continue
                await self.process_comment(comment)
                self.instance_age += 1
                if self.instance_age >= self.max_instance_age:
                    await self.recycle_instance()
                    break

    async def recycle_instance(self):
        """Recycle Reddit instance to free memory"""
        print(f"Recycling Reddit instance (age: {self.instance_age})")
        # Clear old Reddit instance and force a collection pass
        del self.reddit
        gc.collect()
        # Create new instance and reset age counter
        self.reddit = self._create_reddit()
        self.instance_age = 0
        # Rebuild the seen_ids cache once it approaches capacity
        if len(self.seen_ids) > 40000:
            self.seen_ids = deque(maxlen=50000)

    async def start(self):
        """Start monitoring submissions and comments"""
        await asyncio.gather(
            self.monitor_submissions(),
            self.monitor_comments()
        )
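The bounded dedup cache is the piece that stopped the unbounded growth. The same `deque(maxlen=...)` pattern, shrunk to a toy window size so the eviction is visible:

```python
from collections import deque

class BoundedDedup:
    """Sliding-window deduplication: the oldest IDs age out automatically."""
    def __init__(self, maxlen=50000):
        self.seen = deque(maxlen=maxlen)

    def is_duplicate(self, item_id):
        if item_id in self.seen:
            return True
        self.seen.append(item_id)
        return False

dedup = BoundedDedup(maxlen=3)
print([dedup.is_duplicate(i) for i in ('a', 'b', 'a')])  # [False, False, True]
dedup.is_duplicate('c')
dedup.is_duplicate('d')         # window is now b, c, d - 'a' was evicted
print(dedup.is_duplicate('a'))  # False: 'a' aged out of the window
```

One trade-off worth knowing: membership tests on a deque are O(n), so at a 50K window a paired set-plus-deque (set for lookup, deque for eviction order) is noticeably faster. The deque-only version traded that speed for simplicity.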
Problem
Our Discord monitoring bot was experiencing frequent gateway disconnections, especially during high-traffic periods in large crypto communities. The reconnection logic wasn't properly resuming streams, causing message gaps and missing critical announcements.
What I Tried
Attempt 1: Used default discord.py reconnection - worked for minor disconnects but failed on gateway resets.
Attempt 2: Implemented manual reconnect with backoff - still lost messages during disconnect windows.
Attempt 3: Used multiple bot tokens - helped but increased complexity and cost.
Actual Fix
Implemented robust gateway handling with automatic resume capability, message buffer for gap recovery, and health monitoring that proactively restarts sessions before timeout. The system also tracks last processed message IDs per channel to resume after reconnection.
import discord
from discord.ext import commands
import asyncio


class DiscordSignalMonitor(commands.Bot):
    """Resilient Discord bot with resume capability"""

    def __init__(self, token, callback):
        intents = discord.Intents.default()
        intents.message_content = True
        intents.guilds = True
        super().__init__(
            command_prefix='!',
            intents=intents,
            heartbeat_timeout=60,
            guild_ready_timeout=10
        )
        self.token = token
        self.callback = callback
        self.last_message_ids = {}  # guild.id -> last processed message id
        self.message_buffer = []
        self.buffer_lock = asyncio.Lock()

    async def on_ready(self):
        print(f'Bot connected as {self.user}')
        # Sync application commands
        await self.tree.sync()

    async def on_guild_available(self, guild):
        """Handle guild becoming available"""
        print(f'Guild available: {guild.name}')
        # Resume monitoring from last processed message
        last_id = self.last_message_ids.get(guild.id)
        if last_id:
            try:
                # Fetch messages since last processed
                channel = guild.system_channel
                if channel:
                    async for message in channel.history(after=discord.Object(id=last_id)):
                        await self.process_message(message)
            except Exception as e:
                print(f"Error resuming guild {guild.id}: {e}")

    async def on_message(self, message):
        """Handle incoming message"""
        # Skip bot messages and DMs (no guild to track)
        if message.author.bot or message.guild is None:
            return
        # Store message ID for resume capability
        self.last_message_ids[message.guild.id] = message.id
        await self.process_message(message)

    async def process_message(self, message):
        """Process message and extract signals"""
        data = {
            'id': message.id,
            'content': message.content,
            'author': str(message.author),
            'channel': str(message.channel),
            'guild': str(message.guild),
            'timestamp': message.created_at.isoformat(),
            'mentions': [m.mention for m in message.mentions],
            'source': 'discord'
        }
        async with self.buffer_lock:
            self.message_buffer.append(data)
            should_flush = len(self.message_buffer) >= 50
        # Flush outside the lock: asyncio.Lock is not reentrant, so calling
        # flush_buffer while still holding it would deadlock
        if should_flush:
            await self.flush_buffer()

    async def flush_buffer(self):
        """Process buffered messages"""
        async with self.buffer_lock:
            if not self.message_buffer:
                return
            batch = list(self.message_buffer)
            self.message_buffer.clear()
        await self.callback(batch)

    async def on_disconnect(self):
        """Handle disconnect"""
        print('Bot disconnected from gateway')

    async def on_resumed(self):
        """Handle resume"""
        print('Bot resumed connection')

    async def on_error(self, event, *args, **kwargs):
        """Handle errors"""
        print(f'Error in {event}: {args}')

    async def start_with_retry(self):
        """Start bot with automatic retry"""
        max_retries = 5
        retry_count = 0
        while retry_count < max_retries:
            try:
                await self.start(self.token)
                # If we get here, bot stopped normally
                retry_count = 0
            except (discord.ConnectionClosed, discord.GatewayNotFound) as e:
                print(f'Gateway error: {e}. Reconnecting in {2**retry_count}s...')
                await asyncio.sleep(2**retry_count)
                retry_count += 1
            except Exception as e:
                print(f'Unexpected error: {e}')
                await asyncio.sleep(5)
                retry_count += 1
        print('Max retries exceeded. Bot stopped.')

    async def monitor_channels(self, channel_ids):
        """Seed resume state for specific channels"""
        for channel_id in channel_ids:
            try:
                channel = self.get_channel(channel_id)
                if channel:
                    # history() is an async iterator in discord.py 2.x
                    # (flatten() was removed), so collect the latest message
                    latest = [m async for m in channel.history(limit=1)]
                    if latest:
                        self.last_message_ids[channel.guild.id] = latest[0].id
            except Exception as e:
                print(f"Error setting up channel {channel_id}: {e}")
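Resume-after-reconnect works because Discord message IDs are snowflakes: the creation timestamp lives in the high bits, so "everything after ID X" is also "everything after time T". A small sketch of the decode, useful for measuring how long a disconnect gap actually was (the 2015-01-01 epoch and 22-bit shift are from Discord's published snowflake format; the IDs below are made up):

```python
from datetime import datetime, timezone

DISCORD_EPOCH_MS = 1420070400000  # 2015-01-01T00:00:00Z

def snowflake_to_datetime(snowflake: int) -> datetime:
    """Extract the creation time embedded in a Discord snowflake ID."""
    ms = (snowflake >> 22) + DISCORD_EPOCH_MS
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)

def gap_seconds(last_seen_id: int, first_resumed_id: int) -> float:
    """Wall-clock time between the last message before a disconnect
    and the first message after resuming."""
    delta = snowflake_to_datetime(first_resumed_id) - snowflake_to_datetime(last_seen_id)
    return delta.total_seconds()

# Hypothetical IDs constructed 5000ms apart
last_seen = 5000 << 22
first_resumed = 10000 << 22
print(gap_seconds(last_seen, first_resumed))  # 5.0
```

Logging this gap on every `on_resumed` event gives a concrete number for how much data the history backfill needs to recover.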
What I Learned
- Lesson 1: Social media APIs have strict rate limits and disconnection patterns. You need robust retry logic with exponential backoff and jitter.
- Lesson 2: Memory management is critical for long-running stream processors. Always use bounded caches and explicit object cleanup.
- Lesson 3: Message gaps during reconnections are unacceptable for trading. Implement resume capability and message buffers to ensure data continuity.
- Overall: Building reliable social monitoring requires treating APIs as unreliable networks and designing for failure at every level.
Production Setup
Complete production deployment with multi-platform monitoring, signal processing, and alert delivery.
# Install dependencies (asyncio is part of the standard library)
pip install tweepy praw discord.py aiohttp redis
# Project structure
mkdir social-harvester
cd social-harvester
mkdir {config,logs,storage}
# API keys configuration
cat > config/api_keys.yaml << EOF
twitter:
  bearer_tokens:
    - "BEARER_TOKEN_1"
    - "BEARER_TOKEN_2"
    - "BEARER_TOKEN_3"
reddit:
  client_id: "REDDIT_CLIENT_ID"
  client_secret: "REDDIT_CLIENT_SECRET"
discord:
  bot_token: "DISCORD_BOT_TOKEN"
EOF
# Docker deployment
cat > docker-compose.yml << EOF
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  twitter-monitor:
    build: .
    environment:
      - MONITOR_TYPE=twitter
      - REDIS_URL=redis://redis:6379/0
    volumes:
      - ./config:/app/config
      - ./logs:/app/logs
    depends_on:
      - redis
    restart: unless-stopped
  reddit-monitor:
    build: .
    environment:
      - MONITOR_TYPE=reddit
      - REDIS_URL=redis://redis:6379/0
    volumes:
      - ./config:/app/config
      - ./logs:/app/logs
    depends_on:
      - redis
    restart: unless-stopped
  discord-monitor:
    build: .
    environment:
      - MONITOR_TYPE=discord
      - REDIS_URL=redis://redis:6379/0
    volumes:
      - ./config:/app/config
      - ./logs:/app/logs
    depends_on:
      - redis
    restart: unless-stopped
  signal-processor:
    build: .
    environment:
      - PROCESSOR_MODE=true
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    restart: unless-stopped
EOF
# Start all monitors
docker-compose up -d
# Check status
docker-compose ps
# View logs
docker-compose logs -f twitter-monitor
Monitoring & Debugging
Track signal quality and pipeline health in real-time.
Red Flags to Watch For
- Sudden drop in message rate (API connection issues or rate limits)
- High duplicate rate (>10%) - deduplication cache not working
- Memory growth > 100MB/hour - object leak in stream processor
- Gaps in message timestamps during reconnection windows
- Low signal-to-noise ratio (<5%) - filter rules need tuning
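These thresholds are easy to encode as an automated check. A hypothetical sketch (the metric names and the shape of the input dict are assumptions; the thresholds mirror the list above, and you would feed it from whatever metrics endpoint you actually expose):

```python
def red_flags(metrics: dict) -> list[str]:
    """Evaluate pipeline metrics against the red-flag thresholds."""
    flags = []
    if metrics.get('duplicate_rate', 0.0) > 0.10:
        flags.append('high duplicate rate: dedup cache may be failing')
    if metrics.get('memory_growth_mb_per_hour', 0.0) > 100:
        flags.append('memory leak suspected in stream processor')
    if metrics.get('signal_to_noise', 1.0) < 0.05:
        flags.append('low signal-to-noise: filter rules need tuning')
    # A sudden drop means comparing against a recent baseline rate
    baseline = metrics.get('baseline_messages_per_min', 0.0)
    if baseline and metrics.get('messages_per_min', 0.0) < baseline * 0.5:
        flags.append('message rate dropped: check API connections / rate limits')
    return flags

print(red_flags({'duplicate_rate': 0.22}))
# ['high duplicate rate: dedup cache may be failing']
```

Run it on a timer and push any non-empty result into the same alert channel as the trading signals, so pipeline degradation is noticed as fast as a market move.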
Monitoring Dashboard
# Signal processing metrics
curl http://localhost:8080/metrics/signals
# {
#   "signals_per_hour": 1523,
#   "avg_confidence": 0.78,
#   "by_platform": {
#     "twitter": 845,
#     "reddit": 423,
#     "discord": 255
#   },
#   "by_symbol": {
#     "BTC": 342,
#     "ETH": 287,
#     "SOL": 156
#   }
# }

# Health check
curl http://localhost:8080/health
# {
#   "status": "healthy",
#   "monitors": {
#     "twitter": "connected",
#     "reddit": "connected",
#     "discord": "connected"
#   },
#   "uptime_hours": 72.5
# }