MySQL server has gone away while running Laravel queue jobs



I am trying to import data from CSV files into a database using Laravel queues. The CSV files are large, roughly 500k rows each.

I read somewhere that with Laravel queues I would not need to worry about connection timeouts, but that does not seem to be true. Maybe I am wrong.

Please review my working code below: is there any problem with this approach? I am using league/csv to read the CSV files.

public function __construct($data, $error_arr, $error_row_numbers)
{
    $this->data = $data;
    $this->error_arr = $error_arr;
    $this->error_row_numbers = $error_row_numbers;
}

/**
 * Execute the job.
 *
 * @return void
 */
public function handle()
{
    $offset = $this->data['offset'];
    $limit = $this->data['limit'];
    $filename = $this->data['file_name'];

    $service = new Service();
    $table = 'committees';
    $dbase = new Committee();

    // map_data describes which CSV column should be
    // inserted into which database column
    $map_data = $this->data['map_data'];

    // get all column names of the table
    $db_header_obj = new Committee();
    $db_header = $db_header_obj->getTableColumns();

    $csv_file_path = storage_path('app/files/committee/').$filename;

    if (!ini_get("auto_detect_line_endings")) {
        ini_set("auto_detect_line_endings", TRUE);
    }

    $csv = Reader::createFromPath($csv_file_path, 'r');
    $csv->setOutputBOM(Reader::BOM_UTF8);
    $csv->addStreamFilter('convert.iconv.ISO-8859-15/UTF-8');
    $csv->setHeaderOffset(0);
    $csv_header = $csv->getHeader();

    $rec_arr = array();
    $records = array();
    $records_arr = array();

    $stmt = (new Statement())
        ->offset($offset)
        ->limit($limit);

    $records = $stmt->process($csv);
    foreach ($records as $record) {
        $rec_arr[] = array_values($record);
    }

    // drop indexes whose values are empty
    $records_arr = $service->trimArray($rec_arr);

    if (count($records_arr) > 0) {
        foreach ($records_arr as $ck => $cv) {
            $committee_arr = array();
            foreach ($map_data as $mk => $mv) {
                if (isset($mv)) {
                    $data_type = $service->getDatabaseColumnType($table, $mv);
                    // if the column is a date/datetime/timestamp type,
                    // convert the CSV value to MySQL datetime format
                    if ($data_type == 'date' || $data_type == 'datetime' || $data_type == 'timestamp') {
                        $datetime = (array) $cv[$mk];
                        $dt = array_shift($datetime);
                        $dt = date('Y-m-d h:i:s', strtotime($dt));
                        $committee_arr[$mv] = $dt;
                    } else {
                        $committee_arr[$mv] = $cv[$mk];
                    }
                }
            }

            $error_encountered = false;

            DB::beginTransaction();
            if (!empty($committee_arr['com_id'])) {
                try {
                    $committee_row = Committee::updateOrCreate(
                        ['com_id' => $committee_arr['com_id']],
                        $committee_arr
                    );
                    if ($committee_row->wasRecentlyCreated === true) {
                        $committee_row->created_by = $this->data['user_id'];
                    } else {
                        $committee_row->updated_by = $this->data['user_id'];
                    }
                    $committee_row->save();
                } catch (Exception $e) {
                    $error_encountered = true;
                    $this->error_arr[] = $e->getMessage();
                    $this->error_row_numbers[] = $this->data['row_value'];
                }
            }
            DB::commit();

            // keep track of which row is currently being processed,
            // so the user can be told which CSV row produced an error
            $this->data['row_value'] = $this->data['row_value'] + 1;
        }

        // advance the offset so the next job fetches the next chunk of the CSV
        $this->data['offset'] = $offset + $limit;

        // dispatch the same job again with the increased offset
        $committeeInsertJob = (new StoreCommittee($this->data, $this->error_arr, $this->error_row_numbers))
            ->delay(Carbon::now()->addSeconds(3));
        dispatch($committeeInsertJob);
    } else {
        // store an activity record to keep an audit trail
        $activity = new Activity();
        $activity->url = $this->data['url'];
        $activity->action = 'store';
        $activity->description = $table;
        $activity->user_id = $this->data['user_id'];
        $activity->created_at = date('Y-m-d H:i:s');
        $activity->save();

        $arr_data = [
            'filename' => $filename,
            'user_name' => $this->data['user_name'],
            'error' => $this->error_arr,
            'error_row_numbers' => $this->error_row_numbers
        ];

        // notify the user that the import is complete
        Mail::to($this->data['user_email'])->send(new CSVImportJobComplete($arr_data));
    }

    if (!ini_get("auto_detect_line_endings")) {
        ini_set("auto_detect_line_endings", FALSE);
    }
}

Error (from laravel.log in storage):

[2019-04-05 07:13:23] local.ERROR: PDOStatement::execute(): MySQL server has gone away (SQL: insert into `jobs` (`queue`, `attempts`, `reserved_at`, `available_at`, `created_at`, `payload`) values (default, 0, , 1554448406, 1554448403, ....................................................(long list)

From the command terminal:

$ php artisan queue:work --tries=3
[2019-04-05 07:09:11][1] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:09:33][1] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:09:36][2] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:09:58][2] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:10:01][3] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:10:23][3] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:10:26][4] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:10:48][4] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:10:51][5] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:11:13][5] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:11:17][6] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:11:40][6] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:11:43][7] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:12:05][7] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:12:08][8] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:12:31][8] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:12:34][9] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:12:57][9] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:13:00][10] Processing: App\Jobs\StoreCommittee
dell@DESKTOP-UQ2 MINGW64 /d/wamp64/www/project(master)
$
(it just stops, with no error or failure notification)

Is there anything in my job logic that could be improved? How should I handle these dropped connections, maximum timeouts, and so on? I do not think simply increasing the timeout is the solution, since there is no guarantee the job will finish within any fixed amount of time.

Instead, is there a way to close the connection and reconnect between each queued job?
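
Something like the following is what I have in mind, using DB::disconnect()/DB::reconnect() from the DB facade (I am not sure whether this is the right approach):

use Illuminate\Support\Facades\DB;

public function handle()
{
    // Drop whatever connection the worker may still be holding and
    // open a fresh one before this job touches the database.
    DB::disconnect();
    DB::reconnect();

    // ... the import logic shown above ...
}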

Attempted solution

You parse the CSV file and try to send the entire contents in a single query. MySQL has a variable that stops it from accepting queries that are too large: max_allowed_packet.

You do it this way for performance, but with queries that carry too much data you can run into any of a number of network- or MySQL-related limits.
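
To see what your server currently allows, the variable can be read straight from Laravel (a quick sketch; the 4 MB figure is just a common default on older MySQL versions):

use Illuminate\Support\Facades\DB;

// Ask MySQL for the current packet limit (the value is in bytes).
$row = DB::selectOne("SHOW VARIABLES LIKE 'max_allowed_packet'");
echo $row->Value; // e.g. 4194304 bytes (4 MB)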

Criteria for an improved solution

  • Clean queries, so you can see what is being executed
  • Fast queries, so large amounts of data can be sent and writes are quicker
  • Staying below limiting variables such as max_allowed_packet

Solution

  1. Prepare the statement once. A prepared statement is prepared a single time and then executed many times (see the PDO sketch after this list).

  2. Parse the CSV and loop over the records.

  3. Bind the values to the prepared statement and execute it inside the loop.

  4. To make everything faster, use transactions: wrap every 1000 records in a transaction. This lets you write simple insert queries while keeping them fast, because MySQL will multiplex the writes.
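
In plain PDO, steps 1-4 could look roughly like this (a sketch only: $pdo is assumed to be an open connection, and the com_id/name columns are placeholders for your mapped columns):

// Prepare once, execute many times.
$stmt = $pdo->prepare(
    'INSERT INTO committees (com_id, name) VALUES (:com_id, :name)'
);

$pdo->beginTransaction();
foreach ($records as $i => $record) {
    // Bind this row's values and re-execute the same prepared statement.
    $stmt->execute([
        ':com_id' => $record['com_id'],
        ':name'   => $record['name'],
    ]);

    // Commit every 1000 rows and open a new transaction for the next batch.
    if (($i + 1) % 1000 === 0) {
        $pdo->commit();
        $pdo->beginTransaction();
    }
}
$pdo->commit(); // flush the final, possibly partial batch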

Since you are using Laravel, the steps above are straightforward.

Pseudocode using Laravel:

$csv = collect([]); // This is the collection holding your CSV records

// Split the collection into chunks of 1000 records each, so every
// transaction wraps at most 1000 inserts.
// (Note: chunk() takes the chunk *size*, not the number of chunks.)
$csv->chunk(1000)->each(function ($chunk) {
    DB::beginTransaction();

    // Create one record per row in this chunk
    $chunk->each(function ($data) {
        Committee::create($data);
    });

    DB::commit();
});
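
Note that create() still issues one INSERT per row and fires Eloquent model events. If you do not need those events, a multi-row insert per chunk cuts the round trips even further (assuming each row in $chunk is an associative array whose keys match the committees table columns):

// One multi-row INSERT per 1000-record chunk instead of 1000 single inserts.
DB::table('committees')->insert($chunk->all());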
