mirror of
				https://github.com/janet-lang/janet
				synced 2025-10-31 07:33:01 +00:00 
			
		
		
		
	Support using a background thread to monitor deadlines
This commit is contained in:
		
							
								
								
									
										108
									
								
								src/core/ev.c
									
									
									
									
									
								
							
							
						
						
									
										108
									
								
								src/core/ev.c
									
									
									
									
									
								
							| @@ -112,6 +112,13 @@ typedef struct { | |||||||
|     JanetHandle write_pipe; |     JanetHandle write_pipe; | ||||||
| } JanetEVThreadInit; | } JanetEVThreadInit; | ||||||
|  |  | ||||||
|  | /* Structure used to initialize threads that run timeouts */ | ||||||
|  | typedef struct { | ||||||
|  |     double sec; | ||||||
|  |     JanetVM *vm; | ||||||
|  |     JanetFiber *fiber; | ||||||
|  | } JanetThreadedTimeout; | ||||||
|  |  | ||||||
| #define JANET_MAX_Q_CAPACITY 0x7FFFFFF | #define JANET_MAX_Q_CAPACITY 0x7FFFFFF | ||||||
|  |  | ||||||
| static void janet_q_init(JanetQueue *q) { | static void janet_q_init(JanetQueue *q) { | ||||||
| @@ -1431,6 +1438,17 @@ JanetFiber *janet_loop1(void) { | |||||||
|         while ((has_timeout = peek_timeout(&to))) { |         while ((has_timeout = peek_timeout(&to))) { | ||||||
|             if (to.curr_fiber != NULL) { |             if (to.curr_fiber != NULL) { | ||||||
|                 if (!janet_fiber_can_resume(to.curr_fiber)) { |                 if (!janet_fiber_can_resume(to.curr_fiber)) { | ||||||
|  |                     if (to.has_worker) { | ||||||
|  | #ifdef JANET_WINDOWS | ||||||
|  |                         QueueUserAPC(janet_timeout_stop, to.worker, 0); | ||||||
|  |                         WaitForSingleObject(to.worker, INFINITE); | ||||||
|  |                         CloseHandle(to.worker); | ||||||
|  | #else | ||||||
|  |                         pthread_cancel(to.worker); | ||||||
|  |                         void *res; | ||||||
|  |                         pthread_join(to.worker, &res); | ||||||
|  | #endif | ||||||
|  |                     } | ||||||
|                     janet_table_remove(&janet_vm.active_tasks, janet_wrap_fiber(to.curr_fiber)); |                     janet_table_remove(&janet_vm.active_tasks, janet_wrap_fiber(to.curr_fiber)); | ||||||
|                     pop_timeout(0); |                     pop_timeout(0); | ||||||
|                     continue; |                     continue; | ||||||
| @@ -3102,27 +3120,95 @@ JANET_CORE_FN(cfun_ev_sleep, | |||||||
|     janet_sleep_await(sec); |     janet_sleep_await(sec); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #ifdef JANET_WINDOWS | ||||||
|  | static VOID CALLBACK janet_timeout_stop(ULONG_PTR ptr) { | ||||||
|  |     UNREFERENCED_PARAMETER(ptr); | ||||||
|  |     ExitThread(0); | ||||||
|  | } | ||||||
|  | #endif | ||||||
|  |  | ||||||
|  | static void janet_timeout_cb(JanetEVGenericMessage msg) { | ||||||
|  |     (void) msg; | ||||||
|  |     janet_interpreter_interrupt_handled(&janet_vm); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #ifdef JANET_WINDOWS | ||||||
|  | static DWORD WINAPI janet_timeout_body(LPVOID ptr) { | ||||||
|  |     JanetThreadedTimeout *tto = (JanetThreadedTimeout *)ptr; | ||||||
|  |     double sec = (tto->sec > 0) ? tto->sec : 0; | ||||||
|  |     SleepEx((DWORD)(tto->sec * 1000), TRUE); | ||||||
|  |     if (janet_fiber_can_resume(tto->fiber)) { | ||||||
|  |         janet_interpreter_interrupt(tto->vm); | ||||||
|  |         JanetEVGenericMessage msg = {0}; | ||||||
|  |         janet_ev_post_event(tto->vm, janet_timeout_cb, msg); | ||||||
|  |     } | ||||||
|  |     return 0; | ||||||
|  | } | ||||||
|  | #else | ||||||
|  | static void *janet_timeout_body(void *ptr) { | ||||||
|  |     JanetThreadedTimeout *tto = (JanetThreadedTimeout *)ptr; | ||||||
|  |     struct timespec ts; | ||||||
|  |     ts.tv_sec = (time_t) tto->sec; | ||||||
|  |     ts.tv_nsec = (tto->sec <= UINT32_MAX) | ||||||
|  |                  ? (long)((tto->sec - ((uint32_t)tto->sec)) * 1000000000) | ||||||
|  |                  : 0; | ||||||
|  |     nanosleep(&ts, &ts); | ||||||
|  |     if (janet_fiber_can_resume(tto->fiber)) { | ||||||
|  |         janet_interpreter_interrupt(tto->vm); | ||||||
|  |         JanetEVGenericMessage msg = {0}; | ||||||
|  |         janet_ev_post_event(tto->vm, janet_timeout_cb, msg); | ||||||
|  |     } | ||||||
|  |     return NULL; | ||||||
|  | } | ||||||
|  | #endif | ||||||
|  |  | ||||||
| JANET_CORE_FN(cfun_ev_deadline, | JANET_CORE_FN(cfun_ev_deadline, | ||||||
|               "(ev/deadline sec &opt tocancel tocheck)", |               "(ev/deadline sec &opt tocancel tocheck intr?)", | ||||||
|               "Schedules the event loop to try to cancel the `tocancel` " |               "Schedules the event loop to try to cancel the `tocancel` task as with `ev/cancel`. " | ||||||
|               "task as with `ev/cancel`. After `sec` seconds, the event " |               "After `sec` seconds, the event loop will attempt cancellation of `tocancel` if the " | ||||||
|               "loop will attempt cancellation of `tocancel` if the " |               "`tocheck` fiber is resumable. `sec` is a number that can have a fractional part. " | ||||||
|               "`tocheck` fiber is resumable. `sec` is a number that can " |               "`tocancel` defaults to `(fiber/root)`, but if specified, must be a task (root " | ||||||
|               "have a fractional part. `tocancel` defaults to " |               "fiber). `tocheck` defaults to `(fiber/current)`, but if specified, must be a fiber. " | ||||||
|               "`(fiber/root)`, but if specified, must be a task (root " |               "Returns `tocancel` immediately. If `interrupt?` is set to true, will create a " | ||||||
|               "fiber). `tocheck` defaults to `(fiber/current)`, but if " |               "background thread to try to interrupt the VM if the timeout expires.") { | ||||||
|               "specified, should be a fiber. Returns `tocancel` " |     janet_arity(argc, 1, 4); | ||||||
|               "immediately.") { |  | ||||||
|     janet_arity(argc, 1, 3); |  | ||||||
|     double sec = janet_getnumber(argv, 0); |     double sec = janet_getnumber(argv, 0); | ||||||
|     JanetFiber *tocancel = janet_optfiber(argv, argc, 1, janet_vm.root_fiber); |     JanetFiber *tocancel = janet_optfiber(argv, argc, 1, janet_vm.root_fiber); | ||||||
|     JanetFiber *tocheck = janet_optfiber(argv, argc, 2, janet_vm.fiber); |     JanetFiber *tocheck = janet_optfiber(argv, argc, 2, janet_vm.fiber); | ||||||
|  |     int use_interrupt = janet_optboolean(argv, argc, 3, 0); | ||||||
|     JanetTimeout to; |     JanetTimeout to; | ||||||
|     to.when = ts_delta(ts_now(), sec); |     to.when = ts_delta(ts_now(), sec); | ||||||
|     to.fiber = tocancel; |     to.fiber = tocancel; | ||||||
|     to.curr_fiber = tocheck; |     to.curr_fiber = tocheck; | ||||||
|     to.is_error = 0; |     to.is_error = 0; | ||||||
|     to.sched_id = to.fiber->sched_id; |     to.sched_id = to.fiber->sched_id; | ||||||
|  |     if (use_interrupt) { | ||||||
|  |         JanetThreadedTimeout *tto = janet_malloc(sizeof(JanetThreadedTimeout)); | ||||||
|  |         if (NULL == tto) { | ||||||
|  |             JANET_OUT_OF_MEMORY; | ||||||
|  |         } | ||||||
|  |         tto->sec = sec; | ||||||
|  |         tto->vm = &janet_vm; | ||||||
|  |         tto->fiber = tocheck; | ||||||
|  | #ifdef JANET_WINDOWS | ||||||
|  |         HANDLE worker = CreateThread(NULL, 0, janet_timeout_body, tto, 0, NULL); | ||||||
|  |         if (NULL == worker) { | ||||||
|  |             janet_free(tto); | ||||||
|  |             janet_panic("failed to create thread"); | ||||||
|  |         } | ||||||
|  | #else | ||||||
|  |         pthread_t worker; | ||||||
|  |         int err = pthread_create(&worker, NULL, janet_timeout_body, tto); | ||||||
|  |         if (err) { | ||||||
|  |             janet_free(tto); | ||||||
|  |             janet_panicf("%s", janet_strerror(err)); | ||||||
|  |         } | ||||||
|  | #endif | ||||||
|  |         to.has_worker = 1; | ||||||
|  |         to.worker = worker; | ||||||
|  |     } else { | ||||||
|  |         to.has_worker = 0; | ||||||
|  |     } | ||||||
|     add_timeout(to); |     add_timeout(to); | ||||||
|     return janet_wrap_fiber(tocancel); |     return janet_wrap_fiber(tocancel); | ||||||
| } | } | ||||||
|   | |||||||
| @@ -59,6 +59,12 @@ typedef struct { | |||||||
|     JanetFiber *curr_fiber; |     JanetFiber *curr_fiber; | ||||||
|     uint32_t sched_id; |     uint32_t sched_id; | ||||||
|     int is_error; |     int is_error; | ||||||
|  |     int has_worker; | ||||||
|  | #ifdef JANET_WINDOWS | ||||||
|  |     HANDLE worker; | ||||||
|  | #else | ||||||
|  |     pthread_t worker; | ||||||
|  | #endif | ||||||
| } JanetTimeout; | } JanetTimeout; | ||||||
|  |  | ||||||
| /* Registry table for C functions - contains metadata that can | /* Registry table for C functions - contains metadata that can | ||||||
|   | |||||||
| @@ -550,4 +550,8 @@ | |||||||
|   (ev/sleep 0.15) |   (ev/sleep 0.15) | ||||||
|   (assert (not terminated-normally) "early termination failure 3")) |   (assert (not terminated-normally) "early termination failure 3")) | ||||||
|  |  | ||||||
|  | (let [f (coro (forever :foo))] | ||||||
|  |   (ev/deadline 0.01 nil f true) | ||||||
|  |   (assert-error "deadline expired" (resume f))) | ||||||
|  |  | ||||||
| (end-suite) | (end-suite) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Michael Camilleri
					Michael Camilleri