diff --git a/API.md b/API.md new file mode 100644 index 0000000..beb0e2d --- /dev/null +++ b/API.md @@ -0,0 +1,102 @@ +# Overview + +COMS uses a WebSocket API to communicate with its clients in JSON. In general, +packets sent to all clients end in `_evt` and packets sent in reply to a client +action end in `_ack`. Beyond that there isn't really a system. + +A packet takes this general form: + +```json +{ + "type": "", + "data": {...} +} +``` + +# Data types + +## Packet + +**Type:** +```C +struct { + PacketType type; + Any data; +} +``` + +A packet. + +## PacketType + +**Type:** +```C +enum { + join, + welcome, + join_evt, + msg, + msg_evt, + name, + name_evt, + ping, + pong, + msg_ack, + name_ack, + leave_evt, + bad, +} +``` + +The type a Packet may assume. + +## UserID + +**Type:** `uint64_t` + +Uniquely identifies a user. + +## Name + +**Type:** `char[16]` + +A name of a user. + +## UserData + +**Type:** +```C +struct { + UserID id; + Name name; +} +``` + +A user. + +## MsgID + +**Type:** `uint64_t` + +Uniquely identifies a message. + +## MsgContent + +**Type:** `char[1024]` + +The content of a message. + +## MsgData + +**Type:** +```C +struct { + MsgID id; + UserData author; + MsgID parent; + MsgContent content; + time_t timestamp; +} +``` + +A message. diff --git a/client/public/app.js b/client/public/app.js index ab17e68..0fa1dcd 100644 --- a/client/public/app.js +++ b/client/public/app.js @@ -27,7 +27,7 @@ document.addEventListener("DOMContentLoaded", () => { inputContainer.style.display = "flex"; const defaultName = "Anon"; - const MAX_NAME_LEN = 31; + const MAX_NAME_LEN = 15; // server limit is NAME_MAX_LENGTH-1 (15 chars) const { socket, sendMessage, sendName } = initWebSocket( defaultName, @@ -88,8 +88,8 @@ document.addEventListener("DOMContentLoaded", () => { chat.addEventListener("click", (e) => { const msgDiv = e.target.closest(".msg"); if (!msgDiv) return; - const id = Number(msgDiv.dataset.id); - if (!isFinite(id)) return; + const id = msgDiv.dataset.id; + if (!id) return; const threads = getThreads(); const author = threads.get(id)?.username; setReplyTo(id); @@ -124,9 +124,7 @@ document.addEventListener("DOMContentLoaded", () => { if (e.key === "ArrowDown" || e.key === "ArrowUp") { const msgs = Array.from(chat.querySelectorAll("div.msg")); if (!msgs.length) return; - let idx = msgs.findIndex( - (div) => Number(div.dataset.id) === getFocused(), - ); + let idx = msgs.findIndex((div) => div.dataset.id === getFocused()); if (idx === -1) { idx = e.key === "ArrowDown" ? -1 : msgs.length; } @@ -134,7 +132,7 @@ document.addEventListener("DOMContentLoaded", () => { e.key === "ArrowDown" ? Math.min(msgs.length - 1, idx + 1) : Math.max(0, idx - 1); - const newId = Number(msgs[idx].dataset.id); + const newId = msgs[idx].dataset.id; setFocused(newId); setReplyTo(newId); // Move input box under focused message. const threads = getThreads(); diff --git a/client/public/data.js b/client/public/data.js index 022050f..3386284 100644 --- a/client/public/data.js +++ b/client/public/data.js @@ -1,7 +1,7 @@ // coms/client/public/data.js // Data module: holds the in‐memory threads, notices and reply state -// Map of all messages and notices by ID +// Map of all messages and notices by ID (string IDs) // Client’s own session ID let myId = null; const threads = new Map(); @@ -9,28 +9,28 @@ const threads = new Map(); // Map of connected users by unique ID const users = new Map(); -// Ordered list of root‐level IDs (messages and notices) +// Ordered list of root‐level IDs (messages and notices, stored as strings) const rootIds = []; // Negative counter to generate unique IDs for notices let noticeCounter = -1; -// ID of the message we’re currently replying to (or null) +// ID of the message we’re currently replying to (string or null) let replyTo = null; -// ID of the message currently focused for navigation (or null) +// ID of the message currently focused for navigation (string or null) let focusedId = null; /** * Set the focused message ID (or null to clear). - * @param {number|null} id + * @param {string|null} id */ function setFocused(id) { focusedId = id; } /** * Get the currently focused message ID. - * @returns {number|null} + * @returns {string|null} */ function getFocused() { return focusedId; @@ -46,7 +46,7 @@ function clearFocused() { /** * Register a user with their unique ID. - * @param {number} id + * @param {string} id * @param {string} name */ function addUser(id, name) { @@ -55,7 +55,7 @@ function addUser(id, name) { /** * Remove a user by their ID. - * @param {number} id + * @param {string} id */ function removeUser(id) { users.delete(id); @@ -63,7 +63,7 @@ function removeUser(id) { /** * Update a user’s name. - * @param {number} id + * @param {string} id * @param {string} name */ function updateUser(id, name) { @@ -72,7 +72,7 @@ function updateUser(id, name) { /** * Get a list of {id, name} objects for all users. - * @returns {{id:number, name:string}[]} + * @returns {{id:string, name:string}[]} */ function getUsers() { return Array.from(users.entries()).map(([id, name]) => ({ id, name })); @@ -95,14 +95,31 @@ function getGroupedUsers() { /** * Add a new chat message to the thread structure. - * @param {{id: number, username: string, content: string, ts: number, parent?: number}} msg + * @param {{id: string|number, username: string, content: string, ts: number, parent?: string|number|null, authorId?: string|number|null}} msg */ -function addMessage({ id, username, content, ts, parent = null }) { - threads.set(id, { id, username, content, ts, parent, children: [] }); - if (parent !== null && threads.has(parent)) { - threads.get(parent).children.push(id); +function addMessage({ + id, + username, + content, + ts, + parent = null, + authorId = null, +}) { + const sid = String(id); + const sparent = parent === null ? null : String(parent); + threads.set(sid, { + id: sid, + username, + authorId: authorId === null ? null : String(authorId), + content, + ts, + parent: sparent, + children: [], + }); + if (sparent !== null && threads.has(sparent)) { + threads.get(sparent).children.push(sid); } else { - rootIds.push(id); + rootIds.push(sid); } } @@ -112,7 +129,7 @@ function addMessage({ id, username, content, ts, parent = null }) { */ function addNotice(content) { const ts = Math.floor(Date.now() / 1000); - const id = noticeCounter--; + const id = String(noticeCounter--); threads.set(id, { id, username: "", @@ -132,40 +149,41 @@ function clearData() { rootIds.length = 0; noticeCounter = -1; replyTo = null; + users.clear(); } -/** @returns {Map} The threads map. */ +/** @returns {Map} The threads map. */ function getThreads() { return threads; } -/** @returns {number[]} The ordered list of root‐level IDs. */ +/** @returns {string[]} The ordered list of root‐level IDs. */ function getRootIds() { return rootIds; } /** * Set this client’s own session ID. - * @param {number} id + * @param {string|number} id */ function setMyId(id) { - myId = id; + myId = String(id); } /** * Get this client’s own session ID. - * @returns {number|null} + * @returns {string|null} */ function getMyId() { return myId; } -/** @returns {number|null} The current reply‐to ID. */ +/** @returns {string|null} The current reply‐to ID. */ function getReplyTo() { return replyTo; } -/** @param {number|null} id — Set the current reply‐to ID. */ +/** @param {string|null} id — Set the current reply‐to ID. */ function setReplyTo(id) { replyTo = id; } diff --git a/client/public/wsModule.js b/client/public/wsModule.js index ca89443..58d4b63 100644 --- a/client/public/wsModule.js +++ b/client/public/wsModule.js @@ -9,6 +9,7 @@ import { addNotice, getReplyTo, addUser, + removeUser, updateUser, getGroupedUsers, setMyId, @@ -39,8 +40,7 @@ export function initWebSocket(username, chatEl, inputContainer, usersListEl) { socket.onopen = () => { // Announce join. - socket.send(JSON.stringify({ type: "join", data: { username } })); - //addNotice(`Connected as ${username}`); + socket.send(JSON.stringify({ type: "join", data: { name: username } })); renderChat(chatEl, inputContainer); }; @@ -48,32 +48,60 @@ export function initWebSocket(username, chatEl, inputContainer, usersListEl) { const packet = JSON.parse(event.data); switch (packet.type) { case "welcome": { - // Initial user list. - const myId = packet.data.you; + // Initial handshake. Populate ids, online list, and history if provided. clearData(); - setMyId(myId); - const users = packet.data.users || []; - users.forEach(({ id, username }) => addUser(id, username)); + const { id, online = [], history = [] } = packet.data; + setMyId(String(id)); + online.forEach(({ id: uid, name }) => addUser(String(uid), name)); + history.forEach((m) => { + addMessage({ + id: String(m.id), + username: m.author?.name || "Anon", + authorId: m.author?.id ? String(m.author.id) : null, + content: m.content, + ts: m.timestamp, + parent: + m.parent === null || m.parent === undefined || m.parent === "-1" + ? null + : String(m.parent), + }); + }); const groups = getGroupedUsers(); - usersListEl.innerHTML = "Online: " + groups.join(", "); - //addNotice(`Users online: ${groups.join(", ")}`); + usersListEl.innerHTML = groups.length + ? "Online: " + groups.join(", ") + : "Online: ?"; break; } - case "join-event": { + case "join_evt": { // A new user joined. - const { id, username } = packet.data; - addUser(id, username); + const { id, name } = packet.data; + addUser(String(id), name); addNotice( - `${new Option(username).innerHTML} joined.`, + `${new Option(name).innerHTML} joined.`, ); const groups = getGroupedUsers(); usersListEl.innerHTML = "Online: " + groups.join(", "); break; } - case "name-event": { + case "leave_evt": { + const { id, name } = packet.data; + const sid = String(id); + const lastName = users.get(sid) || name; + addNotice( + `${new Option(lastName).innerHTML} left.`, + ); + removeUser(sid); + const groups = getGroupedUsers(); + usersListEl.innerHTML = "Online: " + groups.join(", "); + break; + } + case "name_evt": { // A user changed name. - const { id, new: newName, old: oldName } = packet.data; - updateUser(id, newName); + const { + user: { id, name: oldName }, + new_name: newName, + } = packet.data; + updateUser(String(id), newName); addNotice( `${new Option(oldName).innerHTML} changed name to ${new Option(newName).innerHTML}`, ); @@ -81,19 +109,53 @@ export function initWebSocket(username, chatEl, inputContainer, usersListEl) { usersListEl.innerHTML = "Online: " + groups.join(", "); break; } - case "msg-event": { + case "msg_evt": { // A chat message arrived. - const { id, username: u, content, ts, parent } = packet.data; - addMessage({ id, username: u, content, ts, parent }); + const { id, author, content, parent, timestamp } = packet.data; + const username = author?.name || "Anon"; + const ts = timestamp || Math.floor(Date.now() / 1000); + if (author?.id) addUser(String(author.id), username); + addMessage({ + id: String(id), + username, + authorId: author?.id ? String(author.id) : null, + content, + ts, + parent: + parent === null || parent === undefined || parent === "-1" + ? null + : String(parent), + }); break; } - case "msg-ack": { - const { id } = packet.data; - if (getReplyTo() === null) { - setFocused(id); - setReplyTo(id); + case "msg_ack": { + const { status, id } = packet.data; + if (status !== "success") { + addNotice( + `Message failed: ${new Option(status).innerHTML}`, + ); + } else if (id) { + const replyTarget = getReplyTo(); + if (replyTarget === null) { + const sid = String(id); + setFocused(sid); + setReplyTo(sid); + } } - renderChat(chatEl, inputContainer); + break; + } + case "name_ack": { + const { status, name } = packet.data; + if (status !== "success") { + addNotice( + `Name change failed: ${new Option(status).innerHTML}`, + ); + } + break; + } + case "ping": { + const ts = packet.data?.ts ?? Math.floor(Date.now() / 1000); + socket.send(JSON.stringify({ type: "pong", data: { ts } })); break; } default: @@ -122,7 +184,12 @@ export function initWebSocket(username, chatEl, inputContainer, usersListEl) { */ function sendMessage(content) { const parent = getReplyTo(); - socket.send(JSON.stringify({ type: "msg", data: { content, parent } })); + socket.send( + JSON.stringify({ + type: "msg", + data: { content, parent: parent == null ? -1 : parent }, + }), + ); } /** @@ -130,7 +197,7 @@ export function initWebSocket(username, chatEl, inputContainer, usersListEl) { * @param {string} username - The new username to set. */ function sendName(username) { - socket.send(JSON.stringify({ type: "name", data: { username } })); + socket.send(JSON.stringify({ type: "name", data: { name: username } })); } return { socket, sendMessage, sendName }; diff --git a/server/src/api.c b/server/src/api.c index cd78c01..2a65b8d 100644 --- a/server/src/api.c +++ b/server/src/api.c @@ -1,17 +1,22 @@ #include "include/api.h" +#include #include #include +#include static const char* packet_type_strings[] = { [PACKET_TYPE_JOIN] = "join", [PACKET_TYPE_WELCOME] = "welcome", [PACKET_TYPE_JOIN_EVT] = "join_evt", [PACKET_TYPE_MSG] = "msg", - [PACKET_TYPE_MSG_EVT] = "msg_evt", + [PACKET_TYPE_MSG_EVT] = "msg_evt", [PACKET_TYPE_NAME] = "name", + [PACKET_TYPE_NAME_EVT] = "name_evt", [PACKET_TYPE_LEAVE_EVT] = "leave_evt", + [PACKET_TYPE_PING] = "ping", [PACKET_TYPE_PONG] = "pong", + [PACKET_TYPE_MSG_ACK] = "msg_ack", [PACKET_TYPE_NAME_ACK] = "name_ack", }; PacketType packet_type_parse(const char* type) { - for (int i = 0; i < PACKET_TYPE_MAX; i++) + for (int i = 0; i <= PACKET_TYPE_MAX; i++) if (!strcmp(type, packet_type_strings[i])) return (PacketType)i; return PACKET_TYPE_BAD; @@ -28,10 +33,168 @@ Packet* packet_init(PacketType type, void* data) { } Packet* packet_init_safe(const char* type, const void* data) { + if (!data) return NULL; + PacketType t = packet_type_parse(type); if (t == PACKET_TYPE_BAD) return NULL; - if (!data) return NULL; - - return packet_init(t, data); + return packet_init(t, (void*)data); +} + +// Per-packet-type encoders. +static void pack_welcome( + yyjson_mut_doc* doc, yyjson_mut_val* data, PacketWelcome* welcome +) { + yyjson_mut_obj_add_uint(doc, data, "id", welcome->id); + yyjson_mut_obj_add_int(doc, data, "onlinec", welcome->onlinec); + yyjson_mut_val* online = yyjson_mut_arr(doc); + for (size_t i = 0; i < welcome->onlinec; i++) { + yyjson_mut_val* u = yyjson_mut_obj(doc); + yyjson_mut_obj_add_uint(doc, u, "id", welcome->online[i].id); + if (welcome->online[i].name) + yyjson_mut_obj_add_str(doc, u, "name", *welcome->online[i].name); + else yyjson_mut_obj_add_str(doc, u, "name", ""); + yyjson_mut_arr_add_val(online, u); + } + yyjson_mut_obj_add_val(doc, data, "online", online); + + yyjson_mut_obj_add_int(doc, data, "historyc", welcome->historyc); + yyjson_mut_val* history = yyjson_mut_arr(doc); + for (size_t i = 0; i < welcome->historyc; i++) { + MsgData* m = welcome->history[i]; + if (!m) continue; + yyjson_mut_val* msg = yyjson_mut_obj(doc); + yyjson_mut_obj_add_uint(doc, msg, "id", m->id); + yyjson_mut_val* auth = yyjson_mut_obj(doc); + yyjson_mut_obj_add_uint(doc, auth, "id", m->author.id); + if (m->author.name) + yyjson_mut_obj_add_str(doc, auth, "name", *m->author.name); + else yyjson_mut_obj_add_str(doc, auth, "name", ""); + yyjson_mut_obj_add_val(doc, msg, "author", auth); + if (m->parent != UINT64_MAX) + yyjson_mut_obj_add_uint(doc, msg, "parent", m->parent); + else yyjson_mut_obj_add_null(doc, msg, "parent"); + yyjson_mut_obj_add_str(doc, msg, "content", m->content); + yyjson_mut_obj_add_int(doc, msg, "timestamp", m->timestamp); + yyjson_mut_arr_add_val(history, msg); + } + yyjson_mut_obj_add_val(doc, data, "history", history); +} + +static void +pack_msg_evt(yyjson_mut_doc* doc, yyjson_mut_val* data, PacketMsgEvt* msgevt) { + yyjson_mut_obj_add_uint(doc, data, "id", msgevt->id); + yyjson_mut_val* authordata = yyjson_mut_obj(doc); + yyjson_mut_obj_add_uint(doc, authordata, "id", msgevt->author.id); + yyjson_mut_obj_add_str(doc, authordata, "name", *msgevt->author.name); + yyjson_mut_obj_add_val(doc, data, "author", authordata); + if (msgevt->parent != UINT64_MAX) { + yyjson_mut_obj_add_uint(doc, data, "parent", msgevt->parent); + } else { + yyjson_mut_obj_add_null(doc, data, "parent"); + } + yyjson_mut_obj_add_str(doc, data, "content", msgevt->content); + yyjson_mut_obj_add_int(doc, data, "timestamp", msgevt->timestamp); +} + +static void pack_join_evt( + yyjson_mut_doc* doc, yyjson_mut_val* data, PacketJoinEvt* joinevt +) { + yyjson_mut_obj_add_uint(doc, data, "id", joinevt->id); + yyjson_mut_obj_add_str(doc, data, "name", *joinevt->name); +} + +static void pack_name_evt( + yyjson_mut_doc* doc, yyjson_mut_val* data, PacketNameEvt* nameevt +) { + yyjson_mut_val* user = yyjson_mut_obj(doc); + yyjson_mut_obj_add_uint(doc, user, "id", nameevt->user.id); + yyjson_mut_obj_add_str(doc, user, "name", nameevt->old_name); + yyjson_mut_obj_add_val(doc, data, "user", user); + yyjson_mut_obj_add_str(doc, data, "new_name", nameevt->new_name); +} + +static void pack_leave_evt( + yyjson_mut_doc* doc, yyjson_mut_val* data, PacketJoinEvt* leave +) { + yyjson_mut_obj_add_uint(doc, data, "id", leave->id); + yyjson_mut_obj_add_str(doc, data, "name", *leave->name); +} + +static void +pack_ping(yyjson_mut_doc* doc, yyjson_mut_val* data, PacketPing* p) { + yyjson_mut_obj_add_uint(doc, data, "ts", p->ts); +} + +static void +pack_msg_ack(yyjson_mut_doc* doc, yyjson_mut_val* data, PacketMsgAck* ack) { + yyjson_mut_obj_add_str(doc, data, "status", ack->status); + yyjson_mut_obj_add_uint(doc, data, "id", ack->id); +} + +static void +pack_name_ack(yyjson_mut_doc* doc, yyjson_mut_val* data, PacketNameAck* ack) { + yyjson_mut_obj_add_str(doc, data, "status", ack->status); + yyjson_mut_obj_add_str(doc, data, "name", ack->name); +} + +char* packet_string(Packet* packet, size_t* sz) { + yyjson_mut_doc* doc = yyjson_mut_doc_new(NULL); + + // Create document root. + yyjson_mut_val* root = yyjson_mut_obj(doc); + yyjson_mut_doc_set_root(doc, root); + + // Add the type field. + yyjson_mut_obj_add_str( + doc, root, "type", packet_type_strings[packet->type] + ); + + // Create the "data" field. + yyjson_mut_val* data = yyjson_mut_obj(doc); + + switch (packet->type) { + case PACKET_TYPE_WELCOME: + pack_welcome(doc, data, (PacketWelcome*)packet->data); + break; + case PACKET_TYPE_MSG_EVT: + pack_msg_evt(doc, data, (PacketMsgEvt*)packet->data); + break; + case PACKET_TYPE_JOIN_EVT: + pack_join_evt(doc, data, (PacketJoinEvt*)packet->data); + break; + case PACKET_TYPE_LEAVE_EVT: + pack_leave_evt(doc, data, (PacketJoinEvt*)packet->data); + break; + case PACKET_TYPE_NAME_EVT: + pack_name_evt(doc, data, (PacketNameEvt*)packet->data); + break; + case PACKET_TYPE_PING: + pack_ping(doc, data, (PacketPing*)packet->data); + break; + case PACKET_TYPE_PONG: + pack_ping(doc, data, (PacketPing*)packet->data); + break; + case PACKET_TYPE_MSG_ACK: + pack_msg_ack(doc, data, (PacketMsgAck*)packet->data); + break; + case PACKET_TYPE_NAME_ACK: + pack_name_ack(doc, data, (PacketNameAck*)packet->data); + break; + default: + printf( + "Something's gone badly wrong. Trying to get string of " + "non-sending packet type %d. \n", + packet->type + ); + exit(347); + } + + // Add data field to root. + yyjson_mut_obj_add_val(doc, root, "data", data); + + char* out = yyjson_mut_write(doc, 0, sz); + yyjson_mut_doc_free(doc); + + return out; } diff --git a/server/src/chat.c b/server/src/chat.c index 71ed5e7..61457ec 100644 --- a/server/src/chat.c +++ b/server/src/chat.c @@ -3,7 +3,7 @@ #include "include/data.h" #include "include/session.h" -#include +#include #include #include #include @@ -11,18 +11,14 @@ #include MsgData* chat_history[CHAT_HISTORY_SZ] = {NULL}; -size_t chat_history_head = 0; +size_t chat_history_head = 0; // Next insertion index. +size_t chat_history_count = 0; // Number of valid entries. +static MsgID next_msg_id = 1; MsgData* chat_history_msg_add(MsgData* msg) { - if (chat_history_head < CHAT_HISTORY_SZ - 1) { - chat_history[chat_history_head] = msg; - chat_history_head++; - } else if (chat_history_head == CHAT_HISTORY_SZ - 1) { - chat_history[chat_history_head] = msg; - chat_history_head = 0; - } else { - exit(132); // Bad. - } + chat_history[chat_history_head] = msg; + chat_history_head = (chat_history_head + 1) % CHAT_HISTORY_SZ; + if (chat_history_count < CHAT_HISTORY_SZ) { chat_history_count++; } return msg; } @@ -30,13 +26,13 @@ MsgData* chat_history_msg_add(MsgData* msg) { MsgData** chat_history_nice(void) { MsgData** msgs = calloc(CHAT_HISTORY_SZ, sizeof(MsgData*)); - size_t i = chat_history_head, j = 0; - while (true) { - if (!chat_history[i]) break; - msgs[j] = chat_history[i]; - i = (i + 1) % CHAT_HISTORY_SZ; - j++; - if (chat_history_head == i) break; + // Oldest entry is head - count (mod size). + size_t start = (chat_history_head + CHAT_HISTORY_SZ - chat_history_count) % + CHAT_HISTORY_SZ; + + for (size_t j = 0; j < chat_history_count; j++) { + size_t idx = (start + j) % CHAT_HISTORY_SZ; + msgs[j] = chat_history[idx]; } return msgs; @@ -62,6 +58,7 @@ Packet* packet_parse(const char* in, size_t len) { void do_join(Session* sess, Packet* packet) { yyjson_val* data = packet->data; yyjson_val* jname = data ? yyjson_obj_get(data, "name") : NULL; + if (!jname) { jname = data ? yyjson_obj_get(data, "username") : NULL; } const char* rname = (jname && yyjson_is_str(jname)) ? yyjson_get_str(jname) : "Anon"; @@ -72,8 +69,166 @@ void do_join(Session* sess, Packet* packet) { session_set_name(sess, name); } -// Do a msg packet. -void do_msg(Session* sess, Packet* packet) {} +// Do a welcome packet. +void do_welcome(Session* sess) { + // Build list of online users (only sessions with a name). + size_t onlinec = 0; + for (Session* it = session_get_head(); it; it = it->next) + if (session_has_name(it)) onlinec++; + + UserData* online = calloc(onlinec, sizeof(UserData)); + size_t oi = 0; + for (Session* it = session_get_head(); it; it = it->next) { + if (!session_has_name(it)) continue; + online[oi].id = session_get_id(it); + online[oi].name = session_get_name(it); + oi++; + } + + // Build history list. + MsgData** history = chat_history_nice(); + size_t historyc = 0; + while (historyc < CHAT_HISTORY_SZ && history[historyc]) historyc++; + + PacketWelcome data = { + .id = session_get_id(sess), + .onlinec = onlinec, + .online = online, + .historyc = historyc, + .history = history + }; + + Packet* packet = packet_init(PACKET_TYPE_WELCOME, &data); + + session_send(sess, packet); + free(packet); + free(online); + free(history); +} + +// Do a welcome packet. +void do_joinevt(Session* sess) { + PacketJoinEvt data = { + .id = session_get_id(sess), .name = session_get_name(sess) + }; + + Packet* packet = packet_init(PACKET_TYPE_JOIN_EVT, &data); + session_send_all(packet); +} + +// Do a name change event: user contains OLD name, new_name is the updated name. +void do_nameevt(Session* sess, const Name old_name, const Name new_name) { + PacketNameEvt data = {.user = {.id = session_get_id(sess), .name = NULL}}; + + strncpy(data.old_name, old_name, NAME_MAX_LENGTH - 1); + data.old_name[NAME_MAX_LENGTH - 1] = '\0'; + data.user.name = &data.old_name; + + strncpy(data.new_name, new_name, NAME_MAX_LENGTH - 1); + data.new_name[NAME_MAX_LENGTH - 1] = '\0'; + + Packet* packet = packet_init(PACKET_TYPE_NAME_EVT, &data); + session_send_all(packet); +} + +// Handle an incoming client message, store it, and broadcast an event. +void do_msg(Session* sess, Packet* packet) { + if (!sess || !packet) return; + + PacketMsgAck ack = {.id = 0, .status = "success"}; + + // Require the sender to have a name. + if (!session_has_name(sess)) { + ack.status = "unnamed_session"; + Packet* p = packet_init(PACKET_TYPE_MSG_ACK, &ack); + session_send(sess, p); + free(p); + return; + } + + yyjson_val* data = (yyjson_val*)packet->data; + if (!data || !yyjson_is_obj(data)) { + ack.status = "bad_data"; + Packet* p = packet_init(PACKET_TYPE_MSG_ACK, &ack); + session_send(sess, p); + free(p); + return; + } + + yyjson_val* jcontent = yyjson_obj_get(data, "content"); + if (!(jcontent && yyjson_is_str(jcontent))) { + ack.status = "bad_content"; + Packet* p = packet_init(PACKET_TYPE_MSG_ACK, &ack); + session_send(sess, p); + free(p); + return; + } + const char* content = yyjson_get_str(jcontent); + if (!msg_verify(content)) { + ack.status = "invalid_content"; + Packet* p = packet_init(PACKET_TYPE_MSG_ACK, &ack); + session_send(sess, p); + free(p); + return; + } + + yyjson_val* jparent = yyjson_obj_get(data, "parent"); + MsgID parent = UINT64_MAX; + if (jparent) { + if (yyjson_is_int(jparent)) { + int64_t v = yyjson_get_int(jparent); + if (v >= 0) parent = (MsgID)v; + } else if (yyjson_is_str(jparent)) { + const char* ps = yyjson_get_str(jparent); + if (ps) { + char* endp = NULL; + unsigned long long tmp = strtoull(ps, &endp, 10); + if (endp && *endp == '\0') parent = (MsgID)tmp; + } + } + } + + MsgData* msg = malloc(sizeof(MsgData)); + if (!msg) { + lwsl_err("Failed to allocate MsgData.\n"); + return; + } + + msg->id = next_msg_id++; + msg->author.id = session_get_id(sess); + msg->author.name = session_get_name(sess); + msg->parent = parent; + strncpy(msg->content, content, MSG_MAX_LENGTH - 1); + msg->content[MSG_MAX_LENGTH - 1] = '\0'; + msg->timestamp = time(NULL); + + lwsl_notice( + "msg from %" PRIu64 " (%s): \"%s\" parent=%s\n", msg->author.id, + *msg->author.name, msg->content, + (msg->parent == UINT64_MAX ? "null" : "") + ); + + chat_history_msg_add(msg); + + ack.id = msg->id; + Packet* ackp = packet_init(PACKET_TYPE_MSG_ACK, &ack); + session_send(sess, ackp); + free(ackp); + + Packet* evt = packet_init(PACKET_TYPE_MSG_EVT, msg); + session_send_all(evt); + free(evt); +} + +static void do_ping(Session* sess) { + PacketPing data = {.ts = (uint64_t)time(NULL)}; + Packet* packet = packet_init(PACKET_TYPE_PING, &data); + session_send(sess, packet); + free(packet); + + sess->awaiting_pong = true; + sess->last_ping_ts = data.ts; +} #define CHAT_PACKET_SIZE SESSION_CHAT_BUF_SIZE @@ -88,12 +243,12 @@ int cb_chat( ) { Session** ps_p = (Session**)user; Session* sess = ps_p ? *ps_p : NULL; - Session* head = session_get_head(); switch (reason) { case LWS_CALLBACK_ESTABLISHED: // New connection, create session. if (ps_p) { *ps_p = session_create(wsi); } + lws_set_timer_usecs(wsi, 10 * LWS_USEC_PER_SEC); break; case LWS_CALLBACK_RECEIVE: { // Parse inc JSON packet. @@ -107,60 +262,21 @@ int cb_chat( Packet* packet = packet_parse(in, len); if (!packet) { - lwsl_warn("Received bad packet: \"%s\".\n", (char*)in); + int loglen = (len > 256) ? 256 : (int)len; + lwsl_warn( + "Received bad packet: \"%.*s\".\n", loglen, (const char*)in + ); break; } switch (packet->type) { case PACKET_TYPE_JOIN: // #1. Handle join packet, add new session. + do_welcome(sess); do_join(sess, packet); - // - // #2. Welcome the new client. - { - yyjson_mut_doc* wdoc = yyjson_mut_doc_new(NULL); - yyjson_mut_val* wroot = yyjson_mut_obj(wdoc); - yyjson_mut_doc_set_root(wdoc, wroot); - yyjson_mut_obj_add_str(wdoc, wroot, "type", "welcome"); - - yyjson_mut_val* wdata = yyjson_mut_obj(wdoc); - yyjson_mut_obj_add_val(wdoc, wroot, "data", wdata); - yyjson_mut_obj_add_uint( - wdoc, wdata, "you", session_get_id(sess) - ); - - yyjson_mut_val* wusers = yyjson_mut_arr(wdoc); - yyjson_mut_obj_add_val(wdoc, wdata, "users", wusers); - - for (Session* s = head; s; s = s->next) { - if (session_has_name(s)) { - yyjson_mut_val* uobj = yyjson_mut_obj(wdoc); - /* add each user once */ - yyjson_mut_arr_add_val(wusers, uobj); - yyjson_mut_obj_add_uint( - wdoc, uobj, "id", session_get_id(s) - ); - yyjson_mut_obj_add_str( - wdoc, uobj, "username", session_get_name(s) - ); - } - } - // History will be streamed individually after welcome. - - size_t out_len; - char* out = yyjson_mut_write(wdoc, 0, &out_len); - size_t copy_len = out_len < SESSION_CHAT_BUF_SIZE - ? out_len - : SESSION_CHAT_BUF_SIZE; - sess->buf_len = copy_len; - memcpy(&sess->buf[LWS_PRE], out, copy_len); - // Schedule the welcome frame for writable callback. - lws_callback_on_writable(sess->wsi); - - free(out); - yyjson_mut_doc_free(wdoc); - } + do_joinevt(sess); +#if 0 // #2. Introduce our new client to everybody else. { yyjson_mut_doc* jdoc = yyjson_mut_doc_new(NULL); @@ -176,7 +292,7 @@ int cb_chat( jdoc, jdata, "id", session_get_id(sess) ); yyjson_mut_obj_add_str( - jdoc, jdata, "username", session_get_name(sess) + jdoc, jdata, "username", *session_get_name(sess) ); size_t out_len; @@ -185,208 +301,138 @@ int cb_chat( ? out_len : SESSION_CHAT_BUF_SIZE; for (Session* s = head; s; s = s->next) { - s->buf_len = copy_len; - memcpy(&s->buf[LWS_PRE], out, copy_len); - lws_callback_on_writable(s->wsi); + session_send_raw(s, out, copy_len); } free(out); yyjson_mut_doc_free(jdoc); } +#endif + break; + case PACKET_TYPE_MSG: do_msg(sess, packet); break; + case PACKET_TYPE_NAME: { + // Extract requested new name. + yyjson_val* pdata = packet->data; + yyjson_val* jname = + pdata ? yyjson_obj_get(pdata, "name") : NULL; + if (!jname) { + jname = + pdata ? yyjson_obj_get(pdata, "username") : NULL; + } + if (!(jname && yyjson_is_str(jname))) { + PacketNameAck ack = {.status = "bad_name"}; + Packet* ackp = packet_init(PACKET_TYPE_NAME_ACK, &ack); + session_send(sess, ackp); + free(ackp); + break; + } + const char* rname = yyjson_get_str(jname); + + Name new_name = {'\0'}; + for (size_t i = 0; rname[i] && i < NAME_MAX_LENGTH - 1; i++) + new_name[i] = rname[i]; + + // Capture old name, broadcast event, then update session. + Name old_name = {'\0'}; + strncpy( + old_name, *session_get_name(sess), NAME_MAX_LENGTH - 1 + ); + old_name[NAME_MAX_LENGTH - 1] = '\0'; + + do_nameevt(sess, old_name, new_name); + session_set_name(sess, new_name); + + PacketNameAck ack = {.status = "success"}; + strncpy(ack.name, new_name, NAME_MAX_LENGTH); + ack.name[NAME_MAX_LENGTH - 1] = '\0'; + Packet* ackp = packet_init(PACKET_TYPE_NAME_ACK, &ack); + session_send(sess, ackp); + free(ackp); + break; + } + case PACKET_TYPE_PONG: + // Client responded; nothing else to do (timer continues). + if (sess) { + sess->awaiting_pong = false; + sess->missed_pings = 0; + } break; - case PACKET_TYPE_MSG: do_msg(sess, packet); break; default: lwsl_warn( "Received client-only packet: \"%s\".\n", (char*)in ); } - } -#if 0 - // NAME. - else if (type && strcmp(type, "name") == 0 && sess && - session_has_username(sess)) { - const yyjson_val* dataVal = - root ? yyjson_obj_get((yyjson_val*)root, "data") : NULL; - const yyjson_val* nval = - dataVal ? yyjson_obj_get((yyjson_val*)dataVal, "username") - : NULL; - // Enforce max name length. - const char* raw_newname = - (nval && yyjson_is_str((yyjson_val*)nval)) - ? yyjson_get_str((yyjson_val*)nval) - : NULL; - char newname_buf[SESSION_USERNAME_MAX_LEN]; - if (raw_newname && raw_newname[0] != '\0') { - size_t _j; - for (_j = 0; - raw_newname[_j] && _j < SESSION_USERNAME_MAX_LEN - 1; - ++_j) { - unsigned char c = (unsigned char)raw_newname[_j]; - newname_buf[_j] = isprint(c) ? c : '?'; + } break; + + case LWS_CALLBACK_TIMER: + if (sess) { + if (sess->awaiting_pong) { + sess->missed_pings++; + if (sess->missed_pings >= 3) { + lwsl_notice( + "Closing session %" PRIu64 + " after %u missed pings\n", + session_get_id(sess), sess->missed_pings + ); + lws_close_reason( + wsi, LWS_CLOSE_STATUS_GOINGAWAY, + (unsigned char*)"ping timeout", 12 + ); + return -1; } - newname_buf[_j] = '\0'; } else { - // Disallow empty names. - strcpy(newname_buf, session_get_name(sess)); - } - const char* newname = newname_buf; - // Buffer old name before updating. - char oldname_buf[SESSION_USERNAME_MAX_LEN]; - const char* current = session_get_name(sess); - if (current) { - strncpy(oldname_buf, current, SESSION_USERNAME_MAX_LEN - 1); - oldname_buf[SESSION_USERNAME_MAX_LEN - 1] = '\0'; - } else { - oldname_buf[0] = '\0'; - } - // Now update to new name. - session_set_name(sess, newname); - - // Broadcast name-event to other clients. - { - yyjson_mut_doc* ndoc = yyjson_mut_doc_new(NULL); - yyjson_mut_val* nroot = yyjson_mut_obj(ndoc); - yyjson_mut_doc_set_root(ndoc, nroot); - yyjson_mut_obj_add_str(ndoc, nroot, "type", "name-event"); - yyjson_mut_val* ndata = yyjson_mut_obj(ndoc); - yyjson_mut_obj_add_val(ndoc, nroot, "data", ndata); - yyjson_mut_obj_add_uint( - ndoc, ndata, "id", session_get_id(sess) - ); - yyjson_mut_obj_add_str(ndoc, ndata, "old", oldname_buf); - yyjson_mut_obj_add_str(ndoc, ndata, "new", newname); - - size_t out_len; - char* out = yyjson_mut_write(ndoc, 0, &out_len); - size_t copy_len = out_len < SESSION_CHAT_BUF_SIZE - ? out_len - : SESSION_CHAT_BUF_SIZE; - for (Session* s = head; s; s = s->next) { - s->buf_len = copy_len; - memcpy(&s->buf[LWS_PRE], out, copy_len); - lws_callback_on_writable(s->wsi); - } - free(out); - yyjson_mut_doc_free(ndoc); + sess->missed_pings = 0; } + do_ping(sess); + lws_set_timer_usecs(wsi, 10 * LWS_USEC_PER_SEC); } - // MSG. - else if (type && strcmp(type, "msg") == 0 && sess && - session_has_username(sess)) { - const yyjson_val* data = - yyjson_obj_get((yyjson_val*)root, "data"); - const yyjson_val* cval = - data ? yyjson_obj_get((yyjson_val*)data, "content") : NULL; - // Enforce maximum message content length - const char* raw_msg = (cval && yyjson_is_str((yyjson_val*)cval)) - ? yyjson_get_str((yyjson_val*)cval) - : ""; - char msg_buf[CHAT_BUF_SIZE]; - strncpy(msg_buf, raw_msg, CHAT_BUF_SIZE - 1); - msg_buf[CHAT_BUF_SIZE - 1] = '\0'; - // Sanitize message to printable characters. - for (size_t _k = 0; msg_buf[_k]; ++_k) { - unsigned char c = (unsigned char)msg_buf[_k]; - if (!isprint(c)) { msg_buf[_k] = '?'; } - } - const char* msg = msg_buf; - - // Build msg-event JSON with ID, parent and timestamp. - // Get parent ID if present. - const yyjson_val* dataVal = - yyjson_obj_get((yyjson_val*)root, "data"); - const yyjson_val* pval = - dataVal ? yyjson_obj_get((yyjson_val*)dataVal, "parent") - : NULL; - uint64_t parent_id = (pval && yyjson_is_uint((yyjson_val*)pval)) - ? yyjson_get_uint((yyjson_val*)pval) - : 0; - time_t now = time(NULL); - size_t msg_id = next_msg_id++; - // Send ack with the new message ID. - { - yyjson_mut_doc* ackdoc = yyjson_mut_doc_new(NULL); - yyjson_mut_val* ackroot = yyjson_mut_obj(ackdoc); - yyjson_mut_doc_set_root(ackdoc, ackroot); - yyjson_mut_obj_add_str(ackdoc, ackroot, "type", "msg-ack"); - yyjson_mut_val* ackdat = yyjson_mut_obj(ackdoc); - yyjson_mut_obj_add_val(ackdoc, ackroot, "data", ackdat); - yyjson_mut_obj_add_uint(ackdoc, ackdat, "id", msg_id); - size_t ack_len; - char* ack_out = yyjson_mut_write(ackdoc, 0, &ack_len); - size_t copy_ack = ack_len < SESSION_CHAT_BUF_SIZE - ? ack_len - : SESSION_CHAT_BUF_SIZE; - sess->buf_len = copy_ack; - memcpy(&sess->buf[LWS_PRE], ack_out, copy_ack); - lws_write( - sess->wsi, &sess->buf[LWS_PRE], sess->buf_len, - LWS_WRITE_TEXT - ); - - free(ack_out); - yyjson_mut_doc_free(ackdoc); - } - - yyjson_mut_doc* mdoc = yyjson_mut_doc_new(NULL); - yyjson_mut_val* mroot = yyjson_mut_obj(mdoc); - yyjson_mut_doc_set_root(mdoc, mroot); - yyjson_mut_obj_add_str(mdoc, mroot, "type", "msg-event"); - - yyjson_mut_val* mdat = yyjson_mut_obj(mdoc); - yyjson_mut_obj_add_val(mdoc, mroot, "data", mdat); - yyjson_mut_obj_add_uint(mdoc, mdat, "id", msg_id); - if (parent_id) - yyjson_mut_obj_add_uint(mdoc, mdat, "parent", parent_id); - else yyjson_mut_obj_add_null(mdoc, mdat, "parent"); - yyjson_mut_obj_add_uint(mdoc, mdat, "ts", (uint64_t)now); - yyjson_mut_obj_add_str( - mdoc, mdat, "username", session_get_name(sess) - ); - yyjson_mut_obj_add_str(mdoc, mdat, "content", msg); - - size_t out_len; - char* out = yyjson_mut_write(mdoc, 0, &out_len); - // Store this msg-event JSON in the circular history buffer. - if (history_count == HISTORY_SIZE) { - free(history[history_pos]); - } else { - history_count++; - } - history[history_pos] = strdup(out); - history_len[history_pos] = out_len; - history_pos = (history_pos + 1) % HISTORY_SIZE; - size_t copy_len = out_len < SESSION_CHAT_BUF_SIZE - ? out_len - : SESSION_CHAT_BUF_SIZE; - for (Session* s = head; s; s = s->next) { - s->buf_len = copy_len; - memcpy(&s->buf[LWS_PRE], out, copy_len); - lws_callback_on_writable(s->wsi); - } - free(out); - yyjson_mut_doc_free(mdoc); - // Writable events already scheduled for each session above. - } - - yyjson_doc_free(doc); break; - } -#endif case LWS_CALLBACK_SERVER_WRITEABLE: - if (sess && sess->buf_len > 0) { - lws_write( - sess->wsi, &sess->buf[LWS_PRE], sess->buf_len, - LWS_WRITE_TEXT - ); - sess->buf_len = 0; - } + if (!sess->queue_head) break; + + MsgNode* node = sess->queue_head; + + // Send the data at payload + LWS_PRE. + lwsl_notice( + "write to session %" PRIu64 " len=%zu queue=%zu\n", + session_get_id(sess), node->ln, sess->queue_count + ); + int m = lws_write( + wsi, &node->payload[LWS_PRE], node->ln, LWS_WRITE_TEXT + ); + if (m < 0) return -1; // Socket error, kill connection. + + // Pop the queue. + sess->queue_head = node->nxt; + if (!sess->queue_head) { sess->queue_tail = NULL; } + sess->queue_count--; + + free(node); + + // If there are still messages left, notice them. + if (sess->queue_head) { lws_callback_on_writable(wsi); } + break; case LWS_CALLBACK_CLOSED: // Goodbye. - // TODO: Add leave event to proto. - if (sess) { session_destroy(sess); } + if (sess) { + // Broadcast leave event before destroying. + PacketJoinEvt leave = { + .id = session_get_id(sess), .name = session_get_name(sess) + }; + Packet* evt = packet_init(PACKET_TYPE_LEAVE_EVT, &leave); + session_send_all(evt); + free(evt); + + MsgNode* current = sess->queue_head; + while (current) { + MsgNode* tmp = current; + current = current->nxt; + free(tmp); + } + session_destroy(sess); + } break; default: break; diff --git a/server/src/include/api.h b/server/src/include/api.h index 282ec93..7df5415 100644 --- a/server/src/include/api.h +++ b/server/src/include/api.h @@ -4,12 +4,19 @@ #include "data.h" typedef enum { - PACKET_TYPE_JOIN, // C -> S. - PACKET_TYPE_WELCOME, // S -> C. - PACKET_TYPE_JOIN_EVT, // S->A. - PACKET_TYPE_MSG, // C->S. - PACKET_TYPE_MSG_EVT, // S->A. - PACKET_TYPE_MAX = PACKET_TYPE_MSG_EVT, + PACKET_TYPE_JOIN, // C -> S. + PACKET_TYPE_WELCOME, // S -> C. + PACKET_TYPE_JOIN_EVT, // S->A. + PACKET_TYPE_MSG, // C->S. + PACKET_TYPE_MSG_EVT, // S->A. + PACKET_TYPE_NAME, // C->S. Set/update name after connect. + PACKET_TYPE_NAME_EVT, // S->A. Broadcast when a name changes. + PACKET_TYPE_PING, // S -> C. Liveness ping. + PACKET_TYPE_PONG, // C -> S. Liveness pong. + PACKET_TYPE_MSG_ACK, // S -> C. Acknowledge message submission. + PACKET_TYPE_NAME_ACK, // S -> C. Acknowledge name change. + PACKET_TYPE_LEAVE_EVT, // S -> A. Broadcast when a user leaves. + PACKET_TYPE_MAX = PACKET_TYPE_LEAVE_EVT, PACKET_TYPE_BAD, } PacketType; @@ -27,6 +34,9 @@ Packet* packet_init(PacketType type, void* data); // Create a packet from untrusted data. Packet* packet_init_safe(const char* type, const void* data); +// Get a string representation of a packet. +char* packet_string(Packet* packet, size_t* sz); + typedef struct { Name name; // Your selected name. } PacketJoin; @@ -36,12 +46,10 @@ typedef struct { size_t onlinec; // The number of users online. UserData* online; // The list of online users. size_t historyc; // The number of messages in history. - MsgData* history; // The list of historical messages. + MsgData** history; // The list of historical messages. } PacketWelcome; -typedef struct { - UserData newguy; // Who just joined. -} PacketJoinEvent; +typedef UserData PacketJoinEvt; typedef struct { MsgContent content; // The content of the message. @@ -50,4 +58,26 @@ typedef struct { typedef MsgData PacketMsgEvt; +typedef struct { + UserData user; // The user who changed their name. + Name old_name; // The previous name (for serialization via user.name). + Name new_name; // The updated name. +} PacketNameEvt; + +typedef struct { + uint64_t ts; // Unix timestamp when ping was sent. +} PacketPing; + +typedef PacketPing PacketPong; + +typedef struct { + MsgID id; // Assigned message ID (0 on failure). + const char* status; // "success" or error reason. +} PacketMsgAck; + +typedef struct { + const char* status; // "success" or error reason. + Name name; // The name that was applied (empty on failure). +} PacketNameAck; + #endif diff --git a/server/src/include/chat.h b/server/src/include/chat.h index 409a48f..b61ca4f 100644 --- a/server/src/include/chat.h +++ b/server/src/include/chat.h @@ -10,7 +10,8 @@ // Message history (ring). #define CHAT_HISTORY_SZ 128 extern MsgData* chat_history[CHAT_HISTORY_SZ]; -extern size_t chat_history_head; // Points to the oldest message, or is NULL. +extern size_t chat_history_head; // Next insertion index. +extern size_t chat_history_count; // Number of valid history entries. // Add message to history ring. MsgData* chat_history_msg_add(MsgData* msg); diff --git a/server/src/include/data.h b/server/src/include/data.h index da37e91..7d6cfe4 100644 --- a/server/src/include/data.h +++ b/server/src/include/data.h @@ -2,12 +2,13 @@ #define DATA__H #include +#include #define NAME_MAX_LENGTH 16 #define MSG_MAX_LENGTH 1024 -typedef int UserID; -typedef int MsgID; +typedef uint64_t UserID; +typedef uint64_t MsgID; typedef char Name[NAME_MAX_LENGTH]; int name_verify(const char* name); @@ -17,7 +18,7 @@ int msg_verify(const char* content); typedef struct { UserID id; - Name name; + Name* name; } UserData; typedef struct { diff --git a/server/src/include/session.h b/server/src/include/session.h index e29bc66..742f379 100644 --- a/server/src/include/session.h +++ b/server/src/include/session.h @@ -1,28 +1,40 @@ #ifndef SESSION__H #define SESSION__H +#include "api.h" +#include "data.h" + #include #include #include -#include "data.h" - // Includes terminating null. #define SESSION_CHAT_BUF_SIZE 32768 +// Linked list of messages in queue to send. +typedef struct MSG_NODE_STRUCT { + struct MSG_NODE_STRUCT* nxt; + size_t ln; + unsigned char payload[]; +} MsgNode; + /** - * session_t + * Session * Represents a single WebSocket client session in the chat server. */ typedef struct SESSION { struct lws* wsi; // Libwebsockets connection handle. uint64_t id; // Unique session ID. struct SESSION* next; // Next session in the internal list. - Name name; // Stored name. + Name name; // Stored name. bool named; // True once name is set. - unsigned char - buf[LWS_PRE + SESSION_CHAT_BUF_SIZE]; // Outgoing buffer per session - size_t buf_len; // Length of data in buf + size_t queue_count; // Number of pending messages in the queue. + MsgNode* queue_head; // First to send. + MsgNode* queue_tail; // Last. + bool awaiting_pong; // True if a ping is outstanding. + unsigned missed_pings; // Consecutive missed pongs. + uint64_t last_ping_ts; // Timestamp of last ping sent. + } Session; /** @@ -50,6 +62,21 @@ void session_destroy(Session* sess); */ Session* session_get_head(void); +// Handles memory allocation and pointing logic. +void session_send_raw(Session* sess, const char* data, size_t ln); + +/** + * session_send + * Send a packet to this session. + * + * @param sess The session to send to + * @param packet The packet to send. + */ +void session_send(Session* sess, Packet* packet); + +// Send a packet to all sessions. +void session_send_all(Packet* packet); + /** * session_broadcast * Iterate every session in the internal list and invoke the callback. @@ -75,7 +102,7 @@ void session_set_name(Session* sess, const char* name); * @param sess The session to query. * @return Pointer to the stored name (readonly). */ -const char* session_get_name(const Session* sess); +Name* session_get_name(Session* sess); /** * session_has_name diff --git a/server/src/session.c b/server/src/session.c index 3a681ca..49c33a6 100644 --- a/server/src/session.c +++ b/server/src/session.c @@ -1,6 +1,8 @@ #include "include/session.h" +#include "include/api.h" #include "include/data.h" +#include #include #include #include @@ -18,6 +20,11 @@ Session* session_create(struct lws* wsi) { sess->named = false; sess->name[0] = '\0'; sess->next = head; + sess->queue_count = 0; + sess->queue_head = sess->queue_tail = NULL; + sess->awaiting_pong = false; + sess->missed_pings = 0; + sess->last_ping_ts = 0; head = sess; return sess; } @@ -37,6 +44,74 @@ void session_destroy(Session* sess) { Session* session_get_head(void) { return head; } +void session_send_raw(Session* sess, const char* data, size_t ln) { + if (!sess || !data) return; + + if (sess->queue_count > 4) { + lwsl_warn( + "Session %ld queue full, dropping future packets!\n", sess->id + ); + } + + MsgNode* node = malloc(sizeof(MsgNode) + LWS_PRE + ln); + if (!node) return; + + node->nxt = NULL; + node->ln = ln; + // Copy data into payload, leaving LWS_PRE bytes at the start. + memcpy(&node->payload[LWS_PRE], data, ln); + + // Append to the tail of the linked list. + if (sess->queue_tail) { + sess->queue_tail->nxt = node; + sess->queue_tail = node; + } else { + sess->queue_head = sess->queue_tail = node; + } + + sess->queue_count++; + + // Tell LWS there's something to write. + lwsl_notice( + "enqueue to session %" PRIu64 " len=%zu queue=%zu\n", sess->id, ln, + sess->queue_count + ); + lws_callback_on_writable(sess->wsi); +} + +void session_send(Session* sess, Packet* packet) { + if (!packet) return; + + size_t out_len; + char* out = packet_string(packet, &out_len); + + // Check for overflow. + if (out_len > SESSION_CHAT_BUF_SIZE) { + free(out); + return; + } + + session_send_raw(sess, out, out_len); + + free(out); +} + +void session_send_all(Packet* packet) { + if (!packet) return; + + size_t out_ln; + char* out = packet_string(packet, &out_ln); + if (!out) return; + + size_t copy_ln = + out_ln < SESSION_CHAT_BUF_SIZE ? out_ln : SESSION_CHAT_BUF_SIZE; + + for (Session* s = session_get_head(); s; s = s->next) + session_send_raw(s, out, copy_ln); + + free(out); +} + void session_broadcast(void (*cb)(Session* s, void* user), void* user) { for (Session* iter = head; iter; iter = iter->next) { cb(iter, user); } } @@ -48,9 +123,9 @@ void session_set_name(Session* sess, const char* name) { sess->named = true; } -const char* session_get_name(const Session* sess) { +Name* session_get_name(Session* sess) { if (!sess) { return NULL; } - return sess->name; + return &sess->name; } bool session_has_name(const Session* sess) {