Filtering...

futures-raw

futures-raw
other
(in-package "ACL2")
other
(defstruct st-future
  (value nil)
  (valid nil)
  (closure nil)
  (aborted nil))
st-futuremacro
(defmacro st-future
  (x)
  `(let ((st-future (make-st-future)))
    (setf (st-future-closure st-future) (lambda nil ,X))
    (setf (st-future-valid st-future) nil)
    st-future))
st-future-readfunction
(defun st-future-read
  (st-future)
  (assert (st-future-p st-future))
  (if (st-future-valid st-future)
    (values-list (st-future-value st-future))
    (progn (setf (st-future-value st-future)
        (multiple-value-list (funcall (st-future-closure st-future))))
      (setf (st-future-valid st-future) t)
      (values-list (st-future-value st-future)))))
st-future-abortfunction
(defun st-future-abort
  (st-future)
  (assert (st-future-p st-future))
  (setf (st-future-aborted st-future) t)
  (setf (st-future-closure st-future) nil)
  st-future)
other
(push :skip-resource-availability-test *features*)
other
(defstruct atomic-notification (value nil))
other
(defstruct barrier
  (value nil)
  (lock (make-lock))
  (wait-count 0)
  (sem (make-semaphore)))
broadcast-barrierfunction
(defun broadcast-barrier
  (barrier)
  (without-interrupts (setf (barrier-value barrier) t)
    (with-lock (barrier-lock barrier)
      (let ((count (barrier-wait-count barrier)))
        (loop for
          i
          from
          0
          to
          count
          do
          (signal-semaphore (barrier-sem barrier))
          (decf (barrier-wait-count barrier)))))))
wait-on-barrierfunction
(defun wait-on-barrier
  (barrier)
  (if (barrier-value barrier)
    t
    (progn (with-lock (barrier-lock barrier)
        (incf (barrier-wait-count barrier)))
      (when (not (barrier-value barrier))
        (wait-on-semaphore (barrier-sem barrier))))))
other
(defstruct mt-future
  (index nil)
  (value nil)
  (valid (make-barrier))
  (closure nil)
  (aborted nil)
  (thrown-tag nil))
other
(define-atomically-modifiable-counter *last-slot-saved* 0)
other
(define-atomically-modifiable-counter *last-slot-taken* 0)
other
(defvar *future-array*)
other
(defvar *thread-array*)
other
(defvar *future-dependencies*)
other
(defparameter *future-queue-length-history* nil)
other
(defvar *current-thread-index* 0)
other
(defconstant *starting-core* 'start)
other
(defconstant *resumptive-core* 'resumptive)
other
(defvar *allocated-core* *resumptive-core*)
other
(defvar *decremented-idle-future-thread-count* nil)
other
(defvar *idle-future-core-count*
  (make-atomically-modifiable-counter *core-count*))
other
(defvar *idle-future-resumptive-core-count*
  (make-atomically-modifiable-counter (1- *core-count*)))
other
(defvar *idle-core* (make-semaphore))
other
(define-atomically-modifiable-counter *idle-future-thread-count*
  0)
other
(defvar *future-added* (make-semaphore))
other
(defvar *idle-resumptive-core* (make-semaphore))
other
(defvar *threads-spawned* 0)
other
(define-atomically-modifiable-counter *unassigned-and-active-future-count*
  1)
other
(define-atomically-modifiable-counter *total-future-count*
  0)
other
(defconstant *future-array-size* 200000)
farefmacro
(defmacro faref
  (array subscript)
  `(aref ,ARRAY
    (if (equal 0 ,SUBSCRIPT)
      0
      (1+ (mod ,SUBSCRIPT (1- *future-array-size*))))))
other
(defvar *resource-and-timing-based-parallelizations*
  0
  "Tracks the number of times that we parallelize execution when
  waterfall-parallelism is set to :resource-and-timing-based")
other
(defvar *resource-and-timing-based-serializations*
  0
  "Tracks the number of times that we do not parallelize execution when
  waterfall-parallelism is set to :resource-and-timing-based")
other
(defvar *resource-based-parallelizations*
  0
  "Tracks the number of times that we parallelize execution when
  waterfall-parallelism is set to :resource-based")
other
(defvar *resource-based-serializations*
  0
  "Tracks the number of times that we do not parallelize execution when
  waterfall-parallelism is set to :resource-based")
reset-future-queue-length-historyfunction
(defun reset-future-queue-length-history
  nil
  (setf *future-queue-length-history* nil))
reset-future-parallelism-variablesfunction
(defun reset-future-parallelism-variables
  nil
  (setf *thread-array*
    (make-array *future-array-size* :initial-element nil))
  (setf *future-array*
    (make-array *future-array-size* :initial-element nil))
  (setf *future-dependencies*
    (make-array *future-array-size* :initial-element nil))
  (setf *future-added* (make-semaphore))
  (setf *idle-future-core-count*
    (make-atomically-modifiable-counter *core-count*))
  (setf *idle-future-resumptive-core-count*
    (make-atomically-modifiable-counter (1- *core-count*)))
  (setf *idle-core* (make-semaphore))
  (setf *idle-resumptive-core* (make-semaphore))
  (dotimes (i *core-count*) (signal-semaphore *idle-core*))
  (dotimes (i (1- *core-count*))
    (signal-semaphore *idle-resumptive-core*))
  (setf *last-slot-taken*
    (make-atomically-modifiable-counter 0))
  (setf *last-slot-saved*
    (make-atomically-modifiable-counter 0))
  (setf *threads-spawned* 0)
  (setf *total-future-count*
    (make-atomically-modifiable-counter 0))
  (setf *unassigned-and-active-future-count*
    (make-atomically-modifiable-counter 1))
  (setf *idle-future-thread-count*
    (make-atomically-modifiable-counter 0))
  (setf *resource-and-timing-based-parallelizations* 0)
  (setf *resource-and-timing-based-serializations* 0)
  (setf *resource-based-parallelizations* 0)
  (setf *resource-based-serializations* 0)
  (reset-future-queue-length-history)
  t)
other
(reset-future-parallelism-variables)
reset-all-parallelism-variablesfunction
(defun reset-all-parallelism-variables
  nil
  (format t
    "Resetting parallelism and futures variables.  This may take a ~
             few seconds (often either~% 0 or 15).~%")
  (reset-parallelism-variables)
  (reset-future-parallelism-variables)
  (format t
    "Done resetting parallelism and futures variables.~%"))
futures-parallelism-buffer-has-space-availablefunction
(defun futures-parallelism-buffer-has-space-available
  nil
  (< (atomically-modifiable-counter-read *unassigned-and-active-future-count*)
    *unassigned-and-active-work-count-limit*))
not-too-many-futures-already-in-existencefunction
(defun not-too-many-futures-already-in-existence
  nil
  (let ((total-parallelism-work-limit (f-get-global 'total-parallelism-work-limit
         *the-live-state*)))
    (cond ((equal total-parallelism-work-limit :none) t)
      ((< (atomically-modifiable-counter-read *total-future-count*)
         total-parallelism-work-limit) t)
      (t (let ((total-parallelism-work-limit-error (f-get-global 'total-parallelism-work-limit-error
               *the-live-state*)))
          (cond ((equal total-parallelism-work-limit-error t) (er hard
                'not-too-many-futures-already-in-existence
                "The system has encountered the limit placed upon the ~
                         total amount of parallelism work allowed.  Either ~
                         the limit must be increased, or this error must be ~
                         disabled.  See :DOC set-total-parallelism-work-limit ~
                         and :DOC set-total-parallelism-work-limit-error for ~
                         more information."))
            ((null total-parallelism-work-limit-error) nil)
            (t (er hard
                'not-too-many-futures-already-in-existence
                "The value for global variable ~
                           total-parallelism-work-limit-error must be one of ~
                           t or nil.  Please change the value of this global ~
                           variable to either of those values."))))))))
futures-resources-availablefunction
(defun futures-resources-available
  nil
  (and (f-get-global 'parallel-execution-enabled *the-live-state*)
    (futures-parallelism-buffer-has-space-available)
    (not-too-many-futures-already-in-existence)))
other
(define-atomically-modifiable-counter *threads-waiting-for-starting-core*
  0)
claim-starting-corefunction
(defun claim-starting-core
  nil
  (atomic-incf *threads-waiting-for-starting-core*)
  (let ((notification (make-semaphore-notification)))
    (unwind-protect-disable-interrupts-during-cleanup (wait-on-semaphore *idle-core* :notification notification)
      (progn (when (semaphore-notification-status notification)
          (setf *allocated-core* *starting-core*)
          (atomic-decf *idle-future-core-count*)
          (setf *decremented-idle-future-thread-count* t)
          (atomic-decf *idle-future-thread-count*))
        (atomic-decf *threads-waiting-for-starting-core*)))))
claim-resumptive-corefunction
(defun claim-resumptive-core
  nil
  (let ((notification (make-semaphore-notification)))
    (unwind-protect-disable-interrupts-during-cleanup (wait-on-semaphore *idle-resumptive-core*
        :notification notification)
      (when (semaphore-notification-status notification)
        (setf *allocated-core* *resumptive-core*)
        (atomic-decf *idle-future-resumptive-core-count*)))))
free-allocated-corefunction
(defun free-allocated-core
  nil
  (without-interrupts (cond ((eq *allocated-core* *starting-core*) (atomic-incf *idle-future-core-count*)
        (signal-semaphore *idle-core*)
        (setf *allocated-core* nil))
      ((eq *allocated-core* *resumptive-core*) (atomic-incf *idle-future-resumptive-core-count*)
        (signal-semaphore *idle-resumptive-core*)
        (setf *allocated-core* nil))
      (t nil))
    t))
early-terminate-childrenfunction
(defun early-terminate-children
  (index)
  (abort-future-indices (faref *future-dependencies* index))
  (setf (faref *future-dependencies* index) nil))
other
(defvar *aborted-futures-via-flag* 0)
other
(defvar *aborted-futures-total* 0)
other
(defvar *futures-resources-available-count* 0)
other
(defvar *futures-resources-unavailable-count* 0)
set-thread-check-for-abort-and-funcallfunction
(defun set-thread-check-for-abort-and-funcall
  (future)
  (let* ((index (mt-future-index future)) (closure (mt-future-closure future))
      (*allocated-core* nil)
      (*current-thread-index* index)
      (*decremented-idle-future-thread-count* nil)
      (early-terminated t))
    (unwind-protect-disable-interrupts-during-cleanup (progn (claim-starting-core)
        (setf (faref *thread-array* index) (current-thread))
        (if (mt-future-aborted future)
          (incf *aborted-futures-via-flag*)
          (progn (setf (mt-future-value future)
              (multiple-value-list (funcall closure)))
            (setq early-terminated nil)
            (broadcast-barrier (mt-future-valid future)))))
      (progn (setf (faref *thread-array* index) nil)
        (when early-terminated (early-terminate-children index))
        (setf (faref *future-dependencies* index) nil)
        (when *decremented-idle-future-thread-count*
          (atomic-incf *idle-future-thread-count*))
        (free-allocated-core)
        (setf (faref *future-array* index) nil)))))
other
(defvar *throwable-future-worker-thread* nil)
wait-for-a-closurefunction
(defun wait-for-a-closure
  nil
  (loop while
    (>= (atomically-modifiable-counter-read *last-slot-taken*)
      (atomically-modifiable-counter-read *last-slot-saved*))
    do
    (let ((random-amount-of-time (+ 10 (random 110.0))))
      (when (not (wait-on-semaphore *future-added*
            :timeout random-amount-of-time))
        (throw :worker-thread-no-longer-needed nil)))))
other
(defvar *busy-wait-var* 0)
other
(defvar *current-waiting-thread* nil)
other
(defvar *fresh-waiting-threads* 0)
make-tclet-thrown-symbol1function
(defun make-tclet-thrown-symbol1
  (tags first-tag)
  (if (endp tags)
    ""
    (concatenate 'string
      (if first-tag
        ""
        "-OR-")
      (symbol-name (car tags))
      "-THROWN"
      (make-tclet-thrown-symbol1 (cdr tags) nil))))
make-tclet-thrown-symbolfunction
(defun make-tclet-thrown-symbol
  (tags)
  (intern (make-tclet-thrown-symbol1 tags t) "ACL2"))
make-tclet-bindings1function
(defun make-tclet-bindings1
  (tags)
  (if (endp tags)
    nil
    (cons (list (make-tclet-thrown-symbol (reverse tags)) t)
      (make-tclet-bindings1 (cdr tags)))))
make-tclet-bindingsfunction
(defun make-tclet-bindings
  (tags)
  (reverse (make-tclet-bindings1 (reverse tags))))
make-tclet-thrown-tags1function
(defun make-tclet-thrown-tags1
  (tags)
  (if (endp tags)
    nil
    (cons (make-tclet-thrown-symbol (reverse tags))
      (make-tclet-thrown-tags1 (cdr tags)))))
make-tclet-thrown-tagsfunction
(defun make-tclet-thrown-tags
  (tags)
  (reverse (make-tclet-thrown-tags1 (reverse tags))))
make-tclet-catchesfunction
(defun make-tclet-catches
  (rtags body thrown-tag-bindings)
  (if (endp rtags)
    body
    (list 'catch
      (list 'quote (car rtags))
      (list 'prog1
        (make-tclet-catches (cdr rtags)
          body
          (cdr thrown-tag-bindings))
        `(setq ,(CAR THROWN-TAG-BINDINGS) nil)))))
make-tclet-cleanupsfunction
(defun make-tclet-cleanups
  (thrown-tags cleanups)
  (if (endp thrown-tags)
    '((t nil))
    (cons (list (car thrown-tags) (car cleanups))
      (make-tclet-cleanups (cdr thrown-tags) (cdr cleanups)))))
throw-catch-letmacro
(defmacro throw-catch-let
  (tags body cleanups)
  (let* ((thrown-tags (make-tclet-thrown-tags tags)))
    `(let ,(MAKE-TCLET-BINDINGS TAGS)
      (let ((tclet-result ,(MAKE-TCLET-CATCHES TAGS BODY THROWN-TAGS)))
        (prog2 (cond ,@(MAKE-TCLET-CLEANUPS THROWN-TAGS CLEANUPS))
          tclet-result)))))
eval-a-closurefunction
(defun eval-a-closure
  nil
  (let* ((index (atomic-incf *last-slot-taken*)) (*current-thread-index* index)
      (thrown-tag nil)
      (thrown-val nil)
      (future nil))
    (loop while
      (not (faref *future-array* index))
      do
      (incf *busy-wait-var*)
      (when (not (equal (current-thread) *current-waiting-thread*))
        (setf *current-waiting-thread* (current-thread))
        (incf *fresh-waiting-threads*)))
    (throw-catch-let (raw-ev-fncall local-top-level
        time-limit5-tag
        step-limit-tag)
      (catch :result-no-longer-needed (let ((*throwable-future-worker-thread* t))
          (progn (setq future (faref *future-array* index))
            (set-thread-check-for-abort-and-funcall future))))
      ((progn (setf thrown-tag 'raw-ev-fncall)
         (setf thrown-val tclet-result)) (progn (setf thrown-tag 'local-top-level)
          (setf thrown-val tclet-result))
        (progn (setf thrown-tag 'time-limit5-tag)
          (setf thrown-val tclet-result))
        (progn (setf thrown-tag 'step-limit-tag)
          (setf thrown-val tclet-result))))
    (atomic-decf *unassigned-and-active-future-count*)
    (atomic-decf *total-future-count*)
    (when thrown-tag
      (setf (mt-future-thrown-tag future)
        (cons thrown-tag thrown-val))
      (broadcast-barrier (mt-future-valid future)))))
eval-closuresfunction
(defun eval-closures
  nil
  (catch :worker-thread-no-longer-needed (let ((*throwable-worker-thread* t) (*default-hs* nil))
      (declare (special *default-hs*))
      (loop (wait-for-a-closure)
        (eval-a-closure))))
  (atomic-decf *idle-future-thread-count*))
number-of-idle-threads-and-threads-waiting-for-a-starting-corefunction
(defun number-of-idle-threads-and-threads-waiting-for-a-starting-core
  nil
  (+ (atomically-modifiable-counter-read *idle-future-thread-count*)
    (atomically-modifiable-counter-read *threads-waiting-for-starting-core*)))
spawn-closure-consumersfunction
(defun spawn-closure-consumers
  nil
  (without-interrupts (loop while
      (< (number-of-idle-threads-and-threads-waiting-for-a-starting-core)
        *max-idle-thread-count*)
      do
      (progn (atomic-incf *idle-future-thread-count*)
        (incf *threads-spawned*)
        (run-thread "Worker thread" 'eval-closures)))))
make-future-with-closurefunction
(defun make-future-with-closure
  (closure)
  (let ((future (make-mt-future)) (index (atomic-incf *last-slot-saved*)))
    (assert (not (faref *thread-array* index)))
    (assert (not (faref *future-array* index)))
    (assert (not (faref *future-dependencies* index)))
    (setf (mt-future-index future) index)
    (setf (faref *future-dependencies* *current-thread-index*)
      (cons index
        (faref *future-dependencies* *current-thread-index*)))
    (setf (mt-future-closure future) closure)
    future))
add-future-to-queuefunction
(defun add-future-to-queue
  (future)
  (setf (faref *future-array* (mt-future-index future))
    future)
  (atomic-incf *total-future-count*)
  (atomic-incf *unassigned-and-active-future-count*)
  (spawn-closure-consumers)
  (signal-semaphore *future-added*)
  future)
make-closure-expr-with-acl2-bindingsfunction
(defun make-closure-expr-with-acl2-bindings
  (body)
  (let ((ld-level-sym (gensym)) (ld-level-state-sym (gensym))
      (wormholep-sym (gensym))
      (local-safe-mode-sym (gensym))
      (local-gc-on-sym (gensym)))
    `(let* ((,LD-LEVEL-SYM *ld-level*) (,LD-LEVEL-STATE-SYM (assert$ (equal *ld-level* (f-get-global 'ld-level *the-live-state*))
            (f-get-global 'ld-level *the-live-state*)))
        (acl2-unwind-protect-stack *acl2-unwind-protect-stack*)
        (,WORMHOLEP-SYM *wormholep*)
        (,LOCAL-SAFE-MODE-SYM (f-get-global 'safe-mode *the-live-state*))
        (,LOCAL-GC-ON-SYM (f-get-global 'guard-checking-on *the-live-state*)))
      (lambda nil
        (let ((*ld-level* ,LD-LEVEL-SYM) (*acl2-unwind-protect-stack* acl2-unwind-protect-stack)
            (*wormholep* ,WORMHOLEP-SYM))
          (state-free-global-let* ((ld-level ,LD-LEVEL-STATE-SYM) (safe-mode ,LOCAL-SAFE-MODE-SYM)
              (guard-checking-on ,LOCAL-GC-ON-SYM))
            ,BODY))))))
mt-futuremacro
(defmacro mt-future
  (x)
  `(cond ((not (futures-resources-available)) (incf *futures-resources-unavailable-count*)
      (st-future ,X))
    (t (incf *futures-resources-available-count*)
      (without-interrupts (let ((future (make-future-with-closure ,(MAKE-CLOSURE-EXPR-WITH-ACL2-BINDINGS X))))
          (without-interrupts (add-future-to-queue future))
          future)))))
mt-future-readfunction
(defun mt-future-read
  (future)
  (cond ((st-future-p future) (st-future-read future))
    ((mt-future-p future) (when (not (barrier-value (mt-future-valid future)))
        (let ((notif nil))
          (unwind-protect-disable-interrupts-during-cleanup (progn (without-interrupts (free-allocated-core)
                (atomic-decf *unassigned-and-active-future-count*)
                (setq notif t))
              (wait-on-barrier (mt-future-valid future))
              (claim-resumptive-core))
            (when notif
              (atomic-incf *unassigned-and-active-future-count*)))))
      (when (mt-future-thrown-tag future)
        (throw (car (mt-future-thrown-tag future))
          (cdr (mt-future-thrown-tag future))))
      (values-list (mt-future-value future)))
    (t (error "future-read was given a non-future argument"))))
other
(defvar *aborted-futures-via-throw* 0)
other
(defvar *almost-aborted-future-count* 0)
mt-future-abortfunction
(defun mt-future-abort
  (future)
  (incf *aborted-futures-total*)
  (cond ((st-future-p future) (st-future-abort future))
    ((mt-future-p future) (without-interrupts (let ((index (mt-future-index future)))
          (assert index)
          (setf (mt-future-aborted future) t)
          (let ((thread (faref *thread-array* index)))
            (when thread
              (interrupt-thread thread
                (lambda nil
                  (if (equal (mt-future-index future) *current-thread-index*)
                    (when *throwable-future-worker-thread*
                      (incf *aborted-futures-via-throw*)
                      (throw :result-no-longer-needed nil))
                    (incf *almost-aborted-future-count*)))))))))
    ((null future) t)
    (t (error "future-abort was given a non-future argument")))
  future)
abort-future-indicesfunction
(defun abort-future-indices
  (indices)
  (if (endp indices)
    t
    (progn (mt-future-abort (faref *future-array* (car indices)))
      (abort-future-indices (cdr indices)))))
print-non-nils-in-arrayfunction
(defun print-non-nils-in-array
  (array n)
  (if (equal n (length array))
    "end"
    (if (null (faref array n))
      (print-non-nils-in-array array (1+ n))
      (progn (print n)
        (print (faref array n))
        (print-non-nils-in-array array (1+ n))))))
futures-still-in-flightfunction
(defun futures-still-in-flight
  nil
  (< 1
    (atomically-modifiable-counter-read *unassigned-and-active-future-count*)))
futuremacro
(defmacro future (x) `(mt-future ,X))
future-readfunction
(defun future-read (x) (mt-future-read x))
future-abortfunction
(defun future-abort (x) (mt-future-abort x))
abort-futuresfunction
(defun abort-futures
  (futures)
  (cond ((endp futures) t)
    (t (future-abort (car futures))
      (abort-futures (cdr futures)))))