codysseydev/argus
Composer install command:
composer require codysseydev/argus
Package description
Queue observability: records the lifecycle of queued jobs into a searchable store.
README
Queue observability for Laravel. Argus records the lifecycle of every queued job (queued, processing, processed, failed, released) into a searchable store behind a swappable storage interface, without slowing job processing. It also exposes a query service to search jobs, replay a job's history, and group failures by root cause.
Argus ships no HTTP routes, controllers, auth, or UI. The consuming application owns its own RBAC/SSO/HTTP layer and calls the query service directly.
Requirements
- PHP ^8.5, Laravel ^13.0
- PostgreSQL 16+ (default store), Redis (default buffer)
Install
```bash
composer require codysseydev/argus
php artisan vendor:publish --tag=argus-config
php artisan vendor:publish --tag=argus-postgres-migrations
php artisan migrate
```
Add the correlation whitelist and (optionally) a tenant resolver to config/argus.php.
Running the pipeline
Argus captures transitions in your existing queue workers automatically (via queue event listeners). Two processes complete the pipeline:
```bash
# 1. The shipper: drains the buffer and writes to the store. Run under supervisor.
php artisan argus:ship

# 2. Partition maintenance: runs daily.
php artisan argus:partitions
```
Argus schedules argus:partitions for you (daily at the configured time), so you
do not need to add it to routes/console.php. Adjust or disable it via config:
```php
// config/argus.php
'schedule' => [
    'enabled' => true,          // set false to register the schedule yourself
    'partitions_at' => '00:10',
],
```
Querying recorded data
Inject Argus\Query\JobQueryService. It depends only on the backend-agnostic
read contract, so swapping the storage backend never touches your query code.
Build filters with Argus\Query\FilterBuilder; every criterion is optional and
all supplied criteria are ANDed together.
```php
use Argus\Query\FilterBuilder;
use Argus\Query\JobQueryService;
use Argus\Support\TransitionType;
use Carbon\CarbonImmutable;

public function __construct(private JobQueryService $argus) {}

// 1. Search current job state. Returns list<JobSummary> (empty if nothing matches).
$jobs = $this->argus->search(
    FilterBuilder::make()
        ->tenant('tenant-1')
        ->status(TransitionType::FAILED)
        ->queue('emails')
        ->attemptBetween(2, 5)
        ->between(CarbonImmutable::parse('-7 days'), CarbonImmutable::now())
        ->correlation('request_id', 'r-abc123')
        ->limit(50)
        ->offset(0)
        ->build()
);

foreach ($jobs as $job) {
    $job->jobUuid;
    $job->status;       // 'queued' | 'processing' | 'processed' | 'failed' | 'released'
    $job->isInFlight(); // true when the job has no terminal transition yet
}

// 2. Replay one job's full lifecycle, ascending by sequence. list<TransitionRecord>.
$history = $this->argus->getHistory($jobUuid);

// 3. Group failures by exception fingerprint within a window. list<FailureGroup>.
$groups = $this->argus->groupFailures(
    FilterBuilder::make()
        ->between(CarbonImmutable::parse('-24 hours'), CarbonImmutable::now())
        ->build()
);

foreach ($groups as $group) {
    $group->fingerprint;           // stable root-cause key
    $group->representativeMessage; // a scrubbed example message
    $group->count;                 // failures collapsed into this group
    $group->firstSeen;
    $group->lastSeen;
}
```
Return shapes are DTOs (JobSummary, TransitionRecord, FailureGroup), never
raw rows, and never contain customer payload data.
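To make the "every criterion optional, all supplied criteria ANDed" rule concrete, here is a minimal in-memory sketch. SketchFilter and sketchMatches() are hypothetical stand-ins, not Argus classes; the real query layer compiles filters for the configured backend rather than filtering PHP arrays.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a compiled filter: every field is optional (null).
// This is NOT Argus's JobFilter; it only illustrates the AND semantics.
final class SketchFilter
{
    public function __construct(
        public readonly ?string $tenant = null,
        public readonly ?string $status = null,
        public readonly ?string $queue = null,
        public readonly ?int $minAttempt = null,
        public readonly ?int $maxAttempt = null,
    ) {}
}

/** @param array{tenant: string, status: string, queue: string, attempt: int} $job */
function sketchMatches(SketchFilter $f, array $job): bool
{
    // A null criterion always passes; every supplied criterion must hold (AND).
    return ($f->tenant === null || $job['tenant'] === $f->tenant)
        && ($f->status === null || $job['status'] === $f->status)
        && ($f->queue === null || $job['queue'] === $f->queue)
        && ($f->minAttempt === null || $job['attempt'] >= $f->minAttempt)
        && ($f->maxAttempt === null || $job['attempt'] <= $f->maxAttempt);
}

$jobs = [
    ['tenant' => 'tenant-1', 'status' => 'failed', 'queue' => 'emails', 'attempt' => 3],
    ['tenant' => 'tenant-1', 'status' => 'failed', 'queue' => 'sms', 'attempt' => 3],
    ['tenant' => 'tenant-2', 'status' => 'processed', 'queue' => 'emails', 'attempt' => 1],
];

// Roughly tenant('tenant-1')->queue('emails')->attemptBetween(2, 5).
$filter = new SketchFilter(tenant: 'tenant-1', queue: 'emails', minAttempt: 2, maxAttempt: 5);
$hits = array_values(array_filter($jobs, fn (array $job): bool => sketchMatches($filter, $job)));
```

An empty filter constrains nothing and matches every job; each added criterion can only narrow the result.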
In-flight vs completed
A job with no terminal transition (no processed/failed) is in-flight:
JobSummary::isInFlight() returns true. Completed jobs return false and carry
a finishedAt. This distinguishes a still-running job from a finished one without
inspecting status.
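A minimal sketch of the in-flight rule, assuming a job's history is an ordered list of transition names with timestamps. summarizeJob() and the array shapes are hypothetical, not Argus's JobSummary. Note that released is treated as non-terminal, because a released job goes back on the queue.

```php
<?php
declare(strict_types=1);

// Hypothetical sketch of the in-flight rule above; not Argus code.
// 'released' puts a job back on the queue, so it is NOT terminal.
const TERMINAL_TRANSITIONS = ['processed', 'failed'];

/**
 * @param list<array{type: string, at: string}> $history non-empty, ascending by sequence
 * @return array{status: string, inFlight: bool, finishedAt: ?string}
 */
function summarizeJob(array $history): array
{
    $finishedAt = null;
    foreach ($history as $transition) {
        if (in_array($transition['type'], TERMINAL_TRANSITIONS, true)) {
            $finishedAt = $transition['at'];
        }
    }

    return [
        'status' => $history[array_key_last($history)]['type'], // latest transition
        'inFlight' => $finishedAt === null,                      // no terminal transition yet
        'finishedAt' => $finishedAt,
    ];
}

$running = summarizeJob([
    ['type' => 'queued', 'at' => '10:00:00'],
    ['type' => 'processing', 'at' => '10:00:01'],
    ['type' => 'released', 'at' => '10:00:05'],
    ['type' => 'processing', 'at' => '10:00:35'],
]);
$done = summarizeJob([
    ['type' => 'queued', 'at' => '10:00:00'],
    ['type' => 'processing', 'at' => '10:00:01'],
    ['type' => 'processed', 'at' => '10:00:02'],
]);
```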
Retention
retention_days (default 30) controls how much history is kept. argus:partitions
drops whole daily partitions older than the window. Changing the config value
changes what is dropped on the next run, with no code change.
```php
// config/argus.php
'retention_days' => 30,
```
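The retention arithmetic can be sketched as follows. This assumes daily partitions named `<table>_YYYYMMDD` and a cutoff of midnight today minus retention_days; both the naming scheme and partitionsToDrop() are hypothetical, not Argus's implementation.

```php
<?php
declare(strict_types=1);

// Hypothetical sketch of the retention rule: drop whole daily partitions that
// fall entirely before the window. The "<name>_YYYYMMDD" naming and the exact
// cutoff are assumptions, not Argus's actual scheme.
/**
 * @param list<string> $partitions
 * @return list<string>
 */
function partitionsToDrop(array $partitions, DateTimeImmutable $now, int $retentionDays): array
{
    // Oldest day still kept: midnight today minus retention_days.
    $cutoff = $now->setTime(0, 0)->modify("-{$retentionDays} days");

    $drop = [];
    foreach ($partitions as $name) {
        if (preg_match('/_(\d{8})$/', $name, $m) !== 1) {
            continue; // not a daily partition; never touch it
        }
        $day = DateTimeImmutable::createFromFormat('!Ymd', $m[1], $now->getTimezone());
        if ($day !== false && $day < $cutoff) {
            $drop[] = $name;
        }
    }
    return $drop;
}

$now = new DateTimeImmutable('2026-06-28 12:00', new DateTimeZone('UTC'));
$partitions = ['transitions_20260528', 'transitions_20260529', 'transitions_20260628'];

$drop30 = partitionsToDrop($partitions, $now, 30); // cutoff 2026-05-29
$drop60 = partitionsToDrop($partitions, $now, 60); // same partitions, wider window
```

Raising the window from 30 to 60 days moves the cutoff back, so none of these partitions is dropped: a config-only change, as described above.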
Saved searches
A saved search persists a named JobFilter so an engineer can re-run it later. It
goes through the same swappable storage seam as everything else (Argus\Contracts\SavedSearchStore, with the Postgres implementation selected by config('argus.store')).
Inject Argus\SavedSearches\SavedSearchService.
```php
use Argus\Query\FilterBuilder;
use Argus\SavedSearches\SavedSearchService;
use Argus\Support\TransitionType;

public function __construct(private SavedSearchService $searches) {}

// Define a saved search: a name + the same filter object you pass to search().
$saved = $this->searches->create(
    'failed-emails',
    FilterBuilder::make()
        ->queue('emails')
        ->status(TransitionType::FAILED)
        ->build(),
);

$all = $this->searches->all(); // list<SavedSearch>
$one = $this->searches->find($saved->id);
$this->searches->update($saved->id, 'failed-emails', $newFilter);

// Re-run it. Returns the same list<JobSummary> as running the inline filter.
$jobs = $this->searches->results($saved->id);

// Delete it once it is no longer needed.
$this->searches->delete($saved->id);
```
The filter is stored via a backend-agnostic codec (Argus\Query\FilterCodec), so a
reconstructed filter is equal to the original and re-runs to exactly the same rows.
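One way a codec can guarantee a faithful round trip is to serialize the filter to plain, versioned JSON with a canonical key order. The classes below are hypothetical stand-ins for JobFilter and FilterCodec; the README does not specify the actual wire format.

```php
<?php
declare(strict_types=1);

// Hypothetical sketch of a backend-agnostic filter codec. SketchJobFilter and
// SketchFilterCodec are stand-ins, not Argus's JobFilter or FilterCodec.
final class SketchJobFilter
{
    /** @param array<string, string> $correlation */
    public function __construct(
        public readonly ?string $queue = null,
        public readonly ?string $status = null,
        public readonly ?int $limit = null,
        public readonly array $correlation = [],
    ) {}
}

final class SketchFilterCodec
{
    private const VERSION = 1;

    public static function encode(SketchJobFilter $filter): string
    {
        $correlation = $filter->correlation;
        ksort($correlation); // equal filters always produce identical JSON

        return json_encode([
            'v' => self::VERSION, // lets a future format migrate old rows
            'queue' => $filter->queue,
            'status' => $filter->status,
            'limit' => $filter->limit,
            'correlation' => (object) $correlation, // always a JSON object, even when empty
        ], JSON_THROW_ON_ERROR);
    }

    public static function decode(string $json): SketchJobFilter
    {
        $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
        if (($data['v'] ?? null) !== self::VERSION) {
            throw new UnexpectedValueException('Unsupported filter format version');
        }

        return new SketchJobFilter(
            queue: $data['queue'],
            status: $data['status'],
            limit: $data['limit'],
            correlation: $data['correlation'],
        );
    }
}

$original = new SketchJobFilter(
    queue: 'emails',
    status: 'failed',
    limit: 50,
    correlation: ['request_id' => 'r-abc123'],
);
$stored = SketchFilterCodec::encode($original); // the string this sketch would persist
$restored = SketchFilterCodec::decode($stored);
```

Because the payload holds only plain values (no SQL, no backend types), any store can persist it, and decoding yields an object equal to the original.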
Threshold alerts
Attach a rule to a saved search: when the search matches more than threshold jobs
over a rolling windowSeconds lookback, Argus fires an alert. Inject
Argus\Alerting\AlertService.
```php
use Argus\Alerting\AlertService;

public function __construct(private AlertService $alerts) {}

$rule = $this->alerts->attach(
    savedSearchId: $saved->id,
    name: 'too-many-failed-emails',
    threshold: 50,         // fire when MORE THAN 50 jobs match
    windowSeconds: 900,    // over a rolling 15-minute window (overrides the saved filter's since/until)
    cooldownSeconds: 1800, // damp flapping: suppress re-alerts within 30 minutes
    sinks: ['slack'],      // which sink(s) to notify (keys from config('argus.alerting.sinks'))
    enabled: true,
);

$this->alerts->all();
$this->alerts->forSavedSearch($saved->id);
$this->alerts->update($rule->id, 'too-many-failed-emails', 100, 900, 1800, ['slack', 'webhook'], true);
$this->alerts->delete($rule->id);
```
The evaluator (Argus\Alerting\AlertEvaluator) re-runs each enabled rule on a schedule.
It alerts on the transition into breach only: while a search keeps breaching it does
not re-alert every interval; it re-alerts after the search recovers (drops at or below
the threshold) and breaches again. cooldownSeconds additionally suppresses rapid
re-alerts caused by flapping.
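The breach-transition rule with a cooldown amounts to a small state machine per rule. A minimal sketch, assuming the evaluator keeps two pieces of state per rule (whether it was breaching at the last evaluation, and when it last alerted). SketchRuleState and evaluateRule() are hypothetical names, and Argus's exact handling of a breach suppressed by the cooldown may differ.

```php
<?php
declare(strict_types=1);

// Hypothetical sketch of "alert on the transition into breach" plus cooldown.
// Names are illustrative, not Argus's AlertEvaluator.
final class SketchRuleState
{
    public bool $breaching = false;  // was the rule in breach at the last evaluation?
    public ?int $lastAlertAt = null; // unix time of the last alert actually fired
}

/** Returns true when an alert should fire; $count = matches in the rolling window. */
function evaluateRule(SketchRuleState $s, int $count, int $threshold, int $cooldownSeconds, int $now): bool
{
    $inBreach = $count > $threshold;         // strictly MORE THAN the threshold
    $entering = $inBreach && !$s->breaching; // fire only on the healthy -> breach edge
    $s->breaching = $inBreach;               // at or below threshold counts as recovery

    if (!$entering) {
        return false; // still breaching, or still healthy: no new alert
    }
    if ($s->lastAlertAt !== null && $now - $s->lastAlertAt < $cooldownSeconds) {
        return false; // flapping: re-entered breach too soon after the last alert
    }
    $s->lastAlertAt = $now;
    return true;
}

$state = new SketchRuleState();
$fired = [];
// (time, count) samples; threshold 50, cooldown 1800 s.
foreach ([[0, 60], [300, 70], [600, 40], [900, 55], [1200, 30], [2400, 80]] as [$t, $count]) {
    $fired[$t] = evaluateRule($state, $count, 50, 1800, $t);
}
```

Here the alert fires at t=0, stays quiet while the breach persists, suppresses the quick re-breach at t=900 (within the cooldown), and fires again at t=2400 after a recovery and re-breach outside the cooldown.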
Argus wires argus:evaluate-alerts onto Laravel's scheduler for you (every 5 minutes
by default), so you do not need to add it to routes/console.php:
```php
// config/argus.php
'alerting' => [
    'enabled' => true,          // set false to register the schedule yourself
    'cadence' => '*/5 * * * *', // cron expression for how often alerts evaluate
    'sinks' => [
        'slack'   => ['webhook_url' => env('ARGUS_SLACK_WEBHOOK_URL')],
        'webhook' => ['url' => env('ARGUS_ALERT_WEBHOOK_URL'), 'headers' => []],
    ],
],
```
Evaluation never blocks. Counting each rule is a cheap COUNT(*); delivery is dispatched
to a queued DeliverAlertJob (with retries and backoff). If a sink is down the job is
retried, so the alert is retained, not lost, and a slow or failing sink never stalls the
evaluator or other rules. (DeliverAlertJob is in capture.except, so Argus does not
observe its own alert deliveries.)
Adding a custom sink
Sinks live behind Argus\Contracts\AlertSink. Slack and a generic webhook ship in the box.
Adding PagerDuty, email, or any other sink means implementing the interface and registering it;
no core changes are required:
```php
use Argus\Alerting\AlertNotification;
use Argus\Contracts\AlertSink;

final readonly class PagerDutySink implements AlertSink
{
    public function name(): string
    {
        return 'pagerduty'; // the key a rule references in its sinks list
    }

    public function send(AlertNotification $notification): void
    {
        // POST $notification to PagerDuty. Throw on failure so the queued job retries.
    }
}
```
Register it on the AlertSinkRegistry singleton from your own service provider, then
reference 'pagerduty' in any rule's sinks:
```php
use Argus\Alerting\AlertSinkRegistry;

public function boot(AlertSinkRegistry $sinks): void
{
    $sinks->register(new PagerDutySink(/* ... */));
}
```
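How a rule's sinks keys resolve to registered sinks can be sketched as a registry keyed by name(). SketchSink, SketchSinkRegistry, and RecordingSink are hypothetical stand-ins for AlertSink and AlertSinkRegistry, and throwing on an unknown key is this sketch's choice, not documented Argus behavior.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-ins; not Argus's AlertSink or AlertSinkRegistry.
interface SketchSink
{
    public function name(): string;
    public function send(string $message): void;
}

final class SketchSinkRegistry
{
    /** @var array<string, SketchSink> */
    private array $sinks = [];

    public function register(SketchSink $sink): void
    {
        $this->sinks[$sink->name()] = $sink; // a later registration replaces the same key
    }

    /**
     * Resolve a rule's sink keys; unknown keys fail loudly instead of
     * silently dropping the alert.
     * @param list<string> $keys
     * @return list<SketchSink>
     */
    public function resolve(array $keys): array
    {
        return array_map(function (string $key): SketchSink {
            return $this->sinks[$key]
                ?? throw new InvalidArgumentException("No alert sink registered as '{$key}'");
        }, $keys);
    }
}

// Test double that records what it was asked to send.
final class RecordingSink implements SketchSink
{
    /** @var list<string> */
    public array $sent = [];

    public function __construct(private string $key) {}

    public function name(): string { return $this->key; }

    public function send(string $message): void { $this->sent[] = $message; }
}

$registry = new SketchSinkRegistry();
$slack = new RecordingSink('slack');
$pagerDuty = new RecordingSink('pagerduty');
$registry->register($slack);
$registry->register($pagerDuty);

foreach ($registry->resolve(['slack', 'pagerduty']) as $sink) {
    $sink->send('too-many-failed-emails: 57 > 50');
}
```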
The buffer choice
The capture path must never block or slow a worker, so listeners do not write to
storage directly. Instead each listener does an O(1) LPUSH of one transition onto
a Redis list and returns immediately. A dedicated argus:ship daemon drains that
list in batches and writes to the store.
We deliberately did NOT ship transitions via a Laravel queued job: that would enqueue work onto the very queue system Argus observes (a feedback loop and added load). The Redis-list buffer keeps capture off the queue entirely.
Backpressure is built in: the shipper moves drained items to an inflight list and only
removes them after a successful store write (ack). If the store is slow or down, items
are never acked, so they stay buffered and are retried. Nothing is dropped and workers
are never stalled (they only ever push).
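The push, move-to-inflight, and ack cycle can be illustrated in memory. In this sketch plain PHP arrays stand in for the two Redis lists, SketchBuffer and shipOnce() are hypothetical names, and a single shipper is assumed. A real Redis implementation would typically use an atomic list move (for example LMOVE) rather than two separate array operations, and returning unacked items to the pending list on failure is this sketch's choice.

```php
<?php
declare(strict_types=1);

// In-memory sketch of push / move-to-inflight / ack. Arrays stand in for the
// Redis lists; method names are hypothetical. Assumes a single shipper.
final class SketchBuffer
{
    /** @var list<string> */
    private array $pending = [];
    /** @var list<string> */
    private array $inflight = [];

    // Capture path: O(1) append, never touches the store.
    public function push(string $transition): void
    {
        $this->pending[] = $transition;
    }

    /** Move up to $max oldest items to the inflight list and return them. */
    public function claim(int $max): array
    {
        $batch = array_splice($this->pending, 0, $max);
        array_push($this->inflight, ...$batch);
        return $batch;
    }

    // Only after a successful store write are items removed for good.
    public function ack(): void
    {
        $this->inflight = [];
    }

    // On store failure, put unacked items back at the head so order is kept.
    public function requeueInflight(): void
    {
        $this->pending = array_merge($this->inflight, $this->pending);
        $this->inflight = [];
    }

    public function size(): int
    {
        return count($this->pending) + count($this->inflight);
    }
}

/** One shipper iteration: claim a batch, write it, ack only on success. */
function shipOnce(SketchBuffer $buffer, callable $writeBatch, int $batchSize): int
{
    $batch = $buffer->claim($batchSize);
    if ($batch === []) {
        return 0;
    }
    try {
        $writeBatch($batch); // may throw if the store is slow or down
        $buffer->ack();
        return count($batch);
    } catch (Throwable) {
        $buffer->requeueInflight(); // nothing dropped; retried next iteration
        return 0;
    }
}

$buffer = new SketchBuffer();
foreach (['t1', 't2', 't3'] as $t) {
    $buffer->push($t);
}
$stored = [];
$storeDown = function (array $batch): void {
    throw new RuntimeException('store unavailable');
};
$storeUp = function (array $batch) use (&$stored): void {
    array_push($stored, ...$batch);
};

$failed = shipOnce($buffer, $storeDown, 2); // nothing acked, nothing lost
$shipped = shipOnce($buffer, $storeUp, 2);  // t1, t2 written and acked
```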
The buffer is an interface (Argus\Contracts\TransitionBuffer). Swapping the Redis list
for a Redis-stream outbox later is a single binding change; listeners and the shipper are
unaffected.
The storage-swap seam
Ingestion depends only on Argus\Contracts\TransitionStore (the write side). The concrete
store is resolved from config('argus.store'). Today postgres is implemented; a future
OpenSearch backend implements the same interface and is selected by changing one config
value, with zero changes to ingestion code. Backend-specific migrations live with their
backend (database/migrations/postgres) and are only loaded/published for that backend.
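The config-selected seam can be sketched as a map from the configured store name to a factory, with ingestion typed only against the interface. Everything here is hypothetical: an in-memory store stands in for Postgres, and the README does not show how Argus performs this binding internally.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-ins for the write contract and one backend.
interface SketchTransitionStore
{
    /** @param list<array<string, string>> $batch */
    public function write(array $batch): void;
}

final class InMemoryTransitionStore implements SketchTransitionStore
{
    /** @var list<array<string, string>> */
    public array $rows = [];

    public function write(array $batch): void
    {
        array_push($this->rows, ...$batch);
    }
}

/**
 * Pick the backend named by config (e.g. the value of a 'store' key).
 * @param array<string, callable(): SketchTransitionStore> $factories
 */
function resolveStore(string $configured, array $factories): SketchTransitionStore
{
    if (!isset($factories[$configured])) {
        throw new InvalidArgumentException("Unsupported store '{$configured}'");
    }
    return $factories[$configured]();
}

// Ingestion depends only on the interface, so swapping backends never touches it.
function ingest(SketchTransitionStore $store, array $batch): void
{
    $store->write($batch);
}

$factories = ['memory' => fn (): SketchTransitionStore => new InMemoryTransitionStore()];
$store = resolveStore('memory', $factories);
ingest($store, [['job_uuid' => 'j-1', 'type' => 'queued']]);
```

Adding a backend in this sketch means adding one factory entry and changing the configured name; ingest() stays untouched.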
The read side (Argus\Contracts\TransitionQuery) is the same seam: the Postgres store
implements both the write and read contracts. The query layer (Argus\Query) compiles
to a backend-agnostic JobFilter and contains no SQL itself, so a future OpenSearch
backend implements the read methods and is selected by one config value.
Saved searches (Argus\Contracts\SavedSearchStore) and alert rules
(Argus\Contracts\AlertRuleStore) follow the identical pattern: an interface here, a
Postgres implementation under Storage/Postgres, selected by the same config('argus.store')
value. Their backend-specific migrations live in database/migrations/postgres with the
rest. Nothing in the service or alerting layer knows the backend.
What Argus never stores
Argus never writes raw job payloads. It captures only the correlation identifiers you
explicitly whitelist in config('argus.correlation.fields'). Anything not on that list is
never captured, so non-whitelisted data cannot be stored, searched, or returned.
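Whitelist-only capture means building the stored correlation map by iterating the allow-list, never the source data. A minimal sketch, assuming the source is a flat array of values available to the listener; captureCorrelation() is a hypothetical helper, not an Argus API.

```php
<?php
declare(strict_types=1);

// Hypothetical sketch of whitelist-only capture; not Argus code.
/**
 * @param array<string, mixed> $source  values available at capture time
 * @param list<string>         $allowed the configured whitelist
 * @return array<string, string>
 */
function captureCorrelation(array $source, array $allowed): array
{
    $captured = [];
    foreach ($allowed as $field) {
        // Iterate the allow-list, not the source, so unlisted keys cannot leak.
        if (array_key_exists($field, $source) && is_scalar($source[$field])) {
            $captured[$field] = (string) $source[$field];
        }
    }
    return $captured;
}

$captured = captureCorrelation(
    ['request_id' => 'r-abc123', 'order_id' => 42, 'email' => 'jane@example.com', 'payload' => ['card' => '...']],
    ['request_id', 'order_id'],
);
```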
Exception messages are the one free-text field Argus persists, so they are scrubbed at
ingestion before storage: emails, UUIDs, long hex tokens, and long digit runs are
replaced with [email] / [uuid] / [hex] / [id]. The stored value is already
redacted, so nothing sensitive is written or returned, including in the history and
failure-grouping views. Messages are also truncated and fingerprinted by root cause.
This is best-effort defense-in-depth on top of the hard guarantees above.
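A minimal sketch of scrub-then-fingerprint, following the rules listed above. The regular expressions, the thresholds that define "long" (16+ hex characters, 6+ digits), the 200-character limit, and the class-plus-message fingerprint are all assumptions for illustration, not Argus's actual values.

```php
<?php
declare(strict_types=1);

// Hypothetical scrubber/fingerprinter; patterns and limits are assumptions.
function scrubMessage(string $message, int $maxChars = 200): string
{
    // Order matters: emails and UUIDs first so their parts are not
    // half-replaced by the broader hex/digit rules.
    $scrubbed = preg_replace(
        [
            '/[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}/',
            '/\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b/i',
            '/\b(?=[0-9a-f]*[a-f])[0-9a-f]{16,}\b/i', // long hex with at least one letter
            '/\d{6,}/',                                // long digit runs
        ],
        ['[email]', '[uuid]', '[hex]', '[id]'],
        $message,
    ) ?? '[redacted]'; // fail closed: never fall back to the raw message

    // Truncate by characters (UTF-8 aware), falling back to bytes on invalid UTF-8.
    if (preg_match('/^.{0,' . $maxChars . '}/su', $scrubbed, $m) === 1) {
        return $m[0];
    }
    return substr($scrubbed, 0, $maxChars);
}

/** Same exception class + same scrubbed message => same root-cause key. */
function fingerprint(string $exceptionClass, string $message): string
{
    return hash('sha256', $exceptionClass . "\0" . scrubMessage($message));
}

$scrubbed = scrubMessage(
    'User jane.doe@example.com order 1234567 token deadbeefdeadbeef1234 req 123e4567-e89b-12d3-a456-426614174000'
);
```

Because identifiers are replaced before hashing, messages that differ only in an order number or token collapse into one fingerprint, which is what lets failures group by root cause.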
Other information
- License: MIT
- Last updated: 2026-06-28