<small id='kbEF'></small> <noframes id='7eVNnz'>

  • <tfoot id='XHkT9ELWK3'></tfoot>

      <legend id='Ks8TC'><style id='B8wckalQN'><dir id='FsGBL'><q id='qFb8QtvyN'></q></dir></style></legend>
      <i id='CbaL'><tr id='QIkqBc'><dt id='zxNS8'><q id='Wg1OER'><span id='EjRQWeU'><b id='SP7vd0Xp'><form id='poC8DE6FqM'><ins id='lCqp'></ins><ul id='mOb1e'></ul><sub id='PGktIJEbM'></sub></form><legend id='nqHJ02aShx'></legend><bdo id='Kvd0MWS'><pre id='ElX28a'><center id='IQ6SEr'></center></pre></bdo></b><th id='45dY'></th></span></q></dt></tr></i><div id='f3MCSa'><tfoot id='UDgL5I6'></tfoot><dl id='2ztH4f'><fieldset id='qP9EvR'></fieldset></dl></div>

          <bdo id='OzxEB'></bdo><ul id='aHA2NC1tV'></ul>

          1. <li id='yGB4ko9SD'></li>
            登陆

            章鱼彩票app下载安装-Python Web:Flask 异步执行任务

            admin 2019-11-05 152人围观 ,发现0个评论

            原创: ayuliao 源:hackPython



            简介

            Flask 是 Python 中有名的轻量级同步 web 结构,在一些开发中,可能会遇到需求长期处理的使命,此刻就需求运用异步的办法来完成,让长期使命在后台运转,先将本次恳求的呼应状况回来给前端,不让前端界面「卡顿」,当异步使命处理好后,假如需求回来状况,再将状况回来。

            怎样完成呢?

            运用线程的办法

            当要履行耗时使命时,直接敞开一个新的线程来履行使命,这种办法最为简略快速。

            经过 ThreadPoolExecutor 来完成

            from
            flask
            import

            Flask
            from
            time
            import
            sleep
            from
            concurrent.futures
            import

            ThreadPoolExecutor
            # DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
            # 创立线程池履行器
            executor =
            ThreadPoolExecutor
            (
            2
            )
            app =
            Flask
            (__name__)
            @app
            .route(
            '/jobs'
            )
            def
            run_jobs():

            # 交由线程去履行耗时使命
            executor.submit(long_task,
            'hello'
            ,
            123
            )

            return

            'long task running.'
            # 耗时使命
            def
            long_task(arg1, arg2):

            print
            (
            "args: %s %s!"
            % (arg1, arg2))
            sleep(
            5
            )

            print
            (
            "Task is done!"
            )
            if
            __name__ ==
            '__main__'
            :
            app.run()

            当要履行一些比较简略的耗时使命时就能够运用这种办法,如发邮件、发短信验证码等。

            但这种办法有个问题,便是前端无法得知使命履行状况。

            假如想要前端知道,就需求规划一些逻辑,比方将使命履行状况存储到 redis 中,经过仅有的使命 id 进行标识,然后再写一个接口,经过使命 id 去获取使命的状况,然后让前端守时去恳求该接口,然后取得使命状况信息。

            悉数自己完成就显得有些麻烦了,而 Celery 刚好完成了这样的逻辑,来运用一下。

            运用 Celery

            为了满意前端能够取得使命状况的需求,能够运用 Celery。

            Celery 是实时使命处理与调度的分布式使命行列,它常用于 web 异步使命、守时使命等,后边独自写一篇文章描绘 Celery 的架构,这儿不深化评论。

            现在我想让前端能够经过一个进展条来判别后端使命的履行情况。运用 Celery 就很简略完成,首要经过 pip 装置 Celery 与 redis,之所以要装置 redis,是因为让 Celery 挑选 redis 作为「音讯署理 / 音讯中间件」。

            pip install celery
            pip install redis

            在 Flask 中运用 Celery 其实很简略,这儿先简略的过一下 Flask 中运用 Celery 的全体流程,然后再去完成详细的项目

            • 1. 在 Flask 中初始化 Celery
            from
            flask
            import

            Flask
            from
            celery
            import

            Celery
            app =
            Flask
            (__name__)
            # 装备
            # 装备音讯署理的途径,假如是在长途服务器上,则装备长途服务器中redis的URL
            app.config[
            'CELERY_BROKER_URL'
            ] =
            'redis://localhost:6379/0'
            # 要存储 Celery 使命的状况或运转成果时就必须要装备
            app.config[
            'CELERY_RESULT_BACKEND'
            ] =
            'redis://localhost:6379/0'
            # 初始化Celery
            celery =
            Celery
            (app.name, broker=app.config[
            'CELERY_BROKER_URL'
            ])
            # 将Flask中的装备直接传递给Celery
            celery.conf.update(app.config)

            上述代码中,经过 Celery 类初始化 celery 目标,传入的运用称号与音讯署理的衔接 URL。

            • 2. 经过 celery.task 装修器装修耗时使命对应的函数
            @celery
            .task
            def
            long_task(arg1, arg2):

            # 耗时使命的逻辑

            return
            result
            • 3.Flask 中界说接口经过异步的办法履行耗时使命
            @app
            .route(
            '/'
            , methods=[
            'GET'
            ,
            'POST'
            ])
            def
            index():
            task = long_task.delay(
            1
            ,
            2
            )

            delay () 办法是 applyasync () 办法的快捷办法,applyasync () 参数更多,能够愈加详尽的操控耗时使命,比方想要 long_task () 在一分钟后再履行

            @app
            .route(
            '/'
            , methods=[
            'GET'
            ,
            'POST'
            ])
            def
            index():
            task = long_task.apply_async(args=[
            1
            ,
            2
            ], countdown=
            60
            )

            delay () 与 apply_async () 会回来一个使命目标,该目标能够获取使命的状况与各种相关信息。

            经过这 3 步就能够运用 Celery 了。

            接着就详细来完成「让前端能够经过一个进展条来判别后端使命的履行情况」的需求。

            # bind为True,会传入self给被装修的办法
            @celery
            .task(bind=
            True
            )
            def
            long_task(self):
            verb = [
            'Starting up'
            ,
            'Booting'
            ,
            'Repairing'
            ,
            'Loading'
            ,
            'Checking'
            ]
            adjective = [
            'master'
            ,
            'radiant'
            ,
            'silent'
            ,
            'harmonic'
            ,
            'fast'
            ]
            noun = [
            'solar array'
            ,
            'particle reshaper'
            ,
            'cosmic ray'
            ,
            'orbiter'
            ,
            'bit'
            ]
            message =
            ''
            total = random.randint(
            10
            ,
            50
            )

            for
            i
            in
            range(total):

            if

            not
            message
            or
            random.random() <
            0.25
            :

            # 随机的获取一些信息
            message =
            '{0} {1} {2}...'
            .format(random.choice(verb),
            random.choice(adjective),
            random.choice(noun))

            # 更新Celery使命状况
            self.update_state(state=
            'PROGRESS'
            ,
            m章鱼彩票app下载安装-Python Web:Flask 异步执行任务eta={
            'current'
            : i,
            'total'
            : total,

            'status'
            : message})
            time.sleep(
            1
            )

            # 回来字典

            return
            {
            'current'
            :
            100
            ,
            'total'
            :
            100
            ,
            'status'
            :
            'Task completed!'
            ,

            'result'
            :
            42
            }

            上述代码中,celery.task () 装修器运用了 bind=True 参数,这个参数会让 Celery 将 Celery 自身传入,能够用于记载与更新使命状况。

            然后便是一个 for 迭代,迭代的逻辑没什么含义,便是随机从 list 中抽取一些词汇来模仿一些逻辑的运转,为了表明这是耗时逻辑,经过 time.sleep (1) 休眠一秒。

            每次获取一次词汇,就经过 self.update_state () 更新 Celery 使命的状况,Celery 包括一些内置状况,如 SUCCESS、STARTED 等等,这儿运用了自界说状况「PROGRESS」,除了状况外,还将本次循环的一些信息经过 meta 参数 (元数据) 以字典的方式存储起来。有了这些数据,前端就能够显现进展条了。

            界说好耗时办法后,再界说一个 Flask 接口办法来调用该耗时办法

            @app
            .route(
            '/longtask'
            , methods=[
            'POST'
            ])
            def
            longtask():

            # 异步调用
            task = long_task.apply_async()

            # 回来 202,与Location头

            return
            jsonify({}),
            202
            , {
            'Location'
            : url_for(
            'taskstatus'
            ,
            task_id=task.id)}

            简略而言,前端经过 POST 恳求到 /longtask,让后端开端去履行耗时使命。

            回来的状况码为 202,202 一般表明一个恳求正在进行中,然后还在回来数据包的包头 (Header) 中增加了 Location 头信息,前端能够经过读取数据包中 Header 中的 Location 的信息来获取使命 id 对应的完好 url。

            前端有了使命 id 对应的 url 后,还需求供给一个接口给前端,让前端能够经过使命 id 去获取当时时间使命的详细状况。

            @app
            .route(
            '/status/'
            )
            def
            taskstatus(task_id):
            task = long_task.
            AsyncResult
            (task_id)

            if
            task.state ==
            'PENDING'
            :
            # 在等候
            response = {

            'state'
            : task.state,

            'current'
            :
            0
            ,

            'total'
            :
            1
            ,

            'status'
            :
            'Pending...'
            }

            elif
            task.state !=
            'FAILURE'
            :
            # 没有失利
            response = {

            'state'
            : task.state,
            # 状况

            # meta中的数据,经过task.info.get()能够取得

            'current'
            : task.info.get(
            'current'
            ,
            0
            ),
            # 当时循环进展

            'total'
            : task.info.get(
            'total'
            ,
            1
            ),
            # 总循环进展

            'status'
            : task.info.get(
            'status'
            ,
            ''
            )
            }

            if

            'result'

            in
            task.info:
            response[
            'result'
            ] = task.info[
            'result'
            ]

            else
            :

            # 后端履行使命呈现了一些问题
            response = {

            'state'
            : task.state,

            'current'
            :
            1
            ,

            'total'
            :
            1
            ,

            'status'
            : str(task.info),
            # 报错的详细反常
            }

            return
            jsonify(response)

            为了能够取得使命目标中的信息,运用使命 id 初始化 AsyncResult 类,取得使命目标,然后就能够从使命目标中取得当时使命的信息。

            该办法会回来一个 JSON,其间包括了使命状况以及 meta 中指定的信息,前端能够运用这些信息构建一个进展条。

            假如使命在 PENDING 状况,表明该使命还没有开端,在这种状况下,使命中是没有什么信息的,这儿人为的回来一些数据。假如使命履行失利,就回来 task.info 中包括的反常信息,此外便是正常履行了,正常履行能够通 task.info 取得使命中详细的信息。

            这样,后端的逻辑就处理完成了,接着就来完成前端的逻辑,要完成图形进展条,能够直接运用 nanobar.js,简略两句话就能够完成一个进展条,其官网比如如下:

            var
            options = {
            classname:
            'my-class'
            ,
            id:
            'my-id'
            ,

            // 进展条要呈现的方位
            target: document.getElementById(
            'myDivId'
            )
            };
            // 初始化进展条目标
            var
            nanobar 章鱼彩票app下载安装-Python Web:Flask 异步执行任务=
            new

            Nanobar
            ( options );
            nanobar.go(
            30
            );
            // 30% 进展条
            nanobar.go(
            76
            );
            // 76% 进展条
            // 100% 进展条,进展条完毕
            nanobar.go(
            100
            );

            有了 nanobar.js 就十分简略了。

            先界说一个简略的 HTML 界面


            Long running task with progress updates







            id
            =
            "progress"
            >

            经过 JavaScript 完成对后台的恳求

            // 按钮点击工作
            $(
            function
            () {
            $(章鱼彩票app下载安装-Python Web:Flask 异步执行任务
            '#start-bg-job'
            ).click(start_long_task);
            });
            // 恳求 longtask 接口
            function
            start_long_task() {

            // 增加元素在html中
            div = $(
            '
            0%
            ...

            '
            );
            $(
            '#progre北仑天气ss'
            ).append(div);

            // 创立进展条目标

            var
            nanobar =
            new

            Nanobar
            ({
            bg:
            '#44f'
            ,
            target: div[
            0
            ].childNodes[
            0
            ]
            });

            // ajax恳求longtask
            $.ajax({
            type:
            'POST'
            ,
            url:
            '/longtask'
            ,

            // 取得数据,从呼应头中获取Location
            success:
            function
            (data, status, request) {
            status_url = request.getResponseHeader(
            'Location'
            );

            // 调用 update_progress() 办法更新进展条
            update_progress(status_url, nanobar, div[
            0
            ]);
            },
            error:
            function
            () {
            alert(
            'Unexpected error'
            );
            }
            });
            }
            // 更新进展条
            function
            update_progress(status_url, nanobar, status_div) {

            // getJSON()办法是JQuery内置办法,这儿向Location中对应的url建议恳求,即恳求「/status/
            $.getJSON(status_url,
            function
            (data) {

            // 核算进展
            percent = parseInt(data[
            'current'
            ] *
            100
            / data[
            'total'
            ]);

            // 更新进展条
            nanobar.go(percent);

            // 更新文字
            $(status_div.childNodes[
            1
            ]).text(percent +
            '%'
            );
            $(status_div.childNodes[
            2
            ]).text(data[
            'status'
            ]);

            if
            (data[
            'state'
            ] !=
            'PENDING'
            && data[
            'state'
            ] !=
            'PROGRESS'
            ) {

            if
            (
            'result'
            in data) {

            // 展现成果
            $(status_div.childNodes[
            3
            ]).text(
            'Result: '
            + data[
            'result'
            ]);
            }

            else
            {

            // 意料之外的工作发作
            $(status_div.childNodes[
            3
            ]).text(
            'Result: '
            + data[
            'state'
            ]);
            }
            }

            else
            {

            // 2秒后再次运转
            setTimeout(
            function
            () {
            update_progress(status_url, nanobar, status_div);
            },
            2000
            );
            }
            });
            }

            能够经过注释阅览代码全体逻辑。

            至此,需求完成完了,运转一下。

            首要运转 Redis

            redis-server

            然后运转 celery

            celery worker -A app.celery --loglevel=info

            最终运转 Flask 项目

            python app.py

            作用如下:



            最终,我自己是一名从事了多年开发的Python老程序员,辞去职务现在在做自己的Python私家定制课程,今年年初我花了一个月整理了一份最适合2019年学习的Python学习干货,能够送给每一位喜爱Python的小伙伴,想要获取的能够重视我的头条号并在后台私信我:01,即可免费获取。

            请关注微信公众号
            微信二维码
            不容错过
            Powered By Z-BlogPHP