19 $this->_socket = $socket;
26 public function ids($controller_name = null){
28 foreach($this->clients as
$id => $client)
29 if(!$controller_name || ($client->controller->name == $controller_name)) $ids[] =
$id;
38 $this->clients[
$id]->connection->send(json_encode($data));
47 public function broadcast($data,$controller_name = null,$except_ids = null){
48 $ids = array_diff($this->
ids($controller_name),$except_ids ?: []);
49 foreach($ids as
$id) $this->
send($id,$data);
58 if($token = \
Rsi\Record::get($data,
'token')){
59 $this->_tokens[$token] = [
'time' => $time = time() + $this->_socket->tokenTtl,
'params' => $data];
60 return [
'time' => $time];
70 if($session_id = \
Rsi\Record::get($data,
'sessionId')){
71 foreach($this->_tokens as &$token_data)
if($token_data[
'sessionId'] == $session_id){
72 $token_data = array_merge($token_data,$data);
76 foreach($this->clients as $client)
if($client->params[
'sessionId'] == $session_id){
77 $client->params = array_merge($client->params,$data);
80 return [
'count' => 0];
90 $controller_name = \Rsi\Record::get($data,
'controller');
92 foreach($this->clients as
$id => $client)
93 if(!$controller_name || ($client->controller->name == $controller_name))
94 $session_ids[
$id] = $client->params[
'sessionId'];
95 return [
'sessionIds' => $session_ids];
104 $controller_name = \Rsi\Record::get($data,
'controller');
105 $session_ids = \Rsi\Record::get($data,
'sessions');
107 foreach($this->clients as $client)
if(
108 (!$session_ids || in_array($client->params[
'sessionId'],$session_ids)) &&
109 (!$controller_name || ($client->controller->name == $controller_name))
111 $client->connection->close();
114 return [
'count' => $count];
121 $this->log->notice(
'Shutdown requested',__FILE__,__LINE__);
132 return [
'count' => count($this->
broadcast(
133 \
Rsi\Record::get($data,
'data'),
134 \
Rsi\Record::get($data,
'controller'),
135 \
Rsi\Record::get($data,
'except')
145 if($controller_name = \
Rsi\Record::get($data,
'controller')){
147 $this->_socket->component(
'front')->controller($controller_name)->execute();
148 return $this->_socket->component(
'request')->data;
150 elseif(method_exists($this,$method =
'local' . ucfirst(\
Rsi\Record::get($data,
'action'))))
151 return call_user_func([$this,$method],$data);
160 if(in_array($connection->getRemoteAddress(),$this->_socket->localClients)){
161 $log->debug(
'New local connection',__FILE__,__LINE__);
162 $connection->on(
'data',
function($data) use ($connection,$log){
163 $log->debug(
'Received local data (' . strlen($data) .
' bytes)',[
'data' => $data]);
164 $connection->write(serialize($this->
onLocalData(unserialize($data))));
168 $log->notice(
'Local connection not allowed',__FILE__,__LINE__,[
'connection' => $connection]);
169 $connection->close();
176 public function onOpen(\Ratchet\ConnectionInterface $connection){
178 $request = $connection->WebSocket->request;
179 if($origins = $this->_socket->clientOrigins){
180 $origin = parse_url($origin = (
string)$request->getHeader(
'Origin'),PHP_URL_HOST) ?: $origin;
181 if(!in_array($origin,$origins)){
182 $log->notice(
'Origin not allowed',__FILE__,__LINE__,[
'connection' => $connection,
'origin' => $origin]);
183 return $connection->close();
186 parse_str($request->getQuery(),$query);
188 foreach($this->_tokens as $token => $params)
if($params[
'time'] < $time) unset($this->_tokens[$token]);
189 if(!array_key_exists($token = $query[
'token'] ?? null,$this->_tokens)){
190 $log->notice(
'Token too old ',__FILE__,__LINE__,[
'connection' => $connection]);
191 return $connection->close();
193 $_SESSION = $params = $this->_tokens[$token][
'params'];
194 $_SERVER[
'REMOTE_ADDR'] = $connection->remoteAddress;
195 $router = $this->_socket->component(
'router');
196 $router->execute($request->getPath());
197 $controller = $this->_socket->component(
'front')->controller($router->controllerName);
199 unset($_SERVER[
'REMOTE_ADDR']);
200 $log->debug(
"New connection ({$connection->resourceId}) for {$controller->name}",__FILE__,__LINE__);
201 $this->clients[$connection->resourceId] =
new Client($connection,$params,$controller);
202 unset($this->_tokens[$token]);
205 public function onClose(\Ratchet\ConnectionInterface $connection){
206 $this->log->debug(
"Connection closed ({$connection->resourceId})",__FILE__,__LINE__);
207 unset($this->clients[$connection->resourceId]);
211 $this->log->error($exception->getMessage(),__FILE__,__LINE__);
212 $connection->close();
215 public function onMessage(\Ratchet\ConnectionInterface $connection,$data){
216 $data = json_decode($data,
true);
217 $this->log->debug(
"Message from connection {$connection->resourceId}",__FILE__,__LINE__,[
'data' => $data]);
218 $client = $this->clients[$this->
id = $connection->resourceId];
219 $client->controller->reset();
221 $_SESSION = $client->params;
222 $_SERVER[
'REMOTE_ADDR'] = $connection->remoteAddress;
223 $client->controller->execute();
225 $_POST = $_SESSION = [];
226 unset($_SERVER[
'REMOTE_ADDR']);
232 $this->server->run();
241 return $this->_socket->component(
'log');
246 if(!$this->_socket->clientPort)
throw new \Exception(
'No client port');
247 $this->_loop = \React\EventLoop\Factory::create();
248 $this->_client = new \React\Socket\Server($this->_loop);
249 $this->_client->listen($port = $this->_socket->clientLocalPort ?: $this->_socket->clientPort,
'0.0.0.0');
250 $this->log->debug(
"Listening on port $port for client connections");
251 $this->_local = new \React\Socket\Server($this->_loop);
252 $this->_local->listen($port = $this->_socket->localPort,
'0.0.0.0');
253 $this->log->debug(
"Listening on port $port for local connections");
254 $this->_local->on(
'connection',[$this,
'onLocalConnection']);
265 if(!$this->_server) $this->_server = new \Ratchet\Server\IoServer(
266 new \Ratchet\Http\HttpServer(
new \Ratchet\WebSocket\WsServer($this)),
$_client
Client/public WebSocket.
localSessionIds($data)
Active session ID's through a local connection.
localBroadcast($data)
Broadcast data to all clients through a local connection.
$clients
Array with all connected clients (key = resource ID, value = Client object).
localToken($data)
Create a WebSocket access token through a local connection.
localParams($data)
Add or update client data (session data).
onOpen(\Ratchet\ConnectionInterface $connection)
Called on a new client connection.
onMessage(\Ratchet\ConnectionInterface $connection, $data)
onClose(\Ratchet\ConnectionInterface $connection)
onLocalConnection($connection)
Called on a new local connection.
broadcast($data, $controller_name=null, $except_ids=null)
Broadcast data to all clients.
localShutdown($data)
Request a shutdown of the WebSocket server through a local connection.
onLocalData($data)
Local data handler.
run()
Run the WebSocket server.
onError(\Ratchet\ConnectionInterface $connection,\Exception $exception)
$_local
Local socket (raw).
$id
Resource ID for current client.
$_tokens
Array with allowed client tokens (key = token, value = array with keys 'time' and 'params')...
ids($controller_name=null)
Client connection resource ID's.
send($id, $data)
Send data to a client.
localClose($data)
Close one or more connections through a local connection.