-
-
Notifications
You must be signed in to change notification settings - Fork 62
Expand file tree
/
Copy pathCoopExecutor.php
More file actions
88 lines (80 loc) · 3.23 KB
/
CoopExecutor.php
File metadata and controls
88 lines (80 loc) · 3.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
<?php
namespace React\Dns\Query;
use React\Promise\Promise;
use React\Promise\PromiseInterface;
/**
* Cooperatively resolves hosts via the given base executor to ensure same query is not run concurrently
*
* Wraps an existing `ExecutorInterface` to keep tracking of pending queries
* and only starts a new query when the same query is not already pending. Once
* the underlying query is fulfilled/rejected, it will forward its value to all
* promises awaiting the same query.
*
* This means it will not limit concurrency for queries that differ, for example
* when sending many queries for different host names or types.
*
* This is useful because all executors are entirely async and as such allow you
* to execute any number of queries concurrently. You should probably limit the
* number of concurrent queries in your application or you're very likely going
* to face rate limitations and bans on the resolver end. For many common
* applications, you may want to avoid sending the same query multiple times
* when the first one is still pending, so you will likely want to use this in
* combination with some other executor like this:
*
* ```php
* $executor = new CoopExecutor(
* new RetryExecutor(
* new TimeoutExecutor(
* new UdpTransportExecutor($nameserver),
* 3.0
* )
* )
* );
* ```
*/
final class CoopExecutor implements ExecutorInterface
{
private $executor;
private $pending = [];
private $counts = [];
public function __construct(ExecutorInterface $base)
{
$this->executor = $base;
}
public function query(Query $query): PromiseInterface
{
$key = $this->serializeQueryToIdentity($query);
if (isset($this->pending[$key])) {
// same query is already pending, so use shared reference to pending query
$promise = $this->pending[$key];
++$this->counts[$key];
} else {
// no such query pending, so start new query and keep reference until it's fulfilled or rejected
$promise = $this->executor->query($query);
$this->pending[$key] = $promise;
$this->counts[$key] = 1;
$promise->then(function () use ($key) {
unset($this->pending[$key], $this->counts[$key]);
}, function () use ($key) {
unset($this->pending[$key], $this->counts[$key]);
});
}
// Return a child promise awaiting the pending query.
// Cancelling this child promise should only cancel the pending query
// when no other child promise is awaiting the same query.
return new Promise(function ($resolve, $reject) use ($promise) {
$promise->then($resolve, $reject);
}, function () use (&$promise, $key, $query) {
if (--$this->counts[$key] < 1) {
unset($this->pending[$key], $this->counts[$key]);
$promise->cancel();
$promise = null;
}
throw new \RuntimeException('DNS query for ' . $query->describe() . ' has been cancelled');
});
}
private function serializeQueryToIdentity(Query $query)
{
return sprintf('%s:%s:%s', $query->name, $query->type, $query->class);
}
}