In a Swoole service, the Kafka producer throws an error every so often #69

Open
insomniazz opened this issue Jul 25, 2022 · 5 comments
Comments

@insomniazz

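  • What problem did you run into?
    In a Swoole service, the Kafka producer throws an error every so often.

  • Is the Kafka environment self-hosted or a cloud service?
    Self-hosted

  • Please run the following command to collect environment information.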

php -v & php --ri swoole & composer info | grep longlang/phpkafka

PHP 7.4.25 (cli) (built: Oct 19 2021 15:18:10) ( NTS )
Copyright (c) The PHP Group
Zend Engine v3.4.0, Copyright (c) Zend Technologies
    with Zend OPcache v7.4.25, Copyright (c), by Zend Technologies

swoole

Swoole => enabled
Author => Swoole Team <[email protected]>
Version => 4.6.1
Built => Jan 11 2021 12:30:02
coroutine => enabled with boost asm context
trace_log => enabled
epoll => enabled
eventfd => enabled
signalfd => enabled
cpu_affinity => enabled
spinlock => enabled
rwlock => enabled
sockets => enabled
openssl => OpenSSL 1.0.2k-fips  26 Jan 2017
http2 => enabled
json => enabled
curl-native => enabled
pcre => enabled
zlib => 1.2.7
mutex_timedlock => enabled
pthread_barrier => enabled
futex => enabled
mysqlnd => enabled
async_redis => enabled

Directive => Local Value => Master Value
swoole.enable_coroutine => On => On
swoole.enable_library => On => On
swoole.enable_preemptive_scheduler => Off => Off
swoole.display_errors => On => On
swoole.use_shortname => On => On
swoole.unixsock_buffer_size => 8388608 => 8388608

  • Provide minimal reproducible code:
// Your code
declare(strict_types=1); // strict typing mode
require_once dirname(__DIR__).'/vendor/autoload.php';
ini_set('memory_limit', '2048M');
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;

$http = new Swoole\Http\Server("0.0.0.0", 9503);
$http->set([
    'tcp_fastopen' => true,
    'max_wait_time' => 1,
    'enable_reuse_port'=>false, // reuse of the same port
    'log_file'=>dirname(__DIR__)."/log/swoole.log"
]);

$config = new ProducerConfig();
$config->setBootstrapServer('172.18.50.74:9092,172.18.50.75:9092');
$config->setUpdateBrokers(true);
$config->setAcks(-1);
$producer = new Producer($config);
$http->on('request', function ($request, $response) use ($producer) {
    try {
        $topic = 'play_history_pre3';
        var_dump($request->server['request_uri']);
        $sendData = json_encode($request->post);
        $producer->send($topic,$sendData);
        unset($sendData,$topic);
        $response->end('success');
    } catch (\Throwable $e) {
        var_dump($e->getMessage());
        $response->end(json_encode(array(
            'code' => '999',
            'message' => 'Failed to record'
        )));
    }
});

$http->start();
  • Error message
PHP Fatal error:  Uncaught longlang\phpkafka\Exception\SocketException: Could not recv data from stream,  [0] in /data/www/kafka/swoole/vendor/longlang/phpkafka/src/Socket/SwooleSocket.php:129
Stack trace:
#0 /data/www/kafka/swoole/vendor/longlang/phpkafka/src/Client/SwooleClient.php(135): longlang\phpkafka\Socket\SwooleSocket->recv(4, -1)
#1 {main}
  thrown in /data/www/kafka/swoole/vendor/longlang/phpkafka/src/Socket/SwooleSocket.php on line 129
@dygin
Contributor

dygin commented Aug 3, 2022

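This happens because Kafka has reclaimed the idle connection while Swoole has not yet closed and destroyed it on the client side. phpkafka does not support connections.max.idle.ms. As a workaround, you can override longlang\phpkafka\Client\SwooleClient:
in the exception handler of the startRecvCo method, add a check so that when this error occurs the now-useless connection is destroyed.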
private function startRecvCo(): void
{
    $this->coRecvRunning = true;
    $this->recvCoId = true;
    $this->recvCoId = Coroutine::create(function () {
        while ($this->coRecvRunning) {
            try {
                $data = $this->socket->recv(4, -1);
                if ('' === $data) {
                    break;
                }
                $length = Int32::unpack($data);
                $data = $this->socket->recv($length);
                $correlationId = Int32::unpack($data);
                if (isset($this->recvChannels[$correlationId])) {
                    $this->recvChannels[$correlationId]->push($data);
                }
            } catch (Exception $e) {
                if ($e instanceof SocketException && !$this->connected) {
                    return;
                }
                if ($e instanceof SocketException && $this->connected) {
                    $this->socket->close();
                    break;
                }
                $callback = $this->getConfig()->getExceptionCallback();
                if ($callback) {
                    $callback($e);
                } else {
                    throw $e;
                }
            }
        }
    });
}
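
A similar guard could also live at the call site instead of inside the library. What follows is only a sketch under assumptions: `sendWithReconnect`, `$send`, and `$reconnect` are hypothetical names that are not part of phpkafka, and the retried exception class defaults to `\RuntimeException` for illustration (a real caller would presumably pass `longlang\phpkafka\Exception\SocketException::class`). It retries a send once after rebuilding the connection when the first attempt fails on a stale socket.

```php
<?php
declare(strict_types=1);

/**
 * Run $send; if it fails with the "dead connection" exception class,
 * call $reconnect and try again. Hypothetical helper, not part of phpkafka.
 *
 * @param callable $send       performs the actual send and returns its result
 * @param callable $reconnect  discards the stale connection and creates a new one
 * @param string   $retryOn    exception class treated as "connection is dead"
 * @param int      $maxRetries number of retries allowed after the first attempt
 */
function sendWithReconnect(
    callable $send,
    callable $reconnect,
    string $retryOn = \RuntimeException::class,
    int $maxRetries = 1
) {
    $attempt = 0;
    while (true) {
        try {
            return $send();
        } catch (\Throwable $e) {
            // Rethrow anything that is not the connection error,
            // and give up once the retry budget is used.
            if (!($e instanceof $retryOn) || $attempt >= $maxRetries) {
                throw $e;
            }
            ++$attempt;
            $reconnect();
        }
    }
}
```

The retry count is bounded on purpose: if the broker is really unreachable, the original exception still surfaces to the request handler instead of looping forever.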

@ushell

ushell commented Aug 17, 2022

When can this exception be fixed?

@peibinzhu

peibinzhu commented Aug 24, 2022

@Yurunsoft I'm running into the same problem.

In src/Client/SwooleClient.php, at line 145:

} catch (Exception $e) {
    if ($e instanceof SocketException && !$this->connected) {
        return;
    }
    $callback = $this->getConfig()->getExceptionCallback();
    if ($callback) {
        $callback($e);
    } else {
        throw $e;
    }
}

change it to:

// Other code omitted
} catch (Exception $e) {
    if ($e instanceof SocketException && !$this->connected) {
        return;
    }
    if ($e instanceof SocketException && $this->connected) {
        $this->socket->close();
        break;
    }
    $callback = $this->getConfig()->getExceptionCallback();
    if ($callback) {
        $callback($e);
    } else {
        throw $e;
    }
}

After that, the error no longer occurred.

@shuifa

shuifa commented Oct 21, 2022

connections.max.idle.ms

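So that's the cause. I had been wondering why, after the first message is delivered, one coroutine stays blocked in recv() the whole time. This package does no timeout or recycling handling for long-lived connections.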
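
One way to work around the missing idle handling from the application side is to recycle the connection before the broker does. The following is a minimal sketch, not phpkafka code: `IdleAwareConnection`, its `$factory`, and its `$clock` are hypothetical names introduced for illustration. It remembers when the connection was last used and builds a fresh one once the idle time reaches a threshold, which would need to be set below the broker's connections.max.idle.ms (600000 ms by default in Kafka).

```php
<?php
declare(strict_types=1);

/**
 * Hands out a connection and rebuilds it after it has been idle too long.
 * Hypothetical helper, not part of phpkafka.
 */
final class IdleAwareConnection
{
    /** @var callable creates a fresh connection object */
    private $factory;
    /** @var callable returns the current time in seconds */
    private $clock;
    /** @var float idle time after which the connection is rebuilt */
    private $maxIdleSeconds;
    /** @var object|null the connection currently handed out */
    private $connection = null;
    /** @var float time of the last get() call */
    private $lastUsedAt = 0.0;

    public function __construct(callable $factory, float $maxIdleSeconds, ?callable $clock = null)
    {
        $this->factory = $factory;
        $this->maxIdleSeconds = $maxIdleSeconds;
        // The clock is injectable so the idle logic can be tested without sleeping.
        $this->clock = $clock ?? static function (): float {
            return microtime(true);
        };
    }

    public function get()
    {
        $now = ($this->clock)();
        $expired = null !== $this->connection
            && ($now - $this->lastUsedAt) >= $this->maxIdleSeconds;
        if (null === $this->connection || $expired) {
            // The old connection may already be closed by the broker, so it is
            // simply dropped here; a fuller version would also close it explicitly.
            $this->connection = ($this->factory)();
        }
        $this->lastUsedAt = $now;

        return $this->connection;
    }
}
```

In the reproduction script above, the factory would presumably return `new Producer($config)` and the request handler would call `$holder->get()->send($topic, $sendData)`; whether discarding a Producer this way releases its sockets cleanly is an assumption that would need checking against the library.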

@insomniazz
Author

insomniazz commented Dec 16, 2022

@Yurunsoft I'm running into the same problem.

In src/Client/SwooleClient.php, at line 145:

} catch (Exception $e) {
    if ($e instanceof SocketException && !$this->connected) {
        return;
    }
    $callback = $this->getConfig()->getExceptionCallback();
    if ($callback) {
        $callback($e);
    } else {
        throw $e;
    }
}

change it to:

// Other code omitted
} catch (Exception $e) {
    if ($e instanceof SocketException && !$this->connected) {
        return;
    }
    if ($e instanceof SocketException && $this->connected) {
        $this->socket->close();
        break;
    }
    $callback = $this->getConfig()->getExceptionCallback();
    if ($callback) {
        $callback($e);
    } else {
        throw $e;
    }
}

After that, the error no longer occurred.

After making this change, a new error appeared:
"error message":"[10753] Unknown",
"error file":"/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Protocol/ErrorCode.php",
"error line":385,
"php trace":[
{
"file":"/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Client/SyncClient.php",
"line":196,
"function":"check",
"class":"longlang\phpkafka\Protocol\ErrorCode",
"type":"::"
},
{
"file":"/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Client/SyncClient.php",
"line":103,
"function":"updateApiVersions",
"class":"longlang\phpkafka\Client\SyncClient",
"type":"->"
},
{
"file":"/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Client/SwooleClient.php",
"line":52,
"function":"connect",
"class":"longlang\phpkafka\Client\SyncClient",
"type":"->"
},
{
"file":"/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Broker.php",
"line":175,
"function":"connect",
"class":"longlang\phpkafka\Client\SwooleClient",
"type":"->"
},
{
"file":"/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Broker.php",
"line":157,
"function":"getClientByBrokerId",
"class":"longlang\phpkafka\Broker",
"type":"->"
},
{
"file":"/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Producer/Producer.php",
"line":160,
"function":"getClient",
"class":"longlang\phpkafka\Broker",
"type":"->"
},
{
"file":"/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Producer/Producer.php",
"line":55,
"function":"sendBatch",
"class":"longlang\phpkafka\Producer\Producer",
"type":"->"
},
{
"file":"/data/www/kafka/swoole/producer/kafkaProducer.php",
"line":76,
"function":"send",
"class":"longlang\phpkafka\Producer\Producer",
"type":"->"
}
]
