diff --git a/CMakeLists.txt b/CMakeLists.txt index 78069ec0..cdc475e0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -145,15 +145,14 @@ add_subdirectory(tools) include_directories(modules) -add_subdirectory(modules/HydrogenClient) -if(NOT WIN32) -add_subdirectory(modules/HydrogenServer) -endif() -add_subdirectory(modules/libridge) +add_subdirectory(modules/hydrogen_client) add_subdirectory(drivers) add_subdirectory(${lithium_src_dir}/core) +if(NOT WIN32) +add_subdirectory(${lithium_src_dir}/modules/deviceloader) +endif() set(api_SRC ${lithium_src_dir}/api/astap.cpp diff --git a/cmake_modules/FindUv.cmake b/cmake_modules/FindUV.cmake similarity index 100% rename from cmake_modules/FindUv.cmake rename to cmake_modules/FindUV.cmake diff --git a/libs/libzippp/libzippp.cpp b/libs/libzippp/libzippp.cpp index c51b304e..abb28368 100644 --- a/libs/libzippp/libzippp.cpp +++ b/libs/libzippp/libzippp.cpp @@ -385,7 +385,7 @@ int ZipArchive::close(void) { if(!listeners.empty()) { zip_register_progress_callback_with_state(zipHandle, progressPrecision, progress_callback, nullptr, this); - zip_register_cancel_callback_with_state(zipHandle, progress_cancel_callback, nullptr, this); + // zip_register_cancel_callback_with_state(zipHandle, progress_cancel_callback, nullptr, this); } //avoid to reset the progress when unzipping diff --git a/libs/loguru/loguru.hpp b/libs/loguru/loguru.hpp index 94d5964b..cb32b6a7 100644 --- a/libs/loguru/loguru.hpp +++ b/libs/loguru/loguru.hpp @@ -172,7 +172,7 @@ Website: www.ilikebigbits.com #endif #ifndef LOGURU_USE_FMTLIB - #define LOGURU_USE_FMTLIB 0 + #define LOGURU_USE_FMTLIB 1 #endif #ifndef LOGURU_USE_LOCALE diff --git a/modules/HydrogenServer/driver_info.cpp b/modules/HydrogenServer/driver_info.cpp deleted file mode 100644 index d095cd59..00000000 --- a/modules/HydrogenServer/driver_info.cpp +++ /dev/null @@ -1,837 +0,0 @@ -#include "driver_info.hpp" - -#include -#include - -#ifdef _WIN32 -#include -#include -#else -#include -#include -#include -#include -#endif - -#include "io.hpp" -#include "lilxml.hpp" -#include "client_info.hpp" -#include "xml_util.hpp" -#include "hydrogen_server.hpp" - -#include "loguru/loguru.hpp" - -/* start the given local HYDROGEN driver process. - * exit if trouble. - */ -#ifdef _WIN32 -void LocalDvrInfo::start() -{ - Msg *mp; - HANDLE hReadPipe, hWritePipe, hErrorPipe; - SECURITY_ATTRIBUTES sa; - PROCESS_INFORMATION pi; - STARTUPINFO si; - HANDLE hEfd; - DWORD pid; - -#ifdef OSX_EMBEDED_MODE - fprintf(stderr, "STARTING \"%s\"\n", name.c_str()); - fflush(stderr); -#endif - - HANDLE hRp, hWp, hEp; // 修改变量声明为HANDLE类型 - - /* build three pipes: r, w and error*/ - if (useSharedBuffer) - { - // FIXME: lots of FD are opened by hydrogenserver. FD_CLOEXEC is a must + check other fds - sa.nLength = sizeof(SECURITY_ATTRIBUTES); - sa.bInheritHandle = TRUE; - sa.lpSecurityDescriptor = NULL; - if (!CreatePipe(&hReadPipe, &hWritePipe, &sa, 0)) - { - // log(fmt("CreatePipe: %d\n", GetLastError())); - // Bye(); - } - hErrorPipe = hWritePipe; - } - else - { - - if (!CreatePipe(&hRp, &hWp, NULL, 4096)) - { - // log(fmt("CreatePipe read: %d\n", GetLastError())); - // Bye(); - } - if (!CreatePipe(&hRp, &hEp, NULL, 4096)) - { - // log(fmt("CreatePipe error: %d\n", GetLastError())); - // Bye(); - } - hReadPipe = hRp; - hWritePipe = hWp; - hErrorPipe = hEp; - } - - /* fork&exec new process */ - ZeroMemory(&si, sizeof(STARTUPINFO)); - si.cb = sizeof(STARTUPINFO); - si.hStdInput = hReadPipe; - si.hStdOutput = hWritePipe; - si.hStdError = hErrorPipe; - si.dwFlags |= STARTF_USESTDHANDLES; - - if (!CreateProcess(NULL, (LPSTR)name.c_str(), NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi)) - { - // log(fmt("CreateProcess: %d\n", GetLastError())); - // Bye(); - } - - if (useSharedBuffer) - { - /* don't need child's other socket end */ - CloseHandle(hReadPipe); - - /* record pid, io channels, init lp and snoop list */ - setFds(reinterpret_cast(hWritePipe), reinterpret_cast(hWritePipe)); - } - else - { - /* don't need child's side of pipes */ - CloseHandle(hWritePipe); - CloseHandle(hReadPipe); - - /* record pid, io channels, init lp and snoop list */ - setFds(reinterpret_cast(int) hRp, reinterpret_cast(int) hWp); // 修改为使用新的变量名 - } - - CloseHandle(hErrorPipe); - - // Watch pid - this->pid = pi.dwProcessId; - do - { - (this->pidwatcher).pid = (this->pid); - (this->pidwatcher).flags = !!(1); - } while (0); - ev_child_set() - this->pidwatcher.set(this->pid); - //this->pidwatcher.start(); - - // Watch input on efd - hEfd = hEp; // 修改为使用新的变量名 - SetHandleInformation(hEfd, HANDLE_FLAG_INHERIT, 0); - this->efd = _open_osfhandle(reinterpret_cast(hEfd), _O_RDONLY | _O_BINARY); - unsigned long mode = 1; - ioctlsocket(this->efd, FIONBIO, &mode); - this->eio.start(this->efd, ev::READ); - - /* first message primes driver to report its properties -- dev known - * if restarting - */ - if (verbose > 0) - LOG_F(INFO, "pid=%d rfd=%d wfd=%d efd=%d\n", pid, (int)hRp, (int)hWp, (int)hEp); - - XMLEle *root = addXMLEle(NULL, "getProperties"); - addXMLAtt(root, "version", TO_STRING(HYDROGENV)); - mp = new Msg(nullptr, root); - - // pushmsg can kill mp. do at end - pushMsg(mp); -} - -#else -void LocalDvrInfo::start() -{ - Msg *mp; - int rp[2], wp[2], ep[2]; - int ux[2]; - int pid; - -#ifdef OSX_EMBEDED_MODE - fprintf(stderr, "STARTING \"%s\"\n", name.c_str()); - fflush(stderr); -#endif - - /* build three pipes: r, w and error*/ - if (useSharedBuffer) - { - // FIXME: lots of FD are opened by hydrogenserver. FD_CLOEXEC is a must + check other fds - if (socketpair(AF_UNIX, SOCK_STREAM, 0, ux) == -1) - { - // log(fmt("socketpair: %s\n", strerror(errno))); - // Bye(); - } - } - else - { - if (pipe(rp) < 0) - { - // log(fmt("read pipe: %s\n", strerror(errno))); - // Bye(); - } - if (pipe(wp) < 0) - { - // log(fmt("write pipe: %s\n", strerror(errno))); - // Bye(); - } - } - if (pipe(ep) < 0) - { - // log(fmt("stderr pipe: %s\n", strerror(errno))); - // Bye(); - } - - /* fork&exec new process */ - pid = fork(); - if (pid < 0) - { - // log(fmt("fork: %s\n", strerror(errno))); - // Bye(); - } - if (pid == 0) - { - /* child: exec name */ - int fd; - - /* rig up pipes */ - if (useSharedBuffer) - { - // For unix socket, the same socket end can be used for both read & write - dup2(ux[0], 0); /* driver stdin reads from ux[0] */ - dup2(ux[0], 1); /* driver stdout writes to ux[0] */ - ::close(ux[0]); - ::close(ux[1]); - } - else - { - dup2(wp[0], 0); /* driver stdin reads from wp[0] */ - dup2(rp[1], 1); /* driver stdout writes to rp[1] */ - } - dup2(ep[1], 2); /* driver stderr writes to e[]1] */ - for (fd = 3; fd < 100; fd++) - (void)::close(fd); - - if (!envDev.empty()) - setenv("HYDROGENDEV", envDev.c_str(), 1); - /* Only reset environment variable in case of FIFO */ - else if (fifo) - unsetenv("HYDROGENDEV"); - if (!envConfig.empty()) - setenv("HYDROGENCONFIG", envConfig.c_str(), 1); - else if (fifo) - unsetenv("HYDROGENCONFIG"); - if (!envSkel.empty()) - setenv("HYDROGENSKEL", envSkel.c_str(), 1); - else if (fifo) - unsetenv("HYDROGENSKEL"); - std::string executable; - if (!envPrefix.empty()) - { - setenv("HYDROGENPREFIX", envPrefix.c_str(), 1); -#if defined(OSX_EMBEDED_MODE) - executable = envPrefix + "/Contents/MacOS/" + name; -#elif defined(__APPLE__) - executable = envPrefix + "/" + name; -#else - executable = envPrefix + "/bin/" + name; -#endif - - fprintf(stderr, "%s\n", executable.c_str()); - - execlp(executable.c_str(), name.c_str(), NULL); - } - else - { - if (name[0] == '.') - { - executable = std::string(dirname((char *)me)) + "/" + name; - execlp(executable.c_str(), name.c_str(), NULL); - } - else - { - execlp(name.c_str(), name.c_str(), NULL); - } - } - -#ifdef OSX_EMBEDED_MODE - fprintf(stderr, "FAILED \"%s\"\n", name.c_str()); - fflush(stderr); -#endif - // log(fmt("execlp %s: %s\n", executable.c_str(), strerror(errno))); - _exit(1); /* parent will notice EOF shortly */ - } - - if (useSharedBuffer) - { - /* don't need child's other socket end */ - ::close(ux[0]); - - /* record pid, io channels, init lp and snoop list */ - setFds(ux[1], ux[1]); - rp[0] = ux[1]; - wp[1] = ux[1]; - } - else - { - /* don't need child's side of pipes */ - ::close(wp[0]); - ::close(rp[1]); - - /* record pid, io channels, init lp and snoop list */ - setFds(rp[0], wp[1]); - } - - ::close(ep[1]); - - // Watch pid - this->pid = pid; - this->pidwatcher.set(pid); - this->pidwatcher.start(); - - // Watch input on efd - this->efd = ep[0]; - fcntl(this->efd, F_SETFL, fcntl(this->efd, F_GETFL, 0) | O_NONBLOCK); - this->eio.start(this->efd, ev::READ); - - /* first message primes driver to report its properties -- dev known - * if restarting - */ - if (verbose > 0) - LOG_F(INFO, "pid=%d rfd=%d wfd=%d efd=%d\n", pid, rp[0], wp[1], ep[0]); - - XMLEle *root = addXMLEle(NULL, "getProperties"); - addXMLAtt(root, "version", TO_STRING(HYDROGENV)); - mp = new Msg(nullptr, root); - - // pushmsg can kill mp. do at end - pushMsg(mp); -} -#endif - -void RemoteDvrInfo::extractRemoteId(const std::string &name, std::string &o_host, int &o_port, std::string &o_dev) const -{ - char dev[MAXHYDROGENDEVICE] = {0}; - char host[MAXSBUF] = {0}; - - /* extract host and port from name*/ - int hydrogen_port = HYDROGENPORT; - if (sscanf(name.c_str(), "%[^@]@%[^:]:%d", dev, host, &hydrogen_port) < 2) - { - // Device missing? Try a different syntax for all devices - if (sscanf(name.c_str(), "@%[^:]:%d", host, &hydrogen_port) < 1) - { - // log(fmt("Bad remote device syntax: %s\n", name.c_str())); - // Bye(); - } - } - - o_host = host; - o_port = hydrogen_port; - o_dev = dev; -} - -/* start the given remote HYDROGEN driver connection. - * exit if trouble. - */ -void RemoteDvrInfo::start() -{ - int sockfd; - std::string dev; - extractRemoteId(name, host, port, dev); - - /* connect */ - sockfd = openHYDROGENServer(); - - /* record flag pid, io channels, init lp and snoop list */ - - this->setFds(sockfd, sockfd); - - if (verbose > 0) - LOG_F(INFO, "socket=%d\n", sockfd); - - /* N.B. storing name now is key to limiting outbound traffic to this - * dev. - */ - if (!dev.empty()) - this->dev.insert(dev); - - /* Sending getProperties with device lets remote server limit its - * outbound (and our inbound) traffic on this socket to this device. - */ - XMLEle *root = addXMLEle(NULL, "getProperties"); - - if (!dev.empty()) - { - addXMLAtt(root, "device", dev.c_str()); - addXMLAtt(root, "version", TO_STRING(HYDROGENV)); - } - else - { - // This informs downstream server that it is connecting to an upstream server - // and not a regular client. The difference is in how it treats snooping properties - // among properties. - addXMLAtt(root, "device", "*"); - addXMLAtt(root, "version", TO_STRING(HYDROGENV)); - } - - Msg *mp = new Msg(nullptr, root); - - // pushmsg can kill this. do at end - pushMsg(mp); -} - -int RemoteDvrInfo::openHYDROGENServer() -{ - struct sockaddr_in serv_addr; - struct hostent *hp; - int sockfd; - - /* lookup host address */ - hp = gethostbyname(host.c_str()); - if (!hp) - { - LOG_F(ERROR, "gethostbyname(%s): %s\n", host.c_str(), strerror(errno)); - // Bye(); - } - - /* create a socket to the HYDROGEN server */ - (void)memset((char *)&serv_addr, 0, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = ((struct in_addr *)(hp->h_addr_list[0]))->s_addr; - serv_addr.sin_port = htons(port); - if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) - { - LOG_F(ERROR, "socket(%s,%d): %s\n", host.c_str(), port, strerror(errno)); - // Bye(); - } - - /* connect */ - if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) - { - LOG_F(ERROR, "connect(%s,%d): %s\n", host.c_str(), port, strerror(errno)); - // Bye(); - } - - /* ok */ - return (sockfd); -} - -void DvrInfo::onMessage(XMLEle *root, std::list &sharedBuffers) -{ - char *roottag = tagXMLEle(root); - const char *dev = findXMLAttValu(root, "device"); - const char *name = findXMLAttValu(root, "name"); - int isblob = !strcmp(tagXMLEle(root), "setBLOBVector"); - - if (verbose > 2) - traceMsg("read ", root); - else if (verbose > 1) - { - LOG_F(ERROR, "read <%s device='%s' name='%s'>\n", tagXMLEle(root), findXMLAttValu(root, "device"), findXMLAttValu(root, "name")); - } - - /* that's all if driver is just registering a snoop */ - /* JM 2016-05-18: Send getProperties to upstream chained servers as well.*/ - if (!strcmp(roottag, "getProperties")) - { - this->addSDevice(dev, name); - Msg *mp = new Msg(this, root); - /* send to interested chained servers upstream */ - // FIXME: no use of root here - ClInfo::q2Servers(this, mp, root); - /* Send to snooped drivers if they exist so that they can echo back the snooped propertly immediately */ - // FIXME: no use of root here - q2RDrivers(dev, mp, root); - - mp->queuingDone(); - - return; - } - - /* that's all if driver desires to snoop BLOBs from other drivers */ - if (!strcmp(roottag, "enableBLOB")) - { - Property *sp = findSDevice(dev, name); - if (sp) - crackBLOB(pcdataXMLEle(root), &sp->blob); - delXMLEle(root); - return; - } - - /* Found a new device? Let's add it to driver info */ - if (dev[0] && !this->isHandlingDevice(dev)) - { -#ifdef OSX_EMBEDED_MODE - if (this->dev.empty()) - fprintf(stderr, "STARTED \"%s\"\n", dp->name.c_str()); - fflush(stderr); -#endif - this->dev.insert(dev); - } - - /* log messages if any and wanted */ - if (ldir) - logDMsg(root, dev); - - if (!strcmp(roottag, "pingRequest")) - { - setXMLEleTag(root, "pingReply"); - - Msg *mp = new Msg(this, root); - pushMsg(mp); - mp->queuingDone(); - return; - } - - /* build a new message -- set content iff anyone cares */ - Msg *mp = Msg::fromXml(this, root, sharedBuffers); - if (!mp) - { - close(); - return; - } - - /* send to interested clients */ - ClInfo::q2Clients(NULL, isblob, dev, name, mp, root); - - /* send to snooping drivers */ - DvrInfo::q2SDrivers(this, isblob, dev, name, mp, root); - - /* set message content if anyone cares else forget it */ - mp->queuingDone(); -} - -void DvrInfo::closeWritePart() -{ - // Don't want any half-dead drivers - close(); -} - -void DvrInfo::close() -{ - // Tell client driver is dead. - for (auto dev : dev) - { - /* Inform clients that this driver is dead */ - XMLEle *root = addXMLEle(NULL, "delProperty"); - addXMLAtt(root, "device", dev.c_str()); - - prXMLEle(stderr, root, 0); - Msg *mp = new Msg(this, root); - - ClInfo::q2Clients(NULL, 0, dev.c_str(), "", mp, root); - mp->queuingDone(); - } - - bool terminate; - if (!restart) - { - terminate = true; - } - else - { - if (restarts >= maxrestarts) - { - // log(fmt("Terminated after #%d restarts.\n", restarts)); - terminate = true; - } - else - { - // log(fmt("restart #%d\n", restarts)); - ++restarts; - terminate = false; - } - } - -#ifdef OSX_EMBEDED_MODE - fprintf(stderr, "STOPPED \"%s\"\n", name.c_str()); - fflush(stderr); -#endif - - // FIXME: we loose stderr from dying driver - if (terminate) - { - delete (this); - if ((!fifo) && (drivers.ids().empty())) - // Bye(); - return; - } - else - { - DvrInfo *restarted = this->clone(); - delete (this); - restarted->start(); - } -} - -void DvrInfo::q2RDrivers(const std::string &dev, Msg *mp, XMLEle *root) -{ - char *roottag = tagXMLEle(root); - - /* queue message to each interested driver. - * N.B. don't send generic getProps to more than one remote driver, - * otherwise they all fan out and we get multiple responses back. - */ - std::set remoteAdvertised; - for (auto dpId : drivers.ids()) - { - auto dp = drivers[dpId]; - if (dp == nullptr) - continue; - - std::string remoteUid = dp->remoteServerUid(); - bool isRemote = !remoteUid.empty(); - - /* driver known to not support this dev */ - if ((!dev.empty()) && dev[0] != '*' && !dp->isHandlingDevice(dev)) - continue; - - /* Only send message to each *unique* remote driver at a particular host:port - * Since it will be propogated to all other devices there */ - if (dev.empty() && isRemote) - { - if (remoteAdvertised.find(remoteUid) != remoteAdvertised.end()) - continue; - - /* Retain last remote driver data so that we do not send the same info again to a driver - * residing on the same host:port */ - remoteAdvertised.insert(remoteUid); - } - - /* JM 2016-10-30: Only send enableBLOB to remote drivers */ - if (isRemote == 0 && !strcmp(roottag, "enableBLOB")) - continue; - - // pushmsg can kill dp. do at end - dp->pushMsg(mp); - } -} - -void DvrInfo::q2SDrivers(DvrInfo *me, int isblob, const std::string &dev, const std::string &name, Msg *mp, XMLEle *root) -{ - std::string meRemoteServerUid = me ? me->remoteServerUid() : ""; - for (auto dpId : drivers.ids()) - { - auto dp = drivers[dpId]; - if (dp == nullptr) - continue; - - Property *sp = dp->findSDevice(dev, name); - - /* nothing for dp if not snooping for dev/name or wrong BLOB mode */ - if (!sp) - continue; - if ((isblob && sp->blob == B_NEVER) || (!isblob && sp->blob == B_ONLY)) - continue; - - // Do not send snoop data to remote drivers at the same host - // since they will manage their own snoops remotely - if ((!meRemoteServerUid.empty()) && dp->remoteServerUid() == meRemoteServerUid) - continue; - - // pushmsg can kill dp. do at end - dp->pushMsg(mp); - } -} - -void DvrInfo::addSDevice(const std::string &dev, const std::string &name) -{ - Property *sp; - - /* no dups */ - sp = findSDevice(dev, name); - if (sp) - return; - - /* add dev to sdevs list */ - sp = new Property(dev, name); - sp->blob = B_NEVER; - sprops.push_back(sp); - - if (verbose) - LOG_F(INFO, "snooping on %s.%s\n", dev.c_str(), name.c_str()); -} - -Property *DvrInfo::findSDevice(const std::string &dev, const std::string &name) const -{ - for (auto sp : sprops) - { - if ((sp->dev == dev) && (sp->name.empty() || sp->name == name)) - return (sp); - } - - return nullptr; -} - -DvrInfo::DvrInfo(bool useSharedBuffer) : MsgQueue(useSharedBuffer), - restarts(0) -{ - drivers.insert(this); -} - -DvrInfo::DvrInfo(const DvrInfo &model) : MsgQueue(model.useSharedBuffer), - name(model.name), - restarts(model.restarts) -{ - drivers.insert(this); -} - -DvrInfo::~DvrInfo() -{ - drivers.erase(this); - for (auto prop : sprops) - { - delete prop; - } -} - -bool DvrInfo::isHandlingDevice(const std::string &dev) const -{ - return this->dev.find(dev) != this->dev.end(); -} - -void DvrInfo::log(const std::string &str) const -{ - std::string logLine = "Driver "; - logLine += name; - logLine += ": "; - logLine += str; - LOG_F(INFO, "%s", logLine.c_str()); -} - -LocalDvrInfo::LocalDvrInfo() : DvrInfo(true) -{ - eio.set(this); - pidwatcher.set(this); -} - -LocalDvrInfo::LocalDvrInfo(const LocalDvrInfo &model) : DvrInfo(model), - envDev(model.envDev), - envConfig(model.envConfig), - envSkel(model.envSkel), - envPrefix(model.envPrefix) -{ - eio.set(this); - pidwatcher.set(this); -} - -LocalDvrInfo::~LocalDvrInfo() -{ - closeEfd(); - if (pid != 0) - { - kill(pid, SIGKILL); /* libev insures there will be no zombies */ - pid = 0; - } - closePid(); -} - -LocalDvrInfo *LocalDvrInfo::clone() const -{ - return new LocalDvrInfo(*this); -} - -void LocalDvrInfo::closeEfd() -{ - ::close(efd); - efd = -1; - eio.stop(); -} - -void LocalDvrInfo::closePid() -{ - pid = 0; - pidwatcher.stop(); -} - -void LocalDvrInfo::onEfdEvent(ev::io &, int revents) -{ - if (EV_ERROR & revents) - { - int sockErrno = readFdError(this->efd); - if (sockErrno) - { - LOG_F(ERROR, "Error on stderr: %s\n", strerror(sockErrno)); - closeEfd(); - } - return; - } - - if (revents & EV_READ) - { - ssize_t nr; - - /* read more */ - nr = read(efd, errbuff + errbuffpos, sizeof(errbuff) - errbuffpos); - if (nr <= 0) - { - if (nr < 0) - { - if (errno == EAGAIN || errno == EWOULDBLOCK) - return; - - LOG_F(ERROR, "stderr %s\n", strerror(errno)); - } - else - LOG_F(ERROR, "stderr EOF\n"); - closeEfd(); - return; - } - errbuffpos += nr; - - for (int i = 0; i < errbuffpos; ++i) - { - if (errbuff[i] == '\n') - { - LOG_F(ERROR, "%.*s\n", (int)i, errbuff); - i++; /* count including nl */ - errbuffpos -= i; /* remove from nexbuf */ - memmove(errbuff, errbuff + i, errbuffpos); /* slide remaining to front */ - i = -1; /* restart for loop scan */ - } - } - } -} - -#ifdef _WIN32 -void LocalDvrInfo::onPidEvent(ev_child &, int revents) -#else -void LocalDvrInfo::onPidEvent(ev::child &, int revents) -#endif -{ - if (revents & EV_CHILD) - { - if (WIFEXITED(pidwatcher.rstatus)) - { - LOG_F(ERROR, "process %d exited with status %d\n", pid, WEXITSTATUS(pidwatcher.rstatus)); - } - else if (WIFSIGNALED(pidwatcher.rstatus)) - { - int signum = WTERMSIG(pidwatcher.rstatus); - LOG_F(ERROR, "process %d killed with signal %d - %s\n", pid, signum, strsignal(signum)); - } - pid = 0; - this->pidwatcher.stop(); - } -} - -RemoteDvrInfo::RemoteDvrInfo() : DvrInfo(false) -{ -} - -RemoteDvrInfo::RemoteDvrInfo(const RemoteDvrInfo &model) : DvrInfo(model), - host(model.host), - port(model.port) -{ -} - -RemoteDvrInfo::~RemoteDvrInfo() -{ -} - -RemoteDvrInfo *RemoteDvrInfo::clone() const -{ - return new RemoteDvrInfo(*this); -} diff --git a/modules/HydrogenServer/fifo_server.cpp b/modules/HydrogenServer/fifo_server.cpp deleted file mode 100644 index 0c0bd463..00000000 --- a/modules/HydrogenServer/fifo_server.cpp +++ /dev/null @@ -1,228 +0,0 @@ -#include "fifo_server.hpp" - -#include -#include -#include - -#include "io.hpp" -#include "driver_info.hpp" -#include "hydrogen_server.hpp" - -Fifo::Fifo(const std::string &name) : name(name) -{ - fdev.set(this); -} - -/* Attempt to open up FIFO */ -void Fifo::close(void) -{ - if (fd != -1) - { - ::close(fd); - fd = -1; - fdev.stop(); - } - bufferPos = 0; -} - -void Fifo::open() -{ - /* Open up FIFO, if available */ -#ifdef _WIN32 - fd = ::open(name.c_str(), O_RDONLY); -#else - fd = ::open(name.c_str(), O_RDONLY | O_NONBLOCK | O_CLOEXEC); -#endif - - if (fd < 0) - { - // log(fmt("open(%s): %s.\n", name.c_str(), strerror(errno))); - // Bye(); - } - - fdev.start(fd, EV_READ); -} - -/* Handle one fifo command. Start/stop drivers accordingly */ -void Fifo::processLine(const char *line) -{ - - // log(fmt("FIFO: %s\n", line)); - - char cmd[MAXSBUF], arg[4][1], var[4][MAXSBUF], tDriver[MAXSBUF], tName[MAXSBUF], envConfig[MAXSBUF], - envSkel[MAXSBUF], envPrefix[MAXSBUF]; - - memset(&tDriver[0], 0, sizeof(char) * MAXSBUF); - memset(&tName[0], 0, sizeof(char) * MAXSBUF); - memset(&envConfig[0], 0, sizeof(char) * MAXSBUF); - memset(&envSkel[0], 0, sizeof(char) * MAXSBUF); - memset(&envPrefix[0], 0, sizeof(char) * MAXSBUF); - - int n = 0; - - bool remoteDriver = !!strstr(line, "@"); - - // If remote driver - if (remoteDriver) - { - n = sscanf(line, "%s %511[^\n]", cmd, tDriver); - - // Remove quotes if any - char *ptr = tDriver; - int len = strlen(tDriver); - while ((ptr = strstr(tDriver, "\""))) - { - memmove(ptr, ptr + 1, --len); - ptr[len] = '\0'; - } - } - // If local driver - else - { - n = sscanf(line, "%s %s -%1c \"%511[^\"]\" -%1c \"%511[^\"]\" -%1c \"%511[^\"]\" -%1c \"%511[^\"]\"", cmd, - tDriver, arg[0], var[0], arg[1], var[1], arg[2], var[2], arg[3], var[3]); - } - - int n_args = (n - 2) / 2; - - int j = 0; - for (j = 0; j < n_args; j++) - { - if (arg[j][0] == 'n') - { - strncpy(tName, var[j], MAXSBUF - 1); - tName[MAXSBUF - 1] = '\0'; - } - else if (arg[j][0] == 'c') - { - strncpy(envConfig, var[j], MAXSBUF - 1); - envConfig[MAXSBUF - 1] = '\0'; - } - else if (arg[j][0] == 's') - { - strncpy(envSkel, var[j], MAXSBUF - 1); - envSkel[MAXSBUF - 1] = '\0'; - } - else if (arg[j][0] == 'p') - { - strncpy(envPrefix, var[j], MAXSBUF - 1); - envPrefix[MAXSBUF - 1] = '\0'; - } - } - - bool startCmd; - if (!strcmp(cmd, "start")) - startCmd = 1; - else - startCmd = 0; - - if (startCmd) - { - - DvrInfo *dp; - if (remoteDriver == 0) - { - auto *localDp = new LocalDvrInfo(); - dp = localDp; - // strncpy(dp->dev, tName, MAXHYDROGENDEVICE); - localDp->envDev = tName; - localDp->envConfig = envConfig; - localDp->envSkel = envSkel; - localDp->envPrefix = envPrefix; - } - else - { - dp = new RemoteDvrInfo(); - } - dp->name = tDriver; - dp->start(); - } - else - { - for (auto dp : DvrInfo::drivers) - { - if (dp == nullptr) - continue; - - if (dp->name == tDriver) - { - /* If device name is given, check against it before shutting down */ - if (tName[0] && !dp->isHandlingDevice(tName)) - continue; - if (verbose) - // log(fmt("FIFO: Shutting down driver: %s\n", tDriver)); - - dp->restart = false; - dp->close(); - break; - } - } - } -} - -void Fifo::read(void) -{ - int rd = ::read(fd, buffer + bufferPos, sizeof(buffer) - 1 - bufferPos); - if (rd == 0) - { - if (bufferPos > 0) - { - buffer[bufferPos] = '\0'; - processLine(buffer); - } - close(); - open(); - return; - } - if (rd == -1) - { - if (errno == EAGAIN || errno == EWOULDBLOCK) - return; - - // log(fmt("Fifo error: %s\n", strerror(errno))); - close(); - open(); - return; - } - - bufferPos += rd; - - for (int i = 0; i < bufferPos; ++i) - { - if (buffer[i] == '\n') - { - buffer[i] = 0; - processLine(buffer); - // shift the buffer - i++; /* count including nl */ - bufferPos -= i; /* remove from nexbuf */ - memmove(buffer, buffer + i, bufferPos); /* slide remaining to front */ - i = -1; /* restart for loop scan */ - } - } - - if ((unsigned)bufferPos >= sizeof(buffer) - 1) - { - // log(fmt("Fifo overflow")); - close(); - open(); - } -} - -void Fifo::ioCb(ev::io &, int revents) -{ - if (EV_ERROR & revents) - { - int sockErrno = readFdError(this->fd); - if (sockErrno) - { - // log(fmt("Error on fifo: %s\n", strerror(sockErrno))); - close(); - open(); - } - } - else if (revents & EV_READ) - { - read(); - } -} diff --git a/modules/HydrogenServer/fifo_server.hpp b/modules/HydrogenServer/fifo_server.hpp deleted file mode 100644 index 38bb8a34..00000000 --- a/modules/HydrogenServer/fifo_server.hpp +++ /dev/null @@ -1,30 +0,0 @@ -#pragma once - -#include - -#include - -class Fifo -{ - std::string name; /* Path to FIFO for dynamic startups & shutdowns of drivers */ - - char buffer[1024]; - int bufferPos = 0; - int fd = -1; - ev::io fdev; - - void close(); - void open(); - void processLine(const char *line); - - /* Read commands from FIFO and process them. Start/stop drivers accordingly */ - void read(); - void ioCb(ev::io &watcher, int revents); - -public: - Fifo(const std::string &name); - void listen() - { - open(); - } -}; \ No newline at end of file diff --git a/modules/HydrogenClient/CMakeLists.txt b/modules/hydrogen_client/CMakeLists.txt similarity index 100% rename from modules/HydrogenClient/CMakeLists.txt rename to modules/hydrogen_client/CMakeLists.txt diff --git a/modules/HydrogenClient/abstractbaseclient.cpp b/modules/hydrogen_client/abstractbaseclient.cpp similarity index 100% rename from modules/HydrogenClient/abstractbaseclient.cpp rename to modules/hydrogen_client/abstractbaseclient.cpp diff --git a/modules/HydrogenClient/abstractbaseclient.h b/modules/hydrogen_client/abstractbaseclient.h similarity index 100% rename from modules/HydrogenClient/abstractbaseclient.h rename to modules/hydrogen_client/abstractbaseclient.h diff --git a/modules/HydrogenClient/abstractbaseclient_p.h b/modules/hydrogen_client/abstractbaseclient_p.h similarity index 100% rename from modules/HydrogenClient/abstractbaseclient_p.h rename to modules/hydrogen_client/abstractbaseclient_p.h diff --git a/modules/HydrogenClient/baseclient.cpp b/modules/hydrogen_client/baseclient.cpp similarity index 100% rename from modules/HydrogenClient/baseclient.cpp rename to modules/hydrogen_client/baseclient.cpp diff --git a/modules/HydrogenClient/baseclient.h b/modules/hydrogen_client/baseclient.h similarity index 100% rename from modules/HydrogenClient/baseclient.h rename to modules/hydrogen_client/baseclient.h diff --git a/modules/HydrogenClient/baseclient_p.h b/modules/hydrogen_client/baseclient_p.h similarity index 100% rename from modules/HydrogenClient/baseclient_p.h rename to modules/hydrogen_client/baseclient_p.h diff --git a/modules/HydrogenClient/socket/CMakeLists.txt b/modules/hydrogen_client/socket/CMakeLists.txt similarity index 100% rename from modules/HydrogenClient/socket/CMakeLists.txt rename to modules/hydrogen_client/socket/CMakeLists.txt diff --git a/modules/HydrogenClient/socket/select.h b/modules/hydrogen_client/socket/select.h similarity index 100% rename from modules/HydrogenClient/socket/select.h rename to modules/hydrogen_client/socket/select.h diff --git a/modules/HydrogenClient/socket/tcpsocket.cpp b/modules/hydrogen_client/socket/tcpsocket.cpp similarity index 100% rename from modules/HydrogenClient/socket/tcpsocket.cpp rename to modules/hydrogen_client/socket/tcpsocket.cpp diff --git a/modules/HydrogenClient/socket/tcpsocket.h b/modules/hydrogen_client/socket/tcpsocket.h similarity index 100% rename from modules/HydrogenClient/socket/tcpsocket.h rename to modules/hydrogen_client/socket/tcpsocket.h diff --git a/modules/HydrogenClient/socket/tcpsocket_p.h b/modules/hydrogen_client/socket/tcpsocket_p.h similarity index 100% rename from modules/HydrogenClient/socket/tcpsocket_p.h rename to modules/hydrogen_client/socket/tcpsocket_p.h diff --git a/modules/HydrogenClient/socket/tcpsocket_unix.cpp b/modules/hydrogen_client/socket/tcpsocket_unix.cpp similarity index 100% rename from modules/HydrogenClient/socket/tcpsocket_unix.cpp rename to modules/hydrogen_client/socket/tcpsocket_unix.cpp diff --git a/modules/HydrogenClient/socket/tcpsocket_win.cpp b/modules/hydrogen_client/socket/tcpsocket_win.cpp similarity index 100% rename from modules/HydrogenClient/socket/tcpsocket_win.cpp rename to modules/hydrogen_client/socket/tcpsocket_win.cpp diff --git a/modules/liimage/cvimage.cpp b/modules/lithium_image/cvimage.cpp similarity index 100% rename from modules/liimage/cvimage.cpp rename to modules/lithium_image/cvimage.cpp diff --git a/modules/liimage/cvimage.hpp b/modules/lithium_image/cvimage.hpp similarity index 100% rename from modules/liimage/cvimage.hpp rename to modules/lithium_image/cvimage.hpp diff --git a/modules/liimage/draw.cpp b/modules/lithium_image/draw.cpp similarity index 100% rename from modules/liimage/draw.cpp rename to modules/lithium_image/draw.cpp diff --git a/modules/liimage/image.cpp b/modules/lithium_image/image.cpp similarity index 100% rename from modules/liimage/image.cpp rename to modules/lithium_image/image.cpp diff --git a/modules/liimage/image.hpp b/modules/lithium_image/image.hpp similarity index 100% rename from modules/liimage/image.hpp rename to modules/lithium_image/image.hpp diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index e718e5c0..6a41ac07 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -4,7 +4,6 @@ project(hydrogendriver C CXX) find_package(USB1 REQUIRED) find_package(JPEG REQUIRED) find_package(Threads REQUIRED) -find_package(NOVA REQUIRED) include_directories(${USB1_INCLUDE_DIRS}) add_subdirectory(io) diff --git a/src/modules/device/device_manager.cpp b/src/modules/device/device_manager.cpp index d1480701..75df4d4d 100644 --- a/src/modules/device/device_manager.cpp +++ b/src/modules/device/device_manager.cpp @@ -44,7 +44,6 @@ Description: Device Manager #include "core/camera_utils.hpp" -#define LOGURU_USE_FMTLIB #include "loguru/loguru.hpp" #ifdef __cpp_lib_format diff --git a/src/modules/device/device_utils.cpp b/src/modules/device/device_utils.cpp index 36934062..0521c80f 100644 --- a/src/modules/device/device_utils.cpp +++ b/src/modules/device/device_utils.cpp @@ -31,18 +31,21 @@ Description: Device Utilities #include "device_utils.hpp" -#ifdef _WIN32 -#include -#else - -#endif -#include #include #include +#include #include - #include +#ifdef _WIN32 +#include +#else +#include +#include +#include +#endif + +#ifdef _WIN32 std::string execute_command(const std::string &cmd) { std::array buffer; @@ -86,4 +89,64 @@ std::string execute_command(const std::string &cmd) CloseHandle(pi.hProcess); CloseHandle(pi.hThread); return result; -} \ No newline at end of file +} +#else +std::string execute_command(const std::string &cmd) +{ + std::array buffer; + std::string result = ""; + + int pipeOut[2]; + if (pipe(pipeOut) == -1) + { + throw std::runtime_error("Failed to create pipe!"); + } + + pid_t childPid = fork(); + if (childPid == -1) + { + throw std::runtime_error("Failed to fork process!"); + } + else if (childPid == 0) + { + // Child process + close(pipeOut[0]); // Close unused read end of the pipe + + // Redirect stdout and stderr to the write end of the pipe + if (dup2(pipeOut[1], STDOUT_FILENO) == -1 || dup2(pipeOut[1], STDERR_FILENO) == -1) + { + throw std::runtime_error("Failed to redirect output!"); + } + close(pipeOut[1]); // Close the write end of the pipe + + // Execute the command + execl("/bin/sh", "sh", "-c", cmd.c_str(), NULL); + + // This point is reached only if execl fails + throw std::runtime_error("Failed to execute command!"); + } + else + { + // Parent process + close(pipeOut[1]); // Close unused write end of the pipe + + ssize_t bytesRead; + while ((bytesRead = read(pipeOut[0], buffer.data(), buffer.size())) > 0) + { + result.append(buffer.data(), bytesRead); + } + close(pipeOut[0]); // Close the read end of the pipe + + int status; + waitpid(childPid, &status, 0); // Wait for the child process to exit + + // Handle any error status + if (WIFEXITED(status) && WEXITSTATUS(status) != 0) + { + throw std::runtime_error("Command execution failed with non-zero exit status!"); + } + } + + return result; +} +#endif \ No newline at end of file diff --git a/modules/HydrogenServer/CMakeLists.txt b/src/modules/deviceloader/CMakeLists.txt similarity index 75% rename from modules/HydrogenServer/CMakeLists.txt rename to src/modules/deviceloader/CMakeLists.txt index 2e708b95..a3b62fdc 100644 --- a/modules/HydrogenServer/CMakeLists.txt +++ b/src/modules/deviceloader/CMakeLists.txt @@ -14,6 +14,9 @@ set(server_SRC tcp_server.cpp time.cpp xml_util.cpp + signal.cpp + remote_driver.cpp + local_driver.cpp ) include_directories(${CMAKE_SOURCE_DIR}/src/core) @@ -28,13 +31,23 @@ if(ANDROID) else() find_package(Threads REQUIRED) find_package(Libev REQUIRED) + find_package(UV REQUIRED) + find_package(fmt REQUIRED) add_executable(${PROJECT_NAME} ${server_SRC} hydrogen_server.cpp) + target_compile_definitions(${PROJECT_NAME} PRIVATE USE_LIBUV=1) + target_link_libraries(hydrogenserver hydrogendriverstatic ${CMAKE_THREAD_LIBS_INIT} ${LIBEV_LIBRARIES}) target_include_directories(hydrogenserver SYSTEM PRIVATE ${LIBEV_INCLUDE_DIRS}) - target_link_libraries(hydrogenserver loguru) + target_link_libraries(hydrogenserver loguru fmt ${UV_LIBRARY} backward) + + set_target_properties( + hydrogenserver + PROPERTIES + OUTPUT_NAME ../../../hydrogenserver + ) install(TARGETS hydrogenserver RUNTIME DESTINATION bin) endif(ANDROID) diff --git a/src/modules/deviceloader/README.md b/src/modules/deviceloader/README.md new file mode 100644 index 00000000..a64c10a6 --- /dev/null +++ b/src/modules/deviceloader/README.md @@ -0,0 +1,52 @@ +Hydrogen Server - Next Generation Device Server +=============================================== + +本项目为INDI Server扩展项目,完全支持原版INDI设备,并增加了对Hydrogen设备的支持。以下是对该项目进行的主要修改和扩展: + ++ 拆分单文件:将原先的单一文件拆分成多个文件,这有助于后续的代码维护和管理。通过拆分文件,可以更好地组织代码,使得逻辑结构更清晰。 + ++ 使用最新的C++特性:在重写老旧代码的过程中,使用了部分最新的C++特性。这可能包括使用更现代化的语法和标准库特性,使代码更加简洁、高效和易读。 + ++ 增加对Windows的支持:除了原版INDI设备的支持外,Hydrogen Server还增加了对Windows操作系统的支持。这意味着可以在Windows平台上运行Hydrogen Server,并与Hydrogen设备进行交互。 + ++ 逻辑优化:对部分逻辑进行了优化,以提升程序的性能和效率。通过对代码进行改进和精简,可以改善系统的响应速度,并提供更好的用户体验。 + +## 文件目录 + +``` +. +├── CMakeLists.txt +├── README.md +├── client_info.cpp +├── client_info.hpp +├── concurrent.cpp +├── concurrent.hpp +├── driver_info.cpp +├── driver_info.hpp +├── fifo_server.cpp +├── fifo_server.hpp +├── hydrogen_server.cpp +├── hydrogen_server.hpp +├── io.cpp +├── io.hpp +├── local_driver.cpp +├── local_driver.hpp +├── message.cpp +├── message.hpp +├── message_queue.cpp +├── message_queue.hpp +├── property.cpp +├── property.hpp +├── remote_driver.cpp +├── remote_driver.hpp +├── serialize.cpp +├── serialize.hpp +├── signal.cpp +├── signal.hpp +├── tcp_server.cpp +├── tcp_server.hpp +├── time.cpp +├── time.hpp +├── xml_util.cpp +└── xml_util.hpp +``` diff --git a/modules/HydrogenServer/client_info.cpp b/src/modules/deviceloader/client_info.cpp similarity index 100% rename from modules/HydrogenServer/client_info.cpp rename to src/modules/deviceloader/client_info.cpp diff --git a/modules/HydrogenServer/client_info.hpp b/src/modules/deviceloader/client_info.hpp similarity index 100% rename from modules/HydrogenServer/client_info.hpp rename to src/modules/deviceloader/client_info.hpp diff --git a/modules/HydrogenServer/concurrent.cpp b/src/modules/deviceloader/concurrent.cpp similarity index 100% rename from modules/HydrogenServer/concurrent.cpp rename to src/modules/deviceloader/concurrent.cpp diff --git a/modules/HydrogenServer/concurrent.hpp b/src/modules/deviceloader/concurrent.hpp similarity index 100% rename from modules/HydrogenServer/concurrent.hpp rename to src/modules/deviceloader/concurrent.hpp diff --git a/src/modules/deviceloader/driver_info.cpp b/src/modules/deviceloader/driver_info.cpp new file mode 100644 index 00000000..89b3df47 --- /dev/null +++ b/src/modules/deviceloader/driver_info.cpp @@ -0,0 +1,305 @@ +#include "driver_info.hpp" + +#include +#include + +#ifdef _WIN32 +#include +#include +#else +#include +#include +#include +#include +#endif + +#include "io.hpp" +#include "lilxml.hpp" +#include "client_info.hpp" +#include "xml_util.hpp" +#include "hydrogen_server.hpp" + +#include "loguru/loguru.hpp" + +void DvrInfo::onMessage(XMLEle *root, std::list &sharedBuffers) +{ + char *roottag = tagXMLEle(root); + const char *dev = findXMLAttValu(root, "device"); + const char *name = findXMLAttValu(root, "name"); + int isblob = !strcmp(tagXMLEle(root), "setBLOBVector"); + + if (verbose > 2) + traceMsg("read ", root); + else if (verbose > 1) + { + LOG_F(ERROR, "read <{} device='{}' name='{}'>\n", tagXMLEle(root), findXMLAttValu(root, "device"), findXMLAttValu(root, "name")); + } + + /* that's all if driver is just registering a snoop */ + /* JM 2016-05-18: Send getProperties to upstream chained servers as well.*/ + if (!strcmp(roottag, "getProperties")) + { + this->addSDevice(dev, name); + Msg *mp = new Msg(this, root); + /* send to interested chained servers upstream */ + // FIXME: no use of root here + ClInfo::q2Servers(this, mp, root); + /* Send to snooped drivers if they exist so that they can echo back the snooped propertly immediately */ + // FIXME: no use of root here + q2RDrivers(dev, mp, root); + + mp->queuingDone(); + + return; + } + + /* that's all if driver desires to snoop BLOBs from other drivers */ + if (!strcmp(roottag, "enableBLOB")) + { + Property *sp = findSDevice(dev, name); + if (sp) + crackBLOB(pcdataXMLEle(root), &sp->blob); + delXMLEle(root); + return; + } + + /* Found a new device? Let's add it to driver info */ + if (dev[0] && !this->isHandlingDevice(dev)) + { +#ifdef OSX_EMBEDED_MODE + if (this->dev.empty()) + fprintf(stderr, "STARTED \"%s\"\n", dp->name.c_str()); + fflush(stderr); +#endif + this->dev.insert(dev); + } + + /* log messages if any and wanted */ + if (ldir) + logDMsg(root, dev); + + if (!strcmp(roottag, "pingRequest")) + { + setXMLEleTag(root, "pingReply"); + + Msg *mp = new Msg(this, root); + pushMsg(mp); + mp->queuingDone(); + return; + } + + /* build a new message -- set content iff anyone cares */ + Msg *mp = Msg::fromXml(this, root, sharedBuffers); + if (!mp) + { + close(); + return; + } + + /* send to interested clients */ + ClInfo::q2Clients(NULL, isblob, dev, name, mp, root); + + /* send to snooping drivers */ + DvrInfo::q2SDrivers(this, isblob, dev, name, mp, root); + + /* set message content if anyone cares else forget it */ + mp->queuingDone(); +} + +void DvrInfo::closeWritePart() +{ + // Don't want any half-dead drivers + close(); +} + +void DvrInfo::close() +{ + // Tell client driver is dead. + for (auto dev : dev) + { + /* Inform clients that this driver is dead */ + XMLEle *root = addXMLEle(NULL, "delProperty"); + addXMLAtt(root, "device", dev.c_str()); + + prXMLEle(stderr, root, 0); + Msg *mp = new Msg(this, root); + + ClInfo::q2Clients(NULL, 0, dev.c_str(), "", mp, root); + mp->queuingDone(); + } + + bool terminate; + if (!restart) + { + terminate = true; + } + else + { + if (restarts >= maxrestarts) + { + log(fmt::format("Terminated after #{} restarts.", restarts)); + terminate = true; + } + else + { + log(fmt::format("restart #{}", restarts)); + ++restarts; + terminate = false; + } + } + +#ifdef OSX_EMBEDED_MODE + fprintf(stderr, "STOPPED \"%s\"\n", name.c_str()); + fflush(stderr); +#endif + + // FIXME: we loose stderr from dying driver + if (terminate) + { + delete (this); + if ((!fifo) && (drivers.ids().empty())) + // Bye(); + return; + } + else + { + DvrInfo *restarted = this->clone(); + delete (this); + restarted->start(); + } +} + +void DvrInfo::q2RDrivers(const std::string &dev, Msg *mp, XMLEle *root) +{ + char *roottag = tagXMLEle(root); + + /* queue message to each interested driver. + * N.B. don't send generic getProps to more than one remote driver, + * otherwise they all fan out and we get multiple responses back. + */ + std::set remoteAdvertised; + for (auto dpId : drivers.ids()) + { + auto dp = drivers[dpId]; + if (dp == nullptr) + continue; + + std::string remoteUid = dp->remoteServerUid(); + bool isRemote = !remoteUid.empty(); + + /* driver known to not support this dev */ + if ((!dev.empty()) && dev[0] != '*' && !dp->isHandlingDevice(dev)) + continue; + + /* Only send message to each *unique* remote driver at a particular host:port + * Since it will be propogated to all other devices there */ + if (dev.empty() && isRemote) + { + if (remoteAdvertised.find(remoteUid) != remoteAdvertised.end()) + continue; + + /* Retain last remote driver data so that we do not send the same info again to a driver + * residing on the same host:port */ + remoteAdvertised.insert(remoteUid); + } + + /* JM 2016-10-30: Only send enableBLOB to remote drivers */ + if (isRemote == 0 && !strcmp(roottag, "enableBLOB")) + continue; + + // pushmsg can kill dp. do at end + dp->pushMsg(mp); + } +} + +void DvrInfo::q2SDrivers(DvrInfo *me, int isblob, const std::string &dev, const std::string &name, Msg *mp, XMLEle *root) +{ + std::string meRemoteServerUid = me ? me->remoteServerUid() : ""; + for (auto dpId : drivers.ids()) + { + auto dp = drivers[dpId]; + if (dp == nullptr) + continue; + + Property *sp = dp->findSDevice(dev, name); + + /* nothing for dp if not snooping for dev/name or wrong BLOB mode */ + if (!sp) + continue; + if ((isblob && sp->blob == B_NEVER) || (!isblob && sp->blob == B_ONLY)) + continue; + + // Do not send snoop data to remote drivers at the same host + // since they will manage their own snoops remotely + if ((!meRemoteServerUid.empty()) && dp->remoteServerUid() == meRemoteServerUid) + continue; + + // pushmsg can kill dp. do at end + dp->pushMsg(mp); + } +} + +void DvrInfo::addSDevice(const std::string &dev, const std::string &name) +{ + Property *sp; + + /* no dups */ + sp = findSDevice(dev, name); + if (sp) + return; + + /* add dev to sdevs list */ + sp = new Property(dev, name); + sp->blob = B_NEVER; + sprops.push_back(sp); + + if (verbose) + LOG_F(INFO, "snooping on %s.%s\n", dev.c_str(), name.c_str()); +} + +Property *DvrInfo::findSDevice(const std::string &dev, const std::string &name) const +{ + for (auto sp : sprops) + { + if ((sp->dev == dev) && (sp->name.empty() || sp->name == name)) + return (sp); + } + + return nullptr; +} + +DvrInfo::DvrInfo(bool useSharedBuffer) : MsgQueue(useSharedBuffer), + restarts(0) +{ + drivers.insert(this); +} + +DvrInfo::DvrInfo(const DvrInfo &model) : MsgQueue(model.useSharedBuffer), + name(model.name), + restarts(model.restarts) +{ + drivers.insert(this); +} + +DvrInfo::~DvrInfo() +{ + drivers.erase(this); + for (auto prop : sprops) + { + delete prop; + } +} + +bool DvrInfo::isHandlingDevice(const std::string &dev) const +{ + return this->dev.find(dev) != this->dev.end(); +} + +void DvrInfo::log(const std::string &str) const +{ + std::string logLine = "Driver "; + logLine += name; + logLine += ": "; + logLine += str; + LOG_F(INFO, "{}", logLine.c_str()); +} diff --git a/modules/HydrogenServer/driver_info.hpp b/src/modules/deviceloader/driver_info.hpp similarity index 58% rename from modules/HydrogenServer/driver_info.hpp rename to src/modules/deviceloader/driver_info.hpp index f1a1ee54..4cc3ebc3 100644 --- a/modules/HydrogenServer/driver_info.hpp +++ b/src/modules/deviceloader/driver_info.hpp @@ -79,77 +79,3 @@ class DvrInfo : public MsgQueue return false; } }; - -class LocalDvrInfo : public DvrInfo -{ - char errbuff[1024]; /* buffer for stderr pipe. line too long will be clipped */ - int errbuffpos = 0; /* first free pos in buffer */ - ev::io eio; /* Event loop io events */ -#ifdef _WIN32 - ev_child pidwatcher; -#else - ev::child pidwatcher; -#endif - void onEfdEvent(ev::io &watcher, int revents); /* callback for data on efd */ -#ifdef _WIN32 - void onPidEvent(ev_child &watcher, int revents); -#else - void onPidEvent(ev::child &watcher, int revents); -#endif - - int pid = 0; /* process id or 0 for N/A (not started/terminated) */ - int efd = -1; /* stderr from driver, or -1 when N/A */ - - void closeEfd(); - void closePid(); - -protected: - LocalDvrInfo(const LocalDvrInfo &model); - -public: - std::string envDev; - std::string envConfig; - std::string envSkel; - std::string envPrefix; - - LocalDvrInfo(); - virtual ~LocalDvrInfo(); - - virtual void start(); - - virtual LocalDvrInfo *clone() const; - - virtual const std::string remoteServerUid() const - { - return ""; - } -}; - -class RemoteDvrInfo : public DvrInfo -{ - /* open a connection to the given host and port or die. - * return socket fd. - */ - int openHYDROGENServer(); - - void extractRemoteId(const std::string &name, std::string &o_host, int &o_port, std::string &o_dev) const; - -protected: - RemoteDvrInfo(const RemoteDvrInfo &model); - -public: - std::string host; - int port; - - RemoteDvrInfo(); - virtual ~RemoteDvrInfo(); - - virtual void start(); - - virtual RemoteDvrInfo *clone() const; - - virtual const std::string remoteServerUid() const - { - return std::string(host) + ":" + std::to_string(port); - } -}; \ No newline at end of file diff --git a/src/modules/deviceloader/fifo_server.cpp b/src/modules/deviceloader/fifo_server.cpp new file mode 100644 index 00000000..834853e2 --- /dev/null +++ b/src/modules/deviceloader/fifo_server.cpp @@ -0,0 +1,457 @@ +#include "fifo_server.hpp" + +#include +#include +#include + +#include "io.hpp" +#include "driver_info.hpp" +#include "local_driver.hpp" +#include "remote_driver.hpp" +#include "hydrogen_server.hpp" + +#ifdef USE_LIBUV + +Fifo::Fifo(const std::string &name) : name(name) +{ + pollHandle.data = this; +} + +/* Attempt to open up FIFO */ +void Fifo::close() +{ + if (fd != -1) + { + ::close(fd); + fd = -1; + uv_poll_stop(&pollHandle); + } + bufferPos = 0; +} + +void Fifo::open() +{ + /* Open up FIFO, if available */ +#ifdef _WIN32 + fd = ::open(name.c_str(), O_RDONLY); +#else + fd = ::open(name.c_str(), O_RDONLY | O_NONBLOCK | O_CLOEXEC); +#endif + + if (fd < 0) + { + // log(fmt("open(%s): %s.\n", name.c_str(), strerror(errno))); + // Bye(); + } + + uv_poll_init(uv_default_loop(), &pollHandle, fd); + uv_poll_start(&pollHandle, UV_READABLE, ioCb); +} + +/* Handle one fifo command. Start/stop drivers accordingly */ +void Fifo::processLine(const char *line) +{ + + // log(fmt("FIFO: %s\n", line)); + + char cmd[MAXSBUF], arg[4][1], var[4][MAXSBUF], tDriver[MAXSBUF], tName[MAXSBUF], envConfig[MAXSBUF], + envSkel[MAXSBUF], envPrefix[MAXSBUF]; + + memset(&tDriver[0], 0, sizeof(char) * MAXSBUF); + memset(&tName[0], 0, sizeof(char) * MAXSBUF); + memset(&envConfig[0], 0, sizeof(char) * MAXSBUF); + memset(&envSkel[0], 0, sizeof(char) * MAXSBUF); + memset(&envPrefix[0], 0, sizeof(char) * MAXSBUF); + + int n = 0; + + bool remoteDriver = !!strstr(line, "@"); + + // If remote driver + if (remoteDriver) + { + n = sscanf(line, "%s %511[^\n]", cmd, tDriver); + + // Remove quotes if any + char *ptr = tDriver; + int len = strlen(tDriver); + while ((ptr = strstr(tDriver, "\""))) + { + memmove(ptr, ptr + 1, --len); + ptr[len] = '\0'; + } + } + // If local driver + else + { + n = sscanf(line, "%s %s -%1c \"%511[^\"]\" -%1c \"%511[^\"]\" -%1c \"%511[^\"]\" -%1c \"%511[^\"]\"", cmd, + tDriver, arg[0], var[0], arg[1], var[1], arg[2], var[2], arg[3], var[3]); + } + + int n_args = (n - 2) / 2; + + int j = 0; + for (j = 0; j < n_args; j++) + { + if (arg[j][0] == 'n') + { + strncpy(tName, var[j], MAXSBUF - 1); + tName[MAXSBUF - 1] = '\0'; + } + else if (arg[j][0] == 'c') + { + strncpy(envConfig, var[j], MAXSBUF - 1); + envConfig[MAXSBUF - 1] = '\0'; + } + else if (arg[j][0] == 's') + { + strncpy(envSkel, var[j], MAXSBUF - 1); + envSkel[MAXSBUF - 1] = '\0'; + } + else if (arg[j][0] == 'p') + { + strncpy(envPrefix, var[j], MAXSBUF - 1); + envPrefix[MAXSBUF - 1] = '\0'; + } + } + + bool startCmd; + if (!strcmp(cmd, "start")) + startCmd = 1; + else + startCmd = 0; + + if (startCmd) + { + + DvrInfo *dp; + if (remoteDriver == 0) + { + auto *localDp = new LocalDvrInfo(); + dp = localDp; + // strncpy(dp->dev, tName, MAXHYDROGENDEVICE); + localDp->envDev = tName; + localDp->envConfig = envConfig; + localDp->envSkel = envSkel; + localDp->envPrefix = envPrefix; + } + else + { + dp = new RemoteDvrInfo(); + } + dp->name = tDriver; + dp->start(); + } + else + { + for (auto dp : DvrInfo::drivers) + { + if (dp == nullptr) + continue; + + if (dp->name == tDriver) + { + /* If device name is given, check against it before shutting down */ + if (tName[0] && !dp->isHandlingDevice(tName)) + continue; + if (verbose) + // log(fmt("FIFO: Shutting down driver: %s\n", tDriver)); + + dp->restart = false; + dp->close(); + break; + } + } + } +} + +void Fifo::read() +{ + int rd = ::read(fd, buffer + bufferPos, sizeof(buffer) - 1 - bufferPos); + if (rd == 0) + { + if (bufferPos > 0) + { + buffer[bufferPos] = '\0'; + processLine(buffer); + } + close(); + open(); + return; + } + if (rd == -1) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + return; + + // log(fmt("Fifo error: %s\n", strerror(errno))); + close(); + open(); + return; + } + + bufferPos += rd; + + for (int i = 0; i < bufferPos; ++i) + { + if (buffer[i] == '\n') + { + buffer[i] = 0; + processLine(buffer); + // shift the buffer + i++; /* count including nl */ + bufferPos -= i; /* remove from nexbuf */ + memmove(buffer, buffer + i, bufferPos); /* slide remaining to front */ + i = -1; /* restart for loop scan */ + } + } + + if ((unsigned)bufferPos >= sizeof(buffer) - 1) + { + // log(fmt("Fifo overflow")); + close(); + open(); + } +} + +void Fifo::ioCb(uv_poll_t *handle, int status, int revents) +{ + Fifo *fifo = static_cast(handle->data); + if (status < 0) + { + int sockErrno = readFdError(fifo->fd); + if (sockErrno) + { + // log(fmt("Error on fifo: %s\n", strerror(sockErrno))); + fifo->close(); + fifo->open(); + } + } + else if (revents & UV_READABLE) + { + fifo->read(); + } +} + +#else + +Fifo::Fifo(const std::string &name) : name(name) +{ + fdev.set(this); +} + +/* Attempt to open up FIFO */ +void Fifo::close(void) +{ + if (fd != -1) + { + ::close(fd); + fd = -1; + fdev.stop(); + } + bufferPos = 0; +} + +void Fifo::open() +{ + /* Open up FIFO, if available */ +#ifdef _WIN32 + fd = ::open(name.c_str(), O_RDONLY); +#else + fd = ::open(name.c_str(), O_RDONLY | O_NONBLOCK | O_CLOEXEC); +#endif + + if (fd < 0) + { + // log(fmt("open(%s): %s.\n", name.c_str(), strerror(errno))); + // Bye(); + } + + fdev.start(fd, EV_READ); +} + +/* Handle one fifo command. Start/stop drivers accordingly */ +void Fifo::processLine(const char *line) +{ + + // log(fmt("FIFO: %s\n", line)); + + char cmd[MAXSBUF], arg[4][1], var[4][MAXSBUF], tDriver[MAXSBUF], tName[MAXSBUF], envConfig[MAXSBUF], + envSkel[MAXSBUF], envPrefix[MAXSBUF]; + + memset(&tDriver[0], 0, sizeof(char) * MAXSBUF); + memset(&tName[0], 0, sizeof(char) * MAXSBUF); + memset(&envConfig[0], 0, sizeof(char) * MAXSBUF); + memset(&envSkel[0], 0, sizeof(char) * MAXSBUF); + memset(&envPrefix[0], 0, sizeof(char) * MAXSBUF); + + int n = 0; + + bool remoteDriver = !!strstr(line, "@"); + + // If remote driver + if (remoteDriver) + { + n = sscanf(line, "%s %511[^\n]", cmd, tDriver); + + // Remove quotes if any + char *ptr = tDriver; + int len = strlen(tDriver); + while ((ptr = strstr(tDriver, "\""))) + { + memmove(ptr, ptr + 1, --len); + ptr[len] = '\0'; + } + } + // If local driver + else + { + n = sscanf(line, "%s %s -%1c \"%511[^\"]\" -%1c \"%511[^\"]\" -%1c \"%511[^\"]\" -%1c \"%511[^\"]\"", cmd, + tDriver, arg[0], var[0], arg[1], var[1], arg[2], var[2], arg[3], var[3]); + } + + int n_args = (n - 2) / 2; + + int j = 0; + for (j = 0; j < n_args; j++) + { + if (arg[j][0] == 'n') + { + strncpy(tName, var[j], MAXSBUF - 1); + tName[MAXSBUF - 1] = '\0'; + } + else if (arg[j][0] == 'c') + { + strncpy(envConfig, var[j], MAXSBUF - 1); + envConfig[MAXSBUF - 1] = '\0'; + } + else if (arg[j][0] == 's') + { + strncpy(envSkel, var[j], MAXSBUF - 1); + envSkel[MAXSBUF - 1] = '\0'; + } + else if (arg[j][0] == 'p') + { + strncpy(envPrefix, var[j], MAXSBUF - 1); + envPrefix[MAXSBUF - 1] = '\0'; + } + } + + bool startCmd; + if (!strcmp(cmd, "start")) + startCmd = 1; + else + startCmd = 0; + + if (startCmd) + { + + DvrInfo *dp; + if (remoteDriver == 0) + { + auto *localDp = new LocalDvrInfo(); + dp = localDp; + // strncpy(dp->dev, tName, MAXHYDROGENDEVICE); + localDp->envDev = tName; + localDp->envConfig = envConfig; + localDp->envSkel = envSkel; + localDp->envPrefix = envPrefix; + } + else + { + dp = new RemoteDvrInfo(); + } + dp->name = tDriver; + dp->start(); + } + else + { + for (auto dp : DvrInfo::drivers) + { + if (dp == nullptr) + continue; + + if (dp->name == tDriver) + { + /* If device name is given, check against it before shutting down */ + if (tName[0] && !dp->isHandlingDevice(tName)) + continue; + if (verbose) + // log(fmt("FIFO: Shutting down driver: %s\n", tDriver)); + + dp->restart = false; + dp->close(); + break; + } + } + } +} + +void Fifo::read(void) +{ + int rd = ::read(fd, buffer + bufferPos, sizeof(buffer) - 1 - bufferPos); + if (rd == 0) + { + if (bufferPos > 0) + { + buffer[bufferPos] = '\0'; + processLine(buffer); + } + close(); + open(); + return; + } + if (rd == -1) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + return; + + // log(fmt("Fifo error: %s\n", strerror(errno))); + close(); + open(); + return; + } + + bufferPos += rd; + + for (int i = 0; i < bufferPos; ++i) + { + if (buffer[i] == '\n') + { + buffer[i] = 0; + processLine(buffer); + // shift the buffer + i++; /* count including nl */ + bufferPos -= i; /* remove from nexbuf */ + memmove(buffer, buffer + i, bufferPos); /* slide remaining to front */ + i = -1; /* restart for loop scan */ + } + } + + if ((unsigned)bufferPos >= sizeof(buffer) - 1) + { + // log(fmt("Fifo overflow")); + close(); + open(); + } +} + +void Fifo::ioCb(ev::io &, int revents) +{ + if (EV_ERROR & revents) + { + int sockErrno = readFdError(this->fd); + if (sockErrno) + { + // log(fmt("Error on fifo: %s\n", strerror(sockErrno))); + close(); + open(); + } + } + else if (revents & EV_READ) + { + read(); + } +} + +#endif diff --git a/src/modules/deviceloader/fifo_server.hpp b/src/modules/deviceloader/fifo_server.hpp new file mode 100644 index 00000000..c1f65563 --- /dev/null +++ b/src/modules/deviceloader/fifo_server.hpp @@ -0,0 +1,63 @@ +#pragma once + +#include + +#ifdef USE_LIBUV + +#include +#include + +class Fifo +{ + std::string name; /* Path to FIFO for dynamic startups & shutdowns of drivers */ + + char buffer[1024]; + int bufferPos = 0; + int fd = -1; + uv_poll_t pollHandle; + + void close(); + void open(); + void processLine(const char *line); + + /* Read commands from FIFO and process them. Start/stop drivers accordingly */ + void read(); + static void ioCb(uv_poll_t* handle, int status, int revents); + +public: + Fifo(const std::string& name); + void listen() + { + open(); + } +}; + +#else + +#include + +class Fifo +{ + std::string name; /* Path to FIFO for dynamic startups & shutdowns of drivers */ + + char buffer[1024]; + int bufferPos = 0; + int fd = -1; + ev::io fdev; + + void close(); + void open(); + void processLine(const char *line); + + /* Read commands from FIFO and process them. Start/stop drivers accordingly */ + void read(); + void ioCb(ev::io &watcher, int revents); + +public: + Fifo(const std::string &name); + void listen() + { + open(); + } +}; +#endif \ No newline at end of file diff --git a/modules/HydrogenServer/hydrogen_server.cpp b/src/modules/deviceloader/hydrogen_server.cpp similarity index 72% rename from modules/HydrogenServer/hydrogen_server.cpp rename to src/modules/deviceloader/hydrogen_server.cpp index 04bbc196..0023beca 100644 --- a/modules/HydrogenServer/hydrogen_server.cpp +++ b/src/modules/deviceloader/hydrogen_server.cpp @@ -19,6 +19,8 @@ #include "client_info.hpp" #include "concurrent.hpp" #include "driver_info.hpp" +#include "local_driver.hpp" +#include "remote_driver.hpp" #include "io.hpp" #include "message_queue.hpp" @@ -28,6 +30,7 @@ #include "tcp_server.hpp" #include "time.hpp" #include "xml_util.hpp" +#include "signal.hpp" #include "hydrogen_server.hpp" @@ -64,29 +67,18 @@ #endif #endif +#ifdef USE_LIBUV +#include +#else #include +#endif #include "loguru/loguru.hpp" -#include "argparse/argparse.hpp" +#include "backward/backward.hpp" extern ConcurrentSet ClInfo::clients; extern ConcurrentSet DvrInfo::drivers; -/* record we have started and our args */ -static void logStartup(int ac, char *av[]) -{ - int i; - - std::string startupMsg = "startup:"; - for (i = 0; i < ac; i++) - { - startupMsg += " "; - startupMsg += av[i]; - } - startupMsg += '\n'; - LOG_F(INFO, "%s", startupMsg.c_str()); -} - /* print usage message and exit (2) */ static void usage(void) { @@ -113,6 +105,7 @@ static void usage(void) exit(2); } +/* #ifdef _WIN32 static void noSIGPIPE() { @@ -128,12 +121,19 @@ static void noSIGPIPE() (void)sigaction(SIGPIPE, &sa, NULL); } #endif +*/ +void noSIGPIPE() +{ + SignalHandler::registerHandler(SIGPIPE, []() {}); +} -int main(int ac, char *av[]) +void cleanup() { - /* log startup */ - logStartup(ac, av); + SignalHandler::unregisterHandler(SIGPIPE); +} +int main(int ac, char *av[]) +{ /* save our name */ me = av[0]; @@ -152,7 +152,9 @@ int main(int ac, char *av[]) #else /* crack args */ - while ((--ac > 0) && ((*++av)[0] == '-')) + /* + Old: + while ((--ac > 0) && ((*++av)[0] == '-')) { char *s; for (s = av[0] + 1; *s != '\0'; s++) @@ -232,6 +234,45 @@ int main(int ac, char *av[]) usage(); } } + */ + int opt; + while ((opt = getopt(ac, av, "l:m:p:d:u:f:r:v")) != -1) + { + switch (opt) + { + case 'l': + ldir = optarg; + break; + case 'm': + maxqsiz = std::stoi(optarg) * 1024 * 1024; + break; + case 'p': + port = std::stoi(optarg); + break; + case 'd': + maxstreamsiz = std::stoi(optarg) * 1024 * 1024; + break; +#ifdef ENABLE_HYDROGEN_SHARED_MEMORY + case 'u': + UnixServer::unixSocketPath = optarg; + break; +#endif // ENABLE_HYDROGEN_SHARED_MEMORY + case 'f': + fifo = new Fifo(optarg); + break; + case 'r': + maxrestarts = std::stoi(optarg); + if (maxrestarts < 0) + maxrestarts = 0; + break; + case 'v': + verbose++; + break; + default: // '?' + usage(); + } + } + #endif /* at this point there are ac args in av[] to name our drivers */ @@ -242,24 +283,49 @@ int main(int ac, char *av[]) noSIGPIPE(); /* start each driver */ - while (ac-- > 0) + /* Old: + while (ac-- > 0) + { + std::string dvrName = *av++; + DvrInfo *dr; + if (dvrName.find('@') != std::string::npos) + { + dr = new RemoteDvrInfo(); + } + else + { + dr = new LocalDvrInfo(); + } + dr->name = dvrName; + dr->start(); + } + */ + LOG_F(INFO, "Start loading driver..."); + int count = ac - 1; + std::vector> drivers; + for (int i = 0; i < count; i++) { - std::string dvrName = *av++; - DvrInfo *dr; + std::string dvrName = av[i + 1]; + std::shared_ptr driver; if (dvrName.find('@') != std::string::npos) { - dr = new RemoteDvrInfo(); + driver = std::make_unique(); } else { - dr = new LocalDvrInfo(); + driver = std::make_unique(); } - dr->name = dvrName; - dr->start(); + driver->name = dvrName; + drivers.push_back(driver); + drivers[i]->start(); + LOG_F(INFO, "Started {}", driver->name); } /* announce we are online */ - (new TcpServer(port))->listen(); + // Old: (new TcpServer(port))->listen(); + std::shared_ptr tcp_server; + tcp_server = std::make_shared(port); + tcp_server->listen(); #ifdef ENABLE_HYDROGEN_SHARED_MEMORY /* create a new unix server */ @@ -272,13 +338,19 @@ int main(int ac, char *av[]) // JM 2022.07.23: This causes an issue on MacOS. Disabled for now until investigated further. // unsetenv("HYDROGENPREFIX"); + LOG_F(INFO, "Starting FIFO server"); fifo->listen(); } /* handle new clients and all io */ + LOG_F(INFO, "Main loop started"); +#ifdef USE_LIBUV + uv_run(loop, UV_RUN_DEFAULT); +#else loop.loop(); +#endif /* will not happen unless no more listener left ! */ - LOG_F(ERROR, "unexpected return from event loop\n"); + LOG_F(ERROR, "unexpected return from event loop"); return (1); } diff --git a/modules/HydrogenServer/hydrogen_server.hpp b/src/modules/deviceloader/hydrogen_server.hpp similarity index 95% rename from modules/HydrogenServer/hydrogen_server.hpp rename to src/modules/deviceloader/hydrogen_server.hpp index 6e23c415..9b7ac15e 100644 --- a/modules/HydrogenServer/hydrogen_server.hpp +++ b/src/modules/deviceloader/hydrogen_server.hpp @@ -23,7 +23,11 @@ #define STRINGIFY_TOK(x) #x #define TO_STRING(x) STRINGIFY_TOK(x) +#ifdef USE_LIBUV +static uv_loop_t* loop = uv_default_loop(); +#else static ev::default_loop loop; +#endif static Fifo *fifo = nullptr; static const char *me; /* our name */ static int port = HYDROGENPORT; /* public HYDROGEN port */ diff --git a/modules/HydrogenServer/io.cpp b/src/modules/deviceloader/io.cpp similarity index 100% rename from modules/HydrogenServer/io.cpp rename to src/modules/deviceloader/io.cpp diff --git a/modules/HydrogenServer/io.hpp b/src/modules/deviceloader/io.hpp similarity index 100% rename from modules/HydrogenServer/io.hpp rename to src/modules/deviceloader/io.hpp diff --git a/src/modules/deviceloader/local_driver.cpp b/src/modules/deviceloader/local_driver.cpp new file mode 100644 index 00000000..e256d38e --- /dev/null +++ b/src/modules/deviceloader/local_driver.cpp @@ -0,0 +1,535 @@ +#include "local_driver.hpp" + +#include +#include + +#ifdef _WIN32 +#include +#include +#else +#include +#include +#include +#include +#endif + +#include "io.hpp" +#include "lilxml.hpp" +#include "client_info.hpp" +#include "xml_util.hpp" +#include "hydrogen_server.hpp" + +#include "loguru/loguru.hpp" + +LocalDvrInfo::LocalDvrInfo() : DvrInfo(true) +{ +#ifdef USE_LIBUV + eio.data = this; + pidwatcher.data = this; +#else + eio.set(this); + pidwatcher.set(this); +#endif +} + +LocalDvrInfo::LocalDvrInfo(const LocalDvrInfo &model) : DvrInfo(model), + envDev(model.envDev), + envConfig(model.envConfig), + envSkel(model.envSkel), + envPrefix(model.envPrefix) +{ +#ifdef USE_LIBUV + eio.data = this; + pidwatcher.data = this; +#else + eio.set(this); + pidwatcher.set(this); +#endif +} + +LocalDvrInfo::~LocalDvrInfo() +{ + closeEfd(); + if (pid != 0) + { + kill(pid, SIGKILL); /* libev insures there will be no zombies */ + pid = 0; + } + closePid(); +} + +/* start the given local HYDROGEN driver process. + * exit if trouble. + */ +#ifdef _WIN32 +void LocalDvrInfo::start() +{ + Msg *mp; + HANDLE hReadPipe, hWritePipe, hErrorPipe; + SECURITY_ATTRIBUTES sa; + PROCESS_INFORMATION pi; + STARTUPINFO si; + HANDLE hEfd; + DWORD pid; + +#ifdef OSX_EMBEDED_MODE + fprintf(stderr, "STARTING \"%s\"\n", name.c_str()); + fflush(stderr); +#endif + + HANDLE hRp, hWp, hEp; // 修改变量声明为HANDLE类型 + + /* build three pipes: r, w and error*/ + if (useSharedBuffer) + { + // FIXME: lots of FD are opened by hydrogenserver. FD_CLOEXEC is a must + check other fds + sa.nLength = sizeof(SECURITY_ATTRIBUTES); + sa.bInheritHandle = TRUE; + sa.lpSecurityDescriptor = NULL; + if (!CreatePipe(&hReadPipe, &hWritePipe, &sa, 0)) + { + log(fmt::format("CreatePipe: %d\n", GetLastError())); + // Bye(); + } + hErrorPipe = hWritePipe; + } + else + { + + if (!CreatePipe(&hRp, &hWp, NULL, 4096)) + { + log(fmt::format("CreatePipe read: %d\n", GetLastError())); + // Bye(); + } + if (!CreatePipe(&hRp, &hEp, NULL, 4096)) + { + log(fmt::format("CreatePipe error: %d\n", GetLastError())); + // Bye(); + } + hReadPipe = hRp; + hWritePipe = hWp; + hErrorPipe = hEp; + } + + /* fork&exec new process */ + ZeroMemory(&si, sizeof(STARTUPINFO)); + si.cb = sizeof(STARTUPINFO); + si.hStdInput = hReadPipe; + si.hStdOutput = hWritePipe; + si.hStdError = hErrorPipe; + si.dwFlags |= STARTF_USESTDHANDLES; + + if (!CreateProcess(NULL, (LPSTR)name.c_str(), NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi)) + { + log(fmt::format("CreateProcess: %d\n", GetLastError())); + // Bye(); + } + + if (useSharedBuffer) + { + /* don't need child's other socket end */ + CloseHandle(hReadPipe); + + /* record pid, io channels, init lp and snoop list */ + setFds(reinterpret_cast(hWritePipe), reinterpret_cast(hWritePipe)); + } + else + { + /* don't need child's side of pipes */ + CloseHandle(hWritePipe); + CloseHandle(hReadPipe); + + /* record pid, io channels, init lp and snoop list */ + setFds(reinterpret_cast(int) hRp, reinterpret_cast(int) hWp); // 修改为使用新的变量名 + } + + CloseHandle(hErrorPipe); + + // Watch pid + this->pid = pi.dwProcessId; + do + { + (this->pidwatcher).pid = (this->pid); + (this->pidwatcher).flags = !!(1); + } while (0); + ev_child_set() this->pidwatcher.set(this->pid); + // this->pidwatcher.start(); + + // Watch input on efd + hEfd = hEp; // 修改为使用新的变量名 + SetHandleInformation(hEfd, HANDLE_FLAG_INHERIT, 0); + this->efd = _open_osfhandle(reinterpret_cast(hEfd), _O_RDONLY | _O_BINARY); + unsigned long mode = 1; + ioctlsocket(this->efd, FIONBIO, &mode); + this->eio.start(this->efd, ev::READ); + + /* first message primes driver to report its properties -- dev known + * if restarting + */ + if (verbose > 0) + LOG_F(INFO, "pid=%d rfd=%d wfd=%d efd=%d\n", pid, (int)hRp, (int)hWp, (int)hEp); + + XMLEle *root = addXMLEle(NULL, "getProperties"); + addXMLAtt(root, "version", TO_STRING(HYDROGENV)); + mp = new Msg(nullptr, root); + + // pushmsg can kill mp. do at end + pushMsg(mp); +} + +#else +void LocalDvrInfo::start() +{ + Msg *mp; + int rp[2], wp[2], ep[2]; + int ux[2]; + int pid; + +#ifdef OSX_EMBEDED_MODE + fprintf(stderr, "STARTING \"%s\"\n", name.c_str()); + fflush(stderr); +#endif + + /* build three pipes: r, w and error*/ + if (useSharedBuffer) + { + // FIXME: lots of FD are opened by hydrogenserver. FD_CLOEXEC is a must + check other fds + if (socketpair(AF_UNIX, SOCK_STREAM, 0, ux) == -1) + { + log(fmt::format("socketpair: %s\n", strerror(errno))); + // Bye(); + } + } + else + { + if (pipe(rp) < 0) + { + log(fmt::format("read pipe: %s\n", strerror(errno))); + // Bye(); + } + if (pipe(wp) < 0) + { + log(fmt::format("write pipe: %s\n", strerror(errno))); + // Bye(); + } + } + if (pipe(ep) < 0) + { + log(fmt::format("stderr pipe: %s\n", strerror(errno))); + // Bye(); + } + + /* fork&exec new process */ + pid = fork(); + if (pid < 0) + { + log(fmt::format("fork: %s\n", strerror(errno))); + // Bye(); + } + if (pid == 0) + { + /* child: exec name */ + int fd; + + /* rig up pipes */ + if (useSharedBuffer) + { + // For unix socket, the same socket end can be used for both read & write + dup2(ux[0], 0); /* driver stdin reads from ux[0] */ + dup2(ux[0], 1); /* driver stdout writes to ux[0] */ + ::close(ux[0]); + ::close(ux[1]); + } + else + { + dup2(wp[0], 0); /* driver stdin reads from wp[0] */ + dup2(rp[1], 1); /* driver stdout writes to rp[1] */ + } + dup2(ep[1], 2); /* driver stderr writes to e[]1] */ + for (fd = 3; fd < 100; fd++) + (void)::close(fd); + + if (!envDev.empty()) + setenv("HYDROGENDEV", envDev.c_str(), 1); + /* Only reset environment variable in case of FIFO */ + else if (fifo) + unsetenv("HYDROGENDEV"); + if (!envConfig.empty()) + setenv("HYDROGENCONFIG", envConfig.c_str(), 1); + else if (fifo) + unsetenv("HYDROGENCONFIG"); + if (!envSkel.empty()) + setenv("HYDROGENSKEL", envSkel.c_str(), 1); + else if (fifo) + unsetenv("HYDROGENSKEL"); + std::string executable; + if (!envPrefix.empty()) + { + setenv("HYDROGENPREFIX", envPrefix.c_str(), 1); +#if defined(OSX_EMBEDED_MODE) + executable = envPrefix + "/Contents/MacOS/" + name; +#elif defined(__APPLE__) + executable = envPrefix + "/" + name; +#else + executable = envPrefix + "/bin/" + name; +#endif + + fprintf(stderr, "%s\n", executable.c_str()); + + execlp(executable.c_str(), name.c_str(), NULL); + } + else + { + if (name[0] == '.') + { + executable = std::string(dirname((char *)me)) + "/" + name; + execlp(executable.c_str(), name.c_str(), NULL); + } + else + { + execlp(name.c_str(), name.c_str(), NULL); + } + } + +#ifdef OSX_EMBEDED_MODE + fprintf(stderr, "FAILED \"%s\"\n", name.c_str()); + fflush(stderr); +#endif + log(fmt::format("execlp %s: %s\n", executable.c_str(), strerror(errno))); + _exit(1); /* parent will notice EOF shortly */ + } + + if (useSharedBuffer) + { + /* don't need child's other socket end */ + ::close(ux[0]); + + /* record pid, io channels, init lp and snoop list */ + setFds(ux[1], ux[1]); + rp[0] = ux[1]; + wp[1] = ux[1]; + } + else + { + /* don't need child's side of pipes */ + ::close(wp[0]); + ::close(rp[1]); + + /* record pid, io channels, init lp and snoop list */ + setFds(rp[0], wp[1]); + } + + ::close(ep[1]); + + // Watch pid + this->pid = pid; +#ifdef USE_LIBUV + if (this->pidwatcher.loop != nullptr && this->pidwatcher.signal_cb != nullptr) + { + uv_signal_start(&this->pidwatcher, this->pidwatcher.signal_cb, SIGCHLD); + } + +#else + this->pidwatcher.set(pid); + this->pidwatcher.start(); +#endif + + // Watch input on efd + this->efd = ep[0]; + fcntl(this->efd, F_SETFL, fcntl(this->efd, F_GETFL, 0) | O_NONBLOCK); +#ifdef USE_LIBUV + // Start eio file descriptor watcher + if (this->eio.loop != nullptr) + { + uv_poll_start(&this->eio, UV_READABLE, (uv_poll_cb)&LocalDvrInfo::onEfdEvent); + } + +#else + this->eio.start(this->efd, ev::READ); +#endif + + /* first message primes driver to report its properties -- dev known + * if restarting + */ + if (verbose > 0) + LOG_F(INFO, "pid=%d rfd=%d wfd=%d efd=%d\n", pid, rp[0], wp[1], ep[0]); + + XMLEle *root = addXMLEle(NULL, "getProperties"); + addXMLAtt(root, "version", TO_STRING(HYDROGENV)); + mp = new Msg(nullptr, root); + + // pushmsg can kill mp. do at end + pushMsg(mp); +} +#endif + +LocalDvrInfo *LocalDvrInfo::clone() const +{ + return new LocalDvrInfo(*this); +} + +void LocalDvrInfo::closeEfd() +{ + ::close(efd); + efd = -1; +#ifdef USE_LIBUV + uv_poll_stop(&eio); +#else + eio.stop(); +#endif +} + +void LocalDvrInfo::closePid() +{ + pid = 0; +#ifdef USE_LIBUV + uv_signal_stop(&pidwatcher); +#else + pidwatcher.stop(); +#endif +} + +#ifdef USE_LIBUV + +void LocalDvrInfo::onEfdEvent(uv_poll_t *handle, int status, int events) +{ + if (status < 0) + { + // 处理错误情况 + const char *error = uv_strerror(status); + LOG_F(ERROR, "Error on stderr: {}", error); + closeEfd(); + return; + } + + if (events & UV_READABLE) + { + ssize_t nr; + + /* read more */ + nr = read(efd, errbuff + errbuffpos, sizeof(errbuff) - errbuffpos); + if (nr <= 0) + { + if (nr < 0) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + return; + + LOG_F(ERROR, "stderr {}", strerror(errno)); + } + else + LOG_F(ERROR, "stderr EOF"); + closeEfd(); + return; + } + errbuffpos += nr; + + for (int i = 0; i < errbuffpos; ++i) + { + if (errbuff[i] == '\n') + { + LOG_F(ERROR, "{}{}", (int)i, errbuff); + i++; /* count including nl */ + errbuffpos -= i; /* remove from nexbuf */ + memmove(errbuff, errbuff + i, errbuffpos); /* slide remaining to front */ + i = -1; /* restart for loop scan */ + } + } + } +} + +#ifdef _WIN32 +void LocalDvrInfo::onPidEvent(uv_signal_t *handle, int exit_status) +#else +void LocalDvrInfo::onPidEvent(uv_process_t *handle, int exit_status, int term_signal) +#endif +{ + // 处理pid事件 + if (exit_status == UV_SIGNAL && handle->status) + { + if (WIFEXITED(pidwatcher.signum)) + { + LOG_F(ERROR, "process {} exited with status {}", pid, WEXITSTATUS(pidwatcher.signum)); + } + else if (WIFSIGNALED(pidwatcher.signum)) + { + int exit_status = WTERMSIG(pidwatcher.signum); + LOG_F(ERROR, "process {} killed with signal {} - {}", pid, exit_status, strsignal(exit_status)); + } + pid = 0; + uv_signal_stop(&pidwatcher); + } +} + +#else +void LocalDvrInfo::onEfdEvent(ev::io &, int revents) +{ + if (EV_ERROR & revents) + { + int sockErrno = readFdError(this->efd); + if (sockErrno) + { + LOG_F(ERROR, "Error on stderr: {}", strerror(sockErrno)); + closeEfd(); + } + return; + } + + if (revents & EV_READ) + { + ssize_t nr; + + /* read more */ + nr = read(efd, errbuff + errbuffpos, sizeof(errbuff) - errbuffpos); + if (nr <= 0) + { + if (nr < 0) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + return; + + LOG_F(ERROR, "stderr {}", strerror(errno)); + } + else + LOG_F(ERROR, "stderr EOF"); + closeEfd(); + return; + } + errbuffpos += nr; + + for (int i = 0; i < errbuffpos; ++i) + { + if (errbuff[i] == '\n') + { + LOG_F(ERROR, "{}{}", (int)i, errbuff); + i++; /* count including nl */ + errbuffpos -= i; /* remove from nexbuf */ + memmove(errbuff, errbuff + i, errbuffpos); /* slide remaining to front */ + i = -1; /* restart for loop scan */ + } + } + } +} + +#ifdef _WIN32 +void LocalDvrInfo::onPidEvent(ev_child &, int revents) +#else +void LocalDvrInfo::onPidEvent(ev::child &, int revents) +#endif +{ + if (revents & EV_CHILD) + { + if (WIFEXITED(pidwatcher.rstatus)) + { + LOG_F(ERROR, "process {} exited with status {}", pid, WEXITSTATUS(pidwatcher.rstatus)); + } + else if (WIFSIGNALED(pidwatcher.rstatus)) + { + int exit_status = WTERMSIG(pidwatcher.rstatus); + LOG_F(ERROR, "process {} killed with signal {} - {}", pid, exit_status, strsignal(exit_status)); + } + pid = 0; + this->pidwatcher.stop(); + } +} +#endif \ No newline at end of file diff --git a/src/modules/deviceloader/local_driver.hpp b/src/modules/deviceloader/local_driver.hpp new file mode 100644 index 00000000..41039ebf --- /dev/null +++ b/src/modules/deviceloader/local_driver.hpp @@ -0,0 +1,55 @@ +#include "driver_info.hpp" + +class LocalDvrInfo : public DvrInfo +{ + char errbuff[1024]; /* buffer for stderr pipe. line too long will be clipped */ + int errbuffpos = 0; /* first free pos in buffer */ +#ifdef USE_LIBUV + uv_poll_t eio; // 替换 ev::io eio; + void onEfdEvent(uv_poll_t *handle, int status, int events); // 替换 void onEfdEvent(ev::io &watcher, int revents); + uv_signal_t pidwatcher; // 替换 ev::child pidwatcher; + void onPidEvent(uv_process_t *handle, int exit_status, int term_signal); // 替换 void onPidEvent(ev_child &watcher, int revents); +#else + ev::io eio; /* Event loop io events */ + void onEfdEvent(ev::io &watcher, int revents); /* callback for data on efd */ + +#ifdef _WIN32 + ev_child pidwatcher; +#else + ev::child pidwatcher; +#endif + +#ifdef _WIN32 + void onPidEvent(ev_child &watcher, int revents); +#else + void onPidEvent(ev::child &watcher, int revents); +#endif +#endif + + int pid = 0; /* process id or 0 for N/A (not started/terminated) */ + int efd = -1; /* stderr from driver, or -1 when N/A */ + + void closeEfd(); + void closePid(); + +protected: + LocalDvrInfo(const LocalDvrInfo &model); + +public: + std::string envDev; + std::string envConfig; + std::string envSkel; + std::string envPrefix; + + LocalDvrInfo(); + virtual ~LocalDvrInfo(); + + virtual void start(); + + virtual LocalDvrInfo *clone() const; + + virtual const std::string remoteServerUid() const + { + return ""; + } +}; \ No newline at end of file diff --git a/modules/HydrogenServer/message.cpp b/src/modules/deviceloader/message.cpp similarity index 100% rename from modules/HydrogenServer/message.cpp rename to src/modules/deviceloader/message.cpp diff --git a/modules/HydrogenServer/message.hpp b/src/modules/deviceloader/message.hpp similarity index 100% rename from modules/HydrogenServer/message.hpp rename to src/modules/deviceloader/message.hpp diff --git a/modules/HydrogenServer/message_queue.cpp b/src/modules/deviceloader/message_queue.cpp similarity index 81% rename from modules/HydrogenServer/message_queue.cpp rename to src/modules/deviceloader/message_queue.cpp index da4e8048..3989e6a0 100644 --- a/modules/HydrogenServer/message_queue.cpp +++ b/src/modules/deviceloader/message_queue.cpp @@ -20,16 +20,28 @@ MsgQueue::MsgQueue(bool useSharedBuffer) : useSharedBuffer(useSharedBuffer) { lp = newLilXML(); +#ifdef USE_LIBUV + uv_poll_init(uv_default_loop(), &rio, rFd); + uv_poll_init(uv_default_loop(), &wio, wFd); + rio.data = this; + wio.data = this; +#else rio.set(this); wio.set(this); +#endif rFd = -1; wFd = -1; } MsgQueue::~MsgQueue() { +#ifdef USE_LIBUV + uv_poll_stop(&rio); + uv_poll_stop(&wio); +#else rio.stop(); wio.stop(); +#endif clearMsgQueue(); delLilXML(lp); @@ -85,6 +97,56 @@ void MsgQueue::closeWritePart() } } +#ifdef USE_LIBUV +void MsgQueue::setFds(int rFd, int wFd) +{ + if (this->rFd != -1) + { + uv_poll_stop(&rio); + uv_poll_stop(&wio); + ::close(this->rFd); + if (this->rFd != this->wFd) + { + ::close(this->wFd); + } + } + else if (this->wFd != -1) + { + uv_poll_stop(&wio); + ::close(this->wFd); + } + + this->rFd = rFd; + this->wFd = wFd; + this->nsent.reset(); + + if (rFd != -1) + { +#ifdef _WIN32 + int fd = _fileno(reinterpret_cast(_get_osfhandle(rFd))); + _setmode(fd, _O_BINARY); +#else + fcntl(rFd, F_SETFL, fcntl(rFd, F_GETFL, 0) | O_NONBLOCK); +#endif + + if (wFd != rFd) + { +#ifdef _WIN32 + int fd = _fileno(reinterpret_cast(_get_osfhandle(wFd))); + _setmode(fd, _O_BINARY); +#else + fcntl(wFd, F_SETFL, fcntl(wFd, F_GETFL, 0) | O_NONBLOCK); +#endif + } + + uv_poll_init(uv_default_loop(), &rio, rFd); + uv_poll_init(uv_default_loop(), &wio, wFd); + rio.data = this; + wio.data = this; + updateIos(); + } +} +#else void MsgQueue::setFds(int rFd, int wFd) { if (this->rFd != -1) @@ -131,6 +193,7 @@ void MsgQueue::setFds(int rFd, int wFd) updateIos(); } } +#endif SerializedMsg *MsgQueue::headMsg() const { @@ -166,6 +229,48 @@ void MsgQueue::pushMsg(Msg *mp) updateIos(); } +#ifdef USE_LIBUV +void MsgQueue::updateIos() +{ + if (wFd != -1) + { + if (msgq.empty() || !msgq.front()->requestContent(nsent)) + { + uv_poll_stop(&wio); + } + else + { + uv_poll_start(&wio, UV_WRITABLE, IOCallback); + } + } + if (rFd != -1) + { + uv_poll_start(&rio, UV_READABLE, IOCallback); + } +} + +// IO事件回调函数 +void MsgQueue::IOCallback(uv_poll_t *handle, int status, int events) +{ + MsgQueue *msgQueue = static_cast(handle->data); + if (status < 0) + { + // 处理错误情况 + return; + } + + if (events & UV_READABLE) + { + // 处理可读事件 + } + + if (events & UV_WRITABLE) + { + // 处理可写事件 + } +} + +#else void MsgQueue::updateIos() { if (wFd != -1) @@ -184,6 +289,7 @@ void MsgQueue::updateIos() rio.start(); } } +#endif void MsgQueue::messageMayHaveProgressed(const SerializedMsg *msg) { @@ -206,7 +312,11 @@ void MsgQueue::clearMsgQueue() // Cancel io write events updateIos(); +#ifdef USE_LIBUV + uv_poll_stop(&wio); +#else wio.stop(); +#endif } unsigned long MsgQueue::msgQSize() const @@ -222,6 +332,42 @@ unsigned long MsgQueue::msgQSize() const return (l); } +#ifdef USE_LIBUV +void MsgQueue::ioCb(uv_poll_t *handle, int status, int revents) +{ + if (status < 0) + { + int sockErrno = readFdError(this->rFd); + if ((!sockErrno) && this->wFd != this->rFd) + { + sockErrno = readFdError(this->wFd); + } + + if (sockErrno) + { + // log(fmt("Communication error: %s\n", strerror(sockErrno))); + close(); + return; + } + } + + if (UV_READABLE & revents) + readFromFd(); + + if (UV_WRITABLE & revents) + writeToFd(); +} + +void MsgQueue::setReadWriteCallback() +{ + uv_poll_init(uv_default_loop(), &rio, rFd); + uv_poll_init(uv_default_loop(), &wio, wFd); + rio.data = this; + wio.data = this; + uv_poll_start(&rio, UV_READABLE, IOCallback); + uv_poll_start(&wio, UV_WRITABLE, IOCallback); +} +#else void MsgQueue::ioCb(ev::io &, int revents) { if (EV_ERROR & revents) @@ -246,6 +392,7 @@ void MsgQueue::ioCb(ev::io &, int revents) if (revents & EV_WRITE) writeToFd(); } +#endif #ifdef _WIN32 size_t MsgQueue::doRead(char *buf, size_t nr) @@ -593,7 +740,11 @@ void MsgQueue::writeToFd() { if (!mp->getContent(nsent, data, nsend, sharedBuffers)) { +#ifdef USE_LIBUV + uv_poll_stop(&wio); +#else wio.stop(); +#endif return; } @@ -705,7 +856,7 @@ void MsgQueue::crackBLOB(const char *enableBLOB, BLOBHandling *bp) void MsgQueue::traceMsg(const std::string &logMsg, XMLEle *root) { - LOG_F(INFO, "%s", logMsg.c_str()); + LOG_F(INFO, "{}", logMsg); static const char *prtags[] = { @@ -723,23 +874,27 @@ void MsgQueue::traceMsg(const std::string &logMsg, XMLEle *root) unsigned int i; /* print tag header */ - fprintf(stderr, "%s %s %s %s", tagXMLEle(root), findXMLAttValu(root, "device"), findXMLAttValu(root, "name"), - findXMLAttValu(root, "state")); + // fprintf(stderr, "%s %s %s %s", tagXMLEle(root), findXMLAttValu(root, "device"), findXMLAttValu(root, "name"), + // findXMLAttValu(root, "state")); + LOG_F(ERROR, "{} {} {} {}", tagXMLEle(root), findXMLAttValu(root, "device"), findXMLAttValu(root, "name"), + findXMLAttValu(root, "state")); pcd = pcdataXMLEle(root); if (pcd[0]) - fprintf(stderr, " %s", pcd); + // fprintf(stderr, " %s", pcd); + LOG_F(ERROR, "{}", pcd); perm = findXMLAttValu(root, "perm"); if (perm[0]) - fprintf(stderr, " %s", perm); + // fprintf(stderr, " %s", perm); + LOG_F(ERROR, "{}", perm); msg = findXMLAttValu(root, "message"); if (msg[0]) - fprintf(stderr, " '%s'", msg); + // fprintf(stderr, " '%s'", msg); + LOG_F(ERROR, "{}", msg); /* print each array value */ for (e = nextXMLEle(root, 1); e; e = nextXMLEle(root, 0)) for (i = 0; i < sizeof(prtags) / sizeof(prtags[0]); i++) if (strcmp(prtags[i], tagXMLEle(e)) == 0) - fprintf(stderr, "\n %10s='%s'", findXMLAttValu(e, "name"), pcdataXMLEle(e)); - - fprintf(stderr, "\n"); + // fprintf(stderr, "\n %10s='%s'", findXMLAttValu(e, "name"), pcdataXMLEle(e)); + LOG_F(ERROR, "{:<10}='{}'", findXMLAttValu(e, "name"), pcdataXMLEle(e)); } \ No newline at end of file diff --git a/modules/HydrogenServer/message_queue.hpp b/src/modules/deviceloader/message_queue.hpp similarity index 90% rename from modules/HydrogenServer/message_queue.hpp rename to src/modules/deviceloader/message_queue.hpp index d599f263..c4dd6732 100644 --- a/modules/HydrogenServer/message_queue.hpp +++ b/src/modules/deviceloader/message_queue.hpp @@ -13,14 +13,25 @@ #include #endif +#ifdef USE_LIBUV +#include +#else #include +#endif class MsgQueue : public Collectable { int rFd, wFd; LilXML *lp; /* XML parsing context */ +#ifdef USE_LIBUV + uv_poll_t rio, wio; // Event loop io events + void ioCb(uv_poll_t *handle, int status, int revents); + static void IOCallback(uv_poll_t* handle, int status, int events); + void setReadWriteCallback(); +#else ev::io rio, wio; /* Event loop io events */ void ioCb(ev::io &watcher, int revents); +#endif // Update the status of FD read/write ability void updateIos(); diff --git a/modules/HydrogenServer/property.cpp b/src/modules/deviceloader/property.cpp similarity index 100% rename from modules/HydrogenServer/property.cpp rename to src/modules/deviceloader/property.cpp diff --git a/modules/HydrogenServer/property.hpp b/src/modules/deviceloader/property.hpp similarity index 100% rename from modules/HydrogenServer/property.hpp rename to src/modules/deviceloader/property.hpp diff --git a/src/modules/deviceloader/remote_driver.cpp b/src/modules/deviceloader/remote_driver.cpp new file mode 100644 index 00000000..914d5851 --- /dev/null +++ b/src/modules/deviceloader/remote_driver.cpp @@ -0,0 +1,172 @@ +#include "remote_driver.hpp" + +#ifdef _WIN32 +#include +#include +#else +#include +#include +#include +#include +#endif + +#include "loguru/loguru.hpp" + +#include "hydrogen_server.hpp" + +#include "loguru/loguru.hpp" + +RemoteDvrInfo::RemoteDvrInfo() : DvrInfo(false) +{ +} + +RemoteDvrInfo::RemoteDvrInfo(const RemoteDvrInfo &model) : DvrInfo(model), + host(model.host), + port(model.port) +{ +} + +RemoteDvrInfo::~RemoteDvrInfo() +{ +} + +RemoteDvrInfo *RemoteDvrInfo::clone() const +{ + return new RemoteDvrInfo(*this); +} + +/* + Old: + void RemoteDvrInfo::extractRemoteId(const std::string &name, std::string &o_host, int &o_port, std::string &o_dev) const + { + char dev[MAXHYDROGENDEVICE] = {0}; + char host[MAXSBUF] = {0}; + + int hydrogen_port = HYDROGENPORT; + if (sscanf(name.c_str(), "%[^@]@%[^:]:%d", dev, host, &hydrogen_port) < 2) + { + // Device missing? Try a different syntax for all devices + if (sscanf(name.c_str(), "@%[^:]:%d", host, &hydrogen_port) < 1) + { + log(fmt::format("Bad remote device syntax: %s\n", name.c_str())); + // Bye(); + } + } + + o_host = host; + o_port = hydrogen_port; + o_dev = dev; + } +*/ + +void RemoteDvrInfo::extractRemoteId(const std::string &name, std::string &o_host, int &o_port, std::string &o_dev) const +{ + std::string dev; + std::string host; + + // Extract host and port from name + size_t atPos = name.find('@'); + size_t colonPos = name.find(':'); + size_t portEndPos = name.size(); + + if (atPos != std::string::npos && colonPos != std::string::npos && colonPos > atPos + 1) + { + dev = name.substr(0, atPos); + host = name.substr(atPos + 1, colonPos - atPos - 1); + o_port = std::stoi(name.substr(colonPos + 1, portEndPos - colonPos - 1)); + } + else if (atPos == std::string::npos && colonPos != std::string::npos && colonPos > 0) + { + host = name.substr(0, colonPos); + o_port = std::stoi(name.substr(colonPos + 1, portEndPos - colonPos - 1)); + } + + o_host = host; + o_dev = dev; +} + +/* start the given remote HYDROGEN driver connection. + * exit if trouble. + */ +void RemoteDvrInfo::start() +{ + int sockfd; + std::string dev; + extractRemoteId(name, host, port, dev); + + /* connect */ + sockfd = openHYDROGENServer(); + + /* record flag pid, io channels, init lp and snoop list */ + + this->setFds(sockfd, sockfd); + + if (verbose > 0) + LOG_F(INFO, "socket={}", sockfd); + + /* N.B. storing name now is key to limiting outbound traffic to this + * dev. + */ + if (!dev.empty()) + this->dev.insert(dev); + + /* Sending getProperties with device lets remote server limit its + * outbound (and our inbound) traffic on this socket to this device. + */ + XMLEle *root = addXMLEle(NULL, "getProperties"); + + if (!dev.empty()) + { + addXMLAtt(root, "device", dev.c_str()); + addXMLAtt(root, "version", TO_STRING(HYDROGENV)); + } + else + { + // This informs downstream server that it is connecting to an upstream server + // and not a regular client. The difference is in how it treats snooping properties + // among properties. + addXMLAtt(root, "device", "*"); + addXMLAtt(root, "version", TO_STRING(HYDROGENV)); + } + + Msg *mp = new Msg(nullptr, root); + + // pushmsg can kill this. do at end + pushMsg(mp); +} + +int RemoteDvrInfo::openHYDROGENServer() +{ + struct sockaddr_in serv_addr; + struct hostent *hp; + int sockfd; + + /* lookup host address */ + hp = gethostbyname(host.c_str()); + if (!hp) + { + LOG_F(ERROR, "gethostbyname({}): {}", host.c_str(), strerror(errno)); + // Bye(); + } + + /* create a socket to the HYDROGEN server */ + (void)memset((char *)&serv_addr, 0, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = ((struct in_addr *)(hp->h_addr_list[0]))->s_addr; + serv_addr.sin_port = htons(port); + if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) + { + LOG_F(ERROR, "socket({},{}): {}", host.c_str(), port, strerror(errno)); + // Bye(); + } + + /* connect */ + if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) + { + LOG_F(ERROR, "connect({},{}): {}", host.c_str(), port, strerror(errno)); + // Bye(); + } + + /* ok */ + return (sockfd); +} \ No newline at end of file diff --git a/src/modules/deviceloader/remote_driver.hpp b/src/modules/deviceloader/remote_driver.hpp new file mode 100644 index 00000000..544d5535 --- /dev/null +++ b/src/modules/deviceloader/remote_driver.hpp @@ -0,0 +1,30 @@ +#include "driver_info.hpp" + +class RemoteDvrInfo : public DvrInfo +{ + /* open a connection to the given host and port or die. + * return socket fd. + */ + int openHYDROGENServer(); + + void extractRemoteId(const std::string &name, std::string &o_host, int &o_port, std::string &o_dev) const; + +protected: + RemoteDvrInfo(const RemoteDvrInfo &model); + +public: + std::string host; + int port; + + RemoteDvrInfo(); + virtual ~RemoteDvrInfo(); + + virtual void start(); + + virtual RemoteDvrInfo *clone() const; + + virtual const std::string remoteServerUid() const + { + return std::string(host) + ":" + std::to_string(port); + } +}; \ No newline at end of file diff --git a/modules/HydrogenServer/serialize.cpp b/src/modules/deviceloader/serialize.cpp similarity index 96% rename from modules/HydrogenServer/serialize.cpp rename to src/modules/deviceloader/serialize.cpp index 8099e842..8691d80b 100644 --- a/modules/HydrogenServer/serialize.cpp +++ b/src/modules/deviceloader/serialize.cpp @@ -15,6 +15,7 @@ #include "message.hpp" #include "message_queue.hpp" #include "xml_util.hpp" +#include "hydrogen_server.hpp" #include "loguru/loguru.hpp" @@ -31,7 +32,12 @@ SerializedMsg::SerializedMsg(Msg *parent) : asyncProgress(), owner(parent), awai } requirements.xml = true; asyncStatus = PENDING; +#ifdef USE_LIBUV + uv_async_init(loop, &asyncProgress, (uv_async_cb)&SerializedMsg::async_progressed); + asyncProgress.data = this; +#else asyncProgress.set(this); +#endif } // Delete occurs when no async task is running and no awaiters are left @@ -57,7 +63,11 @@ void SerializedMsg::async_updateRequirement(const SerializationRequirement &req) return; } this->requirements = req; +#ifdef USE_LIBUV + uv_async_send(&asyncProgress); +#else asyncProgress.send(); +#endif } void SerializedMsg::async_pushChunck(const MsgChunck &m) @@ -65,14 +75,22 @@ void SerializedMsg::async_pushChunck(const MsgChunck &m) std::lock_guard guard(lock); this->chuncks.push_back(m); +#ifdef USE_LIBUV + uv_async_send(&asyncProgress); +#else asyncProgress.send(); +#endif } void SerializedMsg::async_done() { std::lock_guard guard(lock); asyncStatus = TERMINATED; +#ifdef USE_LIBUV + uv_async_send(&asyncProgress); +#else asyncProgress.send(); +#endif } void SerializedMsg::async_start() @@ -86,7 +104,11 @@ void SerializedMsg::async_start() asyncStatus = RUNNING; if (generateContentAsync()) { +#ifdef USE_LIBUV + uv_async_send(&asyncProgress); +#else asyncProgress.start(); +#endif std::thread t([this]() { generateContent(); }); @@ -104,8 +126,12 @@ void SerializedMsg::async_progressed() if (asyncStatus == TERMINATED) { - // FIXME: unblock ? +// FIXME: unblock ? +#ifdef USE_LIBUV + uv_close((uv_handle_t *)&this->asyncProgress, NULL); +#else asyncProgress.stop(); +#endif } // Update ios of awaiters diff --git a/modules/HydrogenServer/serialize.hpp b/src/modules/deviceloader/serialize.hpp similarity index 97% rename from modules/HydrogenServer/serialize.hpp rename to src/modules/deviceloader/serialize.hpp index e6a60ec2..67231fe2 100644 --- a/modules/HydrogenServer/serialize.hpp +++ b/src/modules/deviceloader/serialize.hpp @@ -5,7 +5,11 @@ #include #include +#ifdef USE_LIBUV +#include +#else #include +#endif class Msg; class MsgQueue; @@ -56,7 +60,11 @@ class SerializedMsg friend class MsgChunckIterator; std::recursive_mutex lock; +#ifdef USE_LIBUV + uv_async_t asyncProgress; +#else ev::async asyncProgress; +#endif // Start a thread for execution of asyncRun void async_start(); diff --git a/src/modules/deviceloader/signal.cpp b/src/modules/deviceloader/signal.cpp new file mode 100644 index 00000000..d249c59e --- /dev/null +++ b/src/modules/deviceloader/signal.cpp @@ -0,0 +1,64 @@ +#include "signal.hpp" +#include + +std::map SignalHandler::handlers; + +void SignalHandler::handleSignal(int signal) +{ + auto it = handlers.find(signal); + if (it != handlers.end()) + { + it->second(); + } +} + +void SignalHandler::registerHandler(int signal, const SignalHandlerFunc &handlerFunc) +{ +#ifdef _WIN32 + if (signal == SIGINT || signal == SIGTERM) + { + SetConsoleCtrlHandler(handleConsoleEvent, TRUE); + } + else + { + handlers[signal] = handlerFunc; + } +#else + handlers[signal] = handlerFunc; + std::signal(signal, handleSignal); +#endif +} + +void SignalHandler::unregisterHandler(int signal) +{ +#ifdef _WIN32 + if (signal == SIGINT || signal == SIGTERM) + { + SetConsoleCtrlHandler(handleConsoleEvent, FALSE); + } + else + { + handlers.erase(signal); + } +#else + handlers.erase(signal); + std::signal(signal, SIG_DFL); +#endif +} + +#ifdef _WIN32 +BOOL WINAPI SignalHandler::handleConsoleEvent(DWORD eventType) +{ + switch (eventType) + { + case CTRL_C_EVENT: + case CTRL_BREAK_EVENT: + case CTRL_CLOSE_EVENT: + case CTRL_SHUTDOWN_EVENT: + handleSignal(SIGINT); + return TRUE; + default: + return FALSE; + } +} +#endif \ No newline at end of file diff --git a/src/modules/deviceloader/signal.hpp b/src/modules/deviceloader/signal.hpp new file mode 100644 index 00000000..f2cbd4dc --- /dev/null +++ b/src/modules/deviceloader/signal.hpp @@ -0,0 +1,28 @@ +#ifndef SIGNALHANDLER_H +#define SIGNALHANDLER_H + +#include +#include + +#ifdef _WIN32 +#include +#endif + +class SignalHandler +{ +public: + using SignalHandlerFunc = std::function; + + static void handleSignal(int signal); + static void registerHandler(int signal, const SignalHandlerFunc &handlerFunc); + static void unregisterHandler(int signal); + +private: + static std::map handlers; + +#ifdef _WIN32 + static BOOL WINAPI handleConsoleEvent(DWORD eventType); +#endif +}; + +#endif // SIGNALHANDLER_H diff --git a/modules/HydrogenServer/tcp_server.cpp b/src/modules/deviceloader/tcp_server.cpp similarity index 58% rename from modules/HydrogenServer/tcp_server.cpp rename to src/modules/deviceloader/tcp_server.cpp index 5cc2930a..91f89fca 100644 --- a/modules/HydrogenServer/tcp_server.cpp +++ b/src/modules/deviceloader/tcp_server.cpp @@ -16,159 +16,128 @@ #include "loguru/loguru.hpp" -#ifdef ENABLE_HYDROGEN_SHARED_MEMORY - -UnixServer::UnixServer(const std::string &path) : path(path) -{ - sfdev.set(this); -} - -void UnixServer::log(const std::string &str) const +TcpServer::TcpServer(int port) : port(port) { - std::string logLine = "Local server: "; - logLine += str; - ::log(logLine); +#ifdef USE_LIBUV + poll_watcher.data = this; +#else + sfdev.set(this); +#endif } -void UnixServer::ioCb(ev::io &, int revents) +#ifdef USE_LIBUV +void TcpServer::ioCb(uv_poll_t *watcher, int status, int revents) { - if (revents & EV_ERROR) + if (status < 0) { - int sockErrno = readFdError(this->sfd); - if (sockErrno) - { - log(fmt("Error on unix socket: %s\n", strerror(sockErrno))); - // Bye(); - } + LOG_F(ERROR, "Error on tcp server socket: {}", uv_strerror(status)); + // Bye(); } - if (revents & EV_READ) + else if (revents & UV_READABLE) { accept(); } } -static void initUnixSocketAddr(const std::string &unixAddr, struct sockaddr_un &serv_addr_un, socklen_t &addrlen, bool bind) -{ - memset(&serv_addr_un, 0, sizeof(serv_addr_un)); - serv_addr_un.sun_family = AF_UNIX; - -#ifdef __linux__ - (void)bind; - - // Using abstract socket path to avoid filesystem boilerplate - strncpy(serv_addr_un.sun_path + 1, unixAddr.c_str(), sizeof(serv_addr_un.sun_path) - 1); - - int len = offsetof(struct sockaddr_un, sun_path) + unixAddr.size() + 1; - - addrlen = len; -#else - // Using filesystem socket path - strncpy(serv_addr_un.sun_path, unixAddr.c_str(), sizeof(serv_addr_un.sun_path) - 1); - - int len = offsetof(struct sockaddr_un, sun_path) + unixAddr.size(); - - if (bind) - { - unlink(unixAddr.c_str()); - } -#endif - addrlen = len; -} - -void UnixServer::listen() +void TcpServer::listen() { - struct sockaddr_un serv_socket; + struct sockaddr_in serv_socket; + int reuse = 1; /* make socket endpoint */ - if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) + if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - log(fmt("socket: %s\n", strerror(errno))); + LOG_F(ERROR, "socket: %s\n", strerror(errno)); // Bye(); } - int reuse = 1; + /* bind to given port for any IP address */ + memset(&serv_socket, 0, sizeof(serv_socket)); + serv_socket.sin_family = AF_INET; +#ifdef SSH_TUNNEL + serv_socket.sin_addr.s_addr = htonl(INADDR_LOOPBACK); +#else + serv_socket.sin_addr.s_addr = htonl(INADDR_ANY); +#endif + serv_socket.sin_port = htons((unsigned short)port); +#ifdef _WIN32 + if (setsockopt(sfd, SOL_SOCKET, SO_RCVTIMEO, (const char *)&reuse, sizeof(reuse)) < 0) + { + LOG_F(ERROR, "Failed to set receive timeout."); + close(sfd); + sfd = -1; + return; + } +#else if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) { - log(fmt("setsockopt: %s\n", strerror(errno))); + LOG_F(ERROR, "setsockopt: %s\n", strerror(errno)); // Bye(); } - - /* bind to given path as unix address */ - socklen_t len; - initUnixSocketAddr(path, serv_socket, len, true); - - if (bind(sfd, (struct sockaddr *)&serv_socket, len) < 0) +#endif + if (bind(sfd, (struct sockaddr *)&serv_socket, sizeof(serv_socket)) < 0) { - log(fmt("bind: %s\n", strerror(errno))); + LOG_F(ERROR, "bind: %s\n", strerror(errno)); // Bye(); } /* willing to accept connections with a backlog of 5 pending */ if (::listen(sfd, 5) < 0) { - log(fmt("listen: %s\n", strerror(errno))); + LOG_F(ERROR, "listen: %s\n", strerror(errno)); // Bye(); } +#ifdef _WIN32 + int fd = _fileno(reinterpret_cast(_get_osfhandle(sfd))); + _setmode(fd, _O_BINARY); +#else fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL, 0) | O_NONBLOCK); - sfdev.start(sfd, EV_READ); +#endif + + // 创建uv_poll_t对象,并注册READABLE事件回调函数 + uv_poll_init_socket(loop, &poll_watcher, sfd); + uv_poll_start(&poll_watcher, UV_READABLE, (uv_poll_cb)&TcpServer::ioCb); + + /* 开始事件循环 */ + uv_run(loop, UV_RUN_DEFAULT); /* ok */ if (verbose > 0) - log(fmt("listening on local domain at: @%s\n", path.c_str())); + LOG_F(INFO, "listening to port %d on fd %d\n", port, sfd); } -void UnixServer::accept() + +void TcpServer::accept() { + struct sockaddr_in cli_socket; + socklen_t cli_len; int cli_fd; /* get a private connection to new client */ - cli_fd = ::accept(sfd, 0, 0); + cli_len = sizeof(cli_socket); + cli_fd = ::accept(sfd, (struct sockaddr *)&cli_socket, &cli_len); if (cli_fd < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) return; - log(fmt("accept: %s\n", strerror(errno))); + LOG_F(ERROR, "accept: %s\n", strerror(errno)); // Bye(); } - ClInfo *cp = new ClInfo(true); + ClInfo *cp = new ClInfo(false); /* rig up new clinfo entry */ cp->setFds(cli_fd, cli_fd); - if (verbose > 0) - { -#ifdef SO_PEERCRED - struct ucred ucred; - - socklen_t len = sizeof(struct ucred); - if (getsockopt(cli_fd, SOL_SOCKET, SO_PEERCRED, &ucred, &len) == -1) - { - log(fmt("getsockopt failed: %s\n", strerror(errno))); - // Bye(); - } - - cp->log(fmt("new arrival from local pid %ld (user: %ld:%ld) - welcome!\n", (long)ucred.pid, (long)ucred.uid, - (long)ucred.gid)); -#else - cp->log(fmt("new arrival from local domain - welcome!\n")); -#endif - } - #ifdef OSX_EMBEDED_MODE fprintf(stderr, "CLIENTS %d\n", clients.size()); fflush(stderr); #endif } -#endif // ENABLE_HYDROGEN_SHARED_MEMORY - -TcpServer::TcpServer(int port) : port(port) -{ - sfdev.set(this); -} +#else void TcpServer::ioCb(ev::io &, int revents) { @@ -237,7 +206,7 @@ void TcpServer::listen() } #ifdef _WIN32 - int fd = _fileno(reinterpret_cast(_get_osfhandle(sfd))); + int fd = _fileno(reinterpret_cast(_get_osfhandle(sfd))); _setmode(fd, _O_BINARY); #else fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL, 0) | O_NONBLOCK); @@ -277,3 +246,4 @@ void TcpServer::accept() fflush(stderr); #endif } +#endif diff --git a/src/modules/deviceloader/tcp_server.hpp b/src/modules/deviceloader/tcp_server.hpp new file mode 100644 index 00000000..6f672ccf --- /dev/null +++ b/src/modules/deviceloader/tcp_server.hpp @@ -0,0 +1,44 @@ +#pragma once + +#ifdef _WIN32 + +#else +#include +#endif + +#ifdef USE_LIBUV +#include +#else +#include +#endif + +class TcpServer +{ + int port; + int sfd = -1; +#ifdef USE_LIBUV + uv_poll_t poll_watcher; +#else + ev::io sfdev; +#endif + + +#ifdef USE_LIBUV + void ioCb(uv_poll_t *watcher, int status, int revents); + void accept(); +#else + void ioCb(ev::io &watcher, int revents); + /* prepare for new client arriving on socket. + * exit if trouble. + */ + void accept(); +#endif + +public: + TcpServer(int port); + + /* create the public HYDROGEN Driver endpoint lsocket on port. + * return server socket else exit. + */ + void listen(); +}; diff --git a/modules/HydrogenServer/time.cpp b/src/modules/deviceloader/time.cpp similarity index 100% rename from modules/HydrogenServer/time.cpp rename to src/modules/deviceloader/time.cpp diff --git a/modules/HydrogenServer/time.hpp b/src/modules/deviceloader/time.hpp similarity index 100% rename from modules/HydrogenServer/time.hpp rename to src/modules/deviceloader/time.hpp diff --git a/src/modules/deviceloader/unix_server.cpp b/src/modules/deviceloader/unix_server.cpp new file mode 100644 index 00000000..5ee6ac7f --- /dev/null +++ b/src/modules/deviceloader/unix_server.cpp @@ -0,0 +1,149 @@ + +#ifdef ENABLE_HYDROGEN_SHARED_MEMORY + +UnixServer::UnixServer(const std::string &path) : path(path) +{ + sfdev.set(this); +} + +void UnixServer::log(const std::string &str) const +{ + std::string logLine = "Local server: "; + logLine += str; + ::log(logLine); +} + +void UnixServer::ioCb(ev::io &, int revents) +{ + if (revents & EV_ERROR) + { + int sockErrno = readFdError(this->sfd); + if (sockErrno) + { + log(fmt("Error on unix socket: %s\n", strerror(sockErrno))); + // Bye(); + } + } + if (revents & EV_READ) + { + accept(); + } +} + +static void initUnixSocketAddr(const std::string &unixAddr, struct sockaddr_un &serv_addr_un, socklen_t &addrlen, bool bind) +{ + memset(&serv_addr_un, 0, sizeof(serv_addr_un)); + serv_addr_un.sun_family = AF_UNIX; + +#ifdef __linux__ + (void)bind; + + // Using abstract socket path to avoid filesystem boilerplate + strncpy(serv_addr_un.sun_path + 1, unixAddr.c_str(), sizeof(serv_addr_un.sun_path) - 1); + + int len = offsetof(struct sockaddr_un, sun_path) + unixAddr.size() + 1; + + addrlen = len; +#else + // Using filesystem socket path + strncpy(serv_addr_un.sun_path, unixAddr.c_str(), sizeof(serv_addr_un.sun_path) - 1); + + int len = offsetof(struct sockaddr_un, sun_path) + unixAddr.size(); + + if (bind) + { + unlink(unixAddr.c_str()); + } +#endif + addrlen = len; +} + +void UnixServer::listen() +{ + struct sockaddr_un serv_socket; + + /* make socket endpoint */ + if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) + { + log(fmt("socket: %s\n", strerror(errno))); + // Bye(); + } + + int reuse = 1; + if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) + { + log(fmt("setsockopt: %s\n", strerror(errno))); + // Bye(); + } + + /* bind to given path as unix address */ + socklen_t len; + initUnixSocketAddr(path, serv_socket, len, true); + + if (bind(sfd, (struct sockaddr *)&serv_socket, len) < 0) + { + log(fmt("bind: %s\n", strerror(errno))); + // Bye(); + } + + /* willing to accept connections with a backlog of 5 pending */ + if (::listen(sfd, 5) < 0) + { + log(fmt("listen: %s\n", strerror(errno))); + // Bye(); + } + + fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL, 0) | O_NONBLOCK); + sfdev.start(sfd, EV_READ); + + /* ok */ + if (verbose > 0) + log(fmt("listening on local domain at: @%s\n", path.c_str())); +} + +void UnixServer::accept() +{ + int cli_fd; + + /* get a private connection to new client */ + cli_fd = ::accept(sfd, 0, 0); + if (cli_fd < 0) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + return; + + log(fmt("accept: %s\n", strerror(errno))); + // Bye(); + } + + ClInfo *cp = new ClInfo(true); + + /* rig up new clinfo entry */ + cp->setFds(cli_fd, cli_fd); + + if (verbose > 0) + { +#ifdef SO_PEERCRED + struct ucred ucred; + + socklen_t len = sizeof(struct ucred); + if (getsockopt(cli_fd, SOL_SOCKET, SO_PEERCRED, &ucred, &len) == -1) + { + log(fmt("getsockopt failed: %s\n", strerror(errno))); + // Bye(); + } + + cp->log(fmt("new arrival from local pid %ld (user: %ld:%ld) - welcome!\n", (long)ucred.pid, (long)ucred.uid, + (long)ucred.gid)); +#else + cp->log(fmt("new arrival from local domain - welcome!\n")); +#endif + } + +#ifdef OSX_EMBEDED_MODE + fprintf(stderr, "CLIENTS %d\n", clients.size()); + fflush(stderr); +#endif +} + +#endif // ENABLE_HYDROGEN_SHARED_MEMORY \ No newline at end of file diff --git a/modules/HydrogenServer/tcp_server.hpp b/src/modules/deviceloader/unix_server.hpp similarity index 52% rename from modules/HydrogenServer/tcp_server.hpp rename to src/modules/deviceloader/unix_server.hpp index b5f25a32..0b89e1ef 100644 --- a/modules/HydrogenServer/tcp_server.hpp +++ b/src/modules/deviceloader/unix_server.hpp @@ -1,33 +1,3 @@ -#pragma once - -#ifdef _WIN32 - -#else -#include -#endif - -#include - -class TcpServer -{ - int port; - int sfd = -1; - ev::io sfdev; - - /* prepare for new client arriving on socket. - * exit if trouble. - */ - void accept(); - void ioCb(ev::io &watcher, int revents); - -public: - TcpServer(int port); - - /* create the public HYDROGEN Driver endpoint lsocket on port. - * return server socket else exit. - */ - void listen(); -}; #ifdef ENABLE_HYDROGEN_SHARED_MEMORY diff --git a/modules/HydrogenServer/xml_util.cpp b/src/modules/deviceloader/xml_util.cpp similarity index 100% rename from modules/HydrogenServer/xml_util.cpp rename to src/modules/deviceloader/xml_util.cpp diff --git a/modules/HydrogenServer/xml_util.hpp b/src/modules/deviceloader/xml_util.hpp similarity index 100% rename from modules/HydrogenServer/xml_util.hpp rename to src/modules/deviceloader/xml_util.hpp