net.sf.json.JSONException: A JSONObject text must begin with '{' at character 0 of (reading data from a Kafka consumer and saving it to a database)

2022-05-05

This is one complete record written by the producer, as printed by the Kafka consumer:

    topic = testKafka, offset = 3266, value = {"data":{"address":"33","conTel":"","desc":"","email":"","eptCarports":496,"freeTime":0,"inEntranceNum":0,"isCharge":1,"latitude":"23.04715","longitude":"113.102868","outEntraceNum":0,"parkCode":"2KPI5T73","parkName":"对外调试停车场","totalCarports":999},"message":"业务成功","resultCode":0,"status":1}

What I need is the value:

    {"data":{"address":"33","conTel":"","desc":"","email":"","eptCarports":496,"freeTime":0,"inEntranceNum":0,"isCharge":1,"latitude":"23.04715","longitude":"113.102868","outEntraceNum":0,"parkCode":"2KPI5T73","parkName":"对外调试停车场","totalCarports":999},"message":"业务成功","resultCode":0,"status":1}

and, within it, only the object stored under "data":

    {"address":"33","conTel":"","desc":"","email":"","eptCarports":496,"freeTime":0,"inEntranceNum":0,"isCharge":1,"latitude":"23.04715","longitude":"113.102868","outEntraceNum":0,"parkCode":"2KPI5T73","parkName":"对外调试停车场","totalCarports":999}

My idea was to cast the value to a String and cut it apart with the split method. The output printed after the split was:

    {"address":"33","conTel":"","desc":"","email":"","eptCarports":496,"freeTime":0,"inEntranceNum":0,"isCharge":1,"latitude":"23.04715","longitude":"113.102868","outEntraceNum":0,"parkCode":"2KPI5T73","parkName":"对外调试停车场","totalCarports":999}

Although the split output looked identical to the data I wanted, the code still failed with this error:

    Caused by: net.sf.json.JSONException: A JSONObject text must begin with '{' at character 0 of

Here is the code:

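    import net.sf.json.JSONObject;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class kafkaConsumer {

        @Autowired
        private Parkepository parkepository;

        // The original listing shows no listener annotation; @KafkaListener with the
        // topic name taken from the log output is assumed here.
        @KafkaListener(topics = "testKafka")
        public void listen(ConsumerRecord<?, ?> record) throws Exception {
            System.out.printf("topic = %s, offset = %d, value = %s \n",
                    record.topic(), record.offset(), record.value());
            String parkstr = (String) record.value();

            // Failed attempt: cut the wrapper off with a regex instead of parsing the JSON.
            String[] proStr = parkstr.split(
                    "\\{\"data\":|,\"message\":\"业务成功\",\"resultCode\":0,\"status\":1}");
            for (String str : proStr) {
                System.out.println(str);
                // The JSONException is thrown here, when a fragment is not a JSON object.
                ParkInfo parkInfo = (ParkInfo) JSONObject.toBean(JSONObject.fromObject(str), ParkInfo.class);
                System.out.println(parkInfo);
                this.parkepository.save(parkInfo);
            }
        }
    }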

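After a lot of searching and asking more experienced developers, I finally understood my mistake. I should not have split the data at all: splitting breaks the structure of the message, so the pieces are no longer guaranteed to be valid JSON. I changed the code as follows: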

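    import net.sf.json.JSONObject;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class kafkaConsumer {

        @Autowired
        private Parkepository parkepository;

        // The original listing shows no listener annotation; @KafkaListener with the
        // topic name taken from the log output is assumed here.
        @KafkaListener(topics = "testKafka")
        public void listen(ConsumerRecord<?, ?> record) throws Exception {
            System.out.printf("topic = %s, offset = %d, value = %s \n",
                    record.topic(), record.offset(), record.value());
            String parkstr = (String) record.value();

            // Parse the whole message into the wrapper bean, then take its "data" field.
            ResponseRetu responseRetu =
                    (ResponseRetu) JSONObject.toBean(JSONObject.fromObject(parkstr), ResponseRetu.class);
            ParkInfo parkInfo = responseRetu.getData();
            System.out.println(parkInfo);
            this.parkepository.save(parkInfo);
        }
    }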

To get the structure I needed, I created two classes, one for the "data" object and one for the message that wraps it:

    import javax.persistence.*; // jakarta.persistence on newer Spring Boot versions
    import lombok.Data;

    // Maps the object stored under "data" to the parkinfo table.
    @Entity
    @Table(name = "parkinfo")
    @Data
    public class ParkInfo {

        // Kept as in the original: parkName is the primary key. IDENTITY generation
        // normally expects a numeric id, so a String key may need a different strategy.
        @Id
        @GeneratedValue(strategy = GenerationType.IDENTITY)
        @Column(name = "parkName")
        private String parkName;

        @Column(name = "parkCode")
        private String parkCode;

        @Column(name = "address")
        private String address;

        @Column(name = "longitude")
        private String longitude;

        @Column(name = "latitude")
        private String latitude;

        // "desc" is a reserved word in SQL, so this column name may need quoting.
        @Column(name = "desc")
        private String desc;

        @Column(name = "inEntranceNum")
        private Integer inEntranceNum;

        @Column(name = "outEntraceNum")
        private Integer outEntraceNum;

        @Column(name = "isCharge")
        private Integer isCharge;

        @Column(name = "freeTime")
        private Integer freeTime;

        @Column(name = "totalCarports")
        private Integer totalCarports;

        @Column(name = "eptCarports")
        private Integer eptCarports;

        @Column(name = "conTel")
        private String conTel;

        @Column(name = "email")
        private String email;
    }

    // Plain bean matching the top-level message: status, resultCode, message and data.
    public class ResponseRetu {

        private String status;
        private int resultCode;
        private String message;
        private ParkInfo data;

        public String getStatus() { return status; }
        public void setStatus(String status) { this.status = status; }

        public int getResultCode() { return resultCode; }
        public void setResultCode(int resultCode) { this.resultCode = resultCode; }

        public String getMessage() { return message; }
        public void setMessage(String message) { this.message = message; }

        public ParkInfo getData() { return data; }
        public void setData(ParkInfo data) { this.data = data; }

        @Override
        public String toString() {
            return "ResponseRetu{" +
                    "status='" + status + '\'' +
                    ", resultCode=" + resultCode +
                    ", message='" + message + '\'' +
                    ", data=" + data +
                    '}';
        }
    }

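The data I need is then obtained with ParkInfo parkInfo = responseRetu.getData();

Overall approach: integrate Spring Boot with Kafka, define a new interface that extends JpaRepository<ParkInfo,String>, inject it into the Kafka consumer, and write to the database with this.parkepository.save(parkInfo).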
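A side note on why the split attempt in the first listener probably failed even though the printed fragment looked right. The message starts with {"data": and that prefix is one of the split delimiters. When a delimiter matches at the very start of the input, String.split returns an empty leading element, so the loop most likely called JSONObject.fromObject on an empty string first. The error text in the post ends right after "of", where the offending input would normally be shown, which is consistent with that reading. The sketch below uses only the JDK to show the split behaviour. The class name SplitDemo, the method splitPayload and the shortened payload are made up for illustration, and the claim about json-lib rejecting the empty string is an assumption that the sketch does not verify.

```java
// Stand-alone check of how String.split treats a delimiter match at the start of the input.
// SplitDemo and splitPayload are hypothetical names; the payload is a shortened stand-in.
class SplitDemo {

    // Same shape as the regex in the failing listener: either the {"data": prefix
    // or the ,"message":...} suffix is treated as a delimiter.
    static String[] splitPayload(String payload) {
        return payload.split("\\{\"data\":|,\"message\":\"ok\",\"resultCode\":0,\"status\":1\\}");
    }

    public static void main(String[] args) {
        String payload = "{\"data\":{\"parkCode\":\"2KPI5T73\",\"totalCarports\":999},"
                + "\"message\":\"ok\",\"resultCode\":0,\"status\":1}";

        String[] parts = splitPayload(payload);
        System.out.println("number of parts: " + parts.length);
        for (String part : parts) {
            // Brackets make an empty element visible in the output.
            System.out.println("[" + part + "]");
        }
    }
}
```

Running it shows two parts: an empty string followed by the inner object. Skipping empty fragments would avoid the exception, but parsing the whole message and reading its data field, as in the corrected listener, does not depend on the exact text of the wrapper.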

Comments and feedback are welcome, thank you. Please do not plagiarize this post; sharing it and passing the experience on is welcome.

