Django + Celery + RabbitMQ encoding error and SIGKILL



I am working on a small project that uses Celery to convert csv and xlsx files into PostgreSQL tables. The code below works fine without Celery (except for large files), but after adding Celery it produces several errors and bugs. I have searched Stack Overflow for similar questions but still cannot figure out what to do or why this happens. I hope you can help me, thanks.

    The first error is shown in the attached screenshots (csv-1, csv-2). I think it is related to the encoding part of my code, but I have tried opening the file with utf-8-sig and big5 and neither works. (It works fine without Celery.)

# -*- coding: utf-8 -*-
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.http import HttpResponseRedirect
from django.core.urlresolvers import reverse
from django.contrib import messages
from django.conf import settings
from django.db import connection
from django.views.decorators.csrf import csrf_exempt
from celery import Celery
from celery import task
import json
import csv
import sys
import random
import psycopg2
import xlrd
import openpyxl as pyxl
from .models import Document
from .forms import DocumentForm
app = Celery(
    'tasks',
    broker='amqp://guest:guest@localhost:5672//',
    backend='rpc://'
)
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_RESULT_PERSISTENT = False
@app.task()
def csvwritein(doc): # Transform csv to table
        doc = doc
        conn = psycopg2.connect("dbname='apidb' user='api' host='localhost' password='eric40502' port='5432'")
        readcur = conn.cursor()
        readcur.execute("select exists(select * from information_schema.tables where table_name='%s')" % doc.tablename) # check if the same file is already in the database
        check = readcur.fetchone()[0]
        try:
                fr = open(doc.path,encoding = 'utf-8-sig')
                dr.delay(fr,doc,check)
                fr.close()
        except Exception as e:
                fr = open(doc.path,encoding = 'big5')
                dr.delay(fr,doc,check)
                fr.close()
        conn.commit()
        readcur.close()
@app.task()
def dr(fr,doc,check): # make the datareader a function to keep the code 'dry'
        csvt = 0 # count csv reader loop passes
        row_id = 1 # used for the following id field
        conn = psycopg2.connect("dbname='apidb' user='api' host='localhost' password='eric40502' port='5432'")
        maincur = conn.cursor()
        writecur = conn.cursor()
        datareader = csv.reader(fr, delimiter=',')
        for row in datareader:
                if csvt == 0: # first pass through the loop: create the fields and check that no identical table exists
                        if check == True:
                                app = ''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789') for i in range(6)])
                                tname = '%s-%s' % (doc.tablename,app)
                                tablename = '"%s-%s"' % (doc.tablename,app)
                                doc.tablename = tname
                                doc.save()
                        else:
                                tablename = '"%s"' % doc.tablename
                        maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY KEY);" % tablename)
                        row_count = sum(1 for line in datareader)
                        col_count = len(row)
                        frow = row
                        for i in range(0,col_count,1):
                                row[i] = '"%s"' % row[i] # double-quote the header so it can be used as a column identifier
                                maincur.execute("ALTER TABLE %s ADD %s CITEXT;" % (tablename,row[i]))
                        csvt = csvt+1
                        fr.seek(0)
                        next(datareader)
                elif csvt > 0: # later passes: insert the data
                        for j in range(0,col_count,1):
                                if j == 0:
                                        writecur.execute("INSERT INTO %s (%s) VALUES ('%s');" % (tablename,frow[j],row[j]))
                                else:
                                        writecur.execute("UPDATE %s SET %s = '%s' WHERE id = '%d';" % (tablename,frow[j],row[j],row_id))
                        csvt = csvt+1
                        row_id = row_id+1
                else:
                        break
        conn.commit()
        maincur.close()
        writecur.close()
        conn.close()
        csvt = 0
        doc = Document.objects.all()
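
A note on the first error: before a call like `dr.delay(fr, doc, check)` can be published to RabbitMQ, Celery has to serialize every argument, and an open file object cannot be serialized by either the JSON or the pickle serializer. This is consistent with the answer further down suggesting to pass a filename instead. A minimal sketch of that failure, independent of Django and Celery; the temporary file is only a stand-in for `doc.path`:

    import json
    import pickle
    import tempfile

    # Throwaway CSV file standing in for doc.path (illustrative only).
    with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as tmp:
        tmp.write('a,b\n1,2\n')

    fr = open(tmp.name, encoding='utf-8-sig')
    for dumps in (json.dumps, pickle.dumps):
        try:
            dumps(fr)  # roughly what Celery must do with each task argument
        except TypeError as exc:
            print('cannot serialize the open file object:', exc)
    fr.close()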

  • The second error concerns converting an xlsx file (about 130,000 rows) into a PostgreSQL table: the worker gets SIGKILLed after 2-3 minutes (see the note after the code below). Debug message:

    [2016-10-27 06:17:05,227: ERROR/MainProcess] Process 'Worker-1' pid:13829 exited with 'signal 9 (SIGKILL)'
    [2016-10-27 06:17:05,328: ERROR/MainProcess] Task data.xlsxwritein[5aec4679-c48b-4d07-a0a9-5e4e37fcd24b] raised unexpected: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
    Traceback (most recent call last):
      File "/usr/local/lib/python3.4/dist-packages/billiard/pool.py", line 1175, in mark_as_worker_lost
        human_status(exitcode)),
    billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).

    # The code continues from the task.py file above
    @app.task()
    def xlsxwritein(doc): # write an xlsx file into the database
            xlsxt = 0
            conn = psycopg2.connect("dbname='apidb' user='api' host='localhost' password='eric40502' port='5432'")
            maincur = conn.cursor()
            readcur = conn.cursor()
            writecur = conn.cursor()
            readcur.execute("select exists(select * from information_schema.tables where table_name='%s')" % doc.tablename) # check if the same file is already in the database
            check = readcur.fetchone()[0]
            row_id = 1 # used for the following id field
            wb = pyxl.load_workbook(doc.path)
            sheetnames = wb.get_sheet_names()
            ws = wb.get_sheet_by_name(sheetnames[0])
            for rown in range(ws.get_highest_row()):
                    if xlsxt == 0:
                            if check == True:
                                    app = ''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789') for i in range(6)])
                                    tname = '%s-%s' % (doc.tablename,app)
                                    tablename = '"%s-%s"' % (doc.tablename,app)
                                    doc.tablename = tname
                                    doc.save()
                            else:
                                    tablename = '"%s"' % doc.tablename
                            field = [ws.cell(row=1,column=col_index).value for col_index in range(1,ws.get_highest_column()+1)]
                            maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY KEY);" % tablename)
                            for coln in range(ws.get_highest_column()):
                                    field[coln] = '"%s"' % field[coln] # double-quote the header so it can be used as a column identifier
                                    if field[coln] == 'ID':
                                            field[coln] = 'original_id'
                                    maincur.execute("ALTER TABLE %s ADD %s CITEXT;" % (tablename,field[coln]))
                            xlsxt = xlsxt+1
                    elif xlsxt > 0 and check == False: # later passes: insert the data (only when no identical table existed)
                            for coln in range(ws.get_highest_column()):
                                    if coln == 0:
                                            writecur.execute("INSERT INTO %s (%s) VALUES ('%s');" % (tablename,field[coln],str(ws.cell(row=rown,column=coln+1).value)))
                                    else:
                                            writecur.execute("UPDATE %s SET %s = '%s' WHERE id = '%d';" % (tablename,field[coln],str(ws.cell(row=rown+1,column=coln+1).value),row_id))
                            xlsxt = xlsxt+1
                            row_id = row_id+1
                    else:
                            break
            conn.commit()
            maincur.close()
            readcur.close()
            writecur.close()
            conn.close()
            xlsxt = 0
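
On the SIGKILL: when a worker process is killed with signal 9 partway through a task, the signal comes from outside the process, and on Linux that is very often the OOM killer reclaiming memory; `pyxl.load_workbook(doc.path)` without `read_only=True` keeps the whole 130,000-row workbook in memory. A hedged sketch of streaming the sheet with openpyxl's read-only mode instead (the helper name and usage are illustrative, not part of the original code):

    import openpyxl as pyxl

    def iter_xlsx_rows(path):
        """Yield one row of cell values at a time instead of materialising the sheet."""
        # read_only=True makes openpyxl stream rows lazily, keeping memory use
        # roughly constant no matter how many rows the file has.
        wb = pyxl.load_workbook(path, read_only=True)
        ws = wb[wb.sheetnames[0]]
        for row in ws.iter_rows():
            yield [cell.value for cell in row]

    # Hypothetical usage inside xlsxwritein, replacing the range(ws.get_highest_row()) loop:
    # for values in iter_xlsx_rows(doc.path):
    #     ...create the table from the first row, INSERT the remaining rows...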
    

    Probably something is going wrong during argument deserialization.

    Instead of passing the doc object, pass the filename and read the file inside the task, as sketched below.
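
A minimal sketch of that suggestion, reusing the `app`, `csv`, and `Document` names from the task.py above; the primary-key lookup and the hard-coded encoding are illustrative assumptions, not part of the original code:

    @app.task()
    def csvwritein(doc_pk):
        # Pass only primitives (the pk) and re-fetch the Document inside the worker.
        doc = Document.objects.get(pk=doc_pk)
        dr.delay(doc.path, doc_pk, 'utf-8-sig')

    @app.task()
    def dr(path, doc_pk, encoding):
        doc = Document.objects.get(pk=doc_pk)
        # Open the file here, inside the task, instead of sending an open file handle.
        with open(path, encoding=encoding) as fr:
            datareader = csv.reader(fr, delimiter=',')
            for row in datareader:
                pass  # same table-creation / insert logic as in the original dr()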

    Latest update