summaryrefslogtreecommitdiffstats
path: root/src/plugins/capabilities/core/src/SubscriberForwarder.cpp
blob: 970dbf02ccb8a3dbb1e27b7ac3900317e9d49537 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/*
 * Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://aws.amazon.com/apache2.0/
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
#include "capabilities/core/include/SubscriberForwarder.h"

// Tag handed to the logger with every message emitted from this file.
// NOTE(review): TAG is never reassigned in this file, so it could probably be
// declared const — confirm against the ILogger::log() signature first.
static string TAG = "vshlcapabilities::capabilities::SubscriberForwarder";

// Shorthand for the logger's severity type; Level::ERROR and Level::NOTICE are used below.
using Level = vshlcapabilities::common::interfaces::ILogger::Level;

namespace vshlcapabilities {
namespace capabilities {
namespace core {

/**
 * Factory for SubscriberForwarder instances.
 *
 * All three collaborators are mandatory. A missing logger is rejected
 * silently (there is nothing to log with); a missing AFB API or capability
 * is reported through the logger before giving up.
 *
 * @param logger     Logger used by the new instance.
 * @param afbApi     AFB API used to create the events.
 * @param capability Capability whose upstream/downstream messages become events.
 * @return The newly constructed forwarder, or nullptr when any argument is null.
 */
std::shared_ptr<SubscriberForwarder> SubscriberForwarder::create(
    shared_ptr<vshlcapabilities::common::interfaces::ILogger> logger,
    shared_ptr<vshlcapabilities::common::interfaces::IAFBApi> afbApi,
    shared_ptr<vshlcapabilities::common::interfaces::ICapability> capability) {
    // No logger means the failure cannot even be reported.
    if (!logger) {
        return nullptr;
    }

    // The AFB API is checked first, so its message wins when both are missing.
    if (!afbApi || !capability) {
        logger->log(
            Level::ERROR,
            TAG,
            !afbApi ? "Failed to create SubscriberForwarder: AFB API null"
                    : "Failed to create SubscriberForwarder: Capability null");
        return nullptr;
    }

    return std::shared_ptr<SubscriberForwarder>(new SubscriberForwarder(logger, afbApi, capability));
}

SubscriberForwarder::SubscriberForwarder(
    shared_ptr<vshlcapabilities::common::interfaces::ILogger> logger,
    shared_ptr<vshlcapabilities::common::interfaces::IAFBApi> afbApi,
    shared_ptr<vshlcapabilities::common::interfaces::ICapability> capability) :
        mAfbApi(afbApi),
        mLogger(logger),
        mCapability(capability) {
    createEvents();
}

/**
 * Empties both event maps, dropping this object's references to the events
 * created in createEvents(). Upstream events are released before downstream
 * ones.
 */
SubscriberForwarder::~SubscriberForwarder() {
    // NOTE(review): if these are standard containers their own destructors
    // would do the same; the explicit calls only pin the release order — confirm.
    mUpstreamEventsMap.clear();
    mDownstreamEventsMap.clear();
}

/**
 * Creates one AFB event for every upstream and downstream message name
 * reported by the capability and records it in the matching event map.
 *
 * Names that already have an entry in the map are skipped, and nothing is
 * created when no AFB API is available. A name whose event could not be
 * created is logged as an error and left out of the map. When no capability
 * is assigned the function logs a notice and returns without doing anything.
 */
void SubscriberForwarder::createEvents() {
    if (!mCapability) {
        mLogger->log(Level::NOTICE, TAG, "Create Events failed. No capability assigned.");
        return;
    }

    // Upstream events
    auto upstreamEvents = mCapability->getUpstreamMessages();
    // Iterate by const reference: the names are only read, so copying each one is wasted work.
    for (const auto& upstreamEventName : upstreamEvents) {
        // Skip names that are already registered, or everything when there is no AFB API.
        if (mUpstreamEventsMap.find(upstreamEventName) != mUpstreamEventsMap.end() || !mAfbApi) {
            continue;
        }

        // create a new event and add it to the map.
        shared_ptr<common::interfaces::IAFBApi::IAFBEvent> event = mAfbApi->createEvent(upstreamEventName);
        if (event == nullptr) {
            mLogger->log(Level::ERROR, TAG, "Failed to create upstream event: " + upstreamEventName);
        } else {
            mUpstreamEventsMap.insert(make_pair(upstreamEventName, event));
        }
    }

    // Downstream events
    auto downstreamEvents = mCapability->getDownstreamMessages();
    for (const auto& downstreamEventName : downstreamEvents) {
        // Same rules as for the upstream names above.
        if (mDownstreamEventsMap.find(downstreamEventName) != mDownstreamEventsMap.end() || !mAfbApi) {
            continue;
        }

        // create a new event and add it to the map.
        shared_ptr<common::interfaces::IAFBApi::IAFBEvent> event = mAfbApi->createEvent(downstreamEventName);
        if (event == nullptr) {
            mLogger->log(Level::ERROR, TAG, "Failed to create downstream event: " + downstreamEventName);
        } else {
            mDownstreamEventsMap.insert(make_pair(downstreamEventName, event));
        }
    }
}

/**
 * Publishes a payload on the event registered under the given action name.
 *
 * The upstream event map is searched first, then the downstream map; the
 * first match wins.
 *
 * @param action  Name of the event to publish on.
 * @param payload JSON payload passed unchanged to the event's publishEvent().
 * @return true when a matching event was found and publishEvent() was called
 *         (its result is not inspected); false when the action is not
 *         registered in either map.
 */
bool SubscriberForwarder::forwardMessage(const string action, json_object* payload) {
    auto upstreamEventIt = mUpstreamEventsMap.find(action);
    if (upstreamEventIt != mUpstreamEventsMap.end()) {
        mLogger->log(Level::NOTICE, TAG, "Publishing upstream event: " + action);
        upstreamEventIt->second->publishEvent(payload);
        return true;
    }

    auto downstreamEventIt = mDownstreamEventsMap.find(action);
    if (downstreamEventIt != mDownstreamEventsMap.end()) {
        mLogger->log(Level::NOTICE, TAG, "Publishing downstream event: " + action);
        downstreamEventIt->second->publishEvent(payload);
        return true;
    }

    // Reached only when the action is in neither map, so the message must not
    // claim the event was an upstream one.
    mLogger->log(Level::NOTICE, TAG, "Failed to publish, no upstream or downstream event named: " + action);
    return false;
}

/**
 * Subscribes the requester to the event registered under the given action name.
 *
 * The upstream event map is searched first, then the downstream map; the
 * first match wins.
 *
 * @param request Request forwarded to the matching event's subscribe().
 * @param action  Name of the event to subscribe to.
 * @return The result of the event's subscribe() when the action is known;
 *         false when the action is not registered in either map.
 */
bool SubscriberForwarder::subscribe(vshlcapabilities::common::interfaces::IAFBRequest& request, const string action) {
    auto upstreamEventIt = mUpstreamEventsMap.find(action);
    if (upstreamEventIt != mUpstreamEventsMap.end()) {
        mLogger->log(Level::NOTICE, TAG, "Subscribing to upstream event: " + action);
        return upstreamEventIt->second->subscribe(request);
    }

    auto downstreamEventIt = mDownstreamEventsMap.find(action);
    if (downstreamEventIt != mDownstreamEventsMap.end()) {
        mLogger->log(Level::NOTICE, TAG, "Subscribing to downstream event: " + action);
        return downstreamEventIt->second->subscribe(request);
    }

    // Reached only when the action is in neither map, so the message must not
    // claim the event was an upstream one.
    mLogger->log(Level::NOTICE, TAG, "Failed to subscribe, no upstream or downstream event named: " + action);
    return false;
}

}  // namespace core
}  // namespace capabilities
}  // namespace vshlcapabilities