diff --git a/src/clj/athens/self_hosted/clients.clj b/src/clj/athens/self_hosted/clients.clj
index f7a9b022da..fc3edcb40c 100644
--- a/src/clj/athens/self_hosted/clients.clj
+++ b/src/clj/athens/self_hosted/clients.clj
@@ -4,12 +4,7 @@
     [athens.common-events :as common-events]
     [athens.common-events.schema :as schema]
     [athens.common.logging :as log]
-    [cognitect.transit :as transit]
-    [org.httpkit.server :as http])
-  (:import
-    (java.io
-      ByteArrayInputStream
-      ByteArrayOutputStream)))
+    [org.httpkit.server :as http]))
 
 
 ;; Internal state
@@ -17,21 +12,6 @@
 (defonce clients (atom {}))
 
 
-(defn ->transit
-  [data]
-  (let [out    (ByteArrayOutputStream. 4096)
-        writer (transit/writer out :json)]
-    (transit/write writer data)
-    (.toString out)))
-
-
-(defn <-transit
-  [transit-str]
-  (let [in     (ByteArrayInputStream. (.getBytes transit-str))
-        reader (transit/reader in :json)]
-    (transit/read reader)))
-
-
 ;; Client management API
 
 (defn get-client-session
@@ -71,14 +51,24 @@
           valid-server-event?   (schema/valid-server-event? data)]
       (if (or valid-event-response?
               valid-server-event?)
-        (let [type   (common-events/find-event-or-atomic-op-type data)
-              status (:event/status data)]
-          (log/debug "Sending to username:" username
-                     ", event-id:" (:event/id data)
-                     (if type
-                       (str ", type: " type)
-                       (str ", status: " status)))
-          (http/send! channel (->transit data)))
+        (let [type             (common-events/find-event-or-atomic-op-type data)
+              status           (:event/status data)
+              serialized-event (common-events/serialize data)
+              errors           (when-not (common-events/ignore-serialized-event-validation? data)
+                                 (common-events/validate-serialized-event serialized-event))]
+          (if errors
+            (log/error "Not sending invalid event to username:" username
+                       ", event-id:" (:event/id data)
+                       ", type:" type
+                       ", invalid serialized event:"
+                       (str errors))
+            (do
+              (log/debug "Sending to username:" username
+                         ", event-id:" (:event/id data)
+                         (if type
+                           (str ", type: " type)
+                           (str ", status: " status)))
+              (http/send! channel serialized-event))))
         ;; TODO internal failure mode, collect in reporting
         (log/error "Not sending invalid event to username:" username
                    ", event-id:" (:event/id data)
diff --git a/src/clj/athens/self_hosted/components/web.clj b/src/clj/athens/self_hosted/components/web.clj
index 8293f0eb15..56516edd24 100644
--- a/src/clj/athens/self_hosted/components/web.clj
+++ b/src/clj/athens/self_hosted/components/web.clj
@@ -59,13 +59,30 @@
   (fn receive-handler
     [channel msg]
     (let [username (clients/get-client-username channel)
-          data     (clients/<-transit msg)]
-      (if-not (schema/valid-event? data)
+          data     (common-events/deserialize msg)
+          errors   (common-events/validate-serialized-event msg)]
+      (cond
+        ;; TODO: we should be able to validate the serialized event without deserializing it, but
+        ;; we also need to build a rejected event to pass back to the client, and to do that we need
+        ;; the :event/id, which we only get after deserializing the event.
+        ;; I think this means that we should separate the :event/id from the payload itself.
+        errors
+        (do
+          (log/warn "username:" username "Invalid serialized event received, errors:" errors)
+          ;; `errors` is an ex-info object and `common-events/serialize` registers no
+          ;; write handler for it; send its message as plain data instead.
+          (clients/send! channel (common-events/build-event-rejected (:event/id data)
+                                                                     "Invalid serialized event"
+                                                                     {:message (ex-message errors)})))
+
+        (not (schema/valid-event? data))
         (let [explanation (schema/explain-event data)]
           (log/warn "username:" username "Invalid event received, explanation:" explanation)
           (clients/send! channel (common-events/build-event-rejected (:event/id data)
                                                                      (str "Invalid event: " (pr-str data))
                                                                      explanation)))
+
+        :else
         (let [{:event/keys [id type]} data]
           (log/info "Received valid event" "username:" username ", event-id:" id ", type:" (common-events/find-event-or-atomic-op-type data))
           (let [{:event/keys [status]
diff --git a/src/cljc/athens/common_events.cljc b/src/cljc/athens/common_events.cljc
index 337a5c546d..7c28e94cba 100644
--- a/src/cljc/athens/common_events.cljc
+++ b/src/cljc/athens/common_events.cljc
@@ -1,7 +1,95 @@
 (ns athens.common-events
   "Event as Verbs executed on Knowledge Graph"
   (:require
-    [athens.common.utils :as utils]))
+    [athens.common.utils :as utils]
+    [cognitect.transit :as transit]
+    #?(:cljs [com.cognitect.transit.types :as ty]))
+  #?(:clj
+     (:import
+       (java.io
+         ByteArrayInputStream
+         ByteArrayOutputStream))))
+
+
+;; Limits
+
+;; Fluree default max size over websocket is ~2mb.
+;; There doesn't seem to be a max for nginx
+;; https://serverfault.com/questions/1034906/can-nginx-limit-incoming-websocket-message-size
+;; Was able to transmit 500mb over websocket from the server to client.
+;; Let's settle on a nice sensible 1MB limit for now.
+(def max-event-size-in-bytes (* 1 1000 1000))
+
+
+(defn valid-serialized-event?
+  "Returns true when `serialized-event` is shorter than `max-event-size-in-bytes`.
+  NOTE: `count` on a string measures characters, not encoded bytes, so
+  payloads containing multi-byte characters are undercounted."
+  [serialized-event]
+  (< (count serialized-event) max-event-size-in-bytes))
+
+
+(defn validate-serialized-event
+  "Returns nil for a valid `serialized-event`, otherwise an `ex-info`
+  describing the problem. The exception is returned, not thrown."
+  [serialized-event]
+  (when-not (valid-serialized-event? serialized-event)
+    (ex-info "Serialized event is larger than 1 MB" {})))
+
+
+(defn ignore-serialized-event-validation?
+  "Truthy when the `:event/type` of `event` is exempt from serialized-event validation."
+  [event]
+  (-> event :event/type
+      ;; db-dump is sending the whole database and can (easily) go over max-event-size-in-bytes.
+      ;; Only real solution for this is to break down the db-dump into smaller pieces,
+      ;; possibly transitioning to partial loading by default in the future.
+      #{:datascript/db-dump}))
+
+
+;; serialization and limits
+
+;; Really shouldn't need these UUID and datom-reader, but we still send datoms via db-dump.
+#?(:cljs
+   ;; see https://github.com/cognitect/transit-cljs/issues/41#issuecomment-503287258
+   (extend-type ty/UUID IUUID))
+
+
+(def ^:private datom-reader
+  (transit/read-handler
+    (fn [[e a v tx added]]
+      {:e     e
+       :a     a
+       :v     v
+       :tx    tx
+       :added added})))
+
+
+(def serialization-type :json)
+(def serialization-opts {:handlers {:datom datom-reader}})
+
+
+(defn serialize
+  "Writes `event` with a transit writer of `serialization-type` and returns the string.
+  On the JVM the written bytes are decoded explicitly as UTF-8."
+  [event]
+  #?(:cljs (-> (transit/writer serialization-type)
+               (transit/write event))
+     :clj  (let [out    (ByteArrayOutputStream. 4096)
+                 writer (transit/writer out serialization-type)]
+             (transit/write writer event)
+             (.toString out "UTF-8"))))
+
+
+(defn deserialize
+  "Reads `serialized-event` with a transit reader using `serialization-opts`.
+  On the JVM the string is encoded explicitly as UTF-8 before reading."
+  [serialized-event]
+  #?(:cljs (-> (transit/reader serialization-type serialization-opts)
+               (transit/read serialized-event))
+     :clj  (let [in     (ByteArrayInputStream. (.getBytes serialized-event "UTF-8"))
+                 reader (transit/reader in serialization-type serialization-opts)]
+             (transit/read reader))))
 
 
 ;; building events
diff --git a/src/cljs/athens/effects.cljs b/src/cljs/athens/effects.cljs
index 807e88c362..d1eb9a0dea 100644
--- a/src/cljs/athens/effects.cljs
+++ b/src/cljs/athens/effects.cljs
@@ -217,7 +217,10 @@
         ;; valid event let's send it
         (do
           (log/debug "Sending event:" (pr-str event))
-          (client/send! event))
+          (let [ret (client/send! event)]
+            (when (= :rejected (:result ret))
+              (rf/dispatch [:remote/reject-forwarded-event event])
+              (log/warn "Tried to send invalid event. Error:" (pr-str (:reason ret))))))
         (let [explanation (-> schema/event
                               (m/explain event)
                               (me/humanize))]
diff --git a/src/cljs/athens/self_hosted/client.cljs b/src/cljs/athens/self_hosted/client.cljs
index 213ed6b654..cf21ea0259 100644
--- a/src/cljs/athens/self_hosted/client.cljs
+++ b/src/cljs/athens/self_hosted/client.cljs
@@ -5,15 +5,10 @@
     [athens.common-events.graph.atomic :as atomic-graph-ops]
     [athens.common-events.schema :as schema]
     [athens.common.logging :as log]
-    [cognitect.transit :as transit]
-    [com.cognitect.transit.types :as ty]
     [com.stuartsierra.component :as component]
     [re-frame.core :as rf]))
 
 
-(extend-type ty/UUID IUUID)
-
-
 (defonce ^:private ws-connection (atom nil))
 
 
@@ -102,8 +97,14 @@
           (log/debug "event-id:" (pr-str (:event/id data))
                      ", type:" (pr-str (:event/type data))
                      "WSClient sending to server")
-          (.send connection (transit/write (transit/writer :json) data))
-          {:result :sent})
+          (let [serialized-event (common-events/serialize data)
+                errors           (common-events/validate-serialized-event serialized-event)]
+            (if errors
+              (do (log/warn "Tried sending invalid serialized event:" (pr-str errors))
+                  {:result :rejected
+                   :reason :invalid-event-schema})
+              (do (.send connection serialized-event)
+                  {:result :sent}))))
         (do
           (log/warn "event-id:" (pr-str (:event/id data))
                     ", type:" (pr-str (:event/type data))
@@ -277,29 +278,14 @@
               "WSClient Received invalid server event, explanation:" (pr-str (schema/explain-server-event packet)))))
 
 
-(def ^:private datom-reader
-  (transit/read-handler
-    (fn [[e a v tx added]]
-      {:e     e
-       :a     a
-       :v     v
-       :tx    tx
-       :added added})))
-
-
 (defn- message-handler
   [event]
-  (let [packet (->> event
-                    .-data
-                    (transit/read
-                      (transit/reader
-                        :json
-                        {:handlers
-                         {:datom datom-reader}})))]
-
-    (if (schema/valid-event-response? packet)
-      (awaited-response-handler packet)
-      (server-event-handler packet))))
+  (if-let [errors (common-events/validate-serialized-event (.-data event))]
+    (log/warn "Received invalid serialized event:" (pr-str errors))
+    (let [packet (->> event .-data common-events/deserialize)]
+      (if (schema/valid-event-response? packet)
+        (awaited-response-handler packet)
+        (server-event-handler packet)))))
 
 
 (defn- remove-listeners!