1 /** 2 Manages the Discord websocket client. 3 */ 4 module dcord.gateway.client; 5 6 import std.stdio, 7 std.uni, 8 std.functional, 9 std.zlib, 10 std.datetime, 11 std.variant, 12 std.format, 13 core.exception, 14 core.memory; 15 16 static import std.typecons; 17 18 import vibe.core.core, 19 vibe.inet.url, 20 vibe.http.websockets; 21 22 import dcord.client, 23 dcord.gateway, 24 dcord.util.emitter, 25 dcord.util.json, 26 dcord.util.counter, 27 dcord.types; 28 29 /** Maximum reconnects the GatewayClient will try before resetting session state */ 30 const ubyte MAX_RECONNECTS = 6; 31 32 /** Current implemented Gateway version. */ 33 const ubyte GATEWAY_VERSION = 6; 34 /** 35 GatewayClient is the base abstraction for connecting to, and interacting with 36 the Discord Websocket (gateway) API. 37 */ 38 class GatewayClient { 39 /** Client instance for this gateway connection */ 40 Client client; 41 42 /** WebSocket connection for this gateway connection */ 43 WebSocket sock; 44 45 /** Gateway SessionID, used for resuming. */ 46 string sessionID; 47 48 /** Gateway sequence number, used for resuming */ 49 uint seq; 50 51 /** Heartbeat interval */ 52 uint heartbeatInterval; 53 54 /** Whether this GatewayClient is currently connected */ 55 bool connected; 56 57 /** Number of reconnects attempted */ 58 ubyte reconnects; 59 60 /** The heartbeater task */ 61 Task heartbeater; 62 63 /** Event emitter for Gateway Packets */ 64 Emitter eventEmitter; 65 66 private { 67 /** Cached gateway URL from the API */ 68 string cachedGatewayURL; 69 Counter!string eventCounter; 70 bool eventTracking; 71 } 72 73 /** 74 Params: 75 client = base client 76 eventTracking = if true, log information about events recieved 77 */ 78 this(Client client, bool eventTracking = false) { 79 this.client = client; 80 this.eventTracking = eventTracking; 81 82 // Create the event emitter and listen to some required gateway events. 83 this.eventEmitter = new Emitter; 84 this.eventEmitter.listen!Ready(toDelegate(&this.handleReadyEvent)); 85 this.eventEmitter.listen!Resumed(toDelegate(&this.handleResumedEvent)); 86 87 // Copy emitters to client for easier API access 88 client.events = this.eventEmitter; 89 90 if (this.eventTracking) { 91 this.eventCounter = new Counter!string; 92 } 93 } 94 95 /** 96 Logger for this GatewayClient. 97 */ 98 @property Logger log() { 99 return this.client.log; 100 } 101 102 /** 103 Starts a connection to the gateway. Also called for resuming/reconnecting. 104 */ 105 void start() { 106 if(this.sock && this.sock.connected) this.sock.close(); 107 108 // If this is our first connection, get a gateway websocket URL. Later on it is cached. 109 if(!this.cachedGatewayURL) { 110 this.cachedGatewayURL = client.api.gatewayGet(); 111 this.cachedGatewayURL ~= format("/?v=%s&encoding=%s", GATEWAY_VERSION, "json"); 112 } 113 114 // Start the main task 115 this.log.infof("Starting connection to Gateway WebSocket (%s)", this.cachedGatewayURL); 116 this.sock = connectWebSocket(URL(this.cachedGatewayURL)); 117 runTask(toDelegate(&this.run)); 118 119 } 120 121 /** 122 Send a gateway payload. 123 */ 124 void send(Serializable p) { 125 string data = p.serialize().toString; 126 version (DEBUG_GATEWAY_DATA) { 127 this.log.tracef("GATEWAY SEND: %s", data); 128 } 129 this.sock.send(data); 130 } 131 132 private void debugEventCounts() { 133 while (true) { 134 this.eventCounter.resetAll(); 135 sleep(5.seconds); 136 this.log.infof("%s total events", this.eventCounter.total); 137 138 foreach (ref event; this.eventCounter.mostCommon(5)) { 139 this.log.infof(" %s: %s", event, this.eventCounter.get(event)); 140 } 141 } 142 } 143 144 private void handleReadyEvent(Ready r) { 145 this.log.infof("Recieved READY payload, starting heartbeater"); 146 // this.hb_interval = r.heartbeatInterval; 147 this.sessionID = r.sessionID; 148 this.reconnects = 0; 149 150 if (this.eventTracking) { 151 runTask(toDelegate(&this.debugEventCounts)); 152 } 153 } 154 155 private void handleResumedEvent(Resumed r) { 156 this.heartbeater = runTask(toDelegate(&this.heartbeat)); 157 } 158 159 private void emitDispatchEvent(T)(VibeJSON obj) { 160 T v = new T(this.client, obj["d"]); 161 this.eventEmitter.emit!T(v); 162 v.resolveDeferreds(); 163 // TODO: determine if we really need to destory things here 164 // v.destroy(); 165 } 166 167 private void handleDispatchPacket(VibeJSON obj, size_t size) { 168 // Update sequence number if it's larger than what we have 169 uint seq = obj["s"].get!uint; 170 if (seq > this.seq) { 171 this.seq = seq; 172 } 173 174 string type = obj["t"].get!string; 175 176 if (this.eventTracking) { 177 this.eventCounter.tick(type); 178 } 179 180 switch (type) { 181 case "READY": 182 this.log.infof("Recieved READY payload, size in bytes: %s", size); 183 this.emitDispatchEvent!Ready(obj); 184 break; 185 case "RESUMED": 186 this.emitDispatchEvent!Resumed(obj); 187 break; 188 case "CHANNEL_CREATE": 189 this.emitDispatchEvent!ChannelCreate(obj); 190 break; 191 case "CHANNEL_UPDATE": 192 this.emitDispatchEvent!ChannelUpdate(obj); 193 break; 194 case "CHANNEL_DELETE": 195 this.emitDispatchEvent!ChannelDelete(obj); 196 break; 197 case "GUILD_BAN_ADD": 198 this.emitDispatchEvent!GuildBanAdd(obj); 199 break; 200 case "GUILD_BAN_REMOVE": 201 this.emitDispatchEvent!GuildBanRemove(obj); 202 break; 203 case "GUILD_CREATE": 204 this.emitDispatchEvent!GuildCreate(obj); 205 break; 206 case "GUILD_UPDATE": 207 this.emitDispatchEvent!GuildUpdate(obj); 208 break; 209 case "GUILD_DELETE": 210 this.emitDispatchEvent!GuildDelete(obj); 211 break; 212 case "GUILD_EMOJIS_UPDATE": 213 this.emitDispatchEvent!GuildEmojisUpdate(obj); 214 break; 215 case "GUILD_INTEGRATIONS_UPDATE": 216 this.emitDispatchEvent!GuildIntegrationsUpdate(obj); 217 break; 218 case "GUILD_MEMBERS_CHUNK": 219 this.emitDispatchEvent!GuildMembersChunk(obj); 220 break; 221 case "GUILD_MEMBER_ADD": 222 this.emitDispatchEvent!GuildMemberAdd(obj); 223 break; 224 case "GUILD_MEMBER_UPDATE": 225 this.emitDispatchEvent!GuildMemberUpdate(obj); 226 break; 227 case "GUILD_MEMBER_REMOVE": 228 this.emitDispatchEvent!GuildMemberRemove(obj); 229 break; 230 case "GUILD_ROLE_CREATE": 231 this.emitDispatchEvent!GuildRoleCreate(obj); 232 break; 233 case "GUILD_ROLE_UPDATE": 234 this.emitDispatchEvent!GuildRoleUpdate(obj); 235 break; 236 case "GUILD_ROLE_DELETE": 237 this.emitDispatchEvent!GuildRoleDelete(obj); 238 break; 239 case "MESSAGE_CREATE": 240 this.emitDispatchEvent!MessageCreate(obj); 241 break; 242 case "MESSAGE_UPDATE": 243 this.emitDispatchEvent!MessageUpdate(obj); 244 break; 245 case "MESSAGE_DELETE": 246 this.emitDispatchEvent!MessageDelete(obj); 247 break; 248 case "PRESENCE_UPDATE": 249 this.emitDispatchEvent!PresenceUpdate(obj); 250 break; 251 case "TYPING_START": 252 this.emitDispatchEvent!TypingStart(obj); 253 break; 254 case "USER_SETTINGS_UPDATE": 255 this.emitDispatchEvent!UserSettingsUpdate(obj); 256 break; 257 case "USER_UPDATE": 258 this.emitDispatchEvent!UserUpdate(obj); 259 break; 260 case "VOICE_STATE_UPDATE": 261 this.emitDispatchEvent!VoiceStateUpdate(obj); 262 break; 263 case "VOICE_SERVER_UPDATE": 264 this.emitDispatchEvent!VoiceServerUpdate(obj); 265 break; 266 case "CHANNEL_PINS_UPDATE": 267 this.emitDispatchEvent!ChannelPinsUpdate(obj); 268 break; 269 case "MESSAGE_DELETE_BULK": 270 this.emitDispatchEvent!MessageDeleteBulk(obj); 271 break; 272 default: 273 this.log.warningf("Unhandled dispatch event: %s", type); 274 break; 275 } 276 } 277 278 private void parse(string rawData) { 279 GC.disable; // because GC messes stuff up 280 VibeJSON json = parseJsonString(rawData); 281 282 version (DEBUG_GATEWAY_DATA) { 283 } 284 285 OPCode op = json["op"].get!OPCode; 286 switch (op) { 287 case OPCode.DISPATCH: 288 this.handleDispatchPacket(json, rawData.length); 289 break; 290 case OPCode.HEARTBEAT: 291 this.send(new HeartbeatPacket(this.seq)); 292 break; 293 case OPCode.RECONNECT: 294 this.log.warningf("Recieved RECONNECT OPCode, resetting connection..."); 295 if (this.sock && this.sock.connected) this.sock.close(); 296 break; 297 case OPCode.INVALID_SESSION: 298 this.log.warningf("Recieved INVALID_SESSION OPCode, resetting connection..."); 299 if (this.sock && this.sock.connected) this.sock.close(); 300 break; 301 case OPCode.HELLO: 302 this.log.tracef("Recieved HELLO OPCode, starting heartbeater..."); 303 this.heartbeatInterval = json["d"]["heartbeat_interval"].get!uint; 304 this.heartbeater = runTask(toDelegate(&this.heartbeat)); 305 break; 306 case OPCode.HEARTBEAT_ACK: 307 break; 308 default: 309 this.log.warningf("Unhandled gateway packet: %s", op); 310 break; 311 } 312 } 313 314 private void heartbeat() { 315 while (this.connected) { 316 this.send(new HeartbeatPacket(this.seq)); 317 sleep(this.heartbeatInterval.msecs); 318 } 319 } 320 321 /** 322 Runs the GatewayClient until completion. 323 */ 324 void run() { 325 string data; 326 327 // If we already have a sequence number, attempt to resume 328 if (this.sessionID && this.seq) { 329 this.log.infof("Sending Resume Payload (we where %s at %s)", this.sessionID, this.seq); 330 this.send(new ResumePacket(this.client.token, this.sessionID, this.seq)); 331 } else { 332 // On startup, send the identify payload 333 this.log.info("Sending Identify Payload"); 334 this.send(new IdentifyPacket( 335 this.client.token, 336 this.client.shardInfo.shard, 337 this.client.shardInfo.numShards)); 338 } 339 340 this.log.info("Connected to Gateway"); 341 this.connected = true; 342 while (this.sock.waitForData()) { 343 if (!this.connected) break; 344 345 try { 346 ubyte[] rawdata = this.sock.receiveBinary(); 347 data = cast(string)uncompress(rawdata); 348 } catch (Exception e) { 349 data = this.sock.receiveText(); 350 } 351 352 if (data == "") { 353 continue; 354 } 355 356 try { 357 this.parse(data); 358 } catch (Exception e) { 359 this.log.warningf("failed to handle data (%s)", e); 360 } catch (Error e) { 361 this.log.warningf("failed to handle data (%s)", e); 362 } 363 } 364 365 this.log.critical("Gateway websocket closed (code " ~ this.sock.closeCode().toString() ~ ")"); 366 this.connected = false; 367 this.reconnects++; 368 369 if (this.reconnects > MAX_RECONNECTS) { 370 this.log.errorf("Max Gateway reconnects (%s) hit, aborting...", this.reconnects); 371 return; 372 } 373 374 if (this.reconnects > 1) { 375 this.sessionID = null; 376 this.seq = 0; 377 this.log.warning("Waiting 5 seconds before reconnecting..."); 378 sleep(5.seconds); 379 } 380 381 this.log.info("Attempting reconnection..."); 382 return this.start(); 383 } 384 }