#include "include/chat.h"
#include "include/api.h"
#include "include/data.h"
#include "include/session.h"
// NOTE(review): the names of the six system headers were lost from this copy
// of the file; the list below is reconstructed from the identifiers this file
// uses (PRIu64, lws_*, UINT64_MAX, malloc/free, strncpy, time) -- confirm
// against the original.
#include <inttypes.h>
#include <libwebsockets.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

// Ring buffer holding pointers to the most recent CHAT_HISTORY_SZ messages.
MsgData* chat_history[CHAT_HISTORY_SZ] = {NULL};
size_t chat_history_head = 0;   // Next insertion index.
size_t chat_history_count = 0;  // Number of valid entries.

static MsgID next_msg_id = 1;

// Append `msg` to the history ring and return it.
//
// The ring keeps the pointer it is given. Once the ring is full, the slot
// being reused holds the oldest message; that message is released here so it
// is not leaked when its pointer is overwritten.
// NOTE(review): this assumes every stored message was heap-allocated with
// malloc/calloc (as do_msg does) -- confirm no other caller stores
// differently-allocated messages.
//
// A NULL `msg` is ignored (and NULL is returned): a NULL slot inside the valid
// range would truncate the NULL-terminated snapshot that do_welcome counts.
MsgData* chat_history_msg_add(MsgData* msg) {
    if (!msg) return NULL;

    MsgData* evicted = chat_history[chat_history_head];
    if (evicted != msg) {
        free(evicted);  // free(NULL) is a no-op while the ring is filling up.
    }

    chat_history[chat_history_head] = msg;
    chat_history_head = (chat_history_head + 1) % CHAT_HISTORY_SZ;
    if (chat_history_count < CHAT_HISTORY_SZ) {
        chat_history_count++;
    }
    return msg;
}

// Return a newly allocated array of CHAT_HISTORY_SZ message pointers holding
// the stored messages ordered oldest-first; unused trailing slots are NULL.
//
// The caller frees the returned array with free() but must not free the
// messages it points to (they remain in the ring). Returns NULL if the
// allocation fails.
MsgData** chat_history_nice(void) {
    MsgData** msgs = calloc(CHAT_HISTORY_SZ, sizeof *msgs);
    if (!msgs) return NULL;

    // Oldest entry is head - count (mod size).
    size_t start = (chat_history_head + CHAT_HISTORY_SZ - chat_history_count) %
                   CHAT_HISTORY_SZ;
    for (size_t j = 0; j < chat_history_count; j++) {
        size_t idx = (start + j) % CHAT_HISTORY_SZ;
        msgs[j] = chat_history[idx];
    }
    return msgs;
}

// Parse a raw packet.
//
// Reads `len` bytes at `in` as JSON, extracts the string member "type" and
// the object member "data" (either may end up NULL when missing or of the
// wrong JSON type) and hands both to packet_init_safe(). Returns NULL when the
// input is not valid JSON, otherwise whatever packet_init_safe() returns.
//
// NOTE(review): `type` and `data` point into the parsed yyjson document, so
// the document cannot be freed here on the success path; nothing visible in
// this file frees it later -- confirm who owns it. It is freed here only when
// no packet was produced.
Packet* packet_parse(const char* in, size_t len) {
    yyjson_doc* doc = yyjson_read(in, len, 0);
    if (!doc) return NULL;

    yyjson_val* root = yyjson_doc_get_root(doc);

    yyjson_val* jtype = yyjson_obj_get(root, "type");
    const char* type =
        (jtype && yyjson_is_str(jtype)) ? yyjson_get_str(jtype) : NULL;

    yyjson_val* rdata = yyjson_obj_get(root, "data");
    const yyjson_val* data = (rdata && yyjson_is_obj(rdata)) ? rdata : NULL;

    Packet* packet = packet_init_safe(type, data);
    if (!packet) {
        // No packet refers to the document, so release it instead of leaking.
        yyjson_doc_free(doc);
    }
    return packet;
}

// Do a join packet.
//
// Takes the requested name from data.name (falling back to data.username),
// defaults to "Anon" when neither is a string, truncates it to
// NAME_MAX_LENGTH - 1 characters and stores it on the session.
void do_join(Session* sess, Packet* packet) {
    if (!sess || !packet) return;

    yyjson_val* data = (yyjson_val*)packet->data;
    yyjson_val* jname = data ? yyjson_obj_get(data, "name") : NULL;
    if (!jname) {
        jname = data ? yyjson_obj_get(data, "username") : NULL;
    }
    const char* rname =
        (jname && yyjson_is_str(jname)) ? yyjson_get_str(jname) : "Anon";

    // Zero-initialised, so the copy below is always NUL-terminated.
    Name name = {'\0'};
    for (size_t i = 0; rname[i] && i < NAME_MAX_LENGTH - 1; i++) {
        name[i] = rname[i];
    }

    session_set_name(sess, name);
}

// Do a welcome packet.
void do_welcome(Session* sess) { // Build list of online users (only sessions with a name). size_t onlinec = 0; for (Session* it = session_get_head(); it; it = it->next) if (session_has_name(it)) onlinec++; UserData* online = calloc(onlinec, sizeof(UserData)); size_t oi = 0; for (Session* it = session_get_head(); it; it = it->next) { if (!session_has_name(it)) continue; online[oi].id = session_get_id(it); online[oi].name = session_get_name(it); oi++; } // Build history list. MsgData** history = chat_history_nice(); size_t historyc = 0; while (historyc < CHAT_HISTORY_SZ && history[historyc]) historyc++; PacketWelcome data = { .id = session_get_id(sess), .onlinec = onlinec, .online = online, .historyc = historyc, .history = history }; Packet* packet = packet_init(PACKET_TYPE_WELCOME, &data); session_send(sess, packet); free(packet); free(online); free(history); } // Do a welcome packet. void do_joinevt(Session* sess) { PacketJoinEvt data = { .id = session_get_id(sess), .name = session_get_name(sess) }; Packet* packet = packet_init(PACKET_TYPE_JOIN_EVT, &data); session_send_all(packet); } // Do a name change event: user contains OLD name, new_name is the updated name. void do_nameevt(Session* sess, const Name old_name, const Name new_name) { PacketNameEvt data = {.user = {.id = session_get_id(sess), .name = NULL}}; strncpy(data.old_name, old_name, NAME_MAX_LENGTH - 1); data.old_name[NAME_MAX_LENGTH - 1] = '\0'; data.user.name = &data.old_name; strncpy(data.new_name, new_name, NAME_MAX_LENGTH - 1); data.new_name[NAME_MAX_LENGTH - 1] = '\0'; Packet* packet = packet_init(PACKET_TYPE_NAME_EVT, &data); session_send_all(packet); } // Handle an incoming client message, store it, and broadcast an event. void do_msg(Session* sess, Packet* packet) { if (!sess || !packet) return; PacketMsgAck ack = {.id = 0, .status = "success"}; // Require the sender to have a name. 
if (!session_has_name(sess)) { ack.status = "unnamed_session"; Packet* p = packet_init(PACKET_TYPE_MSG_ACK, &ack); session_send(sess, p); free(p); return; } yyjson_val* data = (yyjson_val*)packet->data; if (!data || !yyjson_is_obj(data)) { ack.status = "bad_data"; Packet* p = packet_init(PACKET_TYPE_MSG_ACK, &ack); session_send(sess, p); free(p); return; } yyjson_val* jcontent = yyjson_obj_get(data, "content"); if (!(jcontent && yyjson_is_str(jcontent))) { ack.status = "bad_content"; Packet* p = packet_init(PACKET_TYPE_MSG_ACK, &ack); session_send(sess, p); free(p); return; } const char* content = yyjson_get_str(jcontent); if (!msg_verify(content)) { ack.status = "invalid_content"; Packet* p = packet_init(PACKET_TYPE_MSG_ACK, &ack); session_send(sess, p); free(p); return; } yyjson_val* jparent = yyjson_obj_get(data, "parent"); MsgID parent = UINT64_MAX; if (jparent) { if (yyjson_is_int(jparent)) { int64_t v = yyjson_get_int(jparent); if (v >= 0) parent = (MsgID)v; } else if (yyjson_is_str(jparent)) { const char* ps = yyjson_get_str(jparent); if (ps) { char* endp = NULL; unsigned long long tmp = strtoull(ps, &endp, 10); if (endp && *endp == '\0') parent = (MsgID)tmp; } } } MsgData* msg = malloc(sizeof(MsgData)); if (!msg) { lwsl_err("Failed to allocate MsgData.\n"); return; } msg->id = next_msg_id++; msg->author.id = session_get_id(sess); msg->author.name = session_get_name(sess); msg->parent = parent; strncpy(msg->content, content, MSG_MAX_LENGTH - 1); msg->content[MSG_MAX_LENGTH - 1] = '\0'; msg->timestamp = time(NULL); lwsl_notice( "msg from %" PRIu64 " (%s): \"%s\" parent=%s\n", msg->author.id, *msg->author.name, msg->content, (msg->parent == UINT64_MAX ? 
"null" : "") ); chat_history_msg_add(msg); ack.id = msg->id; Packet* ackp = packet_init(PACKET_TYPE_MSG_ACK, &ack); session_send(sess, ackp); free(ackp); Packet* evt = packet_init(PACKET_TYPE_MSG_EVT, msg); session_send_all(evt); free(evt); } static void do_ping(Session* sess) { PacketPing data = {.ts = (uint64_t)time(NULL)}; Packet* packet = packet_init(PACKET_TYPE_PING, &data); session_send(sess, packet); free(packet); sess->awaiting_pong = true; sess->last_ping_ts = data.ts; } #define CHAT_PACKET_SIZE SESSION_CHAT_BUF_SIZE /* * cb_chat - libwebsockets protocol callback. * Handles connection lifecycle, incoming messages, and writable events. */ int cb_chat( struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len ) { Session** ps_p = (Session**)user; Session* sess = ps_p ? *ps_p : NULL; switch (reason) { case LWS_CALLBACK_ESTABLISHED: // New connection, create session. if (ps_p) { *ps_p = session_create(wsi); } lws_set_timer_usecs(wsi, 10 * LWS_USEC_PER_SEC); break; case LWS_CALLBACK_RECEIVE: { // Parse inc JSON packet. if (len > CHAT_PACKET_SIZE) { lwsl_warn( "Received JSON payload exceeds limit: %zu bytes\n", len ); break; } Packet* packet = packet_parse(in, len); if (!packet) { int loglen = (len > 256) ? 256 : (int)len; lwsl_warn( "Received bad packet: \"%.*s\".\n", loglen, (const char*)in ); break; } switch (packet->type) { case PACKET_TYPE_JOIN: // #1. Handle join packet, add new session. do_welcome(sess); do_join(sess, packet); do_joinevt(sess); #if 0 // #2. Introduce our new client to everybody else. 
{ yyjson_mut_doc* jdoc = yyjson_mut_doc_new(NULL); yyjson_mut_val* jroot = yyjson_mut_obj(jdoc); yyjson_mut_doc_set_root(jdoc, jroot); yyjson_mut_obj_add_str( jdoc, jroot, "type", "join-event" ); yyjson_mut_val* jdata = yyjson_mut_obj(jdoc); yyjson_mut_obj_add_val(jdoc, jroot, "data", jdata); yyjson_mut_obj_add_uint( jdoc, jdata, "id", session_get_id(sess) ); yyjson_mut_obj_add_str( jdoc, jdata, "username", *session_get_name(sess) ); size_t out_len; char* out = yyjson_mut_write(jdoc, 0, &out_len); size_t copy_len = out_len < SESSION_CHAT_BUF_SIZE ? out_len : SESSION_CHAT_BUF_SIZE; for (Session* s = head; s; s = s->next) { session_send_raw(s, out, copy_len); } free(out); yyjson_mut_doc_free(jdoc); } #endif break; case PACKET_TYPE_MSG: do_msg(sess, packet); break; case PACKET_TYPE_NAME: { // Extract requested new name. yyjson_val* pdata = packet->data; yyjson_val* jname = pdata ? yyjson_obj_get(pdata, "name") : NULL; if (!jname) { jname = pdata ? yyjson_obj_get(pdata, "username") : NULL; } if (!(jname && yyjson_is_str(jname))) { PacketNameAck ack = {.status = "bad_name"}; Packet* ackp = packet_init(PACKET_TYPE_NAME_ACK, &ack); session_send(sess, ackp); free(ackp); break; } const char* rname = yyjson_get_str(jname); Name new_name = {'\0'}; for (size_t i = 0; rname[i] && i < NAME_MAX_LENGTH - 1; i++) new_name[i] = rname[i]; // Capture old name, broadcast event, then update session. Name old_name = {'\0'}; strncpy( old_name, *session_get_name(sess), NAME_MAX_LENGTH - 1 ); old_name[NAME_MAX_LENGTH - 1] = '\0'; do_nameevt(sess, old_name, new_name); session_set_name(sess, new_name); PacketNameAck ack = {.status = "success"}; strncpy(ack.name, new_name, NAME_MAX_LENGTH); ack.name[NAME_MAX_LENGTH - 1] = '\0'; Packet* ackp = packet_init(PACKET_TYPE_NAME_ACK, &ack); session_send(sess, ackp); free(ackp); break; } case PACKET_TYPE_PONG: // Client responded; nothing else to do (timer continues). 
if (sess) { sess->awaiting_pong = false; sess->missed_pings = 0; } break; default: lwsl_warn( "Received client-only packet: \"%s\".\n", (char*)in ); } } break; case LWS_CALLBACK_TIMER: if (sess) { if (sess->awaiting_pong) { sess->missed_pings++; if (sess->missed_pings >= 3) { lwsl_notice( "Closing session %" PRIu64 " after %u missed pings\n", session_get_id(sess), sess->missed_pings ); lws_close_reason( wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char*)"ping timeout", 12 ); return -1; } } else { sess->missed_pings = 0; } do_ping(sess); lws_set_timer_usecs(wsi, 10 * LWS_USEC_PER_SEC); } break; case LWS_CALLBACK_SERVER_WRITEABLE: if (!sess->queue_head) break; MsgNode* node = sess->queue_head; // Send the data at payload + LWS_PRE. lwsl_notice( "write to session %" PRIu64 " len=%zu queue=%zu\n", session_get_id(sess), node->ln, sess->queue_count ); int m = lws_write( wsi, &node->payload[LWS_PRE], node->ln, LWS_WRITE_TEXT ); if (m < 0) return -1; // Socket error, kill connection. // Pop the queue. sess->queue_head = node->nxt; if (!sess->queue_head) { sess->queue_tail = NULL; } sess->queue_count--; free(node); // If there are still messages left, notice them. if (sess->queue_head) { lws_callback_on_writable(wsi); } break; case LWS_CALLBACK_CLOSED: // Goodbye. if (sess) { // Broadcast leave event before destroying. PacketJoinEvt leave = { .id = session_get_id(sess), .name = session_get_name(sess) }; Packet* evt = packet_init(PACKET_TYPE_LEAVE_EVT, &leave); session_send_all(evt); free(evt); MsgNode* current = sess->queue_head; while (current) { MsgNode* tmp = current; current = current->nxt; free(tmp); } session_destroy(sess); } break; default: break; } return 0; }