Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Project file updates #4

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
(defproject org.purefn/sentenza "0.1.8"
:description "Data pipeline for building email records"
:dependencies [[org.clojure/clojure "1.9.0-alpha16"]
(defproject org.purefn/sentenza "0.1.9-SNAPSHOT"
:description "A library for easier parallel (single-node) processing"
:dependencies [[org.clojure/clojure "1.9.0" :scope "provided"]
[clj-time "0.14.2"]
[me.raynes/fs "1.4.6"]
[org.clojure/core.async "0.3.443"]
[com.taoensso/timbre "4.10.0"]
[cheshire "5.8.0"] ]
[cheshire "5.8.0"]]
:profiles {:dev {:dependencies [[org.clojure/tools.namespace "0.2.11"]
[com.stuartsierra/component "0.3.2"]
[criterium "0.4.4"]]
Expand Down
34 changes: 16 additions & 18 deletions src/org/purefn/sentenza/api.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
[clojure.core.async.impl.protocols :as async-impl]
[taoensso.timbre :as log]
[org.purefn.sentenza.proto :as proto]
[org.purefn.sentenza.annotate :as sann]))
[org.purefn.sentenza.annotate :as annotate]))

(defn threaded-pipe
"Takes elements from the from channel and calls the transformer on it in
Expand All @@ -20,7 +20,7 @@
[n from xf]
(log/info "Creating " n " threads.")
(let [chs (map
(fn [_] (chan 50 xf (sann/warner xf)))
(fn [_] (chan 50 xf (annotate/warner xf)))
(range n))]
(doseq [ch chs]
(async/thread
Expand Down Expand Up @@ -117,8 +117,8 @@
(defn paral?
"Returns whether a given trasnformer should be treated as parallelizable."
[xf]
(or (sann/threaded? xf)
(sann/cored? xf)))
(or (annotate/threaded? xf)
(annotate/cored? xf)))

;; =============================
;; Main API
Expand All @@ -141,13 +141,11 @@
core.async channels. Returns the last transducer's channel.

For each transducer, it will create a channel and pipe in the
contents of the previous channel. If the transducer has metadata
indicating that it involves blocking, IO-bound operations it will
pipe using core.async's thread and blocking operations. Otherwise
it simply defers to async/pipe to connect the two channels.

We chose not to use async/pipeline since it performs extra work for
features (ordering,multiple values) that we don't need."
contents of the previous channel. If the transducer has been
annotated as being parallel with `annotate/cored` or
`annotate/threaded`, the appropriate core.async pipeline will be
created for its computation. Otherwise it simply defers to
async/pipe to connect the two channels."
[source & xfs]
(if (channel? source)
(loop [from source
Expand All @@ -160,23 +158,23 @@
;; the same thread.
to (if (paral? xf)
(chan 500)
(chan 500 xf (sann/warner xf)))]
(chan 500 xf (annotate/warner xf)))]
(cond
(sann/threaded? xf)
(async/pipeline-blocking (sann/paral xf)
(annotate/threaded? xf)
(async/pipeline-blocking (annotate/paral xf)
to
xf
from
true
(sann/warner xf))
(annotate/warner xf))

(sann/cored? xf)
(async/pipeline (sann/paral xf)
(annotate/cored? xf)
(async/pipeline (annotate/paral xf)
to
xf
from
true
(sann/warner xf))
(annotate/warner xf))

:default
(async/pipe from to))
Expand Down