// Source: clients/grpc-async.cpp (blob 72090fe9f5dc7b85b636879359f107e5b92f5697)
#include <algorithm>
#include <cstdio>
#include <ctime>
#include <iostream>
#include <queue>
#include <string>

#include <grpc/grpc.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>

#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/health_check_service_interface.h>

#include "agl_shell.grpc.pb.h"
#include "grpc-async.h"

// Advance this call's state machine by one step.
//
// Invoked once for every completion-queue event whose tag is this
// instance. The states are walked in order:
//   CREATE     -> register for an incoming AppStatusState request
//   PROCESS    -> a request arrived; hand the listening role to a fresh
//                 CallData and start counting replies
//   PROCESSING -> keep counting until MAX_REPLIES, then finish the RPC
//   FINISH     -> the RPC is complete; destroy this instance
void
CallData::Proceed(void)
{
	switch (m_status) {
	case CREATE:
		// Make this instance progress to the PROCESS state.
		m_status = PROCESS;
		std::cout << "Creating Call data for new client connections: "
			<< this << std::endl;

		// As part of the initial CREATE state, we *request* that the
		// system start processing AppStatusState requests.
		//
		// In this request, "this" acts as the tag uniquely
		// identifying the request (so that different CallData
		// instances can serve different requests concurrently), in
		// this case the memory address of this CallData instance.
		m_service->RequestAppStatusState(&m_ctx, &m_request, &m_responder,
						 m_cq, m_cq, this);
		break;
	case PROCESS: {
		// Spawn a new CallData instance to serve new clients while we
		// process the one for this CallData. The instance will
		// deallocate itself as part of its FINISH state, so the
		// pointer is intentionally not kept. The braces give this
		// case its own scope; without them a declaration here would
		// be jumped over by the following case labels.
		new CallData(m_service, m_cq);

		// The actual processing.
		// NOTE(review): nothing is queued on the completion queue
		// here, so this instance is only advanced again if something
		// else posts an event tagged with it -- confirm against the
		// code that drives m_responder.
		m_status = PROCESSING;
		m_repliesSent++;
		break;
	}
	case PROCESSING:
		if (m_repliesSent == MAX_REPLIES) {
			// And we are done! Let the gRPC runtime know we've
			// finished, using the memory address of this instance
			// as the uniquely identifying tag for the event.
			m_status = FINISH;
			m_responder.Finish(Status::OK, this);
		} else {
			// The actual processing.
			m_status = PROCESSING;
			m_repliesSent++;
		}
		break;
	case FINISH:
		std::cout << "Completed RPC for: " << this << std::endl;
		// Once in the FINISH state, deallocate ourselves (CallData).
		// Nothing may touch members after this statement.
		delete this;
		break;
	default:
		break;
	}
}

// Stop the server and then its completion queue.
//
// Both members are only populated by Run(); they are checked before use
// so that destroying an instance whose server never started (Run() not
// called, or BuildAndStart() failed) does not dereference a null pointer.
GrpcServiceImpl::~GrpcServiceImpl()
{
	if (m_server)
		m_server->Shutdown();

	// Always shutdown the completion queue after the server.
	if (m_cq)
		m_cq->Shutdown();
}

// Build and start the gRPC server on kDefaultGrpcServiceAddress, then
// enter the event loop (HandleRpcs()).
//
// If the server cannot be started, an error is logged and the function
// returns without entering the event loop.
void
GrpcServiceImpl::Run(void)
{
	std::string server_address(kDefaultGrpcServiceAddress);

	grpc::ServerBuilder builder;
	builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());

	builder.RegisterService(&m_service);
	m_cq = builder.AddCompletionQueue();

	m_server = builder.BuildAndStart();
	if (!m_server) {
		// BuildAndStart() yields a null server on failure (e.g. the
		// address could not be bound); there is nothing to serve.
		std::cerr << "Failed to start server on " << server_address
			<< std::endl;
		return;
	}
	std::cout << "Server listening on " << server_address << std::endl;

	// Proceed to the server's main loop.
	HandleRpcs();
}

void
GrpcServiceImpl::HandleRpcs(void)
{
	// Spawn a new CallData instance to serve new clients.
	CallData *cd = new CallData(&m_service, m_cq.get());

	// uniquely identifies a request.
	void *tag;
	bool ok;

	// Block waiting to read the next event from the completion queue. The
	// event is uniquely identified by its tag, which in this case is the
	// memory address of a CallData instance.
	//
	// The return value of Next should always be checked. This return value
	// tells us whether there is any kind of event or cq_ is shutting down.
	while (true) {
		std::cout << "Blocked on next waiting for events" << std::endl;
		GPR_ASSERT(m_cq->Next(&tag, &ok));
		GPR_ASSERT(ok);

		std::cout << "Calling tag " << tag << " with Proceed()" << std::endl;
		static_cast<CallData*>(tag)->Proceed();
	}
}