线程池中的保留顺序:如何在特定的索引位置将行插入csv



我正在编写一个ruby脚本,该脚本在csv文件上循环,然后每行从第三方api检索数据,然后将检索到的数据写入csv文件
我正在尝试实现一个thread_pool,以便并行处理api调用和行的插入。我不确定我所做的是否正确,所以欢迎任何建议
我遇到的一个具体问题是如何保留原始文件的顺序
我的解决方案是将第一个文件的索引传递给线程,然后强制线程将该索引位置的行插入csv。

这是我希望进行多线程处理的任务类。

class Task
def initialize(row, index, conn)
@row = row
@index = index
@file = CSV.open("temp_and_cases_parallel.csv", "ab")
@conn = conn
end
def run
get_climate_data
writte_climate_data
end
private
def get_climate_data
uri = "https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/weatherdata/history?&aggregateHours=24&startDateTime=#{@row["day"].strip}T00:00:00&endDateTime=#{@row["day"].strip}T23:59:00&unitGroup=metric&contentType=csv&location=#{@row["lat"].strip},#{@row["long"].strip}&key=#{API_KEY}"
response = @conn.get uri
puts("calling #{uri}")
@climate_info = CSV.parse(response.body, headers: true).first
end
def writte_climate_data
if @index == 1
headers = @row.headers + @climate_info.headers
@file << headers
end
@file << @row.fields + @climate_info.fields
end
end

因此,在writte_climate_data中,我希望能够基于@index在特定位置插入@file

以下是线程池实现:

class ThreadPool
def initialize(size: 10)
@size = size
@tasks = Queue.new
@pool = []
end
def schedule(*args, &block)
@tasks << [block, args]
end
def start
Thread.new do
loop do
next if @pool.size >= @size
task, args = @tasks.pop
thread = Thread.new do
task.call(*args)
end_thread(thread)
end
@pool << thread
end
end
end
def inactive?
@tasks.empty? && @pool.empty?
end
def end_thread(thread)
@pool.delete(thread)
thread.kill
end
end

从cases_by_region.csv中读取并为每一行创建一个带有Task:的线程的脚本

RETRY_OPTIONS = {
max: 10,
interval: 3,
interval_randomness: 0.5,
backoff_factor: 2
}
conn = Faraday.new do |f|
f.request :retry, RETRY_OPTIONS
end
threads = []
thread_pool = ThreadPool.new
thread_pool.start
# CSV.open("temp_and_cases_parallel.csv", "ab") do |temp_and_cases|
CSV.foreach("cases_by_region.csv", headers: true).first(10).each_with_index do |row, index|
thread_pool.schedule do
Task.new(row, index, conn).run
end
end
# end
sleep(1) until thread_pool.inactive?

你将如何实现这一点,我如何在生成的csv中保留原始文件的原始行位置?

如果需要订单,就不应该使用Array。您可以尝试哈希数组。

irb(main):001:0> a = {id: 1, name: "a"}
=> {:id=>1, :name=>"a"}
irb(main):002:0> b = {id: 2, name: "b"}
=> {:id=>2, :name=>"b"}
irb(main):003:0> c = {id: 3, name: "c"}
=> {:id=>3, :name=>"c"}
irb(main):004:0> array = [c, a, b]
=> [{:id=>3, :name=>"c"}, {:id=>1, :name=>"a"}, {:id=>2, :name=>"b"}]
irb(main):006:0> array.sort_by {|h| h[:id] }
=> [{:id=>1, :name=>"a"}, {:id=>2, :name=>"b"}, {:id=>3, :name=>"c"}]

我创建了一个数组,散列顺序为"c,a,b",然后你只需按你想要的键排序。在这种情况下,我使用了一个"id"键使其直接前进。您可以根据行的ID进行排序,或者根据您的意愿按照任何其他标识符(CSV中已经存在一些值,可能是时间戳(进行排序。

最新更新