
19.06.2008, 04:08
|
|
[Лишённый самовыражени
Регистрация: 16.01.2005
Сообщений: 1,787
Провел на форуме: 9751379
Репутация:
3812
|
|
Класс для одновременного открытия нескольких сокетов
PHP код:
<?
class HttpQueue
{
/**
* An array of URLs
*
* @access private
* @var array
*/
private $_urls = array();
/**
* An array of server sockets
*
* @access private
* @var array
*/
private $_sockets = array();
/**
* An array of server responses
*
* @access private
* @var array
*/
private $_response = array();
/**
* Socket timeout
*
* @access private
* @var integer
*/
private $_timeout = 30;
/**
* An array of sockets which can be received
*
* @access private
* @var array
*/
private $_read = array();
/**
* An array of sockets which can be sended
*
* @access private
* @var array
*/
private $_write = array();
/**
* Adds an URL into a tasklist
*
* @access public
* @param string $method
* @param string $url
* @return void
*/
public function add($method, $url)
{
$this->_urls[] = array(strtoupper($method), $this->_parseUrl($url));
}
/**
* Parses requested URL and checks for all URL parts
*
* @access private
* @param string $url
* @return array
*/
private function _parseUrl($url)
{
$parts = parse_url($url);
$parts['port'] = array_key_exists(’port’, $parts) ? $parts['port'] : 80;
$parts['sock'] = sprintf(’%s:%s’, $parts['host'], $parts['port']);
$parts['request'] = sprintf(’%s?%s’, $parts['path'], $parts['query']);
return $parts;
}
/**
* Starts fetch process
*
* @access public
* @param void
* @return array
*/
public function fetch()
{
$this->_create();
$this->_process();
return $this->toArray();
}
/**
* Sets socket timeout (in seconds)
*
* @access public
* @param integer $timeout
* @return void
*/
public function setTimeout($timeout)
{
$this->_timeout = $timeout;
}
/**
* Returns array of server responses
*
* @access public
* @param void
* @return array
*/
public function toArray()
{
return $this->_response;
}
/**
* Creates socket conenctions with hosts given in URL
*
* @access private
* @param void
* @return void
*/
private function _create()
{
foreach ($this->_urls as $id => $connect) {
if ($socket = @stream_socket_client($connect[1]['sock'], $errno,
$errstr, $this->_timeout,
STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT)) {
$this->_sockets[$id] = $socket;
$this->_response[$id] = ‘processing’;
continue;
}
$this->_response[$id] = “failed, $errno $errstr”;
}
}
/**
* Process opened sockets
*
* @access private
* @param void
* @return void
*/
private function _process()
{
while (count($this->_sockets)) {
$this->_read = $this->_sockets;
$this->_write = $this->_sockets;
$select = stream_select($this->_read, $this->_write, $e = null,
$this->_timeout);
if ($select > 0) {
$this->_read();
$this->_write();
}
else {
foreach ($this->_sockets as $id => $s) {
$this->_response[$id] = ‘Timed out’;
}
break;
}
}
}
/**
* Receives data from readable socket
*
* @access private
* @param void
* @return void
*/
private function _read()
{
foreach ($this->_read as $fp) {
$id = array_search($fp, $this->_sockets);
$data = fread($fp, 8192);
if (strlen($data) == 0) {
if ($this->_response[$id] == ‘processing’) {
$this->_response[$id] = ‘failed to connect’;
}
fclose($fp);
unset($this->_sockets[$id]);
}
else {
$this->_response[$id].= $data;
}
}
}
/**
* Sends HTTP request into writeable socket
*
* @access private
* @param void
* @return void
*/
private function _write()
{
foreach ($this->_write as $fp) {
$id = array_search($fp, $this->_sockets);
$method = $this->_urls[$id][0];
$request = $this->_urls[$id][1]['request'];
$host = $this->_urls[$id][1]['host'];
fputs($fp, “$method $request HTTP/1.0rn”);
fputs($fp, “Host: $hostrnrn”);
$this->_response[$id] = ”;
}
}
}
?>
__________________
|
|
|