Data Analysis Applications Guide
Build powerful data analysis applications using the Trading Card API's comprehensive dataset. This guide covers statistical analysis, market insights, predictive modeling, and integration with popular data science tools.
Prerequisites
- Trading Card API access credentials
- Data analysis environment (Python/R/SQL)
- Basic understanding of statistical analysis
- Knowledge of data visualization tools
Architecture Overview
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Data Sources │ │ Trading Card │ │ Analytics │
│ │ │ API │ │ Platform │
│ - Market Data │◄───┤ │───►│ │
│ - Price History│ │ - Cards │ │ - ETL Pipeline │
│ - Player Stats │ │ - Sets │ │ - Data Models │
│ - Tournament │ │ - Transactions │ │ - Visualizations│
│ Results │ │ - Market Data │ │ - Reports │
└─────────────────┘ └──────────────────┘ └─────────────────┘
Step 1: Data Collection and API Integration
Python Data Collection Framework
import pandas as pd
import requests
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import sqlite3
import numpy as np
class TradingCardDataCollector:
def __init__(self, client_id: str, client_secret: str):
self.client_id = client_id
self.client_secret = client_secret
self.base_url = "https://api.tradingcardapi.com"
self.access_token = None
self.session = requests.Session()
def authenticate(self) -> None:
"""Get OAuth 2.0 access token"""
response = self.session.post(f"{self.base_url}/oauth/token", json={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
})
if response.status_code == 200:
self.access_token = response.json()["access_token"]
self.session.headers.update({
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/vnd.api+json"
})
else:
raise Exception("Authentication failed")
def fetch_all_cards(self, batch_size: int = 1000) -> pd.DataFrame:
"""Fetch all cards with efficient pagination"""
all_cards = []
page = 1
while True:
params = {
"page[number]": page,
"page[size]": batch_size,
"include": "set,player,team",
"fields[cards]": "name,year,number,player_name,team_name,position,condition"
}
response = self.session.get(f"{self.base_url}/cards", params=params)
if response.status_code != 200:
break
data = response.json()
cards = data.get("data", [])
if not cards:
break
all_cards.extend(cards)
page += 1
# Rate limiting
time.sleep(0.1)
return self._normalize_card_data(all_cards)
def _normalize_card_data(self, cards: List[Dict]) -> pd.DataFrame:
"""Convert API response to structured DataFrame"""
normalized = []
for card in cards:
attrs = card["attributes"]
relationships = card.get("relationships", {})
record = {
"card_id": card["id"],
"name": attrs.get("name"),
"year": attrs.get("year"),
"number": attrs.get("number"),
"player_name": attrs.get("player_name"),
"team_name": attrs.get("team_name"),
"position": attrs.get("position"),
"condition": attrs.get("condition"),
"set_id": self._extract_relationship_id(relationships, "set"),
"player_id": self._extract_relationship_id(relationships, "player"),
"created_at": attrs.get("created_at"),
"updated_at": attrs.get("updated_at")
}
normalized.append(record)
df = pd.DataFrame(normalized)
df["year"] = pd.to_numeric(df["year"], errors="coerce")
df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce")
df["updated_at"] = pd.to_datetime(df["updated_at"], errors="coerce")
return df
def _extract_relationship_id(self, relationships: Dict, rel_type: str) -> Optional[str]:
"""Extract relationship ID from JSON:API format"""
rel = relationships.get(rel_type, {}).get("data")
return rel["id"] if rel else None
# Usage example
collector = TradingCardDataCollector("your_client_id", "your_client_secret")
collector.authenticate()
cards_df = collector.fetch_all_cards()
print(f"Collected {len(cards_df)} cards for analysis")
print(cards_df.head())
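Re-downloading the full catalog on every run is slow, so the collected DataFrame can be cached locally between sessions. Below is a minimal sketch using the sqlite3 module imported above; the database file and table names are illustrative.
def cache_cards_locally(cards_df: pd.DataFrame, db_path: str = "cards_cache.db") -> None:
    """Persist the collected card catalog to a local SQLite cache."""
    with sqlite3.connect(db_path) as conn:
        cards_df.to_sql("cards", conn, if_exists="replace", index=False)

def load_cached_cards(db_path: str = "cards_cache.db") -> pd.DataFrame:
    """Reload the cached catalog, parsing date columns back into datetimes."""
    with sqlite3.connect(db_path) as conn:
        return pd.read_sql("SELECT * FROM cards", conn, parse_dates=["created_at", "updated_at"])

cache_cards_locally(cards_df)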
Market Data Analysis
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import warnings
warnings.filterwarnings('ignore')
class MarketAnalysis:
def __init__(self, cards_df: pd.DataFrame):
self.cards_df = cards_df
def analyze_year_distribution(self) -> Dict:
"""Analyze card distribution by year"""
year_counts = self.cards_df["year"].value_counts().sort_index()
# Calculate statistics
stats_dict = {
"total_years": len(year_counts),
"earliest_year": year_counts.index.min(),
"latest_year": year_counts.index.max(),
"peak_year": year_counts.idxmax(),
"peak_count": year_counts.max(),
"mean_cards_per_year": year_counts.mean(),
"std_cards_per_year": year_counts.std()
}
# Create visualization
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
year_counts.plot(kind='bar', alpha=0.7)
plt.title('Cards by Year Distribution')
plt.xlabel('Year')
plt.ylabel('Number of Cards')
plt.xticks(rotation=45)
plt.subplot(1, 2, 2)
year_counts.plot(kind='line', marker='o')
plt.title('Card Production Trend Over Time')
plt.xlabel('Year')
plt.ylabel('Number of Cards')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('year_distribution_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
return stats_dict
def analyze_player_popularity(self, top_n: int = 20) -> pd.DataFrame:
"""Identify most popular players by card count"""
player_analysis = self.cards_df.groupby('player_name').agg({
'card_id': 'count',
'year': ['min', 'max', 'nunique'],
'team_name': 'nunique'
}).round(2)
player_analysis.columns = [
'total_cards', 'first_year', 'last_year', 'years_active', 'teams_played'
]
# Calculate career span
player_analysis['career_span'] = (
player_analysis['last_year'] - player_analysis['first_year']
)
# Sort by total cards and return top players
top_players = player_analysis.sort_values('total_cards', ascending=False).head(top_n)
# Create visualization
plt.figure(figsize=(14, 8))
plt.subplot(2, 2, 1)
top_players['total_cards'].plot(kind='barh')
plt.title('Top Players by Total Cards')
plt.xlabel('Number of Cards')
plt.subplot(2, 2, 2)
plt.scatter(top_players['career_span'], top_players['total_cards'], alpha=0.7)
plt.xlabel('Career Span (Years)')
plt.ylabel('Total Cards')
plt.title('Career Span vs Card Count')
plt.subplot(2, 2, 3)
top_players['teams_played'].hist(bins=10, alpha=0.7)
plt.xlabel('Number of Teams Played')
plt.ylabel('Frequency')
plt.title('Team Diversity Distribution')
plt.subplot(2, 2, 4)
decade_analysis = self.cards_df.groupby(self.cards_df['year'] // 10 * 10)['player_name'].nunique()
decade_analysis.plot(kind='bar')
plt.xlabel('Decade')
plt.ylabel('Unique Players')
plt.title('Player Diversity by Decade')
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('player_popularity_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
return top_players
def analyze_set_characteristics(self) -> pd.DataFrame:
"""Analyze trading card set characteristics"""
set_analysis = self.cards_df.groupby('set_id').agg({
'card_id': 'count',
'year': ['min', 'max'],
'player_name': 'nunique',
'team_name': 'nunique',
'position': 'nunique'
})
set_analysis.columns = [
'total_cards', 'min_year', 'max_year', 'unique_players',
'unique_teams', 'unique_positions'
]
# Calculate set diversity metrics
set_analysis['year_span'] = set_analysis['max_year'] - set_analysis['min_year']
set_analysis['player_diversity_ratio'] = (
set_analysis['unique_players'] / set_analysis['total_cards']
)
return set_analysis.sort_values('total_cards', ascending=False)
# Usage
analysis = MarketAnalysis(cards_df)
year_stats = analysis.analyze_year_distribution()
top_players = analysis.analyze_player_popularity()
set_stats = analysis.analyze_set_characteristics()
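The three result objects feed later steps, so it is worth persisting them; a short sketch (the file names are arbitrary):
top_players.to_csv("top_players.csv")
set_stats.to_csv("set_statistics.csv")
pd.Series(year_stats).to_csv("year_distribution_stats.csv")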
Step 2: Advanced Statistical Analysis
Price Trend Analysis
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures
from sklearn.metrics import r2_score
class PriceTrendAnalysis:
    def __init__(self, api_client: "TradingCardDataCollector"):
        """Reuse the collector's authenticated session and base URL for price queries"""
        self.session = api_client.session
        self.base_url = api_client.base_url
def fetch_historical_prices(self, card_id: str, days_back: int = 365) -> pd.DataFrame:
"""Fetch historical price data for a specific card"""
end_date = datetime.now()
start_date = end_date - timedelta(days=days_back)
params = {
"filter[card_id]": card_id,
"filter[date_range]": f"{start_date.isoformat()},{end_date.isoformat()}",
"sort": "transaction_date",
"page[size]": 1000
}
response = self.session.get(f"{self.base_url}/transactions", params=params)
transactions = response.json()["data"]
price_data = []
for transaction in transactions:
attrs = transaction["attributes"]
price_data.append({
"date": pd.to_datetime(attrs["transaction_date"]),
"price": float(attrs["price"]),
"condition": attrs["condition"],
"marketplace": attrs.get("marketplace", "unknown")
})
return pd.DataFrame(price_data)
def calculate_price_trends(self, card_id: str) -> Dict:
"""Calculate comprehensive price trend metrics"""
prices_df = self.fetch_historical_prices(card_id)
if prices_df.empty:
return {"error": "No price data available"}
# Group by condition for separate analysis
condition_trends = {}
for condition in prices_df["condition"].unique():
condition_data = prices_df[prices_df["condition"] == condition].copy()
condition_data = condition_data.sort_values("date")
# Calculate trend metrics
condition_data["days_since_start"] = (
condition_data["date"] - condition_data["date"].min()
).dt.days
# Linear regression for trend
X = condition_data["days_since_start"].values.reshape(-1, 1)
y = condition_data["price"].values
model = LinearRegression()
model.fit(X, y)
trend_analysis = {
"condition": condition,
"current_price": condition_data["price"].iloc[-1],
"average_price": condition_data["price"].mean(),
"price_volatility": condition_data["price"].std(),
"min_price": condition_data["price"].min(),
"max_price": condition_data["price"].max(),
"trend_slope": model.coef_[0], # Price change per day
"trend_r_squared": r2_score(y, model.predict(X)),
"total_transactions": len(condition_data),
"price_appreciation": (
(condition_data["price"].iloc[-1] - condition_data["price"].iloc[0])
/ condition_data["price"].iloc[0] * 100
)
}
condition_trends[condition] = trend_analysis
return condition_trends
def detect_market_anomalies(self, card_id: str, z_threshold: float = 2.5) -> pd.DataFrame:
"""Detect unusual price movements using statistical analysis"""
prices_df = self.fetch_historical_prices(card_id)
if prices_df.empty:
return pd.DataFrame()
# Calculate rolling statistics
prices_df = prices_df.sort_values("date")
prices_df["rolling_mean"] = prices_df["price"].rolling(window=30, min_periods=5).mean()
prices_df["rolling_std"] = prices_df["price"].rolling(window=30, min_periods=5).std()
# Calculate Z-scores
prices_df["z_score"] = (
(prices_df["price"] - prices_df["rolling_mean"]) / prices_df["rolling_std"]
)
# Identify anomalies
anomalies = prices_df[abs(prices_df["z_score"]) > z_threshold].copy()
anomalies["anomaly_type"] = anomalies["z_score"].apply(
lambda x: "price_spike" if x > 0 else "price_drop"
)
return anomalies[["date", "price", "condition", "z_score", "anomaly_type"]]
def correlation_analysis(self, player_cards: List[str]) -> pd.DataFrame:
"""Analyze price correlations between related cards"""
correlation_data = {}
for card_id in player_cards:
prices_df = self.fetch_historical_prices(card_id, days_back=180)
if not prices_df.empty:
# Resample to daily averages
daily_prices = prices_df.groupby([
prices_df["date"].dt.date, "condition"
])["price"].mean().reset_index()
for condition in daily_prices["condition"].unique():
condition_prices = daily_prices[
daily_prices["condition"] == condition
].set_index("date")["price"]
key = f"{card_id}_{condition}"
correlation_data[key] = condition_prices
# Create correlation matrix
        correlation_df = pd.DataFrame(correlation_data).ffill()
correlation_matrix = correlation_df.corr()
# Visualize correlations
plt.figure(figsize=(12, 10))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0,
square=True, linewidths=0.5)
plt.title('Card Price Correlation Matrix')
plt.tight_layout()
plt.savefig('price_correlation_matrix.png', dpi=300, bbox_inches='tight')
plt.show()
return correlation_matrix
# Example usage (reuses the authenticated collector from Step 1)
analyzer = PriceTrendAnalysis(collector)
card_trends = analyzer.calculate_price_trends("card_123")
anomalies = analyzer.detect_market_anomalies("card_123")
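Plotting the detected anomalies on top of the raw price series is a quick way to sanity-check the z-score threshold. Here is a hedged sketch that reuses the class above and the matplotlib import from earlier:
def plot_anomalies(analyzer: PriceTrendAnalysis, card_id: str) -> None:
    """Plot the price history with detected anomalies highlighted."""
    prices_df = analyzer.fetch_historical_prices(card_id).sort_values("date")
    anomalies = analyzer.detect_market_anomalies(card_id)
    plt.figure(figsize=(12, 5))
    plt.plot(prices_df["date"], prices_df["price"], alpha=0.6, label="Price")
    if not anomalies.empty:
        plt.scatter(anomalies["date"], anomalies["price"], color="red", label="Anomaly", zorder=3)
    plt.title(f"Price history and anomalies for {card_id}")
    plt.xlabel("Date")
    plt.ylabel("Price ($)")
    plt.legend()
    plt.tight_layout()
    plt.show()

plot_anomalies(analyzer, "card_123")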
Step 3: R Statistical Analysis
Advanced Statistical Modeling
library(tidyverse)
library(jsonlite)
library(httr)
library(R6)
library(zoo)
library(forecast)
library(lubridate)
library(corrplot)
library(ggplot2)
library(plotly)
# Trading Card API R Client
TradingCardAPI <- R6Class("TradingCardAPI",
public = list(
base_url = "https://api.tradingcardapi.com",
access_token = NULL,
initialize = function(client_id, client_secret) {
self$authenticate(client_id, client_secret)
},
authenticate = function(client_id, client_secret) {
auth_response <- POST(
paste0(self$base_url, "/oauth/token"),
body = list(
grant_type = "client_credentials",
client_id = client_id,
client_secret = client_secret
),
encode = "json"
)
if (status_code(auth_response) == 200) {
token_data <- content(auth_response)
self$access_token <- token_data$access_token
} else {
stop("Authentication failed")
}
},
fetch_market_data = function(start_date, end_date, page_size = 1000) {
all_data <- list()
page <- 1
repeat {
params <- list(
`page[number]` = page,
`page[size]` = page_size,
`filter[date_range]` = paste0(start_date, ",", end_date),
`include` = "card,set"
)
response <- GET(
paste0(self$base_url, "/market-data"),
query = params,
add_headers(Authorization = paste("Bearer", self$access_token))
)
if (status_code(response) != 200) break
data <- content(response)$data
if (length(data) == 0) break
all_data <- append(all_data, data)
page <- page + 1
Sys.sleep(0.1) # Rate limiting
}
return(all_data)
}
)
)
# Market trend analysis functions
analyze_seasonal_patterns <- function(market_data) {
# Convert to data frame
df <- map_dfr(market_data, ~ {
attrs <- .x$attributes
data.frame(
date = as.Date(attrs$date),
price = as.numeric(attrs$price),
volume = as.numeric(attrs$volume),
card_id = attrs$card_id,
stringsAsFactors = FALSE
)
})
# Add time features
df <- df %>%
mutate(
month = month(date),
quarter = quarter(date),
year = year(date),
day_of_year = yday(date)
)
# Seasonal decomposition
monthly_avg <- df %>%
group_by(year, month) %>%
summarize(avg_price = mean(price, na.rm = TRUE), .groups = 'drop') %>%
mutate(date = make_date(year, month, 1)) %>%
arrange(date)
# Create time series
  ts_data <- ts(monthly_avg$avg_price,
                start = c(monthly_avg$year[1], monthly_avg$month[1]),
                frequency = 12)
# Decompose time series
decomposition <- decompose(ts_data)
# Plot seasonal decomposition
autoplot(decomposition) +
ggtitle("Trading Card Market Seasonal Decomposition")
# Seasonal analysis by quarter
seasonal_stats <- df %>%
group_by(quarter) %>%
summarize(
avg_price = mean(price, na.rm = TRUE),
median_price = median(price, na.rm = TRUE),
price_volatility = sd(price, na.rm = TRUE),
transaction_volume = sum(volume, na.rm = TRUE),
.groups = 'drop'
)
return(list(
decomposition = decomposition,
seasonal_stats = seasonal_stats,
monthly_trends = monthly_avg
))
}
# Player performance correlation analysis
analyze_player_performance <- function(cards_df, performance_data) {
# Merge card data with player performance metrics
merged_data <- cards_df %>%
left_join(performance_data, by = c("player_name" = "player")) %>%
filter(!is.na(batting_avg) | !is.na(era)) # Keep only players with stats
# Calculate correlations for position players
batters <- merged_data %>%
filter(!is.na(batting_avg)) %>%
group_by(player_name) %>%
summarize(
avg_card_value = mean(current_price, na.rm = TRUE),
card_count = n(),
batting_avg = first(batting_avg),
home_runs = first(home_runs),
rbi = first(rbi),
ops = first(ops),
.groups = 'drop'
)
# Correlation analysis
correlation_results <- batters %>%
select(avg_card_value, batting_avg, home_runs, rbi, ops) %>%
cor(use = "complete.obs")
# Visualize correlations
corrplot(correlation_results, method = "color", type = "upper",
order = "hclust", tl.cex = 0.8, tl.col = "black")
# Regression analysis
performance_model <- lm(avg_card_value ~ batting_avg + home_runs + ops, data = batters)
return(list(
correlation_matrix = correlation_results,
regression_model = performance_model,
model_summary = summary(performance_model)
))
}
# Market prediction modeling
build_price_prediction_model <- function(historical_data) {
# Prepare time series data
ts_data <- historical_data %>%
arrange(date) %>%
mutate(
price_change = c(0, diff(price)),
price_change_pct = (price_change / lag(price)) * 100,
moving_avg_7 = rollmean(price, k = 7, fill = NA, align = "right"),
moving_avg_30 = rollmean(price, k = 30, fill = NA, align = "right"),
volatility = rollapply(price, width = 7, FUN = sd, fill = NA, align = "right")
) %>%
filter(!is.na(moving_avg_30))
# ARIMA modeling for price forecasting
price_ts <- ts(ts_data$price, frequency = 365)
arima_model <- auto.arima(price_ts)
# Generate forecasts
forecast_result <- forecast(arima_model, h = 30) # 30-day forecast
# Plot forecast
autoplot(forecast_result) +
ggtitle("Card Price Forecast (30 days)") +
xlab("Time") +
ylab("Price ($)")
# Model evaluation metrics
accuracy_metrics <- accuracy(arima_model)
return(list(
model = arima_model,
forecast = forecast_result,
accuracy = accuracy_metrics,
processed_data = ts_data
))
}
Step 4: SQL Database Integration
Data Warehouse Schema
-- Create comprehensive data warehouse schema
CREATE DATABASE trading_card_analytics;
USE trading_card_analytics;
-- Core entities
CREATE TABLE cards (
id VARCHAR(50) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
year INT,
number VARCHAR(20),
player_name VARCHAR(255),
team_name VARCHAR(100),
position VARCHAR(50),
condition_grade VARCHAR(20),
set_id VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_year (year),
INDEX idx_player (player_name),
INDEX idx_set (set_id)
);
CREATE TABLE price_history (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
card_id VARCHAR(50),
transaction_date DATE NOT NULL,
price DECIMAL(10,2) NOT NULL,
condition_grade VARCHAR(20),
marketplace VARCHAR(100),
volume INT DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (card_id) REFERENCES cards(id),
INDEX idx_card_date (card_id, transaction_date),
INDEX idx_date (transaction_date),
INDEX idx_marketplace (marketplace)
);
CREATE TABLE market_metrics (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
metric_date DATE NOT NULL,
card_id VARCHAR(50),
avg_price DECIMAL(10,2),
median_price DECIMAL(10,2),
min_price DECIMAL(10,2),
max_price DECIMAL(10,2),
transaction_count INT,
total_volume INT,
price_volatility DECIMAL(8,4),
market_cap DECIMAL(15,2),
FOREIGN KEY (card_id) REFERENCES cards(id),
UNIQUE KEY unique_card_date (card_id, metric_date)
);
-- Analytics views
CREATE VIEW daily_market_summary AS
SELECT
transaction_date,
COUNT(*) as total_transactions,
AVG(price) as avg_price,
STDDEV(price) as price_volatility,
SUM(volume) as total_volume,
COUNT(DISTINCT card_id) as unique_cards_traded
FROM price_history
GROUP BY transaction_date
ORDER BY transaction_date;
CREATE VIEW top_performers_monthly AS
SELECT
DATE_FORMAT(ph.transaction_date, '%Y-%m') as month,
c.player_name,
c.name as card_name,
AVG(ph.price) as avg_price,
COUNT(*) as transaction_count,
STDDEV(ph.price) as price_volatility
FROM price_history ph
JOIN cards c ON ph.card_id = c.id
WHERE ph.transaction_date >= DATE_SUB(CURDATE(), INTERVAL 12 MONTH)
GROUP BY month, c.player_name, c.name
HAVING transaction_count >= 5
ORDER BY month DESC, avg_price DESC;
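Once the ETL below has populated these tables, the views can be queried straight from the analysis environment. A minimal sketch using SQLAlchemy and pandas (the connection string is a placeholder):
from sqlalchemy import create_engine
import pandas as pd

# Placeholder credentials; substitute your own warehouse connection details
engine = create_engine("mysql+pymysql://analytics_user:password@localhost:3306/trading_card_analytics")

# Pull the pre-aggregated daily summary view into a DataFrame
daily_summary = pd.read_sql(
    "SELECT * FROM daily_market_summary ORDER BY transaction_date DESC LIMIT 30", engine
)
print(daily_summary.head())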
ETL Pipeline Implementation
import pymysql
from sqlalchemy import create_engine, text
import pandas as pd
from datetime import datetime, timedelta
class TradingCardETL:
def __init__(self, db_config: Dict, api_client):
self.api_client = api_client
self.engine = create_engine(
f"mysql+pymysql://{db_config['user']}:{db_config['password']}"
f"@{db_config['host']}:{db_config['port']}/{db_config['database']}"
)
def extract_and_load_cards(self) -> None:
"""Extract cards from API and load into warehouse"""
print("Extracting cards from API...")
cards_df = self.api_client.fetch_all_cards()
# Data cleaning and transformation
cards_df = cards_df.dropna(subset=["card_id", "name"])
cards_df["updated_at"] = datetime.now()
# Load to database with upsert logic
with self.engine.begin() as conn:
# Create temporary table
cards_df.to_sql("cards_temp", conn, if_exists="replace", index=False)
# Upsert into main table
upsert_query = text("""
INSERT INTO cards (id, name, year, number, player_name, team_name,
position, set_id, updated_at)
SELECT card_id, name, year, number, player_name, team_name,
position, set_id, updated_at
FROM cards_temp
ON DUPLICATE KEY UPDATE
name = VALUES(name),
year = VALUES(year),
number = VALUES(number),
player_name = VALUES(player_name),
team_name = VALUES(team_name),
position = VALUES(position),
set_id = VALUES(set_id),
updated_at = VALUES(updated_at)
""")
conn.execute(upsert_query)
conn.execute(text("DROP TABLE cards_temp"))
print(f"Loaded {len(cards_df)} cards into warehouse")
def extract_and_load_prices(self, days_back: int = 7) -> None:
"""Extract recent price data and load into warehouse"""
print(f"Extracting price data for last {days_back} days...")
start_date = datetime.now() - timedelta(days=days_back)
# Get all cards that need price updates
with self.engine.connect() as conn:
cards_query = text("""
SELECT id FROM cards
WHERE updated_at >= :start_date
ORDER BY updated_at DESC
LIMIT 1000
""")
card_ids = pd.read_sql(cards_query, conn, params={"start_date": start_date})
all_price_data = []
for card_id in card_ids["id"]:
            try:
                # Assumes the API client also exposes fetch_historical_prices() as in Step 2
                prices = self.api_client.fetch_historical_prices(card_id, days_back)
if not prices.empty:
prices["card_id"] = card_id
all_price_data.append(prices)
except Exception as e:
print(f"Failed to fetch prices for {card_id}: {e}")
continue
if all_price_data:
all_prices_df = pd.concat(all_price_data, ignore_index=True)
# Load price data
all_prices_df.to_sql("price_history", self.engine,
if_exists="append", index=False)
print(f"Loaded {len(all_prices_df)} price records")
def calculate_daily_metrics(self, target_date: str = None) -> None:
"""Calculate and store daily market metrics"""
if target_date is None:
target_date = datetime.now().strftime("%Y-%m-%d")
metrics_query = text("""
INSERT INTO market_metrics
(metric_date, card_id, avg_price, median_price, min_price, max_price,
transaction_count, total_volume, price_volatility)
SELECT
:target_date as metric_date,
card_id,
AVG(price) as avg_price,
                -- PERCENTILE_CONT requires MariaDB 10.3+; stock MySQL has no built-in median,
                -- so compute the median in the application layer if you are on MySQL
                PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price) as median_price,
MIN(price) as min_price,
MAX(price) as max_price,
COUNT(*) as transaction_count,
SUM(volume) as total_volume,
STDDEV(price) as price_volatility
FROM price_history
WHERE transaction_date = :target_date
GROUP BY card_id
HAVING COUNT(*) >= 3
ON DUPLICATE KEY UPDATE
avg_price = VALUES(avg_price),
median_price = VALUES(median_price),
min_price = VALUES(min_price),
max_price = VALUES(max_price),
transaction_count = VALUES(transaction_count),
total_volume = VALUES(total_volume),
price_volatility = VALUES(price_volatility)
""")
with self.engine.begin() as conn:
result = conn.execute(metrics_query, {"target_date": target_date})
print(f"Calculated metrics for {result.rowcount} cards on {target_date}")
R Analysis Against the Warehouse
# R statistical analysis functions (these assume a DBI connection to the warehouse, e.g. via RMariaDB)
library(DBI)
perform_market_regression <- function(engine) {
# Load data for regression analysis
query <- "
SELECT
c.player_name,
c.year,
c.position,
AVG(mm.avg_price) as avg_price,
AVG(mm.price_volatility) as volatility,
SUM(mm.transaction_count) as total_transactions,
COUNT(DISTINCT c.id) as card_count
FROM cards c
JOIN market_metrics mm ON c.id = mm.card_id
WHERE mm.metric_date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY)
AND c.player_name IS NOT NULL
GROUP BY c.player_name, c.year, c.position
HAVING total_transactions >= 10
"
market_data <- dbGetQuery(engine, query)
# Multiple regression model
model <- lm(avg_price ~ year + position + card_count + volatility,
data = market_data)
# Model diagnostics
par(mfrow = c(2, 2))
plot(model)
# Print model summary
summary(model)
# Predictions vs actual
market_data$predicted_price <- predict(model)
ggplot(market_data, aes(x = predicted_price, y = avg_price)) +
geom_point(alpha = 0.6) +
geom_smooth(method = "lm", color = "red") +
geom_abline(intercept = 0, slope = 1, linetype = "dashed") +
labs(
title = "Predicted vs Actual Card Prices",
x = "Predicted Price ($)",
y = "Actual Price ($)"
) +
theme_minimal()
return(model)
}
time_series_forecast <- function(engine, card_id, forecast_days = 30) {
# Load historical price data for specific card
query <- paste0("
SELECT transaction_date as date, AVG(price) as price
FROM price_history
WHERE card_id = '", card_id, "'
AND transaction_date >= DATE_SUB(CURDATE(), INTERVAL 365 DAY)
GROUP BY transaction_date
ORDER BY transaction_date
")
price_data <- dbGetQuery(engine, query)
price_data$date <- as.Date(price_data$date)
# Create time series
ts_object <- ts(price_data$price, frequency = 365)
# Fit ARIMA model
fit <- auto.arima(ts_object)
# Generate forecast
forecast_result <- forecast(fit, h = forecast_days)
# Create visualization
autoplot(forecast_result) +
ggtitle(paste("Price Forecast for Card", card_id)) +
xlab("Time") +
ylab("Price ($)") +
theme_minimal()
return(list(
model = fit,
forecast = forecast_result,
data = price_data
))
}
Step 5: Business Intelligence Integration
Tableau Integration
import tableauserverclient as TSC
from tableauserverclient.models import DatasourceItem, ProjectItem
class TableauIntegration:
def __init__(self, server_url: str, username: str, password: str, site_id: str = ""):
self.server_url = server_url
self.username = username
self.password = password
self.site_id = site_id
self.server = None
def connect(self) -> None:
"""Connect to Tableau Server"""
self.server = TSC.Server(self.server_url, use_server_version=True)
tableau_auth = TSC.TableauAuth(self.username, self.password, site_id=self.site_id)
self.server.auth.sign_in(tableau_auth)
def create_trading_card_datasource(self, db_config: Dict) -> str:
"""Create Tableau datasource for trading card data"""
# Create connection configuration
connection_config = f"""
<connection class='mysql' dbname='{db_config["database"]}'
server='{db_config["host"]}' port='{db_config["port"]}'
username='{db_config["user"]}'
password='{db_config["password"]}'>
<metadata-records>
<metadata-record class='table'>
<record-name>cards</record-name>
<table-name>cards</table-name>
</metadata-record>
<metadata-record class='table'>
<record-name>price_history</record-name>
<table-name>price_history</table-name>
</metadata-record>
<metadata-record class='table'>
<record-name>market_metrics</record-name>
<table-name>market_metrics</table-name>
</metadata-record>
</metadata-records>
</connection>
"""
# Create calculated fields for Tableau
calculated_fields = """
<calculation formula='AVG([Price])' name='Average Price'>
<table-calc ordering-type='None' />
</calculation>
<calculation formula='WINDOW_STDEV([Price])' name='Price Volatility'>
<table-calc ordering-type='None' />
</calculation>
<calculation formula='([Price] - LOOKUP([Price], -1)) / LOOKUP([Price], -1)'
name='Price Change %'>
<table-calc ordering-type='None' />
</calculation>
"""
return "trading_card_datasource_id"
def create_market_dashboard(self) -> str:
"""Create comprehensive market analysis dashboard"""
dashboard_config = {
"name": "Trading Card Market Analysis",
"worksheets": [
{
"name": "Price Trends",
"chart_type": "line",
"dimensions": ["Transaction Date"],
"measures": ["Average Price", "Price Volatility"],
"filters": ["Player Name", "Year", "Condition"]
},
{
"name": "Market Heatmap",
"chart_type": "heatmap",
"dimensions": ["Player Name", "Year"],
"measures": ["Average Price"],
"color": "Price Change %"
},
{
"name": "Volume Analysis",
"chart_type": "bar",
"dimensions": ["Month"],
"measures": ["Transaction Count", "Total Volume"]
},
{
"name": "Player Performance",
"chart_type": "scatter",
"dimensions": ["Player Name"],
"measures": ["Average Price", "Transaction Count"],
"size": "Price Volatility"
}
]
}
return "market_dashboard_id"
# Usage example
tableau = TableauIntegration(
server_url="https://your-tableau-server.com",
username="your_username",
password="your_password"
)
tableau.connect()
datasource_id = tableau.create_trading_card_datasource(db_config)
dashboard_id = tableau.create_market_dashboard()
Power BI Integration
import requests
import json
import pandas as pd
from msal import PublicClientApplication
class PowerBIIntegration:
def __init__(self, tenant_id: str, client_id: str, username: str, password: str):
self.tenant_id = tenant_id
self.client_id = client_id
self.username = username
self.password = password
self.access_token = None
def authenticate(self) -> None:
"""Authenticate with Power BI using MSAL"""
authority = f"https://login.microsoftonline.com/{self.tenant_id}"
scopes = ["https://analysis.windows.net/powerbi/api/.default"]
app = PublicClientApplication(
client_id=self.client_id,
authority=authority
)
result = app.acquire_token_by_username_password(
username=self.username,
password=self.password,
scopes=scopes
)
if "access_token" in result:
self.access_token = result["access_token"]
else:
raise Exception("Power BI authentication failed")
def create_dataset_schema(self) -> Dict:
"""Define Power BI dataset schema for trading card data"""
return {
"name": "TradingCardAnalytics",
"tables": [
{
"name": "Cards",
"columns": [
{"name": "card_id", "dataType": "string"},
{"name": "name", "dataType": "string"},
{"name": "year", "dataType": "int64"},
{"name": "player_name", "dataType": "string"},
{"name": "team_name", "dataType": "string"},
{"name": "position", "dataType": "string"},
{"name": "set_id", "dataType": "string"}
]
},
{
"name": "PriceHistory",
"columns": [
{"name": "id", "dataType": "int64"},
{"name": "card_id", "dataType": "string"},
{"name": "transaction_date", "dataType": "dateTime"},
{"name": "price", "dataType": "double"},
{"name": "condition_grade", "dataType": "string"},
{"name": "marketplace", "dataType": "string"},
{"name": "volume", "dataType": "int64"}
]
}
],
"relationships": [
{
"name": "CardToPrices",
"fromTable": "Cards",
"fromColumn": "card_id",
"toTable": "PriceHistory",
"toColumn": "card_id"
}
]
}
def push_data_to_powerbi(self, workspace_id: str, dataset_id: str,
table_name: str, data: pd.DataFrame) -> None:
"""Push data to Power BI dataset"""
headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json"
}
# Convert DataFrame to Power BI format
rows = []
for _, row in data.iterrows():
row_data = {}
for col in data.columns:
value = row[col]
if pd.isna(value):
row_data[col] = None
elif isinstance(value, pd.Timestamp):
row_data[col] = value.isoformat()
else:
row_data[col] = value
rows.append(row_data)
# Push data in batches
batch_size = 1000
for i in range(0, len(rows), batch_size):
batch = rows[i:i + batch_size]
response = requests.post(
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/tables/{table_name}/rows",
headers=headers,
json={"rows": batch}
)
if response.status_code != 200:
print(f"Failed to push batch {i//batch_size + 1}: {response.text}")
Step 6: Real-Time Analytics Dashboard
Streaming Data Processing
import asyncio
import websockets
import json
from kafka import KafkaProducer, KafkaConsumer
import redis
class RealTimeAnalytics:
def __init__(self, redis_config: Dict, kafka_config: Dict):
self.redis_client = redis.Redis(**redis_config)
self.kafka_producer = KafkaProducer(
bootstrap_servers=kafka_config["servers"],
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
async def stream_price_updates(self, websocket_url: str) -> None:
"""Stream real-time price updates from API"""
async with websockets.connect(websocket_url) as websocket:
await websocket.send(json.dumps({
"action": "subscribe",
"channel": "price_updates",
"filter": {"min_price": 10} # Only significant price changes
}))
async for message in websocket:
try:
update = json.loads(message)
await self.process_price_update(update)
except Exception as e:
print(f"Failed to process update: {e}")
async def process_price_update(self, update: Dict) -> None:
"""Process real-time price update"""
card_id = update["card_id"]
new_price = update["price"]
timestamp = update["timestamp"]
# Store in Redis for real-time access
price_key = f"price:{card_id}:current"
self.redis_client.hset(price_key, mapping={
"price": new_price,
"timestamp": timestamp,
"change_pct": update.get("change_percent", 0)
})
# Set expiration for automatic cleanup
self.redis_client.expire(price_key, 3600) # 1 hour
# Send to Kafka for batch processing
self.kafka_producer.send('price_updates', {
"card_id": card_id,
"price": new_price,
"timestamp": timestamp,
"source": "api_stream"
})
# Trigger alerts if significant price movement
await self.check_price_alerts(card_id, new_price)
def get_real_time_market_summary(self) -> Dict:
"""Get current market snapshot from Redis"""
price_keys = self.redis_client.keys("price:*:current")
if not price_keys:
return {"error": "No real-time data available"}
current_prices = []
for key in price_keys:
price_data = self.redis_client.hgetall(key)
card_id = key.decode().split(":")[1]
current_prices.append({
"card_id": card_id,
"price": float(price_data[b"price"]),
"timestamp": price_data[b"timestamp"].decode(),
"change_pct": float(price_data[b"change_pct"])
})
# Calculate market summary
prices = [p["price"] for p in current_prices]
changes = [p["change_pct"] for p in current_prices]
return {
"total_cards_tracked": len(current_prices),
"average_price": np.mean(prices),
"median_price": np.median(prices),
"price_std": np.std(prices),
"average_change_pct": np.mean(changes),
"gainers_count": len([c for c in changes if c > 0]),
"losers_count": len([c for c in changes if c < 0]),
"last_updated": datetime.now().isoformat()
}
async def check_price_alerts(self, card_id: str, new_price: float) -> None:
"""Check if price update triggers any alerts"""
# Get user alerts for this card
alert_key = f"alerts:{card_id}"
alerts = self.redis_client.lrange(alert_key, 0, -1)
for alert in alerts:
alert_data = json.loads(alert)
target_price = alert_data["target_price"]
alert_type = alert_data["type"] # "above" or "below"
user_id = alert_data["user_id"]
triggered = (
(alert_type == "above" and new_price >= target_price) or
(alert_type == "below" and new_price <= target_price)
)
            if triggered:
                # send_price_alert (not shown) would notify the user, e.g. by email or push notification
                await self.send_price_alert(user_id, card_id, new_price, alert_data)
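A minimal runner for the streaming pipeline; the Redis and Kafka settings are placeholders, and the WebSocket URL is an assumption to be replaced with the API's documented streaming endpoint:
if __name__ == "__main__":
    analytics = RealTimeAnalytics(
        redis_config={"host": "localhost", "port": 6379, "db": 0},
        kafka_config={"servers": ["localhost:9092"]}
    )
    # Hypothetical streaming endpoint; substitute the real WebSocket URL
    asyncio.run(analytics.stream_price_updates("wss://stream.tradingcardapi.com/v1/prices"))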
Interactive Dashboard with Streamlit
import streamlit as st
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
class TradingCardDashboard:
def __init__(self, db_engine):
self.db_engine = db_engine
def create_dashboard(self):
"""Create interactive Streamlit dashboard"""
st.set_page_config(
page_title="Trading Card Market Analytics",
page_icon="🃏",
layout="wide"
)
st.title("🃏 Trading Card Market Analytics")
# Sidebar controls
st.sidebar.header("Filters")
# Date range selector
date_range = st.sidebar.date_input(
"Select Date Range",
value=(datetime.now() - timedelta(days=30), datetime.now()),
max_value=datetime.now()
)
# Player filter
players = self.get_top_players(50)
selected_players = st.sidebar.multiselect(
"Select Players",
options=players,
default=players[:5]
)
# Year filter
year_range = st.sidebar.slider(
"Year Range",
min_value=1980,
max_value=2024,
value=(2020, 2024)
)
# Main dashboard content
col1, col2, col3, col4 = st.columns(4)
# Key metrics
with col1:
total_cards = self.get_metric("total_cards", date_range, selected_players)
st.metric("Total Cards Tracked", f"{total_cards:,}")
with col2:
avg_price = self.get_metric("avg_price", date_range, selected_players)
st.metric("Average Price", f"${avg_price:.2f}")
with col3:
total_volume = self.get_metric("total_volume", date_range, selected_players)
st.metric("Trading Volume", f"{total_volume:,}")
with col4:
market_change = self.get_metric("market_change", date_range, selected_players)
st.metric("Market Change", f"{market_change:+.2f}%")
# Charts row 1
col1, col2 = st.columns(2)
with col1:
st.subheader("Price Trend Analysis")
price_chart = self.create_price_trend_chart(date_range, selected_players)
st.plotly_chart(price_chart, use_container_width=True)
with col2:
st.subheader("Volume Distribution")
volume_chart = self.create_volume_chart(date_range, selected_players)
st.plotly_chart(volume_chart, use_container_width=True)
# Charts row 2
col1, col2 = st.columns(2)
with col1:
st.subheader("Top Performers")
performers_chart = self.create_top_performers_chart(date_range)
st.plotly_chart(performers_chart, use_container_width=True)
with col2:
st.subheader("Market Correlation Matrix")
correlation_chart = self.create_correlation_heatmap(selected_players)
st.plotly_chart(correlation_chart, use_container_width=True)
# Detailed data table
st.subheader("Detailed Market Data")
market_data = self.get_detailed_market_data(date_range, selected_players)
st.dataframe(market_data, use_container_width=True)
# Export functionality
if st.button("Export Data to CSV"):
csv = market_data.to_csv(index=False)
st.download_button(
label="Download CSV",
data=csv,
file_name=f"trading_card_data_{datetime.now().strftime('%Y%m%d')}.csv",
mime="text/csv"
)
def create_price_trend_chart(self, date_range: tuple, players: List[str]) -> go.Figure:
"""Create interactive price trend chart"""
query = """
SELECT
ph.transaction_date,
c.player_name,
AVG(ph.price) as avg_price,
COUNT(*) as transaction_count
FROM price_history ph
JOIN cards c ON ph.card_id = c.id
WHERE ph.transaction_date BETWEEN %s AND %s
AND c.player_name IN %s
GROUP BY ph.transaction_date, c.player_name
ORDER BY ph.transaction_date
"""
df = pd.read_sql(query, self.db_engine, params=[
date_range[0], date_range[1], tuple(players)
])
fig = px.line(
df,
x="transaction_date",
y="avg_price",
color="player_name",
title="Average Price Trends by Player"
)
fig.update_layout(
xaxis_title="Date",
yaxis_title="Average Price ($)",
hovermode='x unified'
)
return fig
def create_volume_chart(self, date_range: tuple, players: List[str]) -> go.Figure:
"""Create trading volume analysis chart"""
query = """
SELECT
            DATE_FORMAT(ph.transaction_date, '%%Y-%%m') as month,  -- %% escapes the literal % when the query also uses %s parameters
SUM(ph.volume) as total_volume,
COUNT(DISTINCT ph.card_id) as unique_cards
FROM price_history ph
JOIN cards c ON ph.card_id = c.id
WHERE ph.transaction_date BETWEEN %s AND %s
AND c.player_name IN %s
GROUP BY month
ORDER BY month
"""
df = pd.read_sql(query, self.db_engine, params=[
date_range[0], date_range[1], tuple(players)
])
fig = make_subplots(specs=[[{"secondary_y": True}]])
fig.add_trace(
go.Bar(x=df["month"], y=df["total_volume"], name="Volume"),
secondary_y=False,
)
fig.add_trace(
go.Scatter(x=df["month"], y=df["unique_cards"],
mode="lines+markers", name="Unique Cards"),
secondary_y=True,
)
fig.update_yaxes(title_text="Trading Volume", secondary_y=False)
fig.update_yaxes(title_text="Unique Cards Traded", secondary_y=True)
fig.update_xaxes(title_text="Month")
return fig
# Streamlit app runner
if __name__ == "__main__":
dashboard = TradingCardDashboard(db_engine)
dashboard.create_dashboard()
Step 7: Machine Learning and Predictive Analytics
Price Prediction Models
from datetime import datetime
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
class TradingCardPricePrediction:
def __init__(self, db_engine):
self.db_engine = db_engine
self.models = {}
self.encoders = {}
self.scaler = StandardScaler()
def prepare_training_data(self) -> pd.DataFrame:
"""Prepare comprehensive dataset for ML training"""
query = """
SELECT
c.id as card_id,
c.name,
c.year,
c.player_name,
c.team_name,
c.position,
mm.avg_price,
mm.price_volatility,
mm.transaction_count,
mm.total_volume,
DATEDIFF(CURDATE(), mm.metric_date) as days_since,
-- Player aggregated features
(SELECT AVG(mm2.avg_price)
FROM market_metrics mm2
JOIN cards c2 ON mm2.card_id = c2.id
WHERE c2.player_name = c.player_name) as player_avg_price,
(SELECT COUNT(DISTINCT c2.id)
FROM cards c2
WHERE c2.player_name = c.player_name) as player_card_count,
-- Set features
(SELECT AVG(mm2.avg_price)
FROM market_metrics mm2
JOIN cards c2 ON mm2.card_id = c2.id
WHERE c2.set_id = c.set_id) as set_avg_price
FROM cards c
JOIN market_metrics mm ON c.id = mm.card_id
WHERE mm.metric_date >= DATE_SUB(CURDATE(), INTERVAL 365 DAY)
AND mm.avg_price IS NOT NULL
AND c.player_name IS NOT NULL
"""
df = pd.read_sql(query, self.db_engine)
# Feature engineering
        df['card_age'] = datetime.now().year - df['year']
df['is_rookie_card'] = (df['year'] <= df.groupby('player_name')['year'].transform('min')).astype(int)
df['price_to_player_avg'] = df['avg_price'] / df['player_avg_price']
df['price_to_set_avg'] = df['avg_price'] / df['set_avg_price']
# Encode categorical variables
for col in ['player_name', 'team_name', 'position']:
if col not in self.encoders:
self.encoders[col] = LabelEncoder()
df[f'{col}_encoded'] = self.encoders[col].fit_transform(df[col].fillna('Unknown'))
else:
df[f'{col}_encoded'] = self.encoders[col].transform(df[col].fillna('Unknown'))
return df
def train_price_prediction_model(self) -> Dict:
"""Train machine learning model for price prediction"""
df = self.prepare_training_data()
# Select features for modeling
feature_columns = [
'year', 'card_age', 'is_rookie_card', 'player_card_count',
'price_volatility', 'transaction_count', 'total_volume',
'price_to_player_avg', 'price_to_set_avg', 'days_since',
'player_name_encoded', 'team_name_encoded', 'position_encoded'
]
X = df[feature_columns].fillna(0)
y = df['avg_price']
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Scale features
X_train_scaled = self.scaler.fit_transform(X_train)
X_test_scaled = self.scaler.transform(X_test)
# Train multiple models
models = {
'random_forest': RandomForestRegressor(n_estimators=100, random_state=42),
'gradient_boost': GradientBoostingRegressor(n_estimators=100, random_state=42)
}
results = {}
for name, model in models.items():
# Train model
model.fit(X_train_scaled, y_train)
# Make predictions
train_pred = model.predict(X_train_scaled)
test_pred = model.predict(X_test_scaled)
# Calculate metrics
results[name] = {
'model': model,
'train_mae': mean_absolute_error(y_train, train_pred),
'test_mae': mean_absolute_error(y_test, test_pred),
'train_rmse': np.sqrt(mean_squared_error(y_train, train_pred)),
'test_rmse': np.sqrt(mean_squared_error(y_test, test_pred)),
'r2_score': model.score(X_test_scaled, y_test)
}
# Feature importance
if hasattr(model, 'feature_importances_'):
feature_importance = pd.DataFrame({
'feature': feature_columns,
'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
results[name]['feature_importance'] = feature_importance
# Save best model
best_model_name = min(results.keys(), key=lambda x: results[x]['test_mae'])
self.models['price_prediction'] = results[best_model_name]['model']
return results
def predict_card_price(self, card_features: Dict) -> Dict:
"""Predict price for a given card"""
if 'price_prediction' not in self.models:
raise ValueError("Model not trained yet")
        # Prepare features (_prepare_prediction_features, not shown, must build the same feature order as training)
        feature_vector = self._prepare_prediction_features(card_features)
feature_vector_scaled = self.scaler.transform([feature_vector])
# Make prediction
predicted_price = self.models['price_prediction'].predict(feature_vector_scaled)[0]
# Calculate prediction confidence (using model-specific methods)
model = self.models['price_prediction']
        if isinstance(model, RandomForestRegressor):
            # For a random forest, the spread of per-tree predictions (on the scaled features) gives a rough interval
            predictions = [tree.predict(feature_vector_scaled)[0] for tree in model.estimators_]
            confidence_interval = np.percentile(predictions, [5, 95])
else:
confidence_interval = [predicted_price * 0.9, predicted_price * 1.1]
return {
'predicted_price': predicted_price,
'confidence_low': confidence_interval[0],
'confidence_high': confidence_interval[1],
'prediction_date': datetime.now().isoformat()
}
# Usage example
ml_analyzer = TradingCardPricePrediction(db_engine)
model_results = ml_analyzer.train_price_prediction_model()
print("Model Performance Summary:")
for model_name, metrics in model_results.items():
print(f"{model_name.title()}:")
print(f" Test MAE: ${metrics['test_mae']:.2f}")
print(f" Test RMSE: ${metrics['test_rmse']:.2f}")
print(f" R² Score: {metrics['r2_score']:.3f}")
Step 8: Market Sentiment Analysis
News and Social Media Integration
import tweepy
import pandas as pd
from textblob import TextBlob
from transformers import pipeline
import feedparser
class MarketSentimentAnalysis:
def __init__(self, twitter_config: Dict):
self.twitter_api = tweepy.Client(
bearer_token=twitter_config["bearer_token"],
consumer_key=twitter_config["consumer_key"],
consumer_secret=twitter_config["consumer_secret"],
access_token=twitter_config["access_token"],
access_token_secret=twitter_config["access_token_secret"]
)
# Load sentiment analysis model
self.sentiment_pipeline = pipeline(
"sentiment-analysis",
model="cardiffnlp/twitter-roberta-base-sentiment-latest"
)
def collect_social_sentiment(self, player_name: str, days_back: int = 7) -> pd.DataFrame:
"""Collect and analyze social media sentiment for a player"""
# Search Twitter for mentions
query = f'"{player_name}" (trading card OR baseball card OR rookie card) -is:retweet'
tweets = tweepy.Paginator(
self.twitter_api.search_recent_tweets,
query=query,
max_results=100,
tweet_fields=['created_at', 'author_id', 'public_metrics']
).flatten(limit=1000)
tweet_data = []
for tweet in tweets:
# Analyze sentiment
sentiment = self.sentiment_pipeline(tweet.text)[0]
tweet_data.append({
'date': tweet.created_at.date(),
'text': tweet.text,
'sentiment_label': sentiment['label'],
'sentiment_score': sentiment['score'],
'retweet_count': tweet.public_metrics['retweet_count'],
'like_count': tweet.public_metrics['like_count'],
'player_name': player_name
})
sentiment_df = pd.DataFrame(tweet_data)
# Calculate daily sentiment scores
daily_sentiment = sentiment_df.groupby('date').agg({
'sentiment_score': 'mean',
'retweet_count': 'sum',
'like_count': 'sum',
'text': 'count'
}).rename(columns={'text': 'mention_count'})
return daily_sentiment
def analyze_news_impact(self, player_name: str) -> Dict:
"""Analyze news sentiment and correlation with price movements"""
# Collect news articles
news_feeds = [
'https://feeds.espn.com/rss/mlb/news',
'https://www.baseball-reference.com/rss/news.xml',
'https://sports.yahoo.com/mlb/rss.xml'
]
all_articles = []
for feed_url in news_feeds:
try:
feed = feedparser.parse(feed_url)
for entry in feed.entries:
if player_name.lower() in entry.title.lower() or \
player_name.lower() in entry.summary.lower():
# Analyze article sentiment
content = f"{entry.title} {entry.summary}"
blob = TextBlob(content)
all_articles.append({
'date': datetime(*entry.published_parsed[:6]).date(),
'title': entry.title,
'summary': entry.summary,
'sentiment_polarity': blob.sentiment.polarity,
'sentiment_subjectivity': blob.sentiment.subjectivity,
'source': feed_url
})
except Exception as e:
print(f"Failed to process feed {feed_url}: {e}")
news_df = pd.DataFrame(all_articles)
if news_df.empty:
return {"error": "No news articles found"}
# Aggregate daily news sentiment
daily_news = news_df.groupby('date').agg({
'sentiment_polarity': 'mean',
'sentiment_subjectivity': 'mean',
'title': 'count'
}).rename(columns={'title': 'article_count'})
return {
'daily_sentiment': daily_news,
'total_articles': len(news_df),
'avg_sentiment': news_df['sentiment_polarity'].mean(),
'sentiment_volatility': news_df['sentiment_polarity'].std()
}
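Sentiment only becomes actionable when it is lined up against price movements. A hedged sketch that correlates daily mean sentiment with daily average prices from the warehouse (twitter_config and db_engine are assumed to be configured as in earlier steps):
def sentiment_price_correlation(sentiment_daily: pd.DataFrame, prices_daily: pd.DataFrame) -> float:
    """Correlate daily mean sentiment with daily average market price over matching dates."""
    merged = sentiment_daily.join(prices_daily, how="inner")
    if len(merged) < 3:
        return float("nan")
    return merged["sentiment_score"].corr(merged["avg_price"])

sentiment = MarketSentimentAnalysis(twitter_config)
daily_sentiment = sentiment.collect_social_sentiment("Mike Trout")

prices_daily = pd.read_sql(
    "SELECT metric_date, AVG(avg_price) AS avg_price FROM market_metrics GROUP BY metric_date",
    db_engine, index_col="metric_date"
)
prices_daily.index = pd.to_datetime(prices_daily.index).date
print(f"Sentiment/price correlation: {sentiment_price_correlation(daily_sentiment, prices_daily):.2f}")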
Step 9: Portfolio Optimization Analysis
Modern Portfolio Theory Application
import numpy as np
import scipy.optimize as sco
from scipy import stats
class TradingCardPortfolioOptimizer:
def __init__(self, db_engine):
self.db_engine = db_engine
def calculate_return_metrics(self, card_ids: List[str], period_days: int = 365) -> pd.DataFrame:
"""Calculate return and risk metrics for cards"""
returns_data = {}
for card_id in card_ids:
query = """
SELECT transaction_date, AVG(price) as daily_price
FROM price_history
WHERE card_id = %s
AND transaction_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
GROUP BY transaction_date
ORDER BY transaction_date
"""
prices_df = pd.read_sql(query, self.db_engine, params=[card_id, period_days])
if len(prices_df) < 30: # Minimum data requirement
continue
# Calculate daily returns
prices_df['returns'] = prices_df['daily_price'].pct_change()
returns_data[card_id] = prices_df.set_index('transaction_date')['returns']
# Create returns matrix
returns_matrix = pd.DataFrame(returns_data).dropna()
# Calculate metrics
metrics = {}
for card_id in returns_matrix.columns:
returns = returns_matrix[card_id].dropna()
metrics[card_id] = {
'annual_return': returns.mean() * 365,
'annual_volatility': returns.std() * np.sqrt(365),
'sharpe_ratio': (returns.mean() * 365) / (returns.std() * np.sqrt(365)),
'max_drawdown': self._calculate_max_drawdown(returns),
'var_95': np.percentile(returns, 5),
'skewness': stats.skew(returns),
'kurtosis': stats.kurtosis(returns)
}
return pd.DataFrame(metrics).T
def _calculate_max_drawdown(self, returns: pd.Series) -> float:
"""Calculate maximum drawdown"""
cumulative = (1 + returns).cumprod()
rolling_max = cumulative.expanding().max()
drawdown = (cumulative - rolling_max) / rolling_max
return drawdown.min()
def optimize_portfolio(self, card_ids: List[str], target_return: float = None) -> Dict:
"""Optimize portfolio using Modern Portfolio Theory"""
# Get returns data
query = """
SELECT
ph.card_id,
ph.transaction_date,
AVG(ph.price) as price
FROM price_history ph
WHERE ph.card_id IN ({})
AND ph.transaction_date >= DATE_SUB(CURDATE(), INTERVAL 365 DAY)
GROUP BY ph.card_id, ph.transaction_date
ORDER BY ph.transaction_date
""".format(','.join(['%s'] * len(card_ids)))
prices_df = pd.read_sql(query, self.db_engine, params=card_ids)
# Pivot to get price matrix
        price_matrix = prices_df.pivot(index='transaction_date',
                                       columns='card_id',
                                       values='price').ffill()
# Calculate returns
returns_matrix = price_matrix.pct_change().dropna()
# Expected returns and covariance matrix
expected_returns = returns_matrix.mean() * 365
cov_matrix = returns_matrix.cov() * 365
num_assets = len(card_ids)
# Optimization constraints
constraints = [
{'type': 'eq', 'fun': lambda x: np.sum(x) - 1} # Weights sum to 1
]
if target_return:
constraints.append({
'type': 'eq',
'fun': lambda x: np.sum(x * expected_returns) - target_return
})
bounds = tuple((0, 1) for _ in range(num_assets))
# Objective function: minimize portfolio variance
def objective(weights):
return np.sqrt(np.dot(weights.T, np.dot(cov_matrix, weights)))
# Optimize
initial_weights = np.array([1/num_assets] * num_assets)
result = sco.minimize(
objective,
initial_weights,
method='SLSQP',
bounds=bounds,
constraints=constraints
)
optimal_weights = result.x
# Calculate portfolio metrics
portfolio_return = np.sum(optimal_weights * expected_returns)
portfolio_volatility = objective(optimal_weights)
sharpe_ratio = portfolio_return / portfolio_volatility
# Create portfolio allocation DataFrame
allocation_df = pd.DataFrame({
'card_id': card_ids,
'weight': optimal_weights,
'expected_return': expected_returns[card_ids].values,
'individual_risk': np.sqrt(np.diag(cov_matrix))
}).sort_values('weight', ascending=False)
return {
'optimal_weights': optimal_weights,
'portfolio_return': portfolio_return,
'portfolio_volatility': portfolio_volatility,
'sharpe_ratio': sharpe_ratio,
'allocation': allocation_df,
'optimization_success': result.success
}
# Example usage
optimizer = TradingCardPortfolioOptimizer(db_engine)
card_metrics = optimizer.calculate_return_metrics(['card_1', 'card_2', 'card_3'])
portfolio_result = optimizer.optimize_portfolio(['card_1', 'card_2', 'card_3'])
print("Optimal Portfolio Allocation:")
print(portfolio_result['allocation'])
print(f"Expected Return: {portfolio_result['portfolio_return']:.2%}")
print(f"Portfolio Risk: {portfolio_result['portfolio_volatility']:.2%}")
print(f"Sharpe Ratio: {portfolio_result['sharpe_ratio']:.3f}")
Step 10: Advanced Visualization and Reporting
Interactive Market Dashboard
import dash
from dash import dcc, html, Input, Output, callback
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime, timedelta
class AdvancedTradingCardDashboard:
def __init__(self, db_engine):
self.db_engine = db_engine
self.app = dash.Dash(__name__)
self.setup_layout()
self.setup_callbacks()
def setup_layout(self):
"""Set up the dashboard layout"""
self.app.layout = html.Div([
html.H1("Trading Card Market Analytics",
style={'textAlign': 'center', 'marginBottom': 30}),
# Control panel
html.Div([
html.Div([
html.Label("Select Players:"),
dcc.Dropdown(
id='player-dropdown',
options=self.get_player_options(),
value=['Mike Trout', 'Ronald Acuna Jr.'],
multi=True
)
], style={'width': '30%', 'display': 'inline-block'}),
html.Div([
html.Label("Date Range:"),
dcc.DatePickerRange(
id='date-range-picker',
start_date=datetime.now() - timedelta(days=90),
end_date=datetime.now(),
display_format='YYYY-MM-DD'
)
], style={'width': '30%', 'display': 'inline-block', 'marginLeft': '5%'}),
html.Div([
html.Label("Analysis Type:"),
dcc.RadioItems(
id='analysis-type',
options=[
{'label': 'Price Trends', 'value': 'price'},
{'label': 'Volume Analysis', 'value': 'volume'},
{'label': 'Volatility Study', 'value': 'volatility'}
],
value='price',
inline=True
)
], style={'width': '30%', 'display': 'inline-block', 'marginLeft': '5%'})
], style={'marginBottom': 30}),
# Main charts
html.Div([
html.Div([
dcc.Graph(id='main-chart')
], style={'width': '60%', 'display': 'inline-block'}),
html.Div([
dcc.Graph(id='correlation-heatmap')
], style={'width': '40%', 'display': 'inline-block'})
]),
# Secondary charts row
html.Div([
html.Div([
dcc.Graph(id='distribution-chart')
], style={'width': '33%', 'display': 'inline-block'}),
html.Div([
dcc.Graph(id='performance-scatter')
], style={'width': '33%', 'display': 'inline-block'}),
html.Div([
dcc.Graph(id='risk-return-chart')
], style={'width': '34%', 'display': 'inline-block'})
]),
# Data table
html.Div([
html.H3("Market Data Export"),
html.Button("Download CSV", id="download-button"),
dcc.Download(id="download-dataframe-csv"),
], style={'marginTop': 30})
])
def setup_callbacks(self):
"""Set up interactive callbacks"""
@self.app.callback(
[Output('main-chart', 'figure'),
Output('correlation-heatmap', 'figure'),
Output('distribution-chart', 'figure'),
Output('performance-scatter', 'figure'),
Output('risk-return-chart', 'figure')],
[Input('player-dropdown', 'value'),
Input('date-range-picker', 'start_date'),
Input('date-range-picker', 'end_date'),
Input('analysis-type', 'value')]
)
def update_charts(selected_players, start_date, end_date, analysis_type):
# Load data based on selections
market_data = self.load_market_data(selected_players, start_date, end_date)
if analysis_type == 'price':
main_chart = self.create_price_trend_chart(market_data)
elif analysis_type == 'volume':
main_chart = self.create_volume_analysis_chart(market_data)
else: # volatility
main_chart = self.create_volatility_chart(market_data)
correlation_chart = self.create_correlation_heatmap(market_data)
distribution_chart = self.create_price_distribution_chart(market_data)
performance_chart = self.create_performance_scatter(market_data)
risk_return_chart = self.create_risk_return_chart(market_data)
return (main_chart, correlation_chart, distribution_chart,
performance_chart, risk_return_chart)
def create_price_trend_chart(self, data: pd.DataFrame) -> go.Figure:
"""Create advanced price trend visualization"""
fig = make_subplots(
rows=2, cols=1,
shared_xaxes=True,
vertical_spacing=0.1,
subplot_titles=('Price Trends', 'Trading Volume'),
row_heights=[0.7, 0.3]
)
# Price trends
for player in data['player_name'].unique():
player_data = data[data['player_name'] == player]
fig.add_trace(
go.Scatter(
x=player_data['transaction_date'],
y=player_data['avg_price'],
mode='lines+markers',
name=f'{player} Price',
line=dict(width=2)
),
row=1, col=1
)
# Volume bars
volume_data = data.groupby('transaction_date')['total_volume'].sum().reset_index()
fig.add_trace(
go.Bar(
x=volume_data['transaction_date'],
y=volume_data['total_volume'],
name='Daily Volume',
marker_color='lightblue',
opacity=0.7
),
row=2, col=1
)
fig.update_layout(
title="Trading Card Market Analysis",
height=600,
showlegend=True
)
return fig
def run_server(self, debug: bool = False, port: int = 8050):
"""Run the dashboard server"""
self.app.run_server(debug=debug, port=port)
# Example usage
dashboard = AdvancedTradingCardDashboard(db_engine)
dashboard.run_server(debug=True)
Automated Reporting
from jinja2 import Template
import pdfkit
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib
class AutomatedReporting:
def __init__(self, db_engine, smtp_config: Dict):
self.db_engine = db_engine
self.smtp_config = smtp_config
def generate_weekly_report(self, recipients: List[str]) -> None:
"""Generate and send weekly market report"""
# Collect report data
report_data = self.collect_weekly_data()
# Generate visualizations
charts = self.create_report_charts(report_data)
# Create HTML report
html_report = self.create_html_report(report_data, charts)
# Convert to PDF (passing False makes pdfkit return the PDF as bytes instead of writing a file)
pdf_report = pdfkit.from_string(html_report, False)
# Send email report
self.send_email_report(recipients, html_report, pdf_report)
def collect_weekly_data(self) -> Dict:
"""Collect data for weekly report"""
queries = {
'market_summary': """
SELECT
COUNT(DISTINCT card_id) as cards_traded,
AVG(avg_price) as avg_market_price,
SUM(transaction_count) as total_transactions,
AVG(price_volatility) as avg_volatility
FROM market_metrics
WHERE metric_date >= DATE_SUB(CURDATE(), INTERVAL 7 DAY)
""",
'top_gainers': """
SELECT
c.name,
c.player_name,
mm_current.avg_price as current_price,
mm_week_ago.avg_price as week_ago_price,
((mm_current.avg_price - mm_week_ago.avg_price) / mm_week_ago.avg_price * 100) as change_pct
FROM cards c
JOIN market_metrics mm_current ON c.id = mm_current.card_id
JOIN market_metrics mm_week_ago ON c.id = mm_week_ago.card_id
WHERE mm_current.metric_date = CURDATE()
AND mm_week_ago.metric_date = DATE_SUB(CURDATE(), INTERVAL 7 DAY)
ORDER BY change_pct DESC
LIMIT 10
""",
'volume_leaders': """
SELECT
c.name,
c.player_name,
SUM(mm.transaction_count) as weekly_volume,
AVG(mm.avg_price) as avg_price
FROM cards c
JOIN market_metrics mm ON c.id = mm.card_id
WHERE mm.metric_date >= DATE_SUB(CURDATE(), INTERVAL 7 DAY)
GROUP BY c.id, c.name, c.player_name
ORDER BY weekly_volume DESC
LIMIT 10
"""
}
report_data = {}
for key, query in queries.items():
report_data[key] = pd.read_sql(query, self.db_engine)
return report_data
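    # NOTE: create_report_charts is called in generate_weekly_report but not shown in
    # this guide. A minimal sketch (an assumption, not the guide's implementation):
    # render the week's average price trend as an embeddable HTML snippet that fills
    # the {{ charts.price_trends|safe }} slot in the template below.
    def create_report_charts(self, report_data: Dict) -> Dict:
        """Build report charts as embeddable HTML snippets"""
        import plotly.graph_objects as go
        trend_query = """
            SELECT metric_date, AVG(avg_price) as avg_price
            FROM market_metrics
            WHERE metric_date >= DATE_SUB(CURDATE(), INTERVAL 7 DAY)
            GROUP BY metric_date
            ORDER BY metric_date
        """
        trend = pd.read_sql(trend_query, self.db_engine)
        fig = go.Figure(
            go.Scatter(x=trend['metric_date'], y=trend['avg_price'],
                       mode='lines+markers', name='Avg Market Price')
        )
        fig.update_layout(title="Average Market Price (Last 7 Days)", height=400)
        # For the PDF version, a static image (e.g. fig.to_image() with kaleido
        # installed) may render more reliably than embedded JavaScript.
        return {"price_trends": fig.to_html(full_html=False, include_plotlyjs='cdn')}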
def create_html_report(self, data: Dict, charts: Dict) -> str:
"""Create HTML report from template"""
template_str = """
<!DOCTYPE html>
<html>
<head>
<title>Weekly Trading Card Market Report</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.header { background-color: #f0f0f0; padding: 20px; text-align: center; }
.section { margin: 20px 0; }
.chart { text-align: center; margin: 20px 0; }
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
.metric { display: inline-block; margin: 10px; padding: 15px;
background-color: #e7f3ff; border-radius: 5px; }
</style>
</head>
<body>
<div class="header">
<h1>Weekly Trading Card Market Report</h1>
<p>Report Generated: {{ report_date }}</p>
</div>
<div class="section">
<h2>Market Summary</h2>
<div class="metric">
<strong>Cards Traded:</strong> {{ market_summary.cards_traded }}
</div>
<div class="metric">
<strong>Avg Price:</strong> ${{ "%.2f"|format(market_summary.avg_market_price) }}
</div>
<div class="metric">
<strong>Total Transactions:</strong> {{ market_summary.total_transactions }}
</div>
<div class="metric">
<strong>Market Volatility:</strong> {{ "%.2f"|format(market_summary.avg_volatility) }}%
</div>
</div>
<div class="section">
<h2>Top Gainers This Week</h2>
<table>
<thead>
<tr>
<th>Card Name</th>
<th>Player</th>
<th>Current Price</th>
<th>Change %</th>
</tr>
</thead>
<tbody>
{% for gainer in top_gainers %}
<tr>
<td>{{ gainer.name }}</td>
<td>{{ gainer.player_name }}</td>
<td>${{ "%.2f"|format(gainer.current_price) }}</td>
<td style="color: green;">+{{ "%.1f"|format(gainer.change_pct) }}%</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
<div class="section chart">
<h2>Price Trend Chart</h2>
{{ charts.price_trends|safe }}
</div>
</body>
</html>
"""
template = Template(template_str)
return template.render(
report_date=datetime.now().strftime("%Y-%m-%d %H:%M"),
market_summary=data['market_summary'].iloc[0],
top_gainers=data['top_gainers'].to_dict('records'),
volume_leaders=data['volume_leaders'].to_dict('records'),
charts=charts
)
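    # NOTE: send_email_report is called in generate_weekly_report but not shown in this
    # guide. A minimal sketch using the smtplib/MIME imports above; the smtp_config keys
    # (host, port, username, password, from_address) are assumptions, not part of the API.
    def send_email_report(self, recipients: List[str], html_body: str,
                          pdf_bytes: bytes) -> None:
        """Send the report as an HTML email with the PDF attached"""
        from email.mime.application import MIMEApplication

        msg = MIMEMultipart('mixed')
        msg['Subject'] = "Weekly Trading Card Market Report"
        msg['From'] = self.smtp_config['from_address']
        msg['To'] = ", ".join(recipients)
        msg.attach(MIMEText(html_body, 'html'))

        attachment = MIMEApplication(pdf_bytes, _subtype='pdf')
        attachment.add_header('Content-Disposition', 'attachment',
                              filename='weekly_report.pdf')
        msg.attach(attachment)

        with smtplib.SMTP(self.smtp_config['host'], self.smtp_config['port']) as server:
            server.starttls()
            server.login(self.smtp_config['username'], self.smtp_config['password'])
            server.sendmail(msg['From'], recipients, msg.as_string())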
# Automated scheduling
import schedule
import time
def schedule_reports():
"""Schedule automated report generation"""
reporter = AutomatedReporting(db_engine, smtp_config)
# Weekly report every Monday at 9 AM
schedule.every().monday.at("09:00").do(
reporter.generate_weekly_report,
recipients=["[email protected]", "[email protected]"]
)
# Daily summary every day at 6 PM (generate_daily_summary is assumed to be implemented on AutomatedReporting alongside generate_weekly_report)
schedule.every().day.at("18:00").do(
reporter.generate_daily_summary,
recipients=["[email protected]"]
)
while True:
schedule.run_pending()
time.sleep(60) # Check every minute
# Start automated reporting
if __name__ == "__main__":
schedule_reports()
Data Export and Integration
Multi-format Export Utility
class DataExportUtility:
def __init__(self, db_engine):
self.db_engine = db_engine
def export_market_data(self, format_type: str, date_range: tuple,
output_path: Optional[str] = None) -> str:
"""Export market data in various formats"""
query = """
SELECT
c.id as card_id,
c.name as card_name,
c.player_name,
c.year,
mm.metric_date,
mm.avg_price,
mm.median_price,
mm.price_volatility,
mm.transaction_count,
mm.total_volume
FROM cards c
JOIN market_metrics mm ON c.id = mm.card_id
WHERE mm.metric_date BETWEEN %s AND %s
ORDER BY mm.metric_date, c.player_name
"""
df = pd.read_sql(query, self.db_engine, params=date_range)
if output_path is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = f"trading_card_export_{timestamp}"
if format_type.lower() == 'csv':
file_path = f"{output_path}.csv"
df.to_csv(file_path, index=False)
elif format_type.lower() == 'parquet':
file_path = f"{output_path}.parquet"
df.to_parquet(file_path, index=False, compression='snappy')
elif format_type.lower() == 'json':
file_path = f"{output_path}.json"
df.to_json(file_path, orient='records', date_format='iso')
elif format_type.lower() == 'excel':
file_path = f"{output_path}.xlsx"
with pd.ExcelWriter(file_path, engine='xlsxwriter') as writer:
df.to_excel(writer, sheet_name='Market_Data', index=False)
# Add summary statistics sheet
summary_stats = df.groupby('player_name').agg({
'avg_price': ['mean', 'std', 'min', 'max'],
'transaction_count': 'sum',
'total_volume': 'sum'
}).round(2)
summary_stats.to_excel(writer, sheet_name='Summary_Stats')
else:
raise ValueError(f"Unsupported format: {format_type}")
print(f"Data exported to {file_path}")
return file_path
# API integration for external tools
class AnalyticsAPI:
def __init__(self, db_engine):
self.db_engine = db_engine
def get_card_analytics_endpoint(self, card_id: str) -> Dict:
"""REST endpoint for card analytics data"""
query = """
SELECT
mm.metric_date,
mm.avg_price,
mm.price_volatility,
mm.transaction_count,
LAG(mm.avg_price) OVER (ORDER BY mm.metric_date) as prev_price
FROM market_metrics mm
WHERE mm.card_id = %s
AND mm.metric_date >= DATE_SUB(CURDATE(), INTERVAL 90 DAY)
ORDER BY mm.metric_date
"""
df = pd.read_sql(query, self.db_engine, params=[card_id])
if df.empty:
return {"error": "No data found for card"}
# Calculate analytics
df['price_change'] = ((df['avg_price'] - df['prev_price']) / df['prev_price'] * 100).fillna(0)
analytics = {
'card_id': card_id,
'current_price': float(df.iloc[-1]['avg_price']),
'price_trend': df['price_change'].tolist(),
'volatility_trend': df['price_volatility'].tolist(),
'volume_trend': df['transaction_count'].tolist(),
'period_return': float((df.iloc[-1]['avg_price'] - df.iloc[0]['avg_price']) / df.iloc[0]['avg_price'] * 100),
'avg_daily_volume': int(df['transaction_count'].mean()),
'risk_score': float(df['price_volatility'].mean()),
'data_points': len(df),
'last_updated': df.iloc[-1]['metric_date'].isoformat()
}
return analytics
# Usage examples
import json
exporter = DataExportUtility(db_engine)
api = AnalyticsAPI(db_engine)
# Export data in different formats
csv_file = exporter.export_market_data('csv', ('2024-01-01', '2024-08-30'))
parquet_file = exporter.export_market_data('parquet', ('2024-01-01', '2024-08-30'))
# Get analytics for specific card
card_analytics = api.get_card_analytics_endpoint('card_123')
print(json.dumps(card_analytics, indent=2))
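AnalyticsAPI returns plain dictionaries, so it can sit behind any web framework. A minimal sketch exposing it over HTTP with Flask (the framework choice and the route path are assumptions, not something the guide prescribes):
from flask import Flask, jsonify

app = Flask(__name__)
analytics_api = AnalyticsAPI(db_engine)

@app.route("/analytics/cards/<card_id>")
def card_analytics(card_id):
    # Delegate to the analytics layer and return JSON with an appropriate status code
    result = analytics_api.get_card_analytics_endpoint(card_id)
    status = 404 if "error" in result else 200
    return jsonify(result), status

if __name__ == "__main__":
    app.run(port=5000)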
Best Practices Summary
Data Quality and Governance
- Implement data validation and cleaning pipelines (a minimal sketch follows this list)
- Monitor data freshness and completeness
- Establish data lineage tracking
- Create data quality metrics and alerts
- Document data sources and transformations
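As a starting point for a validation pipeline, a lightweight quality check over a market_metrics extract might look like this (the specific checks and the freshness threshold are illustrative assumptions):
import pandas as pd
from typing import Dict

def validate_market_data(df: pd.DataFrame, max_age_days: int = 2) -> Dict[str, bool]:
    """Basic quality checks for a market_metrics extract"""
    checks = {
        "no_missing_prices": bool(df["avg_price"].notna().all()),
        "no_negative_prices": bool((df["avg_price"] >= 0).all()),
        "no_duplicate_rows": not df.duplicated(subset=["card_id", "metric_date"]).any(),
        "data_is_fresh": (
            pd.Timestamp.now() - pd.to_datetime(df["metric_date"]).max()
        ).days <= max_age_days,
    }
    failed = [name for name, passed in checks.items() if not passed]
    if failed:
        print(f"Data quality checks failed: {failed}")
    return checks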
Performance Optimization
- Use appropriate indexing for time-series queries
- Implement data partitioning for large datasets
- Cache frequently accessed analytics (see the caching sketch after this list)
- Optimize API calls with bulk requests
- Use columnar storage for analytical workloads
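For caching, a small time-to-live (TTL) decorator is often enough for per-card analytics that only change daily. A sketch (the five-minute window and the wrapped function are illustrative; it reuses the AnalyticsAPI instance from the usage examples above):
import time
from functools import wraps
from typing import Dict

def ttl_cache(ttl_seconds: int = 300):
    """Cache results of expensive analytics calls for a fixed time window"""
    def decorator(func):
        cache = {}
        @wraps(func)
        def wrapper(*args):
            now = time.time()
            if args in cache:
                result, cached_at = cache[args]
                if now - cached_at < ttl_seconds:
                    return result
            result = func(*args)
            cache[args] = (result, now)
            return result
        return wrapper
    return decorator

@ttl_cache(ttl_seconds=300)
def cached_card_analytics(card_id: str) -> Dict:
    return api.get_card_analytics_endpoint(card_id)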
Security and Compliance
- Encrypt sensitive data in transit and at rest
- Implement proper access controls
- Audit data access and modifications
- Follow data retention policies
- Ensure compliance with applicable regulations
Scalability Considerations
- Design for horizontal scaling
- Use distributed computing for large datasets
- Implement proper error handling and retry logic (see the retry sketch after this list)
- Monitor system performance and resource usage
- Plan for data archival and cleanup strategies
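For retry logic around API calls, exponential backoff with jitter is a reasonable default. A sketch (the attempt count, delays, and the wrapped call are illustrative assumptions):
import random
import time
import requests

def with_retries(func, max_attempts: int = 5, base_delay: float = 1.0):
    """Call func(), retrying on request errors with exponential backoff and jitter"""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except requests.RequestException as exc:
            if attempt == max_attempts:
                raise
            delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.5)
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.1f}s")
            time.sleep(delay)

# Hypothetical usage: wrap any flaky HTTP call
# cards_page = with_retries(lambda: session.get(cards_url, timeout=10).json())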
This guide provides the foundation for building data analysis applications on top of the trading card market dataset, covering data collection and storage, statistical analysis, interactive dashboards, automated reporting, and export to external analytics tools.