summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarius Vlad <marius.vlad@collabora.com>2022-10-18 20:22:47 +0300
committerMarius Vlad <marius.vlad@collabora.com>2022-10-25 11:55:44 +0300
commit33aea95f02b025ecaf5917b10975119c07c353b3 (patch)
tree7dcb9d569c5fcfc58680dc8944065f564364aac9
parent0eedfd70b4682a51c81b4b8738ab42f665dc8798 (diff)
Add more grpc - Asyncstuffsandbox/mvlad/switch-to-grpc
Switch to a more better structure protocol: Add support for sending out app_state events over gRPC Signed-off-by: Marius Vlad <marius.vlad@collabora.com> Change-Id: I2765d53a2123be0d52225d92c964d39c63ec4902
-rw-r--r--clients/grpc-async-cb.cpp111
-rw-r--r--clients/grpc-async-cb.h64
-rw-r--r--clients/grpc-async.cpp121
-rw-r--r--clients/grpc-async.h79
-rw-r--r--clients/grpc-sync.cpp80
-rw-r--r--clients/grpc-sync.h46
-rw-r--r--clients/grpc.h47
-rw-r--r--clients/log.h7
-rw-r--r--clients/main-grpc.cpp (renamed from clients/grpc.cpp)254
-rw-r--r--clients/main-grpc.h38
-rw-r--r--clients/meson.build4
-rw-r--r--clients/shell.cpp53
-rw-r--r--clients/shell.h21
-rw-r--r--protocol/agl_shell.proto13
-rw-r--r--src/ivi-compositor.h2
-rw-r--r--src/shell.c14
16 files changed, 726 insertions, 228 deletions
diff --git a/clients/grpc-async-cb.cpp b/clients/grpc-async-cb.cpp
new file mode 100644
index 0000000..71cd57e
--- /dev/null
+++ b/clients/grpc-async-cb.cpp
@@ -0,0 +1,111 @@
+#include <cstdio>
+#include <ctime>
+#include <algorithm>
+#include <queue>
+
+#include <grpc/grpc.h>
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/server_context.h>
+
+#include <grpcpp/ext/proto_server_reflection_plugin.h>
+#include <grpcpp/health_check_service_interface.h>
+
+#include "log.h"
+#include "agl_shell.grpc.pb.h"
+#include "grpc-async-cb.h"
+
+Lister::Lister(Shell *shell) : m_shell(shell)
+{
+ // don't call NextWrite() just yet we do it explicitly when getting
+ // the events from the compositor
+}
+
+void
+Lister::OnDone()
+{
+ delete this;
+}
+
+void Lister::OnWriteDone(bool ok)
+{
+ LOG("ok %d\n", ok);
+ if (ok) {
+ // normally we should finish here, but we don't do that to keep
+ // the channel open
+ //Finish(grpc::Status::OK);
+ }
+}
+
+void
+Lister::NextWrite(void)
+{
+ // we're going to have a Lister instance per client so we're safe here
+ StartWrite(&m_shell->m_shell_data->current_app_state);
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::ActivateApp(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::ActivateRequest* request,
+ google::protobuf::Empty* /*response*/)
+{
+ LOG("activating app %s on output %s\n", request->app_id().c_str(),
+ request->output_name().c_str());
+
+ m_aglShell->ActivateApp(request->app_id(), request->output_name());
+
+ grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+ reactor->Finish(grpc::Status::OK);
+ return reactor;
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::DeactivateApp(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::DeactivateRequest* request,
+ google::protobuf::Empty* /*response*/)
+{
+ m_aglShell->DeactivateApp(request->app_id());
+
+ grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+ reactor->Finish(grpc::Status::OK);
+ return reactor;
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::SetAppFloat(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::FloatRequest* request,
+ google::protobuf::Empty* /* response */)
+{
+ m_aglShell->SetAppFloat(request->app_id());
+
+ grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+ reactor->Finish(grpc::Status::OK);
+ return reactor;
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::SetAppSplit(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::SplitRequest* request,
+ google::protobuf::Empty* /*response*/)
+{
+ m_aglShell->SetAppSplit(request->app_id(), request->tile_orientation());
+
+ grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+ reactor->Finish(grpc::Status::OK);
+ return reactor;
+}
+
+grpc::ServerWriteReactor<::agl_shell_ipc::AppState>*
+GrpcServiceImpl::AppStatusState(grpc::CallbackServerContext* context,
+ const google::protobuf::Empty*)
+{
+
+ Lister *n = new Lister(m_aglShell);
+
+ m_aglShell->m_shell_data->server_context_list.push_back(std::pair(context, n));
+ LOG("added lister %p\n", static_cast<void *>(n));
+
+ // just return a Lister to keep the channel open
+ return n;
+}
diff --git a/clients/grpc-async-cb.h b/clients/grpc-async-cb.h
new file mode 100644
index 0000000..ce89f68
--- /dev/null
+++ b/clients/grpc-async-cb.h
@@ -0,0 +1,64 @@
+#pragma once
+
+#include <memory>
+
+#include <grpc/grpc.h>
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/server_context.h>
+
+#include <mutex>
+#include <condition_variable>
+
+#include <grpcpp/ext/proto_server_reflection_plugin.h>
+#include <grpcpp/health_check_service_interface.h>
+
+#include "shell.h"
+#include "agl_shell.grpc.pb.h"
+
+namespace {
+ const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005";
+}
+
+class Lister : public grpc::ServerWriteReactor<::agl_shell_ipc::AppState> {
+public:
+ Lister(Shell *aglShell);
+ void OnDone() override;
+ void OnWriteDone(bool ok) override;
+ void NextWrite(void);
+private:
+ Shell *m_shell;
+};
+
+class GrpcServiceImpl final : public agl_shell_ipc::AglShellManagerService::CallbackService {
+public:
+ GrpcServiceImpl(Shell *aglShell) : m_aglShell(aglShell) {}
+
+ grpc::ServerUnaryReactor *ActivateApp(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::ActivateRequest* request,
+ google::protobuf::Empty* /*response*/) override;
+
+ grpc::ServerUnaryReactor *DeactivateApp(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::DeactivateRequest* request,
+ google::protobuf::Empty* /*response*/) override;
+
+ grpc::ServerUnaryReactor *SetAppSplit(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::SplitRequest* request,
+ google::protobuf::Empty* /*response*/) override;
+
+ grpc::ServerUnaryReactor *SetAppFloat(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::FloatRequest* request,
+ google::protobuf::Empty* /*response*/) override;
+
+ grpc::ServerWriteReactor< ::agl_shell_ipc::AppState>* AppStatusState(
+ ::grpc::CallbackServerContext* /*context*/,
+ const ::google::protobuf::Empty* /*request*/) override;
+private:
+ Shell *m_aglShell;
+
+ std::mutex m_done_mutex;
+ std::condition_variable m_done_cv;
+ bool m_done = false;
+
+};
diff --git a/clients/grpc-async.cpp b/clients/grpc-async.cpp
new file mode 100644
index 0000000..72090fe
--- /dev/null
+++ b/clients/grpc-async.cpp
@@ -0,0 +1,121 @@
+#include <cstdio>
+#include <ctime>
+#include <algorithm>
+#include <queue>
+
+#include <grpc/grpc.h>
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/server_context.h>
+
+#include <grpcpp/ext/proto_server_reflection_plugin.h>
+#include <grpcpp/health_check_service_interface.h>
+
+#include "agl_shell.grpc.pb.h"
+#include "grpc-async.h"
+
+void
+CallData::Proceed(void)
+{
+ switch (m_status) {
+ case CREATE:
+ // Make this instance progress to the PROCESS state.
+ m_status = PROCESS;
+ std::cout << "Creating Call data for new client connections: "
+ << this << std::endl;
+
+ // As part of the initial CREATE state, we *request* that the
+ // system start processing AppStatusState requests.
+ //
+ // In this request, "this" acts are the tag uniquely
+ // identifying the request (so that different CallData
+ // instances can serve different requests concurrently), in
+ // this case the memory address of this CallData instance.
+ m_service->RequestAppStatusState(&m_ctx, &m_request, &m_responder,
+ m_cq, m_cq, (void *) this);
+ break;
+ case PROCESS:
+ // Spawn a new CallData instance to serve new clients while we
+ // process the one for this CallData. The instance will
+ // deallocate itself as part of its FINISH state.
+ CallData *cd = new CallData(m_service, m_cq);
+
+ // The actual processing.
+ m_status = PROCESSING;
+ m_repliesSent++;
+ break;
+ case PROCESSING:
+ if (m_repliesSent == MAX_REPLIES) {
+ // And we are done! Let the gRPC runtime know we've
+ // finished, using the memory address of this instance
+ // as the uniquely identifying tag for the event.
+ m_status = FINISH;
+ m_responder.Finish(Status::OK, this);
+ } else {
+ // The actual processing.
+ m_status = PROCESSING;
+ m_repliesSent++;
+ }
+ break;
+ case FINISH:
+ GPR_ASSERT(m_status == FINISH);
+ std::cout << "Completed RPC for: " << this << std::endl;
+ // Once in the FINISH state, deallocate ourselves (CallData).
+ delete this;
+ break;
+ default:
+ break;
+ }
+}
+
+GrpcServiceImpl::~GrpcServiceImpl()
+{
+ m_server->Shutdown();
+ // Always shutdown the completion queue after the server.
+ m_cq->Shutdown();
+}
+
+void
+GrpcServiceImpl::Run(void)
+{
+ std::string server_address(kDefaultGrpcServiceAddress);
+
+ grpc::ServerBuilder builder;
+ builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
+
+ builder.RegisterService(&m_service);
+ m_cq = builder.AddCompletionQueue();
+
+ m_server = builder.BuildAndStart();
+ std::cout << "Server listening on " << server_address << std::endl;
+
+ // Proceed to the server's main loop.
+ HandleRpcs();
+}
+
+void
+GrpcServiceImpl::HandleRpcs(void)
+{
+ // Spawn a new CallData instance to serve new clients.
+ CallData *cd = new CallData(&m_service, m_cq.get());
+
+ // uniquely identifies a request.
+ void *tag;
+ bool ok;
+
+ // Block waiting to read the next event from the completion queue. The
+ // event is uniquely identified by its tag, which in this case is the
+ // memory address of a CallData instance.
+ //
+ // The return value of Next should always be checked. This return value
+ // tells us whether there is any kind of event or cq_ is shutting down.
+ while (true) {
+ std::cout << "Blocked on next waiting for events" << std::endl;
+ GPR_ASSERT(m_cq->Next(&tag, &ok));
+ GPR_ASSERT(ok);
+
+ std::cout << "Calling tag " << tag << " with Proceed()" << std::endl;
+ static_cast<CallData*>(tag)->Proceed();
+ }
+}
diff --git a/clients/grpc-async.h b/clients/grpc-async.h
new file mode 100644
index 0000000..40de52c
--- /dev/null
+++ b/clients/grpc-async.h
@@ -0,0 +1,79 @@
+#pragma once
+
+#include <memory>
+
+#include <grpc/grpc.h>
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/server_context.h>
+
+#include <grpcpp/ext/proto_server_reflection_plugin.h>
+#include <grpcpp/health_check_service_interface.h>
+
+#include "shell.h"
+#include "agl_shell.grpc.pb.h"
+
+namespace {
+ const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005";
+}
+
+class CallData {
+public:
+ // Take in the "service" instance (in this case representing an
+ // asynchronous server) and the completion queue "cq" used for
+ // asynchronous communication with the gRPC runtime.
+ CallData(Greeter::AsyncService* service, grpc::ServerCompletionQueue* cq)
+ : m_service(service), m_cq(cq), m_repliesSent(0),
+ m_responder(&m_ctx), m_status(CREATE) { Proceed(); }
+ void Proceed();
+private:
+ // The means of communication with the gRPC runtime for an asynchronous
+ // server.
+ Greeter::AsyncService *m_service;
+ // The producer-consumer queue where for asynchronous server
+ // notifications.
+ grpc::ServerCompletionQueue *m_cq;
+ // Context for the rpc, allowing to tweak aspects of it such as the use
+ // of compression, authentication, as well as to send metadata back to
+ // the client.
+ grpc::ServerContext m_ctx;
+
+ // What we send back to the client.
+ ::agl_shell_ipc::AppState m_reply;
+
+ uint32_t m_repliesSent;
+ const uint32_t MAX_REPLIES = 5;
+
+ // The means to get back to the client.
+ grpc::ServerAsyncWriter<::agl_shell_ipc::AppState> m_responder;
+
+ // Let's implement a tiny state machine with the following states.
+ enum CallStatus {
+ CREATE,
+ PROCESS,
+ PROCESSING,
+ FINISH
+ };
+
+ // The current serving state.
+ CallStatus m_status;
+};
+
+
+class GrpcServiceImpl final {
+public:
+ GrpcServiceImpl(Shell *aglShell) : m_aglShell(aglShell) {}
+ ~GrpcServiceImpl();
+ void Run();
+
+ // This can be run in multiple threads if needed.
+ void HandleRpcs();
+
+private:
+ Shell *m_aglShell;
+
+ std::unique_ptr<grpc::ServerCompletionQueue> m_cq;
+ Greeter::AsyncService m_service;
+ std::unique_ptr<grpc::Server> m_server;
+};
diff --git a/clients/grpc-sync.cpp b/clients/grpc-sync.cpp
new file mode 100644
index 0000000..5aee817
--- /dev/null
+++ b/clients/grpc-sync.cpp
@@ -0,0 +1,80 @@
+#include <cstdio>
+#include <ctime>
+#include <algorithm>
+#include <queue>
+
+#include <grpc/grpc.h>
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/server_context.h>
+
+#include <grpcpp/ext/proto_server_reflection_plugin.h>
+#include <grpcpp/health_check_service_interface.h>
+
+#include "agl_shell.grpc.pb.h"
+#include "grpc-sync.h"
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::ActivateApp(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::ActivateRequest* request,
+ google::protobuf::Empty* /*response*/)
+{
+ fprintf(stderr, "activating app %s on output %s\n",
+ request->app_id().c_str(),
+ request->output_name().c_str());
+
+ m_aglShell->ActivateApp(request->app_id(), request->output_name());
+
+ grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+ reactor->Finish(grpc::Status::OK);
+ return reactor;
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::DeactivateApp(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::DeactivateRequest* request,
+ google::protobuf::Empty* /*response*/)
+{
+ m_aglShell->DeactivateApp(request->app_id());
+
+ grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+ reactor->Finish(grpc::Status::OK);
+ return reactor;
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::SetAppFloat(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::FloatRequest* request,
+ google::protobuf::Empty* /* response */)
+{
+ m_aglShell->SetAppFloat(request->app_id());
+
+ grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+ reactor->Finish(grpc::Status::OK);
+ return reactor;
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::SetAppSplit(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::SplitRequest* request,
+ google::protobuf::Empty* /*response*/)
+{
+ m_aglShell->SetAppSplit(request->app_id(), request->tile_orientation());
+
+ grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+ reactor->Finish(grpc::Status::OK);
+ return reactor;
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::AppStatusState(grpc::CallbackServerContext *context,
+ google::protobuf::Empty*,
+ ::grpc::ServerWriter<::agl_shell_ipc::AppState>* writer)
+{
+ (void) writer;
+ grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+ reactor->Finish(grpc::Status::OK);
+
+ return reactor;
+}
diff --git a/clients/grpc-sync.h b/clients/grpc-sync.h
new file mode 100644
index 0000000..caaee54
--- /dev/null
+++ b/clients/grpc-sync.h
@@ -0,0 +1,46 @@
+#pragma once
+
+#include <memory>
+
+#include <grpc/grpc.h>
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/server_context.h>
+
+#include <grpcpp/ext/proto_server_reflection_plugin.h>
+#include <grpcpp/health_check_service_interface.h>
+
+#include "shell.h"
+#include "agl_shell.grpc.pb.h"
+
+namespace {
+ const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005";
+}
+
+
+class GrpcServiceImpl final : public agl_shell_ipc::AglShellManagerService::CallbackService {
+public:
+ GrpcServiceImpl(Shell *aglShell) : m_aglShell(aglShell) {}
+
+ grpc::ServerUnaryReactor *ActivateApp(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::ActivateRequest* request,
+ google::protobuf::Empty* /*response*/);
+
+ grpc::ServerUnaryReactor *DeactivateApp(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::DeactivateRequest* request,
+ google::protobuf::Empty* /*response*/);
+
+ grpc::ServerUnaryReactor *SetAppSplit(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::SplitRequest* request,
+ google::protobuf::Empty* /*response*/);
+
+ grpc::ServerUnaryReactor *SetAppFloat(grpc::CallbackServerContext *context,
+ const ::agl_shell_ipc::FloatRequest* request,
+ google::protobuf::Empty* /*response*/);
+ grpc::ServerUnaryReactor *AppStatusState(grpc::CallbackServerContext *context,
+ google::protobuf::Empty *empty,
+ ::grpc::ServerWriter<::agl_shell_ipc::AppState>* writer);
+private:
+ Shell *m_aglShell;
+};
diff --git a/clients/grpc.h b/clients/grpc.h
deleted file mode 100644
index 3a3c864..0000000
--- a/clients/grpc.h
+++ /dev/null
@@ -1,47 +0,0 @@
-#include <grpc/grpc.h>
-#include <grpcpp/grpcpp.h>
-#include <grpcpp/server.h>
-#include <grpcpp/server_builder.h>
-#include <grpcpp/server_context.h>
-
-#include <grpcpp/ext/proto_server_reflection_plugin.h>
-#include <grpcpp/health_check_service_interface.h>
-
-#include "agl_shell.grpc.pb.h"
-#include "agl-shell-client-protocol.h"
-
-namespace {
- const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005";
-}
-
-
-class GrpcServiceImpl final : public agl_shell_ipc::AglShellManagerService::CallbackService {
-
- grpc::ServerUnaryReactor *ActivateApp(grpc::CallbackServerContext *context,
- const ::agl_shell_ipc::ActivateRequest* request,
- google::protobuf::Empty* /*response*/);
-
- grpc::ServerUnaryReactor *DeactivateApp(grpc::CallbackServerContext *context,
- const ::agl_shell_ipc::DeactivateRequest* request,
- google::protobuf::Empty* /*response*/);
-
- grpc::ServerUnaryReactor *SetAppSplit(grpc::CallbackServerContext *context,
- const ::agl_shell_ipc::SplitRequest* request,
- google::protobuf::Empty* /*response*/);
-
- grpc::ServerUnaryReactor *SetAppFloat(grpc::CallbackServerContext *context,
- const ::agl_shell_ipc::FloatRequest* request,
- google::protobuf::Empty* /*response*/);
-};
-
-
-class Shell {
-public:
- std::shared_ptr<struct agl_shell> m_shell;
- Shell(std::shared_ptr<struct agl_shell> shell) : m_shell(shell) { }
- void ActivateApp(const std::string &app_id, const std::string &output_name);
- void DeactivateApp(const std::string &app_id);
- void SetAppSplit(const std::string &app_id, uint32_t orientation);
- void SetAppFloat(const std::string &app_id);
-
-};
diff --git a/clients/log.h b/clients/log.h
new file mode 100644
index 0000000..127d8e0
--- /dev/null
+++ b/clients/log.h
@@ -0,0 +1,7 @@
+#pragma once
+
+#include <cstdio>
+
+#ifndef LOG
+#define LOG(fmt, ...) do { fprintf(stderr, "%s() " fmt, __func__, ##__VA_ARGS__); } while (0)
+#endif
diff --git a/clients/grpc.cpp b/clients/main-grpc.cpp
index 503453d..5f35c85 100644
--- a/clients/grpc.cpp
+++ b/clients/main-grpc.cpp
@@ -1,147 +1,17 @@
#include <cstdio>
#include <ctime>
#include <algorithm>
+#include <queue>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
-#include "grpc.h"
+#include "shell.h"
+#include "log.h"
+#include "main-grpc.h"
+#include "grpc-async-cb.h"
-struct shell_data {
- struct wl_display *wl_display;
- struct agl_shell *shell;
- struct agl_shell_ext *shell_ext;
- Shell *aglShell;
-
- bool wait_for_bound;
- bool wait_for_doas;
-
- bool bound_ok;
- bool doas_ok;
-
- uint32_t version;
- struct wl_list output_list; /** window_output::link */
-};
-
-struct window_output {
- struct shell_data *shell_data;
- struct wl_output *output;
- char *name;
- struct wl_list link; /** display::output_list */
-};
-
-static struct shell_data *sh = nullptr;
-
-grpc::ServerUnaryReactor *
-GrpcServiceImpl::ActivateApp(grpc::CallbackServerContext *context,
- const ::agl_shell_ipc::ActivateRequest* request,
- google::protobuf::Empty* /*response*/)
-{
- fprintf(stderr, "activating app %s on output %s\n",
- request->app_id().c_str(),
- request->output_name().c_str());
-
- sh->aglShell->ActivateApp(request->app_id(), request->output_name());
-
- grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
- reactor->Finish(grpc::Status::OK);
- return reactor;
-}
-
-grpc::ServerUnaryReactor *
-GrpcServiceImpl::DeactivateApp(grpc::CallbackServerContext *context,
- const ::agl_shell_ipc::DeactivateRequest* request,
- google::protobuf::Empty* /*response*/)
-{
- sh->aglShell->DeactivateApp(request->app_id());
-
- grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
- reactor->Finish(grpc::Status::OK);
- return reactor;
-}
-
-grpc::ServerUnaryReactor *
-GrpcServiceImpl::SetAppFloat(grpc::CallbackServerContext *context,
- const ::agl_shell_ipc::FloatRequest* request,
- google::protobuf::Empty* /* response */)
-{
- sh->aglShell->SetAppFloat(request->app_id());
-
- grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
- reactor->Finish(grpc::Status::OK);
- return reactor;
-}
-
-grpc::ServerUnaryReactor *
-GrpcServiceImpl::SetAppSplit(grpc::CallbackServerContext *context,
- const ::agl_shell_ipc::SplitRequest* request,
- google::protobuf::Empty* /*response*/)
-{
- sh->aglShell->SetAppSplit(request->app_id(), request->tile_orientation());
-
- grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
- reactor->Finish(grpc::Status::OK);
- return reactor;
-}
-
-void
-Shell::ActivateApp(const std::string &app_id, const std::string &output_name)
-{
- struct window_output *woutput, *w_output;
-
- woutput = nullptr;
- w_output = nullptr;
-
- wl_list_for_each(woutput, &sh->output_list, link) {
- if (woutput->name && !strcmp(woutput->name, output_name.c_str())) {
- w_output = woutput;
- break;
- }
- }
-
- // else, get the first one available
- if (!w_output)
- w_output = wl_container_of(sh->output_list.prev, w_output, link);
-
- agl_shell_activate_app(this->m_shell.get(), app_id.c_str(), w_output->output);
- wl_display_flush(sh->wl_display);
-}
-
-void
-Shell::DeactivateApp(const std::string &app_id)
-{
- (void) app_id;
-}
-
-void
-Shell::SetAppFloat(const std::string &app_id)
-{
- (void) app_id;
-}
-
-void
-Shell::SetAppSplit(const std::string &app_id, uint32_t orientation)
-{
- (void) app_id;
- (void) orientation;
-}
-
-static void
-start_grpc_server(void)
-{
- // instantiante the grpc server
- std::string server_address(kDefaultGrpcServiceAddress);
- GrpcServiceImpl service;
-
- grpc::EnableDefaultHealthCheckService(true);
- grpc::reflection::InitProtoReflectionServerBuilderPlugin();
-
- grpc::ServerBuilder builder;
- builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
- builder.RegisterService(&service);
-
- std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
- fprintf(stderr, "Server listening on %s\n", server_address.c_str());
-
- server->Wait();
-}
+static int running = 1;
static void
agl_shell_bound_ok(void *data, struct agl_shell *agl_shell)
@@ -169,10 +39,24 @@ static void
agl_shell_app_state(void *data, struct agl_shell *agl_shell,
const char *app_id, uint32_t state)
{
- (void) data;
(void) agl_shell;
- (void) app_id;
- (void) state;
+ struct shell_data *sh = static_cast<struct shell_data *>(data);
+ LOG("got app_state event app_id %s, state %d\n", app_id, state);
+
+ if (sh->server_context_list.empty())
+ return;
+
+ ::agl_shell_ipc::AppState app;
+
+ sh->current_app_state.set_app_id(std::string(app_id));
+ sh->current_app_state.set_state(state);
+
+ auto start = sh->server_context_list.begin();
+ while (start != sh->server_context_list.end()) {
+ LOG("writing to lister %p\n", static_cast<void *>(start->second));
+ start->second->NextWrite();
+ start++;
+ }
}
static const struct agl_shell_listener shell_listener = {
@@ -294,7 +178,6 @@ destroy_output(struct window_output *w_output)
free(w_output);
}
-
static void
global_add(void *data, struct wl_registry *reg, uint32_t id,
const char *interface, uint32_t version)
@@ -340,7 +223,8 @@ global_add_ext(void *data, struct wl_registry *reg, uint32_t id,
static_cast<struct agl_shell_ext *>(wl_registry_bind(reg, id,
&agl_shell_ext_interface, std::min(static_cast<uint32_t>(1),
version)));
- agl_shell_ext_add_listener(sh->shell_ext, &shell_ext_listener, data);
+ agl_shell_ext_add_listener(sh->shell_ext,
+ &shell_ext_listener, data);
}
}
@@ -364,7 +248,7 @@ static const struct wl_registry_listener registry_listener = {
};
static void
-register_shell_ext(struct wl_display *wl_display)
+register_shell_ext(struct wl_display *wl_display, struct shell_data *sh)
{
struct wl_registry *registry;
@@ -377,7 +261,7 @@ register_shell_ext(struct wl_display *wl_display)
}
static void
-register_shell(struct wl_display *wl_display)
+register_shell(struct wl_display *wl_display, struct shell_data *sh)
{
struct wl_registry *registry;
@@ -391,7 +275,21 @@ register_shell(struct wl_display *wl_display)
wl_registry_destroy(registry);
}
-static int
+static void
+destroy_shell_data(struct shell_data *sh)
+{
+ struct window_output *w_output, *w_output_next;
+
+ wl_list_for_each_safe(w_output, w_output_next, &sh->output_list, link)
+ destroy_output(w_output);
+
+ wl_display_flush(sh->wl_display);
+ wl_display_disconnect(sh->wl_display);
+
+ delete sh;
+}
+
+static struct shell_data *
start_agl_shell_client(void)
{
int ret = 0;
@@ -399,22 +297,25 @@ start_agl_shell_client(void)
wl_display = wl_display_connect(NULL);
- sh = new struct shell_data;
+ struct shell_data *sh = new struct shell_data;
+
sh->wl_display = wl_display;
sh->wait_for_doas = true;
sh->wait_for_bound = true;
- register_shell_ext(wl_display);
+ register_shell_ext(wl_display, sh);
// check for agl_shell_ext
if (!sh->shell_ext) {
fprintf(stderr, "Failed to bind to agl_shell_ext interface\n");
- return -1;
+ delete sh;
+ return nullptr;
}
if (wl_list_empty(&sh->output_list)) {
fprintf(stderr, "Failed get any outputs!\n");
- return -1;
+ delete sh;
+ return nullptr;
}
agl_shell_ext_doas_shell_client(sh->shell_ext);
@@ -426,11 +327,12 @@ start_agl_shell_client(void)
if (!sh->doas_ok) {
fprintf(stderr, "Failed to get doas_done event\n");
- return -1;
+ delete sh;
+ return nullptr;
}
// bind to agl-shell
- register_shell(wl_display);
+ register_shell(wl_display, sh);
while (ret != -1 && sh->wait_for_bound) {
ret = wl_display_dispatch(sh->wl_display);
if (sh->wait_for_bound)
@@ -440,34 +342,40 @@ start_agl_shell_client(void)
// at this point, we can't do anything about it
if (!sh->bound_ok) {
fprintf(stderr, "Failed to get bound_ok event!\n");
- return -1;
+ delete sh;
+ return nullptr;
}
fprintf(stderr, "agl_shell/agl_shell_ext interface OK\n");
- std::shared_ptr<struct agl_shell> agl_shell{sh->shell, agl_shell_destroy};
- sh->aglShell = new Shell(agl_shell);
- return 0;
+ return sh;
}
static void
-destroy_shell_data(void)
+start_grpc_server(Shell *aglShell)
{
- struct window_output *w_output, *w_output_next;
+ // instantiante the grpc server
+ std::string server_address(kDefaultGrpcServiceAddress);
+ GrpcServiceImpl service{aglShell};
- wl_list_for_each_safe(w_output, w_output_next, &sh->output_list, link)
- destroy_output(w_output);
+ grpc::EnableDefaultHealthCheckService(true);
+ grpc::reflection::InitProtoReflectionServerBuilderPlugin();
- wl_display_flush(sh->wl_display);
- wl_display_disconnect(sh->wl_display);
+ grpc::ServerBuilder builder;
+ builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
+ builder.RegisterService(&service);
- delete sh;
+ std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
+ LOG("Server listening on %s\n", server_address.c_str());
+
+ server->Wait();
}
int main(int argc, char **argv)
{
(void) argc;
(void) argv;
+ Shell *aglShell;
int ret = 0;
// do not start right up, give shell client time to boot up
@@ -479,14 +387,22 @@ int main(int argc, char **argv)
nanosleep(&ts, NULL);
- ret = start_agl_shell_client();
- if (ret) {
- fprintf(stderr, "Failed to initialize agl-shell/agl-shell-ext\n");
+ struct shell_data *sh = start_agl_shell_client();
+ if (!sh) {
+ LOG("Failed to initialize agl-shell/agl-shell-ext\n");
exit(EXIT_FAILURE);
}
- start_grpc_server();
+ std::shared_ptr<struct agl_shell> agl_shell{sh->shell, agl_shell_destroy};
+ aglShell = new Shell(agl_shell, sh);
+
+ std::thread thread(start_grpc_server, aglShell);
+
+ // serve wayland requests
+ while (running && ret != -1) {
+ ret = wl_display_dispatch(sh->wl_display);
+ }
- destroy_shell_data();
+ destroy_shell_data(sh);
return 0;
}
diff --git a/clients/main-grpc.h b/clients/main-grpc.h
new file mode 100644
index 0000000..9b687e9
--- /dev/null
+++ b/clients/main-grpc.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <cstdio>
+#include <algorithm>
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+#include <wayland-client.h>
+
+#include "agl_shell.grpc.pb.h"
+
+// forward declaration created in grpc-async-cb
+class Lister;
+
+struct shell_data {
+ struct wl_display *wl_display;
+ struct agl_shell *shell;
+ struct agl_shell_ext *shell_ext;
+
+ bool wait_for_bound;
+ bool wait_for_doas;
+
+ bool bound_ok;
+ bool doas_ok;
+
+ uint32_t version;
+ struct wl_list output_list; /** window_output::link */
+
+ ::agl_shell_ipc::AppState current_app_state;
+ std::list<std::pair<grpc::CallbackServerContext*, Lister *> > server_context_list;
+};
+
+struct window_output {
+ struct shell_data *shell_data;
+ struct wl_output *output;
+ char *name;
+ struct wl_list link; /** display::output_list */
+};
diff --git a/clients/meson.build b/clients/meson.build
index ff27a60..648c95a 100644
--- a/clients/meson.build
+++ b/clients/meson.build
@@ -46,7 +46,9 @@ clients = [
{
'basename': 'agl-shell-grpc-server',
'sources': [
- 'grpc.cpp',
+ 'main-grpc.cpp',
+ 'grpc-async-cb.cpp',
+ 'shell.cpp',
generated_protoc_sources,
generated_grpc_sources,
agl_shell_client_protocol_h,
diff --git a/clients/shell.cpp b/clients/shell.cpp
new file mode 100644
index 0000000..343b36e
--- /dev/null
+++ b/clients/shell.cpp
@@ -0,0 +1,53 @@
+#include <cstdio>
+#include <ctime>
+#include <algorithm>
+#include <cstring>
+#include <string>
+#include <queue>
+
+#include "main-grpc.h"
+#include "shell.h"
+
+void
+Shell::ActivateApp(const std::string &app_id, const std::string &output_name)
+{
+ struct window_output *woutput, *w_output;
+ struct agl_shell *shell = this->m_shell.get();
+
+ woutput = nullptr;
+ w_output = nullptr;
+
+ wl_list_for_each(woutput, &m_shell_data->output_list, link) {
+ if (woutput->name && !strcmp(woutput->name, output_name.c_str())) {
+ w_output = woutput;
+ break;
+ }
+ }
+
+ // else, get the first one available
+ if (!w_output)
+ w_output = wl_container_of(m_shell_data->output_list.prev,
+ w_output, link);
+
+ agl_shell_activate_app(shell, app_id.c_str(), w_output->output);
+ wl_display_flush(m_shell_data->wl_display);
+}
+
+void
+Shell::DeactivateApp(const std::string &app_id)
+{
+ (void) app_id;
+}
+
+void
+Shell::SetAppFloat(const std::string &app_id)
+{
+ (void) app_id;
+}
+
+void
+Shell::SetAppSplit(const std::string &app_id, uint32_t orientation)
+{
+ (void) app_id;
+ (void) orientation;
+}
diff --git a/clients/shell.h b/clients/shell.h
new file mode 100644
index 0000000..1cdbd1d
--- /dev/null
+++ b/clients/shell.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <memory>
+
+#include "agl-shell-client-protocol.h"
+
+#include "main-grpc.h"
+
+class Shell {
+public:
+ std::shared_ptr<struct agl_shell> m_shell;
+ struct shell_data *m_shell_data;
+
+ Shell(std::shared_ptr<struct agl_shell> shell,
+ struct shell_data *sh_data) :
+ m_shell(shell), m_shell_data(sh_data) { }
+ void ActivateApp(const std::string &app_id, const std::string &output_name);
+ void DeactivateApp(const std::string &app_id);
+ void SetAppSplit(const std::string &app_id, uint32_t orientation);
+ void SetAppFloat(const std::string &app_id);
+};
diff --git a/protocol/agl_shell.proto b/protocol/agl_shell.proto
index 721fac2..414162b 100644
--- a/protocol/agl_shell.proto
+++ b/protocol/agl_shell.proto
@@ -3,10 +3,11 @@ import "google/protobuf/empty.proto";
package agl_shell_ipc;
service AglShellManagerService {
- rpc ActivateApp(ActivateRequest) returns (google.protobuf.Empty) {}
- rpc DeactivateApp(DeactivateRequest) returns (google.protobuf.Empty) {}
- rpc SetAppSplit(SplitRequest) returns (google.protobuf.Empty) {}
- rpc SetAppFloat(FloatRequest) returns (google.protobuf.Empty) {}
+ rpc ActivateApp(ActivateRequest) returns (google.protobuf.Empty) {}
+ rpc DeactivateApp(DeactivateRequest) returns (google.protobuf.Empty) {}
+ rpc SetAppSplit(SplitRequest) returns (google.protobuf.Empty) {}
+ rpc SetAppFloat(FloatRequest) returns (google.protobuf.Empty) {}
+ rpc AppStatusState(google.protobuf.Empty) returns (stream AppState) {}
}
message ActivateRequest {
@@ -27,3 +28,7 @@ message FloatRequest {
string app_id = 1;
}
+message AppState {
+ int32 state = 1;
+ string app_id = 2;
+}
diff --git a/src/ivi-compositor.h b/src/ivi-compositor.h
index 889c3f3..891c093 100644
--- a/src/ivi-compositor.h
+++ b/src/ivi-compositor.h
@@ -86,8 +86,6 @@ struct ivi_compositor {
struct {
struct wl_client *client;
struct wl_resource *resource;
-
- struct wl_client *client_ext;
struct wl_resource *resource_ext;
bool ready;
enum agl_shell_bound_status status;
diff --git a/src/shell.c b/src/shell.c
index f8d2e32..d583fe5 100644
--- a/src/shell.c
+++ b/src/shell.c
@@ -1145,12 +1145,16 @@ shell_send_app_state(struct ivi_compositor *ivi, const char *app_id,
if (app_id && wl_resource_get_version(ivi->shell_client.resource) >=
AGL_SHELL_APP_STATE_SINCE_VERSION) {
+ weston_log("%s() should sent app_state\n", __func__);
agl_shell_send_app_state(ivi->shell_client.resource,
app_id, state);
- if (ivi->shell_client_ext.resource)
- agl_shell_send_app_state(ivi->shell_client_ext.resource,
+ if (ivi->shell_client.resource_ext) {
+ weston_log("%s() 2. should sent app_state %p\n",
+ __func__, ivi->shell_client.resource_ext);
+ agl_shell_send_app_state(ivi->shell_client.resource_ext,
app_id, state);
+ }
}
}
@@ -1634,13 +1638,13 @@ bind_agl_shell(struct wl_client *client,
wl_resource_set_implementation(resource, &agl_shell_implementation,
ivi, NULL);
- ivi->shell_client_ext.resource = resource;
+ ivi->shell_client.resource_ext = resource;
if (ivi->shell_client.status == BOUND_OK &&
wl_resource_get_version(resource) >= AGL_SHELL_BOUND_OK_SINCE_VERSION) {
- weston_log("Sent agl_shell_send_bound_ok to client ext\n");
ivi->shell_client_ext.status = BOUND_OK;
- agl_shell_send_bound_ok(ivi->shell_client_ext.resource);
+ agl_shell_send_bound_ok(ivi->shell_client.resource_ext);
+ weston_log("Sent agl_shell_send_bound_ok to client ext %p\n", ivi->shell_client.resource_ext);
}
return;