Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 123 additions & 41 deletions code/src/sixsq/nuvla/db/es/binding.clj
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
[client options]
(spandex/sniffer client (or options {})))

<<<<<<< Updated upstream

(defn create-index
[client index]
Expand Down Expand Up @@ -71,8 +72,63 @@
(let [{:keys [status body] :as _response} (ex-data e)
error (:error body)]
(log/warn index "mapping could not be updated (" status "). " (or error e))))))
=======
(defn get-error
[ex]
(let [response (ex-data ex)]
(or (-> response r/get-body :error) ex)))
>>>>>>> Stashed changes


(defn document-missing?
[ex]
(-> (ex-data ex)
r/get-body
(get-in [:root_cause 0 :type])
(= "document_missing_exception")))

(defn create-index
[client index]
(try
(let [response (spandex/request client {:url [index], :method :head})]
(if (r/status-ok? response)
(log/debug index "index already exists")
(log/error "unexpected status code when checking"
index "index (" (r/get-status response) ")")))
(catch Exception e
(let [response (ex-data e)
body (r/get-body response)
status (r/get-status response)]
(try
(if (r/status-not-found? response)
(let [{:keys [acknowledged shards_acknowledged]}
(r/get-body (spandex/request
client
{:url [index], :method :put}))]
(if (and acknowledged shards_acknowledged)
(log/info index "index created")
(log/warn index "index may or may not have been created")))
(log/error "unexpected status code when checking" index "index (" status "). " body))
(catch Exception e
(log/error "unexpected status code when creating" index "index (" status "). "
(get-error e))))))))

(defn set-index-mapping
[client index mapping]
(try
(let [response (spandex/request client {:url [index :_mapping]
:method :put
:body mapping})
status (r/get-status response)
body (r/get-body response)]
(if (r/status-ok? response)
(log/info index "mapping updated")
(log/warn index "mapping could not be updated (" status "). " body)))
(catch Exception e
(let [response (ex-data e)]
(log/warn index "mapping could not be updated ("
(r/get-status response) "). " (get-error e))))))

(defn add-data
[client {:keys [id] :as data}]
(try
Expand All @@ -85,16 +141,19 @@
:query-string {:refresh true}
:method :put
:body updated-doc})
<<<<<<< Updated upstream
success? (pos? (get-in response [:body :_shards :successful]))]
=======
success? (shards-successful? response)]
>>>>>>> Stashed changes
(if success?
(r/response-created id)
(r/response-conflict id)))
(throw (r/ex-conflict id))))
(catch Exception e
(let [{:keys [status body] :as _response} (ex-data e)
error (:error body)]
(if (= 409 status)
(r/response-conflict id)
(r/response-error (str "unexpected exception: " (or error e))))))))
(let [response (ex-data e)]
(if (r/status-conflict? response)
(throw (r/ex-conflict id))
(throw (r/ex-unexpected (get-error e))))))))


(defn update-data
Expand All @@ -109,15 +168,39 @@
:query-string {:refresh true}
:method :put
:body updated-doc})
<<<<<<< Updated upstream
success? (pos? (get-in response [:body :_shards :successful]))]
=======
success? (shards-successful? response)]
>>>>>>> Stashed changes
(if success?
(r/json-response data)
(r/response-conflict id)))
(throw (r/ex-conflict id))))
(catch Exception e
(let [{:keys [body] :as _response} (ex-data e)
error (:error body)]
(r/response-error (str "unexpected exception updating " id ": " (or error e)))))))

(if (r/status-not-found? (ex-data e))
(throw (r/ex-not-found id))
(throw (r/ex-unexpected (get-error e)))))))

<<<<<<< Updated upstream
=======
(defn scripted-update-data
[client id options]
(try
(let [[collection-id uuid] (cu/split-id id)
index (escu/collection-id->index collection-id)
response (spandex/request client {:url [index :_update uuid]
:query-string {:refresh true}
:method :post
:body options})
success? (shards-successful? response)]
(if success?
(r/response-updated id)
(throw (r/ex-conflict id))))
(catch Exception e
(if (r/status-not-found? (ex-data e))
(throw (r/ex-not-found id))
(throw (r/ex-unexpected (get-error e)))))))
>>>>>>> Stashed changes

(defn find-data
[client id]
Expand All @@ -126,15 +209,15 @@
index (escu/collection-id->index collection-id)
response (spandex/request client {:url [index :_doc uuid]
:method :get})
found? (get-in response [:body :found])]
body (r/get-body response)
found? (:found body)]
(if found?
(-> response :body :_source)
(:_source body)
(throw (r/ex-not-found id))))
(catch Exception e
(let [{:keys [status] :as _response} (ex-data e)]
(if (= 404 status)
(throw (r/ex-not-found id))
(throw e))))))
(if (r/status-not-found? (ex-data e))
(throw (r/ex-not-found id))
(throw (r/ex-unexpected (get-error e)))))))


(defn delete-data
Expand All @@ -144,11 +227,16 @@
response (spandex/request client {:url [index :_doc uuid]
:query-string {:refresh true}
:method :delete})
<<<<<<< Updated upstream
success? (pos? (get-in response [:body :_shards :successful]))
deleted? (= "deleted" (get-in response [:body :result]))]
=======
success? (shards-successful? response)
deleted? (= "deleted" (-> response r/get-body :result))]
>>>>>>> Stashed changes
(if (and success? deleted?)
(r/response-deleted id)
(r/response-error (str "could not delete document " id)))))
(throw (r/ex-unexpected (str "could not delete document " id))))))


(defn query-data
Expand All @@ -164,21 +252,28 @@
response (spandex/request client {:url [index :_search]
:method :post
:body body})
<<<<<<< Updated upstream
success? (-> response :body :_shards :successful pos?)
count-before-pagination (-> response :body :hits :total :value)
aggregations (-> response :body :aggregations)
meta (cond-> {:count count-before-pagination}
aggregations (assoc :aggregations aggregations))
hits (->> response :body :hits :hits (map :_source))]
(if success?
=======
body (r/get-body response)
body-hits (:hits body)
count-before-pagination (-> body-hits :total :value)
aggregations (:aggregations body)
meta (cond-> {:count count-before-pagination}
aggregations (assoc :aggregations aggregations))
hits (->> body-hits :hits (map :_source))]
(if (shards-successful? response)
>>>>>>> Stashed changes
[meta hits]
(let [msg (str "error when querying: " (:body response))]
(throw (r/ex-response msg 500)))))
(throw (r/ex-unexpected (str "error when querying: " body)))))
(catch Exception e
(let [{:keys [body] :as _response} (ex-data e)
error (:error body)
msg (str "unexpected exception querying: " (or error e))]
(throw (r/ex-response msg 500))))))
(throw (r/ex-unexpected (str "unexpected exception querying: " (get-error e)))))))


(defn bulk-edit-data
Expand All @@ -197,13 +292,9 @@
success? (-> body-response :failures empty?)]
(if success?
body-response
(let [msg (str "error when updating by query: " body-response)]
(throw (r/ex-response msg 500)))))
(throw (r/ex-unexpected (str "error when updating by query: " body-response)))))
(catch Exception e
(let [{:keys [body] :as _response} (ex-data e)
error (:error body)
msg (str "unexpected exception updating by query: " (or error e))]
(throw (r/ex-response msg 500))))))
(throw (r/ex-unexpected (str "error when updating by query: " (get-error e)))))))

(defn bulk-delete-data
[client collection-id {:keys [cimi-params] :as options}]
Expand All @@ -218,13 +309,9 @@
success? (-> body-response :failures empty?)]
(if success?
body-response
(let [msg (str "error when deleting by query: " body-response)]
(throw (r/ex-response msg 500)))))
(throw (r/ex-unexpected (str "error when deleting by query: " body-response)))))
(catch Exception e
(let [{:keys [body] :as _response} (ex-data e)
error (:error body)
msg (str "unexpected exception delete by query: " (or error e))]
(throw (r/ex-response msg 500))))))
(throw (r/ex-unexpected (str "error when deleting by query: " (get-error e)))))))


(deftype ElasticsearchRestBinding [client sniffer]
Expand All @@ -236,23 +323,18 @@
(create-index client index)
(set-index-mapping client index mapping)))


(add [_ data _options]
(add-data client data))


(add [_ _collection-id data _options]
(add-data client data))


(retrieve [_ id _options]
(find-data client id))


(delete [_ {:keys [id]} _options]
(delete-data client id))


(edit [_ data _options]
(update-data client data))

Expand Down
2 changes: 1 addition & 1 deletion code/src/sixsq/nuvla/server/resources/module.clj
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ component, or application.
:resource-type resource-type}
:body resource)
edit-impl)]
(if (r/status-200? response)
(if (r/status-ok? response)
response
(throw (r/ex-response (str error-message ": " response) 500)))))

Expand Down
28 changes: 24 additions & 4 deletions code/src/sixsq/nuvla/server/util/response.clj
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,13 @@
(let [query (str "?error=" (codec/url-encode msg))]
(ex-response msg 303 id (str redirect-url query))))

(defn ex-unexpected
"Provides an ExceptionInfo exception when an unexpected exception happens.
This is a 500 status code. Information from the request and the action
are used to provide a reasonable message."
[msg]
(ex-response (str "unexpected exception: " msg) 500))

(defn rethrow-response
[{{:keys [resource-id status message]} :body :as response}]
(if (and (r/response? response)
Expand All @@ -173,9 +180,22 @@
(let [msg "rethrow-response bad argument"]
(throw (ex-info msg (response-error msg))))))

(defn status-200?
(defn get-status
[{:keys [status] :as _response}]
(= status 200))
status)

(defn get-body
[{:keys [body] :as _response}]
body)

(defn status?
[expected-status response]
(= (get-status response) expected-status))

(def status-ok? (partial status? 200))
(def status-created? (partial status? 201))
(def status-not-found? (partial status? 404))
(def status-conflict? (partial status? 409))

(defn configurable-check-response
[response-ok? on-success on-error]
Expand All @@ -186,12 +206,12 @@

(def throw-response-not-200
(configurable-check-response
status-200? identity rethrow-response))
status-ok? identity rethrow-response))

(defn response-body
[response]
(:body response))

(def ignore-response-not-200
(configurable-check-response
status-200? identity (constantly nil)))
status-ok? identity (constantly nil)))
Loading