# Data Fetcher System

<figure><img src="/files/ApHyJGewJjBdtwQRpDx5" alt=""><figcaption></figcaption></figure>

### 1. System Overview

The AIQ Data Fetcher is an asynchronous system that collects, analyzes, and evaluates cryptocurrency token data from multiple sources. It uses a job queue architecture in which independent tasks run in parallel and dependent tasks wait for their prerequisites to finish.

The system serves multiple purposes:

* Sourcing new tokens from various platforms (Birdeye, Dexscreener, etc.)
* Fetching token data from multiple APIs
* Analyzing token metrics and social signals
* Evaluating tokens for potential trading opportunities
* Continuously monitoring active tokens

### 2. Core Architecture

#### 2.1 Job Queue System

The data fetcher uses BullMQ, a Redis-based queue system, to manage and process jobs. This allows:

* Parallel processing of independent tasks
* Dependency management between related tasks
* Rate limiting to respect API constraints
* Automatic retries for failed jobs
* Progress tracking for long-running operations

#### 2.2 Key Components

1. **Token Sourcing Jobs**: Discover new tokens from various platforms
2. **Data Fetching Jobs**: Collect relevant data about tokens
3. **Evaluation Jobs**: Analyze the collected data and make decisions
4. **Monitoring Jobs**: Continuously track active tokens for updates

#### 2.3 Data Flow

1. Token sourcing discovers potential tokens
2. Evaluation initialization creates a tree of data fetching jobs
3. Data fetchers collect necessary information
4. Once all data is collected, evaluation jobs analyze the token
5. For promising tokens, monitoring begins to track updates

### 3. Rate Limiting Implementation

The system rate-limits each worker so that it stays below the request cap of the API it calls. In `index.ts`, the following limiters are defined:

```typescript
// Production rate limiters
const birdeyeLimiter = {
  // Birdeye Max: 1500 RPM = 25/s, using 10/s to maintain buffer
  max: 10,
  duration: 1000,
  groupKey: "birdeye",
};

const dexscreenerLimiter = {
  // Dexscreener Max: 300 RPM = 5/s, using 2/s for safety
  max: 2,
  duration: 1000,
  groupKey: "dexscreener",
};

const lunarCrushLimiter = {
  // LunarCrush Max: 100 RPM ≈ 1.67/s, using 1/s to stay within limit
  max: 1,
  duration: 1000,
  groupKey: "lunarCrush",
};

const snifferLimiter = {
  max: 4,
  duration: 1000,
  groupKey: "sniffer",
};

const evalInitLimiter = {
  ...config.evalInitRateLimiter, // Loaded from configuration
  groupKey: "eval-init",
};
```
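The comments in these limiters convert each provider's requests-per-minute cap into a per-second budget and then configure a lower value. A minimal sketch of that arithmetic follows. `perSecondCap` and `utilization` are hypothetical helpers written for illustration, not functions from the codebase, and the RPM figures are the ones quoted in the comments.

```typescript
// Hypothetical helpers illustrating the arithmetic in the limiter comments.
interface LimiterSketch {
  max: number; // jobs allowed per window
  duration: number; // window length in milliseconds
}

// Convert a provider cap in requests per minute to requests per second.
const perSecondCap = (rpm: number): number => rpm / 60;

// Fraction of the provider cap that a limiter actually consumes.
const utilization = (limiter: LimiterSketch, rpm: number): number => {
  const configuredPerSecond = limiter.max / (limiter.duration / 1000);
  return configuredPerSecond / perSecondCap(rpm);
};

// Birdeye: 10/s out of 25/s
const birdeyeUse = utilization({ max: 10, duration: 1000 }, 1500);
// Dexscreener: 2/s out of 5/s
const dexscreenerUse = utilization({ max: 2, duration: 1000 }, 300);
// LunarCrush: 1/s out of roughly 1.67/s
const lunarCrushUse = utilization({ max: 1, duration: 1000 }, 100);
```

Under these numbers the Birdeye and Dexscreener limiters use about 40% of the provider cap and the LunarCrush limiter about 60%.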

For development environments, these are replaced with a more conservative limiter:

```typescript
const devLimiter = {
  max: 1,
  duration: 1000,
  groupKey: "dev",
};
```

These limiters are applied to specific worker types when initializing BullMQ workers:

```typescript
workers[queue] = new Worker(queue, tokenDataFetcherJob, {
  ...queueOptions,
  name: `worker-${queue}`,
  concurrency, // Varies by queue type
  limiter, // Appropriate limiter based on API
});
```

### 4. Token Sourcing

The system sources tokens from multiple platforms:

#### 4.1 Birdeye Sources

* **New Listings** (`birdeyeNewListingsScannerJob`): Recently listed tokens
* **Trending Tokens** (`birdeyeTrendingScannerJob`): Popular tokens based on trading activity
* **Token Lists** (`birdeyeTokenListScannerJob`): Comprehensive token lists
* **All Tokens List** (`birdeyeAllTokensListScannerJob`): Extensive token catalog with filtering

#### 4.2 Dexscreener Sources

* **Token Boosts** (`dexscreenerBoostScannerJob`): Tokens featured through paid promotions

#### 4.3 Data Processing Flow

1. Source jobs identify potential tokens
2. The `processTokenSources` function filters and processes these tokens
3. Valid tokens are passed to evaluation jobs

#### 4.4 Token Filtering Logic

The system uses specific filtering criteria in `birdeyeAllTokensListScannerJob.ts`:

```typescript
const isValidToken = (token: TokenInfo, minLastTradeTime: number) => {
  const reasons: Record<string, boolean> = {};

  if (!token.mc || token.mc < config.allTokensListMinMcap) {
    reasons.low_mcap = true;
  }
  if (!token.v24hUSD || token.v24hUSD < config.allTokensListMin24hVolume) {
    reasons.low_vol = true;
  }
  if (token.liquidity < config.allTokensListMinLiquidity) {
    reasons.low_liq = true;
  }
  if (!token.lastTradeUnixTime || token.lastTradeUnixTime < minLastTradeTime) {
    reasons.old_last_trade_time = true;
  }
  if (
    token.address.toLowerCase().endsWith("pump") &&
    (!token.mc || token.mc < config.minMcapForPumpfunTokens)
  ) {
    reasons.low_mcap_pumpfun = true;
  }

  if (Object.keys(reasons).length > 0) {
    // Log dismissal reason
    return false;
  }

  return true;
};
```
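To see how the checks combine, the following self-contained sketch returns the list of dismissal reasons instead of a boolean. The field names mirror the snippet above, but `dismissalReasons`, `FilterThresholds`, and every threshold value are invented for illustration; the real thresholds come from `config`.

```typescript
// Sketch only: thresholds and sample tokens are made-up values.
interface TokenInfoSketch {
  address: string;
  mc?: number;
  v24hUSD?: number;
  liquidity: number;
  lastTradeUnixTime?: number;
}

interface FilterThresholds {
  minMcap: number;
  min24hVolume: number;
  minLiquidity: number;
  minMcapForPumpfunTokens: number;
}

const dismissalReasons = (
  token: TokenInfoSketch,
  minLastTradeTime: number,
  t: FilterThresholds,
): string[] => {
  const reasons: string[] = [];
  if (!token.mc || token.mc < t.minMcap) reasons.push("low_mcap");
  if (!token.v24hUSD || token.v24hUSD < t.min24hVolume) reasons.push("low_vol");
  if (token.liquidity < t.minLiquidity) reasons.push("low_liq");
  if (!token.lastTradeUnixTime || token.lastTradeUnixTime < minLastTradeTime) {
    reasons.push("old_last_trade_time");
  }
  // Addresses ending in "pump" are held to a separate market cap floor.
  const isPumpfun = token.address.toLowerCase().slice(-4) === "pump";
  if (isPumpfun && (!token.mc || token.mc < t.minMcapForPumpfunTokens)) {
    reasons.push("low_mcap_pumpfun");
  }
  return reasons;
};

const exampleThresholds: FilterThresholds = {
  minMcap: 100000,
  min24hVolume: 50000,
  minLiquidity: 10000,
  minMcapForPumpfunTokens: 250000,
};

// Passes the general market cap floor but not the stricter pump.fun floor.
const pumpTokenReasons = dismissalReasons(
  { address: "AbCdEfpump", mc: 150000, v24hUSD: 80000, liquidity: 20000, lastTradeUnixTime: 1700000500 },
  1700000000,
  exampleThresholds,
); // ["low_mcap_pumpfun"]
```

A token is valid only when the returned list is empty, which matches the `Object.keys(reasons).length > 0` check in the original.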

### 5. Token Evaluation Process

#### 5.1 Initialization

The `evaluateTokenInitDataFetchingJob` is the entry point for token evaluation:

1. Retrieves token information from database
2. Checks if market cap is sufficient
3. Creates a dependency tree of data fetching operations
4. Schedules data fetching jobs
5. Sets up a flow to evaluate results once data is collected

**5.1.1 Market Cap Validation Logic**

```typescript
try {
  const marketCap = await checkMarketCap(baseTokenData);
  if (
    marketCap <= config.allTokensListMinMcap && // Configured threshold
    checkConfigType !== ConfigType.SELL // Skip for sell evaluations
  ) {
    await tokenStatusUpdater(token, TokenStatus.BLOCKED);
    return {
      status: "BLOCKED",
      reason: "Token market cap is too low",
    };
  }
} catch (error) {
  // Market cap unavailable logic
  await tokenStatusUpdater(token, TokenStatus.DORMANT);
  const err = error as Error;
  return {
    error: err.message,
    stack: err.stack,
  };
}
```
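The three outcomes of this block can be summarized as a pure function. This is a sketch under stated assumptions: `classifyByMarketCap` is a hypothetical name, `null` stands in for `checkMarketCap` throwing, and `"CONTINUE"` is a label invented here for the case where evaluation proceeds.

```typescript
type InitOutcome = "BLOCKED" | "DORMANT" | "CONTINUE";

// marketCap === null models the case where checkMarketCap throws.
const classifyByMarketCap = (
  marketCap: number | null,
  minMcap: number,
  isSellEvaluation: boolean,
): InitOutcome => {
  if (marketCap === null) return "DORMANT"; // market cap unavailable
  // Sell evaluations skip the threshold so held tokens can still be sold.
  if (marketCap <= minMcap && !isSellEvaluation) return "BLOCKED";
  return "CONTINUE";
};

const lowCapBuy = classifyByMarketCap(50000, 100000, false); // "BLOCKED"
const lowCapSell = classifyByMarketCap(50000, 100000, true); // "CONTINUE"
const unknownCap = classifyByMarketCap(null, 100000, false); // "DORMANT"
```

Note that the comparison is `<=`, so a token sitting exactly on the threshold is blocked.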

#### 5.2 Data Fetching

The system fetches various types of data about tokens:

**5.2.1 Technical Data**

* **Overview** (`fetchOverviewJobFn`): Basic token information (price, liquidity, etc.)
* **Security** (`fetchSecurityJobFn`): Security metrics and risk assessment
* **Markets** (`fetchMarketsJobFn`): Trading venues and volume
* **Top Traders** (`fetchTopTradersJobFn`): Information about major traders
* **OHLCV** (`ohlcvJob`): Price candle data for technical analysis

**5.2.2 On-chain Data**

* **Pairs** (`fetchPairsJobFn`): Trading pairs on decentralized exchanges
* **Paid Orders** (`fetchPaidOrdersJobFn`): Orders with paid promotions
* **Token Sniffer** (`tokenSnifferJob`): Security audit information

**5.2.3 Social and AI Analysis**

* **Social** (`tokenDataFetcherSocialJob`): Social media sentiment and activity
* **Narrative** (`fetchNarrativeJob`): AI-driven analysis of token narrative
* **Price Trend Prediction** (`fetchPriceTrendPredictionJob`): AI prediction of price movements

#### 5.3 Core Data Fetching Logic

The core data fetching logic is in `tokenDataFetcherJob`:

```typescript
// Check if existing data is fresh
if (lastDataFetch && !isDataStale(lastDataFetch, maxLastFetchedAtSeconds)) {
  logger.debug(`Skipping data fetch for token - still fresh`);
  return dataFetcherResponseToJobData(lastDataFetch);
}

// Fetch dependencies if needed
const childrenFetchedDataMap: Map<DataFetcherKey, DataFetcherResponse> = new Map();
if (dataFetcher.dependencies) {
  // ... dependency fetching logic ...
}

try {
  // Execute data fetcher
  const fetcherJob = dataFetcherJobs[dataFetcherKey];
  const data = await fetcherJob(job, dataFetcher, childrenFetchedDataMap);
  
  // Handle in-progress state if exists
  // ... in-progress handling ...
  
  // Store result
  const dataFetcherResponse = await DataFetcherResponseRepository.create({
    tokenId,
    key: dataFetcherKey,
    data,
    expiresAt,
    status: DataFetcherResponseStatus.SUCCESS,
  });
  return dataFetcherResponseToJobData(dataFetcherResponse);
} catch (error) {
  // Fallback to previous data if max attempts reached
  if (lastDataFetch && job.attemptsStarted >= dataFetcher.maxAttempts) {
    // ... log error ...
    return dataFetcherResponseToJobData(lastDataFetch);
  }

  // Handle unrecoverable errors
  const isUnrecoverable = error instanceof UnrecoverableError;
  const hasTimedOut = differenceInSeconds(/*...*/) > dataFetcher.maxAttemptsTimeoutSeconds;
  const hasReachedMaxAttempts = job.attemptsStarted >= dataFetcher.maxAttempts;

  if (isUnrecoverable || hasTimedOut || hasReachedMaxAttempts) {
    // ... clean up and log ...
    return; // Return undefined instead of throwing
  }

  throw error; // Only throw for recoverable errors that haven't reached max attempts
}
```

#### 5.4 Evaluation Finalization

Once data is collected, one of these jobs processes the results:

* **Status Evaluation** (`evaluateTokenFetchedDataJob`): General token assessment
* **Buy Evaluation** (`evaluateBuyJob`): Analysis for buying opportunities
* **Sell Evaluation** (`evaluateSellJob`): Analysis for selling opportunities

### 6. Job Conditional Logic Details

#### 6.1 Token Data Fetcher Job

In `tokenDataFetcherJob.ts`, the job applies two checks around each fetch:

1. **Fresh Data Check Logic**:

```typescript
if (lastDataFetch && !isDataStale(lastDataFetch, maxLastFetchedAtSeconds)) {
  logger.debug(`Skipping data fetch for token - still fresh`);
  return dataFetcherResponseToJobData(lastDataFetch);
}
```

2. **Error/Retry Fallback Logic**:

```typescript
// Fallback to previous data if max attempts reached
if (lastDataFetch && job.attemptsStarted >= dataFetcher.maxAttempts) {
  Logger.newContext({/*...*/}).error(
    `Max attempts reached, falling back to last data`,
    error as Error
  );
  return dataFetcherResponseToJobData(lastDataFetch);
}

const isUnrecoverable = error instanceof UnrecoverableError;
const hasTimedOut = differenceInSeconds(/*...*/) > dataFetcher.maxAttemptsTimeoutSeconds;
const hasReachedMaxAttempts = job.attemptsStarted >= dataFetcher.maxAttempts;

// Close dangling in-progress records and return undefined rather than fail the job
if (isUnrecoverable || hasTimedOut || hasReachedMaxAttempts) {
  await DataFetcherResponseRepository.failDanglingInProgress(/*...*/);
  dataFetcherMissingDataCounter.add(1, {/*...*/});
  return; // Return undefined instead of throwing
}

throw error; // Only throw for recoverable errors that haven't reached max attempts
```
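The order of these checks matters, so it can help to read them as a decision function. The sketch below is illustrative: `resolveFailure`, `FailureContext`, and the action labels are hypothetical names, and the booleans stand in for the values computed in the snippet above.

```typescript
type FailureAction = "fallback-to-last-data" | "give-up" | "retry";

interface FailureContext {
  hasLastData: boolean;
  attemptsStarted: number;
  maxAttempts: number;
  isUnrecoverable: boolean;
  hasTimedOut: boolean;
}

const resolveFailure = (ctx: FailureContext): FailureAction => {
  const hasReachedMaxAttempts = ctx.attemptsStarted >= ctx.maxAttempts;
  // 1. Out of attempts but older data exists: reuse it.
  if (ctx.hasLastData && hasReachedMaxAttempts) return "fallback-to-last-data";
  // 2. No point retrying: finish the job without data instead of failing it.
  if (ctx.isUnrecoverable || ctx.hasTimedOut || hasReachedMaxAttempts) return "give-up";
  // 3. Otherwise rethrow so the queue schedules another attempt.
  return "retry";
};

const exhaustedWithCache = resolveFailure({
  hasLastData: true,
  attemptsStarted: 3,
  maxAttempts: 3,
  isUnrecoverable: false,
  hasTimedOut: false,
}); // "fallback-to-last-data"
```

One consequence of this ordering is that an unrecoverable error raised before the last attempt gives up without falling back, even when older data exists.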

#### 6.2 Social Data Fetcher Logic

In `dataFetcherSocialJob.ts`, complex conditional logic handles partial data availability:

```typescript
// Return early with the partial data for new tokens, timeouts, or max attempts,
// as long as some social data was fetched
if (
  (isNewToken || hasTimedOut || hasReachedMaxAttempts) &&
  hasFetchedAnySocialData
) {
  return inProgressResponse.data;
}

// For new tokens, stop retrying after 3 attempts
if (isNewToken && job.attemptsStarted >= 3) {
  throw new UnrecoverableError(
    "Social checks can't run on new tokens - stopping retries after 3 attempts"
  );
}
```
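The two conditions above can be read as a small decision table. The following sketch uses hypothetical names (`socialOutcome`, `SocialAttemptState`); `"continue"` simply means the job falls through to whatever logic follows in the real file, which is not shown here.

```typescript
type SocialOutcome = "return-partial" | "stop-retrying" | "continue";

interface SocialAttemptState {
  isNewToken: boolean;
  hasTimedOut: boolean;
  hasReachedMaxAttempts: boolean;
  hasFetchedAnySocialData: boolean;
  attemptsStarted: number;
}

const socialOutcome = (s: SocialAttemptState): SocialOutcome => {
  const mayReturnEarly = s.isNewToken || s.hasTimedOut || s.hasReachedMaxAttempts;
  if (mayReturnEarly && s.hasFetchedAnySocialData) return "return-partial";
  if (s.isNewToken && s.attemptsStarted >= 3) return "stop-retrying";
  return "continue";
};

// A new token with some data returns on its first attempt.
const newTokenWithData = socialOutcome({
  isNewToken: true,
  hasTimedOut: false,
  hasReachedMaxAttempts: false,
  hasFetchedAnySocialData: true,
  attemptsStarted: 1,
}); // "return-partial"
```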

#### 6.3 Narrative Fetcher Error Handling

In `dataFetcherNarrativeJob.ts`:

```typescript
// Only retry parsing errors for the first 3 attempts
if (narrative.error && job.attemptsMade < 3) {
  throw new Error("Fetch narrative job attempt failed");
} else if (narrative.error) {
  logger.error("Fetch narrative job completed with an error");
}

return narrative; // Return possibly error-containing data after max attempts
```

### 7. Dependency Management

The system manages dependencies between data fetching tasks as a directed acyclic graph (DAG). When several tasks share a dependency, the shared task runs once and the `copycatProcessor` relays its result to the other dependents.

#### 7.1 Dependency Tree Construction

```typescript
function buildSubtree(taskName: string, visited: Set<string>): TreeNode | null {
  // Prevent circular dependencies
  if (visited.has(taskName)) {
    throw new Error(`Circular dependency detected for task: ${taskName}`);
  }

  const task = jobs.find((t) => t.taskKey === taskName);
  if (!task) return null;

  const newVisited = new Set(visited);
  newVisited.add(taskName);

  // Create children nodes
  const children: TreeNode[] = [];
  for (const dep of task.dependencies) {
    const isCopyCat = usedTasks.has(dep); // Track reused dependencies
    const child = buildSubtree(dep, newVisited);
    if (child) {
      children.push({
        ...child,
        isCopyCat: isCopyCat,
      });
    }
  }

  usedTasks.add(taskName);

  return {
    name: taskName,
    children,
  };
}
```
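The snippet above relies on `jobs` and `usedTasks` from its enclosing scope. The following self-contained sketch wraps the same logic so it can be run on a small example. The task names (`evaluate`, `security`, `social`, `overview`) and the `buildTree` wrapper are made up for illustration.

```typescript
interface TaskSketch {
  taskKey: string;
  dependencies: string[];
}

interface TreeNodeSketch {
  name: string;
  children: TreeNodeSketch[];
  isCopyCat?: boolean;
}

const buildTree = (tasks: TaskSketch[], rootKey: string): TreeNodeSketch | null => {
  const usedTasks: Record<string, boolean> = {};

  const buildSubtree = (taskName: string, ancestors: string[]): TreeNodeSketch | null => {
    // A task that appears among its own ancestors forms a cycle.
    if (ancestors.indexOf(taskName) !== -1) {
      throw new Error(`Circular dependency detected for task: ${taskName}`);
    }
    const task = tasks.filter((t) => t.taskKey === taskName)[0];
    if (!task) return null;

    const children: TreeNodeSketch[] = [];
    for (const dep of task.dependencies) {
      // Already built elsewhere in the tree: mark this occurrence as a copy.
      const isCopyCat = usedTasks[dep] === true;
      const child = buildSubtree(dep, ancestors.concat(taskName));
      if (child) children.push({ ...child, isCopyCat });
    }
    usedTasks[taskName] = true;
    return { name: taskName, children };
  };

  return buildSubtree(rootKey, []);
};

const sampleTasks: TaskSketch[] = [
  { taskKey: "evaluate", dependencies: ["security", "social"] },
  { taskKey: "security", dependencies: ["overview"] },
  { taskKey: "social", dependencies: ["overview"] },
  { taskKey: "overview", dependencies: [] },
];
const sampleTree = buildTree(sampleTasks, "evaluate");
```

In this example `overview` appears twice. The occurrence under `security` is built first and has `isCopyCat: false`; the occurrence under `social` is marked `isCopyCat: true`.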

#### 7.2 CopyCat Job Creation

The algorithm detects reused dependencies and creates "copy cat" jobs to optimize execution:

```typescript
export const createCopyCatJob = (job: FlowChildJob): FlowChildJob => {
  const baseQueueName = job.queueName as keyof typeof Queues;
  const baseJobId = job.opts?.jobId;

  if (!baseJobId) throw new Error("Base Copy Cat job must have a jobId");

  const jobData: CopyCatJob = {
    copyJobId: baseJobId,
    copyJobQueueName: baseQueueName,
  };

  const copyCatJob: FlowChildJob = {
    name: `copy_cat_job_for_${baseJobId}`,
    queueName: Queues.copyCat,
    data: jobData,
    opts: {
      backoff: {
        type: "exponential",
        delay: 1000,
      },
      attempts: 10,
      jobId: `${baseJobId}-copy_cat-${uuid()}`,
    },
  };

  return copyCatJob;
};
```
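The `attempts: 10` and exponential `delay: 1000` options bound how long a copy cat job keeps retrying. The sketch below estimates that schedule under the assumption that the delay doubles on every retry starting from the base delay; the exact formula BullMQ applies may differ between versions, so treat the numbers as an approximation. `retryDelaysMs` is a hypothetical helper.

```typescript
// Assumption: the delay before retry n (1-based) is baseDelayMs * 2^(n - 1).
const retryDelaysMs = (attempts: number, baseDelayMs: number): number[] => {
  const delays: number[] = [];
  // The first attempt runs immediately; only the later attempts are delayed.
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(baseDelayMs * Math.pow(2, retry - 1));
  }
  return delays;
};

const copyCatDelays = retryDelaysMs(10, 1000); // 1000, 2000, ..., 256000
const copyCatTotalMs = copyCatDelays.reduce((sum, d) => sum + d, 0); // 511000
```

Under this assumption the nine retries span about 8.5 minutes of backoff in total, not counting the time each attempt spends running.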

#### 7.3 CopyCat Processor

The copyCat processor itself monitors the original job and returns its result:

```typescript
export const copycatProcessor = async (job: Job<CopycatJob>) => {
  try {
    const { copyJobId, copyJobQueueName } = job.data;
    // ... validation ...
    
    const copyQueue = queueService.getQueue(copyJobQueueName);
    
    // Polling loop to check original job status
    while (true) {
      const copyJob = await copyQueue.getJob(copyJobId);
      
      if (!copyJob) {
        // ... retry logic ...
      }

      const status = await copyJob.getState();

      if (status === "completed") {
        return copyJob.returnvalue;
      }

      if (status === "failed") {
        throw new Error(`Copy job failed: ${copyJobId}`);
      }

      await new Promise((resolve) => setTimeout(resolve, 3000));
    }
  } catch (error) {
    throw new Error(`Error CopyCat processor: ${error}`);
  }
};
```

### 8. Data Caching and Expiration

The system uses timestamp-based data expiration to optimize API usage:

#### 8.1 Expiration Check

```typescript
const isDataStale = (
  lastDataFetch: DataFetcherResponse,
  maxLastFetchedAtSeconds: number | undefined,
) => {
  const lastFetchedAtSecondsAgo = lastDataFetch?.time
    ? differenceInSeconds(new Date(), lastDataFetch?.time)
    : 0;

  // Check max age constraint if provided
  if (maxLastFetchedAtSeconds) {
    return lastFetchedAtSecondsAgo > maxLastFetchedAtSeconds;
  }

  // Otherwise check expiration time
  return !lastDataFetch.expiresAt || lastDataFetch.expiresAt < new Date();
};
```
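A self-contained sketch of the same check, with the current time passed in so the behavior can be verified deterministically. It uses plain `Date` arithmetic in place of `differenceInSeconds`; `isStaleAt` and `CachedResponseSketch` are hypothetical names.

```typescript
interface CachedResponseSketch {
  time?: Date; // when the data was fetched
  expiresAt?: Date; // when the data expires
}

const isStaleAt = (
  now: Date,
  last: CachedResponseSketch,
  maxAgeSeconds?: number,
): boolean => {
  // Whole seconds since the fetch; 0 when the fetch time is unknown.
  const ageSeconds = last.time
    ? Math.floor((now.getTime() - last.time.getTime()) / 1000)
    : 0;
  // An explicit max age takes precedence over the stored expiration.
  if (maxAgeSeconds) return ageSeconds > maxAgeSeconds;
  return !last.expiresAt || last.expiresAt.getTime() < now.getTime();
};

const sketchNow = new Date("2024-01-01T00:10:00Z");
const fetchedTenMinutesAgo: CachedResponseSketch = {
  time: new Date("2024-01-01T00:00:00Z"),
  expiresAt: new Date("2024-01-01T00:30:00Z"),
};

const staleByExpiry = isStaleAt(sketchNow, fetchedTenMinutesAgo); // false
const staleByMaxAge = isStaleAt(sketchNow, fetchedTenMinutesAgo, 300); // true
```

Two edge cases follow from the logic: a record without `expiresAt` is always stale when no max age is given, and a record without `time` is treated as age 0, so it is never stale under a max-age check.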

#### 8.2 Expiration Configuration

Each data fetcher has its own `expiresInSeconds` configuration in the database, allowing customization per data type. When storing data:

```typescript
const expiresAt = addSeconds(new Date(), dataFetcher.expiresInSeconds);

// Store with expiration timestamp
const dataFetcherResponse = await DataFetcherResponseRepository.create({
  tokenId,
  key: dataFetcherKey,
  data,
  expiresAt,
  status: DataFetcherResponseStatus.SUCCESS,
});
```

### 9. OHLCV Data Collection

`ohlcvJob.ts` paginates its requests because the API returns at most `PAGE_SIZE` candles per call:

```typescript
const PAGE_SIZE = 1000; // Maximum candles per request

while (currentTimeFromSecs < timeToSecs) {
  const { success, data, ...response } = await Birdeye.getOHLCV(
    chainName,
    address,
    "1m",
    currentTimeFromSecs,
    timeToSecs,
  );
  
  // ... data processing ...
  
  // Pagination logic
  if (data.items.length < PAGE_SIZE) {
    // Reached the end of available data
    ohlcvData.lastRetrievedCandleTime = 
      data.items[data.items.length - 1].unixTime;
    await job.updateProgress(100);
    break;
  }
  
  // Report progress
  const progressPercentage = Math.min(
    100,
    Math.floor((totalProcessed / totalExpectedMinutes) * 100),
  );
  await job.updateProgress(progressPercentage);
  
  // Next page - start from the timestamp after the last record
  currentTimeFromSecs = data.items[data.items.length - 1].unixTime + 1;
}
```
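The cursor logic can be exercised without the Birdeye API. The sketch below is a synchronous simplification with a fake in-memory data source; `collectCandles`, `makeFakeFetch`, and the page size of 60 are assumptions made for the example, and the real job is asynchronous.

```typescript
interface CandleSketch {
  unixTime: number;
}

type FetchCandles = (fromSecs: number, toSecs: number) => CandleSketch[];

const collectCandles = (
  fetchPage: FetchCandles,
  fromSecs: number,
  toSecs: number,
  pageSize: number,
): CandleSketch[] => {
  const collected: CandleSketch[] = [];
  let cursor = fromSecs;
  while (cursor < toSecs) {
    const page = fetchPage(cursor, toSecs);
    const lastCandle = page[page.length - 1];
    if (!lastCandle) break; // empty page: nothing left to read
    for (const candle of page) collected.push(candle);
    if (page.length < pageSize) break; // short page: end of available data
    cursor = lastCandle.unixTime + 1; // resume just after the last candle
  }
  return collected;
};

// Fake source: 150 one-minute candles, served at most pageSize at a time.
const fakeCandles: CandleSketch[] = [];
for (let minute = 0; minute < 150; minute++) {
  fakeCandles.push({ unixTime: minute * 60 });
}
const makeFakeFetch = (pageSize: number): FetchCandles => (fromSecs, toSecs) =>
  fakeCandles
    .filter((c) => c.unixTime >= fromSecs && c.unixTime <= toSecs)
    .slice(0, pageSize);

const allCandles = collectCandles(makeFakeFetch(60), 0, 150 * 60, 60); // 150 candles in 3 pages
```

Advancing the cursor to `unixTime + 1` avoids fetching the last candle of a page twice. The sketch also stops on an empty page, which guards the last-item lookup.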

### 10. Token Sniffer Integration

`tokenSnifferJob.ts` selects an implementation based on the token's blockchain:

```typescript
export const tokenSnifferJob = async (
  job: Job<BaseTokenData>,
): Promise<TokenSnifferResponse> => {
  switch (job.data.chainName) {
    case ChainName.SOLANA:
      return processSolSniffer(job);
    case ChainName.BASE:
      return processTokenSniffer(job);
    default:
      throw new UnrecoverableError(
        `TokenSniffer Chain not supported ${job.data.chainName}`,
      );
  }
};
```

Each chain has specific audit logic:

```typescript
// For Base chain
const top10HoldersTotalAmount = result.balances.top_holders.reduce(
  (acc, holder) => acc + holder.balance,
  0,
);
const top10HoldersSupplyPct = result.total_supply
  ? (top10HoldersTotalAmount / result.total_supply) * 100
  : undefined;

const top10HoldersPassed =
  top10HoldersSupplyPct !== undefined && top10HoldersSupplyPct <= 30;
```
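A worked example of the holder concentration check follows. The balances and supply are invented numbers, and `top10HoldersCheck` is a hypothetical wrapper around the same arithmetic; the 30% limit is the one used in the snippet above.

```typescript
interface HolderSketch {
  balance: number;
}

const top10HoldersCheck = (
  holders: HolderSketch[],
  totalSupply: number | undefined,
  maxPct: number,
): { pct: number | undefined; passed: boolean } => {
  const held = holders.reduce((acc, holder) => acc + holder.balance, 0);
  // A missing or zero supply leaves the percentage undefined.
  const pct = totalSupply ? (held / totalSupply) * 100 : undefined;
  return { pct, passed: pct !== undefined && pct <= maxPct };
};

// Top holders own 250 of 1000 tokens: 25%, within the 30% limit.
const spreadOut = top10HoldersCheck(
  [{ balance: 120 }, { balance: 80 }, { balance: 50 }],
  1000,
  30,
);
// Top holders own 400 of 1000 tokens: above the limit.
const concentrated = top10HoldersCheck([{ balance: 400 }], 1000, 30);
```

When the total supply is missing or zero, the percentage is `undefined` and the check fails rather than passes.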

### 11. In-Progress Fetching and Recovery

For complex data fetchers like social, the system handles partial data and in-progress states:

```typescript
let inProgressResponse =
  (await DataFetcherResponseRepository.findLastInProgressByKeyAndTokenId(
    DataFetcherKey.SOCIAL,
    job.data.tokenId,
  )) ||
  (await DataFetcherResponseRepository.initInProgressFetching(
    DataFetcherKey.SOCIAL,
    job.data.tokenId,
  ));

const data = inProgressResponse.data as SocialJobData;

// Attempt to fetch missing parts
if (!data.topicResponse) {
  try {
    const topicResponse = await lunarCrush.getLunarCrushTopic(address);
    data.topicResponse = topicResponse;
    // ...
  } catch (error) {
    errors.push(error as Error);
  }
}

// Update in-progress state
inProgressResponse = await DataFetcherResponseRepository.updateData(
  inProgressResponse as any,
);
```

This allows long-running jobs to be resumed if interrupted, and for partial data to be used when complete data cannot be obtained.
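The resume behavior can be sketched as a function that fills only the missing parts of a saved record and collects errors instead of aborting. This is a synchronous simplification: `fillMissingParts` and the `postsResponse` key are hypothetical, and the real job awaits API calls and persists the record between attempts.

```typescript
type PartialDataSketch = Record<string, string | undefined>;
type PartFetchers = Record<string, () => string>;

const fillMissingParts = (
  saved: PartialDataSketch,
  fetchers: PartFetchers,
): { data: PartialDataSketch; errors: Error[] } => {
  const data: PartialDataSketch = { ...saved };
  const errors: Error[] = [];
  Object.keys(fetchers).forEach((key) => {
    const fetchPart = fetchers[key];
    // Skip parts that an earlier attempt already fetched.
    if (!fetchPart || data[key] !== undefined) return;
    try {
      data[key] = fetchPart();
    } catch (error) {
      errors.push(error as Error); // keep going; other parts may still succeed
    }
  });
  return { data, errors };
};

// First attempt: one part succeeds, the other fails.
const attemptOne = fillMissingParts({}, {
  topicResponse: () => "topic data",
  postsResponse: () => {
    throw new Error("rate limited");
  },
});

// Second attempt resumes from the saved data and fetches only what is missing.
const attemptTwo = fillMissingParts(attemptOne.data, {
  topicResponse: () => {
    throw new Error("should not be called again");
  },
  postsResponse: () => "posts data",
});
```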

### 12. Monitoring and Maintenance

#### 12.1 Re-evaluation

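The `reEvaluateTokenJob` runs periodically and selects tokens whose last evaluation request is older than an interval that depends on the token's status: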

```typescript
// Different token types have different re-evaluation periods
const activeTime = subSeconds(
  new Date(),
  systemConfig.activeTokenMonitorIntervalSeconds,
);
const activeTokens = await TokenRepository.findAllByStatusWithLastEvalRequestedBefore(
  TokenStatus.ACTIVE,
  activeTime,
);

const dormantTime = subSeconds(
  new Date(),
  systemConfig.dormantTokenMonitorIntervalSeconds,
);
const dormantTokens = await TokenRepository.findAllByStatusWithLastEvalRequestedBefore(
  TokenStatus.DORMANT,
  dormantTime,
);

const rejectedTime = subSeconds(
  new Date(),
  systemConfig.rejectedTokenMonitorIntervalSeconds,
);
const rejectedTokens = await TokenRepository.findAllByStatusWithLastEvalRequestedBefore(
  TokenStatus.REJECTED,
  rejectedTime,
);
```
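Each query above amounts to comparing a token's last evaluation request against a cutoff of "now minus the interval for its status". A minimal sketch of that comparison follows. The interval values are invented (real ones come from `systemConfig`), `isDueForReEvaluation` is a hypothetical helper, and the strict "before" comparison is an assumption based on the repository method name.

```typescript
type MonitoredStatus = "ACTIVE" | "DORMANT" | "REJECTED";

// Hypothetical intervals in seconds; the real values live in systemConfig.
const monitorIntervalSeconds: Record<MonitoredStatus, number> = {
  ACTIVE: 300,
  DORMANT: 3600,
  REJECTED: 86400,
};

const isDueForReEvaluation = (
  tokenStatus: MonitoredStatus,
  lastEvalRequestedAt: Date,
  now: Date,
): boolean => {
  const cutoffMs = now.getTime() - monitorIntervalSeconds[tokenStatus] * 1000;
  return lastEvalRequestedAt.getTime() < cutoffMs;
};

const checkTime = new Date("2024-01-01T12:00:00Z");
const tenMinutesEarlier = new Date("2024-01-01T11:50:00Z");

const activeDue = isDueForReEvaluation("ACTIVE", tenMinutesEarlier, checkTime); // true
const dormantDue = isDueForReEvaluation("DORMANT", tenMinutesEarlier, checkTime); // false
```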

#### 12.2 Token Balance Checking

The re-evaluation process checks if tokens have active holders before sending sell signals:

```typescript
const filterOutTokensWithoutHolders = async (tokens: Token[], logger: Logger) => {
  const holderCountPromises = tokens.map((token) =>
    aiquantApi
      .getActiveHoldersCount(token.id)
      .then(({ holdersCount }) => ({ token, holdersCount, success: true })),
  );

  const results = await Promise.all(holderCountPromises);
  const tokensWithHolders: Token[] = [];

  for (const result of results) {
    const { token, holdersCount, success } = result;
    if (success && holdersCount > 0) {
      tokensWithHolders.push(token);
    }
  }

  return tokensWithHolders;
};
```

#### 12.3 Price Snapshots

The `activePriceSnapshotsJob` maintains current price data:

```typescript
const mapTokensToBatchesGroupedByChain = (
  tokens: Token[],
): Map<ChainName, Token[][]> => {
  const batches = new Map<ChainName, Token[][]>();
  for (const token of tokens) {
    const chainName = token.chainName;
    if (!batches.has(chainName)) {
      batches.set(chainName, [[]]);
    }

    const chainBatches = batches.get(chainName)!;

    const lastBatch = chainBatches[chainBatches.length - 1];
    if (lastBatch.length < BATCH_SIZE) {
      lastBatch.push(token);
    } else {
      chainBatches.push([token]);
    }
  }
  return batches;
};
```

This groups tokens by blockchain first, then splits each group into batches of at most `BATCH_SIZE` tokens for the API.
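To make the resulting shape concrete, here is a self-contained variant that uses a plain object instead of a `Map` and takes the batch size as a parameter. `batchByChain` and the sample tokens are illustrative only.

```typescript
interface ChainTokenSketch {
  id: number;
  chainName: string;
}

const batchByChain = (
  tokens: ChainTokenSketch[],
  batchSize: number,
): Record<string, ChainTokenSketch[][]> => {
  const batches: Record<string, ChainTokenSketch[][]> = {};
  for (const token of tokens) {
    // Start each chain with one empty batch, as the original does.
    const chainBatches =
      batches[token.chainName] || (batches[token.chainName] = [[]]);
    const lastBatch = chainBatches[chainBatches.length - 1];
    if (lastBatch && lastBatch.length < batchSize) {
      lastBatch.push(token);
    } else {
      chainBatches.push([token]); // last batch is full: open a new one
    }
  }
  return batches;
};

const batchedTokens = batchByChain(
  [
    { id: 1, chainName: "solana" },
    { id: 2, chainName: "solana" },
    { id: 4, chainName: "base" },
    { id: 3, chainName: "solana" },
  ],
  2,
);
// solana: [[1, 2], [3]], base: [[4]] (by token id)
```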

### 13. Worker Configuration

#### 13.1 Concurrency Settings

The system uses different concurrency settings per job type:

```typescript
// Setup Data Fetching Workers
const dataFetchingWorkers = [
  {
    queue: getDataFetchQueueName(DataFetcherKey.NARRATIVE),
    concurrency: 2,
    limiter: defaultLimiter,
  },
  {
    queue: getDataFetchQueueName(DataFetcherKey.PRICE_TREND_PREDICTION),
    concurrency: 2,
    limiter: defaultLimiter,
  },
  // ...
  {
    queue: getDataFetchQueueName(DataFetcherKey.TOKEN_SNIFFER),
    concurrency: 10, // Higher concurrency for token sniffer
    limiter: snifferLimiter,
  },
  // ...
];
```

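Evaluation jobs run with much higher concurrency. The source comment attributes this to the jobs being CPU-bound rather than I/O-bound. Note that within a single Node.js process, BullMQ concurrency interleaves jobs on one event loop, so it mainly helps while jobs are awaiting asynchronous work: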

```typescript
evaluateTokenFetchedData: new Worker(
  Queues.evaluateTokenFetchedData,
  evaluateTokenFetchedDataJob,
  {
    ...queueOptions,
    name: `worker-${Queues.evaluateTokenFetchedData}`,
    concurrency: 100, // High concurrency for CPU-bound evaluation
  },
),
```

#### 13.2 Worker Health Management

Event listeners on each worker log job outcomes and record completion and failure metrics:

```typescript
Object.values(workers).forEach((worker) => {
  const logger = Logger.getInstance().withContext({
    context: worker.name,
  });

  worker.on("completed", (job) => {
    // Log completion and metrics
    bullmqJobsCompletedCounter.add(1, {
      worker: worker.name,
      ...logCtx,
    });
  });
  
  worker.on("failed", (job, error) => {
    // Different logging severity based on attempt count
    if (
      job?.attemptsStarted !== undefined &&
      job?.opts.attempts !== undefined &&
      job?.attemptsStarted < job?.opts.attempts
    ) {
      logger.info(msg, { ...logCtx, err: error });
    } else {
      logger.error(msg, error, logCtx);
    }
    
    // Log metrics
    bullmqJobsFailedCounter.add(1, {
      worker: worker.name,
      failedReason: job?.failedReason,
      ...logCtx.tags,
    });
  });
});
```

#### 13.3 Graceful Shutdown

The system implements graceful shutdown to prevent data loss:

```typescript
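process.on("SIGTERM", async () => {
  Logger.newContext({ context: "workers" }).info(
    "SIGTERM received, closing workers...",
  );
  // worker.close() returns a promise that resolves once active jobs finish,
  // so wait for every worker before the handler completes
  await Promise.all(Object.values(workers).map((worker) => worker.close()));
});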
```

### 14. Error Metrics Collection

The system tracks error metrics to monitor performance:

```typescript
// Dependency errors
dataFetcherMissingDependenciesCounter.add(1, {
  dataFetcherKey,
  dependency,
  tokenId,
  chainName: job.data.chainName,
  symbol: job.data.symbol,
  address: job.data.address,
});

// General data missing errors
dataFetcherMissingDataCounter.add(1, {
  dataFetcherKey,
  errorType: error instanceof Error ? error.name : "Unknown",
  tokenId,
  chainName: job.data.chainName,
  symbol: job.data.symbol,
  address: job.data.address,
});

// Expired data metrics for priority re-fetching
dataFetcherExpiredEvaluationsCounter.add(expired.length, { ...job.data });
```

### 15. Database Batching

Database operations are batched to optimize performance:

```typescript
const updateTokenPrices = async (tokenPrices: Record<number, number>) => {
  const records = Object.entries(tokenPrices).map(([tokenId, price]) => ({
    tokenId: Number(tokenId),
    price,
  }));

  const chunks = chunkArray(records, DB_BATCH_SIZE);
  for (const batch of chunks) {
    await TokenRepository.updateManyPriceSnapshots(batch);
  }
};
```
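`chunkArray` is not shown in this document. A minimal sketch of what such a helper might look like, assuming it splits an array into consecutive slices of at most `size` items (`chunkArraySketch` is a hypothetical stand-in, not the project's implementation):

```typescript
// Split items into consecutive chunks of at most `size` elements.
function chunkArraySketch<T>(items: T[], size: number): T[][] {
  if (size <= 0) throw new Error("size must be positive");
  const chunks: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    chunks.push(items.slice(start, start + size));
  }
  return chunks;
}

const priceChunks = chunkArraySketch([1, 2, 3, 4, 5], 2); // [[1, 2], [3, 4], [5]]
```

Because the loop in `updateTokenPrices` awaits each batch in turn, the batches are written sequentially rather than in parallel.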

### 16. Common Workflows

#### 16.1 New Token Discovery and Evaluation

1. Source jobs discover potential tokens
2. `processTokenSources` filters and processes tokens
3. `evaluateTokenInitDataFetchingJob` initializes evaluation
4. Data fetchers collect necessary information
5. `evaluateTokenFetchedDataJob` makes a decision about the token

#### 16.2 Active Token Monitoring

1. `reEvaluateTokenJob` identifies tokens needing updates
2. Data fetchers refresh stale data
3. Evaluation jobs reassess tokens based on new data
4. `activePriceSnapshotsJob` maintains current price information

#### 16.3 Buy/Sell Signal Generation

1. `evaluateBuyJob` or `evaluateSellJob` analyzes token data
2. Technical and social indicators are considered
3. AI predictions influence the decision
4. A signal is generated if criteria are met

### 17. Integration with External Services

The system integrates with several external APIs:

1. **Birdeye**: For token discovery and market data
2. **Dexscreener**: For DEX data and promotions
3. **LunarCrush**: For social metrics and sentiment
4. **SolSniffer/TokenSniffer**: For security audits
5. **AI Services**: For narrative and price trend prediction

### 18. Conclusion

The AIQ Data Fetcher system is a sophisticated platform for token discovery, analysis, and evaluation. Its job-based architecture allows for efficient, scalable data processing, while its comprehensive data collection and analysis capabilities enable informed trading decisions.

Key strengths of the system include:

* Efficient handling of multiple API integrations with proper rate limiting
* Sophisticated dependency management for complex data workflows
* Robust error handling with appropriate fallbacks and retries
* Comprehensive monitoring and maintenance of active tokens
* Scalable architecture that can be extended to new data sources and blockchains


