一个用于在ipython/jupyter笔记本中运行单元的新线程



有时运行单个单元格需要很长时间,而它正在运行,我想在同一笔记本中编写和运行其他单元格,访问相同上下文中的变量。

是否有任何python魔法可以使用,当它被添加到一个单元,运行单元将自动创建一个新的线程,并在笔记本中运行共享的全局数据?

这可能不是答案,而是答案的方向。我没见过那样的东西,但我对这个也很感兴趣。

我目前的发现表明,需要定义它的自己的自定义单元魔法。好的参考文献是文档中的自定义单元格魔术部分和我考虑的两个示例:

  • memit: magic memory usage bench for ippython https://gist.github.com/vene/3022718
  • 说明Python多线程与多处理:http://nathangrigg.net/2015/04/python-threading-vs-processes/

这两个链接都将代码封装在一个线程中。这可以作为一个起点。

UPDATE: ngcm-tutorial at github有后台作业类的描述

##github.com/jupyter/ngcm-tutorial/blob/master/Day-1/IPython%20Kernel/Background%20Jobs.ipynb
from IPython.lib import backgroundjobs as bg
jobs = bg.BackgroundJobManager()
def printfunc(interval=1, reps=5):
    for n in range(reps):
        time.sleep(interval)
        print('In the background... %i' % n)
        sys.stdout.flush()
    print('All done!')
    sys.stdout.flush()
jobs.new('printfunc(1,3)')
jobs.status()

UPDATE 2:另一个选项:

from IPython.display import display
from ipywidgets import IntProgress
import threading
class App(object):
    def __init__(self, nloops=2000):
        self.nloops = nloops
        self.pb = IntProgress(description='Thread loops', min=0, max=self.nloops)
    def start(self):
        display(self.pb)
        while self.pb.value < self.nloops:
            self.pb.value += 1 
        self.pb.color = 'red'
app = App(nloops=20000)
t = threading.Thread(target=app.start)
t.start()
#t.join()

这是我想出来的一小段代码

def jobs_manager():
    from IPython.lib.backgroundjobs import BackgroundJobManager
    from IPython.core.magic import register_line_magic
    from IPython import get_ipython
    jobs = BackgroundJobManager()
    @register_line_magic
    def job(line):
        ip = get_ipython()
        jobs.new(line, ip.user_global_ns)
    return jobs

它使用ippython内置模块IPython.lib.backgroundjobs。因此,代码既小又简单,也没有引入新的依赖项。

我这样使用它:

jobs = jobs_manager()
%job [fetch_url(_) for _ in urls]  # saves html file to disk
Starting job # 0 in a separate thread.

然后可以使用:

来监视状态:
jobs.status()
Running jobs:
1 : [fetch_url(_) for _ in urls]
Dead jobs:
0 : [fetch_url(_) for _ in urls]

如果作业失败,可以使用

检查堆栈跟踪
jobs.traceback(0)

没有办法杀死一个工作。所以我小心地使用了这个肮脏的技巧:

def kill_thread(thread):
    import ctypes
    id = thread.ident
    code = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(id),
        ctypes.py_object(SystemError)
    )
    if code == 0:
        raise ValueError('invalid thread id')
    elif code != 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_long(id),
            ctypes.c_long(0)
        )
        raise SystemError('PyThreadState_SetAsyncExc failed')

在给定线程中引发SystemError。因此,为了杀死一个工作,我做

kill_thread(jobs.all[1])

要杀死所有正在运行的作业,我执行

for thread in jobs.running:
    kill_thread(thread)

我喜欢使用%job与基于小部件的进度条https://github.com/alexanderkuk/log-progress像这样:

%job [fetch_url(_) for _ in log_progress(urls, every=1)]
http://g.recordit.co/iZJsJm8BOL.gif

甚至可以用%job代替multiprocessing.TreadPool:

for chunk in get_chunks(urls, 3):
    %job [fetch_url(_) for _ in log_progress(chunk, every=1)]
http://g.recordit.co/oTVCwugZYk.gif

这个代码的一些明显问题:

  1. 您不能在%job中使用任意代码。例如,可以没有作业,也不能打印。因此,我将它与将结果存储在硬盘上的例程一起使用

  2. 有时kill_thread中的脏hack不起作用。我想这就是为什么IPython.lib.backgroundjobs在设计上没有这个功能。如果线程正在执行一些系统调用,如sleepread,则忽略异常。

  3. 它使用线程。Python有GIL,所以%job不能用于一些需要Python字节码的繁重计算

最新更新