#include #include #include #include #include #include #include #include #include #include #include #include #include #include "taskmanager.h" TaskManager::TaskManager(QObject* parent) : QObject(parent), m_loop(nullptr) { } TaskManager::~TaskManager() { } void TaskManager::open(const QUrl &bindingAddress) { qRegisterMetaType>("std::shared_ptr"); m_loop.reset(); m_loop = MessageEngineFactory::getInstance().getMessageEngine(bindingAddress); QObject::connect(m_loop.get(), &MessageEngine::connected, this, &TaskManager::onConnected); QObject::connect(m_loop.get(), &MessageEngine::messageReceived, this, &TaskManager::onMessageReceived, Qt::QueuedConnection); } void TaskManager::onConnected() { query(); // let's not wait 3 seconds and issue the first call directly timer = new QTimer(); connect(timer, SIGNAL(timeout()), this, SLOT(query())); timer->start(3000); loadAvg(); loadAvgTimer = new QTimer(); connect(loadAvgTimer, SIGNAL(timeout()), this, SLOT(loadAvg())); loadAvgTimer->start(1000); } void TaskManager::kill(int tid) { callService("kill_process", QJsonValue(tid)); } void TaskManager::getExtraInfo(int tid) { callService("get_extra_info", QJsonValue(tid)); } void TaskManager::loadAvg() { callService("get_load_avg", QJsonValue()); } void TaskManager::query() { callService("get_process_list", QJsonValue(QJsonObject({{"processes", QJsonValue()}}))); callService("get_netstat", QJsonValue()); } void TaskManager::callService(const QString& command, QJsonValue value) { std::unique_ptr message = MessageFactory::getInstance().createOutboundMessage(MessageId::Call); if (!message) return; CallMessage* msg = static_cast(message.get()); msg->createRequest("taskmanager", command, value); qDebug() << "sending message " << msg->serialize(); m_loop->sendMessage(std::move(message)); } void TaskManager::onMessageReceived(std::shared_ptr message) { if (!message) return; if (message->isReply()) ProcessResponse(message); } void TaskManager::ProcessResponse(std::shared_ptr message) { std::shared_ptr rmsg = std::static_pointer_cast(message); qDebug() << "got message "; QString verb = rmsg->requestVerb(); if (rmsg->replyStatus() == "failed") { qWarning() << "Reply Failed received for verb:" << verb; return; } QJsonObject data = rmsg->replyData(); QString msgType = data.value("msgType").toString(); if (msgType.isNull()) return; // no type supplied, ignoring if (QString::compare(msgType, "processList") == 0) { QJsonArray processes = data.value("processes").toArray(); ProcessResponseTasklist(processes); } else if (QString::compare(msgType, "extraInfo") == 0) { QJsonObject info = data.value("info").toObject(); ProcessResponseExtraInfo(info); } else if (QString::compare(msgType, "loadAvgInfo") == 0) { QJsonObject loadInfo = data.value("loadInfo").toObject(); ProcessResponseLoadAvg(loadInfo); } else if (QString::compare(msgType, "netStatInfo") == 0) { QJsonObject netstat = data.value("netstat").toObject(); ProcessResponseNetStat(netstat); } // more response types to follow } void TaskManager::ProcessResponseTasklist(QJsonArray& processes) { std::vector procs; if (processes.size() == 0) return; for(auto it = processes.constBegin(); it != processes.constEnd(); ++it) { if (!it->toObject().isEmpty()) procs.emplace_back(it->toObject()); } if (procs.empty()) return; for(auto &item : procs) { if(m_procinfos.empty()) { // if old vector is empty, no need to update or remove elements, just add them emit addProcess(item.cmd(), item.tid(), item.euid(), item.scpu(), item.ucpu(), item.resident_memory(), item.state()); } else { std::vector::iterator it = std::find_if(m_procinfos.begin(), m_procinfos.end(), [item](const ProcInfo& proc) { return item.tid() == proc.tid(); }); if (it != m_procinfos.end()) { // the same ID exists in both vectors if (item.cmd() != it->cmd()) { // but different names: // the process in m_procinfos has died and a new one has its ID qDebug() << "The process ID has been reused for another process"; } emit updateProcess(item.cmd(), item.tid(), item.euid(), item.scpu(), item.ucpu(), item.resident_memory(), item.state()); m_procinfos.erase(it); } else { // if the ID was not found in old vector, that means it's a new process qDebug() << item.cmd() << " new process has been added"; emit addProcess(item.cmd(), item.tid(), item.euid(), item.scpu(), item.ucpu(), item.resident_memory(), item.state()); } } } // any remaining processes in old vector are all dead, remove them: for(auto &olditem : m_procinfos) { emit removeProcess(olditem.tid()); } m_procinfos.swap(procs); } void TaskManager::ProcessResponseExtraInfo(QJsonObject &info) { QString infoString; if (info.size() == 0) { // this is not a valid process list response QTextStream(&infoString) << "procces is not available"; } else { infoString = "Task : " + info["cmd"].toString() + "\n" + "Exec start : " + info["exec_start"].toString() + "\n" + "Exec runtime : " + info["vruntime"].toString() + "\n" + "Prio : " + info["prio"].toString(); } emit showProcessInfo(infoString); } void TaskManager::ProcessResponseLoadAvg(QJsonObject &loadInfo) { if (loadInfo.size() == 0) { return; } emit updateLoadAverage(loadInfo["value"].toDouble()); } void TaskManager::ProcessResponseNetStat(QJsonObject &netstat) { if (netstat.size() == 0) { return; } unsigned int newInOctets = netstat["InOctets"].toInt(); unsigned int newOutOctets = netstat["OutOctets"].toInt(); emit updateNetworkStats(newInOctets - inOctets, newOutOctets - outOctets); inOctets = newInOctets; outOctets = newOutOctets; }