Apache Beam cross-language JDBC (MSSQL) - incorrect conversion of negative integers
Posted: 27 Feb 2025, 11:20
We are using the cross-language JDBC transform to read data from MSSQL into BigQuery, and we have noticed that negative integers are converted incorrectly. I suspect it has something to do with logical types, but I am not sure where to start.
Environment:
Apache Beam 2.63.0, Google Cloud Dataflow
import logging

import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.typehints.schemas import LogicalType
from apache_beam.typehints.schemas import MillisInstant

"""
table1
CREATE TABLE Customers (
    quanitity int,
    LastName varchar(255),
);
INSERT INTO Customers (quanitity, LastName) VALUES (44, 'Tom');
INSERT INTO Customers (quanitity, LastName) VALUES (-1, 'Tom');
"""


class LogResults(beam.DoFn):
    """Just log the results"""

    def process(self, element):
        logging.info("elment.logger - : %s", element)
        yield element


def row_to_dict(row):
    # Rows from ReadFromJdbc are NamedTuple-like, so _asdict() is available.
    as_dict = row._asdict()
    return as_dict


def run(argv=None, save_main_session=True):
    # Start the pipeline
    pipeline_args = []  # flags must be a list of strings, not a single string
    pipeline_options = PipelineOptions(pipeline_args, pipeline_type_check=True)
    LogicalType.register_logical_type(MillisInstant)
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "full-Read" >> ReadFromJdbc(
                query="select * from Customers",
                table_name="xxxxxx",
                driver_class_name="com.microsoft.sqlserver.jdbc.SQLServerDriver",
                jdbc_url="jdbc:sqlserver://{0};databaseName={1}".format("xx", "xx"),
                username="username",
                password="password",
                classpath=["gs://xxxxx/mssql-jdbc-12.6.2.jre11.jar"],
            )
            | "row to map" >> beam.Map(row_to_dict)
            | "log result" >> beam.ParDo(LogResults())
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.DEBUG)
    run()
Here is the output:
elment.logger - : {'quanitity': 44, 'LastName': "Tom" }
elment.logger - : {'quanitity': 4294967295, 'LastName': "Tom" }
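One observation that may help narrow this down: 4294967295 is 2**32 - 1, which is the bit pattern of -1 in a 32-bit two's-complement integer read as unsigned (an MSSQL int is 32 bits). Below is a minimal sketch of that arithmetic and of a possible stopgap applied after row_to_dict. The helper names to_signed32 and fix_row are hypothetical, not part of Beam, and this assumes the value really is a signed 32-bit int being reinterpreted somewhere in the conversion; it does not address the root cause.

```python
def to_signed32(value: int) -> int:
    """Reinterpret the low 32 bits of value as a signed 32-bit integer."""
    value &= 0xFFFFFFFF  # keep only the low 32 bits
    # Values with the top bit set represent negatives in two's complement.
    return value - (1 << 32) if value >= (1 << 31) else value


def fix_row(row: dict, int_columns=("quanitity",)) -> dict:
    """Return a copy of row with the given int columns mapped back to signed.

    int_columns is an assumption: list the columns declared as int in MSSQL.
    """
    fixed = dict(row)
    for col in int_columns:
        if fixed.get(col) is not None:
            fixed[col] = to_signed32(fixed[col])
    return fixed


print(fix_row({"quanitity": 4294967295, "LastName": "Tom"}))
# prints {'quanitity': -1, 'LastName': 'Tom'}
```

In the pipeline above this could be tried as an extra step such as beam.Map(fix_row) right after "row to map", purely as a workaround while the actual type mapping is investigated.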