No results returned from Redis after upgrading from 3.1 to 4.0



I recently upgraded my Celery installation to 4.0. After several days of working through the upgrade process, I finally got it working... sort of. Some tasks return results, but the last one doesn't.

我有一个类,SFF,它接受并解析一个文件:

# Constructor with I/O file
def __init__(self, file):
    # File data that's gonna get used a lot
    sffDescriptor = file.fileno()
    fileName = abspath(file.name)
    # Get the pointer to the file
    filePtr = mmap.mmap(sffDescriptor, 0, flags=mmap.MAP_SHARED, prot=mmap.PROT_READ)
    # Get the header info
    hdr = filePtr.read(HEADER_SIZE)
    self.header = SFFHeader._make(unpack(HEADER_FMT, hdr))
    # Read in the palette maps
    print self.header.onDemandDataSize
    print self.header.onLoadDataSize
    palMapsResult = getPalettes.delay(fileName, self.header.palBankOff - HEADER_SIZE, self.header.onDemandDataSize, self.header.numPals)
    # Read the sprite list nodes
    nodesStart = self.header.sprListOff
    nodesEnd = self.header.palBankOff
    print nodesEnd - nodesStart
    sprNodesResult = getSprNodes.delay(fileName, nodesStart, nodesEnd, self.header.numSprites)
    # Get palette data
    self.palettes = palMapsResult.get()
    # Get sprite data
    spriteNodes = sprNodesResult.get()
    # TESTING
    spritesResultSet = ResultSet([])
    numSpriteNodes = len(spriteNodes)
    # Split the nodes into chunks of size 32 elements
    for x in xrange(0, numSpriteNodes, 32):
        spritesResult = getSprites.delay(spriteNodes, x, x+32, fileName, self.palettes, self.header.palBankOff, self.header.onDemandDataSizeTotal)
        spritesResultSet.add(spritesResult)
        break  # REMEMBER TO REMOVE FOR ENTIRE SFF
    self.sprites = spritesResultSet.join_native()

Whether it's a single task returning the whole spritesResult, or whether I split it up with a ResultSet, the result is always the same: the Python console I'm using just hangs on spritesResultSet.join_native() or spritesResult.get() (depending on how I structure it).

Here is the task in question:

@task
def getSprites(nodes, start, end, fileName, palettes, palBankOff, onDemandDataSizeTotal):
    sprites = []
    with open(fileName, "rb") as file:
        sffDescriptor = file.fileno()
        sffData = mmap.mmap(sffDescriptor, 0, flags=mmap.MAP_SHARED, prot=mmap.PROT_READ)
        for node in nodes[start:end]:
            sprListNode = dict(SprListNode._make(node)._asdict())  # Need to convert it to a dict since values may change.
            #print node
            #print sprListNode
            # If it's a linked sprite, the data length is 0, so get the linked index.
            if sprListNode['dataLen'] == 0:
                sprListNodeTemp = SprListNode._make(nodes[sprListNode['index']])
                sprListNode['dataLen'] = sprListNodeTemp.dataLen
                sprListNode['dataOffset'] = sprListNodeTemp.dataOffset
                sprListNode['compression'] = sprListNodeTemp.compression
            # What does the offset need to be?
            dataOffset = sprListNode['dataOffset']
            if sprListNode['loadMode'] == 0:
                dataOffset += palBankOff #- HEADER_SIZE
            elif sprListNode['loadMode'] == 1:
                dataOffset += onDemandDataSizeTotal #- HEADER_SIZE
            #print sprListNode
            # Seek to the data location and "read" it in. First 4 bytes are just the image length
            start = dataOffset + 4
            end = dataOffset + sprListNode['dataLen']
            #sffData.seek(start)
            compressedSprite = sffData[start:end]
            # Create the sprite
            sprite = Sprite(sprListNode, palettes[sprListNode['palNo']], np.fromstring(compressedSprite, dtype=np.uint8))
            sprites.append(sprite)
    return json.dumps(sprites, cls=SpriteJSONEncoder)
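(SpriteJSONEncoder and the Sprite class aren't shown in the question; below is a minimal sketch of what such a custom encoder typically looks like. The field names here are assumptions for illustration, not the original implementation.)

```python
import json


class Sprite(object):
    """Hypothetical stand-in for the question's Sprite class."""
    def __init__(self, node, palette, data):
        self.node = node
        self.palette = palette
        self.data = data


class SpriteJSONEncoder(json.JSONEncoder):
    """Serialize Sprite objects by flattening their fields into a dict;
    anything else falls through to the default encoder."""
    def default(self, obj):
        if isinstance(obj, Sprite):
            return {
                "node": obj.node,
                "palette": obj.palette,
                "data": list(obj.data),  # raw bytes flattened to a JSON list
            }
        return json.JSONEncoder.default(self, obj)


sprites = [Sprite({"group": 9000}, [0, 1, 2], b"\x01\x02")]
encoded = json.dumps(sprites, cls=SpriteJSONEncoder)
```

The key point is that `json.dumps` calls `default()` for every object it can't serialize natively, so the task can return plain JSON text through the result backend instead of pickled objects.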

I know it reaches the return statement, because if I put a print above it, it prints in the Celery window. I also know the task is running to completion, because I get the following message from the worker:

[2016-11-16 00:03:33,639: INFO/PoolWorker-4] Task framedatabase.getSprites[285ac9b1-09b4-4cf1-a251-da6212863832] succeeded in 0.137236133218: '[{"width": 120, "palNo": 30, "group": 9000, "xAxis": 0, "yAxis": 0, "data":…'

Here are my Celery settings in settings.py:

# Celery settings
BROKER_URL='redis://localhost:1717/1'
CELERY_RESULT_BACKEND='redis://localhost:1717/0'
CELERY_IGNORE_RESULT=False
CELERY_IMPORTS = ("framedatabase.tasks", )
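(A side note on the Celery 4 API, not something stated in the question: when `config_from_object` is called with `namespace='CELERY'`, as in the celery.py below, Celery only reads settings that carry the `CELERY_` prefix, so a bare `BROKER_URL` is silently ignored. The prefixed equivalents would look like this, with the host/port/db numbers taken from the question:)

```python
# Celery settings — with config_from_object(..., namespace='CELERY'),
# every key must carry the CELERY_ prefix to be picked up.
CELERY_BROKER_URL = 'redis://localhost:1717/1'
CELERY_RESULT_BACKEND = 'redis://localhost:1717/0'
CELERY_IMPORTS = ("framedatabase.tasks", )
```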

… and my celery.py:

from __future__ import absolute_import
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'framedatabase.settings')
from django.conf import settings  # noqa
app = Celery('framedatabase', backend='redis://localhost:1717/1', broker="redis://localhost:1717/0",
    include=['framedatabase.tasks'])
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Found the problem. Apparently it causes a deadlock, as mentioned in the section "Avoid launching synchronous subtasks" in the Celery docs: http://docs.celeryproject.org/en/latest/userguide/tasks.html#tips-and-best-practices

So I got rid of the line
sprNodesResult.get()

and changed the final result to a chain:

self.sprites = chain(getSprNodes.s(fileName, nodesStart, nodesEnd, self.header.numSprites),
    getSprites.s(0,32,fileName,self.palettes,self.header.palBankOff,self.header.onDemandDataSizeTotal))().get()

And it works! Now I just need to find a way to split it up the way I want!
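One way to split the work: compute the chunk boundaries up front, then build one task signature per chunk. A minimal sketch of the boundary computation (`chunk_ranges` is a hypothetical helper, not part of the original code):

```python
def chunk_ranges(total, size):
    """Return (start, end) index pairs covering `total` items in chunks of `size`."""
    return [(x, min(x + size, total)) for x in range(0, total, size)]


# e.g. 70 sprite nodes in chunks of 32:
ranges = chunk_ranges(70, 32)  # [(0, 32), (32, 64), (64, 70)]
```

Each (start, end) pair could then become a `getSprites.s(start, end, ...)` signature collected in a Celery `group`, so the chunks run in parallel instead of blocking on one another.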
