|
沙发
楼主 |
发表于 2015-09-20 11:35:33
|
只看该作者
2、使用 select/poll 的同步模型:属于同步非阻塞 IO 模型,代码如下:
<?php
/**
* SelectSocketServer Class
* By James.Huang <shagoo#gmail.com>
**/
set_time_limit(0);
class SelectSocketServer
{
private static $socket;
private static $timeout = 60;
private static $maxconns = 1024;
private static $connections = array();
function SelectSocketServer($port)
{
global $errno, $errstr;
if ($port < 1024) {
die("Port must be a number which bigger than 1024/n");
}
$socket = socket_create_listen($port);
if (!$socket) die("Listen $port failed");
socket_set_nonblock($socket); // 非阻塞
while (true)
{
$readfds = array_merge(self::$connections, array($socket));
$writefds = array();
// 选择一个连接,获取读、写连接通道
if (socket_select($readfds, $writefds, $e = null, $t = self::$timeout))
{
// 如果是当前服务端的监听连接
if (in_array($socket, $readfds)) {
// 接受客户端连接
$newconn = socket_accept($socket);
$i = (int) $newconn;
$reject = '';
if (count(self::$connections) >= self::$maxconns) {
$reject = "Server full, Try again later./n";
}
// 将当前客户端连接放入 socket_select 选择
self::$connections[$i] = $newconn;
// 输入的连接资源缓存容器
$writefds[$i] = $newconn;
// 连接不正常
if ($reject) {
socket_write($writefds[$i], $reject);
unset($writefds[$i]);
self::close($i);
} else {
echo "Client $i come./n";
}
// remove the listening socket from the clients-with-data array
$key = array_search($socket, $readfds);
unset($readfds[$key]);
}
// 轮循读通道
foreach ($readfds as $rfd) {
// 客户端连接
$i = (int) $rfd;
// 从通道读取
$line = @socket_read($rfd, 2048, PHP_NORMAL_READ);
if ($line === false) {
// 读取不到内容,结束连接
echo "Connection closed on socket $i./n";
self::close($i);
continue;
}
$tmp = substr($line, -1);
if ($tmp != "/r" && $tmp != "/n") {
// 等待更多数据
continue;
}
// 处理逻辑
$line = trim($line);
if ($line == "quit") {
echo "Client $i quit./n";
self::close($i);
break;
}
if ($line) {
echo "Client $i >>" . $line . "/n";
}
}
// 轮循写通道
foreach ($writefds as $wfd) {
$i = (int) $wfd;
$w = socket_write($wfd, "Welcome Client $i!/n");
}
}
}
}
function close ($i)
{
socket_shutdown(self::$connections[$i]);
socket_close(self::$connections[$i]);
unset(self::$connections[$i]);
}
}
new SelectSocketServer(2000);
3、使用 epoll/kqueue 的异步模型:属于异步阻塞/非阻塞 IO 模型,代码如下:
<?php
/**
* EpollSocketServer Class (use libevent)
* By James.Huang <shagoo#gmail.com>
*
* Defined constants:
*
* EV_TIMEOUT (integer)
* EV_READ (integer)
* EV_WRITE (integer)
* EV_SIGNAL (integer)
* EV_PERSIST (integer)
* EVLOOP_NONBLOCK (integer)
* EVLOOP_ONCE (integer)
**/
set_time_limit(0);
class EpollSocketServer
{
private static $socket;
private static $connections;
private static $buffers;
function EpollSocketServer ($port)
{
global $errno, $errstr;
if (!extension_loaded('libevent')) {
die("Please install libevent extension firstly/n");
}
if ($port < 1024) {
die("Port must be a number which bigger than 1024/n");
}
$socket_server = stream_socket_server("tcp://0.0.0.0:{$port}", $errno, $errstr);
if (!$socket_server) die("$errstr ($errno)");
stream_set_blocking($socket_server, 0); // 非阻塞
$base = event_base_new();
$event = event_new();
event_set($event, $socket_server, EV_READ | EV_PERSIST, array(__CLASS__, 'ev_accept'), $base);
event_base_set($event, $base);
event_add($event);
event_base_loop($base);
self::$connections = array();
self::$buffers = array();
}
function ev_accept($socket, $flag, $base)
{
static $id = 0;
$connection = stream_socket_accept($socket);
stream_set_blocking($connection, 0);
$id++; // increase on each accept
$buffer = event_buffer_new($connection, array(__CLASS__, 'ev_read'), array(__CLASS__, 'ev_write'), array(__CLASS__, 'ev_error'), $id);
event_buffer_base_set($buffer, $base);
event_buffer_timeout_set($buffer, 30, 30);
event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
event_buffer_priority_set($buffer, 10);
event_buffer_enable($buffer, EV_READ | EV_PERSIST);
// we need to save both buffer and connection outside
self::$connections[$id] = $connection;
self::$buffers[$id] = $buffer;
}
function ev_error($buffer, $error, $id)
{
event_buffer_disable(self::$buffers[$id], EV_READ | EV_WRITE);
event_buffer_free(self::$buffers[$id]);
fclose(self::$connections[$id]);
unset(self::$buffers[$id], self::$connections[$id]);
}
function ev_read($buffer, $id)
{
static $ct = 0;
$ct_last = $ct;
$ct_data = '';
while ($read = event_buffer_read($buffer, 1024)) {
$ct += strlen($read);
$ct_data .= $read;
}
$ct_size = ($ct - $ct_last) * 8;
echo "[$id] " . __METHOD__ . " > " . $ct_data . "/n";
event_buffer_write($buffer, "Received $ct_size byte data./r/n");
}
function ev_write($buffer, $id)
{
echo "[$id] " . __METHOD__ . "/n";
}
}
new EpollSocketServer(2000);
先说一下,以上的例子是基于 PHP 的 libevent 扩展实现的,需要运行的话要先安装此扩展,参考:http://pecl.php.net/package/libevent。
epoll/kqueue 的例子最好是能在linux环境下运行,因为windows下的php不支持多epoll模型,也不支持多进程。
以上就是我对php的初探,3中io模型的代码确定有效,可以再次基础上做server或者gateway服务器的开发,第一次发贴,写的匆忙,还有很多东西没有说明,不足之处望请见谅,以后持续编辑此贴!希望能和大家沟通交流!
参考资料:
http://blog.csdn.net/shagoo/article/details/6396089
http://www.cnblogs.com/skynet/archive/2010/12/12/1903949.html
|
|