在DataFlow DOFN子类之间共享Boogtable连接对象



我正在在数据流中设置Java管道,以读取.csv文件并根据文件的内容创建一堆Begtable行。我在笨拙的文档中看到了注释,与Boogtable连接是一个"昂贵"的操作,只能进行一次并在需要它的功能之间共享连接是一个好主意。

但是,如果我将连接对象声明为主类中的 public static变量,并且首先连接到主函数中的bigtable,那么当我随后尝试在DoFn子类的processElement()中引用连接时,我会获得NullPointerException作为我的数据流管线的一部分。

相反,如果我将连接声明为实际DoFn类中的静态变量,则该操作成功工作。

做到这一点的最佳实践或最佳方法是什么?

我担心,如果我在大规模上实现第二个选项,我将浪费大量时间和资源。如果我将变量保持在DoFn类中的静态,这是否足以确保API不尝试每次重新建立连接?

我意识到有一个特殊的bigtable i/o调用与Bigtable同步数据流管线对象,但我认为我需要自己编写一个对象,以在DoFn processElement()函数中构建一些特殊的逻辑... p>这是"工作"代码的样子:

class DigitizeBT extends DoFn<String, String>{
    private static Connection m_locConn;
    @Override
    public void processElement(ProcessContext c)
    {       
        try
        {
            m_locConn = BigtableConfiguration.connect("projectID", "instanceID");
            Table tbl = m_locConn.getTable(TableName.valueOf("TableName"));
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(
                Bytes.toBytes("CF1"),
                Bytes.toBytes("SomeName"),
                Bytes.toBytes("SomeValue"));
            tbl.put(put);
        }
        catch (IOException e)
        {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

这是更新的代码,fyi:

    public void SmallKVJob()
    {
        CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder()
                .withProjectId(DEF.ID_PROJ)
                .withInstanceId(DEF.ID_INST)
                .withTableId(DEF.ID_TBL_UNITS)
                .build();
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject(DEF.ID_PROJ);
        options.setStagingLocation(DEF.ID_STG_LOC);
//      options.setNumWorkers(3);
//      options.setMaxNumWorkers(5);        
//      options.setRunner(BlockingDataflowPipelineRunner.class);
        options.setRunner(DirectPipelineRunner.class);
        Pipeline p = Pipeline.create(options);
        p.apply(TextIO.Read.from(DEF.ID_BAL))
        .apply(ParDo.of(new DoFn1()))
        .apply(ParDo.of(new DoFn2()))
        .apply(ParDo.of(new DoFn3(config)));
        m_log.info("starting to run the job");
        p.run();
        m_log.info("finished running the job");
    }
}
class DoFn1 extends DoFn<String, KV<String, Integer>>
{
    @Override
    public void processElement(ProcessContext c)
    {
        c.output(KV.of(c.element().split("\,")[0],Integer.valueOf(c.element().split("\,")[1])));
    }
}
class DoFn2 extends DoFn<KV<String, Integer>, KV<String, Integer>>
{
    @Override
    public void processElement(ProcessContext c)
    {
        int max = c.element().getValue();
        String name = c.element().getKey();
        for(int i = 0; i<max;i++)
            c.output(KV.of(name,  1));
    }
}
class DoFn3 extends AbstractCloudBigtableTableDoFn<KV<String, Integer>, String>
{   
    public DoFn3(CloudBigtableConfiguration config)
    {
        super(config);
    }
    @Override
    public void processElement(ProcessContext c) 
    {
        try
        {
            Integer max = c.element().getValue();
            for(int i = 0; i<max; i++)
            {
                String owner = c.element().getKey();
                String rnd = UUID.randomUUID().toString();  
                Put p = new Put(Bytes.toBytes(owner+"*"+rnd));
                p.addColumn(Bytes.toBytes(DEF.ID_CF1), Bytes.toBytes("Owner"), Bytes.toBytes(owner));
                getConnection().getTable(TableName.valueOf(DEF.ID_TBL_UNITS)).put(p);
                c.output("Success");
            }
        } catch (IOException e)
        {
            c.output(e.toString());
            e.printStackTrace();
        }
    }
}

输入.csv文件看起来像这样:
玛丽,3000
约翰,5000
彼得,2000
因此,对于.csv文件中的每一行,我必须将x行数放入bigtable中,其中x是.csv文件中的第二个单元格...

我们为此目的构建了 AbstractCloudBigtableTableDoFn(源&amp; docs)。扩展该类而不是DOFN,并致电getConnection()而不是自己创建连接。

10,000小排应进行一秒钟的实际工作。

编辑:根据注释,应使用bufferedmutator代替table.put()最佳吞吐量。

最新更新