letts/php-client
Composer install command:
composer require letts/php-client
Package description
PHP client and mission runtime for the letts/dugdale distributed task queue.
README
PHP client and mission runtime for the letts distributed task queue (successor to jobd).
The package has two halves:
- Client (Letts\Client): application-side. Dispatches missions to dugdale daemons, follows their event streams, fans out across hosts, and manages running missions (kill/restart/delete/query).
- Mission runtime (Letts\Mission): worker-side. A thin façade your mission scripts use to read input, stream progress, write output files, react to cancellation, and report success/failure over the daemon's fd-3 control channel.
A dugdale is the daemon that actually runs missions; a mission is a PHP script it executes; a lane is a named concurrency queue on a dugdale; a label tags dugdales so you can address them by capability instead of by id.
Requirements
- PHP 8.3+
- ext-curl, ext-pcntl, ext-posix (plus ext-sockets to run the test suite)
- Composer
Install
composer require letts/php-client
Concepts
Addressing a target
Every dispatch() / run() call selects its target dugdale in one of three
ways (combining them throws BadRequestException):
| Mode | Argument | Lane | Meaning |
|---|---|---|---|
| route | route: 'normal' | from route | A named (host, lane) pair defined in letts.yaml. |
| host | host: 's1' | lane: required | A dugdale id, or an alias that resolves to one. |
| match | match: ['prod'] | lane: required | Auto-select a dugdale carrying all the given labels and declaring the lane. |
When neither route nor host is given, the auto-select label filter resolves
in order: the call's match: → the client's withMatch() scope →
selector.match from letts.yaml. If no source provides a filter, the call
throws BadRequestException. The two default sources are auto-select scoping
only — they are ignored by (and never conflict with) explicit route/host
calls; only literally passing match: together with route/host is rejected.
With match, if several dugdales qualify, one is chosen at random (load
distribution); if none qualify, NoMatchingDugdaleException is thrown.
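The filter cascade and the match rule above can be sketched in plain PHP. This is a minimal illustration, not the client's internals: resolveLabelFilter() and selectDugdale() are hypothetical names, dugdales are modelled as simple arrays, and treating an empty filter the same as a missing one is an assumption.

```php
<?php
declare(strict_types=1);

/**
 * Resolve the label filter in the documented order:
 * call-level match, then withMatch() scope, then selector.match.
 */
function resolveLabelFilter(?array $callMatch, ?array $scopeMatch, ?array $configMatch): array
{
    $filter = $callMatch ?? $scopeMatch ?? $configMatch;
    if ($filter === null || $filter === []) {
        // The real client throws BadRequestException here.
        throw new InvalidArgumentException('no label filter from any source');
    }
    return $filter;
}

/**
 * Keep dugdales that carry ALL labels and declare the lane, then pick one at random.
 * Each dugdale is modelled as ['id' => string, 'labels' => string[], 'lanes' => string[]].
 */
function selectDugdale(array $dugdales, array $labels, string $lane): string
{
    $candidates = array_values(array_filter(
        $dugdales,
        fn(array $d): bool => array_diff($labels, $d['labels']) === []
            && in_array($lane, $d['lanes'], true),
    ));
    if ($candidates === []) {
        // The real client throws NoMatchingDugdaleException here.
        throw new RuntimeException('no matching dugdale');
    }
    return $candidates[array_rand($candidates)]['id'];
}
```

With a fleet where s1 carries [prod, eu] and s2 carries [prod], a filter of [prod, eu] can only resolve to s1, while [prod] may resolve to either.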
Scopes & tokens
Calls authenticate with one of three token scopes, picked automatically:
- dispatch: dispatch(), run(), runParallel(), runOnAll(), getMission()
- admin: listMissions(), kill(), restart(), delete()
- exec: reserved; not used by the client today
Each dugdale's own token wins over the global auth.* fallback. See
Configuration.
Client
Construction
use Letts\Client;
$letts = Client::default(); // discover letts.yaml (see cascade below)
$letts = Client::fromConfig('/path/letts.yaml');
$letts = Client::fromConfig('/path/letts.yaml', [
'request_timeout' => 30, // per-request inactivity seconds (default 30)
'connect_timeout' => 5, // connection-phase seconds (best-effort per platform)
'max_connections_per_host' => 4, // curl pool size (default 4)
'retry_attempts' => 3, // total tries on network/5xx (default 3)
'retry_backoff' => [100, 500, 2000],// backoff in ms between tries
]);
Both factories accept an optional injected HttpClientInterface for tests.
Retries apply to network errors and 5xx responses only — 4xx (including
429) is treated as definitive and not retried.
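As a rough sketch of that retry rule, the loop below retries only network errors and 5xx responses and sleeps according to a backoff list. isRetryable() and sendWithRetry() are hypothetical helpers, a null status stands in for a network error, and reusing the last delay when the list is shorter than the number of gaps is an assumption.

```php
<?php
declare(strict_types=1);

/** A null status models a network error (no HTTP response at all). */
function isRetryable(?int $status): bool
{
    return $status === null || ($status >= 500 && $status <= 599);
}

/**
 * Call $send up to $attempts times, sleeping per $backoffMs between tries.
 * $send returns an HTTP status or null; $sleep is injectable so tests need not wait.
 */
function sendWithRetry(
    callable $send,
    int $attempts = 3,
    array $backoffMs = [100, 500, 2000],
    ?callable $sleep = null,
): ?int {
    $sleep ??= fn(int $ms) => usleep($ms * 1000);
    $status = null;
    for ($try = 0; $try < $attempts; $try++) {
        $status = $send();
        if (!isRetryable($status)) {
            return $status; // 2xx/3xx/4xx (including 429) are definitive
        }
        if ($try < $attempts - 1) {
            // Assumption: reuse the last delay if fewer delays than gaps are configured.
            $sleep($backoffMs[min($try, count($backoffMs) - 1)] ?? 0);
        }
    }
    return $status; // retries exhausted; the caller sees the last outcome
}
```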
dispatch() — fire-and-forget
Returns the mission id immediately; does not wait for completion.
$id = $letts->dispatch(
mission: 'NotifyUser',
host: 's7', lane: 'high', // or route:, or match: with lane:
input: ['user_id' => 1],
files: ['photo' => '/tmp/pic.jpg'], // optional, see Input files
timeout: '30s', // optional mission execution timeout
missionId: null, // optional caller-supplied id (idempotency)
);
Idempotency: the mission id doubles as the Idempotency-Key. Re-dispatching
the same missionId returns the same mission; reusing it with a different
payload throws ConflictException.
run() — dispatch and wait
Dispatches, then follows the NDJSON event stream until the mission reaches a
terminal done event, and returns a RunResult. If the
connection drops mid-flight the stream reconnects (resuming from the last seen
event) with a short backoff, up to 3 consecutive unproductive attempts — the
budget resets while events keep arriving, so long missions over restartable
connections aren't capped.
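The reconnect budget described above can be illustrated with a small loop. This sketch assumes events carry a sequence number used for resuming and that an attempt delivering no events counts as unproductive; followStream() and the event shape are hypothetical, and the real backoff is omitted.

```php
<?php
declare(strict_types=1);

/**
 * $connect(int $lastSeen): array returns the events received on one connection
 * attempt, resuming after sequence number $lastSeen. Each event is
 * ['seq' => int, 'type' => string]; a 'done' event is terminal.
 */
function followStream(callable $connect, int $maxUnproductive = 3): array
{
    $lastSeen = 0;
    $unproductive = 0;
    $events = [];
    while (true) {
        $batch = $connect($lastSeen);
        if ($batch === []) {
            if (++$unproductive >= $maxUnproductive) {
                // The real client raises NetworkException in this situation.
                throw new RuntimeException('stream could not be kept open');
            }
            continue; // real code would back off briefly before reconnecting
        }
        $unproductive = 0; // events arrived, so the budget resets
        foreach ($batch as $event) {
            $events[] = $event;
            $lastSeen = $event['seq'];
            if ($event['type'] === 'done') {
                return $events;
            }
        }
    }
}
```

Because the counter resets whenever events arrive, a long mission can survive any number of drops as long as no three consecutive attempts come back empty.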
$r = $letts->run(
mission: 'GenerateThumbnails',
route: 'normal', // addressing: route | host & lane | match & lane
input: ['video_id' => 123],
);
echo $r->return['processed'];
// All options:
$r = $letts->run(
mission: 'RenderReport',
host: 's1', lane: 'normal',
input: ['id' => 9],
files: ['template' => '/tmp/t.html'],
timeout: '5m', // mission-side execution limit (daemon)
waitTimeout: '30s', // client-side wait deadline (ms/s/m/h)
onProgress: fn(?float $v, ?string $m) => printf("%.0f%% %s\n", ($v ?? 0) * 100, $m),
downloadOutputsTo: '/tmp/out', // save mission output files into this dir
throwOnFailure: true, // default: non-success → MissionFailedException
fetchLogs: false, // default: do NOT pull stdout/stderr (extra request)
);
Notes:
- throwOnFailure (default true): a non-success outcome raises MissionFailedException. Set false to inspect the RunResult instead.
- fetchLogs (default false): stdout/stderr are an extra round-trip, so $r->logs is empty unless you opt in. Log fetch failures degrade to empty logs and never fail the run.
- downloadOutputsTo: on success, each output file the mission registered is streamed to <dir>/<role> (never buffered whole in memory) and verified against the size/sha256 from the terminal event; a failed or short download raises StagingException and leaves no partial file behind.
- timeout is enforced by the daemon on the mission; waitTimeout is how long the client waits before giving up on the stream. Past it, run() throws WaitTimeoutException while the mission keeps running on the daemon.
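To make the downloadOutputsTo behaviour concrete, here is a minimal sketch of streaming a file in chunks and verifying its size and SHA-256, removing the partial file on mismatch. downloadVerified() is a hypothetical helper that reads from any PHP stream; the client's actual download code is not shown in this README.

```php
<?php
declare(strict_types=1);

/**
 * Copy $source (an open read stream) to $target in 64 KiB chunks, hashing as we go.
 * On a size or hash mismatch the partial file is deleted and an exception is thrown.
 */
function downloadVerified($source, string $target, int $expectedSize, string $expectedSha256): void
{
    $out = fopen($target, 'wb');
    if ($out === false) {
        throw new RuntimeException("cannot open $target for writing");
    }
    $hash = hash_init('sha256');
    $size = 0;
    while (!feof($source)) {
        $chunk = fread($source, 65536);
        if ($chunk === false) {
            break; // read error: the size check below will fail
        }
        $size += strlen($chunk);
        hash_update($hash, $chunk);
        fwrite($out, $chunk);
    }
    fclose($out);
    if ($size !== $expectedSize || !hash_equals($expectedSha256, hash_final($hash))) {
        unlink($target); // leave no partial file behind
        // The real client raises StagingException here.
        throw new RuntimeException('download failed verification');
    }
}
```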
runParallel() and runOnAll() — fan-out
runParallel() runs many jobs concurrently over one multiplexed connection pool,
so wall-clock is the slowest job, not the sum. Results preserve input order.
$results = $letts->runParallel([
['host' => 's1', 'lane' => 'manual', 'mission' => 'DiskUsage', 'input' => ['mount' => '/']],
['host' => 's2', 'lane' => 'manual', 'mission' => 'DiskUsage', 'input' => ['mount' => '/']],
]);
foreach ($results as $hr) { // each is a HostResult
if ($hr->isSuccess()) {
echo "$hr->host: " . $hr->result->return['summary'] . "\n";
} else {
echo "$hr->host: " . ($hr->error->kind ?? 'fail') . "\n";
}
}
Each job is an array with the same addressing keys as run()
(route | host | match, plus lane, mission, input, files, timeout).
Unlike run(), a stream that drops mid-flight is not reconnected — the job
surfaces a network HostError. Use it for short control-style missions.
Both fan-out calls take a waitTimeout:; jobs still unfinished at the deadline
surface a HostError of kind timeout (their missions keep running on the
daemons):
$results = $letts->runParallel($jobs, waitTimeout: '2m');
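Duration strings such as '30s', '5m', and '2m' are used for timeout and waitTimeout with the units ms/s/m/h. A minimal sketch of converting such a string to milliseconds, assuming a single non-negative integer followed by one unit; durationToMs() is a hypothetical helper, and the client's own parser may accept other forms.

```php
<?php
declare(strict_types=1);

/** Convert a duration like "250ms", "30s", "5m" or "1h" to milliseconds. */
function durationToMs(string $duration): int
{
    // "ms" is listed before "m" so that "250ms" is not read as minutes.
    if (preg_match('/^(\d+)(ms|s|m|h)$/D', $duration, $m) !== 1) {
        throw new InvalidArgumentException("bad duration: $duration");
    }
    $factor = ['ms' => 1, 's' => 1_000, 'm' => 60_000, 'h' => 3_600_000][$m[2]];
    return (int) $m[1] * $factor;
}
```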
runOnAll() fans out one mission to every dugdale that matches the labels and
declares the lane:
$results = $letts->runOnAll(mission: 'FlushCache', lane: 'normal', match: ['prod']);
The label filter is required and resolves like auto-select (match: →
withMatch() scope → selector.match). With no filter from any source,
runOnAll() throws NoMatchingDugdaleException instead of silently hitting
every dugdale that happens to declare the lane.
Mission control & queries
$info = $letts->getMission($id); // ?MissionInfo; host omitted → search all dugdales
$info = $letts->getMission($id, host: 's1');
$list = $letts->listMissions(host: 's1', filters: ['status' => 'running']); // admin; host required
$letts->kill($id, signal: 'TERM', host: 's1'); // admin
$newId = $letts->restart($id, host: 's1'); // admin; returns the new mission id
$letts->delete($id, host: 's1', force: true); // admin
$dugdales = $letts->dugdales(match: ['prod']); // list<Config\Dugdale> matching labels
$scoped = $letts->withMatch(['prod']); // copy with a default auto-select label filter
getMission() returns null when the mission isn't found. listMissions(),
kill(), restart(), and delete() require an explicit host and an admin
token.
Unlike dispatch (whose Idempotency-Key makes re-sending safe), kill() and
restart() are never auto-retried: every successful restart enqueues a
brand-new mission, so re-sending after an ambiguous network failure could
double the work. On a NetworkException check state via getMission() before
trying again. delete() is idempotent and retries normally.
Result objects
RunResult (Letts\Result\RunResult)
| Property | Type | Notes |
|---|---|---|
| host | string | dugdale id that ran the mission |
| missionId | string | |
| outcome | string | one of success, failed, oom, killed, timeout, crashed, lost |
| return | ?array | the mission's success() payload |
| failReason / failMessage / failDetails | ?string / ?string / ?array | populated on failure |
| exitCode / signal | ?int / ?string | |
| durationMs | int | |
| logs | Logs | empty unless fetchLogs: true |
| outputFiles | array | role => {staging_id, sha256, size} |
->isSuccess(): bool is true iff outcome === 'success'.
Logs — stdout, stderr (strings), stdoutTruncated, stderrTruncated.
HostResult (returned by runParallel()/runOnAll()) — host,
result: ?RunResult, error: ?HostError; helpers ->isReachable() and
->isSuccess().
HostError — kind (auth | bad_request | conflict | backpressure | network | timeout), message, httpStatus, errorCode.
MissionInfo (getMission()/listMissions()) — the full daemon record:
missionId, status, outcome, lane, missionName, groupId, exitCode,
signal, failReason/failMessage/failDetails, return, input,
durationMs, timeoutMs, pid, time{Created,Started,Finished}Ms,
restartedFrom, inputs, outputs, and more.
Exceptions
All extend Letts\Exceptions\LettsException (which extends \RuntimeException).
| Exception | Raised when |
|---|---|
| MissionFailedException | run(throwOnFailure: true) and the mission did not succeed. Carries getOutcome(), getReason(), getFailMessage(), getFailDetails(), getResult(). |
| NoMatchingDugdaleException | match/runOnAll finds no dugdale with the requested labels and lane. |
| BadRequestException | invalid addressing (route and host combined, no addressing at all, missing lane), 400. |
| AuthException | 401: bad/missing token for the scope. |
| ConflictException | 409: idempotency-key reused with a different payload. |
| BackpressureException | 503: daemon shedding load. |
| DispatchException | other non-2xx; getCode() is the HTTP status. |
| NetworkException | transport failure (incl. an event stream that can't be kept open to a terminal event); getHost() identifies the dugdale. |
| WaitTimeoutException | run(waitTimeout:) elapsed before the mission finished; the mission keeps running on the daemon. |
| StagingException | input-file upload or output-file download failed. |
| ConfigException / MissingEnvException | bad letts.yaml / unresolved ${ENV} (the latter exposes ->name). |
In runParallel()/runOnAll() these dispatch errors are caught per-job and
surfaced as HostError instead of thrown.
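The HTTP-status rows of the table can be summarised as a lookup. This is only an illustration of the table, returning class names as strings; exceptionForStatus() is a hypothetical helper, not part of the package.

```php
<?php
declare(strict_types=1);

/** Map an HTTP status to the exception class name listed in the table (null for 2xx). */
function exceptionForStatus(int $status): ?string
{
    if ($status >= 200 && $status < 300) {
        return null; // success: nothing is thrown
    }
    return match ($status) {
        400 => 'BadRequestException',
        401 => 'AuthException',
        409 => 'ConflictException',
        503 => 'BackpressureException',
        default => 'DispatchException', // any other non-2xx
    };
}
```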
Mission runtime
A mission is an executable PHP script the dugdale runs. Bootstrap with
Mission::start(), which auto-detects its environment: under a dugdale
(LETTS_MISSION_ID set) it wires the fd-3 control channel and signal/shutdown
handlers; run directly from a shell it falls back to standalone mode.
<?php
require __DIR__ . '/vendor/autoload.php';
use Letts\Mission;
$m = Mission::start();
$videoId = $m->input('video_id');
$m->progress(0.5, "processing video $videoId");
// ... do work ...
$m->success(['processed' => true]);
Input
Input is a JSON object, read with dot-notation paths:
$m->input('user.name'); // throws if the path is absent
$m->input('user.name', 'guest'); // returns the default if absent
$m->has('user.name'); // bool
$m->all(); // the whole input array
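For readers unfamiliar with dot-notation lookup, the following sketch shows one way it can work on a nested array. dotGet() is a hypothetical stand-in for $m->input(); the exception type thrown by the real runtime on a missing path is not stated here, so OutOfBoundsException is an assumption.

```php
<?php
declare(strict_types=1);

/**
 * Walk $input along a dot-separated $path.
 * Throws when the path is absent, unless a default was passed explicitly.
 */
function dotGet(array $input, string $path, mixed $default = null): mixed
{
    $hasDefault = func_num_args() >= 3; // distinguishes "no default" from a null default
    $node = $input;
    foreach (explode('.', $path) as $segment) {
        if (!is_array($node) || !array_key_exists($segment, $node)) {
            if ($hasDefault) {
                return $default;
            }
            throw new OutOfBoundsException("input path not found: $path");
        }
        $node = $node[$segment];
    }
    return $node;
}
```

Using array_key_exists() rather than isset() means a key that is present with a null value counts as present, so the default is not applied to it.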
Input files
Files passed via dispatch(files: ...) / run(files: ...) are uploaded to the
daemon's staging area (resumable: an interrupted upload retries from the byte
the daemon confirmed) and materialized on disk for the mission, keyed by the
role you chose:
$path = $m->file('photo'); // absolute path to the materialized file
$size = $m->fileSize('photo');
$fh = $m->fileStream('photo'); // open read handle
$all = $m->files(); // role => {path, size, sha256}
Progress & cooperative cancellation
$m->progress(0.25, 'a quarter done'); // value (0..1) and/or message; both optional
$m->progress(message: 'still working');
Mission::start() installs SIGTERM/SIGINT handlers with
pcntl_async_signals(true), so signals arrive between opcodes with no extra
work. Call $m->checkSignal() at safe points in long loops — it throws
InterruptedException when the daemon asks the mission to stop:
foreach ($items as $item) {
$m->checkSignal(); // throws InterruptedException on SIGTERM/SIGINT
process($item);
}
If you let it propagate, the runtime reports the mission as failed. In practice
the stop was initiated by the daemon (a kill or timeout), so the daemon's
terminal outcome (killed/timeout) takes precedence regardless of what the
mission emits last. Don't add manual pcntl_signal_dispatch() calls — async
delivery is already on.
Output files
Write into the path the runtime gives you, then register the key. Keys must match
^[A-Za-z_][A-Za-z0-9_]{0,63}$ and not start with __:
file_put_contents($m->outputPath('result'), $bytes);
$m->outputFile('result'); // register; dugdale collects and hashes it on success
$m->success(['written' => true]);
success() verifies every registered output file exists on disk first, so a
missing file fails locally with a clear error instead of an opaque daemon-side
missing_output.
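The output key rule stated above can be checked locally before registering a file. A minimal sketch, where isValidOutputKey() is a hypothetical helper built directly from the documented pattern:

```php
<?php
declare(strict_types=1);

/** True when $key matches the documented pattern and does not start with "__". */
function isValidOutputKey(string $key): bool
{
    // The D modifier stops "$" from matching before a trailing newline.
    return preg_match('/^[A-Za-z_][A-Za-z0-9_]{0,63}$/D', $key) === 1
        && !str_starts_with($key, '__');
}
```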
Success & failure
$m->success(['key' => 'value']); // return must be a JSON object (assoc array) or null — not a list
$m->success(); // no return value
$m->fail('could not reach upstream', exitCode: 2, details: ['url' => $url]);
Both terminate the process. Passing a list (sequential array) to success()
throws InvalidArgumentException — wrap it, e.g. ['items' => $list]. A
failure always exits non-zero: fail(..., exitCode: 0) is coerced to 1
(exit 0 would contradict the failure and the daemon would record it as the
diagnostic fail_then_zero_exit instead of your message).
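The two rules above (no list payloads, no zero exit code on failure) can be sketched as follows. Both functions are hypothetical helpers; in particular, accepting an empty array is an assumption, since the README does not say how the runtime treats it.

```php
<?php
declare(strict_types=1);

/** Reject sequential arrays, which would encode as a JSON list rather than an object. */
function assertObjectPayload(?array $payload): void
{
    // Assumption: an empty array is accepted; the real runtime may differ.
    if ($payload !== null && $payload !== [] && array_is_list($payload)) {
        throw new InvalidArgumentException(
            'success() payload must be a JSON object or null, not a list'
        );
    }
}

/** A failure must exit non-zero, so a requested exit code of 0 becomes 1. */
function failExitCode(int $requested): int
{
    return $requested === 0 ? 1 : $requested;
}
```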
Failure & crash semantics
The runtime classifies abnormal exits for you:
- Uncaught
Throwable → fail event, reason = uncaught_exception, with class/file/line/trace in failDetails.
- Out of memory → outcome = oom, reason = php_memory_limit (a 64 KB reserve buffer lets the handler run after the limit is hit).
- Other fatal error → outcome = crashed, reason = php_fatal_error.
- Explicit fail() → reason = explicit.
Standalone debug mode
Run a mission outside a dugdale for local debugging. Input comes from argv,
progress goes to stderr, and the success() payload is printed as pretty JSON to
stdout:
php missions/X.php --input='{"video_id":42}'
php missions/X.php --input-file=payload.json
php missions/X.php --input=- # read JSON from stdin
File keys work too: with no dugdale to materialize files, $m->file('photo')
returns the input value itself as the path — point it at a local file, e.g.
--input='{"photo":"/tmp/photo.jpg"}'. Output files go to out/<key> under
the current directory.
Configuration (letts.yaml)
The client discovers letts.yaml via this cascade (first existing file wins),
matching the letts CLI so the library and CLI resolve the same file:
1. $LETTS_CONFIG (if set, the file must exist)
2. ./letts.yaml
3. $XDG_CONFIG_HOME/letts/letts.yaml (only when XDG_CONFIG_HOME is set)
4. ~/.letts/letts.yaml
5. /etc/letts/letts.yaml
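The discovery cascade can be sketched as a pure function. discoverConfig() is a hypothetical helper; it assumes ~ expands to $HOME, and returning null when nothing is found is a placeholder, since the client's behaviour in that case is not described here.

```php
<?php
declare(strict_types=1);

/**
 * Return the first existing config path following the documented cascade.
 * $env is an environment map, $exists a predicate such as 'is_file'.
 */
function discoverConfig(array $env, string $cwd, callable $exists): ?string
{
    if (($env['LETTS_CONFIG'] ?? '') !== '') {
        if (!$exists($env['LETTS_CONFIG'])) {
            // An explicit override must point at a real file.
            throw new RuntimeException('LETTS_CONFIG points to a missing file');
        }
        return $env['LETTS_CONFIG'];
    }
    $candidates = [$cwd . '/letts.yaml'];
    if (($env['XDG_CONFIG_HOME'] ?? '') !== '') {
        $candidates[] = $env['XDG_CONFIG_HOME'] . '/letts/letts.yaml';
    }
    if (($env['HOME'] ?? '') !== '') {
        $candidates[] = $env['HOME'] . '/.letts/letts.yaml';
    }
    $candidates[] = '/etc/letts/letts.yaml';
    foreach ($candidates as $path) {
        if ($exists($path)) {
            return $path; // first existing file wins
        }
    }
    return null;
}
```

In a real script this could be called as discoverConfig(getenv(), getcwd(), 'is_file'), after checking that getcwd() did not return false.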
Full example:
# Global token fallbacks (a dugdale's own token overrides these).
# ${ENV} placeholders are substituted when a token/alias is resolved.
auth:
  token: "${LETTS_DISPATCH_TOKEN}" # dispatch scope
  admin_token: "${LETTS_ADMIN_TOKEN}" # admin scope
defaults:
  port: 7180 # used by any dugdale that omits `port`
# Default label filter for auto-select / runOnAll when a call passes no match:.
selector:
  match: [prod]
# Named (host, lane) pairs addressable via run(route: ...).
routes:
  normal: {host: s1, lane: normal}
  bulk: {host: s1, lane: high}
# Host aliases resolved to a dugdale id (cycle-checked, max 8 hops; ${ENV} ok).
aliases:
  primary: s1
# Reusable blocks dugdales can `extends`.
templates:
  prod:
    labels: [prod]
    lanes:
      normal: {concurrency: 4}
      high: {concurrency: 8}
dugdales:
  - id: s1
    host: server1.internal
    port: 7180
    extends: prod # inherit labels and lanes
    token: "${S1_TOKEN}" # overrides auth.token for s1
    admin_token: "${S1_ADMIN}"
  - id: s2
    host: server2.internal
    extends: prod
    lanes:
      high: null # delete the `high` lane inherited from the template
Key reference:
| Key | Purpose |
|---|---|
| auth.token / admin_token / exec_token | global token fallbacks per scope |
| defaults.port | default daemon port (0–65535) when a dugdale omits one |
| selector.match | default label filter for auto-select / runOnAll when no match: is passed |
| routes.<name> | {host, lane} for run(route: ...) |
| aliases.<name> | alternate name → dugdale id |
| templates.<name> | reusable {labels, lanes, tokens, …} block |
| dugdales[].id | required; unique; ^[a-z][a-z0-9_-]{0,63}$ |
| dugdales[].host / port / url | endpoint (url overrides host:port) |
| dugdales[].extends | name of a template to inherit from |
| dugdales[].labels | tags used by match / runOnAll / dugdales() |
| dugdales[].lanes.<name> | {concurrency, paused}; null deletes an inherited lane |
| dugdales[].token / admin_token / exec_token | per-dugdale tokens |
extends merge: scalars are dugdale-wins; labels are replaced (not
unioned) when the dugdale sets its own; lanes are unioned with the template
(dugdale wins on collision, null deletes). Unknown keys are rejected, and all
ids/lane/label/route/template names are regex-validated on load.
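The extends merge rules can be illustrated with plain arrays. applyTemplate() is a hypothetical helper, not the package's loader; replacing a colliding lane wholesale (rather than merging its keys) and dropping the extends key from the result are assumptions.

```php
<?php
declare(strict_types=1);

/** Merge a dugdale entry over its template following the documented rules. */
function applyTemplate(array $template, array $dugdale): array
{
    // Top level: dugdale wins. Because the merge is shallow, a dugdale's own
    // `labels` replaces the template's list instead of being unioned with it.
    $merged = array_replace($template, $dugdale);
    unset($merged['extends']);

    // Lanes: union by name, dugdale wins on collision, null deletes the lane.
    $lanes = array_replace($template['lanes'] ?? [], $dugdale['lanes'] ?? []);
    $merged['lanes'] = array_filter($lanes, fn($lane) => $lane !== null);

    return $merged;
}
```

Applied to the s2 entry from the example config, the result keeps the template's prod label and normal lane but has no high lane.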
The mission_dir and runtime keys are accepted for parity with the letts CLI config but are not consumed by the PHP client (they configure the daemon, not the client).
Testing
composer test # unit tests only — no daemon needed
composer test:integration # rebuilds tools/dugdale from source, then runs
composer test:all
composer test:integration first runs composer build-daemon, which rebuilds
tools/dugdale from the letts Go source every time — so integration tests
always run against the current daemon wire-contract, never a stale binary. By
default it expects the letts checkout at ../letts; override with:
LETTS_SRC=/path/to/letts composer test:integration
If the Go toolchain or the letts source is unavailable, the rebuild is skipped
and integration tests that need the binary are skipped too. You can rebuild the
daemon on its own with composer build-daemon.
License
MIT — see LICENSE.
Other information
- License: MIT
- Last updated: 2026-06-12