Oh man that's a lot of changes.

This commit is contained in:
2026-02-28 15:59:03 -05:00
parent 226c4a8b52
commit de55288d37
11 changed files with 864 additions and 336 deletions

View File

@@ -3,7 +3,7 @@
#include "include/data.h"
#include "include/session.h"
#include <ctype.h>
#include <inttypes.h>
#include <libwebsockets.h>
#include <stdlib.h>
#include <string.h>
@@ -11,18 +11,14 @@
#include <yyjson.h>
MsgData* chat_history[CHAT_HISTORY_SZ] = {NULL};
size_t chat_history_head = 0;
size_t chat_history_head = 0; // Next insertion index.
size_t chat_history_count = 0; // Number of valid entries.
static MsgID next_msg_id = 1;
MsgData* chat_history_msg_add(MsgData* msg) {
if (chat_history_head < CHAT_HISTORY_SZ - 1) {
chat_history[chat_history_head] = msg;
chat_history_head++;
} else if (chat_history_head == CHAT_HISTORY_SZ - 1) {
chat_history[chat_history_head] = msg;
chat_history_head = 0;
} else {
exit(132); // Bad.
}
chat_history[chat_history_head] = msg;
chat_history_head = (chat_history_head + 1) % CHAT_HISTORY_SZ;
if (chat_history_count < CHAT_HISTORY_SZ) { chat_history_count++; }
return msg;
}
@@ -30,13 +26,13 @@ MsgData* chat_history_msg_add(MsgData* msg) {
MsgData** chat_history_nice(void) {
MsgData** msgs = calloc(CHAT_HISTORY_SZ, sizeof(MsgData*));
size_t i = chat_history_head, j = 0;
while (true) {
if (!chat_history[i]) break;
msgs[j] = chat_history[i];
i = (i + 1) % CHAT_HISTORY_SZ;
j++;
if (chat_history_head == i) break;
// Oldest entry is head - count (mod size).
size_t start = (chat_history_head + CHAT_HISTORY_SZ - chat_history_count) %
CHAT_HISTORY_SZ;
for (size_t j = 0; j < chat_history_count; j++) {
size_t idx = (start + j) % CHAT_HISTORY_SZ;
msgs[j] = chat_history[idx];
}
return msgs;
@@ -62,6 +58,7 @@ Packet* packet_parse(const char* in, size_t len) {
void do_join(Session* sess, Packet* packet) {
yyjson_val* data = packet->data;
yyjson_val* jname = data ? yyjson_obj_get(data, "name") : NULL;
if (!jname) { jname = data ? yyjson_obj_get(data, "username") : NULL; }
const char* rname =
(jname && yyjson_is_str(jname)) ? yyjson_get_str(jname) : "Anon";
@@ -72,8 +69,166 @@ void do_join(Session* sess, Packet* packet) {
session_set_name(sess, name);
}
// Do a msg packet.
void do_msg(Session* sess, Packet* packet) {}
// Do a welcome packet.
void do_welcome(Session* sess) {
// Build list of online users (only sessions with a name).
size_t onlinec = 0;
for (Session* it = session_get_head(); it; it = it->next)
if (session_has_name(it)) onlinec++;
UserData* online = calloc(onlinec, sizeof(UserData));
size_t oi = 0;
for (Session* it = session_get_head(); it; it = it->next) {
if (!session_has_name(it)) continue;
online[oi].id = session_get_id(it);
online[oi].name = session_get_name(it);
oi++;
}
// Build history list.
MsgData** history = chat_history_nice();
size_t historyc = 0;
while (historyc < CHAT_HISTORY_SZ && history[historyc]) historyc++;
PacketWelcome data = {
.id = session_get_id(sess),
.onlinec = onlinec,
.online = online,
.historyc = historyc,
.history = history
};
Packet* packet = packet_init(PACKET_TYPE_WELCOME, &data);
session_send(sess, packet);
free(packet);
free(online);
free(history);
}
// Do a welcome packet.
void do_joinevt(Session* sess) {
PacketJoinEvt data = {
.id = session_get_id(sess), .name = session_get_name(sess)
};
Packet* packet = packet_init(PACKET_TYPE_JOIN_EVT, &data);
session_send_all(packet);
}
// Do a name change event: user contains OLD name, new_name is the updated name.
void do_nameevt(Session* sess, const Name old_name, const Name new_name) {
PacketNameEvt data = {.user = {.id = session_get_id(sess), .name = NULL}};
strncpy(data.old_name, old_name, NAME_MAX_LENGTH - 1);
data.old_name[NAME_MAX_LENGTH - 1] = '\0';
data.user.name = &data.old_name;
strncpy(data.new_name, new_name, NAME_MAX_LENGTH - 1);
data.new_name[NAME_MAX_LENGTH - 1] = '\0';
Packet* packet = packet_init(PACKET_TYPE_NAME_EVT, &data);
session_send_all(packet);
}
// Handle an incoming client message, store it, and broadcast an event.
void do_msg(Session* sess, Packet* packet) {
if (!sess || !packet) return;
PacketMsgAck ack = {.id = 0, .status = "success"};
// Require the sender to have a name.
if (!session_has_name(sess)) {
ack.status = "unnamed_session";
Packet* p = packet_init(PACKET_TYPE_MSG_ACK, &ack);
session_send(sess, p);
free(p);
return;
}
yyjson_val* data = (yyjson_val*)packet->data;
if (!data || !yyjson_is_obj(data)) {
ack.status = "bad_data";
Packet* p = packet_init(PACKET_TYPE_MSG_ACK, &ack);
session_send(sess, p);
free(p);
return;
}
yyjson_val* jcontent = yyjson_obj_get(data, "content");
if (!(jcontent && yyjson_is_str(jcontent))) {
ack.status = "bad_content";
Packet* p = packet_init(PACKET_TYPE_MSG_ACK, &ack);
session_send(sess, p);
free(p);
return;
}
const char* content = yyjson_get_str(jcontent);
if (!msg_verify(content)) {
ack.status = "invalid_content";
Packet* p = packet_init(PACKET_TYPE_MSG_ACK, &ack);
session_send(sess, p);
free(p);
return;
}
yyjson_val* jparent = yyjson_obj_get(data, "parent");
MsgID parent = UINT64_MAX;
if (jparent) {
if (yyjson_is_int(jparent)) {
int64_t v = yyjson_get_int(jparent);
if (v >= 0) parent = (MsgID)v;
} else if (yyjson_is_str(jparent)) {
const char* ps = yyjson_get_str(jparent);
if (ps) {
char* endp = NULL;
unsigned long long tmp = strtoull(ps, &endp, 10);
if (endp && *endp == '\0') parent = (MsgID)tmp;
}
}
}
MsgData* msg = malloc(sizeof(MsgData));
if (!msg) {
lwsl_err("Failed to allocate MsgData.\n");
return;
}
msg->id = next_msg_id++;
msg->author.id = session_get_id(sess);
msg->author.name = session_get_name(sess);
msg->parent = parent;
strncpy(msg->content, content, MSG_MAX_LENGTH - 1);
msg->content[MSG_MAX_LENGTH - 1] = '\0';
msg->timestamp = time(NULL);
lwsl_notice(
"msg from %" PRIu64 " (%s): \"%s\" parent=%s\n", msg->author.id,
*msg->author.name, msg->content,
(msg->parent == UINT64_MAX ? "null" : "")
);
chat_history_msg_add(msg);
ack.id = msg->id;
Packet* ackp = packet_init(PACKET_TYPE_MSG_ACK, &ack);
session_send(sess, ackp);
free(ackp);
Packet* evt = packet_init(PACKET_TYPE_MSG_EVT, msg);
session_send_all(evt);
free(evt);
}
static void do_ping(Session* sess) {
PacketPing data = {.ts = (uint64_t)time(NULL)};
Packet* packet = packet_init(PACKET_TYPE_PING, &data);
session_send(sess, packet);
free(packet);
sess->awaiting_pong = true;
sess->last_ping_ts = data.ts;
}
#define CHAT_PACKET_SIZE SESSION_CHAT_BUF_SIZE
@@ -88,12 +243,12 @@ int cb_chat(
) {
Session** ps_p = (Session**)user;
Session* sess = ps_p ? *ps_p : NULL;
Session* head = session_get_head();
switch (reason) {
case LWS_CALLBACK_ESTABLISHED:
// New connection, create session.
if (ps_p) { *ps_p = session_create(wsi); }
lws_set_timer_usecs(wsi, 10 * LWS_USEC_PER_SEC);
break;
case LWS_CALLBACK_RECEIVE: {
// Parse inc JSON packet.
@@ -107,60 +262,21 @@ int cb_chat(
Packet* packet = packet_parse(in, len);
if (!packet) {
lwsl_warn("Received bad packet: \"%s\".\n", (char*)in);
int loglen = (len > 256) ? 256 : (int)len;
lwsl_warn(
"Received bad packet: \"%.*s\".\n", loglen, (const char*)in
);
break;
}
switch (packet->type) {
case PACKET_TYPE_JOIN:
// #1. Handle join packet, add new session.
do_welcome(sess);
do_join(sess, packet);
//
// #2. Welcome the new client.
{
yyjson_mut_doc* wdoc = yyjson_mut_doc_new(NULL);
yyjson_mut_val* wroot = yyjson_mut_obj(wdoc);
yyjson_mut_doc_set_root(wdoc, wroot);
yyjson_mut_obj_add_str(wdoc, wroot, "type", "welcome");
yyjson_mut_val* wdata = yyjson_mut_obj(wdoc);
yyjson_mut_obj_add_val(wdoc, wroot, "data", wdata);
yyjson_mut_obj_add_uint(
wdoc, wdata, "you", session_get_id(sess)
);
yyjson_mut_val* wusers = yyjson_mut_arr(wdoc);
yyjson_mut_obj_add_val(wdoc, wdata, "users", wusers);
for (Session* s = head; s; s = s->next) {
if (session_has_name(s)) {
yyjson_mut_val* uobj = yyjson_mut_obj(wdoc);
/* add each user once */
yyjson_mut_arr_add_val(wusers, uobj);
yyjson_mut_obj_add_uint(
wdoc, uobj, "id", session_get_id(s)
);
yyjson_mut_obj_add_str(
wdoc, uobj, "username", session_get_name(s)
);
}
}
// History will be streamed individually after welcome.
size_t out_len;
char* out = yyjson_mut_write(wdoc, 0, &out_len);
size_t copy_len = out_len < SESSION_CHAT_BUF_SIZE
? out_len
: SESSION_CHAT_BUF_SIZE;
sess->buf_len = copy_len;
memcpy(&sess->buf[LWS_PRE], out, copy_len);
// Schedule the welcome frame for writable callback.
lws_callback_on_writable(sess->wsi);
free(out);
yyjson_mut_doc_free(wdoc);
}
do_joinevt(sess);
#if 0
// #2. Introduce our new client to everybody else.
{
yyjson_mut_doc* jdoc = yyjson_mut_doc_new(NULL);
@@ -176,7 +292,7 @@ int cb_chat(
jdoc, jdata, "id", session_get_id(sess)
);
yyjson_mut_obj_add_str(
jdoc, jdata, "username", session_get_name(sess)
jdoc, jdata, "username", *session_get_name(sess)
);
size_t out_len;
@@ -185,208 +301,138 @@ int cb_chat(
? out_len
: SESSION_CHAT_BUF_SIZE;
for (Session* s = head; s; s = s->next) {
s->buf_len = copy_len;
memcpy(&s->buf[LWS_PRE], out, copy_len);
lws_callback_on_writable(s->wsi);
session_send_raw(s, out, copy_len);
}
free(out);
yyjson_mut_doc_free(jdoc);
}
#endif
break;
case PACKET_TYPE_MSG: do_msg(sess, packet); break;
case PACKET_TYPE_NAME: {
// Extract requested new name.
yyjson_val* pdata = packet->data;
yyjson_val* jname =
pdata ? yyjson_obj_get(pdata, "name") : NULL;
if (!jname) {
jname =
pdata ? yyjson_obj_get(pdata, "username") : NULL;
}
if (!(jname && yyjson_is_str(jname))) {
PacketNameAck ack = {.status = "bad_name"};
Packet* ackp = packet_init(PACKET_TYPE_NAME_ACK, &ack);
session_send(sess, ackp);
free(ackp);
break;
}
const char* rname = yyjson_get_str(jname);
Name new_name = {'\0'};
for (size_t i = 0; rname[i] && i < NAME_MAX_LENGTH - 1; i++)
new_name[i] = rname[i];
// Capture old name, broadcast event, then update session.
Name old_name = {'\0'};
strncpy(
old_name, *session_get_name(sess), NAME_MAX_LENGTH - 1
);
old_name[NAME_MAX_LENGTH - 1] = '\0';
do_nameevt(sess, old_name, new_name);
session_set_name(sess, new_name);
PacketNameAck ack = {.status = "success"};
strncpy(ack.name, new_name, NAME_MAX_LENGTH);
ack.name[NAME_MAX_LENGTH - 1] = '\0';
Packet* ackp = packet_init(PACKET_TYPE_NAME_ACK, &ack);
session_send(sess, ackp);
free(ackp);
break;
}
case PACKET_TYPE_PONG:
// Client responded; nothing else to do (timer continues).
if (sess) {
sess->awaiting_pong = false;
sess->missed_pings = 0;
}
break;
case PACKET_TYPE_MSG: do_msg(sess, packet); break;
default:
lwsl_warn(
"Received client-only packet: \"%s\".\n", (char*)in
);
}
}
#if 0
// NAME.
else if (type && strcmp(type, "name") == 0 && sess &&
session_has_username(sess)) {
const yyjson_val* dataVal =
root ? yyjson_obj_get((yyjson_val*)root, "data") : NULL;
const yyjson_val* nval =
dataVal ? yyjson_obj_get((yyjson_val*)dataVal, "username")
: NULL;
// Enforce max name length.
const char* raw_newname =
(nval && yyjson_is_str((yyjson_val*)nval))
? yyjson_get_str((yyjson_val*)nval)
: NULL;
char newname_buf[SESSION_USERNAME_MAX_LEN];
if (raw_newname && raw_newname[0] != '\0') {
size_t _j;
for (_j = 0;
raw_newname[_j] && _j < SESSION_USERNAME_MAX_LEN - 1;
++_j) {
unsigned char c = (unsigned char)raw_newname[_j];
newname_buf[_j] = isprint(c) ? c : '?';
} break;
case LWS_CALLBACK_TIMER:
if (sess) {
if (sess->awaiting_pong) {
sess->missed_pings++;
if (sess->missed_pings >= 3) {
lwsl_notice(
"Closing session %" PRIu64
" after %u missed pings\n",
session_get_id(sess), sess->missed_pings
);
lws_close_reason(
wsi, LWS_CLOSE_STATUS_GOINGAWAY,
(unsigned char*)"ping timeout", 12
);
return -1;
}
newname_buf[_j] = '\0';
} else {
// Disallow empty names.
strcpy(newname_buf, session_get_name(sess));
}
const char* newname = newname_buf;
// Buffer old name before updating.
char oldname_buf[SESSION_USERNAME_MAX_LEN];
const char* current = session_get_name(sess);
if (current) {
strncpy(oldname_buf, current, SESSION_USERNAME_MAX_LEN - 1);
oldname_buf[SESSION_USERNAME_MAX_LEN - 1] = '\0';
} else {
oldname_buf[0] = '\0';
}
// Now update to new name.
session_set_name(sess, newname);
// Broadcast name-event to other clients.
{
yyjson_mut_doc* ndoc = yyjson_mut_doc_new(NULL);
yyjson_mut_val* nroot = yyjson_mut_obj(ndoc);
yyjson_mut_doc_set_root(ndoc, nroot);
yyjson_mut_obj_add_str(ndoc, nroot, "type", "name-event");
yyjson_mut_val* ndata = yyjson_mut_obj(ndoc);
yyjson_mut_obj_add_val(ndoc, nroot, "data", ndata);
yyjson_mut_obj_add_uint(
ndoc, ndata, "id", session_get_id(sess)
);
yyjson_mut_obj_add_str(ndoc, ndata, "old", oldname_buf);
yyjson_mut_obj_add_str(ndoc, ndata, "new", newname);
size_t out_len;
char* out = yyjson_mut_write(ndoc, 0, &out_len);
size_t copy_len = out_len < SESSION_CHAT_BUF_SIZE
? out_len
: SESSION_CHAT_BUF_SIZE;
for (Session* s = head; s; s = s->next) {
s->buf_len = copy_len;
memcpy(&s->buf[LWS_PRE], out, copy_len);
lws_callback_on_writable(s->wsi);
}
free(out);
yyjson_mut_doc_free(ndoc);
sess->missed_pings = 0;
}
do_ping(sess);
lws_set_timer_usecs(wsi, 10 * LWS_USEC_PER_SEC);
}
// MSG.
else if (type && strcmp(type, "msg") == 0 && sess &&
session_has_username(sess)) {
const yyjson_val* data =
yyjson_obj_get((yyjson_val*)root, "data");
const yyjson_val* cval =
data ? yyjson_obj_get((yyjson_val*)data, "content") : NULL;
// Enforce maximum message content length
const char* raw_msg = (cval && yyjson_is_str((yyjson_val*)cval))
? yyjson_get_str((yyjson_val*)cval)
: "";
char msg_buf[CHAT_BUF_SIZE];
strncpy(msg_buf, raw_msg, CHAT_BUF_SIZE - 1);
msg_buf[CHAT_BUF_SIZE - 1] = '\0';
// Sanitize message to printable characters.
for (size_t _k = 0; msg_buf[_k]; ++_k) {
unsigned char c = (unsigned char)msg_buf[_k];
if (!isprint(c)) { msg_buf[_k] = '?'; }
}
const char* msg = msg_buf;
// Build msg-event JSON with ID, parent and timestamp.
// Get parent ID if present.
const yyjson_val* dataVal =
yyjson_obj_get((yyjson_val*)root, "data");
const yyjson_val* pval =
dataVal ? yyjson_obj_get((yyjson_val*)dataVal, "parent")
: NULL;
uint64_t parent_id = (pval && yyjson_is_uint((yyjson_val*)pval))
? yyjson_get_uint((yyjson_val*)pval)
: 0;
time_t now = time(NULL);
size_t msg_id = next_msg_id++;
// Send ack with the new message ID.
{
yyjson_mut_doc* ackdoc = yyjson_mut_doc_new(NULL);
yyjson_mut_val* ackroot = yyjson_mut_obj(ackdoc);
yyjson_mut_doc_set_root(ackdoc, ackroot);
yyjson_mut_obj_add_str(ackdoc, ackroot, "type", "msg-ack");
yyjson_mut_val* ackdat = yyjson_mut_obj(ackdoc);
yyjson_mut_obj_add_val(ackdoc, ackroot, "data", ackdat);
yyjson_mut_obj_add_uint(ackdoc, ackdat, "id", msg_id);
size_t ack_len;
char* ack_out = yyjson_mut_write(ackdoc, 0, &ack_len);
size_t copy_ack = ack_len < SESSION_CHAT_BUF_SIZE
? ack_len
: SESSION_CHAT_BUF_SIZE;
sess->buf_len = copy_ack;
memcpy(&sess->buf[LWS_PRE], ack_out, copy_ack);
lws_write(
sess->wsi, &sess->buf[LWS_PRE], sess->buf_len,
LWS_WRITE_TEXT
);
free(ack_out);
yyjson_mut_doc_free(ackdoc);
}
yyjson_mut_doc* mdoc = yyjson_mut_doc_new(NULL);
yyjson_mut_val* mroot = yyjson_mut_obj(mdoc);
yyjson_mut_doc_set_root(mdoc, mroot);
yyjson_mut_obj_add_str(mdoc, mroot, "type", "msg-event");
yyjson_mut_val* mdat = yyjson_mut_obj(mdoc);
yyjson_mut_obj_add_val(mdoc, mroot, "data", mdat);
yyjson_mut_obj_add_uint(mdoc, mdat, "id", msg_id);
if (parent_id)
yyjson_mut_obj_add_uint(mdoc, mdat, "parent", parent_id);
else yyjson_mut_obj_add_null(mdoc, mdat, "parent");
yyjson_mut_obj_add_uint(mdoc, mdat, "ts", (uint64_t)now);
yyjson_mut_obj_add_str(
mdoc, mdat, "username", session_get_name(sess)
);
yyjson_mut_obj_add_str(mdoc, mdat, "content", msg);
size_t out_len;
char* out = yyjson_mut_write(mdoc, 0, &out_len);
// Store this msg-event JSON in the circular history buffer.
if (history_count == HISTORY_SIZE) {
free(history[history_pos]);
} else {
history_count++;
}
history[history_pos] = strdup(out);
history_len[history_pos] = out_len;
history_pos = (history_pos + 1) % HISTORY_SIZE;
size_t copy_len = out_len < SESSION_CHAT_BUF_SIZE
? out_len
: SESSION_CHAT_BUF_SIZE;
for (Session* s = head; s; s = s->next) {
s->buf_len = copy_len;
memcpy(&s->buf[LWS_PRE], out, copy_len);
lws_callback_on_writable(s->wsi);
}
free(out);
yyjson_mut_doc_free(mdoc);
// Writable events already scheduled for each session above.
}
yyjson_doc_free(doc);
break;
}
#endif
case LWS_CALLBACK_SERVER_WRITEABLE:
if (sess && sess->buf_len > 0) {
lws_write(
sess->wsi, &sess->buf[LWS_PRE], sess->buf_len,
LWS_WRITE_TEXT
);
sess->buf_len = 0;
}
if (!sess->queue_head) break;
MsgNode* node = sess->queue_head;
// Send the data at payload + LWS_PRE.
lwsl_notice(
"write to session %" PRIu64 " len=%zu queue=%zu\n",
session_get_id(sess), node->ln, sess->queue_count
);
int m = lws_write(
wsi, &node->payload[LWS_PRE], node->ln, LWS_WRITE_TEXT
);
if (m < 0) return -1; // Socket error, kill connection.
// Pop the queue.
sess->queue_head = node->nxt;
if (!sess->queue_head) { sess->queue_tail = NULL; }
sess->queue_count--;
free(node);
// If there are still messages left, notice them.
if (sess->queue_head) { lws_callback_on_writable(wsi); }
break;
case LWS_CALLBACK_CLOSED:
// Goodbye.
// TODO: Add leave event to proto.
if (sess) { session_destroy(sess); }
if (sess) {
// Broadcast leave event before destroying.
PacketJoinEvt leave = {
.id = session_get_id(sess), .name = session_get_name(sess)
};
Packet* evt = packet_init(PACKET_TYPE_LEAVE_EVT, &leave);
session_send_all(evt);
free(evt);
MsgNode* current = sess->queue_head;
while (current) {
MsgNode* tmp = current;
current = current->nxt;
free(tmp);
}
session_destroy(sess);
}
break;
default: break;