Printing the output of 3 Java programs on separate consecutive lines in a shared console, using \r in a multiprocessing scenario



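I have 3 Java programs, each with its own main function, which a parent program calls on 3 separate threads. All three programs use the following code to print a counter value to a single console.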

For program #1

    System.out.print(ANSI_PURPLE + "  \r  EEG Sensor count =" + Sensor1_tuple_count);
    System.out.flush();

For program #2

    System.out.print(ANSI_PURPLE + " \r  BP Sensor count = " + Sensor2_tuple_count + " ");
    System.out.flush();

For program #3

    System.out.print(ANSI_PURPLE + "\r  ECG Sensor count =" + Sensor3_tuple_count);
    System.out.flush();

All of these are updated inside a lambda function. Right now the values overwrite one another. How can I get output like this?

EEG Sensor count = X
BP Sensor count  = Y
ECG Sensor count = Z

Edit 1

The code for one of the programs, which uses Flink, is given below.

  public class bp_sensor {
    public static final String ANSI_RED     = "\u001B[31m";
    public static final String ANSI_BLUE    = "\u001B[34m";
    public static final String ANSI_PURPLE  = "\u001B[35m";

    public static void main(String[] args)  throws Exception {
        Constants constants = null;
        // Set up the StreamExecutionEnvironment
        StreamExecutionEnvironment envrionment = StreamExecutionEnvironment.getExecutionEnvironment();
        envrionment.setParallelism(1);

        DataStream<Sensor_Event> bp_stream  = envrionment
                .addSource(new EventGenerator_bp_sensor(constants.bp_data_rate,constants.bp_run_time_sec,1,1))
                .name("BP stream")
                .setParallelism(1);

        if(constants.send_to_timekeeper){
            //Sending the stream to timekeeper
            bp_stream.map(new RichMapFunction<Sensor_Event, String>() {
                @Override
                public String map(Sensor_Event event) throws Exception {
                    String tuple = event.toString();
                    System.out.println(tuple);
                    return tuple + "\n";
                }
            }).writeToSocket(constants.timekeeper_ip, 8003, new SimpleStringSchema() );
        }


        // Sending the stream to mobile phone
        if(constants.send_to_android){
            DataStreamSink<String> total_tuples = bp_stream.map(new RichMapFunction<Sensor_Event, String>() {
                IntCounter Sensor2_tuple_count;
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    this.Sensor2_tuple_count = getRuntimeContext().getIntCounter("total_tuples");
                }
                @Override
                public String map(Sensor_Event event) throws Exception {
                    String tuple = event.toString();
                    Sensor2_tuple_count.add(1);
                    System.lineSeparator();
                    System.out.print(ANSI_PURPLE + " \r  BP Sensor count = " + Sensor2_tuple_count + " ");
                    System.out.flush();
//                    System.out.println(ANSI_BLUE + tuple);
                    return tuple + "\n";
                }
            }).writeToSocket(constants.mobile_ip, 7003, new SimpleStringSchema() );

        }

        //start the execution
        JobExecutionResult executionResult = envrionment.execute();
        Integer number_of_tuples = (Integer) executionResult.getAllAccumulatorResults().get("total_tuples");
        int input_rate = number_of_tuples/constants.bp_run_time_sec;
        System.out.println("\n");
        System.out.println(ANSI_BLUE   + "  Expected Input rate of BP Sensor     = " + constants.bp_data_rate + " tuples/second");
        System.out.println(ANSI_RED    + "  Actual Input rate of BP Sensor       = " + input_rate + " tuples/second");
        System.out.println(ANSI_PURPLE + "  Total # of tuples sent by BP Sensor  = " + number_of_tuples );

    }// main

} //class

The code of the parent program is

public class start_sensors {
    public static void main(String[] args) throws Exception {
        ecg_sensor ecg_sensor = null;
        bp_sensor bp_sensor = null;
        eeg_sensor eeg_sensor = null;

        Thread thread1 = new Thread() {
            @Override
            public void run() {
                try {
                    ecg_sensor.main(null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        Thread thread2 = new Thread() {
            @Override
            public void run() {
                try {
                    bp_sensor.main(null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        Thread thread3 = new Thread() {
            @Override
            public void run() {
                try {
                    eeg_sensor.main(null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        thread1.start();
        thread2.start();
        thread3.start();

    } //main
} //class

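As for the threads: to control the order in which they run, you have to use some form of synchronization. As for how the console behaves, that is outside the scope of Java and depends on the environment.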
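
To make both points concrete, here is a minimal sketch, not a definitive implementation. It assumes all three sensors run as threads in one JVM (as in start_sensors), so they share one System.out, and that the console honors ANSI escape sequences, which many terminals do but some IDE consoles do not. StatusBoard is a hypothetical helper, not part of Flink or the original programs. It keeps one row per sensor, and on every update it moves the cursor back up and redraws all rows while holding a lock, so no thread can print in the middle of another thread's redraw.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (an assumption, not from the original code):
// one shared board that owns the status lines of every sensor.
final class StatusBoard {
    private static final String ESC = "\u001B[";        // ANSI control sequence introducer
    private final Map<String, Long> counts = new LinkedHashMap<>();
    private int drawnRows = 0;                           // rows printed by the previous redraw

    StatusBoard(String... labels) {
        for (String label : labels) {
            counts.put(label, 0L);                       // insertion order = display order
        }
    }

    // Builds the text that redraws every row in place.
    synchronized String render() {
        StringBuilder sb = new StringBuilder();
        if (drawnRows > 0) {
            sb.append(ESC).append(drawnRows).append('A'); // cursor up over the old rows
        }
        for (Map.Entry<String, Long> row : counts.entrySet()) {
            sb.append('\r').append(ESC).append("2K")      // go to column 0, erase the line
              .append(row.getKey()).append(" count = ").append(row.getValue())
              .append('\n');
        }
        drawnRows = counts.size();
        return sb.toString();
    }

    // Called by each sensor thread; the lock serializes whole redraws.
    synchronized void update(String label, long value) {
        counts.put(label, value);
        System.out.print(render());
        System.out.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        String[] labels = {"EEG Sensor", "BP Sensor", "ECG Sensor"};
        StatusBoard board = new StatusBoard(labels);
        Thread[] workers = new Thread[labels.length];
        for (int i = 0; i < labels.length; i++) {
            final String label = labels[i];
            workers[i] = new Thread(() -> {
                for (long n = 1; n <= 100; n++) {
                    board.update(label, n);              // stands in for the map() callback
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
    }
}
```

In the sensor programs, the System.out.print call inside map() would be replaced by a call such as board.update("BP Sensor", count) on a single board instance shared by the three threads. Anything else written to the same console between redraws (for example the System.out.println(tuple) call or framework log output) shifts the cursor and breaks the layout, so this only works if the board is the sole writer while it is active.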
