pq.NewListener/Listen 阻塞,并报错 pq: syntax error at or near "listen"



我有 minio/s3 对象存储,并配置了 lambda 通知,将事件写入 cockroachdb(postgres 数据库)。我正在尝试使用以下 golang 代码监控这些事件。

package main
import (
"database/sql"
"encoding/json"
"fmt"
"github.com/lib/pq"
"time"
)
// Connection settings used by monitorEvents.
const (
// An earlier key/value form of the connection string, left disabled.
//crdbConnectStr = "dbname=alerts user=crdbuser1 host=localhost port=26257 sslmode=disable connect_timeout=5"
// crdbConnectStr is the connection URL handed to both sql.Open and
// pq.NewListener.
crdbConnectStr = "postgres://crdbuser1@localhost:26257/alerts?sslmode=disable"
// dbDriver is the database/sql driver name passed to sql.Open.
dbDriver       = "postgres"
)
// monitorEvents creates a database handle for CockroachDB, subscribes a
// pq.Listener to the "monitor" channel and then handles notifications forever
// by calling waitForAlertEvents in a loop.
//
// It returns early only when the handle cannot be created or the LISTEN
// request fails; both failures are reported on stdout.
func monitorEvents() {
	// sql.Open may only validate its arguments without connecting, so a nil
	// error here does not prove the server is reachable.
	db, err := sql.Open(dbDriver, crdbConnectStr)
	if err != nil {
		fmt.Printf("connection open to crdb failed - %v\n", err)
		return
	}
	defer db.Close()
	fmt.Printf("sql open on crdb OK\n")

	// reportProblem is invoked by the listener on connection state changes;
	// only events that carry an error are printed.
	reportProblem := func(ev pq.ListenerEventType, err error) {
		if err != nil {
			fmt.Printf("NewListener - event : %v, err - %v\n", ev, err)
		}
	}

	minReconnect := 2 * time.Second
	maxReconnect := 20 * time.Second
	listener := pq.NewListener(crdbConnectStr, minReconnect, maxReconnect, reportProblem)
	// Close the listener on every exit path so it does not keep running
	// after this function has given up.
	defer listener.Close()

	if err := listener.Listen("monitor"); err != nil {
		fmt.Printf("Listen error - %v\n", err)
		return
	}

	fmt.Printf("begin monitoring events in CRDB\n")
	for {
		waitForAlertEvents(listener)
	}
}
// Record is the decoding target for the JSON payload of a notification.
// It mirrors the nested path data.value.Records[].s3 and keeps only the
// bucket name and the object key of each entry.
type Record struct {
Data struct {
Value struct {
// Records holds one element per entry of the "Records" array.
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"`
} `json:"bucket"`
Object struct {
Key string `json:"key"`
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
} `json:"value"`
} `json:"data"`
}
// waitForAlertEvents handles at most one event from l and then returns, so
// the caller is expected to invoke it in a loop.
//
// It waits for a notification on l.Notify. If one arrives, its payload is
// decoded into a Record and the bucket name and object key of the first entry
// are printed. If nothing arrives within the idle timeout, the connection is
// checked with an asynchronous Ping. Malformed or empty payloads are reported
// on stdout and skipped.
func waitForAlertEvents(l *pq.Listener) {
	// idleTimeout is how long to wait for a notification before pinging.
	const idleTimeout = 60 * time.Second

	select {
	case n := <-l.Notify:
		// pq delivers a nil notification after the connection has been
		// re-established; there is no payload to process in that case.
		if n == nil {
			fmt.Println("Received nil notification, listener connection was re-established")
			return
		}
		fmt.Printf("Received data from channel [%v]\n", n.Channel)
		// Print the raw payload before decoding it.
		fmt.Println(n.Extra)

		var record Record
		if err := json.Unmarshal([]byte(n.Extra), &record); err != nil {
			fmt.Println("Error processing JSON: ", err)
			return
		}

		// Valid JSON may still carry no records; indexing [0] would panic.
		records := record.Data.Value.Records
		if len(records) == 0 {
			fmt.Println("Error processing JSON: payload contains no records")
			return
		}

		bucket := records[0].S3.Bucket.Name
		object := records[0].S3.Object.Key
		fmt.Printf("received event on bucket: %v, object: %v\n", bucket, object)
	case <-time.After(idleTimeout):
		fmt.Printf("Received no events for %.0f seconds, checking connection\n", idleTimeout.Seconds())
		// Ping in the background so the caller can resume waiting at once.
		go func() {
			if err := l.Ping(); err != nil {
				fmt.Printf("listener ping failed - %v\n", err)
			}
		}()
	}
}
func main() {
monitorAlerts()
}

当我运行这个程序时,我看到以下错误并且它卡住了。

[root]# ./alerts 
sql open on crdb OK
NewListener - event : 3, err - pq: syntax error at or near "listen"
NewListener - event : 3, err - pq: syntax error at or near "listen"
NewListener - event : 3, err - pq: syntax error at or near "listen"

手动连接到 CockroachDB 工作正常。

[root]# cockroach sql --insecure --user=crdbuser1
crdbuser1@:26257/defaultdb> show databases;                                                                                                               database_name  
+---------------+
alerts         
(1 row)
Time: 1.22359ms
crdbuser1@:26257/defaultdb> set database=alerts;
SET
Time: 363.994µs
crdbuser1@:26257/alerts> show tables;
table_name  
+------------+
alertstab   
(1 row)
Time: 1.399014ms
crdbuser1@:26257/alerts> 

有人知道为什么会出现错误 pq: syntax error at or near "listen" 吗?我也查看了 pq 的源码,该错误很可能与 notify.go#L756 有关。

该错误表明 CockroachDB 不支持 LISTEN/NOTIFY 语句。

您将需要找到一种不同的方法来执行此操作。CRDB中最接近的是变更数据捕获,但这更多的是关于数据流而不是自定义通知。

您可以在此问题中找到有关 CRDB 中 LISTEN/NOTIFY 的一些讨论,但迄今为止还没有确定的计划。

最新更新