From 33aea95f02b025ecaf5917b10975119c07c353b3 Mon Sep 17 00:00:00 2001 From: Marius Vlad Date: Tue, 18 Oct 2022 20:22:47 +0300 Subject: Add more grpc - Asyncstuff Switch to a more better structure protocol: Add support for sending out app_state events over gRPC Signed-off-by: Marius Vlad Change-Id: I2765d53a2123be0d52225d92c964d39c63ec4902 --- clients/grpc-async-cb.cpp | 111 +++++++++++ clients/grpc-async-cb.h | 64 ++++++ clients/grpc-async.cpp | 121 ++++++++++++ clients/grpc-async.h | 79 ++++++++ clients/grpc-sync.cpp | 80 ++++++++ clients/grpc-sync.h | 46 +++++ clients/grpc.cpp | 492 ---------------------------------------------- clients/grpc.h | 47 ----- clients/log.h | 7 + clients/main-grpc.cpp | 408 ++++++++++++++++++++++++++++++++++++++ clients/main-grpc.h | 38 ++++ clients/meson.build | 4 +- clients/shell.cpp | 53 +++++ clients/shell.h | 21 ++ protocol/agl_shell.proto | 13 +- src/ivi-compositor.h | 2 - src/shell.c | 14 +- 17 files changed, 1049 insertions(+), 551 deletions(-) create mode 100644 clients/grpc-async-cb.cpp create mode 100644 clients/grpc-async-cb.h create mode 100644 clients/grpc-async.cpp create mode 100644 clients/grpc-async.h create mode 100644 clients/grpc-sync.cpp create mode 100644 clients/grpc-sync.h delete mode 100644 clients/grpc.cpp delete mode 100644 clients/grpc.h create mode 100644 clients/log.h create mode 100644 clients/main-grpc.cpp create mode 100644 clients/main-grpc.h create mode 100644 clients/shell.cpp create mode 100644 clients/shell.h diff --git a/clients/grpc-async-cb.cpp b/clients/grpc-async-cb.cpp new file mode 100644 index 0000000..71cd57e --- /dev/null +++ b/clients/grpc-async-cb.cpp @@ -0,0 +1,111 @@ +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include "log.h" +#include "agl_shell.grpc.pb.h" +#include "grpc-async-cb.h" + +Lister::Lister(Shell *shell) : m_shell(shell) +{ + // don't call NextWrite() just yet we do it explicitly when getting + // the events from the compositor +} + +void +Lister::OnDone() +{ + delete this; +} + +void Lister::OnWriteDone(bool ok) +{ + LOG("ok %d\n", ok); + if (ok) { + // normally we should finish here, but we don't do that to keep + // the channel open + //Finish(grpc::Status::OK); + } +} + +void +Lister::NextWrite(void) +{ + // we're going to have a Lister instance per client so we're safe here + StartWrite(&m_shell->m_shell_data->current_app_state); +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::ActivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::ActivateRequest* request, + google::protobuf::Empty* /*response*/) +{ + LOG("activating app %s on output %s\n", request->app_id().c_str(), + request->output_name().c_str()); + + m_aglShell->ActivateApp(request->app_id(), request->output_name()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::DeactivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::DeactivateRequest* request, + google::protobuf::Empty* /*response*/) +{ + m_aglShell->DeactivateApp(request->app_id()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::SetAppFloat(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::FloatRequest* request, + google::protobuf::Empty* /* response */) +{ + m_aglShell->SetAppFloat(request->app_id()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::SetAppSplit(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::SplitRequest* request, + google::protobuf::Empty* /*response*/) +{ + m_aglShell->SetAppSplit(request->app_id(), request->tile_orientation()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerWriteReactor<::agl_shell_ipc::AppState>* +GrpcServiceImpl::AppStatusState(grpc::CallbackServerContext* context, + const google::protobuf::Empty*) +{ + + Lister *n = new Lister(m_aglShell); + + m_aglShell->m_shell_data->server_context_list.push_back(std::pair(context, n)); + LOG("added lister %p\n", static_cast(n)); + + // just return a Lister to keep the channel open + return n; +} diff --git a/clients/grpc-async-cb.h b/clients/grpc-async-cb.h new file mode 100644 index 0000000..ce89f68 --- /dev/null +++ b/clients/grpc-async-cb.h @@ -0,0 +1,64 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include "shell.h" +#include "agl_shell.grpc.pb.h" + +namespace { + const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005"; +} + +class Lister : public grpc::ServerWriteReactor<::agl_shell_ipc::AppState> { +public: + Lister(Shell *aglShell); + void OnDone() override; + void OnWriteDone(bool ok) override; + void NextWrite(void); +private: + Shell *m_shell; +}; + +class GrpcServiceImpl final : public agl_shell_ipc::AglShellManagerService::CallbackService { +public: + GrpcServiceImpl(Shell *aglShell) : m_aglShell(aglShell) {} + + grpc::ServerUnaryReactor *ActivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::ActivateRequest* request, + google::protobuf::Empty* /*response*/) override; + + grpc::ServerUnaryReactor *DeactivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::DeactivateRequest* request, + google::protobuf::Empty* /*response*/) override; + + grpc::ServerUnaryReactor *SetAppSplit(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::SplitRequest* request, + google::protobuf::Empty* /*response*/) override; + + grpc::ServerUnaryReactor *SetAppFloat(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::FloatRequest* request, + google::protobuf::Empty* /*response*/) override; + + grpc::ServerWriteReactor< ::agl_shell_ipc::AppState>* AppStatusState( + ::grpc::CallbackServerContext* /*context*/, + const ::google::protobuf::Empty* /*request*/) override; +private: + Shell *m_aglShell; + + std::mutex m_done_mutex; + std::condition_variable m_done_cv; + bool m_done = false; + +}; diff --git a/clients/grpc-async.cpp b/clients/grpc-async.cpp new file mode 100644 index 0000000..72090fe --- /dev/null +++ b/clients/grpc-async.cpp @@ -0,0 +1,121 @@ +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include "agl_shell.grpc.pb.h" +#include "grpc-async.h" + +void +CallData::Proceed(void) +{ + switch (m_status) { + case CREATE: + // Make this instance progress to the PROCESS state. + m_status = PROCESS; + std::cout << "Creating Call data for new client connections: " + << this << std::endl; + + // As part of the initial CREATE state, we *request* that the + // system start processing AppStatusState requests. + // + // In this request, "this" acts are the tag uniquely + // identifying the request (so that different CallData + // instances can serve different requests concurrently), in + // this case the memory address of this CallData instance. + m_service->RequestAppStatusState(&m_ctx, &m_request, &m_responder, + m_cq, m_cq, (void *) this); + break; + case PROCESS: + // Spawn a new CallData instance to serve new clients while we + // process the one for this CallData. The instance will + // deallocate itself as part of its FINISH state. + CallData *cd = new CallData(m_service, m_cq); + + // The actual processing. + m_status = PROCESSING; + m_repliesSent++; + break; + case PROCESSING: + if (m_repliesSent == MAX_REPLIES) { + // And we are done! Let the gRPC runtime know we've + // finished, using the memory address of this instance + // as the uniquely identifying tag for the event. + m_status = FINISH; + m_responder.Finish(Status::OK, this); + } else { + // The actual processing. + m_status = PROCESSING; + m_repliesSent++; + } + break; + case FINISH: + GPR_ASSERT(m_status == FINISH); + std::cout << "Completed RPC for: " << this << std::endl; + // Once in the FINISH state, deallocate ourselves (CallData). + delete this; + break; + default: + break; + } +} + +GrpcServiceImpl::~GrpcServiceImpl() +{ + m_server->Shutdown(); + // Always shutdown the completion queue after the server. + m_cq->Shutdown(); +} + +void +GrpcServiceImpl::Run(void) +{ + std::string server_address(kDefaultGrpcServiceAddress); + + grpc::ServerBuilder builder; + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + + builder.RegisterService(&m_service); + m_cq = builder.AddCompletionQueue(); + + m_server = builder.BuildAndStart(); + std::cout << "Server listening on " << server_address << std::endl; + + // Proceed to the server's main loop. + HandleRpcs(); +} + +void +GrpcServiceImpl::HandleRpcs(void) +{ + // Spawn a new CallData instance to serve new clients. + CallData *cd = new CallData(&m_service, m_cq.get()); + + // uniquely identifies a request. + void *tag; + bool ok; + + // Block waiting to read the next event from the completion queue. The + // event is uniquely identified by its tag, which in this case is the + // memory address of a CallData instance. + // + // The return value of Next should always be checked. This return value + // tells us whether there is any kind of event or cq_ is shutting down. + while (true) { + std::cout << "Blocked on next waiting for events" << std::endl; + GPR_ASSERT(m_cq->Next(&tag, &ok)); + GPR_ASSERT(ok); + + std::cout << "Calling tag " << tag << " with Proceed()" << std::endl; + static_cast(tag)->Proceed(); + } +} diff --git a/clients/grpc-async.h b/clients/grpc-async.h new file mode 100644 index 0000000..40de52c --- /dev/null +++ b/clients/grpc-async.h @@ -0,0 +1,79 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include "shell.h" +#include "agl_shell.grpc.pb.h" + +namespace { + const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005"; +} + +class CallData { +public: + // Take in the "service" instance (in this case representing an + // asynchronous server) and the completion queue "cq" used for + // asynchronous communication with the gRPC runtime. + CallData(Greeter::AsyncService* service, grpc::ServerCompletionQueue* cq) + : m_service(service), m_cq(cq), m_repliesSent(0), + m_responder(&m_ctx), m_status(CREATE) { Proceed(); } + void Proceed(); +private: + // The means of communication with the gRPC runtime for an asynchronous + // server. + Greeter::AsyncService *m_service; + // The producer-consumer queue where for asynchronous server + // notifications. + grpc::ServerCompletionQueue *m_cq; + // Context for the rpc, allowing to tweak aspects of it such as the use + // of compression, authentication, as well as to send metadata back to + // the client. + grpc::ServerContext m_ctx; + + // What we send back to the client. + ::agl_shell_ipc::AppState m_reply; + + uint32_t m_repliesSent; + const uint32_t MAX_REPLIES = 5; + + // The means to get back to the client. + grpc::ServerAsyncWriter<::agl_shell_ipc::AppState> m_responder; + + // Let's implement a tiny state machine with the following states. + enum CallStatus { + CREATE, + PROCESS, + PROCESSING, + FINISH + }; + + // The current serving state. + CallStatus m_status; +}; + + +class GrpcServiceImpl final { +public: + GrpcServiceImpl(Shell *aglShell) : m_aglShell(aglShell) {} + ~GrpcServiceImpl(); + void Run(); + + // This can be run in multiple threads if needed. + void HandleRpcs(); + +private: + Shell *m_aglShell; + + std::unique_ptr m_cq; + Greeter::AsyncService m_service; + std::unique_ptr m_server; +}; diff --git a/clients/grpc-sync.cpp b/clients/grpc-sync.cpp new file mode 100644 index 0000000..5aee817 --- /dev/null +++ b/clients/grpc-sync.cpp @@ -0,0 +1,80 @@ +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include "agl_shell.grpc.pb.h" +#include "grpc-sync.h" + +grpc::ServerUnaryReactor * +GrpcServiceImpl::ActivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::ActivateRequest* request, + google::protobuf::Empty* /*response*/) +{ + fprintf(stderr, "activating app %s on output %s\n", + request->app_id().c_str(), + request->output_name().c_str()); + + m_aglShell->ActivateApp(request->app_id(), request->output_name()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::DeactivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::DeactivateRequest* request, + google::protobuf::Empty* /*response*/) +{ + m_aglShell->DeactivateApp(request->app_id()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::SetAppFloat(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::FloatRequest* request, + google::protobuf::Empty* /* response */) +{ + m_aglShell->SetAppFloat(request->app_id()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::SetAppSplit(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::SplitRequest* request, + google::protobuf::Empty* /*response*/) +{ + m_aglShell->SetAppSplit(request->app_id(), request->tile_orientation()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::AppStatusState(grpc::CallbackServerContext *context, + google::protobuf::Empty*, + ::grpc::ServerWriter<::agl_shell_ipc::AppState>* writer) +{ + (void) writer; + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + + return reactor; +} diff --git a/clients/grpc-sync.h b/clients/grpc-sync.h new file mode 100644 index 0000000..caaee54 --- /dev/null +++ b/clients/grpc-sync.h @@ -0,0 +1,46 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include "shell.h" +#include "agl_shell.grpc.pb.h" + +namespace { + const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005"; +} + + +class GrpcServiceImpl final : public agl_shell_ipc::AglShellManagerService::CallbackService { +public: + GrpcServiceImpl(Shell *aglShell) : m_aglShell(aglShell) {} + + grpc::ServerUnaryReactor *ActivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::ActivateRequest* request, + google::protobuf::Empty* /*response*/); + + grpc::ServerUnaryReactor *DeactivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::DeactivateRequest* request, + google::protobuf::Empty* /*response*/); + + grpc::ServerUnaryReactor *SetAppSplit(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::SplitRequest* request, + google::protobuf::Empty* /*response*/); + + grpc::ServerUnaryReactor *SetAppFloat(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::FloatRequest* request, + google::protobuf::Empty* /*response*/); + grpc::ServerUnaryReactor *AppStatusState(grpc::CallbackServerContext *context, + google::protobuf::Empty *empty, + ::grpc::ServerWriter<::agl_shell_ipc::AppState>* writer); +private: + Shell *m_aglShell; +}; diff --git a/clients/grpc.cpp b/clients/grpc.cpp deleted file mode 100644 index 503453d..0000000 --- a/clients/grpc.cpp +++ /dev/null @@ -1,492 +0,0 @@ -#include -#include -#include - -#include "grpc.h" - -struct shell_data { - struct wl_display *wl_display; - struct agl_shell *shell; - struct agl_shell_ext *shell_ext; - Shell *aglShell; - - bool wait_for_bound; - bool wait_for_doas; - - bool bound_ok; - bool doas_ok; - - uint32_t version; - struct wl_list output_list; /** window_output::link */ -}; - -struct window_output { - struct shell_data *shell_data; - struct wl_output *output; - char *name; - struct wl_list link; /** display::output_list */ -}; - -static struct shell_data *sh = nullptr; - -grpc::ServerUnaryReactor * -GrpcServiceImpl::ActivateApp(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::ActivateRequest* request, - google::protobuf::Empty* /*response*/) -{ - fprintf(stderr, "activating app %s on output %s\n", - request->app_id().c_str(), - request->output_name().c_str()); - - sh->aglShell->ActivateApp(request->app_id(), request->output_name()); - - grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); - reactor->Finish(grpc::Status::OK); - return reactor; -} - -grpc::ServerUnaryReactor * -GrpcServiceImpl::DeactivateApp(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::DeactivateRequest* request, - google::protobuf::Empty* /*response*/) -{ - sh->aglShell->DeactivateApp(request->app_id()); - - grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); - reactor->Finish(grpc::Status::OK); - return reactor; -} - -grpc::ServerUnaryReactor * -GrpcServiceImpl::SetAppFloat(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::FloatRequest* request, - google::protobuf::Empty* /* response */) -{ - sh->aglShell->SetAppFloat(request->app_id()); - - grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); - reactor->Finish(grpc::Status::OK); - return reactor; -} - -grpc::ServerUnaryReactor * -GrpcServiceImpl::SetAppSplit(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::SplitRequest* request, - google::protobuf::Empty* /*response*/) -{ - sh->aglShell->SetAppSplit(request->app_id(), request->tile_orientation()); - - grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); - reactor->Finish(grpc::Status::OK); - return reactor; -} - -void -Shell::ActivateApp(const std::string &app_id, const std::string &output_name) -{ - struct window_output *woutput, *w_output; - - woutput = nullptr; - w_output = nullptr; - - wl_list_for_each(woutput, &sh->output_list, link) { - if (woutput->name && !strcmp(woutput->name, output_name.c_str())) { - w_output = woutput; - break; - } - } - - // else, get the first one available - if (!w_output) - w_output = wl_container_of(sh->output_list.prev, w_output, link); - - agl_shell_activate_app(this->m_shell.get(), app_id.c_str(), w_output->output); - wl_display_flush(sh->wl_display); -} - -void -Shell::DeactivateApp(const std::string &app_id) -{ - (void) app_id; -} - -void -Shell::SetAppFloat(const std::string &app_id) -{ - (void) app_id; -} - -void -Shell::SetAppSplit(const std::string &app_id, uint32_t orientation) -{ - (void) app_id; - (void) orientation; -} - -static void -start_grpc_server(void) -{ - // instantiante the grpc server - std::string server_address(kDefaultGrpcServiceAddress); - GrpcServiceImpl service; - - grpc::EnableDefaultHealthCheckService(true); - grpc::reflection::InitProtoReflectionServerBuilderPlugin(); - - grpc::ServerBuilder builder; - builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); - builder.RegisterService(&service); - - std::unique_ptr server(builder.BuildAndStart()); - fprintf(stderr, "Server listening on %s\n", server_address.c_str()); - - server->Wait(); -} - -static void -agl_shell_bound_ok(void *data, struct agl_shell *agl_shell) -{ - (void) agl_shell; - - struct shell_data *sh = static_cast(data); - sh->wait_for_bound = false; - - sh->bound_ok = true; -} - -static void -agl_shell_bound_fail(void *data, struct agl_shell *agl_shell) -{ - (void) agl_shell; - - struct shell_data *sh = static_cast(data); - sh->wait_for_bound = false; - - sh->bound_ok = false; -} - -static void -agl_shell_app_state(void *data, struct agl_shell *agl_shell, - const char *app_id, uint32_t state) -{ - (void) data; - (void) agl_shell; - (void) app_id; - (void) state; -} - -static const struct agl_shell_listener shell_listener = { - agl_shell_bound_ok, - agl_shell_bound_fail, - agl_shell_app_state, -}; - -static void -agl_shell_ext_doas_done(void *data, struct agl_shell_ext *agl_shell_ext, uint32_t status) -{ - (void) agl_shell_ext; - - struct shell_data *sh = static_cast(data); - sh->wait_for_doas = false; - - if (status == AGL_SHELL_EXT_DOAS_SHELL_CLIENT_STATUS_SUCCESS) - sh->doas_ok = true; -} - -static const struct agl_shell_ext_listener shell_ext_listener = { - agl_shell_ext_doas_done, -}; - -static void -display_handle_geometry(void *data, struct wl_output *wl_output, - int x, int y, int physical_width, int physical_height, - int subpixel, const char *make, const char *model, int transform) -{ - (void) data; - (void) wl_output; - (void) x; - (void) y; - (void) physical_width; - (void) physical_height; - (void) subpixel; - (void) make; - (void) model; - (void) transform; -} - -static void -display_handle_mode(void *data, struct wl_output *wl_output, uint32_t flags, - int width, int height, int refresh) -{ - (void) data; - (void) wl_output; - (void) flags; - (void) width; - (void) height; - (void) refresh; -} - -static void -display_handle_done(void *data, struct wl_output *wl_output) -{ - (void) data; - (void) wl_output; -} - -static void -display_handle_scale(void *data, struct wl_output *wl_output, int32_t factor) -{ - (void) data; - (void) wl_output; - (void) factor; -} - - -static void -display_handle_name(void *data, struct wl_output *wl_output, const char *name) -{ - (void) wl_output; - - struct window_output *woutput = static_cast(data); - woutput->name = strdup(name); -} - -static void -display_handle_description(void *data, struct wl_output *wl_output, const char *description) -{ - (void) data; - (void) wl_output; - (void) description; -} - -static const struct wl_output_listener output_listener = { - display_handle_geometry, - display_handle_mode, - display_handle_done, - display_handle_scale, - display_handle_name, - display_handle_description, -}; - -static void -display_add_output(struct shell_data *sh, struct wl_registry *reg, - uint32_t id, uint32_t version) -{ - struct window_output *w_output; - - w_output = new struct window_output; - w_output->shell_data = sh; - - w_output->output = - static_cast(wl_registry_bind(reg, id, - &wl_output_interface, - std::min(version, static_cast(4)))); - - wl_list_insert(&sh->output_list, &w_output->link); - wl_output_add_listener(w_output->output, &output_listener, w_output); -} - -static void -destroy_output(struct window_output *w_output) -{ - free(w_output->name); - wl_list_remove(&w_output->link); - free(w_output); -} - - -static void -global_add(void *data, struct wl_registry *reg, uint32_t id, - const char *interface, uint32_t version) -{ - - struct shell_data *sh = static_cast(data); - - if (!sh) - return; - - if (strcmp(interface, agl_shell_interface.name) == 0) { - sh->shell = - static_cast(wl_registry_bind(reg, id, - &agl_shell_interface, std::min(static_cast(3), - version))); - agl_shell_add_listener(sh->shell, &shell_listener, data); - sh->version = version; - } else if (strcmp(interface, "wl_output") == 0) { - display_add_output(sh, reg, id, version); - } -} - -static void -global_remove(void *data, struct wl_registry *reg, uint32_t id) -{ - /* Don't care */ - (void) data; - (void) reg; - (void) id; -} - -static void -global_add_ext(void *data, struct wl_registry *reg, uint32_t id, - const char *interface, uint32_t version) -{ - struct shell_data *sh = static_cast(data); - - if (!sh) - return; - - if (strcmp(interface, agl_shell_ext_interface.name) == 0) { - sh->shell_ext = - static_cast(wl_registry_bind(reg, id, - &agl_shell_ext_interface, std::min(static_cast(1), - version))); - agl_shell_ext_add_listener(sh->shell_ext, &shell_ext_listener, data); - } -} - -static void -global_remove_ext(void *data, struct wl_registry *reg, uint32_t id) -{ - /* Don't care */ - (void) data; - (void) reg; - (void) id; -} - -static const struct wl_registry_listener registry_ext_listener = { - global_add_ext, - global_remove_ext, -}; - -static const struct wl_registry_listener registry_listener = { - global_add, - global_remove, -}; - -static void -register_shell_ext(struct wl_display *wl_display) -{ - struct wl_registry *registry; - - registry = wl_display_get_registry(wl_display); - - wl_registry_add_listener(registry, ®istry_ext_listener, sh); - - wl_display_roundtrip(wl_display); - wl_registry_destroy(registry); -} - -static void -register_shell(struct wl_display *wl_display) -{ - struct wl_registry *registry; - - wl_list_init(&sh->output_list); - - registry = wl_display_get_registry(wl_display); - - wl_registry_add_listener(registry, ®istry_listener, sh); - - wl_display_roundtrip(wl_display); - wl_registry_destroy(registry); -} - -static int -start_agl_shell_client(void) -{ - int ret = 0; - struct wl_display *wl_display; - - wl_display = wl_display_connect(NULL); - - sh = new struct shell_data; - sh->wl_display = wl_display; - sh->wait_for_doas = true; - sh->wait_for_bound = true; - - register_shell_ext(wl_display); - - // check for agl_shell_ext - if (!sh->shell_ext) { - fprintf(stderr, "Failed to bind to agl_shell_ext interface\n"); - return -1; - } - - if (wl_list_empty(&sh->output_list)) { - fprintf(stderr, "Failed get any outputs!\n"); - return -1; - } - - agl_shell_ext_doas_shell_client(sh->shell_ext); - while (ret != -1 && sh->wait_for_doas) { - ret = wl_display_dispatch(sh->wl_display); - if (sh->wait_for_doas) - continue; - } - - if (!sh->doas_ok) { - fprintf(stderr, "Failed to get doas_done event\n"); - return -1; - } - - // bind to agl-shell - register_shell(wl_display); - while (ret != -1 && sh->wait_for_bound) { - ret = wl_display_dispatch(sh->wl_display); - if (sh->wait_for_bound) - continue; - } - - // at this point, we can't do anything about it - if (!sh->bound_ok) { - fprintf(stderr, "Failed to get bound_ok event!\n"); - return -1; - } - - fprintf(stderr, "agl_shell/agl_shell_ext interface OK\n"); - std::shared_ptr agl_shell{sh->shell, agl_shell_destroy}; - sh->aglShell = new Shell(agl_shell); - - return 0; -} - -static void -destroy_shell_data(void) -{ - struct window_output *w_output, *w_output_next; - - wl_list_for_each_safe(w_output, w_output_next, &sh->output_list, link) - destroy_output(w_output); - - wl_display_flush(sh->wl_display); - wl_display_disconnect(sh->wl_display); - - delete sh; -} - -int main(int argc, char **argv) -{ - (void) argc; - (void) argv; - int ret = 0; - - // do not start right up, give shell client time to boot up - struct timespec ts = {}; - - clock_gettime(CLOCK_MONOTONIC, &ts); - ts.tv_sec = 2; - ts.tv_nsec = 0; - - nanosleep(&ts, NULL); - - ret = start_agl_shell_client(); - if (ret) { - fprintf(stderr, "Failed to initialize agl-shell/agl-shell-ext\n"); - exit(EXIT_FAILURE); - } - - start_grpc_server(); - - destroy_shell_data(); - return 0; -} diff --git a/clients/grpc.h b/clients/grpc.h deleted file mode 100644 index 3a3c864..0000000 --- a/clients/grpc.h +++ /dev/null @@ -1,47 +0,0 @@ -#include -#include -#include -#include -#include - -#include -#include - -#include "agl_shell.grpc.pb.h" -#include "agl-shell-client-protocol.h" - -namespace { - const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005"; -} - - -class GrpcServiceImpl final : public agl_shell_ipc::AglShellManagerService::CallbackService { - - grpc::ServerUnaryReactor *ActivateApp(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::ActivateRequest* request, - google::protobuf::Empty* /*response*/); - - grpc::ServerUnaryReactor *DeactivateApp(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::DeactivateRequest* request, - google::protobuf::Empty* /*response*/); - - grpc::ServerUnaryReactor *SetAppSplit(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::SplitRequest* request, - google::protobuf::Empty* /*response*/); - - grpc::ServerUnaryReactor *SetAppFloat(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::FloatRequest* request, - google::protobuf::Empty* /*response*/); -}; - - -class Shell { -public: - std::shared_ptr m_shell; - Shell(std::shared_ptr shell) : m_shell(shell) { } - void ActivateApp(const std::string &app_id, const std::string &output_name); - void DeactivateApp(const std::string &app_id); - void SetAppSplit(const std::string &app_id, uint32_t orientation); - void SetAppFloat(const std::string &app_id); - -}; diff --git a/clients/log.h b/clients/log.h new file mode 100644 index 0000000..127d8e0 --- /dev/null +++ b/clients/log.h @@ -0,0 +1,7 @@ +#pragma once + +#include + +#ifndef LOG +#define LOG(fmt, ...) do { fprintf(stderr, "%s() " fmt, __func__, ##__VA_ARGS__); } while (0) +#endif diff --git a/clients/main-grpc.cpp b/clients/main-grpc.cpp new file mode 100644 index 0000000..5f35c85 --- /dev/null +++ b/clients/main-grpc.cpp @@ -0,0 +1,408 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "shell.h" +#include "log.h" +#include "main-grpc.h" +#include "grpc-async-cb.h" + +static int running = 1; + +static void +agl_shell_bound_ok(void *data, struct agl_shell *agl_shell) +{ + (void) agl_shell; + + struct shell_data *sh = static_cast(data); + sh->wait_for_bound = false; + + sh->bound_ok = true; +} + +static void +agl_shell_bound_fail(void *data, struct agl_shell *agl_shell) +{ + (void) agl_shell; + + struct shell_data *sh = static_cast(data); + sh->wait_for_bound = false; + + sh->bound_ok = false; +} + +static void +agl_shell_app_state(void *data, struct agl_shell *agl_shell, + const char *app_id, uint32_t state) +{ + (void) agl_shell; + struct shell_data *sh = static_cast(data); + LOG("got app_state event app_id %s, state %d\n", app_id, state); + + if (sh->server_context_list.empty()) + return; + + ::agl_shell_ipc::AppState app; + + sh->current_app_state.set_app_id(std::string(app_id)); + sh->current_app_state.set_state(state); + + auto start = sh->server_context_list.begin(); + while (start != sh->server_context_list.end()) { + LOG("writing to lister %p\n", static_cast(start->second)); + start->second->NextWrite(); + start++; + } +} + +static const struct agl_shell_listener shell_listener = { + agl_shell_bound_ok, + agl_shell_bound_fail, + agl_shell_app_state, +}; + +static void +agl_shell_ext_doas_done(void *data, struct agl_shell_ext *agl_shell_ext, uint32_t status) +{ + (void) agl_shell_ext; + + struct shell_data *sh = static_cast(data); + sh->wait_for_doas = false; + + if (status == AGL_SHELL_EXT_DOAS_SHELL_CLIENT_STATUS_SUCCESS) + sh->doas_ok = true; +} + +static const struct agl_shell_ext_listener shell_ext_listener = { + agl_shell_ext_doas_done, +}; + +static void +display_handle_geometry(void *data, struct wl_output *wl_output, + int x, int y, int physical_width, int physical_height, + int subpixel, const char *make, const char *model, int transform) +{ + (void) data; + (void) wl_output; + (void) x; + (void) y; + (void) physical_width; + (void) physical_height; + (void) subpixel; + (void) make; + (void) model; + (void) transform; +} + +static void +display_handle_mode(void *data, struct wl_output *wl_output, uint32_t flags, + int width, int height, int refresh) +{ + (void) data; + (void) wl_output; + (void) flags; + (void) width; + (void) height; + (void) refresh; +} + +static void +display_handle_done(void *data, struct wl_output *wl_output) +{ + (void) data; + (void) wl_output; +} + +static void +display_handle_scale(void *data, struct wl_output *wl_output, int32_t factor) +{ + (void) data; + (void) wl_output; + (void) factor; +} + + +static void +display_handle_name(void *data, struct wl_output *wl_output, const char *name) +{ + (void) wl_output; + + struct window_output *woutput = static_cast(data); + woutput->name = strdup(name); +} + +static void +display_handle_description(void *data, struct wl_output *wl_output, const char *description) +{ + (void) data; + (void) wl_output; + (void) description; +} + +static const struct wl_output_listener output_listener = { + display_handle_geometry, + display_handle_mode, + display_handle_done, + display_handle_scale, + display_handle_name, + display_handle_description, +}; + +static void +display_add_output(struct shell_data *sh, struct wl_registry *reg, + uint32_t id, uint32_t version) +{ + struct window_output *w_output; + + w_output = new struct window_output; + w_output->shell_data = sh; + + w_output->output = + static_cast(wl_registry_bind(reg, id, + &wl_output_interface, + std::min(version, static_cast(4)))); + + wl_list_insert(&sh->output_list, &w_output->link); + wl_output_add_listener(w_output->output, &output_listener, w_output); +} + +static void +destroy_output(struct window_output *w_output) +{ + free(w_output->name); + wl_list_remove(&w_output->link); + free(w_output); +} + +static void +global_add(void *data, struct wl_registry *reg, uint32_t id, + const char *interface, uint32_t version) +{ + + struct shell_data *sh = static_cast(data); + + if (!sh) + return; + + if (strcmp(interface, agl_shell_interface.name) == 0) { + sh->shell = + static_cast(wl_registry_bind(reg, id, + &agl_shell_interface, std::min(static_cast(3), + version))); + agl_shell_add_listener(sh->shell, &shell_listener, data); + sh->version = version; + } else if (strcmp(interface, "wl_output") == 0) { + display_add_output(sh, reg, id, version); + } +} + +static void +global_remove(void *data, struct wl_registry *reg, uint32_t id) +{ + /* Don't care */ + (void) data; + (void) reg; + (void) id; +} + +static void +global_add_ext(void *data, struct wl_registry *reg, uint32_t id, + const char *interface, uint32_t version) +{ + struct shell_data *sh = static_cast(data); + + if (!sh) + return; + + if (strcmp(interface, agl_shell_ext_interface.name) == 0) { + sh->shell_ext = + static_cast(wl_registry_bind(reg, id, + &agl_shell_ext_interface, std::min(static_cast(1), + version))); + agl_shell_ext_add_listener(sh->shell_ext, + &shell_ext_listener, data); + } +} + +static void +global_remove_ext(void *data, struct wl_registry *reg, uint32_t id) +{ + /* Don't care */ + (void) data; + (void) reg; + (void) id; +} + +static const struct wl_registry_listener registry_ext_listener = { + global_add_ext, + global_remove_ext, +}; + +static const struct wl_registry_listener registry_listener = { + global_add, + global_remove, +}; + +static void +register_shell_ext(struct wl_display *wl_display, struct shell_data *sh) +{ + struct wl_registry *registry; + + registry = wl_display_get_registry(wl_display); + + wl_registry_add_listener(registry, ®istry_ext_listener, sh); + + wl_display_roundtrip(wl_display); + wl_registry_destroy(registry); +} + +static void +register_shell(struct wl_display *wl_display, struct shell_data *sh) +{ + struct wl_registry *registry; + + wl_list_init(&sh->output_list); + + registry = wl_display_get_registry(wl_display); + + wl_registry_add_listener(registry, ®istry_listener, sh); + + wl_display_roundtrip(wl_display); + wl_registry_destroy(registry); +} + +static void +destroy_shell_data(struct shell_data *sh) +{ + struct window_output *w_output, *w_output_next; + + wl_list_for_each_safe(w_output, w_output_next, &sh->output_list, link) + destroy_output(w_output); + + wl_display_flush(sh->wl_display); + wl_display_disconnect(sh->wl_display); + + delete sh; +} + +static struct shell_data * +start_agl_shell_client(void) +{ + int ret = 0; + struct wl_display *wl_display; + + wl_display = wl_display_connect(NULL); + + struct shell_data *sh = new struct shell_data; + + sh->wl_display = wl_display; + sh->wait_for_doas = true; + sh->wait_for_bound = true; + + register_shell_ext(wl_display, sh); + + // check for agl_shell_ext + if (!sh->shell_ext) { + fprintf(stderr, "Failed to bind to agl_shell_ext interface\n"); + delete sh; + return nullptr; + } + + if (wl_list_empty(&sh->output_list)) { + fprintf(stderr, "Failed get any outputs!\n"); + delete sh; + return nullptr; + } + + agl_shell_ext_doas_shell_client(sh->shell_ext); + while (ret != -1 && sh->wait_for_doas) { + ret = wl_display_dispatch(sh->wl_display); + if (sh->wait_for_doas) + continue; + } + + if (!sh->doas_ok) { + fprintf(stderr, "Failed to get doas_done event\n"); + delete sh; + return nullptr; + } + + // bind to agl-shell + register_shell(wl_display, sh); + while (ret != -1 && sh->wait_for_bound) { + ret = wl_display_dispatch(sh->wl_display); + if (sh->wait_for_bound) + continue; + } + + // at this point, we can't do anything about it + if (!sh->bound_ok) { + fprintf(stderr, "Failed to get bound_ok event!\n"); + delete sh; + return nullptr; + } + + fprintf(stderr, "agl_shell/agl_shell_ext interface OK\n"); + + return sh; +} + +static void +start_grpc_server(Shell *aglShell) +{ + // instantiante the grpc server + std::string server_address(kDefaultGrpcServiceAddress); + GrpcServiceImpl service{aglShell}; + + grpc::EnableDefaultHealthCheckService(true); + grpc::reflection::InitProtoReflectionServerBuilderPlugin(); + + grpc::ServerBuilder builder; + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + builder.RegisterService(&service); + + std::unique_ptr server(builder.BuildAndStart()); + LOG("Server listening on %s\n", server_address.c_str()); + + server->Wait(); +} + +int main(int argc, char **argv) +{ + (void) argc; + (void) argv; + Shell *aglShell; + int ret = 0; + + // do not start right up, give shell client time to boot up + struct timespec ts = {}; + + clock_gettime(CLOCK_MONOTONIC, &ts); + ts.tv_sec = 2; + ts.tv_nsec = 0; + + nanosleep(&ts, NULL); + + struct shell_data *sh = start_agl_shell_client(); + if (!sh) { + LOG("Failed to initialize agl-shell/agl-shell-ext\n"); + exit(EXIT_FAILURE); + } + + std::shared_ptr agl_shell{sh->shell, agl_shell_destroy}; + aglShell = new Shell(agl_shell, sh); + + std::thread thread(start_grpc_server, aglShell); + + // serve wayland requests + while (running && ret != -1) { + ret = wl_display_dispatch(sh->wl_display); + } + + destroy_shell_data(sh); + return 0; +} diff --git a/clients/main-grpc.h b/clients/main-grpc.h new file mode 100644 index 0000000..9b687e9 --- /dev/null +++ b/clients/main-grpc.h @@ -0,0 +1,38 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "agl_shell.grpc.pb.h" + +// forward declaration created in grpc-async-cb +class Lister; + +struct shell_data { + struct wl_display *wl_display; + struct agl_shell *shell; + struct agl_shell_ext *shell_ext; + + bool wait_for_bound; + bool wait_for_doas; + + bool bound_ok; + bool doas_ok; + + uint32_t version; + struct wl_list output_list; /** window_output::link */ + + ::agl_shell_ipc::AppState current_app_state; + std::list > server_context_list; +}; + +struct window_output { + struct shell_data *shell_data; + struct wl_output *output; + char *name; + struct wl_list link; /** display::output_list */ +}; diff --git a/clients/meson.build b/clients/meson.build index ff27a60..648c95a 100644 --- a/clients/meson.build +++ b/clients/meson.build @@ -46,7 +46,9 @@ clients = [ { 'basename': 'agl-shell-grpc-server', 'sources': [ - 'grpc.cpp', + 'main-grpc.cpp', + 'grpc-async-cb.cpp', + 'shell.cpp', generated_protoc_sources, generated_grpc_sources, agl_shell_client_protocol_h, diff --git a/clients/shell.cpp b/clients/shell.cpp new file mode 100644 index 0000000..343b36e --- /dev/null +++ b/clients/shell.cpp @@ -0,0 +1,53 @@ +#include +#include +#include +#include +#include +#include + +#include "main-grpc.h" +#include "shell.h" + +void +Shell::ActivateApp(const std::string &app_id, const std::string &output_name) +{ + struct window_output *woutput, *w_output; + struct agl_shell *shell = this->m_shell.get(); + + woutput = nullptr; + w_output = nullptr; + + wl_list_for_each(woutput, &m_shell_data->output_list, link) { + if (woutput->name && !strcmp(woutput->name, output_name.c_str())) { + w_output = woutput; + break; + } + } + + // else, get the first one available + if (!w_output) + w_output = wl_container_of(m_shell_data->output_list.prev, + w_output, link); + + agl_shell_activate_app(shell, app_id.c_str(), w_output->output); + wl_display_flush(m_shell_data->wl_display); +} + +void +Shell::DeactivateApp(const std::string &app_id) +{ + (void) app_id; +} + +void +Shell::SetAppFloat(const std::string &app_id) +{ + (void) app_id; +} + +void +Shell::SetAppSplit(const std::string &app_id, uint32_t orientation) +{ + (void) app_id; + (void) orientation; +} diff --git a/clients/shell.h b/clients/shell.h new file mode 100644 index 0000000..1cdbd1d --- /dev/null +++ b/clients/shell.h @@ -0,0 +1,21 @@ +#pragma once + +#include + +#include "agl-shell-client-protocol.h" + +#include "main-grpc.h" + +class Shell { +public: + std::shared_ptr m_shell; + struct shell_data *m_shell_data; + + Shell(std::shared_ptr shell, + struct shell_data *sh_data) : + m_shell(shell), m_shell_data(sh_data) { } + void ActivateApp(const std::string &app_id, const std::string &output_name); + void DeactivateApp(const std::string &app_id); + void SetAppSplit(const std::string &app_id, uint32_t orientation); + void SetAppFloat(const std::string &app_id); +}; diff --git a/protocol/agl_shell.proto b/protocol/agl_shell.proto index 721fac2..414162b 100644 --- a/protocol/agl_shell.proto +++ b/protocol/agl_shell.proto @@ -3,10 +3,11 @@ import "google/protobuf/empty.proto"; package agl_shell_ipc; service AglShellManagerService { - rpc ActivateApp(ActivateRequest) returns (google.protobuf.Empty) {} - rpc DeactivateApp(DeactivateRequest) returns (google.protobuf.Empty) {} - rpc SetAppSplit(SplitRequest) returns (google.protobuf.Empty) {} - rpc SetAppFloat(FloatRequest) returns (google.protobuf.Empty) {} + rpc ActivateApp(ActivateRequest) returns (google.protobuf.Empty) {} + rpc DeactivateApp(DeactivateRequest) returns (google.protobuf.Empty) {} + rpc SetAppSplit(SplitRequest) returns (google.protobuf.Empty) {} + rpc SetAppFloat(FloatRequest) returns (google.protobuf.Empty) {} + rpc AppStatusState(google.protobuf.Empty) returns (stream AppState) {} } message ActivateRequest { @@ -27,3 +28,7 @@ message FloatRequest { string app_id = 1; } +message AppState { + int32 state = 1; + string app_id = 2; +} diff --git a/src/ivi-compositor.h b/src/ivi-compositor.h index 889c3f3..891c093 100644 --- a/src/ivi-compositor.h +++ b/src/ivi-compositor.h @@ -86,8 +86,6 @@ struct ivi_compositor { struct { struct wl_client *client; struct wl_resource *resource; - - struct wl_client *client_ext; struct wl_resource *resource_ext; bool ready; enum agl_shell_bound_status status; diff --git a/src/shell.c b/src/shell.c index f8d2e32..d583fe5 100644 --- a/src/shell.c +++ b/src/shell.c @@ -1145,12 +1145,16 @@ shell_send_app_state(struct ivi_compositor *ivi, const char *app_id, if (app_id && wl_resource_get_version(ivi->shell_client.resource) >= AGL_SHELL_APP_STATE_SINCE_VERSION) { + weston_log("%s() should sent app_state\n", __func__); agl_shell_send_app_state(ivi->shell_client.resource, app_id, state); - if (ivi->shell_client_ext.resource) - agl_shell_send_app_state(ivi->shell_client_ext.resource, + if (ivi->shell_client.resource_ext) { + weston_log("%s() 2. should sent app_state %p\n", + __func__, ivi->shell_client.resource_ext); + agl_shell_send_app_state(ivi->shell_client.resource_ext, app_id, state); + } } } @@ -1634,13 +1638,13 @@ bind_agl_shell(struct wl_client *client, wl_resource_set_implementation(resource, &agl_shell_implementation, ivi, NULL); - ivi->shell_client_ext.resource = resource; + ivi->shell_client.resource_ext = resource; if (ivi->shell_client.status == BOUND_OK && wl_resource_get_version(resource) >= AGL_SHELL_BOUND_OK_SINCE_VERSION) { - weston_log("Sent agl_shell_send_bound_ok to client ext\n"); ivi->shell_client_ext.status = BOUND_OK; - agl_shell_send_bound_ok(ivi->shell_client_ext.resource); + agl_shell_send_bound_ok(ivi->shell_client.resource_ext); + weston_log("Sent agl_shell_send_bound_ok to client ext %p\n", ivi->shell_client.resource_ext); } return; -- cgit 1.2.3-korg