<?php
/**
 * Created by PhpStorm.
 * User: ajing
 * Site: www.weicot.com
 * Date: 2018/8/13
 * Time: 17:17
 */

namespace Weicot\IM;

/**
 * Swoole WebSocket instant-messaging server.
 *
 * Clients exchange JSON frames whose "type" field selects a handler in
 * routing(): login, reconnection, online_list, friends_list, message,
 * tmp_message, del_session and sign_out. The mapping between a connection
 * (fd) and a user id lives in an in-memory swoole_table; the user/session
 * records live in the Storage object built on the Redis driver.
 *
 * NOTE(review): the uid carried in "message", "sign_out", "del_session" and
 * the list requests is taken from the client frame and is never checked
 * against the uid bound to the sending fd, so a logged-in client can act on
 * behalf of another uid. Binding those handlers to the fd's own uid would
 * change the wire contract, so it is only flagged here.
 */
class Server
{
    /** @var \swoole_websocket_server WebSocket server instance. */
    public $serv;

    /** @var Storage Session/user storage built on the Redis connection. */
    public $storage;

    /** @var \swoole_table Memory table keyed by fd holding that connection's uid. */
    public $swoole_table;

    /** @var Message Message persistence (history and offline messages). */
    public $message;

    /** @var User User and friend-session data access. */
    public $user;

    /**
     * Builds all collaborators, registers the event callbacks and starts the
     * server. start() is called here, so constructing the object runs the
     * server.
     *
     * NOTE(review): the Redis connection is created before start(); with
     * several worker processes it is presumably shared by all of them.
     * Confirm whether it should be opened per worker instead.
     */
    public function __construct()
    {
        // Redis-backed storage.
        // NOTE(review): "prot" looks like a typo for "port"; it is kept
        // unchanged because the key is read by \Weicot\Drive\Redis, which is
        // not visible here. Confirm against that class.
        $redis = new \Weicot\Drive\Redis([
            "host" => "127.0.0.1",
            "prot" => "6379",
            "prefix" => "user_"
        ]);
        $this->storage = new Storage($redis->redis);
        $this->message = new Message(); // message persistence
        $this->user = new User();
        $this->user->setStorage($this->storage);

        // In-memory fd => uid table, sized for 1024 rows.
        $this->swoole_table = new \swoole_table(1024);
        $this->swoole_table->column('uid', \swoole_table::TYPE_INT, 8);
        $this->swoole_table->create();

        $this->serv = new \swoole_websocket_server("0.0.0.0", 2018);
        $this->serv->set([
            'daemonize' => 1,          // run as a daemon
            'max_request' => 10000,    // requests a worker handles before it is recycled (not a connection limit)
            'dispatch_mode' => 2,
            'debug_mode' => 1,
            'log_file' => './swoole.log', // log file path
            // Heartbeat: scan all connections every 5 seconds and close those
            // that have sent nothing for 600 seconds.
            'heartbeat_check_interval' => 5,
            'heartbeat_idle_time' => 600,
        ]);
        $this->serv->on('open', [$this, 'onOpen']);       // a client connection was opened
        $this->serv->on('message', [$this, 'onMessage']); // a data frame arrived
        $this->serv->on('close', [$this, 'onClose']);     // a connection was closed
        $this->serv->start();
    }

    /**
     * Runs when a client connection is opened; only logs the new fd.
     *
     * @param \swoole_websocket_server $server
     * @param mixed $req Request object; only its fd property is read.
     */
    public function onOpen(\swoole_websocket_server $server, $req)
    {
        echo "server: success with fd{$req->fd}\n";
    }

    /**
     * Runs for every incoming frame. Heartbeat frames are answered directly;
     * everything else is JSON-decoded and handed to routing().
     *
     * @param \swoole_websocket_server $server
     * @param mixed $frame Frame object; its fd and data properties are read.
     */
    public function onMessage(\swoole_websocket_server $server, $frame)
    {
        $fd = $frame->fd;
        $data = $frame->data;
        if ($this->isClientHeartbeat($data, $server, $fd)) {
            return;
        }
        // Invalid JSON decodes to null, which getType() maps to "error".
        $re_msg = json_decode($data, true);
        $this->routing($re_msg, $fd, $server);
    }

    /**
     * Runs when a connection closes. Removes the fd => uid row and tells the
     * user's friends that the user went offline.
     *
     * The storage record is deliberately not logged out here (the original
     * call was commented out), which is what allows a later "reconnection".
     *
     * @param \swoole_websocket_server $server
     * @param int $fd
     */
    public function onClose(\swoole_websocket_server $server, $fd)
    {
        if ($this->swoole_table->exist($fd)) {
            $row = $this->swoole_table->get($fd);
            $uid = $row["uid"];
            $this->swoole_table->del($fd);

            // After a successful reconnection the storage record points at
            // the new fd. When the stale fd finally closes, the user is still
            // online, so the offline broadcast must be skipped.
            $current = $this->storage->getUser($uid);
            $reconnectedElsewhere = isset($current["fd"]) && $current["fd"] != $fd;
            if (!$reconnectedElsewhere) {
                $this->offlineMessage($server, $uid);
            }
            echo "close {$uid} \n";
        }
        echo "client {$fd} closed\n";
    }

    /**
     * Dispatches a decoded frame to its handler according to its type.
     * Unknown types and undecodable frames are echoed back to the sender.
     *
     * @param mixed $re_msg Decoded frame (array), or null when decoding failed.
     * @param int $fd Sender connection.
     * @param \swoole_websocket_server $server
     */
    public function routing($re_msg, $fd, $server)
    {
        switch ($this->getType($re_msg)) {
            case "login":
                $msgData = $this->login($re_msg, $fd, $server);
                $server->push($fd, json_encode($msgData));
                if ($msgData["status"]) {
                    // Deliver messages stored while the user was offline.
                    $this->getOfflineMessage($server, $fd, $msgData["uid"]);
                }
                break;
            case "reconnection":
                $msgData = $this->reconnection($server, $re_msg, $fd);
                $server->push($fd, json_encode($msgData));
                if ($msgData["status"]) {
                    // A true status implies $re_msg["uid"] was present.
                    $this->getOfflineMessage($server, $fd, $re_msg["uid"]);
                }
                break;
            case "online_list":
                $server->push($fd, json_encode($this->getOnlineUsers($re_msg, $fd)));
                break;
            case "friends_list":
                $server->push($fd, json_encode($this->getFriendsLists($re_msg, $fd)));
                break;
            case "message":
                $this->message($re_msg, $fd, $server);
                break;
            case "tmp_message":
                $this->message($re_msg, $fd, $server, true);
                break;
            case "del_session":
                $server->push($fd, json_encode($this->delSession($re_msg, $fd)));
                break;
            case "sign_out":
                $this->signOut($re_msg, $fd, $server);
                break;
            case "error":
            default:
                $server->push($fd, json_encode($re_msg));
        }
    }

    /**
     * Returns the frame's "type" field, or "error" when it is absent.
     *
     * @param mixed $msg
     * @return mixed
     */
    public function getType($msg)
    {
        if (isset($msg["type"])) {
            return $msg["type"];
        }
        return "error";
    }

    /**
     * Pushes every stored offline message for $uid to $fd, then passes the
     * collected message ids to Message::upOfflineMessageStatus().
     *
     * @param \swoole_websocket_server $server
     * @param int $fd
     * @param mixed $uid
     */
    public function getOfflineMessage($server, $fd, $uid)
    {
        $data = $this->message->getOfflineMessage($uid);
        if (empty($data)) {
            return;
        }
        $msgId = [];
        foreach ($data as $value) {
            $msgId[] = $value["id"];
            $server->push($fd, json_encode([
                "omesg_id" => $value["id"],
                "uid" => $value["uid"],
                "to_uid" => $value["to_uid"],
                "type" => $value["type"],
                "send_time" => $value["send_time"],
                "content" => $value["content"],
            ]));
        }
        $this->message->upOfflineMessageStatus($msgId);
    }

    /**
     * Signs out the user named in the frame: removes the session, notifies
     * that connection with a "sign_out" frame and closes it.
     *
     * @param mixed $re_msg Frame; its "uid" is read.
     * @param int $fd Connection that sent the request.
     * @param \swoole_websocket_server $server
     */
    public function signOut($re_msg, $fd, $server)
    {
        $this->terminateSession(
            $this->field($re_msg, "uid"),
            $server,
            "sign_out",
            "the user has sign out "
        );
    }

    /**
     * Answers the client heartbeat frame and reports whether $data was one.
     *
     * @param string $data Raw frame payload.
     * @param \swoole_websocket_server $server
     * @param int $fd
     * @return bool True when the frame was a heartbeat (and has been answered).
     */
    public function isClientHeartbeat($data, $server, $fd)
    {
        if ($data === '{"uid":"hi"}') {
            $server->push($fd, '{"uid":"hi"}');
            return true;
        }
        return false;
    }

    /**
     * Handles a login frame.
     *
     * The frame must carry "uid" and "user_key". The user is looked up by
     * "user_key"; only after that lookup succeeds is an existing session of
     * that user kicked out. Doing it the other way round would let any
     * unauthenticated client disconnect an arbitrary uid.
     *
     * @param mixed $re_msg
     * @param int $fd
     * @param \swoole_websocket_server $server
     * @return array Reply built by loginMsg().
     */
    public function login($re_msg, $fd, $server)
    {
        if (!isset($re_msg["user_key"], $re_msg["uid"])) {
            return $this->loginMsg("login", 0, $fd, false, [
                "msg" => "no user key"
            ]);
        }

        $data = $this->user->getUserData($re_msg["user_key"]);
        if (!$data) {
            // No user record exists, so report the uid the client asked for.
            return $this->loginMsg("login", $re_msg["uid"], $fd, false, [
                "msg" => "user not found"
            ]);
        }

        // Authenticated: take over any session this user has elsewhere.
        $this->kickOut(["uid" => $data["uid"]], $fd, $server);

        $data["fd"] = $fd; // connection handle
        $data["session_key"] = $this->generateSessionKey();

        if ($this->storage->login($data["uid"], $data)) {
            $this->swoole_table->set($fd, ["uid" => $data["uid"]]);
            $this->onlineMessage($server, $data["uid"]); // tell friends the user is online
            return $this->loginMsg("login", $data["uid"], $fd, true, [
                "session_key" => $data["session_key"],
            ]);
        }

        return $this->loginMsg("login", $data["uid"], $fd, false, [
            "msg" => "is login", // already logged in
        ]);
    }

    /**
     * Logs out an existing session of the frame's uid (login takeover),
     * notifying that connection with a "kick_out" frame before closing it.
     *
     * @param mixed $re_msg Frame; its "uid" is read.
     * @param int $fd The new connection (not the one being closed).
     * @param \swoole_websocket_server $server
     */
    public function kickOut($re_msg, $fd, $server)
    {
        $this->terminateSession(
            $this->field($re_msg, "uid"),
            $server,
            "kick_out",
            "the user has logged in elsewhere"
        );
    }

    /**
     * Re-binds an existing session to a new connection after a dropped link.
     *
     * Reply codes: 1001 success, 1002 session key mismatch, 1003 no stored
     * session for the uid, 1004 missing parameters.
     *
     * @param \swoole_websocket_server $server
     * @param mixed $re_msg Frame; "uid" and "session_key" are read.
     * @param int $fd
     * @return array Reply built by reconnectionMsg().
     */
    public function reconnection($server, $re_msg, $fd)
    {
        if (!isset($re_msg["session_key"], $re_msg["uid"])) {
            return $this->reconnectionMsg($re_msg, $fd, 1004, " the parameter is invalid", false);
        }

        $data = $this->storage->getUser($re_msg["uid"]);
        // A missing record may come back as false or an empty value rather
        // than null, so isset() alone is not a sufficient guard.
        if (empty($data)) {
            return $this->reconnectionMsg($re_msg, $fd, 1003, " user has logged out", false);
        }

        // Strict, constant-time comparison. A loose == would accept an empty
        // key against a missing one and equate "0e..." hex strings.
        $stored = isset($data["session_key"]) ? (string)$data["session_key"] : "";
        $given = $re_msg["session_key"];
        if ($stored === "" || !is_string($given) || !hash_equals($stored, $given)) {
            return $this->reconnectionMsg($re_msg, $fd, 1002, "the user has logged in elsewhere", false);
        }

        $this->swoole_table->set($fd, ["uid" => $data["uid"]]);
        $data["fd"] = $fd; // refresh the connection handle
        $this->storage->login($re_msg["uid"], $data);
        return $this->reconnectionMsg($re_msg, $fd, 1001, "reconnection is ok", true);
    }

    /**
     * Builds a "reconnection" reply.
     *
     * @param mixed $re_msg Frame; its "uid" is echoed back (null when absent).
     * @param int $fd
     * @param int $code
     * @param string $msg
     * @param bool $status
     * @return array
     */
    public function reconnectionMsg($re_msg, $fd, $code, $msg, $status)
    {
        return [
            "type" => "reconnection",
            "uid" => $this->field($re_msg, "uid"),
            "fd" => $fd,
            "code" => $code,
            "msg" => $msg,
            "status" => $status
        ];
    }

    /**
     * Builds the "online_list" reply from the storage's online users.
     *
     * @param mixed $re_msg
     * @param int $fd
     * @return array
     */
    public function getOnlineUsers($re_msg, $fd)
    {
        $onlineUsers = $this->storage->getOnlineUsers();
        $hasUsers = !empty($onlineUsers);
        return [
            "type" => "online_list",
            "uid" => $this->field($re_msg, "uid"),
            "fd" => $fd,
            "list" => $hasUsers ? $this->user->getAllOnlineUsersData($onlineUsers) : false,
            "status" => $hasUsers
        ];
    }

    /**
     * Builds the "friends_list" reply for the frame's uid.
     *
     * @param mixed $re_msg
     * @param int $fd
     * @return array
     */
    public function getFriendsLists($re_msg, $fd)
    {
        $uid = $this->field($re_msg, "uid");
        $friendsList = $this->getFriendsList($uid);
        $hasFriends = !empty($friendsList);
        return [
            "type" => "friends_list",
            "uid" => $uid,
            "fd" => $fd,
            "list" => $hasFriends ? $friendsList : false,
            "status" => $hasFriends
        ];
    }

    /**
     * Forwards a chat message to its recipient.
     *
     * - No "to_uid": the frame is echoed back to the sender.
     * - Recipient connected: the message is saved and pushed to the recipient.
     *   For a temporary message the two users are first added to each other's
     *   session list and the sender's record is attached.
     * - Recipient not connected: the message is saved as offline and the
     *   sender is told the recipient is offline.
     *
     * @param mixed $re_msg
     * @param int $fd Sender connection.
     * @param \swoole_websocket_server $server
     * @param bool $isTmpMessage
     */
    public function message($re_msg, $fd, $server, $isTmpMessage = false)
    {
        $toUid = $this->field($re_msg, 'to_uid');
        if ($toUid === null || $toUid == "") {
            $server->push($fd, json_encode($re_msg));
            return;
        }

        $fromUid = $this->field($re_msg, 'uid');
        $recipient = $this->storage->getUser($toUid);
        $recipientFd = isset($recipient["fd"]) ? $recipient["fd"] : null;

        // exist() tells whether the recipient's connection is still open.
        if ($recipientFd === null || !$server->exist($recipientFd)) {
            $this->message->save($re_msg, false);
            $server->push($fd, json_encode([
                "type" => "offline_message",
                "offline_uid" => $toUid,
                "content" => "用户已经下线",
                "status" => true
            ]));
            return;
        }

        $this->message->save($re_msg, true);
        $message = [
            "time" => time(),
            "type" => "message",
            "uid" => $fromUid,
            "to_uid" => $toUid,
            "content" => $this->field($re_msg, 'content'),
            "status" => true
        ];

        if ($isTmpMessage) {
            if (!($fromUid == $toUid)) { // never add a user to their own list
                $this->user->addSession($fromUid, $toUid);
                $this->user->addSession($toUid, $fromUid);
            }
            $myData = $this->storage->getUser($fromUid);
            if (is_array($myData)) {
                // The record holds the sender's session_key, which is the
                // reconnection credential; it must not reach another user.
                unset($myData["session_key"]);
            }
            $message["uid_data"] = $myData;
            $message["type"] = "tmp_message";
        }

        $server->push($recipientFd, json_encode($message));
    }

    /**
     * Builds a login-style reply.
     *
     * @param string $type
     * @param mixed $uid
     * @param int $fb Connection fd.
     * @param bool $status
     * @param array $data
     * @return array
     */
    public function loginMsg($type, $uid, $fb, $status, $data = [])
    {
        return [
            "type" => $type,
            "uid" => $uid,
            "fd" => $fb,
            "status" => $status,
            "data" => $data,
        ];
    }

    /**
     * Builds a message payload.
     *
     * @param string $type
     * @param mixed $uid
     * @param mixed $to_uid
     * @param mixed $content
     * @param bool $status
     * @return array
     */
    public function sendMsg($type, $uid, $to_uid, $content, $status)
    {
        return [
            "type" => $type,
            "uid" => $uid,
            "to_uid" => $to_uid,
            "content" => $content,
            "status" => $status
        ];
    }

    /**
     * Builds a display record for a uid: the uid doubles as the name, the
     * avatar is a fixed URL, and "online" is whatever Storage::exists()
     * returns.
     *
     * @param mixed $uid
     * @return array
     */
    public function getUserWithUid($uid)
    {
        return [
            "uid" => $uid,
            "name" => $uid,
            "avatar" => "https://b-ssl.duitang.com/uploads/item/201605/05/20160505145557_dtYHf.thumb.700_0.jpeg",
            "online" => $this->storage->exists($uid),
        ];
    }

    /**
     * Returns the session (friend) data of a user.
     *
     * @param mixed $uid
     * @return mixed Whatever User::getSessionData() returns.
     */
    public function getFriendsList($uid)
    {
        return $this->user->getSessionData($uid);
    }

    /**
     * Pushes $msgData to the friends of $uid.
     *
     * With $online = true only friends flagged "online" are considered, and a
     * flagged friend whose connection is gone is logged out of storage. With
     * $online = false every friend is considered, but the push is still
     * limited to connections that exist.
     *
     * @param \swoole_websocket_server $server
     * @param mixed $uid
     * @param mixed $msgData
     * @param bool $online
     */
    public function sendFullFriendsMessage($server, $uid, $msgData, $online = true)
    {
        $friendsList = $this->getFriendsList($uid);
        if (empty($friendsList)) {
            return;
        }

        $payload = json_encode($msgData); // identical for every friend

        foreach ($friendsList as $friend) {
            if ($online && !$friend["online"]) {
                continue;
            }
            $friendData = $this->storage->getUser($friend['uid']);
            $friendFd = isset($friendData["fd"]) ? $friendData["fd"] : null;

            if ($friendFd !== null && $server->exist($friendFd)) {
                $server->push($friendFd, $payload);
            } elseif ($online) {
                // Marked online but the connection no longer exists.
                $this->storage->logout($friend['uid']);
            }
        }
    }

    /**
     * Tells the friends of $uid that the user went offline.
     *
     * @param \swoole_websocket_server $server
     * @param mixed $uid
     */
    public function offlineMessage($server, $uid)
    {
        $this->sendFullFriendsMessage($server, $uid, [
            "type" => "offline_message",
            "offline_uid" => $uid,
            "content" => "用户已经下线",
            "status" => true
        ]);
    }

    /**
     * Tells the friends of $uid that the user came online.
     *
     * NOTE(review): the uid is sent under the key "offline_uid" even for the
     * online notice. It looks like a copy-paste slip but is kept because
     * clients may already read that key.
     *
     * @param \swoole_websocket_server $server
     * @param mixed $uid
     */
    public function onlineMessage($server, $uid)
    {
        $this->sendFullFriendsMessage($server, $uid, [
            "type" => "online_message",
            "offline_uid" => $uid,
            "content" => "用户已经上线",
            "status" => true
        ]);
    }

    /**
     * Deletes the session between the frame's "uid" and "del_uid".
     *
     * @param mixed $re_msg
     * @param int $fd
     * @return array Reply whose "status" tells whether the deletion happened.
     */
    public function delSession($re_msg, $fd)
    {
        $uid = $this->field($re_msg, 'uid');
        $delUid = $this->field($re_msg, 'del_uid');

        $deleted = false;
        if ($delUid !== null && $this->user->delSessionWithUid($delUid, $uid)) {
            $deleted = true;
        }

        return [
            "type" => "del_session",
            "uid" => $uid,
            "fd" => $fd,
            "del_uid" => $delUid,
            "status" => $deleted
        ];
    }

    /**
     * Reads an optional field from a decoded frame without raising a notice.
     *
     * @param mixed $re_msg
     * @param string $key
     * @return mixed The field value, or null when absent.
     */
    private function field($re_msg, $key)
    {
        return isset($re_msg[$key]) ? $re_msg[$key] : null;
    }

    /**
     * Generates an unpredictable 32-character hex session key.
     *
     * @return string
     */
    private function generateSessionKey()
    {
        if (function_exists('random_bytes')) {
            return bin2hex(random_bytes(16));
        }
        // Fallback for runtimes without random_bytes().
        return md5(uniqid((string)mt_rand(), true));
    }

    /**
     * Shared implementation of signOut() and kickOut(): removes the stored
     * session of $uid and, when its connection is still open, notifies and
     * closes it.
     *
     * The notice is sent with push() because the peer is a WebSocket client;
     * send() would write raw bytes that are not a WebSocket frame.
     *
     * @param mixed $uid User whose session is terminated.
     * @param \swoole_websocket_server $server
     * @param string $type Notice type ("sign_out" or "kick_out").
     * @param string $text Human-readable notice.
     */
    private function terminateSession($uid, $server, $type, $text)
    {
        if ($uid === null) {
            return;
        }
        $session = $this->storage->exists($uid);
        if (!$session) {
            return;
        }
        $sessionFd = $session["fd"];
        if (!$this->swoole_table->exist($sessionFd)) {
            return;
        }

        $this->storage->logout($session["uid"]);
        $this->swoole_table->del($sessionFd);

        if ($server->exist($sessionFd)) {
            $server->push($sessionFd, json_encode([
                "type" => $type,
                "uid" => $session["uid"],
                "fd" => $sessionFd,
                "msg" => $text,
                "status" => true
            ]));
            $server->close($sessionFd);
            echo "client {$sessionFd} closed\n";
        }
        echo "logout {$session["uid"]} \n";
    }
}
// Reprint notice: please credit the source — (●--●) Hello.My Weicot » "A simple Swoole IM example: offline messages, reconnection, read/unread status"