mirror of
				https://github.com/janet-lang/janet
				synced 2025-10-31 15:43:01 +00:00 
			
		
		
		
	Work on #1596 - No detached threads, make sure to call pthread_join
Call pthread_join on all worker threads for timeouts. Previously, we were leaking some threads, as well as creating a timeout and leaving has_worker unset on certain timeouts.
This commit is contained in:
		| @@ -604,8 +604,31 @@ void janet_ev_init_common(void) { | |||||||
| #endif | #endif | ||||||
| } | } | ||||||
|  |  | ||||||
|  | static void handle_timeout_worker(JanetTimeout to) { | ||||||
|  |     if (!to.has_worker) return; | ||||||
|  | #ifdef JANET_WINDOWS | ||||||
|  |     QueueUserAPC(janet_timeout_stop, to.worker, 0); | ||||||
|  |     WaitForSingleObject(to.worker, INFINITE); | ||||||
|  |     CloseHandle(to.worker); | ||||||
|  | #else | ||||||
|  | #ifdef JANET_ANDROID | ||||||
|  |     pthread_kill(to.worker, SIGUSR1); | ||||||
|  | #else | ||||||
|  |     int ret = pthread_cancel(to.worker); | ||||||
|  |     janet_assert(!ret, "pthread_cancel"); | ||||||
|  | #endif | ||||||
|  |     void *res = NULL; | ||||||
|  |     janet_assert(!pthread_join(to.worker, &res), "pthread_join"); | ||||||
|  | #endif | ||||||
|  | } | ||||||
|  |  | ||||||
| /* Common deinit code */ | /* Common deinit code */ | ||||||
| void janet_ev_deinit_common(void) { | void janet_ev_deinit_common(void) { | ||||||
|  |     JanetTimeout to; | ||||||
|  |     while (peek_timeout(&to)) { | ||||||
|  |         handle_timeout_worker(to); | ||||||
|  |         pop_timeout(0); | ||||||
|  |     } | ||||||
|     janet_q_deinit(&janet_vm.spawn); |     janet_q_deinit(&janet_vm.spawn); | ||||||
|     janet_free(janet_vm.tq); |     janet_free(janet_vm.tq); | ||||||
|     janet_table_deinit(&janet_vm.threaded_abstracts); |     janet_table_deinit(&janet_vm.threaded_abstracts); | ||||||
| @@ -1434,6 +1457,7 @@ JanetFiber *janet_loop1(void) { | |||||||
|     JanetTimestamp now = ts_now(); |     JanetTimestamp now = ts_now(); | ||||||
|     while (peek_timeout(&to) && to.when <= now) { |     while (peek_timeout(&to) && to.when <= now) { | ||||||
|         pop_timeout(0); |         pop_timeout(0); | ||||||
|  |         handle_timeout_worker(to); | ||||||
|         if (to.curr_fiber != NULL) { |         if (to.curr_fiber != NULL) { | ||||||
|             if (janet_fiber_can_resume(to.curr_fiber)) { |             if (janet_fiber_can_resume(to.curr_fiber)) { | ||||||
|                 janet_cancel(to.fiber, janet_cstringv("deadline expired")); |                 janet_cancel(to.fiber, janet_cstringv("deadline expired")); | ||||||
| @@ -1495,27 +1519,14 @@ JanetFiber *janet_loop1(void) { | |||||||
|         while ((has_timeout = peek_timeout(&to))) { |         while ((has_timeout = peek_timeout(&to))) { | ||||||
|             if (to.curr_fiber != NULL) { |             if (to.curr_fiber != NULL) { | ||||||
|                 if (!janet_fiber_can_resume(to.curr_fiber)) { |                 if (!janet_fiber_can_resume(to.curr_fiber)) { | ||||||
|                     if (to.has_worker) { |  | ||||||
| #ifdef JANET_WINDOWS |  | ||||||
|                         QueueUserAPC(janet_timeout_stop, to.worker, 0); |  | ||||||
|                         WaitForSingleObject(to.worker, INFINITE); |  | ||||||
|                         CloseHandle(to.worker); |  | ||||||
| #else |  | ||||||
| #ifdef JANET_ANDROID |  | ||||||
|                         pthread_kill(to.worker, SIGUSR1); |  | ||||||
| #else |  | ||||||
|                         pthread_cancel(to.worker); |  | ||||||
| #endif |  | ||||||
|                         void *res; |  | ||||||
|                         pthread_join(to.worker, &res); |  | ||||||
| #endif |  | ||||||
|                     } |  | ||||||
|                     janet_table_remove(&janet_vm.active_tasks, janet_wrap_fiber(to.curr_fiber)); |  | ||||||
|                     pop_timeout(0); |                     pop_timeout(0); | ||||||
|  |                     janet_table_remove(&janet_vm.active_tasks, janet_wrap_fiber(to.curr_fiber)); | ||||||
|  |                     handle_timeout_worker(to); | ||||||
|                     continue; |                     continue; | ||||||
|                 } |                 } | ||||||
|             } else if (to.fiber->sched_id != to.sched_id) { |             } else if (to.fiber->sched_id != to.sched_id) { | ||||||
|                 pop_timeout(0); |                 pop_timeout(0); | ||||||
|  |                 handle_timeout_worker(to); | ||||||
|                 continue; |                 continue; | ||||||
|             } |             } | ||||||
|             break; |             break; | ||||||
| @@ -3170,6 +3181,7 @@ JANET_NO_RETURN void janet_sleep_await(double sec) { | |||||||
|     to.is_error = 0; |     to.is_error = 0; | ||||||
|     to.sched_id = to.fiber->sched_id; |     to.sched_id = to.fiber->sched_id; | ||||||
|     to.curr_fiber = NULL; |     to.curr_fiber = NULL; | ||||||
|  |     to.has_worker = 0; | ||||||
|     add_timeout(to); |     add_timeout(to); | ||||||
|     janet_await(); |     janet_await(); | ||||||
| } | } | ||||||
| @@ -3227,7 +3239,6 @@ JANET_CORE_FN(cfun_ev_deadline, | |||||||
|             janet_free(tto); |             janet_free(tto); | ||||||
|             janet_panicf("%s", janet_strerror(err)); |             janet_panicf("%s", janet_strerror(err)); | ||||||
|         } |         } | ||||||
|         janet_assert(!pthread_detach(worker), "pthread_detach"); |  | ||||||
| #endif | #endif | ||||||
|         to.has_worker = 1; |         to.has_worker = 1; | ||||||
|         to.worker = worker; |         to.worker = worker; | ||||||
|   | |||||||
| @@ -546,9 +546,33 @@ | |||||||
|   (ev/sleep 0.15) |   (ev/sleep 0.15) | ||||||
|   (assert (not terminated-normally) "early termination failure 3")) |   (assert (not terminated-normally) "early termination failure 3")) | ||||||
|  |  | ||||||
|  | # Deadline with interrupt | ||||||
|  | (defmacro with-deadline2 | ||||||
|  |   `` | ||||||
|  |   Create a fiber to execute `body`, schedule the event loop to cancel | ||||||
|  |   the task (root fiber) associated with `body`'s fiber, and start | ||||||
|  |   `body`'s fiber by resuming it. | ||||||
|  |  | ||||||
|  |   The event loop will try to cancel the root fiber if `body`'s fiber | ||||||
|  |   has not completed after at least `sec` seconds. | ||||||
|  |  | ||||||
|  |   `sec` is a number that can have a fractional part. | ||||||
|  |   `` | ||||||
|  |   [sec & body] | ||||||
|  |   (with-syms [f] | ||||||
|  |     ~(let [,f (coro ,;body)] | ||||||
|  |        (,ev/deadline ,sec nil ,f true) | ||||||
|  |        (,resume ,f)))) | ||||||
|  |  | ||||||
|  | (repeat 10 | ||||||
|  |         (assert (= :done (with-deadline2 10 | ||||||
|  |                            (ev/sleep 0.01) | ||||||
|  |                            :done)) "deadline with interrupt exits normally")) | ||||||
|  |  | ||||||
|  | (repeat 10 | ||||||
|         (let [f (coro (forever :foo))] |         (let [f (coro (forever :foo))] | ||||||
|           (ev/deadline 0.01 nil f true) |           (ev/deadline 0.01 nil f true) | ||||||
|   (assert-error "deadline expired" (resume f))) |           (assert-error "deadline expired" (resume f)))) | ||||||
|  |  | ||||||
| # Use :err :stdout | # Use :err :stdout | ||||||
| (def- subproc-code '(do (eprint "hi") (eflush) (print "there") (flush))) | (def- subproc-code '(do (eprint "hi") (eflush) (print "there") (flush))) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Calvin Rose
					Calvin Rose