/***
  This file is part of PulseAudio.

  Copyright 2018 Collabora Ltd.
    Author: George Kiagiadakis <george.kiagiadakis@collabora.com>

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as published
  by the Free Software Foundation; either version 2.1 of the License,
  or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  General Public License for more details.

  You should have received a copy of the GNU Lesser General Public License
  along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
***/

#include <pulsecore/pulsecore-config.h>

#include "m4a_afb_comm.h"

#include <stdio.h>
#include <errno.h>
#include <unistd.h>

#include <pulsecore/thread.h>
#include <pulsecore/llist.h>
#include <pulsecore/mutex.h>
#include <pulse/xmalloc.h>

#include <systemd/sd-event.h>
#include <afb/afb-proto-ws.h>
#include <afb/afb-ws-client.h>

enum m4a_internal_pipe_opcode {
    OPCODE_CALL_ASYNC = 1,
    OPCODE_EXIT
};

typedef struct _m4a_internal_pipe_data m4a_internal_pipe_data;
typedef struct _m4a_afb_api_call_data m4a_afb_api_call_data;

/* structure sent over the internal pipe
 * for communication between the two threads */
struct _m4a_internal_pipe_data {
    enum m4a_internal_pipe_opcode opcode;
    void *data;
};

/* structure holding data related to an ongoing AFB API call */
struct _m4a_afb_api_call_data {
    m4a_afb_comm *comm;

    const char *verb;
    json_object *object;

    m4a_afb_done_cb_t done_cb;
    void *userdata;

    PA_LLIST_FIELDS(m4a_afb_api_call_data);
};

/* the main structure of this class */
struct _m4a_afb_comm {
    struct afb_proto_ws *afbhandle;

    char *sessid;

    /* the thread that runs the loop below */
    pa_thread *thread;
    /* our internal event loop */
    sd_event *loop;
    /* the pipe fd pair used to send commands from
     * the pulse threads to our internal thread */
    int tx_pipe[2];
    /* the source for listening to the above pipe from our event loop */
    sd_event_source *tx_source;

    /* a list of pending API calls, protected with a mutex */
    pa_mutex *pending_calls_lock;
    PA_LLIST_HEAD(m4a_afb_api_call_data, pending_calls);
};

static void m4a_afb_api_call_data_free(m4a_afb_api_call_data *c) {
    if (c->object)
        json_object_put(c->object);

    pa_mutex_lock(c->comm->pending_calls_lock);
    PA_LLIST_REMOVE(m4a_afb_api_call_data, c->comm->pending_calls, c);
    pa_mutex_unlock(c->comm->pending_calls_lock);

    pa_xfree(c);
}

static void m4a_afb_on_reply(void *ctx, void *handle, json_object *response,
                             const char *error, const char *info) {
    m4a_afb_api_call_data *c = handle;

    if (error) {
        pa_log("Got error reply from 4a: %s", error);
        c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
    } else {
        pa_log_debug("Got OK reply from 4a in call to %s", c->verb);
        c->done_cb(M4A_AFB_REPLY_OK, response, c->userdata);
    }

    m4a_afb_api_call_data_free(c);
}

static struct afb_proto_ws_client_itf itf = {
    .on_reply = m4a_afb_on_reply
};

static int internal_thread_event(sd_event_source *s, int fd, uint32_t revents, m4a_afb_comm *comm) {
    ssize_t r;
    m4a_internal_pipe_data d = { 0 };

    if ((r = read(fd, &d, sizeof(m4a_internal_pipe_data))) < (ssize_t) sizeof(m4a_internal_pipe_data)) {
        if (errno == EINTR)
            return 0;

        pa_log("read failed: %s", strerror(errno));
        return -1;
    }

    switch(d.opcode) {
    case OPCODE_CALL_ASYNC: {
        m4a_afb_api_call_data *c = d.data;

        if (afb_proto_ws_client_call(c->comm->afbhandle, c->verb, c->object,
                                     c->comm->sessid, c, NULL) < 0) {
            pa_log("afb_proto_ws_client_call: failed to call %s: %s",
                   c->verb, strerror(errno));
            c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
            m4a_afb_api_call_data_free(c);
        }
        break;
    }
    case OPCODE_EXIT:
        sd_event_exit(comm->loop, 0);
        break;
    default:
        break;
    };

    return 0;
}

static void internal_thread(m4a_afb_comm *comm) {
    pa_log_debug("afb comm thread starting...");
    sd_event_loop(comm->loop);
    pa_log_debug("afb comm thread exiting...");
}

static bool internal_thread_send(m4a_afb_comm *comm, char opcode, void *data) {
    m4a_internal_pipe_data d;
    ssize_t ret;

    d.opcode = opcode;
    d.data = data;

    do {
        ret = write(comm->tx_pipe[1], &d, sizeof(m4a_internal_pipe_data));
    } while (ret < 0 && errno == EINTR);

    if (ret < (ssize_t) sizeof(m4a_internal_pipe_data)) {
        pa_log("write: %s", strerror(errno));
        return false;
    }

    return true;
}

m4a_afb_comm *m4a_afb_comm_new(const char *uri) {
    int ret;
    m4a_afb_comm *comm;

    comm = pa_xnew0(m4a_afb_comm, 1);
    if (asprintf(&comm->sessid, "pulseaudio:%d", getpid()) < 0) {
        pa_log("asprintf failed");
        comm->sessid = NULL;
        goto fail;
    }

    if ((ret = sd_event_new(&comm->loop)) < 0) {
        pa_log("Failed to create systemd event loop: %s", strerror(-ret));
        goto fail;
    }

    if (!(comm->afbhandle = afb_ws_client_connect_api(comm->loop, uri, &itf, comm))) {
        pa_log("Connection to %s failed: %s", uri, strerror(errno));
        goto fail;
    }

    if (pipe(comm->tx_pipe) < 0) {
        pa_log("pipe failed: %s", strerror(errno));
        goto fail;
    }

    if ((ret = sd_event_add_io(comm->loop, &comm->tx_source, comm->tx_pipe[0], EPOLLIN,
                               (sd_event_io_handler_t) internal_thread_event, comm)) < 0) {
        pa_log("sd_event_add_io failed: %s", strerror(-ret));
        goto fail;
    }

    if (!(comm->thread = pa_thread_new("m4a_afb_comm_internal_thread",
                                       (pa_thread_func_t) internal_thread, comm))) {
        pa_log("Failed to start websocket communication thread");
        goto fail;
    }

    comm->pending_calls_lock = pa_mutex_new(true, false);
    comm->pending_calls = NULL;

    pa_log_debug("afb comm started");

    return comm;

fail:
    m4a_afb_comm_free(comm);
    return NULL;
}


static void discard_pending_calls(m4a_afb_comm *comm) {
    m4a_afb_api_call_data *c;

    pa_mutex_lock(comm->pending_calls_lock);
    while ((c = comm->pending_calls)) {
        c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
        m4a_afb_api_call_data_free(c);
    }
    pa_mutex_unlock(comm->pending_calls_lock);
}

void m4a_afb_comm_free(m4a_afb_comm *comm) {
    if (comm->thread) {
        if (!internal_thread_send(comm, OPCODE_EXIT, NULL)) {
            pa_log("failed to shutdown thread gracefully");
            pa_thread_free_nojoin(comm->thread);
        } else {
            pa_thread_free(comm->thread);
        }
    }
    if (comm->tx_source)
        sd_event_source_unref(comm->tx_source);
    if (comm->loop)
        sd_event_unref(comm->loop);

    if (comm->afbhandle)
        afb_proto_ws_unref(comm->afbhandle);

    if (comm->pending_calls_lock) {
        discard_pending_calls(comm);
        pa_mutex_free(comm->pending_calls_lock);
    }

    if (comm->tx_pipe[0] > 0)
        close(comm->tx_pipe[0]);
    if (comm->tx_pipe[1] > 0)
        close(comm->tx_pipe[1]);

    free(comm->sessid);
    pa_xfree(comm);
}

bool m4a_afb_call_async(m4a_afb_comm *comm, const char *verb,
                        json_object *object, m4a_afb_done_cb_t done_cb,
                        void *userdata) {
    m4a_afb_api_call_data *c;

    c = pa_xnew0(m4a_afb_api_call_data, 1);
    c->comm = comm;
    c->verb = verb;
    c->object = object;
    c->done_cb = done_cb;
    c->userdata = userdata;

    pa_mutex_lock(comm->pending_calls_lock);
    PA_LLIST_PREPEND(m4a_afb_api_call_data, comm->pending_calls, c);
    pa_mutex_unlock(comm->pending_calls_lock);

    if (!internal_thread_send(comm, OPCODE_CALL_ASYNC, c)) {
        c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
        m4a_afb_api_call_data_free(c);
        return false;
    }
    return true;
}