为了账号安全,请及时绑定邮箱和手机立即绑定

php socket_recv获取数据的问题

php socket_recv获取数据的问题

PHP
qq_哈哈哈_41 2018-01-12 09:08:28
php的socket_recv在获取数据的时候如果实际要接收的数据是大于设置的buffer值的时候,我应该怎么样循环取出完整的数据然后把他发送给其他套接字呢?
查看完整描述

1 回答

?
LIANHK

TA贡献78条经验 获得超17个赞

<?php

// 受到PHP性能限制(包括pthread自身限制以及代码加密,锁等待等问题)
// 经过测试,这个休眠时间几乎是必须的
// 要完美解决这个问题,需要使用golang
define ( 'USLEEP_TIME', 10000 );
function recv_data($socket, $length, $max_time) {
    $time = microtime ( true );
    $recv_length = 0;
    $data = '';
    $close_flag = false;
    while ( microtime ( true ) - $time <= $max_time && $recv_length < $length ) {
        if (! is_resource ( $socket ) || $close_flag) {
            break;
        }
        $recv_length_this = socket_recv ( $socket, $temp_data, $length - $recv_length, MSG_DONTWAIT );
        $recv_length += $recv_length_this;
        $data .= $temp_data;
        if ($recv_length_this === 0) {
            $close_flag = true;
        }
        usleep ( USLEEP_TIME );
    }
    return array (
            $recv_length,
            $data 
    );
}
function data_get($sem, $shm, $index) {
    $data_array = array ();
    if (sem_acquire ( $sem, false )) {
        $data_array = shm_get_var ( $shm, $index );
        sem_release ( $sem );
    }
    return $data_array;
}
function data_set($sem, $shm, $index, $data) {
    if (sem_acquire ( $sem, false )) {
        $data_array = array ();
        if (shm_has_var ( $shm, $index )) {
            $data_array = shm_get_var ( $shm, $index );
        }
        shm_put_var ( $shm, $index, $data );
        sem_release ( $sem );
    }
    return $data_array;
}
function data_merge($sem, $shm, $index, $array) {
    if (sem_acquire ( $sem, false )) {
        $data_array = shm_get_var ( $shm, $index );
        $keys = array_keys ( $array );
        $values = array_values ( $array );
        for($i = 0; $i < count ( $keys ); $i ++) {
            $data_array [$keys [$i]] = $values [$i];
        }
        shm_put_var ( $shm, $index, $data_array );
        sem_release ( $sem );
    }
    return $data_array;
}
function data_append($sem, $shm, $index, $array) {
    if (sem_acquire ( $sem, false )) {
        $data_array = shm_get_var ( $shm, $index );
        $data_array [] = $array;
        shm_put_var ( $shm, $index, $data_array );
        sem_release ( $sem );
    }
    return $data_array;
}

/**
 * 保存线程共享数据
 * $index
 * 一般用数组(读线程写入,写线程读出,除非写线程有消息写入以通知给其他读/写线程对)
 * 保存客户端连接信息(READ, WRITE,IP,PORT,START_TIME)
 * $max_connection + $index
 * 读写线程对标识
 * READ: 0 未开启 1 正在运行 2 已退出
 * WRITE: 0 未开启 1 正在运行 2 已退出
 * 主线程运行标识
 * 2 * $max_connection
 * false 通知所有线程退出
 */
function connect($ip, $port, $read_class, $write_class, $max_connection = 20, $max_wait_time = 5) {
    $directory = defined ( 'LOGGER' ) ? constant ( 'LOGGER' ) : '.';
    if (! file_exists ( $directory )) {
        mkdir ( $directory, 0777, true );
    }
    $log_prefix = defined ( 'LOG_PREFIX' ) ? constant ( 'LOG_PREFIX' ) : __FILE__;
    file_put_contents ( $directory . $log_prefix . '.pid', getmypid () );
    
    set_time_limit ( 0 );
    ob_implicit_flush ();
    $key = ftok ( tempnam ( time (), rand ( 0, 10000 ) ), 's' );
    $sem = sem_get ( $key );
    $shm = shm_attach ( $key, $max_connection * 1000000 );
    for($i = 0; $i < $max_connection; $i ++) {
        data_set ( $sem, $shm, $i, array () );
        data_set ( $sem, $shm, $max_connection + $i, array (
                'READ' => 0,
                'WRITE' => 0 
        ) );
    }
    data_set ( $sem, $shm, 2 * $max_connection, true );
    $sockets = array ();
    $reads = array ();
    $writes = array ();
    if ($ip === null) {
        $socket = socket_create ( AF_INET, SOCK_STREAM, SOL_TCP );
        socket_set_option ( $socket, SOL_SOCKET, SO_REUSEADDR, 1 );
        socket_set_nonblock ( $socket );
        $ip = '0.0.0.0';
        if (! @socket_bind ( $socket, $ip, $port )) {
            die ( 'can not bind "' . $ip . ':' . $port . '"' . PHP_EOL );
        }
        socket_listen ( $socket );
        $array = array (
                null,
                $sem,
                $shm,
                null,
                $max_connection,
                $max_wait_time 
        );
        while ( shm_get_var ( $array [2], 2 * $max_connection ) ) {
            $s = socket_accept ( $socket );
            $index = - 1;
            for($i = 0; $i < $max_connection; $i ++) {
                $data_array = shm_get_var ( $array [2], $max_connection + $i );
                if (in_array ( $data_array ['READ'], array (
                        0,
                        3 
                ) ) && in_array ( $data_array ['WRITE'], array (
                        0,
                        3 
                ) )) {
                    if ($index == - 1) {
                        @socket_shutdown ( $sockets [$i], 2 );
                        @socket_close ( $sockets [$i] );
                        $index = $i;
                        break;
                    }
                }
            }
            if (is_resource ( $s )) {
                if ($index == - 1) {
                    @socket_shutdown ( $s, 2 );
                    socket_close ( $s );
                } else {
                    socket_set_nonblock ( $s );
                    socket_getpeername ( $s, $address, $port );
                    $sockets [$index] = $s;             
                    $array [0] = $index;
                    $array [3] = $s;
                    data_set ( $array [1], $array [2], $array [4] + $array [0], array (
                            'READ' => 1,
                            'WRITE' => 1,
                            'IP' => $address,
                            'PORT' => $port,
                            'START_TIME' => time () 
                    ) );
                    $reads [$index] = new $read_class ( $array );
                    $writes [$index] = new $write_class ( $array );
                }
            }
            usleep ( $max_connection * USLEEP_TIME );
        }
    } else {
        $socket = socket_create ( AF_INET, SOCK_STREAM, SOL_TCP );
        socket_set_nonblock ( $socket );
        while ( ! @socket_connect ( $socket, $ip, $port ) ) {
            sleep ( 1 );
        }
        data_set ( $sem, $shm, 1, array (
                'READ' => 1,
                'WRITE' => 1,
                'IP' => $ip,
                'PORT' => $port,
                'START_TIME' => time () 
        ) );
        $array = array (
                0,
                $sem,
                $shm,
                $socket,
                1,
                $max_wait_time 
        );
        $read = new $read_class ( $array );
        $write = new $write_class ( $array );
        $read->join ();
        $write->join ();
    }
    @socket_shutdown ( $socket, 2 );
    socket_close ( $socket );
    sem_remove ( $sem );
    shm_remove ( $shm );
}


查看完整回答
反对 回复 2018-01-12
  • 1 回答
  • 0 关注
  • 1743 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信