1 /**
2   Manages the Discord websocket client.
3 */
4 module dcord.gateway.client;
5 
6 import std.stdio,
7        std.uni,
8        std.functional,
9        std.zlib,
10        std.datetime,
11        std.variant,
12        std.format,
13        core.exception,
14        core.memory;
15 
16 static import std.typecons;
17 
18 import vibe.core.core,
19        vibe.inet.url,
20        vibe.http.websockets;
21 
22 import dcord.client,
23        dcord.gateway,
24        dcord.util.emitter,
25        dcord.util.json,
26        dcord.util.counter,
27        dcord.types;
28 
29 /** Maximum reconnects the GatewayClient will try before resetting session state */
30 const ubyte MAX_RECONNECTS = 6;
31 
32 /** Current implemented Gateway version. */
33 const ubyte GATEWAY_VERSION = 6;
34 /**
35   GatewayClient is the base abstraction for connecting to, and interacting with
36   the Discord Websocket (gateway) API.
37 */
38 class GatewayClient {
39   /** Client instance for this gateway connection */
40   Client client;
41 
42   /** WebSocket connection for this gateway connection */
43   WebSocket sock;
44 
45   /** Gateway SessionID, used for resuming. */
46   string sessionID;
47 
48   /** Gateway sequence number, used for resuming */
49   uint seq;
50 
51   /** Heartbeat interval */
52   uint heartbeatInterval;
53 
54   /** Whether this GatewayClient is currently connected */
55   bool connected;
56 
57   /** Number of reconnects attempted */
58   ubyte reconnects;
59 
60   /** The heartbeater task */
61   Task heartbeater;
62 
63   /** Event emitter for Gateway Packets */
64   Emitter eventEmitter;
65 
66   private {
67     /** Cached gateway URL from the API */
68     string cachedGatewayURL;
69     Counter!string eventCounter;
70     bool eventTracking;
71   }
72 
73   /**
74     Params:
75       client = base client
76       eventTracking = if true, log information about events recieved
77   */
78   this(Client client, bool eventTracking = false) {
79     this.client = client;
80     this.eventTracking = eventTracking;
81 
82     // Create the event emitter and listen to some required gateway events.
83     this.eventEmitter = new Emitter;
84     this.eventEmitter.listen!Ready(toDelegate(&this.handleReadyEvent));
85     this.eventEmitter.listen!Resumed(toDelegate(&this.handleResumedEvent));
86 
87     // Copy emitters to client for easier API access
88     client.events = this.eventEmitter;
89 
90     if (this.eventTracking) this.eventCounter = new Counter!string;
91   }
92 
93   /**
94     Logger for this GatewayClient.
95   */
96   @property Logger log() {
97     return this.client.log;
98   }
99 
100   /**
101     Starts a connection to the gateway. Also called for resuming/reconnecting.
102   */
103   void start(Game game=null) {
104     if(this.sock && this.sock.connected) this.sock.close();
105 
106     // If this is our first connection, get a gateway websocket URL. Later on it is cached.
107     if(!this.cachedGatewayURL) {
108       this.cachedGatewayURL = client.api.gatewayGet();
109       this.cachedGatewayURL ~= format("/?v=%s&encoding=%s", GATEWAY_VERSION, "json");
110     }
111 
112     // Start the main task
113     this.log.infof("Starting connection to Gateway WebSocket (%s)", this.cachedGatewayURL);
114     this.sock = connectWebSocket(URL(this.cachedGatewayURL));
115     if(game is null) runTask(() => this.run());
116     else runTask(() => this.run(game));
117   }
118 
119   /**
120     Send a gateway payload.
121   */
122   void send(Serializable p) {
123     string data = p.serialize().toString;
124     version (DEBUG_GATEWAY_DATA) {
125       this.log.tracef("GATEWAY SEND: %s", data);
126     }
127     this.sock.send(data);
128   }
129 
130   void updateStatus(Game game=null) {
131     this.send(new StatusUpdate(game));
132   }
133   private void debugEventCounts() {
134     while (true) {
135       this.eventCounter.resetAll();
136       sleep(5.seconds);
137       this.log.infof("%s total events", this.eventCounter.total);
138 
139       foreach (ref event; this.eventCounter.mostCommon(5)) {
140         this.log.infof("  %s: %s", event, this.eventCounter.get(event));
141       }
142     }
143   }
144 
145   private void handleReadyEvent(Ready  r) {
146     this.log.infof("Recieved READY payload, starting heartbeater");
147     // this.hb_interval = r.heartbeatInterval;
148     this.sessionID = r.sessionID;
149     this.reconnects = 0;
150 
151     if (this.eventTracking) {
152       runTask(toDelegate(&this.debugEventCounts));
153     }
154   }
155 
156   private void handleResumedEvent(Resumed r) { // stfu
157     this.heartbeater = runTask(toDelegate(&this.heartbeat));
158     // TODO: do an action in a closure with the Resumed event
159   }
160 
161   private void emitDispatchEvent(T)(VibeJSON obj) {
162     T v = new T(this.client, obj["d"]);
163     this.eventEmitter.emit!T(v);
164     v.resolveDeferreds();
165   }
166 
167   private void handleDispatchPacket(VibeJSON obj, size_t size) {
168     // Update sequence number if it's larger than what we have
169     uint seq = obj["s"].get!uint; // stfu
170     if (seq > this.seq) {
171       this.seq = seq;
172     }
173 
174     string type = obj["t"].get!string;
175 
176     if (this.eventTracking) {
177       this.eventCounter.tick(type);
178     }
179 
180     switch (type) {
181       case "READY":
182         this.log.infof("Recieved READY payload, size in bytes: %s", size);
183         this.emitDispatchEvent!Ready(obj);
184         break;
185       case "RESUMED":
186         this.emitDispatchEvent!Resumed(obj);
187         break;
188       case "CHANNEL_CREATE":
189         this.emitDispatchEvent!ChannelCreate(obj);
190         break;
191       case "CHANNEL_UPDATE":
192         this.emitDispatchEvent!ChannelUpdate(obj);
193         break;
194       case "CHANNEL_DELETE":
195         this.emitDispatchEvent!ChannelDelete(obj);
196         break;
197       case "GUILD_BAN_ADD":
198         this.emitDispatchEvent!GuildBanAdd(obj);
199         break;
200       case "GUILD_BAN_REMOVE":
201         this.emitDispatchEvent!GuildBanRemove(obj);
202         break;
203       case "GUILD_CREATE":
204         this.emitDispatchEvent!GuildCreate(obj);
205         break;
206       case "GUILD_UPDATE":
207         this.emitDispatchEvent!GuildUpdate(obj);
208         break;
209       case "GUILD_DELETE":
210         this.emitDispatchEvent!GuildDelete(obj);
211         break;
212       case "GUILD_EMOJIS_UPDATE":
213         this.emitDispatchEvent!GuildEmojisUpdate(obj);
214         break;
215       case "GUILD_INTEGRATIONS_UPDATE":
216         this.emitDispatchEvent!GuildIntegrationsUpdate(obj);
217         break;
218       case "GUILD_MEMBERS_CHUNK":
219         this.emitDispatchEvent!GuildMembersChunk(obj);
220         break;
221       case "GUILD_MEMBER_ADD":
222         this.emitDispatchEvent!GuildMemberAdd(obj);
223         break;
224       case "GUILD_MEMBER_UPDATE":
225         this.emitDispatchEvent!GuildMemberUpdate(obj);
226         break;
227       case "GUILD_MEMBER_REMOVE":
228         this.emitDispatchEvent!GuildMemberRemove(obj);
229         break;
230       case "GUILD_ROLE_CREATE":
231         this.emitDispatchEvent!GuildRoleCreate(obj);
232         break;
233       case "GUILD_ROLE_UPDATE":
234         this.emitDispatchEvent!GuildRoleUpdate(obj);
235         break;
236       case "GUILD_ROLE_DELETE":
237         this.emitDispatchEvent!GuildRoleDelete(obj);
238         break;
239       case "MESSAGE_CREATE":
240         this.emitDispatchEvent!MessageCreate(obj);
241         break;
242       case "MESSAGE_UPDATE":
243         this.emitDispatchEvent!MessageUpdate(obj);
244         break;
245       case "MESSAGE_DELETE":
246         this.emitDispatchEvent!MessageDelete(obj);
247         break;
248       case "PRESENCE_UPDATE":
249         this.emitDispatchEvent!PresenceUpdate(obj);
250         break;
251       case "TYPING_START":
252         this.emitDispatchEvent!TypingStart(obj);
253         break;
254       case "USER_SETTINGS_UPDATE":
255         this.emitDispatchEvent!UserSettingsUpdate(obj);
256         break;
257       case "USER_UPDATE":
258         this.emitDispatchEvent!UserUpdate(obj);
259         break;
260       case "VOICE_STATE_UPDATE":
261         this.emitDispatchEvent!VoiceStateUpdate(obj);
262         break;
263       case "VOICE_SERVER_UPDATE":
264         this.emitDispatchEvent!VoiceServerUpdate(obj);
265         break;
266       case "CHANNEL_PINS_UPDATE":
267         this.emitDispatchEvent!ChannelPinsUpdate(obj);
268         break;
269       case "MESSAGE_DELETE_BULK":
270         this.emitDispatchEvent!MessageDeleteBulk(obj);
271         break;
272       case "MESSAGE_REACTION_ADD":
273         this.emitDispatchEvent!MessageReactionAdd(obj);
274         break;
275       default:
276         this.log.warningf("Unhandled dispatch event: %s", type);
277         break;
278     }
279   }
280 
281   private void parse(string rawData) {
282     GC.disable; // because GC messes stuff up
283     VibeJSON json = parseJsonString(rawData);
284 
285     version (DEBUG_GATEWAY_DATA) {
286     }
287 
288     OPCode op = json["op"].get!OPCode;
289     switch (op) {
290       case OPCode.DISPATCH:
291         this.handleDispatchPacket(json, rawData.length);
292         break;
293       case OPCode.HEARTBEAT:
294         this.send(new HeartbeatPacket(this.seq));
295         break;
296       case OPCode.RECONNECT:
297         this.log.warningf("Recieved RECONNECT OPCode, resetting connection...");
298         if (this.sock && this.sock.connected) this.sock.close();
299         break;
300       case OPCode.INVALID_SESSION:
301         this.log.warningf("Recieved INVALID_SESSION OPCode, resetting connection...");
302         if (this.sock && this.sock.connected) this.sock.close();
303         break;
304       case OPCode.HELLO:
305         this.log.tracef("Recieved HELLO OPCode, starting heartbeater...");
306         this.heartbeatInterval = json["d"]["heartbeat_interval"].get!uint;
307         this.heartbeater = runTask(toDelegate(&this.heartbeat));
308         break;
309       case OPCode.HEARTBEAT_ACK:
310         break;
311       default:
312         this.log.warningf("Unhandled gateway packet: %s", op);
313         break;
314     }
315   }
316 
317   private void heartbeat() {
318     while(this.connected) {
319       this.send(new HeartbeatPacket(this.seq));
320       sleep(this.heartbeatInterval.msecs);
321     }
322   }
323 
324   /// Runs the GatewayClient until completion 
325   void run(Game game=null) {
326     string data;
327 
328     // If we already have a sequence number, attempt to resume
329     if(this.sessionID && this.seq) {
330       this.log.infof("Sending resume payload (session ID %s with gateway sequence number %s)",
331       this.sessionID, this.seq);
332       
333       this.send(new ResumePacket(this.client.token, this.sessionID, this.seq));
334     } else {
335       // On startup, send the identify payload
336       this.log.info("Sending identify payload");
337       this.send(new IdentifyPacket(
338           this.client.token,
339           this.client.shardInfo.shard,
340           this.client.shardInfo.numShards));
341     }
342 
343     this.log.info("Connected to Gateway");
344     this.connected = true;
345     this.send(new StatusUpdate(game));
346     while (this.sock.waitForData()) {
347       if(!this.connected) break;
348       try {
349         ubyte[] rawdata = this.sock.receiveBinary();
350         data = cast(string)uncompress(rawdata); // raw cast could be dangerous - maybe use toString in the future 
351       } catch (Exception e) {
352         data = this.sock.receiveText();
353       }
354       if(data == "") continue; 
355 
356       try {
357         this.parse(data);
358       } catch(Exception e) {
359         this.log.warningf("failed to handle data (%s)", e);
360       } catch(Error e) { // stfu
361         this.log.warningf("failed to handle data (%s)", e);
362       } 
363     }
364 
365     this.log.critical("Gateway websocket closed (code " ~ this.sock.closeCode().toString() ~ ")");
366     this.connected = false;
367     this.reconnects++;
368 
369     if(this.reconnects > MAX_RECONNECTS) {
370       this.log.errorf("Max Gateway reconnects (%s) hit, aborting...", this.reconnects);
371       return;
372     }
373 
374     if(this.reconnects > 1) {
375       this.sessionID = null;
376       this.seq = 0;
377       this.log.warning("Waiting 5 seconds before reconnecting...");
378       sleep(5.seconds);
379     }
380 
381     this.log.info("Attempting reconnection...");
382     return this.start();
383   }
384 }