codysseydev/argus 问题修复 & 功能扩展

解决BUG、新增功能、兼容多环境部署,快速响应你的开发需求

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

codysseydev/argus

Composer 安装命令:

composer require codysseydev/argus

包简介

Queue observability: records the lifecycle of queued jobs into a searchable store.

README 文档

README

Latest Version on Packagist Total Downloads License Tests

Queue observability for Laravel. Argus records the lifecycle of every queued job (queued, processing, processed, failed, released) into a searchable store, behind a swappable storage interface, without slowing job processing, and exposes a query service to search jobs, replay a job's history, and group failures by root cause.

Argus ships no HTTP routes, controllers, auth, or UI. The consuming application owns its own RBAC/SSO/HTTP layer and calls the query service directly.

Requirements

  • PHP ^8.5, Laravel ^13.0
  • PostgreSQL 16+ (default store), Redis (default buffer)

Install

composer require codysseydev/argus
php artisan vendor:publish --tag=argus-config
php artisan vendor:publish --tag=argus-postgres-migrations
php artisan migrate

Add the correlation whitelist and (optionally) a tenant resolver to config/argus.php.

Running the pipeline

Argus captures transitions in your existing queue workers automatically (via queue event listeners). Two processes complete the pipeline:

# 1. The shipper: drains the buffer and writes to the store. Run under supervisor.
php artisan argus:ship

# 2. Partition maintenance: schedule daily.
php artisan argus:partitions

Argus schedules argus:partitions for you (daily at the configured time), so you do not need to add it to routes/console.php. Adjust or disable it via config:

// config/argus.php
'schedule' => [
    'enabled' => true,        // set false to register the schedule yourself
    'partitions_at' => '00:10',
],

Querying recorded data

Inject Argus\Query\JobQueryService. It depends only on the backend-agnostic read contract, so swapping the storage backend never touches your query code. Build filters with Argus\Query\FilterBuilder; every criterion is optional and all supplied criteria are ANDed together.

use Argus\Query\FilterBuilder;
use Argus\Query\JobQueryService;
use Argus\Support\TransitionType;
use Carbon\CarbonImmutable;

public function __construct(private JobQueryService $argus) {}

// 1. Search current job state. Returns list<JobSummary> (empty if nothing matches).
$jobs = $this->argus->search(
    FilterBuilder::make()
        ->tenant('tenant-1')
        ->status(TransitionType::FAILED)
        ->queue('emails')
        ->attemptBetween(2, 5)
        ->between(CarbonImmutable::parse('-7 days'), CarbonImmutable::now())
        ->correlation('request_id', 'r-abc123')
        ->limit(50)
        ->offset(0)
        ->build()
);

foreach ($jobs as $job) {
    $job->jobUuid;
    $job->status;          // 'queued' | 'processing' | 'processed' | 'failed' | 'released'
    $job->isInFlight();    // true when the job has no terminal transition yet
}

// 2. Replay one job's full lifecycle, ascending by sequence. list<TransitionRecord>.
$history = $this->argus->getHistory($jobUuid);

// 3. Group failures by exception fingerprint within a window. list<FailureGroup>.
$groups = $this->argus->groupFailures(
    FilterBuilder::make()
        ->between(CarbonImmutable::parse('-24 hours'), CarbonImmutable::now())
        ->build()
);

foreach ($groups as $group) {
    $group->fingerprint;            // stable root-cause key
    $group->representativeMessage;  // a scrubbed example message
    $group->count;                  // failures collapsed into this group
    $group->firstSeen;
    $group->lastSeen;
}

Return shapes are DTOs (JobSummary, TransitionRecord, FailureGroup), never raw rows, and never contain customer payload data.

In-flight vs completed

A job with no terminal transition (no processed/failed) is in-flight: JobSummary::isInFlight() returns true. Completed jobs return false and carry a finishedAt. This distinguishes a still-running job from a finished one without inspecting status.

Retention

retention_days (default 30) controls how much history is kept. argus:partitions drops whole daily partitions older than the window. Changing the config value changes what is dropped on the next run, with no code change.

// config/argus.php
'retention_days' => 30,

Saved searches

A saved search persists a named JobFilter so an engineer can re-run it later. It goes through the same swappable storage seam as everything else (Argus\Contracts\ SavedSearchStore, Postgres implementation selected by config('argus.store')). Inject Argus\SavedSearches\SavedSearchService.

use Argus\Query\FilterBuilder;
use Argus\SavedSearches\SavedSearchService;
use Argus\Support\TransitionType;

public function __construct(private SavedSearchService $searches) {}

// Define a saved search: a name + the same filter object you pass to search().
$saved = $this->searches->create(
    'failed-emails',
    FilterBuilder::make()
        ->queue('emails')
        ->status(TransitionType::FAILED)
        ->build(),
);

$all  = $this->searches->all();          // list<SavedSearch>
$one  = $this->searches->find($saved->id);
$this->searches->update($saved->id, 'failed-emails', $newFilter);
$this->searches->delete($saved->id);

// Re-run it. Returns the same list<JobSummary> as running the inline filter.
$jobs = $this->searches->results($saved->id);

The filter is stored via a backend-agnostic codec (Argus\Query\FilterCodec), so a reconstructed filter is equal to the original and re-runs to exactly the same rows.

Threshold alerts

Attach a rule to a saved search: when the search matches more than threshold jobs over a rolling windowSeconds lookback, Argus fires an alert. Inject Argus\Alerting\AlertService.

use Argus\Alerting\AlertService;

public function __construct(private AlertService $alerts) {}

$rule = $this->alerts->attach(
    savedSearchId: $saved->id,
    name: 'too-many-failed-emails',
    threshold: 50,          // fire when MORE THAN 50 jobs match
    windowSeconds: 900,     // over a rolling 15-minute window (overrides the saved filter's since/until)
    cooldownSeconds: 1800,  // damp flapping: suppress re-alerts within 30 minutes
    sinks: ['slack'],       // which sink(s) to notify (keys from config('argus.alerting.sinks'))
    enabled: true,
);

$this->alerts->all();
$this->alerts->forSavedSearch($saved->id);
$this->alerts->update($rule->id, 'too-many-failed-emails', 100, 900, 1800, ['slack', 'webhook'], true);
$this->alerts->delete($rule->id);

The evaluator (Argus\Alerting\AlertEvaluator) re-runs each enabled rule on a schedule. It alerts on the transition into breach only: while a search keeps breaching it does not re-alert every interval; it re-alerts after the search recovers (drops at or below the threshold) and breaches again. cooldownSeconds additionally suppresses rapid re-alerts caused by flapping.

Argus wires argus:evaluate-alerts onto Laravel's scheduler for you (every 5 minutes by default), so you do not need to add it to routes/console.php:

// config/argus.php
'alerting' => [
    'enabled' => true,           // set false to register the schedule yourself
    'cadence' => '*/5 * * * *',  // cron expression for how often alerts evaluate
    'sinks' => [
        'slack'   => ['webhook_url' => env('ARGUS_SLACK_WEBHOOK_URL')],
        'webhook' => ['url' => env('ARGUS_ALERT_WEBHOOK_URL'), 'headers' => []],
    ],
],

Evaluation never blocks. Counting each rule is a cheap COUNT(*); delivery is dispatched to a queued DeliverAlertJob (with retries and backoff). If a sink is down the job is retried, so the alert is retained, not lost, and a slow or failing sink never stalls the evaluator or other rules. (DeliverAlertJob is in capture.except, so Argus does not observe its own alert deliveries.)

Adding a custom sink

Sinks live behind Argus\Contracts\AlertSink. Slack and a generic webhook ship in the box. Adding PagerDuty, email, or anything else is implementing the interface and registering it, no core changes:

use Argus\Alerting\AlertNotification;
use Argus\Contracts\AlertSink;

final readonly class PagerDutySink implements AlertSink
{
    public function name(): string
    {
        return 'pagerduty'; // the key a rule references in its sinks list
    }

    public function send(AlertNotification $notification): void
    {
        // POST $notification to PagerDuty. Throw on failure so the queued job retries.
    }
}

Register it on the AlertSinkRegistry singleton from your own service provider, then reference 'pagerduty' in any rule's sinks:

use Argus\Alerting\AlertSinkRegistry;

public function boot(AlertSinkRegistry $sinks): void
{
    $sinks->register(new PagerDutySink(/* ... */));
}

The buffer choice

The capture path must never block or slow a worker, so listeners do not write to storage directly. Instead each listener does an O(1) LPUSH of one transition onto a Redis list and returns immediately. A dedicated argus:ship daemon drains that list in batches and writes to the store.

We deliberately did NOT ship transitions via a Laravel queued job: that would enqueue work onto the very queue system Argus observes (a feedback loop and added load). The Redis-list buffer keeps capture off the queue entirely.

Backpressure is built in: the shipper moves drained items to an inflight list and only removes them after a successful store write (ack). If the store is slow or down, items are never acked, so they stay buffered and are retried. Nothing is dropped and workers are never stalled (they only ever push).

The buffer is an interface (Argus\Contracts\TransitionBuffer). Swapping the Redis list for a Redis-stream outbox later is a single binding change; listeners and the shipper are unaffected.

The storage-swap seam

Ingestion depends only on Argus\Contracts\TransitionStore (the write side). The concrete store is resolved from config('argus.store'). Today postgres is implemented; a future OpenSearch backend implements the same interface and is selected by changing one config value, with zero changes to ingestion code. Backend-specific migrations live with their backend (database/migrations/postgres) and are only loaded/published for that backend.

The read side (Argus\Contracts\TransitionQuery) is the same seam: the Postgres store implements both the write and read contracts. The query layer (Argus\Query) compiles to a backend-agnostic JobFilter and contains no SQL itself, so a future OpenSearch backend implements the read methods and is selected by one config value.

Saved searches (Argus\Contracts\SavedSearchStore) and alert rules (Argus\Contracts\AlertRuleStore) follow the identical pattern: an interface here, a Postgres implementation under Storage/Postgres, selected by the same config('argus.store') value. Their backend-specific migrations live in database/migrations/postgres with the rest. Nothing in the service or alerting layer knows the backend.

What Argus never stores

Argus never writes raw job payloads. It captures only the correlation identifiers you explicitly whitelist in config('argus.correlation.fields'). Anything not on that list is never captured, so non-whitelisted data cannot be stored, searched, or returned.

Exception messages are the one free-text field Argus persists, so they are scrubbed at ingestion before storage: emails, UUIDs, long hex tokens, and long digit runs are replaced with [email] / [uuid] / [hex] / [id]. The stored value is already redacted, so nothing sensitive is written or returned, including in the history and failure-grouping views. Messages are also truncated and fingerprinted by root cause. This is best-effort defense-in-depth on top of the hard guarantees above.

统计信息

  • 总下载量: 0
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 0
  • 点击次数: 2
  • 依赖项目数: 1
  • 推荐数: 0

GitHub 信息

  • Stars: 0
  • Watchers: 0
  • Forks: 0
  • 开发语言: PHP

其他信息

  • 授权协议: MIT
  • 更新时间: 2026-06-28

承接程序开发

PHP开发

VUE

Vue开发

前端开发

小程序开发

公众号开发

系统定制

数据库设计

云部署

网站建设

安全加固