PHP 的 socket_recv 在接收数据时,如果实际要接收的数据大于设置的 buffer 大小,应该怎样循环读取出完整的数据,然后把它发送给其他套接字呢?
1 回答

LIANHK
TA贡献78条经验 获得超17个赞
<?php

// Constrained by PHP performance (pthreads' own limits, code encryption, lock
// waits, and so on). Testing showed this polling sleep is practically required;
// a perfect solution to the problem would need Go instead.
define('USLEEP_TIME', 10000);

/**
 * Read up to $length bytes from a socket without blocking, polling until the
 * requested amount has arrived, the peer closes, a fatal socket error occurs,
 * or $max_time seconds have elapsed.
 *
 * @param resource|Socket $socket   connected socket to read from
 * @param int             $length   total number of bytes wanted
 * @param float|int       $max_time time budget in seconds
 *
 * @return array array(bytes actually received, received data as a string)
 */
function recv_data($socket, $length, $max_time)
{
    $start_time = microtime(true);
    $recv_length = 0;
    $data = '';

    while (microtime(true) - $start_time <= $max_time && $recv_length < $length) {
        // PHP < 8 represents sockets as resources, PHP >= 8 as Socket objects.
        if (!is_resource($socket) && !($socket instanceof \Socket)) {
            break;
        }

        $temp_data = null;
        $recv_length_this = socket_recv($socket, $temp_data, $length - $recv_length, MSG_DONTWAIT);

        if ($recv_length_this === 0) {
            // Orderly shutdown by the peer: nothing more will ever arrive.
            break;
        }

        if ($recv_length_this === false) {
            // "No data yet" is expected with MSG_DONTWAIT; anything else is fatal
            // and retrying would only spin until the timeout.
            $error = socket_last_error($socket);
            if (!in_array($error, array(SOCKET_EAGAIN, SOCKET_EWOULDBLOCK, SOCKET_EINTR), true)) {
                break;
            }
        } else {
            $recv_length += $recv_length_this;
            $data .= $temp_data;
            if ($recv_length >= $length) {
                // Complete: skip the back-off sleep below.
                break;
            }
        }

        // Back off before polling again.
        usleep(USLEEP_TIME);
    }

    return array($recv_length, $data);
}

/**
 * Read the value stored in shared-memory slot $index under the semaphore.
 *
 * @return mixed the stored value, or an empty array if the semaphore could not
 *               be acquired
 */
function data_get($sem, $shm, $index)
{
    $data_array = array();
    if (sem_acquire($sem, false)) {
        $data_array = shm_get_var($shm, $index);
        sem_release($sem);
    }

    return $data_array;
}

/**
 * Overwrite shared-memory slot $index with $data under the semaphore.
 *
 * @return mixed the value previously stored in the slot, or an empty array if
 *               the slot was empty or the semaphore could not be acquired
 */
function data_set($sem, $shm, $index, $data)
{
    // Initialised up front so a failed acquire returns a defined value.
    $data_array = array();
    if (sem_acquire($sem, false)) {
        if (shm_has_var($shm, $index)) {
            $data_array = shm_get_var($shm, $index);
        }
        shm_put_var($shm, $index, $data);
        sem_release($sem);
    }

    return $data_array;
}

/**
 * Merge the key/value pairs of $array into the array stored in slot $index
 * (keys present in $array win) under the semaphore.
 *
 * @return mixed the merged array, or an empty array if the semaphore could not
 *               be acquired
 */
function data_merge($sem, $shm, $index, $array)
{
    $data_array = array();
    if (sem_acquire($sem, false)) {
        // Same guard as data_set(): a missing slot starts from an empty array.
        if (shm_has_var($shm, $index)) {
            $data_array = shm_get_var($shm, $index);
        }
        foreach ($array as $key => $value) {
            $data_array[$key] = $value;
        }
        shm_put_var($shm, $index, $data_array);
        sem_release($sem);
    }

    return $data_array;
}

/**
 * Append $array as one new element to the array stored in slot $index under
 * the semaphore.
 *
 * @return mixed the array after appending, or an empty array if the semaphore
 *               could not be acquired
 */
function data_append($sem, $shm, $index, $array)
{
    $data_array = array();
    if (sem_acquire($sem, false)) {
        // Same guard as data_set(): a missing slot starts from an empty array.
        if (shm_has_var($shm, $index)) {
            $data_array = shm_get_var($shm, $index);
        }
        $data_array[] = $array;
        shm_put_var($shm, $index, $data_array);
        sem_release($sem);
    }

    return $data_array;
}

/**
 * Run either a TCP server ($ip === null) or a TCP client, handing each
 * connection to a pair of worker objects created from $read_class and
 * $write_class.
 *
 * Shared-memory layout (translated from the original author's notes):
 *  - slot $index: thread-shared data, usually an array (written by the read
 *    thread and consumed by the write thread, unless the write thread posts a
 *    message to notify other read/write pairs).
 *  - slot $max_connection + $index: client connection info (READ, WRITE, IP,
 *    PORT, START_TIME); READ and WRITE are the status flags of the pair:
 *    0 not started, 1 running, 2 exited.
 *  - slot 2 * $max_connection: main-thread run flag; false tells all threads
 *    to exit.
 *
 * Each worker is constructed with
 * array(index, semaphore, shm, socket, max_connection, max_wait_time).
 *
 * @param string|null $ip             remote host to connect to, or null to listen on 0.0.0.0
 * @param int         $port           port to bind or connect to
 * @param string      $read_class     class instantiated as the reader of each connection
 * @param string      $write_class    class instantiated as the writer of each connection
 * @param int         $max_connection number of connection slots in server mode
 * @param int         $max_wait_time  passed through to the workers unchanged
 */
function connect($ip, $port, $read_class, $write_class, $max_connection = 20, $max_wait_time = 5)
{
    $directory = defined('LOGGER') ? constant('LOGGER') : '.';
    if (!file_exists($directory)) {
        mkdir($directory, 0777, true);
    }
    $log_prefix = defined('LOG_PREFIX') ? constant('LOG_PREFIX') : __FILE__;
    // NOTE(review): no directory separator is inserted here, so LOGGER must end
    // with one, and the __FILE__ default yields a path like "./abs/path.php.pid"
    // - confirm the intended PID file location before changing it.
    file_put_contents($directory . $log_prefix . '.pid', getmypid());

    set_time_limit(0);
    ob_implicit_flush();

    // A private temp file gives ftok() a unique key for this process. It is
    // created in the system temp directory explicitly and removed on the way out.
    $key_file = tempnam(sys_get_temp_dir(), 'shm');
    $key = ftok($key_file, 's');
    $sem = sem_get($key);
    $shm = shm_attach($key, $max_connection * 1000000);

    for ($i = 0; $i < $max_connection; $i++) {
        data_set($sem, $shm, $i, array());
        data_set($sem, $shm, $max_connection + $i, array('READ' => 0, 'WRITE' => 0));
    }
    data_set($sem, $shm, 2 * $max_connection, true);

    $sockets = array();
    $reads = array();
    $writes = array();

    if ($ip === null) {
        // Server mode.
        $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);
        socket_set_nonblock($socket);
        $ip = '0.0.0.0';
        if (!@socket_bind($socket, $ip, $port)) {
            die('can not bind "' . $ip . ':' . $port . '"' . PHP_EOL);
        }
        socket_listen($socket);

        $array = array(null, $sem, $shm, null, $max_connection, $max_wait_time);
        while (shm_get_var($array[2], 2 * $max_connection)) {
            $s = socket_accept($socket);

            // Find the first slot whose reader and writer are both idle, and
            // release whatever socket that slot still holds.
            // NOTE(review): the check accepts states 0 and 3 while the notes above
            // document 0/1/2 - confirm against the worker classes.
            $index = -1;
            for ($i = 0; $i < $max_connection; $i++) {
                $data_array = shm_get_var($array[2], $max_connection + $i);
                if (in_array($data_array['READ'], array(0, 3)) && in_array($data_array['WRITE'], array(0, 3))) {
                    if (isset($sockets[$i])) {
                        @socket_shutdown($sockets[$i], 2);
                        @socket_close($sockets[$i]);
                        unset($sockets[$i]);
                    }
                    $index = $i;
                    break;
                }
            }

            // socket_accept() yields a resource (PHP < 8) or Socket (PHP >= 8) on
            // success, and false when no connection is pending.
            if (is_resource($s) || $s instanceof \Socket) {
                if ($index == -1) {
                    // No free slot: refuse the connection.
                    @socket_shutdown($s, 2);
                    socket_close($s);
                } else {
                    socket_set_nonblock($s);
                    // Dedicated variables so the $port parameter is not overwritten.
                    socket_getpeername($s, $peer_address, $peer_port);
                    $sockets[$index] = $s;
                    $array[0] = $index;
                    $array[3] = $s;
                    data_set($array[1], $array[2], $array[4] + $array[0], array(
                        'READ' => 1,
                        'WRITE' => 1,
                        'IP' => $peer_address,
                        'PORT' => $peer_port,
                        'START_TIME' => time()
                    ));
                    $reads[$index] = new $read_class($array);
                    $writes[$index] = new $write_class($array);
                }
            }

            usleep($max_connection * USLEEP_TIME);
        }
    } else {
        // Client mode: a single connection in slot 0, with max_connection = 1.
        $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        socket_set_nonblock($socket);
        while (!@socket_connect($socket, $ip, $port)) {
            sleep(1);
        }
        // Slot 1 == max_connection (1) + index (0), matching $array below.
        data_set($sem, $shm, 1, array(
            'READ' => 1,
            'WRITE' => 1,
            'IP' => $ip,
            'PORT' => $port,
            'START_TIME' => time()
        ));
        $array = array(0, $sem, $shm, $socket, 1, $max_wait_time);
        $read = new $read_class($array);
        $write = new $write_class($array);
        $read->join();
        $write->join();
    }

    @socket_shutdown($socket, 2);
    socket_close($socket);
    sem_remove($sem);
    shm_remove($shm);
    if (is_string($key_file) && file_exists($key_file)) {
        unlink($key_file);
    }
}
- 1 回答
- 0 关注
- 1743 浏览
添加回答
举报
0/150
提交
取消