Performance Optimization
Performance Optimization
Overview
INTUE's performance optimization framework enables maximum computational efficiency, reduced latency, and optimal resource utilization. These optimization techniques ensure high-throughput signal processing and fast trade execution across distributed systems.
Latency Reduction Techniques
Signal Propagation Optimization
// Optimize signal propagation across microservices
function optimizeSignalPath(signal, destination) {
// Map signal chain services
const signalChain = buildSignalServiceChain(signal, destination);
// Identify critical path
const criticalPath = analyzeCriticalPath(signalChain);
// Optimize signal transmission
const optimizedChain = criticalPath.map(serviceNode => {
if (serviceNode.isHighLatency) {
// Apply specialized optimizations for high-latency nodes
return optimizeHighLatencyNode(serviceNode);
}
return serviceNode;
});
// Create optimized signal envelope
return {
payload: signal,
routingMetadata: {
critical: true,
priorityLevel: determinePriorityLevel(signal),
preferredRoute: optimizedChain.map(node => node.id),
latencyBudget: calculateLatencyBudget(signal)
},
compression: shouldCompressSignal(signal) ? 'enabled' : 'disabled',
batching: shouldBatchSignal(signal) ? 'enabled' : 'disabled'
};
}
// Determine if signal should use compression
function shouldCompressSignal(signal) {
// Use compression for large payloads or non-critical paths
return signal.data.size > 10000 || !signal.metadata.timeCritical;
}
// Determine if signal should be batched
function shouldBatchSignal(signal) {
// Only batch non-urgent signals
return !signal.metadata.urgent && signal.batchCompatible;
}
// Calculate latency budget for signal
function calculateLatencyBudget(signal) {
if (signal.metadata.urgent) {
return 50; // 50ms for urgent signals
} else if (signal.metadata.timeCritical) {
return 200; // 200ms for time-critical signals
} else {
return 1000; // 1000ms for regular signals
}
}
Execution Path Optimization
// Optimize trade execution path
async function optimizeExecutionPath(trade, exchangeAdapter) {
// Capture execution start time
const startTime = performance.now();
// Generate execution plan
const executionPlan = await planExecution(trade, exchangeAdapter);
// Parallelize pre-execution validations
const [
balanceCheck,
marketConditionCheck,
riskCheck
] = await Promise.all([
checkSufficientBalance(trade),
checkMarketConditions(trade),
performRiskValidation(trade)
]);
// Validate all pre-execution checks
const validationResult = validatePreExecutionChecks([
balanceCheck,
marketConditionCheck,
riskCheck
]);
if (!validationResult.valid) {
return {
success: false,
reason: validationResult.reason,
latency: performance.now() - startTime
};
}
// Select optimal execution strategy based on order type and market conditions
const executionStrategy = selectExecutionStrategy(
trade,
executionPlan.marketConditions
);
// Execute trade with optimized settings
const result = await executeWithStrategy(
trade,
exchangeAdapter,
executionStrategy
);
// Calculate total execution latency
result.latency = performance.now() - startTime;
// Log latency metrics
logLatencyMetrics('trade_execution', result.latency, {
asset: trade.asset,
orderType: trade.type,
strategy: executionStrategy.name
});
return result;
}
// Select optimal execution strategy
function selectExecutionStrategy(trade, marketConditions) {
const strategies = {
// Standard market order
standard: {
name: 'standard',
maxSlippage: 0.001, // 0.1%
urgency: 'normal'
},
// Aggressive market order for urgent execution
aggressive: {
name: 'aggressive',
maxSlippage: 0.003, // 0.3%
urgency: 'high'
},
// Smart order routing for optimal execution
smart: {
name: 'smart',
maxSlippage: 0.001, // 0.1%
urgency: 'normal',
dynamicRouting: true
},
// TWAP for large orders
twap: {
name: 'twap',
maxSlippage: 0.001, // 0.1%
urgency: 'low',
timeWindow: 10 * 60 * 1000, // 10 minutes
intervals: 5
}
};
// Select strategy based on order size and market conditions
if (isLargeOrder(trade)) {
return strategies.twap;
} else if (marketConditions.volatility > 0.05) {
return strategies.aggressive;
} else if (marketConditions.liquidity > 0.8) {
return strategies.standard;
} else {
return strategies.smart;
}
}
Computing Resource Management
Adaptive Resource Allocation
// Adaptive resource allocation for compute-intensive tasks
class AdaptiveResourceManager {
constructor(config) {
this.maxWorkers = config.maxWorkers || 8;
this.minWorkers = config.minWorkers || 1;
this.currentWorkers = this.minWorkers;
this.taskQueue = [];
this.workers = [];
this.metrics = {
taskLatencies: [],
resourceUtilization: [],
queueLength: []
};
// Initialize worker pool
this.initializeWorkers(this.currentWorkers);
// Start monitoring
this.startMonitoring();
}
// Initialize worker pool
initializeWorkers(count) {
// Ensure we don't exceed max workers
const targetCount = Math.min(count, this.maxWorkers);
// Add new workers if needed
while (this.workers.length < targetCount) {
const worker = new ComputeWorker();
this.workers.push(worker);
}
// Remove workers if we have too many
while (this.workers.length > targetCount) {
const worker = this.workers.pop();
worker.terminate();
}
this.currentWorkers = this.workers.length;
console.log(`Worker pool adjusted to ${this.currentWorkers} workers`);
}
// Submit task to the pool
async submitTask(task) {
return new Promise((resolve, reject) => {
const wrappedTask = {
...task,
startTime: Date.now(),
resolve,
reject
};
this.taskQueue.push(wrappedTask);
this.processQueue();
});
}
// Process tasks in the queue
processQueue() {
// Find idle workers
const idleWorkers = this.workers.filter(worker => !worker.busy);
// Assign tasks to idle workers
while (idleWorkers.length > 0 && this.taskQueue.length > 0) {
const worker = idleWorkers.shift();
const task = this.taskQueue.shift();
worker.busy = true;
worker.execute(task).then(result => {
// Record metrics
const latency = Date.now() - task.startTime;
this.metrics.taskLatencies.push(latency);
// Resolve the task promise
task.resolve(result);
// Mark worker as idle
worker.busy = false;
// Continue processing queue
this.processQueue();
}).catch(error => {
task.reject(error);
worker.busy = false;
this.processQueue();
});
}
// Check if we need to adjust the worker pool
this.adjustWorkerPool();
}
// Adjust worker pool based on load
adjustWorkerPool() {
const queueLength = this.taskQueue.length;
const activeWorkers = this.workers.filter(worker => worker.busy).length;
const utilization = activeWorkers / this.currentWorkers;
// Record metrics
this.metrics.resourceUtilization.push(utilization);
this.metrics.queueLength.push(queueLength);
// Keep metrics arrays bounded
if (this.metrics.taskLatencies.length > 100) {
this.metrics.taskLatencies.shift();
}
if (this.metrics.resourceUtilization.length > 100) {
this.metrics.resourceUtilization.shift();
}
if (this.metrics.queueLength.length > 100) {
this.metrics.queueLength.shift();
}
// Calculate average utilization
const avgUtilization = this.metrics.resourceUtilization
.slice(-10)
.reduce((sum, val) => sum + val, 0) / 10;
// Adjust worker count based on utilization and queue length
if (avgUtilization > 0.8 && queueLength > 0) {
// High utilization and tasks waiting, add workers
const newWorkerCount = Math.min(
this.currentWorkers + 1,
this.maxWorkers
);
if (newWorkerCount > this.currentWorkers) {
this.initializeWorkers(newWorkerCount);
}
} else if (avgUtilization < 0.3 && queueLength === 0) {
// Low utilization and no waiting tasks, remove workers
const newWorkerCount = Math.max(
this.currentWorkers - 1,
this.minWorkers
);
if (newWorkerCount < this.currentWorkers) {
this.initializeWorkers(newWorkerCount);
}
}
}
// Start monitoring resource usage
startMonitoring() {
setInterval(() => {
const metrics = this.getMetrics();
// Log resource usage
console.log(`Resource utilization: ${metrics.utilization.toFixed(2)}`);
console.log(`Average task latency: ${metrics.avgLatency.toFixed(2)}ms`);
console.log(`Queue length: ${metrics.queueLength}`);
console.log(`Active workers: ${metrics.activeWorkers}/${this.currentWorkers}`);
}, 30000); // Every 30 seconds
}
// Get current metrics
getMetrics() {
const activeWorkers = this.workers.filter(worker => worker.busy).length;
return {
utilization: activeWorkers / this.currentWorkers,
avgLatency: this.metrics.taskLatencies.length > 0 ?
this.metrics.taskLatencies.reduce((sum, val) => sum + val, 0) /
this.metrics.taskLatencies.length : 0,
queueLength: this.taskQueue.length,
activeWorkers
};
}
}
// Example usage
const resourceManager = new AdaptiveResourceManager({
maxWorkers: 16,
minWorkers: 2
});
// Submit compute-intensive tasks
async function processSignalAnalysis(marketData) {
return resourceManager.submitTask({
type: 'signal_analysis',
data: marketData,
priority: 'high'
});
}
async function performBacktest(strategy, parameters) {
return resourceManager.submitTask({
type: 'backtest',
data: { strategy, parameters },
priority: 'medium'
});
}
Memory Optimization
// Memory optimization for large datasets
class OptimizedDataStore {
constructor(config) {
this.maxCacheSize = config.maxCacheSize || 500 * 1024 * 1024; // 500MB
this.currentCacheSize = 0;
this.cache = new Map();
this.accessCounts = new Map();
this.lastAccessed = new Map();
this.persistence = new PersistenceLayer(config.persistencePath);
}
// Store data with intelligent memory management
async store(key, data) {
// Calculate data size
const dataSize = this._calculateSize(data);
// Check if we need to evict items from cache
if (this.currentCacheSize + dataSize > this.maxCacheSize) {
await this._evictItems(dataSize);
}
// Store data in cache
this.cache.set(key, data);
this.accessCounts.set(key, 0);
this.lastAccessed.set(key, Date.now());
this.currentCacheSize += dataSize;
// Also persist to storage
await this.persistence.write(key, data);
return {
key,
size: dataSize,
cached: true
};
}
// Retrieve data with access tracking
async retrieve(key) {
// Check if in memory cache
if (this.cache.has(key)) {
// Update access metrics
this.accessCounts.set(key, (this.accessCounts.get(key) || 0) + 1);
this.lastAccessed.set(key, Date.now());
return this.cache.get(key);
}
// Not in cache, try to load from persistence
try {
const data = await this.persistence.read(key);
// If data exists and we have space, add to cache
if (data) {
const dataSize = this._calculateSize(data);
if (this.currentCacheSize + dataSize <= this.maxCacheSize) {
this.cache.set(key, data);
this.accessCounts.set(key, 1);
this.lastAccessed.set(key, Date.now());
this.currentCacheSize += dataSize;
}
return data;
}
} catch (error) {
console.error(`Error retrieving data for ${key}:`, error);
}
return null;
}
// Calculate approximate size of data in bytes
_calculateSize(data) {
if (typeof data === 'string') {
return data.length * 2; // UTF-16 characters
} else if (data instanceof ArrayBuffer) {
return data.byteLength;
} else if (Array.isArray(data)) {
return data.reduce((size, item) => size + this._calculateSize(item), 0);
} else if (typeof data === 'object' && data !== null) {
return Object.entries(data).reduce(
(size, [key, value]) =>
size + (key.length * 2) + this._calculateSize(value),
0
);
} else {
return 8; // Default size for numbers, booleans, etc.
}
}
// Evict items to make room for new data
async _evictItems(requiredSpace) {
// If nothing to evict or more space needed than total cache, clear all
if (this.cache.size === 0 || requiredSpace > this.maxCacheSize) {
this.cache.clear();
this.accessCounts.clear();
this.lastAccessed.clear();
this.currentCacheSize = 0;
return;
}
// Calculate score for each item (combination of access frequency and recency)
const scores = new Map();
for (const key of this.cache.keys()) {
const accessCount = this.accessCounts.get(key) || 0;
const lastAccess = this.lastAccessed.get(key) || 0;
const age = (Date.now() - lastAccess) / (1000 * 60); // Age in minutes
// Score is higher for frequently and recently accessed items
const score = (accessCount + 1) / (age + 1);
scores.set(key, score);
}
// Sort items by score (ascending, so lowest scores first)
const sortedItems = [...scores.entries()]
.sort((a, b) => a[1] - b[1])
.map(([key]) => key);
// Evict items until we have enough space
let freedSpace = 0;
for (const key of sortedItems) {
const data = this.cache.get(key);
const dataSize = this._calculateSize(data);
// Remove from cache
this.cache.delete(key);
this.accessCounts.delete(key);
this.lastAccessed.delete(key);
this.currentCacheSize -= dataSize;
freedSpace += dataSize;
// Check if we've freed enough space
if (freedSpace >= requiredSpace) {
break;
}
}
}
// Get cache statistics
getStats() {
return {
itemCount: this.cache.size,
currentCacheSize: this.currentCacheSize,
maxCacheSize: this.maxCacheSize,
utilization: this.currentCacheSize / this.maxCacheSize,
persistedItems: this.persistence.getItemCount()
};
}
}
// Example usage
const dataStore = new OptimizedDataStore({
maxCacheSize: 1024 * 1024 * 1024, // 1GB
persistencePath: './data/cache'
});
// Store market data efficiently
async function storeMarketData(asset, timeframe, data) {
const key = `market_data_${asset}_${timeframe}_${data.timestamp}`;
return dataStore.store(key, data);
}
// Retrieve market data efficiently
async function getMarketData(asset, timeframe, timestamp) {
const key = `market_data_${asset}_${timeframe}_${timestamp}`;
return dataStore.retrieve(key);
}
Parallel Processing Strategies
Multi-threaded Analysis
// Multi-threaded market analysis
class ParallelAnalysisEngine {
constructor(config) {
this.workerCount = config.workerCount || Math.max(2, navigator.hardwareConcurrency - 1);
this.workers = [];
this.taskQueue = [];
// Initialize worker threads
this._initializeWorkers();
}
// Initialize worker threads
_initializeWorkers() {
for (let i = 0; i < this.workerCount; i++) {
const worker = new Worker('./analysis-worker.js');
worker.onmessage = (event) => {
const { taskId, result, error } = event.data;
// Find task in queue
const taskIndex = this.taskQueue.findIndex(task => task.id === taskId);
if (taskIndex !== -1) {
const task = this.taskQueue[taskIndex];
// Remove task from queue
this.taskQueue.splice(taskIndex, 1);
// Resolve or reject task promise
if (error) {
task.reject(new Error(error));
} else {
task.resolve(result);
}
}
// Process next task if available
this._processNextTask(worker);
};
worker.onerror = (error) => {
console.error('Worker error:', error);
};
// Add to worker pool
this.workers.push({
worker,
busy: false
});
}
}
// Process next task from queue
_processNextTask(worker) {
// Find an idle worker and available task
const workerInfo = this.workers.find(w => w.worker === worker);
if (!workerInfo || this.taskQueue.length === 0) {
return;
}
// Find tasks that aren't in progress
const pendingTasks = this.taskQueue.filter(task => !task.inProgress);
if (pendingTasks.length === 0) {
return;
}
// Get next task
const task = pendingTasks[0];
task.inProgress = true;
workerInfo.busy = true;
// Send task to worker
worker.postMessage({
taskId: task.id,
type: task.type,
payload: task.payload
});
}
// Submit analysis task
async analyzeData(type, payload) {
return new Promise((resolve, reject) => {
const taskId = `task${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
// Add task to queue
this.taskQueue.push({
id: taskId,
type,
payload,
inProgress: false,
resolve,
reject
});
// Find an idle worker
const idleWorker = this.workers.find(w => !w.busy);
if (idleWorker) {
this._processNextTask(idleWorker.worker);
}
});
}
// Perform technical analysis in parallel
async analyzeTechnicalIndicators(asset, data, indicators) {
return this.analyzeData('technical', {
asset,
data,
indicators
});
}
// Analyze multiple assets in parallel
async analyzeMultipleAssets(assets, timeframe) {
// Create analysis tasks for each asset
const analysisPromises = assets.map(asset =>
this.analyzeData('asset_analysis', {
asset,
timeframe
})
);
// Wait for all analyses to complete
return Promise.all(analysisPromises);
}
// Shutdown worker threads
shutdown() {
this.workers.forEach(workerInfo => {
workerInfo.worker.terminate();
});
this.workers = [];
console.log('Analysis engine workers terminated');
}
}
// Example usage
const analysisEngine = new ParallelAnalysisEngine({
workerCount: 8
});
// Analyze multiple assets in parallel
async function analyzeMarket() {
const assets = ['BTC', 'ETH', 'SOL', 'AVAX', 'MATIC', 'LINK', 'UNI', 'AAVE'];
console.time('marketAnalysis');
const results = await analysisEngine.analyzeMultipleAssets(assets, '1h');
console.timeEnd('marketAnalysis');
return results;
}
Distributed Processing
// Distributed processing manager
class DistributedProcessingManager {
constructor(config) {
this.nodeRegistry = new Map();
this.serviceDiscovery = new ServiceDiscovery(config.discoveryEndpoint);
this.taskDistributor = new TaskDistributor();
this.resultCollector = new ResultCollector();
// Initialize system
this._initialize();
}
// Initialize distributed processing
async _initialize() {
// Discover available processing nodes
const nodes = await this.serviceDiscovery.discoverNodes();
for (const node of nodes) {
this.registerNode(node);
}
// Set up service discovery events
this.serviceDiscovery.on('nodeAdded', node => {
this.registerNode(node);
});
this.serviceDiscovery.on('nodeRemoved', nodeId => {
this.unregisterNode(nodeId);
});
console.log(`Distributed processing initialized with ${this.nodeRegistry.size} nodes`);
}
// Register processing node
registerNode(node) {
this.nodeRegistry.set(node.id, {
...node,
status: 'available',
capabilities: node.capabilities || [],
performance: node.performance || {
cpu: 1,
memory: 1,
network: 1
},
lastHeartbeat: Date.now()
});
console.log(`Node registered: ${node.id} (${node.hostname})`);
}
// Unregister processing node
unregisterNode(nodeId) {
this.nodeRegistry.delete(nodeId);
console.log(`Node unregistered: ${nodeId}`);
}
// Find suitable nodes for task
_findSuitableNodes(task, count = 1) {
const candidates = [];
for (const [nodeId, node] of this.nodeRegistry.entries()) {
// Skip unavailable nodes
if (node.status !== 'available') {
continue;
}
// Check if node has required capabilities
if (task.requiredCapabilities) {
const hasCapabilities = task.requiredCapabilities.every(
cap => node.capabilities.includes(cap)
);
if (!hasCapabilities) {
continue;
}
}
// Calculate node score based on performance metrics and load
const score = this._calculateNodeScore(node, task);
candidates.push({
nodeId,
node,
score
});
}
// Sort by score (descending)
candidates.sort((a, b) => b.score - a.score);
// Return the top N candidates
return candidates.slice(0, count).map(c => c.nodeId);
}
// Calculate node score for task assignment
_calculateNodeScore(node, task) {
// Base score from performance metrics
let score = (
node.performance.cpu * 0.4 +
node.performance.memory * 0.3 +
node.performance.network * 0.3
);
// Adjust for task type if node has specialty
if (task.type && node.specialties && node.specialties.includes(task.type)) {
score *= 1.5;
}
// Adjust for current load
if (node.currentLoad) {
score *= (1 - (node.currentLoad / 100));
}
return score;
}
// Distribute task to processing nodes
async distributeTask(task) {
// For tasks that can be parallelized
if (task.parallelizable) {
return this._distributeParallelTask(task);
}
// For single-node tasks
return this._distributeSingleNodeTask(task);
}
// Distribute task to a single node
async _distributeSingleNodeTask(task) {
const nodeIds = this._findSuitableNodes(task, 1);
if (nodeIds.length === 0) {
throw new Error('No suitable nodes available for task');
}
const nodeId = nodeIds[0];
const node = this.nodeRegistry.get(nodeId);
// Update node status
node.status = 'busy';
try {
// Send task to node
const result = await this.taskDistributor.sendTask(nodeId, task);
// Update node status
node.status = 'available';
node.lastTask = {
id: task.id,
type: task.type,
completedAt: Date.now()
};
return result;
} catch (error) {
// Handle node failure
console.error(`Node ${nodeId} failed executing task ${task.id}:`, error);
// Mark node as potentially problematic
node.status = 'error';
node.lastError = {
task: task.id,
error: error.message,
time: Date.now()
};
// Retry on another node
return this._distributeSingleNodeTask(task);
}
}
// Distribute task to multiple nodes in parallel
async _distributeParallelTask(task) {
// Determine number of nodes to use
const nodeCount = Math.min(
task.optimalParallelism || 3,
this.nodeRegistry.size
);
const nodeIds = this._findSuitableNodes(task, nodeCount);
if (nodeIds.length === 0) {
throw new Error('No suitable nodes available for parallel task');
}
// Split task into sub-tasks
const subTasks = this._splitTask(task, nodeIds.length);
// Distribute sub-tasks to nodes
const subTaskPromises = subTasks.map((subTask, index) => {
const nodeId = nodeIds[index];
const node = this.nodeRegistry.get(nodeId);
// Update node status
node.status = 'busy';
return this.taskDistributor.sendTask(nodeId, subTask)
.then(result => {
// Update node status
node.status = 'available';
node.lastTask = {
id: subTask.id,
type: subTask.type,
completedAt: Date.now()
};
return result;
})
.catch(error => {
// Handle node failure
console.error(`Node ${nodeId} failed executing sub-task ${subTask.id}:`, error);
// Mark node as potentially problematic
node.status = 'error';
node.lastError = {
task: subTask.id,
error: error.message,
time: Date.now()
};
throw error;
});
});
// Wait for all sub-tasks to complete or implement partial failure handling
try {
const subResults = await Promise.all(subTaskPromises);
// Combine sub-results
return this._combineResults(task, subResults);
} catch (error) {
// Handle partial failures by redistributing failed sub-tasks
// This is a simplified implementation
console.error('Partial failure in distributed task:', error);
throw new Error('Distributed task execution failed');
}
}
// Split task into sub-tasks for parallel processing
_splitTask(task, count) {
// This would be implemented based on the specific task type
// Here's a generic implementation for demonstration
const subTasks = [];
for (let i = 0; i < count; i++) {
subTasks.push({
id: `${task.id}_part${i}`,
type: task.type,
parentTask: task.id,
partIndex: i,
partCount: count,
payload: {
...task.payload,
partitionKey: i,
partitionCount: count
}
});
}
return subTasks;
}
// Combine results from sub-tasks
_combineResults(originalTask, subResults) {
// This would be implemented based on the specific task type
// Here's a generic implementation for demonstration
if (originalTask.type === 'marketAnalysis') {
// For market analysis, combine asset results
const combinedResults = {
assets: {},
timestamp: Date.now(),
analysisTime: 0
};
// Combine all asset results
for (const result of subResults) {
// Combine asset data
Object.assign(combinedResults.assets, result.assets);
// Track longest analysis time
combinedResults.analysisTime = Math.max(
combinedResults.analysisTime,
result.analysisTime || 0
);
}
return combinedResults;
} else if (originalTask.type === 'backtest') {
// For backtests, combine performance metrics
return {
results: subResults.map(r => r.results).flat(),
performance: {
totalTrades: subResults.reduce((sum, r) => sum + r.performance.trades, 0),
winRate: subResults.reduce((sum, r) => sum + r.performance.winRate, 0) / subResults.length,
profitLoss: subResults.reduce((sum, r) => sum + r.performance.profitLoss, 0),
maxDrawdown: Math.max(...subResults.map(r => r.performance.maxDrawdown))
}
};
} else {
// Generic combination
return {
combinedResult: subResults,
count: subResults.length
};
}
}
}
// Example usage
const distributedManager = new DistributedProcessingManager({
discoveryEndpoint: 'https://discovery.intue.io'
});
// Perform distributed market analysis
async function performDistributedAnalysis() {
const task = {
id: `analysis_${Date.now()}`,
type: 'marketAnalysis',
parallelizable: true,
optimalParallelism: 5,
requiredCapabilities: ['market-data-processing'],
payload: {
assets: ['BTC', 'ETH', 'SOL', 'AVAX', 'DOT', 'LINK', 'UNI', 'AAVE', 'MKR', 'SNX'],
timeframes: ['1h', '4h', '1d'],
indicators: ['rsi', 'macd', 'bollinger', 'volume_profile']
}
};
console.time('distributedAnalysis');
const result = await distributedManager.distributeTask(task);
console.timeEnd('distributedAnalysis');
return result;
}
Database Optimization
// Optimize database queries for time series data
class TimeSeriesOptimizer {
constructor(dbClient) {
this.db = dbClient;
this.cacheManager = new CacheManager({
maxSize: 1000,
ttl: 5 * 60 * 1000 // 5 minutes
});
this.queryStats = {
total: 0,
cached: 0,
dbQueries: 0
};
}
// Optimized query for time series data
async queryTimeSeries({
asset,
metric,
timeframe,
startTime,
endTime,
aggregation = 'none'
}) {
this.queryStats.total++;
// Generate cache key
const cacheKey = this._generateCacheKey({
asset,
metric,
timeframe,
startTime,
endTime,
aggregation
});
// Check cache first
const cachedResult = this.cacheManager.get(cacheKey);
if (cachedResult) {
this.queryStats.cached++;
return cachedResult;
}
this.queryStats.dbQueries++;
// Determine optimal query strategy
const queryStrategy = this._determineQueryStrategy({
timeframe,
startTime,
endTime
});
// Execute query with appropriate strategy
let result;
switch (queryStrategy) {
case 'directQuery':
result = await this._executeDirectQuery({
asset,
metric,
timeframe,
startTime,
endTime,
aggregation
});
break;
case 'aggregationPipeline':
result = await this._executeAggregationPipeline({
asset,
metric,
timeframe,
startTime,
endTime,
aggregation
});
break;
case 'downsampledQuery':
result = await this._executeDownsampledQuery({
asset,
metric,
timeframe,
startTime,
endTime,
aggregation
});
break;
default:
throw new Error(`Unknown query strategy: ${queryStrategy}`);
}
// Cache the result
this.cacheManager.set(cacheKey, result);
return result;
}
// Generate cache key
_generateCacheKey(params) {
return `${params.asset}_${params.metric}_${params.timeframe}_${params.startTime}_${params.endTime}_${params.aggregation}`;
}
// Determine optimal query strategy
_determineQueryStrategy({
timeframe,
startTime,
endTime
}) {
// Calculate time range in milliseconds
const rangeMs = endTime - startTime;
// Convert timeframe to milliseconds
const timeframeMs = this._timeframeToMs(timeframe);
// Calculate number of data points
const dataPoints = rangeMs / timeframeMs;
if (dataPoints <= 1000) {
// For small queries, direct query is fastest
return 'directQuery';
} else if (dataPoints <= 10000) {
// For medium queries, use aggregation pipeline
return 'aggregationPipeline';
} else {
// For large queries, use downsampled data
return 'downsampledQuery';
}
}
// Convert timeframe string to milliseconds
_timeframeToMs(timeframe) {
const unit = timeframe.slice(-1);
const value = parseInt(timeframe.slice(0, -1));
switch (unit) {
case 'm':
return value * 60 * 1000;
case 'h':
return value * 60 * 60 * 1000;
case 'd':
return value * 24 * 60 * 60 * 1000;
case 'w':
return value * 7 * 24 * 60 * 60 * 1000;
default:
throw new Error(`Unknown timeframe unit: ${unit}`);
}
}
// Execute direct database query
async _executeDirectQuery({
asset,
metric,
timeframe,
startTime,
endTime,
aggregation
}) {
// Direct query implementation for small datasets
const collection = this.db.collection(`${asset}_${timeframe}`);
const query = {
timestamp: {
$gte: startTime,
$lte: endTime
}
};
const projection = {
_id: 0,
timestamp: 1
};
// Add requested metric to projection
projection[metric] = 1;
// Execute query
const results = await collection.find(query).project(projection).sort({ timestamp: 1 }).toArray();
// Apply any additional aggregation if needed
if (aggregation !== 'none') {
return this._applyAggregation(results, metric, aggregation);
}
return results;
}
// Execute aggregation pipeline for medium-sized queries
async _executeAggregationPipeline({
asset,
metric,
timeframe,
startTime,
endTime,
aggregation
}) {
const collection = this.db.collection(`${asset}_${timeframe}`);
// Build aggregation pipeline
const pipeline = [
{
$match: {
timestamp: {
$gte: startTime,
$lte: endTime
}
}
},
{
$sort: {
timestamp: 1
}
},
{
$project: {
_id: 0,
timestamp: 1,
[metric]: 1
}
}
];
// Add aggregation stage if needed
if (aggregation !== 'none') {
pipeline.push(this._getAggregationStage(metric, aggregation));
}
// Execute aggregation pipeline
return collection.aggregate(pipeline).toArray();
}
// Execute query against downsampled data for large queries
async _executeDownsampledQuery({
asset,
metric,
timeframe,
startTime,
endTime,
aggregation
}) {
// Determine appropriate downsampled collection
const downsampledTimeframe = this._getDownsampledTimeframe(timeframe);
const collection = this.db.collection(`${asset}_${downsampledTimeframe}_downsampled`);
// Build query
const query = {
timestamp: {
$gte: startTime,
$lte: endTime
}
};
// Execute query
const results = await collection.find(query).sort({ timestamp: 1 }).toArray();
// Apply any additional processing
return this._processDownsampledResults(results, metric, aggregation);
}
// Get appropriate downsampled timeframe
_getDownsampledTimeframe(timeframe) {
// Logic to determine appropriate downsampled timeframe
const unit = timeframe.slice(-1);
const value = parseInt(timeframe.slice(0, -1));
if (unit === 'm' && value < 60) {
return '1h';
} else if (unit === 'h' && value < 24) {
return '1d';
} else {
return '1w';
}
}
// Apply aggregation to results
_applyAggregation(results, metric, aggregation) {
// Implementation of client-side aggregation
switch (aggregation) {
case 'avg':
return this._calculateAverage(results, metric);
case 'max':
return this._calculateMax(results, metric);
case 'min':
return this._calculateMin(results, metric);
case 'sum':
return this._calculateSum(results, metric);
default:
return results;
}
}
// Get aggregation stage for pipeline
_getAggregationStage(metric, aggregation) {
// Return appropriate aggregation stage
switch (aggregation) {
case 'avg':
return {
$group: {
_id: null,
result: { $avg: `$${metric}` },
timestamp: { $first: '$timestamp' }
}
};
case 'max':
return {
$group: {
_id: null,
result: { $max: `$${metric}` },
timestamp: { $first: '$timestamp' }
}
};
case 'min':
return {
$group: {
_id: null,
result: { $min: `$${metric}` },
timestamp: { $first: '$timestamp' }
}
};
case 'sum':
return {
$group: {
_id: null,
result: { $sum: `$${metric}` },
timestamp: { $first: '$timestamp' }
}
};
default:
return {};
}
}
// Process downsampled results
_processDownsampledResults(results, metric, aggregation) {
// Process downsampled data and apply aggregation if needed
if (aggregation !== 'none') {
return this._applyAggregation(results, metric, aggregation);
}
return results;
}
// Get query statistics
getQueryStats() {
return {
...this.queryStats,
cacheHitRate: this.queryStats.total > 0
? (this.queryStats.cached / this.queryStats.total) * 100
: 0
};
}
}
// Example usage
const timeSeriesOptimizer = new TimeSeriesOptimizer(dbClient);
// Query time series data efficiently
async function queryMarketData(asset, metric, timeframe, startDate, endDate) {
return timeSeriesOptimizer.queryTimeSeries({
asset,
metric,
timeframe,
startTime: startDate.getTime(),
endTime: endDate.getTime(),
aggregation: 'none'
});
}
Last updated