1 /**
2   Manages the Discord websocket client.
3 */
4 module dcord.gateway.client;
5 
6 import std.stdio,
7        std.uni,
8        std.functional,
9        std.zlib,
10        std.datetime,
11        std.variant,
12        std.format,
13        core.exception,
14        core.memory;
15 
16 static import std.typecons;
17 
18 import vibe.core.core,
19        vibe.inet.url,
20        vibe.http.websockets;
21 
22 import dcord.client,
23        dcord.gateway,
24        dcord.util.emitter,
25        dcord.util.json,
26        dcord.util.counter,
27        dcord.types;
28 
29 /** Maximum reconnects the GatewayClient will try before resetting session state */
30 const ubyte MAX_RECONNECTS = 6;
31 
32 /** Current implemented Gateway version. */
33 const ubyte GATEWAY_VERSION = 6;
34 /**
35   GatewayClient is the base abstraction for connecting to, and interacting with
36   the Discord Websocket (gateway) API.
37 */
38 class GatewayClient {
39   /** Client instance for this gateway connection */
40   Client     client;
41 
42   /** WebSocket connection for this gateway connection */
43   WebSocket  sock;
44 
45   /** Gateway SessionID, used for resuming. */
46   string  sessionID;
47 
48   /** Gateway sequence number, used for resuming */
49   uint    seq;
50 
51   /** Heartbeat interval */
52   uint    heartbeatInterval;
53 
54   /** Whether this GatewayClient is currently connected */
55   bool    connected;
56 
57   /** Number of reconnects attempted */
58   ubyte   reconnects;
59 
60   /** The heartbeater task */
61   Task    heartbeater;
62 
63   /** Event emitter for Gateway Packets */
64   Emitter  eventEmitter;
65 
66   private {
67     /** Cached gateway URL from the API */
68     string  cachedGatewayURL;
69     Counter!string eventCounter;
70     bool eventTracking;
71   }
72 
73   /**
74     Params:
75       client = base client
76       eventTracking = if true, log information about events recieved
77   */
78   this(Client client, bool eventTracking = false) {
79     this.client = client;
80     this.eventTracking = eventTracking;
81 
82     // Create the event emitter and listen to some required gateway events.
83     this.eventEmitter = new Emitter;
84     this.eventEmitter.listen!Ready(toDelegate(&this.handleReadyEvent));
85     this.eventEmitter.listen!Resumed(toDelegate(&this.handleResumedEvent));
86 
87     // Copy emitters to client for easier API access
88     client.events = this.eventEmitter;
89 
90     if (this.eventTracking) {
91       this.eventCounter = new Counter!string;
92     }
93   }
94 
95   /**
96     Logger for this GatewayClient.
97   */
98   @property Logger log() {
99     return this.client.log;
100   }
101 
102   /**
103     Starts a connection to the gateway. Also called for resuming/reconnecting.
104   */
105   void start() {
106     if(this.sock && this.sock.connected) this.sock.close();
107 
108     // If this is our first connection, get a gateway websocket URL. Later on it is cached.
109     if(!this.cachedGatewayURL) {
110       this.cachedGatewayURL = client.api.gatewayGet();
111       this.cachedGatewayURL ~= format("/?v=%s&encoding=%s", GATEWAY_VERSION, "json");
112     }
113 
114     // Start the main task
115     this.log.infof("Starting connection to Gateway WebSocket (%s)", this.cachedGatewayURL);
116     this.sock = connectWebSocket(URL(this.cachedGatewayURL));
117     runTask(toDelegate(&this.run));
118 
119   }
120 
121   /**
122     Send a gateway payload.
123   */
124   void send(Serializable p) {
125     string data = p.serialize().toString;
126     version (DEBUG_GATEWAY_DATA) {
127       this.log.tracef("GATEWAY SEND: %s", data);
128     }
129     this.sock.send(data);
130   }
131 
132   private void debugEventCounts() {
133     while (true) {
134       this.eventCounter.resetAll();
135       sleep(5.seconds);
136       this.log.infof("%s total events", this.eventCounter.total);
137 
138       foreach (ref event; this.eventCounter.mostCommon(5)) {
139         this.log.infof("  %s: %s", event, this.eventCounter.get(event));
140       }
141     }
142   }
143 
144   private void handleReadyEvent(Ready  r) {
145     this.log.infof("Recieved READY payload, starting heartbeater");
146     // this.hb_interval = r.heartbeatInterval;
147     this.sessionID = r.sessionID;
148     this.reconnects = 0;
149 
150     if (this.eventTracking) {
151       runTask(toDelegate(&this.debugEventCounts));
152     }
153   }
154 
155   private void handleResumedEvent(Resumed r) {
156     this.heartbeater = runTask(toDelegate(&this.heartbeat));
157   }
158 
159   private void emitDispatchEvent(T)(VibeJSON obj) {
160     T v = new T(this.client, obj["d"]);
161     this.eventEmitter.emit!T(v);
162     v.resolveDeferreds();
163     // TODO: determine if we really need to destory things here
164     // v.destroy();
165   }
166 
167   private void handleDispatchPacket(VibeJSON obj, size_t size) {
168     // Update sequence number if it's larger than what we have
169     uint seq = obj["s"].get!uint;
170     if (seq > this.seq) {
171       this.seq = seq;
172     }
173 
174     string type = obj["t"].get!string;
175 
176     if (this.eventTracking) {
177       this.eventCounter.tick(type);
178     }
179 
180     switch (type) {
181       case "READY":
182         this.log.infof("Recieved READY payload, size in bytes: %s", size);
183         this.emitDispatchEvent!Ready(obj);
184         break;
185       case "RESUMED":
186         this.emitDispatchEvent!Resumed(obj);
187         break;
188       case "CHANNEL_CREATE":
189         this.emitDispatchEvent!ChannelCreate(obj);
190         break;
191       case "CHANNEL_UPDATE":
192         this.emitDispatchEvent!ChannelUpdate(obj);
193         break;
194       case "CHANNEL_DELETE":
195         this.emitDispatchEvent!ChannelDelete(obj);
196         break;
197       case "GUILD_BAN_ADD":
198         this.emitDispatchEvent!GuildBanAdd(obj);
199         break;
200       case "GUILD_BAN_REMOVE":
201         this.emitDispatchEvent!GuildBanRemove(obj);
202         break;
203       case "GUILD_CREATE":
204         this.emitDispatchEvent!GuildCreate(obj);
205         break;
206       case "GUILD_UPDATE":
207         this.emitDispatchEvent!GuildUpdate(obj);
208         break;
209       case "GUILD_DELETE":
210         this.emitDispatchEvent!GuildDelete(obj);
211         break;
212       case "GUILD_EMOJIS_UPDATE":
213         this.emitDispatchEvent!GuildEmojisUpdate(obj);
214         break;
215       case "GUILD_INTEGRATIONS_UPDATE":
216         this.emitDispatchEvent!GuildIntegrationsUpdate(obj);
217         break;
218       case "GUILD_MEMBERS_CHUNK":
219         this.emitDispatchEvent!GuildMembersChunk(obj);
220         break;
221       case "GUILD_MEMBER_ADD":
222         this.emitDispatchEvent!GuildMemberAdd(obj);
223         break;
224       case "GUILD_MEMBER_UPDATE":
225         this.emitDispatchEvent!GuildMemberUpdate(obj);
226         break;
227       case "GUILD_MEMBER_REMOVE":
228         this.emitDispatchEvent!GuildMemberRemove(obj);
229         break;
230       case "GUILD_ROLE_CREATE":
231         this.emitDispatchEvent!GuildRoleCreate(obj);
232         break;
233       case "GUILD_ROLE_UPDATE":
234         this.emitDispatchEvent!GuildRoleUpdate(obj);
235         break;
236       case "GUILD_ROLE_DELETE":
237         this.emitDispatchEvent!GuildRoleDelete(obj);
238         break;
239       case "MESSAGE_CREATE":
240         this.emitDispatchEvent!MessageCreate(obj);
241         break;
242       case "MESSAGE_UPDATE":
243         this.emitDispatchEvent!MessageUpdate(obj);
244         break;
245       case "MESSAGE_DELETE":
246         this.emitDispatchEvent!MessageDelete(obj);
247         break;
248       case "PRESENCE_UPDATE":
249         this.emitDispatchEvent!PresenceUpdate(obj);
250         break;
251       case "TYPING_START":
252         this.emitDispatchEvent!TypingStart(obj);
253         break;
254       case "USER_SETTINGS_UPDATE":
255         this.emitDispatchEvent!UserSettingsUpdate(obj);
256         break;
257       case "USER_UPDATE":
258         this.emitDispatchEvent!UserUpdate(obj);
259         break;
260       case "VOICE_STATE_UPDATE":
261         this.emitDispatchEvent!VoiceStateUpdate(obj);
262         break;
263       case "VOICE_SERVER_UPDATE":
264         this.emitDispatchEvent!VoiceServerUpdate(obj);
265         break;
266       case "CHANNEL_PINS_UPDATE":
267         this.emitDispatchEvent!ChannelPinsUpdate(obj);
268         break;
269       case "MESSAGE_DELETE_BULK":
270         this.emitDispatchEvent!MessageDeleteBulk(obj);
271         break;
272       default:
273         this.log.warningf("Unhandled dispatch event: %s", type);
274         break;
275     }
276   }
277 
278   private void parse(string rawData) {
279     GC.disable; // because GC messes stuff up
280     VibeJSON json = parseJsonString(rawData);
281 
282     version (DEBUG_GATEWAY_DATA) {
283     }
284 
285     OPCode op = json["op"].get!OPCode;
286     switch (op) {
287       case OPCode.DISPATCH:
288         this.handleDispatchPacket(json, rawData.length);
289         break;
290       case OPCode.HEARTBEAT:
291         this.send(new HeartbeatPacket(this.seq));
292         break;
293       case OPCode.RECONNECT:
294         this.log.warningf("Recieved RECONNECT OPCode, resetting connection...");
295         if (this.sock && this.sock.connected) this.sock.close();
296         break;
297       case OPCode.INVALID_SESSION:
298         this.log.warningf("Recieved INVALID_SESSION OPCode, resetting connection...");
299         if (this.sock && this.sock.connected) this.sock.close();
300         break;
301       case OPCode.HELLO:
302         this.log.tracef("Recieved HELLO OPCode, starting heartbeater...");
303         this.heartbeatInterval = json["d"]["heartbeat_interval"].get!uint;
304         this.heartbeater = runTask(toDelegate(&this.heartbeat));
305         break;
306       case OPCode.HEARTBEAT_ACK:
307         break;
308       default:
309         this.log.warningf("Unhandled gateway packet: %s", op);
310         break;
311     }
312   }
313 
314   private void heartbeat() {
315     while (this.connected) {
316       this.send(new HeartbeatPacket(this.seq));
317       sleep(this.heartbeatInterval.msecs);
318     }
319   }
320 
321   /**
322     Runs the GatewayClient until completion.
323   */
324   void run() {
325     string data;
326 
327     // If we already have a sequence number, attempt to resume
328     if (this.sessionID && this.seq) {
329       this.log.infof("Sending Resume Payload (we where %s at %s)", this.sessionID, this.seq);
330       this.send(new ResumePacket(this.client.token, this.sessionID, this.seq));
331     } else {
332       // On startup, send the identify payload
333       this.log.info("Sending Identify Payload");
334       this.send(new IdentifyPacket(
335           this.client.token,
336           this.client.shardInfo.shard,
337           this.client.shardInfo.numShards));
338     }
339 
340     this.log.info("Connected to Gateway");
341     this.connected = true;
342     while (this.sock.waitForData()) {
343       if (!this.connected) break;
344 
345       try {
346         ubyte[] rawdata = this.sock.receiveBinary();
347         data = cast(string)uncompress(rawdata);
348       } catch (Exception e) {
349         data = this.sock.receiveText();
350       }
351 
352       if (data == "") {
353         continue;
354       }
355 
356       try {
357         this.parse(data);
358       } catch (Exception e) {
359         this.log.warningf("failed to handle data (%s)", e);
360       } catch (Error e) {
361         this.log.warningf("failed to handle data (%s)", e);
362       } 
363     }
364 
365     this.log.critical("Gateway websocket closed (code " ~ this.sock.closeCode().toString() ~ ")");
366     this.connected = false;
367     this.reconnects++;
368 
369     if (this.reconnects > MAX_RECONNECTS) {
370       this.log.errorf("Max Gateway reconnects (%s) hit, aborting...", this.reconnects);
371       return;
372     }
373 
374     if (this.reconnects > 1) {
375       this.sessionID = null;
376       this.seq = 0;
377       this.log.warning("Waiting 5 seconds before reconnecting...");
378       sleep(5.seconds);
379     }
380 
381     this.log.info("Attempting reconnection...");
382     return this.start();
383   }
384 }