diff --git a/src/clj/athens/self_hosted/clients.clj b/src/clj/athens/self_hosted/clients.clj index f7a9b022da..b39559c1ed 100644 --- a/src/clj/athens/self_hosted/clients.clj +++ b/src/clj/athens/self_hosted/clients.clj @@ -4,12 +4,7 @@ [athens.common-events :as common-events] [athens.common-events.schema :as schema] [athens.common.logging :as log] - [cognitect.transit :as transit] - [org.httpkit.server :as http]) - (:import - (java.io - ByteArrayInputStream - ByteArrayOutputStream))) + [org.httpkit.server :as http])) ;; Internal state @@ -17,21 +12,6 @@ (defonce clients (atom {})) -(defn ->transit - [data] - (let [out (ByteArrayOutputStream. 4096) - writer (transit/writer out :json)] - (transit/write writer data) - (.toString out))) - - -(defn <-transit - [transit-str] - (let [in (ByteArrayInputStream. (.getBytes transit-str)) - reader (transit/reader in :json)] - (transit/read reader))) - - ;; Client management API (defn get-client-session @@ -71,14 +51,23 @@ valid-server-event? (schema/valid-server-event? data)] (if (or valid-event-response? valid-server-event?) - (let [type (common-events/find-event-or-atomic-op-type data) - status (:event/status data)] - (log/debug "Sending to username:" username - ", event-id:" (:event/id data) - (if type - (str ", type: " type) - (str ", status: " status))) - (http/send! channel (->transit data))) + (let [type (common-events/find-event-or-atomic-op-type data) + status (:event/status data) + serialized-event (common-events/serialize data) + errors (common-events/validate-serialized-event serialized-event)] + (if errors + (log/error "Not sending invalid event to username:" username + ", event-id:" (:event/id data) + ", type:" (common-events/find-event-or-atomic-op-type data) + ", invalid serialized event:" + "event-response take:" (str errors)) + (do + (log/debug "Sending to username:" username + ", event-id:" (:event/id data) + (if type + (str ", type: " type) + (str ", status: " status))) + (http/send! channel serialized-event)))) ;; TODO internal failure mode, collect in reporting (log/error "Not sending invalid event to username:" username ", event-id:" (:event/id data) diff --git a/src/clj/athens/self_hosted/components/web.clj b/src/clj/athens/self_hosted/components/web.clj index ef597eb20d..78b141d18f 100644 --- a/src/clj/athens/self_hosted/components/web.clj +++ b/src/clj/athens/self_hosted/components/web.clj @@ -59,13 +59,28 @@ (fn receive-handler [channel msg] (let [username (clients/get-client-username channel) - data (clients/<-transit msg)] - (if-not (schema/valid-event? data) + data (common-events/deserialize msg) + errors (common-events/validate-serialized-event msg)] + (cond + ;; TODO: we should be able to validate the serialized event without deserializing it, but + ;; we also need to build a rejected event to pass back to the client, and to do that we need + ;; the :event/id, which we only get after deserializing the event. + ;; I think this means that we should separate the :event/id from the payload itself. + errors + (do + (log/warn "username:" username "Invalid serialized event received, errors:" errors) + (clients/send! channel (common-events/build-event-rejected (:event/id data) + (str "Invalid serialized event") + errors))) + + (not (schema/valid-event? data)) (let [explanation (schema/explain-event data)] (log/warn "username:" username "Invalid event received, explanation:" explanation) (clients/send! channel (common-events/build-event-rejected (:event/id data) (str "Invalid event: " (pr-str data)) explanation))) + + :else (let [{:event/keys [id type]} data] (log/info "Received valid event" "username:" username ", event-id:" id ", type:" (common-events/find-event-or-atomic-op-type data)) (let [{:event/keys [status] diff --git a/src/cljc/athens/common_events.cljc b/src/cljc/athens/common_events.cljc index 337a5c546d..c8b8d18453 100644 --- a/src/cljc/athens/common_events.cljc +++ b/src/cljc/athens/common_events.cljc @@ -1,7 +1,65 @@ (ns athens.common-events "Event as Verbs executed on Knowledge Graph" (:require - [athens.common.utils :as utils])) + [athens.common.utils :as utils] + [cognitect.transit :as transit] + [com.cognitect.transit.types :as ty]) + #?(:clj + (:import + (java.io + ByteArrayInputStream + ByteArrayOutputStream)))) + + +;; Limits + +(def max-event-size-in-bytes (* 1 1000 1000)) ; 1 MB + +(defn valid-serialized-event? + [serialized-event] + (< (count serialized-event) max-event-size-in-bytes)) + + +(defn validate-serialized-event + [serialized-event] + (when-not (valid-serialized-event? serialized-event) + (ex-info "Serialized event is larger than 10 MB" {}))) + + +;; serialization and limits + +(def serialization-type :json) + + +(defn serialize + [event] + (let [writer #?(:cljs (transit/writer serialization-type) + :clj (transit/writer (ByteArrayOutputStream. 4096) + serialization-type))] + (transit/write writer event))) + + +;; Really shouldn't need these two, but we still send datoms via db-dump. +(extend-type ty/UUID IUUID) + + +(def ^:private datom-reader + (transit/read-handler + (fn [[e a v tx added]] + {:e e + :a a + :v v + :tx tx + :added added}))) + + +(defn deserialize + [serialized-event] + (let [opts {:handlers {:datom datom-reader}} + reader #?(:cljs (transit/reader serialization-type opts) + :clj (transit/reader (ByteArrayInputStream. (.getBytes serialized-event)) + serialization-type opts))] + (transit/read reader serialized-event))) ;; building events diff --git a/src/cljs/athens/effects.cljs b/src/cljs/athens/effects.cljs index 61a1578b32..c8c23e68d2 100644 --- a/src/cljs/athens/effects.cljs +++ b/src/cljs/athens/effects.cljs @@ -217,7 +217,10 @@ ;; valid event let's send it (do (log/debug "Sending event:" (pr-str event)) - (client/send! event)) + (let [ret (client/send! event)] + (when (= :rejected (:result ret)) + (rf/dispatch [:remote/reject-forwarded-event event]) + (log/warn "Tried to send invalid event. Error:" (pr-str (:reason ret)))))) (let [explanation (-> schema/event (m/explain event) (me/humanize))] diff --git a/src/cljs/athens/self_hosted/client.cljs b/src/cljs/athens/self_hosted/client.cljs index 213ed6b654..8f51923fc7 100644 --- a/src/cljs/athens/self_hosted/client.cljs +++ b/src/cljs/athens/self_hosted/client.cljs @@ -5,15 +5,10 @@ [athens.common-events.graph.atomic :as atomic-graph-ops] [athens.common-events.schema :as schema] [athens.common.logging :as log] - [cognitect.transit :as transit] - [com.cognitect.transit.types :as ty] [com.stuartsierra.component :as component] [re-frame.core :as rf])) -(extend-type ty/UUID IUUID) - - (defonce ^:private ws-connection (atom nil)) @@ -102,8 +97,14 @@ (log/debug "event-id:" (pr-str (:event/id data)) ", type:" (pr-str (:event/type data)) "WSClient sending to server") - (.send connection (transit/write (transit/writer :json) data)) - {:result :sent}) + (let [serialized-event (common-events/serialize data) + errors (common-events/validate-serialized-event serialized-event)] + (if errors + (do (log/warn "Received invalid serialized event:" (pr-str errors)) + {:result :rejected + :reason :invalid-event-schema}) + (do (.send connection serialized-event) + {:result :sent})))) (do (log/warn "event-id:" (pr-str (:event/id data)) ", type:" (pr-str (:event/type data)) @@ -277,29 +278,14 @@ "WSClient Received invalid server event, explanation:" (pr-str (schema/explain-server-event packet))))) -(def ^:private datom-reader - (transit/read-handler - (fn [[e a v tx added]] - {:e e - :a a - :v v - :tx tx - :added added}))) - - (defn- message-handler [event] - (let [packet (->> event - .-data - (transit/read - (transit/reader - :json - {:handlers - {:datom datom-reader}})))] - - (if (schema/valid-event-response? packet) - (awaited-response-handler packet) - (server-event-handler packet)))) + (if-let [errors (common-events/validate-serialized-event event)] + (log/warn "Received invalid serialized event:" (pr-str errors)) + (let [packet (->> event .-data common-events/deserialize)] + (if (schema/valid-event-response? packet) + (awaited-response-handler packet) + (server-event-handler packet))))) (defn- remove-listeners!