Wrong Storage Level Choices¶
Performance Impact
Cache eviction and memory pressure - an inappropriate storage level causes cache thrashing, where partitions are repeatedly evicted and then recomputed or re-read.
The Problem¶
Persisting with MEMORY_ONLY when the data doesn't fit in memory leads to constant eviction and recomputation of dropped partitions. MEMORY_ONLY is the default for RDD.cache() (DataFrame.cache() defaults to MEMORY_AND_DISK), and it is often forced explicitly without checking data size. Different use cases need different storage strategies.
❌ Problematic Code
# BAD: MEMORY_ONLY (the RDD cache() default) when the data doesn't fit in memory
large_df.persist(StorageLevel.MEMORY_ONLY)  # eviction cascades; dropped partitions are recomputed
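When in doubt, check what level is actually in effect rather than guessing; a DataFrame exposes it directly (a quick sketch, reusing large_df from above):

# Inspect the level currently assigned; the Storage tab of the Spark UI shows
# how much of the dataset is actually held in memory vs. evicted
print(large_df.storageLevel)   # e.g. Memory Serialized 1x Replicated
print(large_df.is_cached)      # True once a cache/persist level has been set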
Storage Level Comparison¶
Storage Level | Memory | Disk | Serialized | Replicated | Best For |
---|---|---|---|---|---|
MEMORY_ONLY | ✅ | ❌ | ❌ | ❌ | Small datasets, fast access |
MEMORY_AND_DISK | ✅ | ✅ | ❌ | ❌ | Medium datasets |
MEMORY_ONLY_SER | ✅ | ❌ | ✅ | ❌ | Memory-constrained |
MEMORY_AND_DISK_SER | ✅ | ✅ | ✅ | ❌ | Large datasets |
DISK_ONLY | ❌ | ✅ | ✅ | ❌ | Rarely accessed |
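One PySpark-specific caveat to the table: the Python API does not expose separate *_SER constants, because rows shipped from Python are always pickle-serialized before storage. A small sketch of what is available (two_replica_disk is just an illustrative name):

from pyspark import StorageLevel
# Levels exposed on the Python side (no *_SER variants)
for name in ["MEMORY_ONLY", "MEMORY_ONLY_2", "MEMORY_AND_DISK",
             "MEMORY_AND_DISK_2", "DISK_ONLY"]:
    print(name, getattr(StorageLevel, name))
# A custom level is just the constructor:
# StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
two_replica_disk = StorageLevel(True, False, False, False, 2)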
Solutions¶
✅ Match Storage to Use Case
from pyspark import StorageLevel
# Note: PySpark always pickles Python data, so the *_SER variants from the
# comparison table are not exposed here; the plain levels below already
# store data serialized on the Python side.
# For large datasets that might not fit in memory
large_df.persist(StorageLevel.MEMORY_AND_DISK)
# For critical data that should survive an executor loss without recomputation
critical_df.persist(StorageLevel.MEMORY_ONLY_2)  # replicated on two executors
# For infrequently accessed but expensive-to-compute data
archive_df.persist(StorageLevel.DISK_ONLY)
# For iterative algorithms that need both spill-to-disk and replication
ml_features.persist(StorageLevel.MEMORY_AND_DISK_2)
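Whichever level is chosen, release it once downstream work is done so the memory goes back to the pool (minimal sketch, reusing large_df):

# blocking=True waits until the executors have actually dropped the blocks
large_df.unpersist(blocking=True)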
✅ Auto-Select Storage Level
def select_optimal_storage_level(df, access_pattern="frequent", memory_available_gb=8):
"""Select optimal storage level based on usage pattern"""
    # Rough size estimate: average row size from a small collected sample,
    # scaled up to the full row count (crude, but enough to pick a tier)
    sample_rows = df.limit(1000).collect()
    if not sample_rows:
        return StorageLevel.MEMORY_ONLY
    avg_row_bytes = sum(len(str(row)) for row in sample_rows) / len(sample_rows)
    estimated_size_gb = (df.count() * avg_row_bytes) / (1024 ** 3)
# Storage level decision matrix
    # (PySpark always pickles Python data, so the *_SER levels from the
    #  comparison table map onto the plain constants used here)
    if access_pattern == "frequent":
        if estimated_size_gb < memory_available_gb * 0.6:
            return StorageLevel.MEMORY_ONLY
        return StorageLevel.MEMORY_AND_DISK
    elif access_pattern == "occasional":
        if estimated_size_gb < memory_available_gb * 0.2:
            return StorageLevel.MEMORY_ONLY
        return StorageLevel.MEMORY_AND_DISK
    elif access_pattern == "rare":
        return StorageLevel.DISK_ONLY
    elif access_pattern == "critical":
        if estimated_size_gb < memory_available_gb * 0.4:
            return StorageLevel.MEMORY_ONLY_2  # two in-memory replicas
        return StorageLevel.MEMORY_AND_DISK_2
    return StorageLevel.MEMORY_AND_DISK  # safe default
# Usage
storage_level = select_optimal_storage_level(
df=expensive_computation,
access_pattern="frequent",
memory_available_gb=16
)
expensive_computation.persist(storage_level)
print(f"Using storage level: {storage_level}")
✅ Test Storage Performance
def compare_storage_performance(df, test_operations=3):
"""Compare performance across different storage levels"""
    import time
    from pyspark.sql.functions import col
    # PySpark exposes no separate *_SER constants (Python data is always
    # pickled), so compare the levels that are actually available
    storage_levels = {
        "MEMORY_ONLY": StorageLevel.MEMORY_ONLY,
        "MEMORY_AND_DISK": StorageLevel.MEMORY_AND_DISK,
        "DISK_ONLY": StorageLevel.DISK_ONLY
    }
results = {}
test_df = df.limit(10000) # Use smaller dataset for testing
for name, level in storage_levels.items():
print(f"Testing {name}...")
# Cache with this storage level
test_df.persist(level)
# Trigger cache population
start_time = time.time()
cache_count = test_df.count()
cache_time = time.time() - start_time
# Test access performance
access_times = []
for i in range(test_operations):
start_time = time.time()
test_df.filter(col("amount") > 100).count()
access_times.append(time.time() - start_time)
avg_access_time = sum(access_times) / len(access_times)
results[name] = {
"cache_time": cache_time,
"avg_access_time": avg_access_time,
"total_time": cache_time + avg_access_time * test_operations
}
# Clean up
test_df.unpersist()
print(f" Cache time: {cache_time:.2f}s")
print(f" Avg access: {avg_access_time:.2f}s")
# Report best option
best_option = min(results.keys(), key=lambda x: results[x]["total_time"])
print(f"\n✅ Best performing: {best_option}")
return results
# Usage
performance_results = compare_storage_performance(df)
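If several levels are benchmarked in one session, clear everything between experiments so earlier caches don't skew the numbers (this assumes nothing else in the session relies on its cache):

# Drop every cached table/DataFrame in this SparkSession
spark.catalog.clearCache()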
✅ Adaptive Storage Management
class AdaptiveStorageManager:
"""Dynamically adjust storage levels based on memory pressure"""
def __init__(self, spark_session):
self.spark = spark_session
self.cached_dataframes = {}
    def cache_with_monitoring(self, df, name, initial_level=StorageLevel.MEMORY_AND_DISK):
"""Cache DataFrame with monitoring and potential adjustment"""
df.persist(initial_level)
df.count() # Trigger cache
self.cached_dataframes[name] = {
'dataframe': df,
'storage_level': initial_level,
'access_count': 0
}
# Check memory pressure after caching
if self._is_memory_pressure():
print(f"Memory pressure detected, adjusting {name}")
self._adjust_storage_levels()
def access_dataframe(self, name):
"""Access cached DataFrame and track usage"""
if name in self.cached_dataframes:
self.cached_dataframes[name]['access_count'] += 1
return self.cached_dataframes[name]['dataframe']
return None
    def _is_memory_pressure(self, threshold=0.8):
        """Check whether executors are running low on storage memory.

        PySpark's statusTracker() does not expose executor memory, so this
        goes through the internal _jsc handle (not a stable public API).
        """
        jvm_tracker = self.spark.sparkContext._jsc.sc().statusTracker()
        for executor in jvm_tracker.getExecutorInfos():
            total = executor.totalOnHeapStorageMemory()
            if total > 0 and executor.usedOnHeapStorageMemory() / total > threshold:
                return True
        return False
def _adjust_storage_levels(self):
"""Adjust storage levels based on access patterns"""
for name, info in self.cached_dataframes.items():
df = info['dataframe']
access_count = info['access_count']
current_level = info['storage_level']
# Frequently accessed: keep in memory
if access_count > 5:
continue
# Rarely accessed: move to disk
elif access_count < 2:
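                # Note: unpersist() + persist() only changes the assigned level;
                # the data is recomputed from lineage on the next action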
df.unpersist()
df.persist(StorageLevel.DISK_ONLY)
info['storage_level'] = StorageLevel.DISK_ONLY
print(f"Moved {name} to DISK_ONLY")
def cleanup_all(self):
"""Clean up all cached DataFrames"""
for name, info in self.cached_dataframes.items():
info['dataframe'].unpersist()
self.cached_dataframes.clear()
# Usage
storage_manager = AdaptiveStorageManager(spark)
storage_manager.cache_with_monitoring(expensive_df, "aggregated_data")
# Access DataFrame
result = storage_manager.access_dataframe("aggregated_data")
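Once the job is finished, the manager can release everything it tracked:

# Free all cached DataFrames registered with the manager
storage_manager.cleanup_all()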
Key Takeaways¶
Storage Level Guidelines
- Data size vs. available memory determines the primary choice
- Access frequency - frequently accessed data belongs in memory
- Fault tolerance - critical data should be replicated (see the sketch after this list)
- Cost sensitivity - disk is far cheaper than memory, so rarely used data can live there
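To illustrate the fault-tolerance point from the list above: replication is simply the last field of a StorageLevel, and the _2 constants mean replication=2 (sketch reusing critical_df from earlier):

from pyspark import StorageLevel
# Two copies of each cached partition: losing an executor does not force
# an expensive recompute of critical data
critical_df.persist(StorageLevel.MEMORY_AND_DISK_2)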
Measuring Impact¶
Storage Optimization Results
# MEMORY_ONLY with a dataset larger than memory: constant eviction, ~5x slower
# MEMORY_AND_DISK (serialized): stable performance, ~2x smaller cache footprint
# Improvement: consistent runtimes and better resource utilization
The right storage level prevents cache thrashing and ensures predictable performance. Choose based on data characteristics and access patterns, not defaults.