/***
  This file is part of PulseAudio.

  Copyright 2018 Collabora Ltd.
    Author: George Kiagiadakis <george.kiagiadakis@collabora.com>

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as published
  by the Free Software Foundation; either version 2.1 of the License,
  or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  General Public License for more details.

  You should have received a copy of the GNU Lesser General Public License
  along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
***/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include "m4a_afb_comm.h"

#include <errno.h>
#include <unistd.h>

#include <pulsecore/thread.h>
#include <pulsecore/llist.h>
#include <pulsecore/mutex.h>
#include <pulse/xmalloc.h>

#include <systemd/sd-event.h>
#include <afb/afb-wsj1.h>
#include <afb/afb-ws-client.h>

typedef struct _m4a_ws_comm_pipe_data m4a_ws_comm_pipe_data;
typedef struct _m4a_afb_api_call_data m4a_afb_api_call_data;

enum m4a_ws_comm_pipe_opcode {
    OPCODE_CALL_ASYNC = 1,
    OPCODE_EXIT
};

struct _m4a_ws_comm_pipe_data {
    enum m4a_ws_comm_pipe_opcode opcode;
    void *data;
};

struct _m4a_afb_comm {
    struct afb_wsj1 *wsj1;

    pa_thread *thread;
    sd_event *loop;
    int pipe[2];
    sd_event_source *source;

    pa_mutex *pending_calls_lock;
    PA_LLIST_HEAD(m4a_afb_api_call_data, pending_calls);
};

struct _m4a_afb_api_call_data {
    m4a_afb_comm *comm;

    const char *api;
    const char *verb;
    char *object;

    m4a_afb_done_cb_t done_cb;
    void *userdata;

    PA_LLIST_FIELDS(m4a_afb_api_call_data);
};

static void afb_api_call_data_free(m4a_afb_api_call_data *c) {
    if (c->object)
        pa_xfree(c->object);

    pa_mutex_lock(c->comm->pending_calls_lock);
    PA_LLIST_REMOVE(m4a_afb_api_call_data, c->comm->pending_calls, c);
    pa_mutex_unlock(c->comm->pending_calls_lock);

    pa_xfree(c);
}

static void afb_call_async_step3(void *cb_data, struct afb_wsj1_msg *msg) {
    m4a_afb_api_call_data *c = cb_data;
    pa_json_object *j;
    const pa_json_object *jr = NULL;

    j = pa_json_parse(afb_wsj1_msg_object_s(msg));
    if (pa_json_object_get_type(j) != PA_JSON_TYPE_OBJECT) {
        pa_log_warn("4a reply is not a json object");
    } else if (afb_wsj1_msg_is_reply_ok(msg)) {
        jr = pa_json_object_get_object_member(j, "response");
    } else {
        jr = pa_json_object_get_object_member(j, "request");
    }

    if (afb_wsj1_msg_is_reply_ok(msg)) {
        pa_log_debug("Got OK reply from 4a in call to %s/%s", c->api, c->verb);
        c->done_cb(M4A_AFB_REPLY_OK, jr, c->userdata);
    } else if (jr && pa_json_object_get_type(jr) == PA_JSON_TYPE_OBJECT) {
        const pa_json_object *status, *info;
        const char *status_str, *info_str;

        status = pa_json_object_get_object_member(jr, "status");
        info = pa_json_object_get_object_member(jr, "info");

        if (status && pa_json_object_get_type(status) == PA_JSON_TYPE_STRING)
            status_str = pa_json_object_get_string(status);
        else
            status_str = "(null)";

        if (info && pa_json_object_get_type(info) == PA_JSON_TYPE_STRING)
            info_str = pa_json_object_get_string(info);
        else
            info_str = "(null)";

        pa_log("Got error reply from 4a, status: '%s', info: '%s'", status_str, info_str);
        c->done_cb(M4A_AFB_REPLY_ERROR, jr, c->userdata);
    } else {
        pa_log("Got error reply from 4a");
        c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
    }

    pa_json_object_free(j);
    afb_api_call_data_free(c);
}

static void afb_call_async_step2(m4a_afb_api_call_data *c) {
    pa_log_debug("calling afb: %s/%s", c->api, c->verb);

    if (afb_wsj1_call_s(c->comm->wsj1, c->api, c->verb, c->object, afb_call_async_step3, c) < 0) {
        pa_log("afb_wsj1_call_s: failed to call %s/%s: %s", c->api, c->verb, strerror(errno));
        c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
        afb_api_call_data_free(c);
    }
}

static void discard_pending_calls(m4a_afb_comm *comm) {
    m4a_afb_api_call_data *c;

    pa_mutex_lock(comm->pending_calls_lock);
    while ((c = comm->pending_calls)) {
        c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
        afb_api_call_data_free(c);
    }
    pa_mutex_unlock(comm->pending_calls_lock);
}

static void on_wsj1_hangup(void *closure, struct afb_wsj1 *wsj1) {
    m4a_afb_comm *comm = closure;

    pa_log_warn("afb closed the communication websocket!");
    discard_pending_calls(comm);

    //TODO: attempt to re-connect
}

static void on_wsj1_call(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg) {
    /* we don't implement any method calls */
    afb_wsj1_reply_error_s(msg, "\"unimplemented\"", NULL);
}

static void on_wsj1_event(void *closure, const char *event, struct afb_wsj1_msg *msg) {
    //TODO dispatch events
}

static struct afb_wsj1_itf wsj1_itf = {
    .on_hangup = on_wsj1_hangup,
    .on_call = on_wsj1_call,
    .on_event = on_wsj1_event
};

static int ws_comm_thread_event(sd_event_source *s, int fd, uint32_t revents, m4a_afb_comm *comm) {
    ssize_t r;
    m4a_ws_comm_pipe_data d = { 0 };

    if ((r = read(fd, &d, sizeof(m4a_ws_comm_pipe_data))) < (ssize_t) sizeof(m4a_ws_comm_pipe_data)) {
        if (errno == EINTR)
            return 0;

        pa_log("read failed: %s", strerror(errno));
        return -1;
    }

    switch(d.opcode) {
    case OPCODE_CALL_ASYNC: {
        m4a_afb_api_call_data *c = d.data;
        afb_call_async_step2(c);
        break;
    }
    case OPCODE_EXIT:
        sd_event_exit(comm->loop, 0);
        break;
    default:
        break;
    };

    return 0;
}

static void ws_comm_thread(m4a_afb_comm *comm) {
    pa_log_debug("websocket thread starting...");
    sd_event_loop(comm->loop);
    pa_log_debug("websocket thread exiting...");
}

static bool ws_comm_thread_send(int fd, char opcode, void *data) {
    m4a_ws_comm_pipe_data d;
    ssize_t ret;

    d.opcode = opcode;
    d.data = data;

    do {
        ret = write(fd, &d, sizeof(m4a_ws_comm_pipe_data));
    } while (ret < 0 && errno == EINTR);

    if (ret < (ssize_t) sizeof(m4a_ws_comm_pipe_data)) {
        pa_log("write: %s", strerror(errno));
        return false;
    }

    return true;
}

bool m4a_afb_call_async(m4a_afb_comm *comm, const char *api, const char *verb,
                        char *object, m4a_afb_done_cb_t done_cb, void *userdata) {
    m4a_afb_api_call_data *c;

    c = pa_xnew0(m4a_afb_api_call_data, 1);
    c->comm = comm;
    c->api = api;
    c->verb = verb;
    c->object = object;
    c->done_cb = done_cb;
    c->userdata = userdata;

    pa_mutex_lock(comm->pending_calls_lock);
    PA_LLIST_PREPEND(m4a_afb_api_call_data, comm->pending_calls, c);
    pa_mutex_unlock(comm->pending_calls_lock);

    if (!ws_comm_thread_send(comm->pipe[1], OPCODE_CALL_ASYNC, c)) {
        c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
        afb_api_call_data_free(c);
        return false;
    }
    return true;
}

m4a_afb_comm *m4a_afb_comm_new(const char *uri) {
    int ret;
    m4a_afb_comm *comm;

    comm = pa_xnew0(m4a_afb_comm, 1);

    if ((ret = sd_event_new(&comm->loop)) < 0) {
        pa_log("Failed to create systemd event loop: %s", strerror(-ret));
        goto fail;
    }

    if (!(comm->wsj1 = afb_ws_client_connect_wsj1(comm->loop, uri, &wsj1_itf, comm))) {
        pa_log("Connection to %s failed: %s", uri, strerror(errno));
        goto fail;
    }

    if (pipe(comm->pipe) < 0) {
        pa_log("pipe2 failed: %s", strerror(errno));
        goto fail;
    }

    if ((ret = sd_event_add_io(comm->loop, &comm->source, comm->pipe[0], EPOLLIN,
                               (sd_event_io_handler_t) ws_comm_thread_event, comm)) < 0) {
        pa_log("sd_event_add_io failed: %s", strerror(-ret));
        goto fail;
    }

    if (!(comm->thread = pa_thread_new("afb_comm_loop", (pa_thread_func_t) ws_comm_thread, comm))) {
        pa_log("Failed to start websocket communication thread");
        goto fail;
    }

    comm->pending_calls_lock = pa_mutex_new(true, false);
    comm->pending_calls = NULL;

    pa_log_debug("afb comm started");

    return comm;

fail:
    m4a_afb_comm_free(comm);
    return NULL;
}

void m4a_afb_comm_free(m4a_afb_comm *comm) {
    if (comm->thread) {
        if (!ws_comm_thread_send(comm->pipe[1], OPCODE_EXIT, NULL)) {
            pa_log("failed to shutdown thread gracefully");
            pa_thread_free_nojoin(comm->thread);
        } else {
            pa_thread_free(comm->thread);
        }
    }
    if (comm->source)
        sd_event_source_unref(comm->source);
    if (comm->loop)
        sd_event_unref(comm->loop);

    if (comm->wsj1)
        afb_wsj1_unref(comm->wsj1);

    if (comm->pending_calls_lock) {
        discard_pending_calls(comm);
        pa_mutex_free(comm->pending_calls_lock);
    }

    if (comm->pipe[0] > 0)
        close(comm->pipe[0]);
    if (comm->pipe[1] > 0)
        close(comm->pipe[1]);

    pa_xfree(comm);
}