1 /** 2 Manages the Discord websocket client. 3 */ 4 module dcord.gateway.client; 5 6 import std.stdio, 7 std.uni, 8 std.functional, 9 std.zlib, 10 std.datetime, 11 std.variant, 12 std.format, 13 core.exception, 14 core.memory; 15 16 static import std.typecons; 17 18 import vibe.core.core, 19 vibe.inet.url, 20 vibe.http.websockets; 21 22 import dcord.client, 23 dcord.gateway, 24 dcord.util.emitter, 25 dcord.util.json, 26 dcord.util.counter, 27 dcord.types; 28 29 /** Maximum reconnects the GatewayClient will try before resetting session state */ 30 const ubyte MAX_RECONNECTS = 6; 31 32 /** Current implemented Gateway version. */ 33 const ubyte GATEWAY_VERSION = 6; 34 /** 35 GatewayClient is the base abstraction for connecting to, and interacting with 36 the Discord Websocket (gateway) API. 37 */ 38 class GatewayClient { 39 /** Client instance for this gateway connection */ 40 Client client; 41 42 /** WebSocket connection for this gateway connection */ 43 WebSocket sock; 44 45 /** Gateway SessionID, used for resuming. */ 46 string sessionID; 47 48 /** Gateway sequence number, used for resuming */ 49 uint seq; 50 51 /** Heartbeat interval */ 52 uint heartbeatInterval; 53 54 /** Whether this GatewayClient is currently connected */ 55 bool connected; 56 57 /** Number of reconnects attempted */ 58 ubyte reconnects; 59 60 /** The heartbeater task */ 61 Task heartbeater; 62 63 /** Event emitter for Gateway Packets */ 64 Emitter eventEmitter; 65 66 private { 67 /** Cached gateway URL from the API */ 68 string cachedGatewayURL; 69 Counter!string eventCounter; 70 bool eventTracking; 71 } 72 73 /** 74 Params: 75 client = base client 76 eventTracking = if true, log information about events recieved 77 */ 78 this(Client client, bool eventTracking = false) { 79 this.client = client; 80 this.eventTracking = eventTracking; 81 82 // Create the event emitter and listen to some required gateway events. 83 this.eventEmitter = new Emitter; 84 this.eventEmitter.listen!Ready(toDelegate(&this.handleReadyEvent)); 85 this.eventEmitter.listen!Resumed(toDelegate(&this.handleResumedEvent)); 86 87 // Copy emitters to client for easier API access 88 client.events = this.eventEmitter; 89 90 if (this.eventTracking) this.eventCounter = new Counter!string; 91 } 92 93 /** 94 Logger for this GatewayClient. 95 */ 96 @property Logger log() { 97 return this.client.log; 98 } 99 100 /** 101 Starts a connection to the gateway. Also called for resuming/reconnecting. 102 */ 103 void start(Game game=null) { 104 if(this.sock && this.sock.connected) this.sock.close(); 105 106 // If this is our first connection, get a gateway websocket URL. Later on it is cached. 107 if(!this.cachedGatewayURL) { 108 this.cachedGatewayURL = client.api.gatewayGet(); 109 this.cachedGatewayURL ~= format("/?v=%s&encoding=%s", GATEWAY_VERSION, "json"); 110 } 111 112 // Start the main task 113 this.log.infof("Starting connection to Gateway WebSocket (%s)", this.cachedGatewayURL); 114 this.sock = connectWebSocket(URL(this.cachedGatewayURL)); 115 if(game is null) runTask(() => this.run()); 116 else runTask(() => this.run(game)); 117 } 118 119 /** 120 Send a gateway payload. 121 */ 122 void send(Serializable p) { 123 string data = p.serialize().toString; 124 version (DEBUG_GATEWAY_DATA) { 125 this.log.tracef("GATEWAY SEND: %s", data); 126 } 127 this.sock.send(data); 128 } 129 130 void updateStatus(Game game=null) { 131 this.send(new StatusUpdate(game)); 132 } 133 private void debugEventCounts() { 134 while (true) { 135 this.eventCounter.resetAll(); 136 sleep(5.seconds); 137 this.log.infof("%s total events", this.eventCounter.total); 138 139 foreach (ref event; this.eventCounter.mostCommon(5)) { 140 this.log.infof(" %s: %s", event, this.eventCounter.get(event)); 141 } 142 } 143 } 144 145 private void handleReadyEvent(Ready r) { 146 this.log.infof("Recieved READY payload, starting heartbeater"); 147 // this.hb_interval = r.heartbeatInterval; 148 this.sessionID = r.sessionID; 149 this.reconnects = 0; 150 151 if (this.eventTracking) { 152 runTask(toDelegate(&this.debugEventCounts)); 153 } 154 } 155 156 private void handleResumedEvent(Resumed r) { // stfu 157 this.heartbeater = runTask(toDelegate(&this.heartbeat)); 158 // TODO: do an action in a closure with the Resumed event 159 } 160 161 private void emitDispatchEvent(T)(VibeJSON obj) { 162 T v = new T(this.client, obj["d"]); 163 this.eventEmitter.emit!T(v); 164 v.resolveDeferreds(); 165 } 166 167 private void handleDispatchPacket(VibeJSON obj, size_t size) { 168 // Update sequence number if it's larger than what we have 169 uint seq = obj["s"].get!uint; // stfu 170 if (seq > this.seq) { 171 this.seq = seq; 172 } 173 174 string type = obj["t"].get!string; 175 176 if (this.eventTracking) { 177 this.eventCounter.tick(type); 178 } 179 180 switch (type) { 181 case "READY": 182 this.log.infof("Recieved READY payload, size in bytes: %s", size); 183 this.emitDispatchEvent!Ready(obj); 184 break; 185 case "RESUMED": 186 this.emitDispatchEvent!Resumed(obj); 187 break; 188 case "CHANNEL_CREATE": 189 this.emitDispatchEvent!ChannelCreate(obj); 190 break; 191 case "CHANNEL_UPDATE": 192 this.emitDispatchEvent!ChannelUpdate(obj); 193 break; 194 case "CHANNEL_DELETE": 195 this.emitDispatchEvent!ChannelDelete(obj); 196 break; 197 case "GUILD_BAN_ADD": 198 this.emitDispatchEvent!GuildBanAdd(obj); 199 break; 200 case "GUILD_BAN_REMOVE": 201 this.emitDispatchEvent!GuildBanRemove(obj); 202 break; 203 case "GUILD_CREATE": 204 this.emitDispatchEvent!GuildCreate(obj); 205 break; 206 case "GUILD_UPDATE": 207 this.emitDispatchEvent!GuildUpdate(obj); 208 break; 209 case "GUILD_DELETE": 210 this.emitDispatchEvent!GuildDelete(obj); 211 break; 212 case "GUILD_EMOJIS_UPDATE": 213 this.emitDispatchEvent!GuildEmojisUpdate(obj); 214 break; 215 case "GUILD_INTEGRATIONS_UPDATE": 216 this.emitDispatchEvent!GuildIntegrationsUpdate(obj); 217 break; 218 case "GUILD_MEMBERS_CHUNK": 219 this.emitDispatchEvent!GuildMembersChunk(obj); 220 break; 221 case "GUILD_MEMBER_ADD": 222 this.emitDispatchEvent!GuildMemberAdd(obj); 223 break; 224 case "GUILD_MEMBER_UPDATE": 225 this.emitDispatchEvent!GuildMemberUpdate(obj); 226 break; 227 case "GUILD_MEMBER_REMOVE": 228 this.emitDispatchEvent!GuildMemberRemove(obj); 229 break; 230 case "GUILD_ROLE_CREATE": 231 this.emitDispatchEvent!GuildRoleCreate(obj); 232 break; 233 case "GUILD_ROLE_UPDATE": 234 this.emitDispatchEvent!GuildRoleUpdate(obj); 235 break; 236 case "GUILD_ROLE_DELETE": 237 this.emitDispatchEvent!GuildRoleDelete(obj); 238 break; 239 case "MESSAGE_CREATE": 240 this.emitDispatchEvent!MessageCreate(obj); 241 break; 242 case "MESSAGE_UPDATE": 243 this.emitDispatchEvent!MessageUpdate(obj); 244 break; 245 case "MESSAGE_DELETE": 246 this.emitDispatchEvent!MessageDelete(obj); 247 break; 248 case "PRESENCE_UPDATE": 249 this.emitDispatchEvent!PresenceUpdate(obj); 250 break; 251 case "TYPING_START": 252 this.emitDispatchEvent!TypingStart(obj); 253 break; 254 case "USER_SETTINGS_UPDATE": 255 this.emitDispatchEvent!UserSettingsUpdate(obj); 256 break; 257 case "USER_UPDATE": 258 this.emitDispatchEvent!UserUpdate(obj); 259 break; 260 case "VOICE_STATE_UPDATE": 261 this.emitDispatchEvent!VoiceStateUpdate(obj); 262 break; 263 case "VOICE_SERVER_UPDATE": 264 this.emitDispatchEvent!VoiceServerUpdate(obj); 265 break; 266 case "CHANNEL_PINS_UPDATE": 267 this.emitDispatchEvent!ChannelPinsUpdate(obj); 268 break; 269 case "MESSAGE_DELETE_BULK": 270 this.emitDispatchEvent!MessageDeleteBulk(obj); 271 break; 272 case "MESSAGE_REACTION_ADD": 273 this.emitDispatchEvent!MessageReactionAdd(obj); 274 break; 275 default: 276 this.log.warningf("Unhandled dispatch event: %s", type); 277 break; 278 } 279 } 280 281 private void parse(string rawData) { 282 GC.disable; // because GC messes stuff up 283 VibeJSON json = parseJsonString(rawData); 284 285 version (DEBUG_GATEWAY_DATA) { 286 } 287 288 OPCode op = json["op"].get!OPCode; 289 switch (op) { 290 case OPCode.DISPATCH: 291 this.handleDispatchPacket(json, rawData.length); 292 break; 293 case OPCode.HEARTBEAT: 294 this.send(new HeartbeatPacket(this.seq)); 295 break; 296 case OPCode.RECONNECT: 297 this.log.warningf("Recieved RECONNECT OPCode, resetting connection..."); 298 if (this.sock && this.sock.connected) this.sock.close(); 299 break; 300 case OPCode.INVALID_SESSION: 301 this.log.warningf("Recieved INVALID_SESSION OPCode, resetting connection..."); 302 if (this.sock && this.sock.connected) this.sock.close(); 303 break; 304 case OPCode.HELLO: 305 this.log.tracef("Recieved HELLO OPCode, starting heartbeater..."); 306 this.heartbeatInterval = json["d"]["heartbeat_interval"].get!uint; 307 this.heartbeater = runTask(toDelegate(&this.heartbeat)); 308 break; 309 case OPCode.HEARTBEAT_ACK: 310 break; 311 default: 312 this.log.warningf("Unhandled gateway packet: %s", op); 313 break; 314 } 315 } 316 317 private void heartbeat() { 318 while(this.connected) { 319 this.send(new HeartbeatPacket(this.seq)); 320 sleep(this.heartbeatInterval.msecs); 321 } 322 } 323 324 /// Runs the GatewayClient until completion 325 void run(Game game=null) { 326 string data; 327 328 // If we already have a sequence number, attempt to resume 329 if(this.sessionID && this.seq) { 330 this.log.infof("Sending resume payload (session ID %s with gateway sequence number %s)", 331 this.sessionID, this.seq); 332 333 this.send(new ResumePacket(this.client.token, this.sessionID, this.seq)); 334 } else { 335 // On startup, send the identify payload 336 this.log.info("Sending identify payload"); 337 this.send(new IdentifyPacket( 338 this.client.token, 339 this.client.shardInfo.shard, 340 this.client.shardInfo.numShards)); 341 } 342 343 this.log.info("Connected to Gateway"); 344 this.connected = true; 345 this.send(new StatusUpdate(game)); 346 while (this.sock.waitForData()) { 347 if(!this.connected) break; 348 try { 349 ubyte[] rawdata = this.sock.receiveBinary(); 350 data = cast(string)uncompress(rawdata); // raw cast could be dangerous - maybe use toString in the future 351 } catch (Exception e) { 352 data = this.sock.receiveText(); 353 } 354 if(data == "") continue; 355 356 try { 357 this.parse(data); 358 } catch(Exception e) { 359 this.log.warningf("failed to handle data (%s)", e); 360 } catch(Error e) { // stfu 361 this.log.warningf("failed to handle data (%s)", e); 362 } 363 } 364 365 this.log.critical("Gateway websocket closed (code " ~ this.sock.closeCode().toString() ~ ")"); 366 this.connected = false; 367 this.reconnects++; 368 369 if(this.reconnects > MAX_RECONNECTS) { 370 this.log.errorf("Max Gateway reconnects (%s) hit, aborting...", this.reconnects); 371 return; 372 } 373 374 if(this.reconnects > 1) { 375 this.sessionID = null; 376 this.seq = 0; 377 this.log.warning("Waiting 5 seconds before reconnecting..."); 378 sleep(5.seconds); 379 } 380 381 this.log.info("Attempting reconnection..."); 382 return this.start(); 383 } 384 }