;; This Source Code Form is subject to the terms of the Mozilla Public ;; License, v. 2.0. If a copy of the MPL was not distributed with this ;; file, You can obtain one at http://mozilla.org/MPL/2.0/. ;; ;; Copyright (c) UXBOX Labs SL (ns app.util.websocket "A general protocol implementation on top of websockets." (:require [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.transit :as t] [app.loggers.audit :refer [parse-client-ip]] [app.util.time :as dt] [clojure.core.async :as a] [yetti.request :as yr] [yetti.util :as yu] [yetti.websocket :as yws]) (:import java.nio.ByteBuffer)) (declare decode-beat) (declare encode-beat) (declare process-heartbeat) (declare process-input) (declare process-output) (declare ws-ping!) (declare ws-send!) (declare filter-options) (def noop (constantly nil)) (def identity-3 (fn [_ _ o] o)) (defn handler "A WebSocket upgrade handler factory. Returns a handler that can be used to upgrade to websocket connection. This handler implements the basic custom protocol on top of websocket connection with all the borring stuff already handled (lifecycle, heartbeat,...). The provided function should have the `(fn [ws msg])` signature. It also accepts some options that allows you parametrize the protocol behavior. The options map will be used as-as for the initial data of the `ws` data structure" [& {:keys [::on-rcv-message ::on-snd-message ::on-connect ::input-buff-size ::output-buff-size ::handler ::idle-timeout] :or {input-buff-size 64 output-buff-size 64 idle-timeout 30000 on-connect noop on-snd-message identity-3 on-rcv-message identity-3} :as options}] (assert (fn? on-rcv-message) "'on-rcv-message' should be a function") (assert (fn? on-snd-message) "'on-snd-message' should be a function") (assert (fn? on-connect) "'on-connect' should be a function") (fn [{:keys [::yws/channel session-id] :as request}] (let [input-ch (a/chan input-buff-size) output-ch (a/chan output-buff-size) pong-ch (a/chan (a/sliding-buffer 6)) close-ch (a/chan) stop-ch (a/chan) ip-addr (parse-client-ip request) uagent (yr/get-header request "user-agent") id (inst-ms (dt/now)) options (-> (filter-options options) (merge {::id id ::input-ch input-ch ::output-ch output-ch ::close-ch close-ch ::stop-ch stop-ch ::channel channel ::remote-addr ip-addr ::http-session-id session-id ::user-agent uagent}) (atom)) ;; call the on-connect hook and memoize the on-terminate instance on-terminate (on-connect options) on-ws-open (fn [channel] (l/trace :fn "on-ws-open" :conn-id id) (yws/idle-timeout! channel (dt/duration idle-timeout))) on-ws-terminate (fn [_ code reason] (l/trace :fn "on-ws-terminate" :conn-id id :code code :reason reason) (a/close! close-ch)) on-ws-error (fn [_ error] (a/close! close-ch) (when-not (or (instance? java.nio.channels.ClosedChannelException error) (instance? java.net.SocketException error)) (l/error :hint (ex-message error) :cause error))) on-ws-message (fn [_ message] (try (let [message (on-rcv-message options message) message (t/decode-str message)] (a/offer! input-ch message) (swap! options assoc ::last-activity-at (dt/now))) (catch Throwable e (l/warn :hint "error on decoding incoming message from websocket" :wsmsg (pr-str message) :cause e) (a/>! close-ch [8801 "decode error"]) (a/close! close-ch)))) on-ws-pong (fn [_ buffers] (a/>!! pong-ch (yu/copy-many buffers)))] ;; Launch heartbeat process (-> @options (assoc ::pong-ch pong-ch) (process-heartbeat)) ;; Wait a close signal (a/go (let [[code reason] (a/! output-ch {:type :error :error (ex-data val)}) (ex/exception? val) (a/>! output-ch {:type :error :error {:message (ex-message val)}}) (map? val) (a/>! output-ch (cond-> val (:request-id message) (assoc :request-id (:request-id message))))) (recur)))))) (a/= (count issued) max-missed-heartbeats) (do (a/>! close-ch [8802 "heart-beat timeout"]) (a/close! close-ch)) (recur (inc i))))))) (a/go-loop [] (when-let [buffer (a/