Skip to content

Commit 31d1af4

Browse files
committed
开始消费
1 parent 18858cd commit 31d1af4

File tree

4 files changed

+113
-57
lines changed

4 files changed

+113
-57
lines changed

README.md

Lines changed: 96 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
# ReactPHP Queue
22

3-
基于 Redis 的异步队列实现,使用 ReactPHP 构建
3+
基于 ReactPHP 和 Redis 实现的异步队列系统,提供轻量级、高性能的消息队列解决方案
44

55
## 特性
66

77
- 基于 Redis 实现的轻量级队列
8-
- 支持异步操作
9-
- 支持多队列
10-
- 支持阻塞式出队
8+
- 支持异步操作和非阻塞式处理
9+
- 支持多队列和优先级队列
10+
- 提供阻塞式出队操作
11+
- 内置任务状态管理
12+
- 支持多工作进程处理
13+
- 支持内存队列(ArrayQueue)
1114
- 支持队列前缀配置
1215

1316
## 安装
@@ -16,81 +19,125 @@
1619
composer require reactphp-x/queue
1720
```
1821

19-
## 基本用法
22+
## 基础使用
2023

21-
### 初始化队列
24+
### 创建队列实例
2225

2326
```php
27+
use React\EventLoop\Loop;
2428
use Clue\React\Redis\RedisClient;
2529
use ReactphpX\Queue\Queue;
2630

27-
$redis = new RedisClient('redis://localhost:6379');
28-
$queue = new Queue($redis, 'myapp'); // 第二个参数为队列前缀,可选
31+
$loop = Loop::get();
32+
$redis = new RedisClient('redis://127.0.0.1:6379');
33+
$queue = new Queue($redis, 'example');
2934
```
3035

31-
### 入队操作
36+
### 基本队列操作
3237

3338
```php
34-
// 向默认队列添加数据
35-
$queue->enqueue(['job' => 'task1', 'data' => 'some data']);
36-
37-
// 向指定队列添加数据
38-
$queue->enqueue(['job' => 'task2', 'data' => 'other data'], 'high-priority');
39-
```
39+
// 入队操作
40+
$queue->enqueue(['task' => 'task1', 'data' => 'some data'])->then(function () {
41+
echo "Task1 入队成功\n";
42+
});
4043

41-
### 出队操作
44+
// 获取队列大小
45+
$queue->size()->then(function ($size) {
46+
echo "当前队列大小: {$size}\n";
47+
});
4248

43-
```php
44-
// 从默认队列中取出数据(非阻塞)
49+
// 出队操作
4550
$queue->dequeue()->then(function ($data) {
46-
if ($data === null) {
47-
echo "队列为空\n";
48-
return;
49-
}
50-
echo "获取到数据:" . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
51+
echo "出队数据: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
5152
});
5253

53-
// 从指定队列中取出数据(非阻塞)
54-
$queue->dequeue('high-priority')->then(function ($data) {
55-
// 处理数据
54+
// 阻塞式出队(等待5秒)
55+
$queue->blockingDequeue(5)->then(function ($data) {
56+
if ($data) {
57+
echo "获取到数据: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
58+
} else {
59+
echo "等待超时,没有数据\n";
60+
}
5661
});
5762
```
5863

59-
### 阻塞式出队
64+
## 高级特性
65+
66+
### 多队列支持
67+
68+
支持创建多个优先级队列,实现任务优先级处理:
6069

6170
```php
62-
// 阻塞等待数据(默认永久等待)
63-
$queue->blockingDequeue()->then(function ($data) {
64-
// 处理数据
65-
});
71+
// 向不同优先级的队列添加任务
72+
$queue->enqueue($task, 'high');
73+
$queue->enqueue($task, 'medium');
74+
$queue->enqueue($task, 'low');
75+
76+
// 按优先级处理任务
77+
$queue->dequeue('high');
78+
$queue->dequeue('medium');
79+
$queue->dequeue('low');
80+
```
6681

67-
// 设置超时时间(单位:秒)
68-
$queue->blockingDequeue(5)->then(function ($data) {
69-
if ($data === null) {
70-
echo "等待超时\n";
71-
return;
82+
### 任务状态管理
83+
84+
使用 JobManager 管理任务状态和执行:
85+
86+
```php
87+
use ReactphpX\Queue\JobManager;
88+
use ReactphpX\Queue\Storage\RedisStorageDriver;
89+
90+
$storage = new RedisStorageDriver($redis);
91+
$jobManager = new JobManager($storage, $queue);
92+
93+
// 推送带状态跟踪的任务
94+
$jobManager->pushJob('job-1', function () {
95+
return "Job completed";
96+
}, 'default', true);
97+
98+
// 获取任务状态
99+
$jobManager->getAllJobs()->then(function ($jobs) {
100+
foreach ($jobs as $jobId => $job) {
101+
echo "Job ID: $jobId, Status: {$job['status']}\n";
72102
}
73-
// 处理数据
74103
});
75104
```
76105

77-
### 队列管理
106+
### 工作进程
107+
108+
支持多工作进程并行处理任务:
78109

79110
```php
80-
// 获取队列长度
81-
$queue->size()->then(function ($length) {
82-
echo "队列长度:$length\n";
111+
use ReactphpX\Queue\Consumer;
112+
113+
$consumer = new Consumer($queue);
114+
115+
// 注册任务处理器
116+
$consumer->consume(function ($data) {
117+
echo "Processing task: " . json_encode($data) . "\n";
118+
// 处理任务逻辑
119+
return true;
83120
});
121+
```
122+
123+
### 内存队列
84124

85-
// 清空队列
86-
$queue->clear()->then(function () {
87-
echo "队列已清空\n";
125+
提供基于内存的队列实现,适用于单进程场景:
126+
127+
```php
128+
use ReactphpX\Queue\ArrayQueue;
129+
130+
$queue = new ArrayQueue();
131+
132+
$queue->enqueue('task1')->then(function () {
133+
echo "Task added\n";
134+
});
135+
136+
$queue->dequeue()->then(function ($data) {
137+
echo "Processing: $data\n";
88138
});
89139
```
90140

91-
## 注意事项
141+
## 许可证
92142

93-
1. 所有操作都返回 Promise 对象,需要使用 then() 方法处理结果
94-
2. 队列中的数据会自动进行 JSON 编码和解码
95-
3. 如果设置了队列前缀,实际的 Redis 键名将为 `前缀:队列名`
96-
4. 阻塞式出队在队列为空时会等待新数据到达,可以通过设置超时时间来避免永久等待
143+
MIT License

examples/job_status.php

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,18 @@
1717

1818
// 创建队列和任务管理器
1919
$queue = new Queue($redis);
20-
$consumer = new Consumer($queue);
2120
$storage = new RedisStorageDriver($redis);
2221
$jobManager = new JobManager($storage, $queue);
2322
$jobManager->initProcess(1,1);
2423
// $jobManager->clearJobs();
24+
$jobManager->startConsume();
25+
2526
// 注册消费者
26-
$consumer->consume(function ($data) use ($jobManager) {
27-
echo "Received job: $data\n";
28-
return $jobManager->processJob($data);
29-
});
27+
// $consumer = new Consumer($queue);
28+
// $consumer->consume(function ($data) use ($jobManager) {
29+
// echo "Received job: $data\n";
30+
// return $jobManager->processJob($data);
31+
// });
3032

3133
// // 示例:推送一些任务到队列
3234
for ($i = 1; $i <= 3; $i++) {

src/Consumer.php

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,16 @@ class Consumer
1515
private $callback;
1616
private $poller;
1717
private $maxAttempts = 3;
18-
private $interval = 1000;
1918
private $consumerCount = 2;
2019
private $currentQueueIndex = 0;
2120
private $consumedCount = [];
2221
private $checkHighPriorityAfter = 10;
2322

24-
public function __construct(QueueInterface $queue, array $priorityQueues = ['high', 'middle', 'low', 'default'], int $maxAttempts = 1, int $interval = 1000, int $consumerCount = 2)
23+
public function __construct(QueueInterface $queue, array $priorityQueues = ['high', 'middle', 'low', 'default'], int $maxAttempts = 1, int $consumerCount = 2)
2524
{
2625
$this->queue = $queue;
2726
$this->priorityQueues = $priorityQueues;
2827
$this->maxAttempts = $maxAttempts;
29-
$this->interval = $interval;
3028
$this->consumerCount = $consumerCount;
3129
}
3230

src/JobManager.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,15 @@ class JobManager {
2020
public function __construct(StorageDriverInterface $storage, Queue $queue) {
2121
$this->storage = $storage;
2222
$this->queue = $queue;
23+
24+
}
25+
26+
public function startConsume(Consumer $consumer = null)
27+
{
28+
if (!$consumer) {
29+
$consumer = new Consumer($this->queue);
30+
$consumer->consume(fn ($data) => $this->processJob($data));
31+
}
2332
}
2433

2534
public function initProcess($min, $max)

0 commit comments

Comments
 (0)